diff --git a/data-model/DDL/ETL_DDL/executor_metadata.sql b/data-model/DDL/ETL_DDL/executor_metadata.sql index 4a48cfdcbe..e4c6673ffd 100644 --- a/data-model/DDL/ETL_DDL/executor_metadata.sql +++ b/data-model/DDL/ETL_DDL/executor_metadata.sql @@ -94,6 +94,7 @@ CREATE TABLE flow_job ( job_path VARCHAR(1024) COMMENT 'job path from top level', job_type_id SMALLINT COMMENT 'type id of the job', job_type VARCHAR(63) COMMENT 'type of the job', + ref_flow_id INT UNSIGNED DEFAULT NULL COMMENT 'the reference flow id of the job if the job is a subflow', pre_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run before this job', post_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run after this job', is_current CHAR(1) COMMENT 'determine if it is a current job', @@ -104,6 +105,7 @@ CREATE TABLE flow_job ( wh_etl_exec_id BIGINT COMMENT 'wherehows etl execution id that create this record', PRIMARY KEY (app_id, job_id, dag_version), INDEX flow_id_idx (app_id, flow_id), + INDEX ref_flow_id_idx (app_id, ref_flow_id), INDEX job_path_idx (app_id, job_path(255)) ) ENGINE = InnoDB @@ -122,6 +124,8 @@ CREATE TABLE stg_flow_job ( job_path VARCHAR(1024) COMMENT 'job path from top level', job_type_id SMALLINT COMMENT 'type id of the job', job_type VARCHAR(63) COMMENT 'type of the job', + ref_flow_id INT UNSIGNED DEFAULT NULL COMMENT 'the reference flow id of the job if the job is a subflow', + ref_flow_path VARCHAR(1024) COMMENT 'the reference flow path of the job if the job is a subflow', pre_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run before this job', post_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run after this job', is_current CHAR(1) COMMENT 'determine if it is a current job', @@ -131,6 +135,7 @@ CREATE TABLE stg_flow_job ( INDEX (app_id, job_id, dag_version), INDEX flow_id_idx (app_id, flow_id), INDEX flow_path_idx (app_id, flow_path(255)), + INDEX ref_flow_path_idx (app_id, ref_flow_path(255)), INDEX job_path_idx (app_id, job_path(255)), INDEX job_type_idx (job_type) ) diff --git a/metadata-etl/src/main/resources/jython/AzkabanExtract.py b/metadata-etl/src/main/resources/jython/AzkabanExtract.py index 3fa901ff6b..20fd00d289 100644 --- a/metadata-etl/src/main/resources/jython/AzkabanExtract.py +++ b/metadata-etl/src/main/resources/jython/AzkabanExtract.py @@ -118,6 +118,8 @@ class AzkabanExtract: node['jobType'], 'Y', self.wh_exec_id) + if node['jobType'] == 'flow': + job_record.setRefFlowPath(row['project_name'] + ":" + node['embeddedFlowId']) job_writer.append(job_record) # job dag diff --git a/metadata-etl/src/main/resources/jython/AzkabanTransform.py b/metadata-etl/src/main/resources/jython/AzkabanTransform.py index 49efd9e9a9..d7e7def2f6 100644 --- a/metadata-etl/src/main/resources/jython/AzkabanTransform.py +++ b/metadata-etl/src/main/resources/jython/AzkabanTransform.py @@ -22,7 +22,7 @@ import sys class AzkabanTransform(SchedulerTransform): SchedulerTransform._tables["flows"]["columns"] = "app_id, flow_name, flow_group, flow_path, flow_level, source_modified_time, source_version, is_active, wh_etl_exec_id" - SchedulerTransform._tables["jobs"]["columns"] = "app_id, flow_path, source_version, job_name, job_path, job_type, is_current, wh_etl_exec_id" + SchedulerTransform._tables["jobs"]["columns"] = "app_id, flow_path, source_version, job_name, job_path, job_type, ref_flow_path, is_current, wh_etl_exec_id" SchedulerTransform._tables["owners"]["columns"] = "app_id, flow_path, owner_id, permissions, owner_type, wh_etl_exec_id" SchedulerTransform._tables["flow_execs"]["columns"] = "app_id, flow_name, flow_path, source_version, flow_exec_id, flow_exec_status, attempt_id, executed_by, start_time, end_time, wh_etl_exec_id" SchedulerTransform._tables["job_execs"]["columns"] = "app_id, flow_path, source_version, flow_exec_id, job_name, job_path, job_exec_id, job_exec_status, attempt_id, start_time, end_time, wh_etl_exec_id" diff --git a/metadata-etl/src/main/resources/jython/SchedulerLoad.py b/metadata-etl/src/main/resources/jython/SchedulerLoad.py index 9b7dea8476..18b7d12892 100644 --- a/metadata-etl/src/main/resources/jython/SchedulerLoad.py +++ b/metadata-etl/src/main/resources/jython/SchedulerLoad.py @@ -81,9 +81,9 @@ class SchedulerLoad: self.wh_con.commit() cmd = """ - INSERT INTO flow_job (app_id, flow_id, first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, pre_jobs, post_jobs, + INSERT INTO flow_job (app_id, flow_id, first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, ref_flow_id, pre_jobs, post_jobs, is_current, is_first, is_last, created_time, modified_time, wh_etl_exec_id) - SELECT app_id, flow_id, source_version first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, pre_jobs, post_jobs, + SELECT app_id, flow_id, source_version first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, ref_flow_id, pre_jobs, post_jobs, 'Y', is_first, is_last, unix_timestamp(NOW()) created_time, NULL, wh_etl_exec_id FROM stg_flow_job s WHERE s.app_id = {app_id} @@ -94,6 +94,7 @@ class SchedulerLoad: job_path = s.job_path, job_type_id = s.job_type_id, job_type = s.job_type, + ref_flow_id = s.ref_flow_id, pre_jobs = s.pre_jobs, post_jobs = s.post_jobs, is_current = 'Y', diff --git a/metadata-etl/src/main/resources/jython/SchedulerTransform.py b/metadata-etl/src/main/resources/jython/SchedulerTransform.py index 09dfdf7971..c89fd7667b 100644 --- a/metadata-etl/src/main/resources/jython/SchedulerTransform.py +++ b/metadata-etl/src/main/resources/jython/SchedulerTransform.py @@ -149,6 +149,27 @@ class SchedulerTransform: self.wh_cursor.execute(query) self.wh_con.commit() + # ad hoc fix for null values, need better solution by changing the load script + query = """ + UPDATE {table} stg + SET stg.ref_flow_path = null + WHERE stg.ref_flow_path = 'null' and stg.app_id = {app_id} + """.format(table=t.get("table"), app_id=self.app_id) + print query + self.wh_cursor.execute(query) + self.wh_con.commit() + + # Update sub flow id from mapping table + query = """ + UPDATE {table} stg + JOIN flow_source_id_map fm + ON stg.app_id = fm.app_id AND stg.ref_flow_path = fm.source_id_string + SET stg.ref_flow_id = fm.flow_id WHERE stg.app_id = {app_id} + """.format(table=t.get("table"), app_id=self.app_id) + print query + self.wh_cursor.execute(query) + self.wh_con.commit() + # Insert new job into job map to generate job id query = """ INSERT INTO job_source_id_map (app_id, source_id_string) diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/AzkabanJobRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/AzkabanJobRecord.java index 894670db91..254ef9debb 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/AzkabanJobRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/AzkabanJobRecord.java @@ -27,6 +27,7 @@ public class AzkabanJobRecord extends AbstractRecord { String jobName; String jobPath; String jobType; + String refFlowPath; Character isCurrent; Long whExecId; @@ -51,9 +52,81 @@ public class AzkabanJobRecord extends AbstractRecord { allFields.add(jobName); allFields.add(jobPath); allFields.add(jobType); + allFields.add(refFlowPath); allFields.add(isCurrent); allFields.add(whExecId); return allFields; } + public Integer getAppId() { + return appId; + } + + public void setAppId(Integer appId) { + this.appId = appId; + } + + public String getFlowPath() { + return flowPath; + } + + public void setFlowPath(String flowPath) { + this.flowPath = flowPath; + } + + public Integer getSourceVersion() { + return sourceVersion; + } + + public void setSourceVersion(Integer sourceVersion) { + this.sourceVersion = sourceVersion; + } + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public String getJobPath() { + return jobPath; + } + + public void setJobPath(String jobPath) { + this.jobPath = jobPath; + } + + public String getJobType() { + return jobType; + } + + public void setJobType(String jobType) { + this.jobType = jobType; + } + + public String getRefFlowPath() { + return refFlowPath; + } + + public void setRefFlowPath(String refFlowPath) { + this.refFlowPath = refFlowPath; + } + + public Character getIsCurrent() { + return isCurrent; + } + + public void setIsCurrent(Character isCurrent) { + this.isCurrent = isCurrent; + } + + public Long getWhExecId() { + return whExecId; + } + + public void setWhExecId(Long whExecId) { + this.whExecId = whExecId; + } } diff --git a/wherehows-common/src/main/java/wherehows/common/utils/StringUtil.java b/wherehows-common/src/main/java/wherehows/common/utils/StringUtil.java index 13d90b1bfe..adbaade3ae 100644 --- a/wherehows-common/src/main/java/wherehows/common/utils/StringUtil.java +++ b/wherehows-common/src/main/java/wherehows/common/utils/StringUtil.java @@ -20,12 +20,20 @@ public class StringUtil { public static String toDbString(Object object) { if (object != null) { - return "'" + object.toString().replace("\'", "\\\'").replace("\"", "\\\"") + "'"; + return "'" + object.toString().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"") + "'"; } else { return "null"; } } + public static String toCsvString(Object object) { + if (object != null) { + return "\"" + object.toString().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"") + "\""; + } else { + return "\\N"; + } + } + public static String replace(String s, String target, Object replacement) { if (replacement != null) { return s.replace(target, "'" + replacement.toString().replace("\'", "\\\'").replace("\"", "\\\"") + "'");