diff --git a/metadata-etl/src/main/java/metadata/etl/EtlJob.java b/metadata-etl/src/main/java/metadata/etl/EtlJob.java
index a2354bce4d..1e625b2739 100644
--- a/metadata-etl/src/main/java/metadata/etl/EtlJob.java
+++ b/metadata-etl/src/main/java/metadata/etl/EtlJob.java
@@ -37,7 +37,6 @@ import java.util.Properties;
 public abstract class EtlJob {
   private final static String CONFIG_FILE = "application.properties";
   public PythonInterpreter interpreter;
-  public PySystemState sys;
   public Properties prop;
   public ClassLoader classLoader = getClass().getClassLoader();
   protected final Logger logger = LoggerFactory.getLogger(getClass());
@@ -74,7 +73,7 @@ public abstract class EtlJob {
    * @param properties
    */
   public EtlJob(Integer appId, Integer dbId, Long whExecId, Properties properties) {
-    configFromProperties(appId, dbId, whExecId, properties);
+    PySystemState sys = configFromProperties(appId, dbId, whExecId, properties);
     addJythonToPath(sys);
     interpreter = new PythonInterpreter(null, sys);
   }
@@ -127,8 +126,9 @@ public abstract class EtlJob {
    * @param appId
    * @param whExecId
    * @param properties
+   * @return PySystemState A PySystemState that contains all the arguments.
    */
-  private void configFromProperties(Integer appId, Integer dbId, Long whExecId, Properties properties) {
+  private PySystemState configFromProperties(Integer appId, Integer dbId, Long whExecId, Properties properties) {
     this.prop = properties;
     if (appId != null)
       prop.setProperty(Constant.APP_ID_KEY, String.valueOf(appId));
@@ -140,8 +140,9 @@ public abstract class EtlJob {
       String value = prop.getProperty(key);
       config.put(new PyString(key), new PyString(value));
     }
-    sys = new PySystemState();
+    PySystemState sys = new PySystemState();
     sys.argv.append(config);
+    return sys;
   }
   /**
    * Extract data from source
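The EtlJob change above is mostly plumbing: configFromProperties() now hands back the PySystemState it builds instead of stashing it in a field, and the constructor threads it straight into the PythonInterpreter. The end effect is unchanged: every job property rides into Jython as a PyDictionary appended to sys.argv. A minimal sketch of that wiring, assuming Jython 2.x on the classpath (the property keys and class name below are placeholders, not the Constant.* keys the real job uses):

```java
import java.util.Properties;

import org.python.core.PyDictionary;
import org.python.core.PyString;
import org.python.core.PySystemState;
import org.python.util.PythonInterpreter;

public class JythonConfigSketch {
  public static void main(String[] args) {
    Properties prop = new Properties();
    prop.setProperty("app.id", "42");        // placeholder keys, not the real Constant.* names
    prop.setProperty("wh.exec.id", "1001");

    // Mirror configFromProperties(): copy every property into a PyDictionary
    PyDictionary config = new PyDictionary();
    for (String key : prop.stringPropertyNames()) {
      config.put(new PyString(key), new PyString(prop.getProperty(key)));
    }

    // The dictionary rides into the interpreter on sys.argv
    PySystemState sys = new PySystemState();
    sys.argv.append(config);
    PythonInterpreter interpreter = new PythonInterpreter(null, sys);

    // A Jython script can then read its configuration back from sys.argv
    interpreter.exec("import sys\nprint sys.argv[-1].get('app.id')");
  }
}
```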
diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzJobChecker.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzJobChecker.java
index a4c53e7eaa..e10692155a 100644
--- a/metadata-etl/src/main/java/metadata/etl/lineage/AzJobChecker.java
+++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzJobChecker.java
@@ -124,24 +124,30 @@ public class AzJobChecker {
     JsonNode wholeFlow = mapper.readTree(flowJson);
     JsonNode allJobs = wholeFlow.get("nodes");
     String flowPath = wholeFlow.get("projectName").asText() + ":" + wholeFlow.get("flowId").asText();
-    List<AzkabanJobExecRecord> results = parseJsonHelper(allJobs, flowExecId, "", flowPath);
+    List<AzkabanJobExecRecord> results = parseJsonHelper(allJobs, flowExecId, flowPath);
     AzkabanJobExecUtil.sortAndSet(results);
     return results;
   }
 
-  private List<AzkabanJobExecRecord> parseJsonHelper(JsonNode allJobs, long flowExecId, String jobPrefix, String flowPath) {
+  /**
+   * Recursively process the execution info to get {@link AzkabanJobExecRecord}s
+   * @param allJobs JsonNode in "nodes" field
+   * @param flowExecId Azkaban flow execution id
+   * @param flowPath Format: project_name:first_level_flow/sub_flow/sub_flow
+   * @return the {@link AzkabanJobExecRecord}s parsed from the nodes
+   */
+  private List<AzkabanJobExecRecord> parseJsonHelper(JsonNode allJobs, long flowExecId, String flowPath) {
     List<AzkabanJobExecRecord> results = new ArrayList<>();
     for (JsonNode oneJob : allJobs) {
       if (oneJob.has("nodes")) { // is a subflow
         String subFlowName = oneJob.get("id").asText();
-        String newJobPrefix = jobPrefix.length() > 0 ? jobPrefix + subFlowName + ":" : subFlowName + ":";
-        results.addAll(parseJsonHelper(oneJob.get("nodes"), flowExecId, newJobPrefix, flowPath));
+        String subFlowPath = flowPath + "/" + subFlowName;
+        results.addAll(parseJsonHelper(oneJob.get("nodes"), flowExecId, subFlowPath));
       } else {
         String jobName = oneJob.get("id").asText();
         long startTime = oneJob.get("startTime").asLong();
         long endTime = oneJob.get("endTime").asLong();
         String status = oneJob.get("status").asText();
-        jobName = jobPrefix.length() > 0 ? jobPrefix + jobName : jobName;
         AzkabanJobExecRecord azkabanJobExecRecord =
             new AzkabanJobExecRecord(appId, jobName, flowExecId, (int) (startTime / 1000), (int) (endTime / 1000), status,
                 flowPath);
diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java
index 362bc8b55f..85f71b2129 100644
--- a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java
+++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java
@@ -43,9 +43,16 @@ public class AzLineageExtractor {
       throws Exception {
     List<LineageRecord> oneAzkabanJobLineage = new ArrayList<>();
+
+    // azkaban job name should have subflow name appended in front
+    String[] flowSequence = message.azkabanJobExecution.getFlowPath().split(":")[1].split("/");
+    String jobPrefix = "";
+    for (int i = 1; i < flowSequence.length; i++) {
+      jobPrefix += flowSequence[i] + ":";
+    }
 
     //String log = asc.getExecLog(azJobExec.execId, azJobExec.jobName);
     String log =
-        message.adc.getExecLog(message.azkabanJobExecution.getFlowExecId(), message.azkabanJobExecution.getJobName());
+        message.adc.getExecLog(message.azkabanJobExecution.getFlowExecId(), jobPrefix + message.azkabanJobExecution.getJobName());
 
     Set<String> hadoopJobIds = AzLogParser.getHadoopJobIdFromLog(log);
     for (String hadoopJobId : hadoopJobIds) {
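Together, the two lineage changes move subflow bookkeeping out of the parser and into the log fetch: AzJobChecker now records nesting in the flow path (project_name:first_level_flow/sub_flow/...), and AzLineageExtractor rebuilds the colon-separated job prefix from that path when it asks Azkaban for the execution log. A self-contained sketch of that round trip (the path and job name are made up; the real values come from AzkabanJobExecRecord.getFlowPath() and getJobName()):

```java
public class FlowPathPrefixSketch {

  /** Rebuild the "subFlowA:subFlowB:" prefix from a flow path, mirroring the AzLineageExtractor change. */
  static String jobPrefixFromFlowPath(String flowPath) {
    // flowPath format: project_name:first_level_flow/sub_flow/sub_flow
    String[] flowSequence = flowPath.split(":")[1].split("/");
    StringBuilder jobPrefix = new StringBuilder();
    for (int i = 1; i < flowSequence.length; i++) { // skip the first-level flow itself
      jobPrefix.append(flowSequence[i]).append(':');
    }
    return jobPrefix.toString();
  }

  public static void main(String[] args) {
    String flowPath = "WherehowsETL:top-level-flow/sub-flow-a/sub-flow-b";
    String jobName = "load-step";

    // This is the name the execution log is fetched under
    System.out.println(jobPrefixFromFlowPath(flowPath) + jobName); // sub-flow-a:sub-flow-b:load-step
  }
}
```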
diff --git a/metadata-etl/src/main/resources/jython/HdfsLoad.py b/metadata-etl/src/main/resources/jython/HdfsLoad.py
index 8f755f24a6..1696ba8229 100644
--- a/metadata-etl/src/main/resources/jython/HdfsLoad.py
+++ b/metadata-etl/src/main/resources/jython/HdfsLoad.py
@@ -191,7 +191,7 @@ class HdfsLoad:
     delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);
 
     -- update the old record if some thing changed
-    update dict_dict_field_detail t join
+    update dict_field_detail t join
     (
       select x.field_id, s.*
       from stg_dict_field_detail s join dict_dataset d
@@ -204,26 +204,27 @@ class HdfsLoad:
       and (x.sort_id <> s.sort_id
        or x.parent_sort_id <> s.parent_sort_id
        or x.data_type <> s.data_type
-       or x.data_size <> s.data_size
-       or x.data_precision <> s.data_precision
-       or x.is_nullable <> s.is_nullable
-       or x.is_partitioned <> s.is_partitioned
-       or x.is_distributed <> s.is_distributed
-       or x.default_value <> s.default_value
-       or x.namespace <> s.namespace
+       or x.data_size <> s.data_size or (x.data_size is null) ^ (s.data_size is null)
+       or x.data_precision <> s.data_precision or (x.data_precision is null) ^ (s.data_precision is null)
+       or x.is_nullable <> s.is_nullable or (x.is_nullable is null) ^ (s.is_nullable is null)
+       or x.is_partitioned <> s.is_partitioned or (x.is_partitioned is null) ^ (s.is_partitioned is null)
+       or x.is_distributed <> s.is_distributed or (x.is_distributed is null) ^ (s.is_distributed is null)
+       or x.default_value <> s.default_value or (x.default_value is null) ^ (s.default_value is null)
+       or x.namespace <> s.namespace or (x.namespace is null) ^ (s.namespace is null)
       )
     ) p
     on t.field_id = p.field_id
     set t.sort_id = p.sort_id,
+        t.parent_sort_id = p.parent_sort_id,
         t.data_type = p.data_type,
         t.data_size = p.data_size,
         t.data_precision = p.data_precision,
         t.is_nullable = p.is_nullable,
         t.is_partitioned = p.is_partitioned,
-        t.is_distributed = p.is_distributed
-        t.default_value = p.default_value
-        t.namespace = p.namespace
-        t.last_modified = now()
+        t.is_distributed = p.is_distributed,
+        t.default_value = p.default_value,
+        t.namespace = p.namespace,
+        t.modified = now()
     ;
 
     insert into dict_field_detail (
diff --git a/metadata-etl/src/main/resources/jython/TeradataLoad.py b/metadata-etl/src/main/resources/jython/TeradataLoad.py
index c8bc657dc7..1650029fe7 100644
--- a/metadata-etl/src/main/resources/jython/TeradataLoad.py
+++ b/metadata-etl/src/main/resources/jython/TeradataLoad.py
@@ -152,7 +152,7 @@ class TeradataLoad:
     delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);
 
     -- update the old record if some thing changed
-    update dict_dict_field_detail t join
+    update dict_field_detail t join
     (
       select x.field_id, s.*
       from stg_dict_field_detail s join dict_dataset d
@@ -165,26 +165,27 @@ class TeradataLoad:
       and (x.sort_id <> s.sort_id
        or x.parent_sort_id <> s.parent_sort_id
        or x.data_type <> s.data_type
-       or x.data_size <> s.data_size
-       or x.data_precision <> s.data_precision
-       or x.is_nullable <> s.is_nullable
-       or x.is_partitioned <> s.is_partitioned
-       or x.is_distributed <> s.is_distributed
-       or x.default_value <> s.default_value
-       or x.namespace <> s.namespace
+       or x.data_size <> s.data_size or (x.data_size is null) ^ (s.data_size is null)
+       or x.data_precision <> s.data_precision or (x.data_precision is null) ^ (s.data_precision is null)
+       or x.is_nullable <> s.is_nullable or (x.is_nullable is null) ^ (s.is_nullable is null)
+       or x.is_partitioned <> s.is_partitioned or (x.is_partitioned is null) ^ (s.is_partitioned is null)
+       or x.is_distributed <> s.is_distributed or (x.is_distributed is null) ^ (s.is_distributed is null)
+       or x.default_value <> s.default_value or (x.default_value is null) ^ (s.default_value is null)
+       or x.namespace <> s.namespace or (x.namespace is null) ^ (s.namespace is null)
       )
     ) p
     on t.field_id = p.field_id
     set t.sort_id = p.sort_id,
+        t.parent_sort_id = p.parent_sort_id,
         t.data_type = p.data_type,
         t.data_size = p.data_size,
         t.data_precision = p.data_precision,
         t.is_nullable = p.is_nullable,
         t.is_partitioned = p.is_partitioned,
-        t.is_distributed = p.is_distributed
-        t.default_value = p.default_value
-        t.namespace = p.namespace
-        t.last_modified = now()
+        t.is_distributed = p.is_distributed,
+        t.default_value = p.default_value,
+        t.namespace = p.namespace,
+        t.modified = now()
     ;
 
     show warnings limit 10;
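Both loader scripts get the same two SQL fixes: the UPDATE now targets dict_field_detail (the old dict_dict_field_detail name was a typo), and the change-detection predicate handles NULLs explicitly, since x.col <> s.col evaluates to unknown when either side is NULL; the added (x.col is null) ^ (s.col is null) term flags rows where exactly one side is NULL. The same predicate written out in Java for a single column pair (the helper name is mine, purely illustrative):

```java
public class NullAwareChangeCheck {

  /** True when the value changed, counting NULL -> value and value -> NULL as changes. */
  static boolean changed(Object oldValue, Object newValue) {
    if ((oldValue == null) ^ (newValue == null)) {
      return true;                        // exactly one side is NULL: a real change
    }
    if (oldValue == null) {
      return false;                       // both NULL: nothing to update
    }
    return !oldValue.equals(newValue);    // both non-NULL: compare values
  }

  public static void main(String[] args) {
    System.out.println(changed(null, null));    // false
    System.out.println(changed(null, "text"));  // true  (missed by "x <> s" alone)
    System.out.println(changed("a", "a"));      // false
    System.out.println(changed("a", "b"));      // true
  }
}
```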
diff --git a/metadata-etl/src/test/java/metadata/etl/lineage/AzJobCheckerTest.java b/metadata-etl/src/test/java/metadata/etl/lineage/AzJobCheckerTest.java
index a4e10be911..da3aad1399 100644
--- a/metadata-etl/src/test/java/metadata/etl/lineage/AzJobCheckerTest.java
+++ b/metadata-etl/src/test/java/metadata/etl/lineage/AzJobCheckerTest.java
@@ -33,87 +33,6 @@ import wherehows.common.schemas.AzkabanJobExecRecord;
  * Created by zsun on 9/9/15.
  */
 public class AzJobCheckerTest {
-  final String jsonInput = "{\"attempt\": 0,\n" + " \"endTime\": 1442233069062,\n" + " \"executionId\": 832765,\n"
-      + " \"executionOptions\": {\"concurrentOption\": \"skip\",\n" + " \"disabled\": [],\n"
-      + " \"failureAction\": \"FINISH_CURRENTLY_RUNNING\",\n"
-      + " \"failureEmails\": [\"zsun@linkedin.com\"],\n"
-      + " \"failureEmailsOverride\": true,\n" + " \"flowParameters\": {},\n"
-      + " \"mailCreator\": \"default\",\n" + " \"memoryCheck\": true,\n"
-      + " \"notifyOnFirstFailure\": false,\n"
-      + " \"notifyOnLastFailure\": false,\n" + " \"pipelineExecId\": null,\n"
-      + " \"pipelineLevel\": null,\n" + " \"queueLevel\": 0,\n"
-      + " \"successEmails\": [],\n" + " \"successEmailsOverride\": false},\n"
-      + " \"executionPath\": \"executions/832765\",\n" + " \"flowId\": \"hadoop-datasets-stats\",\n" + " \"id\": null,\n"
-      + " \"lastModfiedTime\": 1440783373310,\n" + " \"lastModifiedUser\": \"zsun\",\n"
-      + " \"nodes\": [{\"attempt\": 0,\n" + " \"endTime\": 1442233069053,\n"
-      + " \"id\": \"hadoop-datasets-stats\",\n"
-      + " \"inNodes\": [\"hadoop-datasets-stats_enrich-abstract-dataset\"],\n"
-      + " \"jobSource\": \"hadoop-datasets-stats.job\",\n"
-      + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442233069045,\n"
-      + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"noop\",\n"
-      + " \"updateTime\": 1442233069057},\n" + " {\"attempt\": 0,\n"
-      + " \"endTime\": 1442232636665,\n"
-      + " \"id\": \"hadoop-datasets-stats_load-size-avro-into-mysql\",\n"
-      + " \"inNodes\": [\"hadoop-datasets-stats_sizePartition\"],\n"
-      + " \"jobSource\": \"hadoop-datasets-stats_load-size-avro-into-mysql.job\",\n"
-      + " \"outNodes\": [\"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql\"],\n"
-      + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442229210581,\n"
-      + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
-      + " \"updateTime\": 1442232636670},\n" + " {\"attempt\": 0,\n"
-      + " \"endTime\": 1442229861221,\n"
-      + " \"id\": \"hadoop-datasets-stats_extract-dataset-layout\",\n"
-      + " \"inNodes\": [\"hadoop-datasets-stats_sizePartition\"],\n"
-      + " \"jobSource\": \"hadoop-datasets-stats_extract-dataset-layout.job\",\n"
-      + " \"outNodes\": [\"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql\"],\n"
-      + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442229210582,\n"
-      + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
-      + " \"updateTime\": 1442229861231},\n" + " {\"attempt\": 0,\n"
-      + " \"endTime\": 1442229210579,\n" + " \"id\": \"hadoop-datasets-stats_sizePartition\",\n"
-      + " \"inNodes\": [\"hadoop-datasets-stats_sizeAggr\"],\n"
-      + " \"jobSource\": \"hadoop-datasets-stats_sizePartition.job\",\n"
-      + " \"outNodes\": [\"hadoop-datasets-stats_load-size-avro-into-mysql\",\n"
-      + " \"hadoop-datasets-stats_extract-dataset-layout\"],\n"
-      + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442228463681,\n"
-      + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
-      + " \"updateTime\": 1442229210587},\n" + " {\"attempt\": 0,\n"
-      + " \"endTime\": 1442228463629,\n" + " \"id\": \"hadoop-datasets-stats_sizeAggr\",\n"
-      + " \"jobSource\": \"hadoop-datasets-stats_sizeAggr.job\",\n"
-      + " \"outNodes\": [\"hadoop-datasets-stats_sizePartition\"],\n"
-      + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442224810817,\n"
-      + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
-      + " \"updateTime\": 1442228463679},\n" + " {\"attempt\": 0,\n"
-      + " \"endTime\": 1442229882257,\n"
\"endTime\": 1442229882257,\n" - + " \"id\": \"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql\",\n" - + " \"inNodes\": [\"hadoop-datasets-stats_extract-dataset-layout\"],\n" - + " \"jobSource\": \"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql.job\",\n" - + " \"outNodes\": [\"hadoop-datasets-stats_enrich-abstract-dataset\"],\n" - + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442229861224,\n" - + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n" - + " \"updateTime\": 1442229882261},\n" + " {\"attempt\": 0,\n" - + " \"endTime\": 1442233066192,\n" - + " \"id\": \"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql\",\n" - + " \"inNodes\": [\"hadoop-datasets-stats_load-size-avro-into-mysql\"],\n" - + " \"jobSource\": \"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql.job\",\n" - + " \"outNodes\": [\"hadoop-datasets-stats_enrich-abstract-dataset\"],\n" - + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442232636668,\n" - + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n" - + " \"updateTime\": 1442233066196},\n" + " {\"attempt\": 0,\n" - + " \"endTime\": 1442233069043,\n" - + " \"id\": \"hadoop-datasets-stats_enrich-abstract-dataset\",\n" - + " \"inNodes\": [\"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql\",\n" - + " \"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql\"],\n" - + " \"jobSource\": \"hadoop-datasets-stats_enrich-abstract-dataset.job\",\n" - + " \"outNodes\": [\"hadoop-datasets-stats\"],\n" - + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442233066194,\n" - + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n" - + " \"updateTime\": 1442233069046}],\n" + " \"projectId\": 533,\n" - + " \"projectName\": \"WherehowsETL\",\n" + " \"properties\": [{\"source\": \"common.properties\"}],\n" - + " \"proxyUsers\": [\"azkaban@GRID.LINKEDIN.COM\",\n" + " \"dev_svc\",\n" - + " \"azkaban/eat1-nertzaz01.grid.linkedin.com@GRID.LINKEDIN.COM\",\n" - + " \"zsun\",\n" + " \"data_svc\"],\n" + " \"startTime\": 1442224810815,\n" - + " \"status\": \"SUCCEEDED\",\n" + " \"submitTime\": 1442224810778,\n" + " \"submitUser\": \"zsun\",\n" - + " \"type\": null,\n" + " \"updateTime\": 1442233069065,\n" + " \"version\": 301}"; - AzJobChecker ajc; Properties prop; @@ -149,27 +68,11 @@ public class AzJobCheckerTest { Assert.assertNotNull(results); } - @Test(groups = {"needConfig"}) - public void parseJsonTest() - throws IOException { - List result = ajc.parseJson(jsonInput, 11111); - for (int i = 0; i < result.size(); i++) { - AzkabanJobExecRecord aje = result.get(i); - System.out.println(aje.getJobExecId()); - System.out.println(aje.getJobName()); - System.out.println(aje.getStartTime()); - System.out.println(aje.getEndTime()); - System.out.println(aje.getFlowPath()); - System.out.println(); - Assert.assertEquals((long) aje.getJobExecId(), 11111 * 1000 + i); - } - } - @Test(groups = {"needConfig"}) public void parseNestedJsonTest() throws IOException, URISyntaxException { - URL url = Thread.currentThread().getContextClassLoader().getResource("nestedJson"); + URL url = Thread.currentThread().getContextClassLoader().getResource("nestedFlowContent.json"); byte[] encoded = Files.readAllBytes(Paths.get(url.getPath())); String nestedJson = new String(encoded, "UTF-8"); List result = ajc.parseJson(nestedJson, 11111); diff --git a/metadata-etl/src/test/java/metadata/etl/lineage/AzLogParserTest.java 
diff --git a/metadata-etl/src/test/java/metadata/etl/lineage/AzLogParserTest.java b/metadata-etl/src/test/java/metadata/etl/lineage/AzLogParserTest.java
index eb79d6da7b..c3b8c458b0 100644
--- a/metadata-etl/src/test/java/metadata/etl/lineage/AzLogParserTest.java
+++ b/metadata-etl/src/test/java/metadata/etl/lineage/AzLogParserTest.java
@@ -35,6 +35,8 @@ import wherehows.common.schemas.LineageRecord;
  */
 public class AzLogParserTest {
 
+  private final int TEST_APP_ID = -1;
+  private final int TEST_DATABASE_ID = -1;
   @BeforeTest
   public void setUp()
       throws SQLException {
@@ -79,7 +81,7 @@ public class AzLogParserTest {
   @Test(groups = {"needConfig"})
   public void getLineageFromLogTest() {
     String logSample = "asfdasdfsadf Moving from staged path[asdf] to final resting place[/tm/b/c] sdaf dsfasdfasdf";
-    AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(-1, "someJobName", (long) 0, 0, 0, "S", "path");
+    AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(TEST_APP_ID, "someJobName", (long) 0, 0, 0, "S", "path");
     sampleExecution.setJobExecId((long) 11111);
     List<LineageRecord> result = AzLogParser.getLineageFromLog(logSample, sampleExecution, -1);
     System.out.println(result.get(0).toDatabaseValue());
@@ -104,8 +106,8 @@ public class AzLogParserTest {
         + "17-11-2015 01:32:27 PST endorsements_push-lva-endorsements-member-restrictions INFO - INFO tcp://lva1-voldemort-read-only-2-vip.prod.linkedin.com:10103 : Initiating swap of endorsements-member-restrictions with dataDir: /jobs/endorse/endorsements/master/tmp/endorsements-member-restrictions.store/lva1-voldemort-read-only-2-vip.prod.linkedin.com\n"
         + "17-11-2015 01:32:27 PST endorsements_push-lva-endorsements-member-restrictions INFO - INFO tcp://lva1-voldemort-read-only-2-vip.prod.linkedin.com:10103 : Invoking fetch for Node lva1-app0610.prod.linkedin.com [id 0] for webhdfs://lva1-warnn01.grid.linkedin.com:50070/jobs/endorse/endorsements/master/tmp/endorsements-member-restrictions.store/lva1-voldemort-read-only-2-vip.prod.linkedin.com/node-0\n"
         + "17-11-2015 01:32:27 PST endorsements_push-lva-endorsements-member-restrictions INFO - INFO tcp://lva1-voldemort-rea";
-    AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(-1, "someJobName", (long) 0, 0, 0, "S", "path");
-    List<LineageRecord> result = AzLogParser.getLineageFromLog(logSample, sampleExecution, -1);
+    AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(TEST_APP_ID, "someJobName", (long) 0, 0, 0, "S", "path");
+    List<LineageRecord> result = AzLogParser.getLineageFromLog(logSample, sampleExecution, TEST_DATABASE_ID);
     System.out.println(result.get(0).toDatabaseValue());
     Assert.assertEquals(result.get(0).getFullObjectName(),
         "tcp://lva1-voldemort-read-only-2-vip.prod.linkedin.com:10103/endorsements-member-restrictions");