Pass ETL job parameters to the child process through a config file instead of command-line arguments.

This commit is contained in:
SunZhaonan 2016-05-03 15:17:38 -07:00
parent 702b90a2d9
commit 31de21ddcf
10 changed files with 165 additions and 122 deletions

View File

@ -1,56 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package actors;
import java.util.Enumeration;
import java.util.Properties;
import metadata.etl.Launcher;
import metadata.etl.models.EtlJobName;
/**
* Utility class for command line
*/
public class CmdUtil {

  /** Base executable used to launch the ETL job process (trailing space is intentional:
   *  generateCMD appends the next token directly after it). */
  final static String javaCmd = "java ";

  /**
   * Generate a shell command line that starts an ETL job in a separate JVM.
   * The job identity is passed as {@code -D} system properties, followed by every
   * entry of {@code props}, the current JVM's class path, and the launcher main class.
   *
   * @param etlJobName  name of the ETL job, rendered via its {@code toString()}
   * @param refId       reference id of the job, passed as {@code -D{Launcher.REF_ID_KEY}}
   * @param whEtlExecId WhereHows ETL execution id, passed as {@code -D{Launcher.WH_ETL_EXEC_ID_KEY}}
   * @param props       additional job properties, each emitted as {@code -Dkey=value}
   * @param cmdParam    extra JVM arguments inserted right after the {@code java} executable
   * @return the full command string ending in {@code metadata.etl.Launcher}
   */
  public static String generateCMD(EtlJobName etlJobName, int refId, long whEtlExecId, Properties props, String cmdParam) {
    StringBuilder sb = new StringBuilder();
    sb.append(javaCmd);
    sb.append(cmdParam).append(" ");
    sb.append("-D").append(Launcher.JOB_NAME_KEY).append("=").append(etlJobName).append(" ");
    sb.append("-D").append(Launcher.REF_ID_KEY).append("=").append(refId).append(" ");
    sb.append("-D").append(Launcher.WH_ETL_EXEC_ID_KEY).append("=").append(whEtlExecId).append(" ");
    // Enumeration<?> instead of the raw type: propertyNames() is declared as
    // Enumeration<?>, so this removes an unchecked-conversion warning.
    Enumeration<?> e = props.propertyNames();
    while (e.hasMoreElements()) {
      String key = (String) e.nextElement();
      String value = props.getProperty(key);
      sb.append("-D").append(key).append("=").append(value).append(" ");
    }
    // Reuse the parent JVM's class path; quoted because it may contain wildcards.
    String classPath = System.getProperty("java.class.path");
    sb.append("-cp").append(" '").append(classPath).append("' ");
    sb.append("metadata.etl.Launcher");
    return sb.toString();
  }
}

View File

@ -0,0 +1,84 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package actors;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;
import metadata.etl.Launcher;
import metadata.etl.models.EtlJobName;
import wherehows.common.Constant;
/**
* Utility class that generates the launch command and writes/deletes per-execution config files.
*/
public class ConfigUtil {

  /** Executable used to launch the ETL job JVM. */
  final static String javaCmd = "java";

  /**
   * Generate the config file in the '{wherehows.app_folder}/exec' folder.
   * The file name is {whEtlExecId}.properties and contains {@code props} plus the
   * job identity keys, so the spawned process can load everything from one file.
   *
   * @param etlJobName  ETL job name, stored under {@code Launcher.JOB_NAME_KEY}
   * @param refId       job reference id, stored under {@code Launcher.REF_ID_KEY}
   * @param whEtlExecId execution id, stored under {@code Launcher.WH_ETL_EXEC_ID_KEY};
   *                    also used as the file name
   * @param props       job properties to persist; mutated in place to add the identity keys
   * @throws IOException if the exec directory or the file cannot be written
   */
  public static void generateProperties(EtlJobName etlJobName, int refId, long whEtlExecId, Properties props)
      throws IOException {
    props.setProperty(Launcher.JOB_NAME_KEY, etlJobName.name());
    props.setProperty(Launcher.REF_ID_KEY, String.valueOf(refId));
    props.setProperty(Launcher.WH_ETL_EXEC_ID_KEY, String.valueOf(whEtlExecId));

    String dirName = props.getProperty(Constant.WH_APP_FOLDER_KEY) + "/exec";
    File dir = new File(dirName);
    if (!dir.exists()) {
      // mkdirs (not mkdir) so missing parent directories don't silently fail the write
      dir.mkdirs();
    }

    File configFile = new File(dirName, whEtlExecId + ".properties");
    // try-with-resources: the writer is closed even if store() throws
    try (FileWriter writer = new FileWriter(configFile)) {
      props.store(writer, "exec id : " + whEtlExecId + " job configurations");
    }
  }

