From 07c46304b5e709603d5992776d22ae4e264db9bc Mon Sep 17 00:00:00 2001 From: SunZhaonan Date: Fri, 11 Dec 2015 19:46:35 -0800 Subject: [PATCH] Fix bug of duplicate field loading. Fix bug of subflow process in azkaban lineage ETL. --- .../app/models/daos/LineageDao.java | 4 +- backend-service/deploy | 16 ++-- data-model/DDL/ETL_DDL/dataset_metadata.sql | 27 ++++--- .../src/main/java/metadata/etl/EtlJob.java | 17 ++-- .../metadata/etl/lineage/AzJobChecker.java | 65 ++++++++++----- .../etl/lineage/AzLineageExtractorMaster.java | 9 ++- .../etl/lineage/AzLineageMetadataEtl.java | 15 +++- .../etl/lineage/HadoopNameNodeExtractor.java | 8 +- .../src/main/resources/jython/HdfsLoad.py | 62 ++++++++++++++- .../src/main/resources/jython/TeradataLoad.py | 79 +++++++++++++------ .../etl/lineage/AzJobCheckerTest.java | 42 ++++++++++ .../etl/lineage/AzLineageExtractorTest.java | 2 +- .../metadata/etl/lineage/AzLogParserTest.java | 6 +- .../main/java/wherehows/common/Constant.java | 12 +++ .../common/schemas/DatasetFieldRecord.java | 20 +++-- 15 files changed, 292 insertions(+), 92 deletions(-) diff --git a/backend-service/app/models/daos/LineageDao.java b/backend-service/app/models/daos/LineageDao.java index c242b56923..d6c58f0975 100644 --- a/backend-service/app/models/daos/LineageDao.java +++ b/backend-service/app/models/daos/LineageDao.java @@ -34,10 +34,8 @@ import wherehows.common.writers.DatabaseWriter; */ public class LineageDao { public static final String FIND_JOBS_BY_DATASET = - " select distinct ca.short_connection_string, f.flow_group, f.flow_name, jedl.job_name " + " select distinct ca.short_connection_string, jedl.job_name, jedl.flow_path " + " from job_execution_data_lineage jedl " - + " join flow_execution fe on jedl.app_id = fe.app_id and jedl.flow_exec_id = fe.flow_exec_id " - + " join flow f on fe.app_id = f.app_id and fe.flow_id = f.flow_id " + " join cfg_application ca on ca.app_id = jedl.app_id " + " join cfg_database cd on cd.db_id = jedl.db_id " + " where source_target_type = :source_target_type " diff --git a/backend-service/deploy b/backend-service/deploy index 5f7ac3ce68..a411bb2237 100755 --- a/backend-service/deploy +++ b/backend-service/deploy @@ -4,16 +4,14 @@ gradle dist; rm -r target/universal/backend-service-1.0-SNAPSHOT; unzip target/universal/backend-service-1.0-SNAPSHOT.zip -d target/universal/; -scp target/universal/backend-service-1.0-SNAPSHOT/lib/metadata-etl-1.0.jar dev_svc@lva1-rpt14:~/backendServer/lib/; -scp target/universal/backend-service-1.0-SNAPSHOT/lib/wherehows-common-1.0.jar dev_svc@lva1-rpt14:~/backendServer/lib/; +scp target/universal/backend-service-1.0-SNAPSHOT/lib/metadata-etl-1.0.jar ${TARGET_SERVER}:~/backendServer/lib/; -scp target/universal/backend-service-1.0-SNAPSHOT/lib/default.backend-service-1.0-SNAPSHOT.jar dev_svc@lva1-rpt14:~/backendServer/lib/; +scp target/universal/backend-service-1.0-SNAPSHOT/lib/default.backend-service-1.0-SNAPSHOT.jar ${TARGET_SERVER}:~/backendServer/lib/; -scp target/universal/backend-service-1.0-SNAPSHOT/lib/schemaFetch.jar dev_svc@lva1-rpt14:~/backendServer/lib/; +scp target/universal/backend-service-1.0-SNAPSHOT/lib/schemaFetch.jar ${TARGET_SERVER}:~/backendServer/lib/; + +scp target/universal/backend-service-1.0-SNAPSHOT/lib/wherehows-common-1.0.jar ${TARGET_SERVER}:~/backendServer/lib/; + +scp target/universal/backend-service-1.0-SNAPSHOT/bin/* ${TARGET_SERVER}:~/backendServer/bin/; - -scp target/universal/backend-service-1.0-SNAPSHOT/bin/* dev_svc@lva1-rpt14:~/backendServer/bin/; - - -scp 
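For reference, the trimmed-down FIND_JOBS_BY_DATASET lookup (now joining only cfg_application and cfg_database) can be exercised directly over JDBC. The sketch below is illustrative only: the Play-style :source_target_type placeholder becomes a positional parameter, any predicates that follow in the full DAO are omitted, and the connection URL and credentials are made-up values.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Illustrative sketch: run the simplified lineage lookup directly over JDBC.
// The real DAO binds :source_target_type through its SQL template; here it is a
// positional parameter. Connection settings are placeholders.
public class FindJobsByDatasetSketch {
  private static final String FIND_JOBS_BY_DATASET =
      " select distinct ca.short_connection_string, jedl.job_name, jedl.flow_path "
    + " from job_execution_data_lineage jedl "
    + " join cfg_application ca on ca.app_id = jedl.app_id "
    + " join cfg_database cd on cd.db_id = jedl.db_id "
    + " where source_target_type = ? ";   // remaining predicates of the full DAO omitted

  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection(
            "jdbc:mysql://localhost:3306/wherehows", "wherehows", "wherehows");
         PreparedStatement ps = conn.prepareStatement(FIND_JOBS_BY_DATASET)) {
      ps.setString(1, "source");                // or "target"
      try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
          System.out.println(rs.getString("short_connection_string") + "\t"
              + rs.getString("job_name") + "\t" + rs.getString("flow_path"));
        }
      }
    }
  }
}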
target/universal/backend-service-1.0-SNAPSHOT/lib/metadata-etl-1.0.jar cloudera@172.21.98.211:~/wherehows/backend-service-1.0-SNAPSHOT/lib/ diff --git a/data-model/DDL/ETL_DDL/dataset_metadata.sql b/data-model/DDL/ETL_DDL/dataset_metadata.sql index 36c314f3ee..4015c1ab8d 100644 --- a/data-model/DDL/ETL_DDL/dataset_metadata.sql +++ b/data-model/DDL/ETL_DDL/dataset_metadata.sql @@ -165,21 +165,30 @@ CREATE TABLE `dict_field_detail` ( `field_label` VARCHAR(100) DEFAULT NULL, `data_type` VARCHAR(50) NOT NULL, `data_size` INT(10) UNSIGNED DEFAULT NULL, - `data_precision` TINYINT(4) DEFAULT NULL, - `data_fraction` TINYINT(4) DEFAULT NULL, + `data_precision` TINYINT(4) DEFAULT NULL + COMMENT 'only in decimal type', + `data_fraction` TINYINT(4) DEFAULT NULL + COMMENT 'only in decimal type', `default_comment_id` INT(11) UNSIGNED DEFAULT NULL COMMENT 'a list of comment_id', `comment_ids` VARCHAR(500) DEFAULT NULL, `is_nullable` CHAR(1) DEFAULT NULL, - `is_indexed` CHAR(1) DEFAULT NULL, - `is_partitioned` CHAR(1) DEFAULT NULL, - `is_distributed` TINYINT(4) DEFAULT NULL, + `is_indexed` CHAR(1) DEFAULT NULL + COMMENT 'only in RDBMS', + `is_partitioned` CHAR(1) DEFAULT NULL + COMMENT 'only in RDBMS', + `is_distributed` TINYINT(4) DEFAULT NULL + COMMENT 'only in RDBMS', `default_value` VARCHAR(200) DEFAULT NULL, `namespace` VARCHAR(200) DEFAULT NULL, - `java_data_type` VARCHAR(50) DEFAULT NULL, - `jdbc_data_type` VARCHAR(50) DEFAULT NULL, - `pig_data_type` VARCHAR(50) DEFAULT NULL, - `hcatalog_data_type` VARCHAR(50) DEFAULT NULL, + `java_data_type` VARCHAR(50) DEFAULT NULL + COMMENT 'correspond type in java', + `jdbc_data_type` VARCHAR(50) DEFAULT NULL + COMMENT 'correspond type in jdbc', + `pig_data_type` VARCHAR(50) DEFAULT NULL + COMMENT 'correspond type in pig', + `hcatalog_data_type` VARCHAR(50) DEFAULT NULL + COMMENT 'correspond type in hcatalog', `modified` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`field_id`), KEY `idx_dict_field__datasetid_fieldname` (`dataset_id`, `field_name`) USING BTREE, diff --git a/metadata-etl/src/main/java/metadata/etl/EtlJob.java b/metadata-etl/src/main/java/metadata/etl/EtlJob.java index b5debe4dd5..a2354bce4d 100644 --- a/metadata-etl/src/main/java/metadata/etl/EtlJob.java +++ b/metadata-etl/src/main/java/metadata/etl/EtlJob.java @@ -35,7 +35,7 @@ import java.util.Properties; * Created by zsun on 7/29/15. 
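Once the amended dict_field_detail DDL is applied, the new COMMENT clauses can be read back for a quick sanity check. A minimal MySQL-specific sketch, assuming the same wherehows schema and placeholder credentials:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Minimal sketch: list dict_field_detail columns together with the COMMENT text
// added in this DDL change. MySQL exposes the comment in the "Comment" column of
// SHOW FULL COLUMNS. Connection settings are placeholders.
public class ShowFieldCommentsSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection(
            "jdbc:mysql://localhost:3306/wherehows", "wherehows", "wherehows");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SHOW FULL COLUMNS FROM dict_field_detail")) {
      while (rs.next()) {
        System.out.printf("%-25s %-20s %s%n",
            rs.getString("Field"), rs.getString("Type"), rs.getString("Comment"));
      }
    }
  }
}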
*/ public abstract class EtlJob { - public final static String CONFIG_FILE = "application.properties"; + private final static String CONFIG_FILE = "application.properties"; public PythonInterpreter interpreter; public PySystemState sys; public Properties prop; @@ -61,8 +61,8 @@ public abstract class EtlJob { */ @Deprecated public EtlJob(Integer appId, Integer dbId, long whExecId, String configFile) { - configFromFile(appId, dbId, whExecId, configFile); - addJythonToPath(); + PySystemState sys = configFromFile(appId, dbId, whExecId, configFile); + addJythonToPath(sys); interpreter = new PythonInterpreter(null, sys); } @@ -75,11 +75,11 @@ public abstract class EtlJob { */ public EtlJob(Integer appId, Integer dbId, Long whExecId, Properties properties) { configFromProperties(appId, dbId, whExecId, properties); - addJythonToPath(); + addJythonToPath(sys); interpreter = new PythonInterpreter(null, sys); } - private void addJythonToPath() { + private void addJythonToPath(PySystemState pySystemState) { URL url = classLoader.getResource("jython"); if (url != null) { File file = new File(url.getFile()); @@ -87,12 +87,12 @@ public abstract class EtlJob { if (path.startsWith("file:")) { path = path.substring(5); } - sys.path.append(new PyString(path.replace("!", ""))); + pySystemState.path.append(new PyString(path.replace("!", ""))); } } @Deprecated - private void configFromFile(Integer appId, Integer dbId, long whExecId, String configFile) { + private PySystemState configFromFile(Integer appId, Integer dbId, long whExecId, String configFile) { prop = new Properties(); if (appId != null) { @@ -117,8 +117,9 @@ public abstract class EtlJob { config.put(new PyString(key), new PyString(value)); } - sys = new PySystemState(); + PySystemState sys = new PySystemState(); sys.argv.append(config); + return sys; } /** diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzJobChecker.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzJobChecker.java index e8156054d7..a4c53e7eaa 100644 --- a/metadata-etl/src/main/java/metadata/etl/lineage/AzJobChecker.java +++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzJobChecker.java @@ -52,7 +52,7 @@ public class AzJobChecker { /** * Default 10 minutes - * @return + * @return A list of recent finished AzkabanJobExecRecord * @throws IOException * @throws SQLException */ @@ -72,23 +72,32 @@ public class AzJobChecker { public List getRecentFinishedJobFromFlow(int timeFrameMinutes) throws IOException, SQLException { long currentTimeStamp = System.currentTimeMillis(); - long oneHourAgo = currentTimeStamp - 1000 * 60 * timeFrameMinutes; - return getRecentFinishedJobFromFlow(oneHourAgo); + long beginTimeStamp = currentTimeStamp - 1000 * 60 * timeFrameMinutes; + return getRecentFinishedJobFromFlow(beginTimeStamp, currentTimeStamp); + } + + public List getRecentFinishedJobFromFlow(int timeFrameMinutes, long endTimeStamp) + throws IOException, SQLException { + long beginTimeStamp = endTimeStamp - 60 * timeFrameMinutes; + return getRecentFinishedJobFromFlow(beginTimeStamp * 1000, endTimeStamp * 1000); // convert to milli seconds } /** * Read the blob from "flow_data", do a topological sort on the nodes. Give them the sort id. 
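The constructor refactoring threads a freshly built PySystemState from configFromFile through addJythonToPath and into the PythonInterpreter instead of relying on a shared field. A self-contained sketch of that Jython embedding pattern follows; the jython directory path and the property names/values are examples only.

import java.util.Properties;

import org.python.core.PyDictionary;
import org.python.core.PyString;
import org.python.core.PySystemState;
import org.python.util.PythonInterpreter;

// Sketch of the EtlJob wiring: build a PySystemState, extend its module search path,
// pass job configuration through sys.argv, and hand the state to the interpreter.
public class JythonBootstrapSketch {
  public static void main(String[] args) {
    // Example job configuration; the real EtlJob copies appId/dbId/whExecId plus the
    // loaded property set into this dictionary.
    Properties prop = new Properties();
    prop.setProperty("app.id", "31");
    prop.setProperty("wh.exec.id", "12345");

    PyDictionary config = new PyDictionary();
    for (String key : prop.stringPropertyNames()) {
      config.put(new PyString(key), new PyString(prop.getProperty(key)));
    }

    // Build the interpreter state explicitly, as the refactored configFromFile now does,
    // and extend the module search path before constructing the interpreter.
    PySystemState sys = new PySystemState();
    sys.path.append(new PyString("/opt/wherehows/jython"));  // example location of the .py ETL scripts
    sys.argv.append(config);                                 // scripts pick the config up from sys.argv

    PythonInterpreter interpreter = new PythonInterpreter(null, sys);
    interpreter.exec("import sys\nprint 'job config:', sys.argv[1]");
    interpreter.cleanup();
  }
}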
- * @param timestamp the beginning timestamp + * @param startTimeStamp the begin timestamp + * @param endTimeStamp the end timestamp * @return */ - public List getRecentFinishedJobFromFlow(long timestamp) + public List getRecentFinishedJobFromFlow(long startTimeStamp, long endTimeStamp) throws SQLException, IOException { - logger.info("Get the jobs from time : {}", timestamp); + logger.info("Get the jobs from time : {} to time : {}", startTimeStamp, endTimeStamp); List results = new ArrayList<>(); Statement stmt = conn.createStatement(); final String cmd = - "select exec_id, flow_id, status, submit_user, flow_data from execution_flows where end_time > " + timestamp; + "select exec_id, flow_id, status, submit_user, flow_data from execution_flows where end_time > " + startTimeStamp + + " and end_time < " + endTimeStamp ; + logger.info("Get recent flow sql : " + cmd); final ResultSet rs = stmt.executeQuery(cmd); // this sql take 3 second to execute while (rs.next()) { @@ -101,27 +110,47 @@ public class AzJobChecker { return results; } + /** + * Parse the json of flow_data field from execution_flows. Use recursion to handle the nested case. + * @param flowJson + * @param flowExecId + * @return + * @throws IOException + */ public List parseJson(String flowJson, long flowExecId) throws IOException { - List results = new ArrayList<>(); + ObjectMapper mapper = new ObjectMapper(); JsonNode wholeFlow = mapper.readTree(flowJson); JsonNode allJobs = wholeFlow.get("nodes"); String flowPath = wholeFlow.get("projectName").asText() + ":" + wholeFlow.get("flowId").asText(); - for (JsonNode oneJob : allJobs) { - String jobName = oneJob.get("id").asText(); - long startTime = oneJob.get("startTime").asLong(); - long endTime = oneJob.get("endTime").asLong(); - String status = oneJob.get("status").asText(); - AzkabanJobExecRecord azkabanJobExecRecord = - new AzkabanJobExecRecord(appId, jobName, flowExecId, (int) (startTime / 1000), (int) (endTime / 1000), status, - flowPath); - results.add(azkabanJobExecRecord); - } + List results = parseJsonHelper(allJobs, flowExecId, "", flowPath); AzkabanJobExecUtil.sortAndSet(results); return results; } + private List parseJsonHelper(JsonNode allJobs, long flowExecId, String jobPrefix, String flowPath) { + List results = new ArrayList<>(); + for (JsonNode oneJob : allJobs) { + if (oneJob.has("nodes")) { // is a subflow + String subFlowName = oneJob.get("id").asText(); + String newJobPrefix = jobPrefix.length() > 0 ? jobPrefix + subFlowName + ":" : subFlowName + ":"; + results.addAll(parseJsonHelper(oneJob.get("nodes"), flowExecId, newJobPrefix, flowPath)); + } else { + String jobName = oneJob.get("id").asText(); + long startTime = oneJob.get("startTime").asLong(); + long endTime = oneJob.get("endTime").asLong(); + String status = oneJob.get("status").asText(); + jobName = jobPrefix.length() > 0 ? 
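The subflow fix hinges on parseJsonHelper recursing into any node that itself carries a "nodes" array and prefixing the contained job names with the subflow name. A small stand-alone illustration of that walk, using hand-written sample flow JSON and the fasterxml Jackson API (the project's own Jackson import may differ):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Jobs nested under a subflow's "nodes" array are reported with a "subflow:" prefix,
// matching the jobPrefix logic in parseJsonHelper. The JSON below is sample data.
public class SubflowParseSketch {
  public static void main(String[] args) throws Exception {
    String flowJson =
        "{ \"projectName\": \"demo\", \"flowId\": \"daily\", \"nodes\": ["
      + "  {\"id\": \"extract\", \"startTime\": 1449800000000, \"endTime\": 1449800600000, \"status\": \"SUCCEEDED\"},"
      + "  {\"id\": \"subflow_load\", \"nodes\": ["
      + "    {\"id\": \"load_hdfs\", \"startTime\": 1449800600000, \"endTime\": 1449801200000, \"status\": \"SUCCEEDED\"}"
      + "  ]}"
      + "] }";

    JsonNode wholeFlow = new ObjectMapper().readTree(flowJson);
    String flowPath = wholeFlow.get("projectName").asText() + ":" + wholeFlow.get("flowId").asText();
    System.out.println("flow path: " + flowPath);
    printJobs(wholeFlow.get("nodes"), "");
  }

  private static void printJobs(JsonNode nodes, String jobPrefix) {
    for (JsonNode node : nodes) {
      if (node.has("nodes")) {                       // a nested subflow
        printJobs(node.get("nodes"), jobPrefix + node.get("id").asText() + ":");
      } else {
        System.out.println(jobPrefix + node.get("id").asText()
            + " [" + node.get("status").asText() + "] "
            + node.get("startTime").asLong() / 1000 + " -> "
            + node.get("endTime").asLong() / 1000);
      }
    }
  }
}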
jobPrefix + jobName : jobName; + AzkabanJobExecRecord azkabanJobExecRecord = + new AzkabanJobExecRecord(appId, jobName, flowExecId, (int) (startTime / 1000), (int) (endTime / 1000), + status, flowPath); + results.add(azkabanJobExecRecord); + } + } + return results; + } + public void close() throws SQLException { conn.close(); diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractorMaster.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractorMaster.java index 00bce1cff1..6e8303f867 100644 --- a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractorMaster.java +++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractorMaster.java @@ -60,17 +60,22 @@ public class AzLineageExtractorMaster { run(10); } + public void run(int timeFrame) + throws Exception { + run(timeFrame, System.currentTimeMillis()); + } + /** * Entry point. * All recent finished azkaban jobs' lineage. Will write to database stagging table * @param timeFrame in minutes * @throws Exception */ - public void run(int timeFrame) + public void run(int timeFrame, long endTimeStamp) throws Exception { // get recent finished job AzJobChecker azJobChecker = new AzJobChecker(prop); - List jobExecList = azJobChecker.getRecentFinishedJobFromFlow(timeFrame); + List jobExecList = azJobChecker.getRecentFinishedJobFromFlow(timeFrame, endTimeStamp); azJobChecker.close(); logger.info("Total number of azkaban jobs : {}", jobExecList.size()); diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageMetadataEtl.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageMetadataEtl.java index 17b9026067..ffbe8a8f77 100644 --- a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageMetadataEtl.java +++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageMetadataEtl.java @@ -27,7 +27,8 @@ import java.util.Properties; */ public class AzLineageMetadataEtl extends EtlJob { - public int timeFrame = -1; + public Integer timeFrame = null; + public Long endTimeStamp = null; Connection conn; /** @@ -53,6 +54,8 @@ public class AzLineageMetadataEtl extends EtlJob { public AzLineageMetadataEtl(int appId, long whExecId, Properties properties) { super(appId, null, whExecId, properties); this.timeFrame = Integer.valueOf(this.prop.getProperty(Constant.AZ_LINEAGE_ETL_LOOKBACK_MINS_KEY)); + if (this.prop.contains(Constant.AZ_LINEAGE_ETL_END_TIMESTAMP_KEY)) + this.endTimeStamp = Long.valueOf(this.prop.getProperty(Constant.AZ_LINEAGE_ETL_END_TIMESTAMP_KEY)); try { setUp(); } catch (SQLException e) { @@ -78,7 +81,11 @@ public class AzLineageMetadataEtl extends EtlJob { conn.createStatement().execute(emptyStaggingTable); AzLineageExtractorMaster azLineageExtractorMaster = new AzLineageExtractorMaster(prop); // get lineage - if (timeFrame > 0) { + if (timeFrame != null && endTimeStamp != null && endTimeStamp != 0) { + azLineageExtractorMaster.run(timeFrame, endTimeStamp); + } + + else if (timeFrame != null) { azLineageExtractorMaster.run(timeFrame); } else { azLineageExtractorMaster.run(10); @@ -110,8 +117,8 @@ public class AzLineageMetadataEtl extends EtlJob { conn.createStatement().execute(insertIntoFinalTable); logger.info("Azkaban lineage metadata ETL completed"); - if (prop.getProperty(Constant.APP_ID_KEY).equals("32")) { - logger.info("TEMPORARY load war's data into cmdb database"); + if (prop.getProperty(Constant.APP_ID_KEY).equals("32") || prop.getProperty(Constant.APP_ID_KEY).equals("31") ) { + logger.info("TEMPORARY load war & nertz's data into cmdb database"); 
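Reading the new az.lineage_etl.end_timestamp property alongside the lookback window lets the ETL either backfill up to a fixed end time or run incrementally. A sketch of that dispatch, with Extractor standing in for AzLineageExtractorMaster and the end timestamp treated as epoch seconds as in the accompanying test; all values are examples.

import java.util.Properties;

public class LineageRunDispatchSketch {

  // Stand-in for AzLineageExtractorMaster: two run(...) overloads, the shorter one
  // defaulting to "now" expressed in epoch seconds.
  static class Extractor {
    void run(int timeFrameMinutes) {
      run(timeFrameMinutes, System.currentTimeMillis() / 1000);
    }
    void run(int timeFrameMinutes, long endTimeStamp) {
      System.out.println("extract lineage for the " + timeFrameMinutes
          + " minutes ending at epoch second " + endTimeStamp);
    }
  }

  public static void main(String[] args) {
    Properties prop = new Properties();
    prop.setProperty("az.lineage_etl.lookback_period.in.minutes", "90");
    prop.setProperty("az.lineage_etl.end_timestamp", "1448916456");  // optional; epoch seconds

    Integer timeFrame = prop.containsKey("az.lineage_etl.lookback_period.in.minutes")
        ? Integer.valueOf(prop.getProperty("az.lineage_etl.lookback_period.in.minutes")) : null;
    Long endTimeStamp = prop.containsKey("az.lineage_etl.end_timestamp")
        ? Long.valueOf(prop.getProperty("az.lineage_etl.end_timestamp")) : null;

    Extractor extractor = new Extractor();
    if (timeFrame != null && endTimeStamp != null && endTimeStamp != 0) {
      extractor.run(timeFrame, endTimeStamp);   // backfill a window ending at a fixed time
    } else if (timeFrame != null) {
      extractor.run(timeFrame);                 // normal incremental run
    } else {
      extractor.run(10);                        // default 10-minute window
    }
  }
}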
loadIntoOthers(); } } diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/HadoopNameNodeExtractor.java b/metadata-etl/src/main/java/metadata/etl/lineage/HadoopNameNodeExtractor.java index ec7237fab2..a33b7b173a 100644 --- a/metadata-etl/src/main/java/metadata/etl/lineage/HadoopNameNodeExtractor.java +++ b/metadata-etl/src/main/java/metadata/etl/lineage/HadoopNameNodeExtractor.java @@ -61,15 +61,15 @@ public class HadoopNameNodeExtractor { String WH_HOME = System.getenv("WH_HOME"); String USER_HOME = System.getenv("HOME") + "/.kerberos"; String ETC = "/etc"; - String TMP = "/tmp" + "/.kerberos"; + String TMP = "/var/tmp" + "/.kerberos"; - String[] allPositions = new String[]{CURRENT_DIR, WH_HOME, USER_HOME, TMP, ETC}; + String[] allPositions = new String[]{CURRENT_DIR, WH_HOME, USER_HOME, TMP}; for (String possition : allPositions) { String gssFileName = possition + "/gss-jaas.conf"; File gssFile = new File(gssFileName); if (gssFile.exists()) { - logger.debug("find gss-jaas.conf file in : {}", gssFile.getAbsolutePath()); + logger.info("find gss-jaas.conf file in : {}", gssFile.getAbsolutePath()); System.setProperty("java.security.auth.login.config", gssFile.getAbsolutePath()); break; } else { @@ -80,7 +80,7 @@ public class HadoopNameNodeExtractor { String krb5FileName = possition + "/krb5.conf"; File krb5File = new File(krb5FileName); if (krb5File.exists()) { - logger.debug("find krb5.conf file in : {}", krb5File.getAbsolutePath()); + logger.info("find krb5.conf file in : {}", krb5File.getAbsolutePath()); System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath()); break; } else { diff --git a/metadata-etl/src/main/resources/jython/HdfsLoad.py b/metadata-etl/src/main/resources/jython/HdfsLoad.py index 4ca3505566..8f755f24a6 100644 --- a/metadata-etl/src/main/resources/jython/HdfsLoad.py +++ b/metadata-etl/src/main/resources/jython/HdfsLoad.py @@ -168,15 +168,73 @@ class HdfsLoad: analyze table field_comments; + -- delete old record if it does not exist in this load batch anymore (but have the dataset id) + create temporary table if not exists t_deleted_fields (primary key (field_id)) + select x.field_id + from stg_dict_field_detail s + join dict_dataset i + on s.urn = i.urn + and s.db_id = {db_id} + right join dict_field_detail x + on i.id = x.dataset_id + and s.field_name = x.field_name + and s.parent_path = x.parent_path + where s.field_name is null + and x.dataset_id in ( + select d.id dataset_id + from stg_dict_field_detail k join dict_dataset d + on k.urn = d.urn + and k.db_id = {db_id} + ) + ; -- run time : ~2min + + delete from dict_field_detail where field_id in (select field_id from t_deleted_fields); + + -- update the old record if some thing changed + update dict_dict_field_detail t join + ( + select x.field_id, s.* + from stg_dict_field_detail s join dict_dataset d + on s.urn = d.urn + join dict_field_detail x + on s.field_name = x.field_name + and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*') + and d.id = x.dataset_id + where s.db_id = {db_id} + and (x.sort_id <> s.sort_id + or x.parent_sort_id <> s.parent_sort_id + or x.data_type <> s.data_type + or x.data_size <> s.data_size + or x.data_precision <> s.data_precision + or x.is_nullable <> s.is_nullable + or x.is_partitioned <> s.is_partitioned + or x.is_distributed <> s.is_distributed + or x.default_value <> s.default_value + or x.namespace <> s.namespace + ) + ) p + on t.field_id = p.field_id + set t.sort_id = p.sort_id, + t.data_type = p.data_type, + t.data_size = p.data_size, + 
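The Kerberos bootstrap now probes /var/tmp/.kerberos instead of /tmp/.kerberos and no longer falls back to /etc. A compact sketch of that lookup, assuming the same candidate directories and file names:

import java.io.File;

// Probe a fixed list of directories for gss-jaas.conf and krb5.conf and point the
// JAAS/Kerberos system properties at the first match. The directory list mirrors the
// extractor: current dir, $WH_HOME, $HOME/.kerberos, /var/tmp/.kerberos.
public class KerberosConfLookupSketch {
  public static void main(String[] args) {
    String[] candidates = {
        System.getProperty("user.dir"),
        System.getenv("WH_HOME"),
        System.getenv("HOME") + "/.kerberos",
        "/var/tmp/.kerberos"
    };

    setFirstFound(candidates, "gss-jaas.conf", "java.security.auth.login.config");
    setFirstFound(candidates, "krb5.conf", "java.security.krb5.conf");
  }

  private static void setFirstFound(String[] dirs, String fileName, String systemProperty) {
    for (String dir : dirs) {
      if (dir == null) {
        continue;  // e.g. WH_HOME not set in the environment
      }
      File f = new File(dir, fileName);
      if (f.exists()) {
        System.setProperty(systemProperty, f.getAbsolutePath());
        System.out.println("using " + fileName + " from " + f.getAbsolutePath());
        return;
      }
    }
    System.out.println("no " + fileName + " found in any candidate directory");
  }
}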
t.data_precision = p.data_precision, + t.is_nullable = p.is_nullable, + t.is_partitioned = p.is_partitioned, + t.is_distributed = p.is_distributed + t.default_value = p.default_value + t.namespace = p.namespace + t.last_modified = now() + ; + insert into dict_field_detail ( dataset_id, fields_layout_id, sort_id, parent_sort_id, parent_path, field_name, namespace, data_type, data_size, is_nullable, default_value, - default_comment_id + default_comment_id, modified ) select d.id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path, sf.field_name, sf.namespace, sf.data_type, sf.data_size, sf.is_nullable, sf.default_value, - coalesce(fc.id, t.default_comment_id) fc_id + coalesce(fc.id, t.default_comment_id) fc_id, now() from stg_dict_field_detail sf join dict_dataset d on sf.urn = d.urn left join field_comments fc diff --git a/metadata-etl/src/main/resources/jython/TeradataLoad.py b/metadata-etl/src/main/resources/jython/TeradataLoad.py index e62cdf5f13..c8bc657dc7 100644 --- a/metadata-etl/src/main/resources/jython/TeradataLoad.py +++ b/metadata-etl/src/main/resources/jython/TeradataLoad.py @@ -129,29 +129,62 @@ class TeradataLoad: and (char_length(trim(description)) = 0 or description in ('null', 'N/A', 'nothing', 'empty', 'none')); - UPDATE dict_field_detail t, ( - select - f.field_id, - s.sort_id, - s.data_type, - s.data_size, - s.data_precision, - s.data_scale, - s.is_nullable - from stg_dict_field_detail s join dict_dataset d - on s.urn = d.urn - join dict_field_detail f - on d.id = f.dataset_id - and s.field_name = f.field_name - where s.db_id = {db_id} - ) x - set t.sort_id = x.sort_id - , t.data_type = x.data_type - , t.data_size = x.data_size - , t.data_precision = x.data_precision - , t.data_fraction = x.data_scale - , t.is_nullable = x.is_nullable - where t.field_id = x.field_id + -- delete old record if it does not exist in this load batch anymore (but have the dataset id) + create temporary table if not exists t_deleted_fields (primary key (field_id)) + select x.field_id + from stg_dict_field_detail s + join dict_dataset i + on s.urn = i.urn + and s.db_id = {db_id} + right join dict_field_detail x + on i.id = x.dataset_id + and s.field_name = x.field_name + and s.parent_path = x.parent_path + where s.field_name is null + and x.dataset_id in ( + select d.id dataset_id + from stg_dict_field_detail k join dict_dataset d + on k.urn = d.urn + and k.db_id = {db_id} + ) + ; -- run time : ~2min + + delete from dict_field_detail where field_id in (select field_id from t_deleted_fields); + + -- update the old record if some thing changed + update dict_dict_field_detail t join + ( + select x.field_id, s.* + from stg_dict_field_detail s join dict_dataset d + on s.urn = d.urn + join dict_field_detail x + on s.field_name = x.field_name + and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*') + and d.id = x.dataset_id + where s.db_id = {db_id} + and (x.sort_id <> s.sort_id + or x.parent_sort_id <> s.parent_sort_id + or x.data_type <> s.data_type + or x.data_size <> s.data_size + or x.data_precision <> s.data_precision + or x.is_nullable <> s.is_nullable + or x.is_partitioned <> s.is_partitioned + or x.is_distributed <> s.is_distributed + or x.default_value <> s.default_value + or x.namespace <> s.namespace + ) + ) p + on t.field_id = p.field_id + set t.sort_id = p.sort_id, + t.data_type = p.data_type, + t.data_size = p.data_size, + t.data_precision = p.data_precision, + t.is_nullable = p.is_nullable, + t.is_partitioned = p.is_partitioned, + t.is_distributed = p.is_distributed + 
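Both loaders now follow the same delete-then-refresh pattern against the staged schema. For reference, a stand-alone JDBC sketch of the two steps is shown below; it is trimmed to a few representative columns, writes to dict_field_detail with a comma-separated SET list, uses the modified timestamp column from the DDL above, and takes a ? parameter where the jython templates substitute {db_id}. Connection settings and the db_id value are placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Step 1: drop final-table fields that no longer appear in the staged schema for the
// same dataset. Step 2: refresh rows whose attributes changed in the staged schema.
public class FieldSyncSketch {

  private static final String DELETE_REMOVED_FIELDS =
      "delete x from dict_field_detail x "
    + " join dict_dataset i on i.id = x.dataset_id "
    + " left join stg_dict_field_detail s "
    + "   on s.urn = i.urn and s.db_id = ? "
    + "  and s.field_name = x.field_name "
    + "  and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*') "
    + " where s.field_name is null "
    + "   and i.urn in (select urn from stg_dict_field_detail where db_id = ?)";

  private static final String UPDATE_CHANGED_FIELDS =
      "update dict_field_detail t "
    + " join (select x.field_id, s.sort_id, s.data_type, s.data_size, s.is_nullable "
    + "         from stg_dict_field_detail s "
    + "         join dict_dataset d on s.urn = d.urn "
    + "         join dict_field_detail x on d.id = x.dataset_id "
    + "          and s.field_name = x.field_name "
    + "          and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*') "
    + "        where s.db_id = ? "
    + "          and (x.sort_id <> s.sort_id or x.data_type <> s.data_type "
    + "               or x.data_size <> s.data_size or x.is_nullable <> s.is_nullable)) p "
    + "   on t.field_id = p.field_id "
    + " set t.sort_id = p.sort_id, t.data_type = p.data_type, "
    + "     t.data_size = p.data_size, t.is_nullable = p.is_nullable, "
    + "     t.modified = now()";

  public static void main(String[] args) throws Exception {
    int dbId = 3;  // example db_id
    try (Connection conn = DriverManager.getConnection(
            "jdbc:mysql://localhost:3306/wherehows", "wherehows", "wherehows")) {
      try (PreparedStatement del = conn.prepareStatement(DELETE_REMOVED_FIELDS)) {
        del.setInt(1, dbId);
        del.setInt(2, dbId);
        System.out.println("removed fields: " + del.executeUpdate());
      }
      try (PreparedStatement upd = conn.prepareStatement(UPDATE_CHANGED_FIELDS)) {
        upd.setInt(1, dbId);
        System.out.println("updated fields: " + upd.executeUpdate());
      }
    }
  }
}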
t.default_value = p.default_value + t.namespace = p.namespace + t.last_modified = now() ; show warnings limit 10; diff --git a/metadata-etl/src/test/java/metadata/etl/lineage/AzJobCheckerTest.java b/metadata-etl/src/test/java/metadata/etl/lineage/AzJobCheckerTest.java index cb1d6f84a5..a4e10be911 100644 --- a/metadata-etl/src/test/java/metadata/etl/lineage/AzJobCheckerTest.java +++ b/metadata-etl/src/test/java/metadata/etl/lineage/AzJobCheckerTest.java @@ -13,7 +13,12 @@ */ package metadata.etl.lineage; +import java.io.File; import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Properties; import org.testng.Assert; import org.testng.annotations.BeforeTest; @@ -108,6 +113,8 @@ public class AzJobCheckerTest { + " \"zsun\",\n" + " \"data_svc\"],\n" + " \"startTime\": 1442224810815,\n" + " \"status\": \"SUCCEEDED\",\n" + " \"submitTime\": 1442224810778,\n" + " \"submitUser\": \"zsun\",\n" + " \"type\": null,\n" + " \"updateTime\": 1442233069065,\n" + " \"version\": 301}"; + + AzJobChecker ajc; Properties prop; @@ -130,6 +137,18 @@ public class AzJobCheckerTest { Assert.assertNotNull(results); } + @Test(groups = {"needConfig"}) + public void getRecentFinishedJobFromFlowTest2() + throws SQLException, IOException { + List results = ajc.getRecentFinishedJobFromFlow(2, 1448916456L); + for (AzkabanJobExecRecord a : results) { + System.out.print(a.getFlowExecId() + "\t"); + System.out.print(a.getJobName() + "\t"); + System.out.println(a.getJobExecId()); + } + Assert.assertNotNull(results); + } + @Test(groups = {"needConfig"}) public void parseJsonTest() throws IOException { @@ -145,4 +164,27 @@ public class AzJobCheckerTest { Assert.assertEquals((long) aje.getJobExecId(), 11111 * 1000 + i); } } + + @Test(groups = {"needConfig"}) + public void parseNestedJsonTest() + throws IOException, URISyntaxException { + + URL url = Thread.currentThread().getContextClassLoader().getResource("nestedJson"); + byte[] encoded = Files.readAllBytes(Paths.get(url.getPath())); + String nestedJson = new String(encoded, "UTF-8"); + List result = ajc.parseJson(nestedJson, 11111); + for (int i = 0; i < result.size(); i++) { + AzkabanJobExecRecord aje = result.get(i); + System.out.println(aje.getJobExecId()); + System.out.println(aje.getJobName()); + System.out.println(aje.getStartTime()); + System.out.println(aje.getEndTime()); + System.out.println(aje.getFlowPath()); + System.out.println(); + Assert.assertEquals((long) aje.getJobExecId(), 11111 * 1000 + i); + } + } + + + } diff --git a/metadata-etl/src/test/java/metadata/etl/lineage/AzLineageExtractorTest.java b/metadata-etl/src/test/java/metadata/etl/lineage/AzLineageExtractorTest.java index b75cfc2658..55506f6478 100644 --- a/metadata-etl/src/test/java/metadata/etl/lineage/AzLineageExtractorTest.java +++ b/metadata-etl/src/test/java/metadata/etl/lineage/AzLineageExtractorTest.java @@ -44,7 +44,7 @@ public class AzLineageExtractorTest { String wherehowsPassWord = prop.getProperty(Constant.WH_DB_PASSWORD_KEY); connUrl = wherehowsUrl + "?" 
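parseNestedJsonTest reads its fixture from the test classpath rather than inlining the JSON. A minimal sketch of that resource-loading idiom, assuming a file named nestedJson under src/test/resources (url.toURI() is used here to be safe with unusual paths):

import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// Resolve a test fixture through the context class loader and read it as UTF-8
// before handing it to parseJson.
public class TestResourceLoadSketch {
  public static void main(String[] args) throws Exception {
    URL url = Thread.currentThread().getContextClassLoader().getResource("nestedJson");
    if (url == null) {
      throw new IllegalStateException("nestedJson fixture not found on the classpath");
    }
    byte[] encoded = Files.readAllBytes(Paths.get(url.toURI()));
    String nestedJson = new String(encoded, StandardCharsets.UTF_8);
    System.out.println("fixture length: " + nestedJson.length() + " characters");
  }
}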
+ "user=" + wherehowsUserName + "&password=" + wherehowsPassWord; this.conn = DriverManager.getConnection(connUrl); - AzLogParser.initialize(conn, Integer.valueOf(prop.getProperty(Constant.AZ_DEFAULT_HADOOP_DATABASE_ID_KEY))); + AzLogParser.initialize(conn); PathAnalyzer.initialize(conn); } diff --git a/metadata-etl/src/test/java/metadata/etl/lineage/AzLogParserTest.java b/metadata-etl/src/test/java/metadata/etl/lineage/AzLogParserTest.java index 1542d16fce..eb79d6da7b 100644 --- a/metadata-etl/src/test/java/metadata/etl/lineage/AzLogParserTest.java +++ b/metadata-etl/src/test/java/metadata/etl/lineage/AzLogParserTest.java @@ -45,7 +45,7 @@ public class AzLogParserTest { String wherehowsPassWord = prop.getProperty(Constant.WH_DB_PASSWORD_KEY); Connection conn = DriverManager.getConnection(wherehowsHost + "?" + "user=" + wherehowsUserName + "&password=" + wherehowsPassWord); - AzLogParser.initialize(conn, -1); + AzLogParser.initialize(conn); } @Test(groups = {"needConfig"}) @@ -81,7 +81,7 @@ public class AzLogParserTest { String logSample = "asfdasdfsadf Moving from staged path[asdf] to final resting place[/tm/b/c] sdaf dsfasdfasdf"; AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(-1, "someJobName", (long) 0, 0, 0, "S", "path"); sampleExecution.setJobExecId((long) 11111); - List result = AzLogParser.getLineageFromLog(logSample, sampleExecution); + List result = AzLogParser.getLineageFromLog(logSample, sampleExecution, -1); System.out.println(result.get(0).toDatabaseValue()); Assert.assertEquals(result.get(0).toDatabaseValue(), @@ -105,7 +105,7 @@ public class AzLogParserTest { + "17-11-2015 01:32:27 PST endorsements_push-lva-endorsements-member-restrictions INFO - INFO tcp://lva1-voldemort-read-only-2-vip.prod.linkedin.com:10103 : Invoking fetch for Node lva1-app0610.prod.linkedin.com [id 0] for webhdfs://lva1-warnn01.grid.linkedin.com:50070/jobs/endorse/endorsements/master/tmp/endorsements-member-restrictions.store/lva1-voldemort-read-only-2-vip.prod.linkedin.com/node-0\n" + "17-11-2015 01:32:27 PST endorsements_push-lva-endorsements-member-restrictions INFO - INFO tcp://lva1-voldemort-rea"; AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(-1, "someJobName", (long) 0, 0, 0, "S", "path"); - List result = AzLogParser.getLineageFromLog(logSample, sampleExecution); + List result = AzLogParser.getLineageFromLog(logSample, sampleExecution, -1); System.out.println(result.get(0).toDatabaseValue()); Assert.assertEquals(result.get(0).getFullObjectName(), "tcp://lva1-voldemort-read-only-2-vip.prod.linkedin.com:10103/endorsements-member-restrictions"); diff --git a/wherehows-common/src/main/java/wherehows/common/Constant.java b/wherehows-common/src/main/java/wherehows/common/Constant.java index fd79676af0..3a09fa3291 100644 --- a/wherehows-common/src/main/java/wherehows/common/Constant.java +++ b/wherehows-common/src/main/java/wherehows/common/Constant.java @@ -38,6 +38,7 @@ public class Constant { public static final String AZ_DEFAULT_HADOOP_DATABASE_ID_KEY = "az.default.hadoop.database.id"; public static final String AZ_LINEAGE_ETL_LOOKBACK_MINS_KEY = "az.lineage_etl.lookback_period.in.minutes"; public static final String LINEAGE_ACTOR_TIMEOUT_KEY = "az.lineage.actor.timeout"; + public static final String AZ_LINEAGE_ETL_END_TIMESTAMP_KEY = "az.lineage_etl.end_timestamp"; public static final String AZ_SERVICE_URL_KEY = "az.server.url"; public static final String AZ_SERVICE_USERNAME_KEY = "az.server.username"; @@ -80,4 +81,15 @@ public class Constant { // ui public 
static final String DATASET_TREE_FILE_NAME_KEY = "wherehows.ui.tree.dataset.file"; public static final String FLOW_TREE_FILE_NAME_KEY = "wherehows.ui.tree.flow.file"; + + // hive + public static final String HIVE_METASTORE_JDBC_DRIVER = "hive.metastore.jdbc.driver"; + public static final String HIVE_METASTORE_JDBC_URL = "hive.metastore.jdbc.url"; + public static final String HIVE_METASTORE_USERNAME = "hive.metstore.username"; + public static final String HIVE_METASTORE_PASSWORD = "hive.metastore.password"; + + public static final String HIVE_SCHEMA_JSON_FILE_KEY = "hive.schema_json_file"; + // public static final String HIVE_SAMPLE_CSV_FILE_KEY = "hive.sample_csv"; + public static final String HIVE_SCHEMA_CSV_FILE_KEY = "hive.schema_csv_file"; + public static final String HIVE_FIELD_METADATA_KEY = "hive.field_metadata"; } diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetFieldRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetFieldRecord.java index 584bbc3f1b..650f137eeb 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetFieldRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetFieldRecord.java @@ -29,8 +29,11 @@ public class DatasetFieldRecord implements Record { Integer parentSortId; String parentPath; String fieldName; + String fieldLabel; String dataType; String isNullable; + String isIndexed; + String isPartitioned; String defaultValue; Integer dataSize; String namespace; @@ -39,18 +42,21 @@ public class DatasetFieldRecord implements Record { List allFields; char SEPR = 0x001A; - public DatasetFieldRecord(String urn, Integer sortId, Integer parentSortId, String parentPath, String fieldName, - String dataType, String isNullable, String defaultValue, Integer dataSize, String namespace, String description) { + public DatasetFieldRecord(String urn, Integer sortId, Integer parentSortId, String parentPath, String fieldName, String fieldLabel, + String dataType, String isNullable, String isIndexed, String isPartitioned, String defaultValue, Integer dataSize, String namespace, String description) { this.urn = urn; this.sortId = sortId; this.parentSortId = parentSortId; this.parentPath = parentPath; this.fieldName = fieldName; + this.fieldLabel = fieldLabel; this.dataType = dataType; - this.isNullable = isNullable; - this.defaultValue = defaultValue; this.dataSize = dataSize; + this.isNullable = isNullable; + this.isIndexed = isIndexed; + this.isPartitioned = isPartitioned; + this.defaultValue = defaultValue; this.namespace = namespace; this.description = description; @@ -61,9 +67,11 @@ public class DatasetFieldRecord implements Record { this.allFields.add(parentPath); this.allFields.add(fieldName); this.allFields.add(dataType); - this.allFields.add(isNullable); - this.allFields.add(defaultValue); this.allFields.add(dataSize); + this.allFields.add(isNullable); + this.allFields.add(isIndexed); + this.allFields.add(isPartitioned); + this.allFields.add(defaultValue); this.allFields.add(namespace); this.allFields.add(description); }
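The widened DatasetFieldRecord constructor now carries fieldLabel, isIndexed and isPartitioned, and the allFields order matches the staging columns. A sketch of constructing one record with the new signature; every value below is illustrative:

import java.util.ArrayList;
import java.util.List;

import wherehows.common.schemas.DatasetFieldRecord;

// Build a field record with the expanded argument list: fieldLabel follows fieldName,
// and isIndexed/isPartitioned (RDBMS-only attributes) follow isNullable.
public class DatasetFieldRecordSketch {
  public static void main(String[] args) {
    DatasetFieldRecord record = new DatasetFieldRecord(
        "hdfs:///data/tracking/demo_event",  // urn
        1,                                    // sortId
        0,                                    // parentSortId
        null,                                 // parentPath (top-level field)
        "member_id",                          // fieldName
        null,                                 // fieldLabel
        "long",                               // dataType
        "N",                                  // isNullable
        null,                                 // isIndexed (RDBMS only)
        null,                                 // isPartitioned (RDBMS only)
        null,                                 // defaultValue
        8,                                    // dataSize
        null,                                 // namespace
        "member identifier");                 // description

    List<DatasetFieldRecord> fields = new ArrayList<>();
    fields.add(record);
    System.out.println("prepared " + fields.size() + " field record(s) for the staging load");
  }
}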