diff --git a/backend-service/app/actors/CmdUtil.java b/backend-service/app/actors/CmdUtil.java deleted file mode 100644 index 05b219c729..0000000000 --- a/backend-service/app/actors/CmdUtil.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright 2015 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package actors; - -import java.util.Enumeration; -import java.util.Properties; -import metadata.etl.Launcher; -import metadata.etl.models.EtlJobName; - - -/** - * Utility class for command line - */ -public class CmdUtil { - /** - * Generate a command that start a ETL job. 
- * @param etlJobName - * @param refId - * @param whEtlExecId - * @param props - * @return command - */ - final static String javaCmd = "java "; - public static String generateCMD(EtlJobName etlJobName, int refId, long whEtlExecId, Properties props, String cmdParam) { - StringBuilder sb = new StringBuilder(); - sb.append(javaCmd); - sb.append(cmdParam).append(" "); - sb.append("-D").append(Launcher.JOB_NAME_KEY).append("=").append(etlJobName).append(" "); - sb.append("-D").append(Launcher.REF_ID_KEY).append("=").append(refId).append(" "); - sb.append("-D").append(Launcher.WH_ETL_EXEC_ID_KEY).append("=").append(whEtlExecId).append(" "); - - Enumeration e = props.propertyNames(); - while (e.hasMoreElements()) { - String key = (String)e.nextElement(); - String value = props.getProperty(key); - sb.append("-D").append(key).append("=").append(value).append(" "); - } - - String classPath = System.getProperty("java.class.path"); - sb.append("-cp").append(" '").append(classPath).append("' "); - sb.append("metadata.etl.Launcher"); - - return sb.toString(); - } -} diff --git a/backend-service/app/actors/ConfigUtil.java b/backend-service/app/actors/ConfigUtil.java new file mode 100644 index 0000000000..720b28bf29 --- /dev/null +++ b/backend-service/app/actors/ConfigUtil.java @@ -0,0 +1,84 @@ +/** + * Copyright 2015 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ */ +package actors; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; +import java.util.Properties; +import metadata.etl.Launcher; +import metadata.etl.models.EtlJobName; +import wherehows.common.Constant; + + +/** + * Utility class for generate cmd, write and delete config files. + */ +public class ConfigUtil { + + final static String javaCmd = "java"; + + /** + * Generate the config file in the 'wherehows.app_folder' folder + * The file name is {whEtlExecId}.config + * @param etlJobName + * @param refId + * @param whEtlExecId + * @param props + * @return void + */ + public static void generateProperties(EtlJobName etlJobName, int refId, long whEtlExecId, Properties props) + throws IOException { + + props.setProperty(Launcher.JOB_NAME_KEY, etlJobName.name()); + props.setProperty(Launcher.REF_ID_KEY, String.valueOf(refId)); + props.setProperty(Launcher.WH_ETL_EXEC_ID_KEY, String.valueOf(whEtlExecId)); + + String dirName = props.getProperty(Constant.WH_APP_FOLDER_KEY) + "/exec"; + File dir = new File(dirName); + if (!dir.exists()) { + dir.mkdir(); + } + File configFile = new File(dirName, whEtlExecId + ".properties"); + FileWriter writer = new FileWriter(configFile); + props.store(writer, "exec id : " + whEtlExecId + " job configurations"); + writer.close(); + + } + + public static void deletePropertiesFile(Properties props, long whEtlExecId) { + String dirName = props.getProperty(Constant.WH_APP_FOLDER_KEY) + "/exec"; + File configFile = new File(dirName, whEtlExecId + ".properties"); + if (configFile.exists()) { + configFile.delete(); + } + } + + public static String generateCMD(long whEtlExecId, String cmdParam) { + + StringBuilder sb = new StringBuilder(); + sb.append(javaCmd); + sb.append(cmdParam).append(" "); + String classPath = System.getProperty("java.class.path"); + sb.append("-cp").append(" '").append(classPath).append("' "); + String 
dirName = "/var/tmp/wherehows/exec"; + sb.append("-Dconfig=").append(dirName + "/" + whEtlExecId).append(".properties "); + sb.append("metadata.etl.Launcher"); + + return sb.toString(); + } +} diff --git a/backend-service/app/actors/EtlJobActor.java b/backend-service/app/actors/EtlJobActor.java index 0466a62e13..ee32278e8e 100644 --- a/backend-service/app/actors/EtlJobActor.java +++ b/backend-service/app/actors/EtlJobActor.java @@ -17,8 +17,6 @@ import akka.actor.UntypedActor; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.PrintWriter; -import java.io.StringWriter; import java.lang.reflect.Field; import java.util.Properties; import metadata.etl.models.EtlJobStatus; @@ -38,22 +36,22 @@ public class EtlJobActor extends UntypedActor { @Override public void onReceive(Object message) throws Exception { - + Properties props = null; if (message instanceof EtlJobMessage) { EtlJobMessage msg = (EtlJobMessage) message; try { - Properties props = EtlJobPropertyDao.getJobProperties(msg.getEtlJobName(), msg.getRefId()); + props = EtlJobPropertyDao.getJobProperties(msg.getEtlJobName(), msg.getRefId()); Properties whProps = EtlJobPropertyDao.getWherehowsProperties(); props.putAll(whProps); EtlJobDao.startRun(msg.getWhEtlExecId(), "Job started!"); // start a new process here - String cmd = CmdUtil.generateCMD(msg.getEtlJobName(), msg.getRefId(), msg.getWhEtlExecId(), props, msg.getCmdParam()); - // Logger.debug("run command : " + cmd); - + String cmd = ConfigUtil.generateCMD(msg.getWhEtlExecId(), msg.getCmdParam()); + Logger.debug("run command : " + cmd); + ConfigUtil + .generateProperties(msg.getEtlJobName(), msg.getRefId(), msg.getWhEtlExecId(), props); process = Runtime.getRuntime().exec(cmd); - InputStream stdout = process.getInputStream(); InputStreamReader isr = new InputStreamReader(stdout); BufferedReader br = new BufferedReader(isr); @@ -95,6 +93,8 @@ public class EtlJobActor extends UntypedActor { 
EtlJobDao.endRun(msg.getWhEtlExecId(), EtlJobStatus.ERROR, e.getMessage()); } finally { Global.removeRunningJob(((EtlJobMessage) message).getWhEtlJobId()); + if (!Logger.isDebugEnabled()) // if debug enable, won't delete the config files. + ConfigUtil.deletePropertiesFile(props, msg.getWhEtlExecId()); } } } diff --git a/backend-service/test/java/CmdUtilTest.java b/backend-service/test/java/CmdUtilTest.java deleted file mode 100644 index 0214a9f01d..0000000000 --- a/backend-service/test/java/CmdUtilTest.java +++ /dev/null @@ -1,27 +0,0 @@ - - -import java.util.Properties; -import metadata.etl.models.EtlJobName; -import org.testng.Assert; -import org.testng.annotations.Test; -import actors.CmdUtil; - - -public class CmdUtilTest { - - @Test - public void testgenerateCMD(){ - EtlJobName etlJobName = EtlJobName.valueOf("AZKABAN_EXECUTION_METADATA_ETL"); - Properties prop = new Properties(); - prop.put("p1", "v1"); - prop.put("p2", "v2"); - prop.put("p3", "v3"); - - String cmd = CmdUtil.generateCMD(etlJobName, 0, 0L, prop); - - // class path is dynamic, can't predefine - Assert.assertTrue( - cmd.startsWith("java -Djob=AZKABAN_EXECUTION_METADATA_ETL -DrefId=0 -DwhEtlId=0 -Dp3=v3 -Dp2=v2 -Dp1=v1 -cp")); - } - -} \ No newline at end of file diff --git a/backend-service/test/java/ConfigUtilTest.java b/backend-service/test/java/ConfigUtilTest.java new file mode 100644 index 0000000000..9df02bcd21 --- /dev/null +++ b/backend-service/test/java/ConfigUtilTest.java @@ -0,0 +1,39 @@ + + +import java.io.File; +import java.io.IOException; +import java.util.Properties; +import metadata.etl.models.EtlJobName; +import org.testng.Assert; +import org.testng.annotations.Test; +import actors.ConfigUtil; +import wherehows.common.Constant; + + +public class ConfigUtilTest { + + @Test + public void testgenerateCMD(){ + EtlJobName etlJobName = EtlJobName.valueOf("AZKABAN_EXECUTION_METADATA_ETL"); + Properties prop = new Properties(); + prop.put("p1", "v1"); + prop.put("p2", "v2"); + 
prop.put("p3", "v3"); + prop.put(Constant.WH_APP_FOLDER_KEY, "/var/tmp/wherehows"); + + String cmd = ConfigUtil.generateCMD(0L, ""); + Assert.assertTrue(cmd.startsWith("java -cp ")); + Assert.assertTrue(cmd.endsWith(" -Dconfig=/var/tmp/wherehows/exec/0.properties metadata.etl.Launcher")); + File configFile = new File("/var/tmp/wherehows/exec", "0.properties"); + Assert.assertTrue(!configFile.exists()); + try { + ConfigUtil.generateProperties(etlJobName, 0, 0L, prop); + Assert.assertTrue(configFile.exists()); + ConfigUtil.deletePropertiesFile(prop, 0L); + Assert.assertTrue(!configFile.exists()); + } catch (IOException e) { + e.printStackTrace(); + } + } + +} \ No newline at end of file diff --git a/data-model/DDL/ETL_DDL/lineage_metadata.sql b/data-model/DDL/ETL_DDL/lineage_metadata.sql index e60efae72f..fd6016ff73 100644 --- a/data-model/DDL/ETL_DDL/lineage_metadata.sql +++ b/data-model/DDL/ETL_DDL/lineage_metadata.sql @@ -24,7 +24,7 @@ CREATE TABLE IF NOT EXISTS `stg_job_execution_data_lineage` ( `db_id` SMALLINT(5) UNSIGNED DEFAULT NULL, `abstracted_object_name` VARCHAR(255) DEFAULT NULL, - `full_object_name` VARCHAR(255) DEFAULT NULL, + `full_object_name` VARCHAR(1000) DEFAULT NULL, `partition_start` VARCHAR(50) DEFAULT NULL, `partition_end` VARCHAR(50) DEFAULT NULL, `partition_type` VARCHAR(20) DEFAULT NULL, @@ -61,7 +61,7 @@ CREATE TABLE IF NOT EXISTS `job_execution_data_lineage` ( `db_id` SMALLINT(5) UNSIGNED DEFAULT NULL, `abstracted_object_name` VARCHAR(255) NOT NULL, - `full_object_name` VARCHAR(255) DEFAULT NULL, + `full_object_name` VARCHAR(1000) DEFAULT NULL, `partition_start` VARCHAR(50) DEFAULT NULL, `partition_end` VARCHAR(50) DEFAULT NULL, `partition_type` VARCHAR(20) DEFAULT NULL, diff --git a/metadata-etl/src/main/java/metadata/etl/Launcher.java b/metadata-etl/src/main/java/metadata/etl/Launcher.java index 577066c806..7a9efb415d 100644 --- a/metadata-etl/src/main/java/metadata/etl/Launcher.java +++ 
b/metadata-etl/src/main/java/metadata/etl/Launcher.java @@ -31,42 +31,40 @@ import org.slf4j.LoggerFactory; */ public class Launcher { - /** command line parameter keys */ + /** job property parameter keys */ public static final String JOB_NAME_KEY = "job"; public static final String REF_ID_KEY = "refId"; public static final String WH_ETL_EXEC_ID_KEY = "whEtlId"; - /** Only for test */ + /** command line config file location parameter key */ private static final String CONFIG_FILE_LOCATION_KEY = "config"; protected static final Logger logger = LoggerFactory.getLogger("Job Launcher"); /** - * It can run as a standalone application - * @param args + * Read the config file location from the command line, read all configurations from that config file, then execute the job. + * Example command line : java -Dconfig=/path/to/config/file -cp "lib/*" metadata.etl.Launcher + * @param args contain the config file location parameter 'config' * @throws Exception */ public static void main(String[] args) throws Exception { - String etlJobNameString = System.getProperty(JOB_NAME_KEY); + String property_file = System.getProperty(CONFIG_FILE_LOCATION_KEY, null); + String etlJobNameString = null; + int refId = 0; + long whEtlId = 0; Properties props = new Properties(); - int refId = Integer.valueOf(System.getProperty(REF_ID_KEY, "0")); - long whEtlId = Integer.valueOf(System.getProperty(WH_ETL_EXEC_ID_KEY, "0")); + try (InputStream propFile = new FileInputStream(property_file)) { + props.load(propFile); + etlJobNameString = props.getProperty(JOB_NAME_KEY); + refId = Integer.valueOf(props.getProperty(REF_ID_KEY)); - - if (property_file != null) { // test mode - try (InputStream propFile = new FileInputStream(property_file)) { - props.load(propFile); - } catch (IOException e) { - //logger.error("property file '{}' not found" , property_file); - e.printStackTrace(); - } - } - else { // production mode - Properties properties = System.getProperties(); - props.putAll(properties); + } catch (IOException e) 
{ + //logger.error("property file '{}' not found" , property_file); + e.printStackTrace(); + System.exit(1); } // create the etl job @@ -76,7 +74,6 @@ public class Launcher { try { etlJob.run(); } catch (Exception e) { - StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); e.printStackTrace(pw); diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageMetadataEtl.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageMetadataEtl.java index 6367bfde1a..20d8c0aacc 100644 --- a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageMetadataEtl.java +++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageMetadataEtl.java @@ -107,6 +107,10 @@ public class AzLineageMetadataEtl extends EtlJob { // insert into the final table // TODO: need to be insert on duplicate update, so the running flows can be updated String insertIntoFinalTable = "INSERT IGNORE INTO job_execution_data_lineage\n" + + "( app_id, flow_exec_id, job_exec_id, job_exec_uuid, job_name, job_start_unixtime, job_finished_unixtime,\n" + + "db_id, abstracted_object_name, full_object_name, partition_start, partition_end, partition_type,\n" + + "layout_id, storage_type, source_target_type, srl_no, source_srl_no, operation,\n" + + "record_count, insert_count, delete_count, update_count, flow_path, created_date, wh_etl_exec_id)" + "SELECT app_id, flow_exec_id, job_exec_id, job_exec_uuid, job_name, job_start_unixtime, job_finished_unixtime,\n" + "db_id, abstracted_object_name, full_object_name, partition_start, partition_end, partition_type,\n" + "layout_id, storage_type, source_target_type, srl_no, source_srl_no, operation,\n" diff --git a/metadata-etl/src/main/resources/jython/LdapExtract.py b/metadata-etl/src/main/resources/jython/LdapExtract.py index f3ac1e0056..ddd2f6e40c 100644 --- a/metadata-etl/src/main/resources/jython/LdapExtract.py +++ b/metadata-etl/src/main/resources/jython/LdapExtract.py @@ -19,7 +19,7 @@ from javax.naming.directory import 
SearchControls from javax.naming.directory import BasicAttributes from wherehows.common import Constant -import csv, re, os, sys +import csv, re, os, sys, json from java.util import Hashtable from java.io import FileWriter @@ -43,9 +43,6 @@ class LdapExtract: self.group_map = dict() self.group_flatten_map = dict() - def split_property(self, property_value): - return re.split('\s*\'\s*,\s*\'\s*', property_value.strip('\' \t\n\r\f\v')) - def fetch_ldap_user(self, file): """ fetch ldap user from ldap server @@ -67,7 +64,7 @@ class LdapExtract: search_target = '(objectClass=person)' return_attributes_standard = ['user_id', 'distinct_name', 'name', 'display_name', 'title', 'employee_number', 'manager', 'mail', 'department_number', 'department', 'start_date', 'mobile'] - return_attributes_actual = self.split_property(self.args[Constant.LDAP_SEARCH_RETURN_ATTRS_KEY]) + return_attributes_actual = json.loads(self.args[Constant.LDAP_SEARCH_RETURN_ATTRS_KEY]) return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual)) ctls = SearchControls() @@ -75,8 +72,8 @@ class LdapExtract: ctls.setSearchScope(SearchControls.SUBTREE_SCOPE) ldap_records = [] - # domain format should look like : 'OU=domain1','OU=domain2','OU=domain3,OU=subdomain3' - org_units = self.split_property(self.args[Constant.LDAP_SEARCH_DOMAINS_KEY]) + # domain format should look like : ['OU=domain1','OU=domain2','OU=domain3,OU=subdomain3'] + org_units = json.loads(self.args[Constant.LDAP_SEARCH_DOMAINS_KEY]) for search_unit in org_units: search_result = ctx.search(search_unit, search_target, ctls) @@ -126,14 +123,14 @@ class LdapExtract: ctx = InitialDirContext(settings) search_target = "(objectClass=posixGroup)" return_attributes_standard = ['group_id', 'member_ids'] - return_attributes_actual = self.split_property(self.args[Constant.LDAP_GROUP_SEARCH_RETURN_ATTRS_KEY]) + return_attributes_actual = json.loads(self.args[Constant.LDAP_GROUP_SEARCH_RETURN_ATTRS_KEY]) return_attributes_map 
= dict(zip(return_attributes_standard, return_attributes_actual)) ctls = SearchControls() ctls.setReturningAttributes(return_attributes_actual) ctls.setSearchScope(SearchControls.SUBTREE_SCOPE) ldap_records = [] - org_units = self.split_property(self.args[Constant.LDAP_GROUP_SEARCH_DOMAINS_KEY]) + org_units = json.loads(self.args[Constant.LDAP_GROUP_SEARCH_DOMAINS_KEY]) for search_unit in org_units: results = ctx.search(search_unit, search_target, ctls) for r in results: diff --git a/metadata-etl/src/main/resources/local_test.properties.template b/metadata-etl/src/main/resources/local_test.properties.template index 9f3ad664d1..51a847fc78 100644 --- a/metadata-etl/src/main/resources/local_test.properties.template +++ b/metadata-etl/src/main/resources/local_test.properties.template @@ -12,6 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # +# Job identify info +whEtlId= +job= +refId= + # Teradata properties teradata.databases= teradata.db.driver=