  /**
   * Delete the per-execution config file created by {@link #generateProperties}.
   * A no-op if the file does not exist.
   *
   * @param props       used only to resolve the app folder via {@code Constant.WH_APP_FOLDER_KEY}
   * @param whEtlExecId execution id identifying the file to delete
   */
  public static void deletePropertiesFile(Properties props, long whEtlExecId) {
    String dirName = props.getProperty(Constant.WH_APP_FOLDER_KEY) + "/exec";
    File configFile = new File(dirName, whEtlExecId + ".properties");
    if (configFile.exists()) {
      configFile.delete();
    }
  }

  /**
   * Generate the command line that launches the ETL job JVM, pointing it at the
   * per-execution config file via {@code -Dconfig=...}.
   *
   * @param whEtlExecId execution id; selects the {whEtlExecId}.properties file
   * @param cmdParam    extra JVM arguments inserted right after the executable
   * @return the full command string ending in {@code metadata.etl.Launcher}
   */
  public static String generateCMD(long whEtlExecId, String cmdParam) {
    StringBuilder sb = new StringBuilder();
    sb.append(javaCmd);
    sb.append(cmdParam).append(" ");
    String classPath = System.getProperty("java.class.path");
    sb.append("-cp").append(" '").append(classPath).append("' ");
    // NOTE(review): hardcoded path; presumably should be derived from
    // Constant.WH_APP_FOLDER_KEY like generateProperties — TODO confirm with callers.
    String dirName = "/var/tmp/wherehows/exec";
    sb.append("-Dconfig=").append(dirName + "/" + whEtlExecId).append(".properties ");
    sb.append("metadata.etl.Launcher");
    return sb.toString();
  }
}

View File

