Merge pull request #118 from SunZhaonan/master

Pass parameters through a file.
Zhaonan Sun 2016-05-10 13:48:29 -07:00
commit a4734ab406
10 changed files with 222 additions and 148 deletions

View File: actors/CmdUtil.java (deleted)

@@ -1,56 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package actors;
import java.util.Enumeration;
import java.util.Properties;
import metadata.etl.Launcher;
import metadata.etl.models.EtlJobName;
/**
* Utility class for command line
*/
public class CmdUtil {
/**
* Generate a command that starts an ETL job.
* @param etlJobName
* @param refId
* @param whEtlExecId
* @param props
* @return command
*/
final static String javaCmd = "java ";
public static String generateCMD(EtlJobName etlJobName, int refId, long whEtlExecId, Properties props, String cmdParam) {
StringBuilder sb = new StringBuilder();
sb.append(javaCmd);
sb.append(cmdParam).append(" ");
sb.append("-D").append(Launcher.JOB_NAME_KEY).append("=").append(etlJobName).append(" ");
sb.append("-D").append(Launcher.REF_ID_KEY).append("=").append(refId).append(" ");
sb.append("-D").append(Launcher.WH_ETL_EXEC_ID_KEY).append("=").append(whEtlExecId).append(" ");
Enumeration e = props.propertyNames();
while (e.hasMoreElements()) {
String key = (String)e.nextElement();
String value = props.getProperty(key);
sb.append("-D").append(key).append("=").append(value).append(" ");
}
String classPath = System.getProperty("java.class.path");
sb.append("-cp").append(" '").append(classPath).append("' ");
sb.append("metadata.etl.Launcher");
return sb.toString();
}
}

View File: actors/ConfigUtil.java (new file)

@@ -0,0 +1,84 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package actors;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;
import metadata.etl.Launcher;
import metadata.etl.models.EtlJobName;
import wherehows.common.Constant;
/**
* Utility class to generate the command line and to write and delete config files.
*/
public class ConfigUtil {
final static String javaCmd = "java";
/**
* Generate the config file in the 'wherehows.app_folder' folder.
* The file name is {whEtlExecId}.properties
* @param etlJobName
* @param refId
* @param whEtlExecId
* @param props
*/
public static void generateProperties(EtlJobName etlJobName, int refId, long whEtlExecId, Properties props)
throws IOException {
props.setProperty(Launcher.JOB_NAME_KEY, etlJobName.name());
props.setProperty(Launcher.REF_ID_KEY, String.valueOf(refId));
props.setProperty(Launcher.WH_ETL_EXEC_ID_KEY, String.valueOf(whEtlExecId));
String dirName = props.getProperty(Constant.WH_APP_FOLDER_KEY) + "/exec";
File dir = new File(dirName);
if (!dir.exists()) {
dir.mkdir();
}
File configFile = new File(dirName, whEtlExecId + ".properties");
FileWriter writer = new FileWriter(configFile);
props.store(writer, "exec id : " + whEtlExecId + " job configurations");
writer.close();
}
public static void deletePropertiesFile(Properties props, long whEtlExecId) {
String dirName = props.getProperty(Constant.WH_APP_FOLDER_KEY) + "/exec";
File configFile = new File(dirName, whEtlExecId + ".properties");
if (configFile.exists()) {
configFile.delete();
}
}
public static String generateCMD(long whEtlExecId, String cmdParam) {
StringBuilder sb = new StringBuilder();
sb.append(javaCmd);
sb.append(cmdParam).append(" ");
String classPath = System.getProperty("java.class.path");
sb.append("-cp").append(" '").append(classPath).append("' ");
String dirName = "/var/tmp/wherehows/exec";
sb.append("-Dconfig=").append(dirName + "/" + whEtlExecId).append(".properties ");
sb.append("metadata.etl.Launcher");
return sb.toString();
}
}
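
Taken together, the new flow is: serialize the merged job properties to {whEtlExecId}.properties, then build a child-JVM command that carries only the file location. A minimal usage sketch under assumed values (the job name, exec id, and app folder below are placeholders, not part of this commit):

import java.io.IOException;
import java.util.Properties;
import actors.ConfigUtil;
import metadata.etl.models.EtlJobName;
import wherehows.common.Constant;

public class ConfigUtilUsageSketch {
  public static void main(String[] args) throws IOException {
    // Placeholder values; the real caller (EtlJobActor) takes these from an EtlJobMessage.
    EtlJobName jobName = EtlJobName.valueOf("AZKABAN_EXECUTION_METADATA_ETL");
    long whEtlExecId = 42L;

    Properties props = new Properties();
    props.setProperty(Constant.WH_APP_FOLDER_KEY, "/var/tmp/wherehows");

    // 1. Persist every job parameter to {wherehows.app_folder}/exec/42.properties.
    ConfigUtil.generateProperties(jobName, 0, whEtlExecId, props);

    // 2. Build the child-JVM command, which only points at that file via -Dconfig.
    String cmd = ConfigUtil.generateCMD(whEtlExecId, "");
    Process p = Runtime.getRuntime().exec(cmd);
  }
}

Two details worth noting: generateCMD hard-codes /var/tmp/wherehows/exec while generateProperties derives the directory from wherehows.app_folder, so the sketch sets the app folder to the matching path; and javaCmd here is "java" without the trailing space CmdUtil had, so a non-empty cmdParam must supply its own leading space.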

View File: actors/EtlJobActor.java

@@ -17,8 +17,6 @@ import akka.actor.UntypedActor;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Field;
import java.util.Properties;
import metadata.etl.models.EtlJobStatus;
@@ -38,22 +36,22 @@ public class EtlJobActor extends UntypedActor {
@Override
public void onReceive(Object message)
throws Exception {
Properties props = null;
if (message instanceof EtlJobMessage) {
EtlJobMessage msg = (EtlJobMessage) message;
try {
Properties props = EtlJobPropertyDao.getJobProperties(msg.getEtlJobName(), msg.getRefId());
props = EtlJobPropertyDao.getJobProperties(msg.getEtlJobName(), msg.getRefId());
Properties whProps = EtlJobPropertyDao.getWherehowsProperties();
props.putAll(whProps);
EtlJobDao.startRun(msg.getWhEtlExecId(), "Job started!");
// start a new process here
String cmd = CmdUtil.generateCMD(msg.getEtlJobName(), msg.getRefId(), msg.getWhEtlExecId(), props, msg.getCmdParam());
// Logger.debug("run command : " + cmd);
String cmd = ConfigUtil.generateCMD(msg.getWhEtlExecId(), msg.getCmdParam());
Logger.debug("run command : " + cmd);
ConfigUtil
.generateProperties(msg.getEtlJobName(), msg.getRefId(), msg.getWhEtlExecId(), props);
process = Runtime.getRuntime().exec(cmd);
InputStream stdout = process.getInputStream();
InputStreamReader isr = new InputStreamReader(stdout);
BufferedReader br = new BufferedReader(isr);
@@ -95,6 +93,8 @@ public class EtlJobActor extends UntypedActor {
EtlJobDao.endRun(msg.getWhEtlExecId(), EtlJobStatus.ERROR, e.getMessage());
} finally {
Global.removeRunningJob(((EtlJobMessage) message).getWhEtlJobId());
if (!Logger.isDebugEnabled()) // when debug is enabled, keep the config files for inspection
ConfigUtil.deletePropertiesFile(props, msg.getWhEtlExecId());
}
}
}
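
One subtlety in the new finally block: props starts as null and is only assigned inside the try, so if EtlJobPropertyDao.getJobProperties throws, deletePropertiesFile would dereference null. A defensive variant of the cleanup, as a sketch (assuming Play's play.Logger, which the actor appears to use; this is not part of the commit):

import java.util.Properties;
import actors.ConfigUtil;
import play.Logger;

final class CleanupSketch {
  // Skip deletion when nothing was written (props == null because the
  // property lookup failed) or when debug mode wants the file kept around.
  static void cleanUpConfig(Properties props, long whEtlExecId) {
    if (props == null || Logger.isDebugEnabled()) {
      return;
    }
    ConfigUtil.deletePropertiesFile(props, whEtlExecId);
  }
}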

View File: CmdUtilTest.java (deleted)

@@ -1,27 +0,0 @@
import java.util.Properties;
import metadata.etl.models.EtlJobName;
import org.testng.Assert;
import org.testng.annotations.Test;
import actors.CmdUtil;
public class CmdUtilTest {
@Test
public void testgenerateCMD(){
EtlJobName etlJobName = EtlJobName.valueOf("AZKABAN_EXECUTION_METADATA_ETL");
Properties prop = new Properties();
prop.put("p1", "v1");
prop.put("p2", "v2");
prop.put("p3", "v3");
String cmd = CmdUtil.generateCMD(etlJobName, 0, 0L, prop);
// class path is dynamic, can't predefine
Assert.assertTrue(
cmd.startsWith("java -Djob=AZKABAN_EXECUTION_METADATA_ETL -DrefId=0 -DwhEtlId=0 -Dp3=v3 -Dp2=v2 -Dp1=v1 -cp"));
}
}

View File: ConfigUtilTest.java (new file)

@@ -0,0 +1,39 @@
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import metadata.etl.models.EtlJobName;
import org.testng.Assert;
import org.testng.annotations.Test;
import actors.ConfigUtil;
import wherehows.common.Constant;
public class ConfigUtilTest {
@Test
public void testgenerateCMD(){
EtlJobName etlJobName = EtlJobName.valueOf("AZKABAN_EXECUTION_METADATA_ETL");
Properties prop = new Properties();
prop.put("p1", "v1");
prop.put("p2", "v2");
prop.put("p3", "v3");
prop.put(Constant.WH_APP_FOLDER_KEY, "/var/tmp/wherehows");
String cmd = ConfigUtil.generateCMD(0L, "");
Assert.assertTrue(cmd.startsWith("java -cp "));
Assert.assertTrue(cmd.endsWith(" -Dconfig=/var/tmp/wherehows/exec/0.properties metadata.etl.Launcher"));
File configFile = new File("/var/tmp/wherehows/exec", "0.properties");
Assert.assertTrue(!configFile.exists());
try {
ConfigUtil.generateProperties(etlJobName, 0, 0L, prop);
Assert.assertTrue(configFile.exists());
ConfigUtil.deletePropertiesFile(prop, 0L);
Assert.assertTrue(!configFile.exists());
} catch (IOException e) {
e.printStackTrace();
}
}
}

View File: DDL for the job execution data lineage tables

@@ -24,7 +24,7 @@ CREATE TABLE IF NOT EXISTS `stg_job_execution_data_lineage` (
`db_id` SMALLINT(5) UNSIGNED DEFAULT NULL,
`abstracted_object_name` VARCHAR(255) DEFAULT NULL,
`full_object_name` VARCHAR(255) DEFAULT NULL,
`full_object_name` VARCHAR(1000) DEFAULT NULL,
`partition_start` VARCHAR(50) DEFAULT NULL,
`partition_end` VARCHAR(50) DEFAULT NULL,
`partition_type` VARCHAR(20) DEFAULT NULL,
@@ -61,7 +61,7 @@ CREATE TABLE IF NOT EXISTS `job_execution_data_lineage` (
`db_id` SMALLINT(5) UNSIGNED DEFAULT NULL,
`abstracted_object_name` VARCHAR(255) NOT NULL,
`full_object_name` VARCHAR(255) DEFAULT NULL,
`full_object_name` VARCHAR(1000) DEFAULT NULL,
`partition_start` VARCHAR(50) DEFAULT NULL,
`partition_end` VARCHAR(50) DEFAULT NULL,
`partition_type` VARCHAR(20) DEFAULT NULL,
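
Both statements are CREATE TABLE IF NOT EXISTS, which is a no-op when the table already exists, so the widened full_object_name will not reach databases created before this change. A hypothetical migration for an existing MySQL instance (connection URL and credentials are placeholders; this is not part of the commit):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class WidenFullObjectName {
  public static void main(String[] args) throws Exception {
    // Placeholder connection settings; point these at the actual WhereHows database.
    try (Connection conn = DriverManager.getConnection(
             "jdbc:mysql://localhost:3306/wherehows", "wherehows", "wherehows");
         Statement stmt = conn.createStatement()) {
      // Widen the column in place on both the staging and the final table.
      stmt.executeUpdate("ALTER TABLE stg_job_execution_data_lineage"
          + " MODIFY COLUMN full_object_name VARCHAR(1000) DEFAULT NULL");
      stmt.executeUpdate("ALTER TABLE job_execution_data_lineage"
          + " MODIFY COLUMN full_object_name VARCHAR(1000) DEFAULT NULL");
    }
  }
}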

View File: metadata/etl/Launcher.java

@@ -31,42 +31,40 @@ import org.slf4j.LoggerFactory;
*/
public class Launcher {
/** command line parameter keys */
/** job property parameter keys */
public static final String JOB_NAME_KEY = "job";
public static final String REF_ID_KEY = "refId";
public static final String WH_ETL_EXEC_ID_KEY = "whEtlId";
/** Only for test */
/** command line config file location parameter key */
private static final String CONFIG_FILE_LOCATION_KEY = "config";
protected static final Logger logger = LoggerFactory.getLogger("Job Launcher");
/**
* It can run as a standalone application
* @param args
* Read the config file location from the command line, load all configuration from that file, and execute the job.
* Example command line : java -Dconfig=/path/to/config/file -cp "lib/*" metadata.etl.Launcher
* @param args unused; the config file location is passed through the 'config' system property
* @throws Exception
*/
public static void main(String[] args)
throws Exception {
String etlJobNameString = System.getProperty(JOB_NAME_KEY);
String property_file = System.getProperty(CONFIG_FILE_LOCATION_KEY, null);
String etlJobNameString = null;
int refId = 0;
long whEtlId = 0;
Properties props = new Properties();
int refId = Integer.valueOf(System.getProperty(REF_ID_KEY, "0"));
long whEtlId = Integer.valueOf(System.getProperty(WH_ETL_EXEC_ID_KEY, "0"));
try (InputStream propFile = new FileInputStream(property_file)) {
props.load(propFile);
etlJobNameString = props.getProperty(JOB_NAME_KEY);
refId = Integer.valueOf(props.getProperty(REF_ID_KEY));
if (property_file != null) { // test mode
try (InputStream propFile = new FileInputStream(property_file)) {
props.load(propFile);
} catch (IOException e) {
//logger.error("property file '{}' not found" , property_file);
e.printStackTrace();
}
}
else { // production mode
Properties properties = System.getProperties();
props.putAll(properties);
} catch (IOException e) {
//logger.error("property file '{}' not found" , property_file);
e.printStackTrace();
System.exit(1);
}
// create the etl job
@@ -76,7 +74,6 @@ public class Launcher {
try {
etlJob.run();
} catch (Exception e) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
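
The commit's producer/consumer contract in one place: the web app stores every parameter, including the former -D keys job, refId, and whEtlId, in the properties file, and the launcher JVM recovers them from the single config system property. A minimal round-trip sketch (the path and values are placeholders):

import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.InputStream;
import java.util.Properties;

public class ConfigRoundTripSketch {
  public static void main(String[] args) throws Exception {
    String path = "/var/tmp/wherehows/exec/0.properties"; // placeholder location
    new File(path).getParentFile().mkdirs(); // mirror ConfigUtil's directory creation

    // Producer side (web app): persist all job parameters into the file.
    Properties out = new Properties();
    out.setProperty("job", "AZKABAN_EXECUTION_METADATA_ETL");
    out.setProperty("refId", "0");
    out.setProperty("whEtlId", "0");
    try (FileWriter writer = new FileWriter(path)) {
      out.store(writer, "exec id : 0 job configurations");
    }

    // Consumer side (launcher JVM, started as: java -Dconfig=<path> ...):
    Properties in = new Properties();
    try (InputStream propFile = new FileInputStream(System.getProperty("config", path))) {
      in.load(propFile);
    }
    System.out.println(in.getProperty("job")); // AZKABAN_EXECUTION_METADATA_ETL
  }
}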

View File: AzLineageMetadataEtl.java

@@ -107,6 +107,10 @@ public class AzLineageMetadataEtl extends EtlJob {
// insert into the final table
// TODO: needs to be INSERT ... ON DUPLICATE KEY UPDATE, so the running flows can be updated
String insertIntoFinalTable = "INSERT IGNORE INTO job_execution_data_lineage\n"
+ "( app_id, flow_exec_id, job_exec_id, job_exec_uuid, job_name, job_start_unixtime, job_finished_unixtime,\n"
+ "db_id, abstracted_object_name, full_object_name, partition_start, partition_end, partition_type,\n"
+ "layout_id, storage_type, source_target_type, srl_no, source_srl_no, operation,\n"
+ "record_count, insert_count, delete_count, update_count, flow_path, created_date, wh_etl_exec_id)"
+ "SELECT app_id, flow_exec_id, job_exec_id, job_exec_uuid, job_name, job_start_unixtime, job_finished_unixtime,\n"
+ "db_id, abstracted_object_name, full_object_name, partition_start, partition_end, partition_type,\n"
+ "layout_id, storage_type, source_target_type, srl_no, source_srl_no, operation,\n"
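
The TODO above asks for insert-on-duplicate-update so that re-running flows refresh rows that INSERT IGNORE currently skips. A hypothetical shape of that statement in the same string-building style (it assumes the SELECT reads from stg_job_execution_data_lineage and that the table's unique key identifies the row; neither is confirmed by this excerpt):

String upsertIntoFinalTable = "INSERT INTO job_execution_data_lineage\n"
    + "( app_id, flow_exec_id, job_exec_id, job_exec_uuid, job_name, job_start_unixtime, job_finished_unixtime,\n"
    + "db_id, abstracted_object_name, full_object_name, partition_start, partition_end, partition_type,\n"
    + "layout_id, storage_type, source_target_type, srl_no, source_srl_no, operation,\n"
    + "record_count, insert_count, delete_count, update_count, flow_path, created_date, wh_etl_exec_id)\n"
    + "SELECT app_id, flow_exec_id, job_exec_id, job_exec_uuid, job_name, job_start_unixtime, job_finished_unixtime,\n"
    + "db_id, abstracted_object_name, full_object_name, partition_start, partition_end, partition_type,\n"
    + "layout_id, storage_type, source_target_type, srl_no, source_srl_no, operation,\n"
    + "record_count, insert_count, delete_count, update_count, flow_path, created_date, wh_etl_exec_id\n"
    + "FROM stg_job_execution_data_lineage\n"
    + "ON DUPLICATE KEY UPDATE\n"
    + "record_count = VALUES(record_count), insert_count = VALUES(insert_count),\n"
    + "delete_count = VALUES(delete_count), update_count = VALUES(update_count),\n"
    + "job_finished_unixtime = VALUES(job_finished_unixtime), wh_etl_exec_id = VALUES(wh_etl_exec_id)";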

View File: LdapExtract Jython script

@@ -14,13 +14,18 @@
from org.slf4j import LoggerFactory
from javax.naming.directory import InitialDirContext
from javax.naming.ldap import InitialLdapContext
from javax.naming import Context
from javax.naming.directory import SearchControls
from javax.naming.directory import BasicAttributes
from javax.naming.ldap import Control
from javax.naming.ldap import PagedResultsControl
from javax.naming.ldap import PagedResultsResponseControl
from wherehows.common import Constant
import csv, re, os, sys
import csv, re, os, sys, json
from java.util import Hashtable
from jarray import zeros, array
from java.io import FileWriter
@@ -43,9 +48,6 @@ class LdapExtract:
self.group_map = dict()
self.group_flatten_map = dict()
def split_property(self, property_value):
return re.split('\s*\'\s*,\s*\'\s*', property_value.strip('\' \t\n\r\f\v'))
def fetch_ldap_user(self, file):
"""
fetch ldap user from ldap server
@@ -59,15 +61,21 @@ class LdapExtract:
settings.put(Context.SECURITY_PRINCIPAL, self.args[Constant.LDAP_CONTEXT_SECURITY_PRINCIPAL_KEY])
settings.put(Context.SECURITY_CREDENTIALS, self.args[Constant.LDAP_CONTEXT_SECURITY_CREDENTIALS_KEY])
# page the results; each page has a fixed number of records
pageSize = 5000
pageControl = PagedResultsControl(pageSize, Control.NONCRITICAL)
c_array = array([pageControl], Control)
# Connect to LDAP Server
ctx = InitialDirContext(settings)
ctx = InitialLdapContext(settings, None)
ctx.setRequestControls(c_array)
# load the java Hashtable out of the ldap server
# Query starting point and query target
search_target = '(objectClass=person)'
return_attributes_standard = ['user_id', 'distinct_name', 'name', 'display_name', 'title', 'employee_number',
'manager', 'mail', 'department_number', 'department', 'start_date', 'mobile']
return_attributes_actual = self.split_property(self.args[Constant.LDAP_SEARCH_RETURN_ATTRS_KEY])
return_attributes_actual = json.loads(self.args[Constant.LDAP_SEARCH_RETURN_ATTRS_KEY])
return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual))
ctls = SearchControls()
@@ -75,37 +83,57 @@ class LdapExtract:
ctls.setSearchScope(SearchControls.SUBTREE_SCOPE)
ldap_records = []
# domain format should look like : 'OU=domain1','OU=domain2','OU=domain3,OU=subdomain3'
org_units = self.split_property(self.args[Constant.LDAP_SEARCH_DOMAINS_KEY])
# domain format should look like : ['OU=domain1','OU=domain2','OU=domain3,OU=subdomain3']
org_units = json.loads(self.args[Constant.LDAP_SEARCH_DOMAINS_KEY])
cookie = None
for search_unit in org_units:
search_result = ctx.search(search_unit, search_target, ctls)
# print search_return_attributes
for person in search_result:
ldap_user_tuple = [self.app_id]
if search_unit == self.args[Constant.LDAP_INACTIVE_DOMAIN_KEY]:
ldap_user_tuple.append('N')
else:
ldap_user_tuple.append('Y')
person_attributes = person.getAttributes()
user_id = person_attributes.get(return_attributes_map['user_id'])
user_id = re.sub(r"\r|\n", '', user_id.get(0)).strip().encode('utf8')
self.ldap_user.add(user_id)
for attr_name in return_attributes_actual:
attr = person_attributes.get(attr_name)
if attr:
attr = re.sub(r"\r|\n", '', attr.get(0)).strip().encode('utf8')
# special fix for start_date
if attr_name == return_attributes_map['start_date'] and len(attr) == 4:
attr += '0101'
ldap_user_tuple.append(attr)
# pagination
while True:
# do the search
search_result = ctx.search(search_unit, search_target, ctls)
for person in search_result:
ldap_user_tuple = [self.app_id]
if search_unit == self.args[Constant.LDAP_INACTIVE_DOMAIN_KEY]:
ldap_user_tuple.append('N')
else:
ldap_user_tuple.append("")
ldap_user_tuple.append('Y')
person_attributes = person.getAttributes()
user_id = person_attributes.get(return_attributes_map['user_id'])
user_id = re.sub(r"\r|\n", '', user_id.get(0)).strip().encode('utf8')
self.ldap_user.add(user_id)
ldap_user_tuple.append(self.wh_exec_id)
ldap_records.append(ldap_user_tuple)
for attr_name in return_attributes_actual:
attr = person_attributes.get(attr_name)
if attr:
attr = re.sub(r"\r|\n", '', attr.get(0)).strip().encode('utf8')
# special fix for start_date
if attr_name == return_attributes_map['start_date'] and len(attr) == 4:
attr += '0101'
ldap_user_tuple.append(attr)
else:
ldap_user_tuple.append("")
ldap_user_tuple.append(self.wh_exec_id)
ldap_records.append(ldap_user_tuple)
# Examine the paged results control response
control = ctx.getResponseControls()[0] # always returns a list, but with only one item
if isinstance(control, PagedResultsResponseControl):
cookie = control.getCookie()
# Re-activate paged results
if cookie is None:
# reset ctx, break while loop, do next search
pageControl = PagedResultsControl(pageSize, Control.NONCRITICAL)
c_array = array([pageControl], Control)
ctx.setRequestControls(c_array)
break
else:
self.logger.debug("More than one page of results when searching " + search_unit)
pageControl = PagedResultsControl(pageSize, cookie, Control.CRITICAL)
c_array = array([pageControl], Control)
ctx.setRequestControls(c_array)
self.logger.info("%d records found in ldap search" % (len(self.ldap_user)))
@@ -126,14 +154,14 @@ class LdapExtract:
ctx = InitialDirContext(settings)
search_target = "(objectClass=posixGroup)"
return_attributes_standard = ['group_id', 'member_ids']
return_attributes_actual = self.split_property(self.args[Constant.LDAP_GROUP_SEARCH_RETURN_ATTRS_KEY])
return_attributes_actual = json.loads(self.args[Constant.LDAP_GROUP_SEARCH_RETURN_ATTRS_KEY])
return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual))
ctls = SearchControls()
ctls.setReturningAttributes(return_attributes_actual)
ctls.setSearchScope(SearchControls.SUBTREE_SCOPE)
ldap_records = []
org_units = self.split_property(self.args[Constant.LDAP_GROUP_SEARCH_DOMAINS_KEY])
org_units = json.loads(self.args[Constant.LDAP_GROUP_SEARCH_DOMAINS_KEY])
for search_unit in org_units:
results = ctx.search(search_unit, search_target, ctls)
for r in results:
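
The pagination added above is the standard JNDI simple-paged-results pattern (RFC 2696): send a PagedResultsControl with each search, read the cookie out of the PagedResultsResponseControl, and keep re-issuing the search until the cookie comes back empty. The same loop in plain Java for reference (server URL and search base are placeholders, and the anonymous bind is an assumption; the script authenticates via Context.SECURITY_PRINCIPAL):

import java.util.Hashtable;
import javax.naming.Context;
import javax.naming.NamingEnumeration;
import javax.naming.directory.SearchControls;
import javax.naming.directory.SearchResult;
import javax.naming.ldap.Control;
import javax.naming.ldap.InitialLdapContext;
import javax.naming.ldap.LdapContext;
import javax.naming.ldap.PagedResultsControl;
import javax.naming.ldap.PagedResultsResponseControl;

public class PagedLdapSearchSketch {
  public static void main(String[] args) throws Exception {
    Hashtable<String, String> env = new Hashtable<String, String>();
    env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
    env.put(Context.PROVIDER_URL, "ldap://ldap.example.com:389"); // placeholder

    int pageSize = 5000; // same page size the script uses
    LdapContext ctx = new InitialLdapContext(env, null);
    ctx.setRequestControls(new Control[]{new PagedResultsControl(pageSize, Control.NONCRITICAL)});

    SearchControls ctls = new SearchControls();
    ctls.setSearchScope(SearchControls.SUBTREE_SCOPE);

    byte[] cookie;
    do {
      // Placeholder search base, with the same person filter the script uses.
      NamingEnumeration<SearchResult> results =
          ctx.search("OU=domain1", "(objectClass=person)", ctls);
      while (results.hasMore()) {
        results.next(); // one person entry per iteration; extract attributes here
      }
      // The response control carries the cookie; null means this was the last page.
      cookie = null;
      Control[] controls = ctx.getResponseControls();
      if (controls != null) {
        for (Control c : controls) {
          if (c instanceof PagedResultsResponseControl) {
            cookie = ((PagedResultsResponseControl) c).getCookie();
          }
        }
      }
      // Re-arm the request control with the cookie to fetch the next page.
      ctx.setRequestControls(new Control[]{
          new PagedResultsControl(pageSize, cookie, Control.CRITICAL)});
    } while (cookie != null);
    ctx.close();
  }
}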

View File: job properties template

@@ -12,6 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
# Job identification info
whEtlId=
job=
refId=
# Teradata properties
teradata.databases=
teradata.db.driver=