@ -17,8 +17,6 @@ import akka.actor.UntypedActor;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.Properties; import java.util.Properties;
import metadata.etl.models.EtlJobStatus; import metadata.etl.models.EtlJobStatus;
@ -38,22 +36,22 @@ public class EtlJobActor extends UntypedActor {
@Override @Override
public void onReceive(Object message) public void onReceive(Object message)
throws Exception { throws Exception {
Properties props = null;
if (message instanceof EtlJobMessage) { if (message instanceof EtlJobMessage) {
EtlJobMessage msg = (EtlJobMessage) message; EtlJobMessage msg = (EtlJobMessage) message;
try { try {
Properties props = EtlJobPropertyDao.getJobProperties(msg.getEtlJobName(), msg.getRefId()); props = EtlJobPropertyDao.getJobProperties(msg.getEtlJobName(), msg.getRefId());
Properties whProps = EtlJobPropertyDao.getWherehowsProperties(); Properties whProps = EtlJobPropertyDao.getWherehowsProperties();
props.putAll(whProps); props.putAll(whProps);
EtlJobDao.startRun(msg.getWhEtlExecId(), "Job started!"); EtlJobDao.startRun(msg.getWhEtlExecId(), "Job started!");
// start a new process here // start a new process here
String cmd = CmdUtil.generateCMD(msg.getEtlJobName(), msg.getRefId(), msg.getWhEtlExecId(), props, msg.getCmdParam()); String cmd = ConfigUtil.generateCMD(msg.getWhEtlExecId(), msg.getCmdParam());
// Logger.debug("run command : " + cmd); Logger.debug("run command : " + cmd);
ConfigUtil
.generateProperties(msg.getEtlJobName(), msg.getRefId(), msg.getWhEtlExecId(), props);
process = Runtime.getRuntime().exec(cmd); process = Runtime.getRuntime().exec(cmd);
InputStream stdout = process.getInputStream(); InputStream stdout = process.getInputStream();
InputStreamReader isr = new InputStreamReader(stdout); InputStreamReader isr = new InputStreamReader(stdout);
BufferedReader br = new BufferedReader(isr); BufferedReader br = new BufferedReader(isr);
@ -95,6 +93,8 @@ public class EtlJobActor extends UntypedActor {
EtlJobDao.endRun(msg.getWhEtlExecId(), EtlJobStatus.ERROR, e.getMessage()); EtlJobDao.endRun(msg.getWhEtlExecId(), EtlJobStatus.ERROR, e.getMessage());
} finally { } finally {
Global.removeRunningJob(((EtlJobMessage) message).getWhEtlJobId()); Global.removeRunningJob(((EtlJobMessage) message).getWhEtlJobId());
if (!Logger.isDebugEnabled()) // if debug enable, won't delete the config files.
ConfigUtil.deletePropertiesFile(props, msg.getWhEtlExecId());
} }
} }
} }

View File

@ -1,27 +0,0 @@
import java.util.Properties;
import metadata.etl.models.EtlJobName;
import org.testng.Assert;
import org.testng.annotations.Test;
import actors.CmdUtil;
public class CmdUtilTest {
  @Test
  public void testgenerateCMD() {
    EtlJobName etlJobName = EtlJobName.valueOf("AZKABAN_EXECUTION_METADATA_ETL");
    Properties prop = new Properties();
    prop.put("p1", "v1");
    prop.put("p2", "v2");
    prop.put("p3", "v3");
    // generateCMD takes five arguments; pass an empty extra-JVM-param string.
    String cmd = CmdUtil.generateCMD(etlJobName, 0, 0L, prop, "");
    // Class path is dynamic, can't predefine it. Also, Properties iteration order
    // is not guaranteed, so assert each -D flag individually instead of one
    // fixed-order prefix (the old single startsWith was order-dependent and fragile).
    Assert.assertTrue(cmd.startsWith("java"));
    Assert.assertTrue(cmd.contains("-Djob=AZKABAN_EXECUTION_METADATA_ETL "));
    Assert.assertTrue(cmd.contains("-DrefId=0 "));
    Assert.assertTrue(cmd.contains("-DwhEtlId=0 "));
    Assert.assertTrue(cmd.contains("-Dp1=v1 "));
    Assert.assertTrue(cmd.contains("-Dp2=v2 "));
    Assert.assertTrue(cmd.contains("-Dp3=v3 "));
    Assert.assertTrue(cmd.contains("-cp "));
  }
}

View File

@ -0,0 +1,39 @@
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import metadata.etl.models.EtlJobName;
import org.testng.Assert;
import org.testng.annotations.Test;
import actors.ConfigUtil;
import wherehows.common.Constant;
public class ConfigUtilTest {
  @Test
  public void testgenerateCMD() throws IOException {
    EtlJobName etlJobName = EtlJobName.valueOf("AZKABAN_EXECUTION_METADATA_ETL");
    Properties prop = new Properties();
    prop.put("p1", "v1");
    prop.put("p2", "v2");
    prop.put("p3", "v3");
    prop.put(Constant.WH_APP_FOLDER_KEY, "/var/tmp/wherehows");

    String cmd = ConfigUtil.generateCMD(0L, "");
    Assert.assertTrue(cmd.startsWith("java -cp "));
    Assert.assertTrue(cmd.endsWith(" -Dconfig=/var/tmp/wherehows/exec/0.properties metadata.etl.Launcher"));

    File configFile = new File("/var/tmp/wherehows/exec", "0.properties");
    Assert.assertTrue(!configFile.exists());
    // Let IOException propagate and fail the test: the old catch-and-printStackTrace
    // silently passed the test when generateProperties failed.
    try {
      ConfigUtil.generateProperties(etlJobName, 0, 0L, prop);
      Assert.assertTrue(configFile.exists());
    } finally {
      // Clean up even if an assertion fails, so reruns start from a clean state.
      ConfigUtil.deletePropertiesFile(prop, 0L);
    }
    Assert.assertTrue(!configFile.exists());
  }
}

View File

@ -24,7 +24,7 @@ CREATE TABLE IF NOT EXISTS `stg_job_execution_data_lineage` (
`db_id` SMALLINT(5) UNSIGNED DEFAULT NULL, `db_id` SMALLINT(5) UNSIGNED DEFAULT NULL,
`abstracted_object_name` VARCHAR(255) DEFAULT NULL, `abstracted_object_name` VARCHAR(255) DEFAULT NULL,
`full_object_name` VARCHAR(255) DEFAULT NULL, `full_object_name` VARCHAR(1000) DEFAULT NULL,
`partition_start` VARCHAR(50) DEFAULT NULL, `partition_start` VARCHAR(50) DEFAULT NULL,
`partition_end` VARCHAR(50) DEFAULT NULL, `partition_end` VARCHAR(50) DEFAULT NULL,
`partition_type` VARCHAR(20) DEFAULT NULL, `partition_type` VARCHAR(20) DEFAULT NULL,
@ -61,7 +61,7 @@ CREATE TABLE IF NOT EXISTS `job_execution_data_lineage` (
`db_id` SMALLINT(5) UNSIGNED DEFAULT NULL, `db_id` SMALLINT(5) UNSIGNED DEFAULT NULL,
`abstracted_object_name` VARCHAR(255) NOT NULL, `abstracted_object_name` VARCHAR(255) NOT NULL,
`full_object_name` VARCHAR(255) DEFAULT NULL, `full_object_name` VARCHAR(1000) DEFAULT NULL,
`partition_start` VARCHAR(50) DEFAULT NULL, `partition_start` VARCHAR(50) DEFAULT NULL,
`partition_end` VARCHAR(50) DEFAULT NULL, `partition_end` VARCHAR(50) DEFAULT NULL,
`partition_type` VARCHAR(20) DEFAULT NULL, `partition_type` VARCHAR(20) DEFAULT NULL,

View File

@ -31,42 +31,40 @@ import org.slf4j.LoggerFactory;
*/ */
public class Launcher { public class Launcher {
/** command line parameter keys */ /** job property parameter keys */
public static final String JOB_NAME_KEY = "job"; public static final String JOB_NAME_KEY = "job";
public static final String REF_ID_KEY = "refId"; public static final String REF_ID_KEY = "refId";
public static final String WH_ETL_EXEC_ID_KEY = "whEtlId"; public static final String WH_ETL_EXEC_ID_KEY = "whEtlId";
/** Only for test */ /** command line config file location parameter key */
private static final String CONFIG_FILE_LOCATION_KEY = "config"; private static final String CONFIG_FILE_LOCATION_KEY = "config";
protected static final Logger logger = LoggerFactory.getLogger("Job Launcher"); protected static final Logger logger = LoggerFactory.getLogger("Job Launcher");
/** /**
* It can run as a standalone application * Read config file location from command line. Read all configuration from command line, execute the job.
* @param args * Example command line : java -Dconfig=/path/to/config/file -cp "lib/*" metadata.etl.Launcher
* @param args * @param args contain the config file location parameter 'config'
* @throws Exception * @throws Exception
*/ */
public static void main(String[] args) public static void main(String[] args)
throws Exception { throws Exception {
String etlJobNameString = System.getProperty(JOB_NAME_KEY);
String property_file = System.getProperty(CONFIG_FILE_LOCATION_KEY, null); String property_file = System.getProperty(CONFIG_FILE_LOCATION_KEY, null);
String etlJobNameString = null;
int refId = 0;
long whEtlId = 0;
Properties props = new Properties(); Properties props = new Properties();
int refId = Integer.valueOf(System.getProperty(REF_ID_KEY, "0"));
long whEtlId = Integer.valueOf(System.getProperty(WH_ETL_EXEC_ID_KEY, "0"));
try (InputStream propFile = new FileInputStream(property_file)) {
props.load(propFile);
etlJobNameString = props.getProperty(JOB_NAME_KEY);
refId = Integer.valueOf(props.getProperty(REF_ID_KEY));
} catch (IOException e) {
if (property_file != null) { // test mode //logger.error("property file '{}' not found" , property_file);
try (InputStream propFile = new FileInputStream(property_file)) { e.printStackTrace();
props.load(propFile); System.exit(1);
} catch (IOException e) {
//logger.error("property file '{}' not found" , property_file);
e.printStackTrace();
}
}
else { // production mode
Properties properties = System.getProperties();
props.putAll(properties);
} }
// create the etl job // create the etl job
@ -76,7 +74,6 @@ public class Launcher {
try { try {
etlJob.run(); etlJob.run();
} catch (Exception e) { } catch (Exception e) {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);

View File

@ -107,6 +107,10 @@ public class AzLineageMetadataEtl extends EtlJob {
// insert into the final table // insert into the final table
// TODO: need to be insert on duplicate update, so the running flows can be updated // TODO: need to be insert on duplicate update, so the running flows can be updated
String insertIntoFinalTable = "INSERT IGNORE INTO job_execution_data_lineage\n" String insertIntoFinalTable = "INSERT IGNORE INTO job_execution_data_lineage\n"
+ "( app_id, flow_exec_id, job_exec_id, job_exec_uuid, job_name, job_start_unixtime, job_finished_unixtime,\n"
+ "db_id, abstracted_object_name, full_object_name, partition_start, partition_end, partition_type,\n"
+ "layout_id, storage_type, source_target_type, srl_no, source_srl_no, operation,\n"
+ "record_count, insert_count, delete_count, update_count, flow_path, created_date, wh_etl_exec_id)"
+ "SELECT app_id, flow_exec_id, job_exec_id, job_exec_uuid, job_name, job_start_unixtime, job_finished_unixtime,\n" + "SELECT app_id, flow_exec_id, job_exec_id, job_exec_uuid, job_name, job_start_unixtime, job_finished_unixtime,\n"
+ "db_id, abstracted_object_name, full_object_name, partition_start, partition_end, partition_type,\n" + "db_id, abstracted_object_name, full_object_name, partition_start, partition_end, partition_type,\n"
+ "layout_id, storage_type, source_target_type, srl_no, source_srl_no, operation,\n" + "layout_id, storage_type, source_target_type, srl_no, source_srl_no, operation,\n"

View File

@ -19,7 +19,7 @@ from javax.naming.directory import SearchControls
from javax.naming.directory import BasicAttributes from javax.naming.directory import BasicAttributes
from wherehows.common import Constant from wherehows.common import Constant
import csv, re, os, sys import csv, re, os, sys, json
from java.util import Hashtable from java.util import Hashtable
from java.io import FileWriter from java.io import FileWriter
@ -43,9 +43,6 @@ class LdapExtract:
self.group_map = dict() self.group_map = dict()
self.group_flatten_map = dict() self.group_flatten_map = dict()
def split_property(self, property_value):
return re.split('\s*\'\s*,\s*\'\s*', property_value.strip('\' \t\n\r\f\v'))
def fetch_ldap_user(self, file): def fetch_ldap_user(self, file):
""" """
fetch ldap user from ldap server fetch ldap user from ldap server
@ -67,7 +64,7 @@ class LdapExtract:
search_target = '(objectClass=person)' search_target = '(objectClass=person)'
return_attributes_standard = ['user_id', 'distinct_name', 'name', 'display_name', 'title', 'employee_number', return_attributes_standard = ['user_id', 'distinct_name', 'name', 'display_name', 'title', 'employee_number',
'manager', 'mail', 'department_number', 'department', 'start_date', 'mobile'] 'manager', 'mail', 'department_number', 'department', 'start_date', 'mobile']
return_attributes_actual = self.split_property(self.args[Constant.LDAP_SEARCH_RETURN_ATTRS_KEY]) return_attributes_actual = json.loads(self.args[Constant.LDAP_SEARCH_RETURN_ATTRS_KEY])
return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual)) return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual))
ctls = SearchControls() ctls = SearchControls()
@ -75,8 +72,8 @@ class LdapExtract:
ctls.setSearchScope(SearchControls.SUBTREE_SCOPE) ctls.setSearchScope(SearchControls.SUBTREE_SCOPE)
ldap_records = [] ldap_records = []
# domain format should look like : 'OU=domain1','OU=domain2','OU=domain3,OU=subdomain3' # domain format should look like : ['OU=domain1','OU=domain2','OU=domain3,OU=subdomain3']
org_units = self.split_property(self.args[Constant.LDAP_SEARCH_DOMAINS_KEY]) org_units = json.loads(self.args[Constant.LDAP_SEARCH_DOMAINS_KEY])
for search_unit in org_units: for search_unit in org_units:
search_result = ctx.search(search_unit, search_target, ctls) search_result = ctx.search(search_unit, search_target, ctls)
@ -126,14 +123,14 @@ class LdapExtract:
ctx = InitialDirContext(settings) ctx = InitialDirContext(settings)
search_target = "(objectClass=posixGroup)" search_target = "(objectClass=posixGroup)"
return_attributes_standard = ['group_id', 'member_ids'] return_attributes_standard = ['group_id', 'member_ids']
return_attributes_actual = self.split_property(self.args[Constant.LDAP_GROUP_SEARCH_RETURN_ATTRS_KEY]) return_attributes_actual = json.loads(self.args[Constant.LDAP_GROUP_SEARCH_RETURN_ATTRS_KEY])
return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual)) return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual))
ctls = SearchControls() ctls = SearchControls()
ctls.setReturningAttributes(return_attributes_actual) ctls.setReturningAttributes(return_attributes_actual)
ctls.setSearchScope(SearchControls.SUBTREE_SCOPE) ctls.setSearchScope(SearchControls.SUBTREE_SCOPE)
ldap_records = [] ldap_records = []
org_units = self.split_property(self.args[Constant.LDAP_GROUP_SEARCH_DOMAINS_KEY]) org_units = json.loads(self.args[Constant.LDAP_GROUP_SEARCH_DOMAINS_KEY])
for search_unit in org_units: for search_unit in org_units:
results = ctx.search(search_unit, search_target, ctls) results = ctx.search(search_unit, search_target, ctls)
for r in results: for r in results:

View File

@ -12,6 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# #
# Job identify info
whEtlId=
job=
refId=
# Teradata properties # Teradata properties
teradata.databases= teradata.databases=
teradata.db.driver= teradata.db.driver=