add ref flow id for sub flow jobs

Zhen Chen 2015-12-17 16:26:15 -08:00
parent d6f77a6f06
commit 7fca60a527
7 changed files with 114 additions and 4 deletions

View File

@@ -94,6 +94,7 @@ CREATE TABLE flow_job (
job_path VARCHAR(1024) COMMENT 'job path from top level',
job_type_id SMALLINT COMMENT 'type id of the job',
job_type VARCHAR(63) COMMENT 'type of the job',
ref_flow_id INT UNSIGNED DEFAULT NULL COMMENT 'the reference flow id of the job if the job is a subflow',
pre_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run before this job',
post_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run after this job',
is_current CHAR(1) COMMENT 'determine if it is a current job',
@@ -104,6 +105,7 @@ CREATE TABLE flow_job (
wh_etl_exec_id BIGINT COMMENT 'wherehows etl execution id that create this record',
PRIMARY KEY (app_id, job_id, dag_version),
INDEX flow_id_idx (app_id, flow_id),
INDEX ref_flow_id_idx (app_id, ref_flow_id),
INDEX job_path_idx (app_id, job_path(255))
)
ENGINE = InnoDB
@@ -122,6 +124,8 @@ CREATE TABLE stg_flow_job (
job_path VARCHAR(1024) COMMENT 'job path from top level',
job_type_id SMALLINT COMMENT 'type id of the job',
job_type VARCHAR(63) COMMENT 'type of the job',
ref_flow_id INT UNSIGNED DEFAULT NULL COMMENT 'the reference flow id of the job if the job is a subflow',
ref_flow_path VARCHAR(1024) COMMENT 'the reference flow path of the job if the job is a subflow',
pre_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run before this job',
post_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run after this job',
is_current CHAR(1) COMMENT 'determine if it is a current job',
@@ -131,6 +135,7 @@ CREATE TABLE stg_flow_job (
INDEX (app_id, job_id, dag_version),
INDEX flow_id_idx (app_id, flow_id),
INDEX flow_path_idx (app_id, flow_path(255)),
INDEX ref_flow_path_idx (app_id, ref_flow_path(255)),
INDEX job_path_idx (app_id, job_path(255)),
INDEX job_type_idx (job_type)
)

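Note: with ref_flow_id in place, a flow_job row whose job_type is 'flow' can be expanded into the jobs of the flow it references by joining flow_job back onto itself. A minimal sketch of such a lookup, assuming a DB-API style cursor; the helper name is illustrative and not part of this commit:

def expand_subflow_jobs(cursor, app_id, flow_id):
    # Illustrative helper, not part of this commit.
    # ref_flow_id is NULL for ordinary jobs, so they drop out of the inner join.
    cursor.execute("""
        SELECT parent.job_name AS subflow_job, child.job_name AS child_job, child.job_type
        FROM flow_job parent
        JOIN flow_job child
          ON child.app_id = parent.app_id
         AND child.flow_id = parent.ref_flow_id
        WHERE parent.app_id = %s
          AND parent.flow_id = %s
          AND parent.is_current = 'Y'
          AND child.is_current = 'Y'
        """, (app_id, flow_id))
    return cursor.fetchall()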
View File

@@ -118,6 +118,8 @@ class AzkabanExtract:
node['jobType'],
'Y',
self.wh_exec_id)
if node['jobType'] == 'flow':
job_record.setRefFlowPath(row['project_name'] + ":" + node['embeddedFlowId'])
job_writer.append(job_record)
# job dag

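Note: for Azkaban nodes of type 'flow', the extractor now records a reference path built as project_name + ":" + node['embeddedFlowId'], so that the transform step further down can resolve it against flow_source_id_map.source_id_string. A small sketch of that construction (the helper itself is illustrative, not part of the commit):

def build_ref_flow_path(project_name, node):
    # Illustrative helper mirroring the extract logic above; only embedded-flow
    # nodes reference another flow, ordinary jobs carry no ref path.
    if node.get('jobType') == 'flow':
        return project_name + ":" + node['embeddedFlowId']
    return None

# e.g. build_ref_flow_path('myproject', {'jobType': 'flow', 'embeddedFlowId': 'subdag'})
# returns 'myproject:subdag' (sample values, for illustration only)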
View File

@@ -22,7 +22,7 @@ import sys
class AzkabanTransform(SchedulerTransform):
SchedulerTransform._tables["flows"]["columns"] = "app_id, flow_name, flow_group, flow_path, flow_level, source_modified_time, source_version, is_active, wh_etl_exec_id"
-SchedulerTransform._tables["jobs"]["columns"] = "app_id, flow_path, source_version, job_name, job_path, job_type, is_current, wh_etl_exec_id"
+SchedulerTransform._tables["jobs"]["columns"] = "app_id, flow_path, source_version, job_name, job_path, job_type, ref_flow_path, is_current, wh_etl_exec_id"
SchedulerTransform._tables["owners"]["columns"] = "app_id, flow_path, owner_id, permissions, owner_type, wh_etl_exec_id"
SchedulerTransform._tables["flow_execs"]["columns"] = "app_id, flow_name, flow_path, source_version, flow_exec_id, flow_exec_status, attempt_id, executed_by, start_time, end_time, wh_etl_exec_id"
SchedulerTransform._tables["job_execs"]["columns"] = "app_id, flow_path, source_version, flow_exec_id, job_name, job_path, job_exec_id, job_exec_status, attempt_id, start_time, end_time, wh_etl_exec_id"

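Note: ref_flow_path is added to the jobs staging column list right after job_type, the same position refFlowPath takes in AzkabanJobRecord.fillAllFields() below; the staging file is loaded against this column list, so the two orderings have to stay in sync. A tiny illustrative check (not part of the commit):

STG_JOB_COLUMNS = ["app_id", "flow_path", "source_version", "job_name", "job_path",
                   "job_type", "ref_flow_path", "is_current", "wh_etl_exec_id"]

def to_stg_row(csv_fields):
    # csv_fields: one record's values in the order AzkabanJobRecord.fillAllFields() emits them.
    if len(csv_fields) != len(STG_JOB_COLUMNS):
        raise ValueError("field/column count mismatch")
    return dict(zip(STG_JOB_COLUMNS, csv_fields))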
View File

@@ -81,9 +81,9 @@ class SchedulerLoad:
self.wh_con.commit()
cmd = """
-INSERT INTO flow_job (app_id, flow_id, first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, pre_jobs, post_jobs,
+INSERT INTO flow_job (app_id, flow_id, first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, ref_flow_id, pre_jobs, post_jobs,
is_current, is_first, is_last, created_time, modified_time, wh_etl_exec_id)
-SELECT app_id, flow_id, source_version first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, pre_jobs, post_jobs,
+SELECT app_id, flow_id, source_version first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, ref_flow_id, pre_jobs, post_jobs,
'Y', is_first, is_last, unix_timestamp(NOW()) created_time, NULL, wh_etl_exec_id
FROM stg_flow_job s
WHERE s.app_id = {app_id}
@@ -94,6 +94,7 @@ class SchedulerLoad:
job_path = s.job_path,
job_type_id = s.job_type_id,
job_type = s.job_type,
ref_flow_id = s.ref_flow_id,
pre_jobs = s.pre_jobs,
post_jobs = s.post_jobs,
is_current = 'Y',

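Note: flow_job's primary key is (app_id, job_id, dag_version), so re-running the load refreshes ref_flow_id on existing rows through the ON DUPLICATE KEY UPDATE branch instead of inserting duplicates. An abbreviated sketch of the same upsert pattern, reduced to the new column (the real statement above carries the full column list; the wrapper function is illustrative, not part of the commit):

def upsert_ref_flow_id(wh_cursor, wh_con, app_id):
    # Illustrative, abbreviated version of the SchedulerLoad statement: new rows are
    # inserted, existing (app_id, job_id, dag_version) rows only get ref_flow_id refreshed.
    cmd = """
        INSERT INTO flow_job (app_id, flow_id, first_source_version, dag_version,
                              job_id, job_name, job_path, ref_flow_id, wh_etl_exec_id)
        SELECT app_id, flow_id, source_version, dag_version,
               job_id, job_name, job_path, ref_flow_id, wh_etl_exec_id
        FROM stg_flow_job s
        WHERE s.app_id = {app_id}
        ON DUPLICATE KEY UPDATE ref_flow_id = s.ref_flow_id
        """.format(app_id=app_id)
    wh_cursor.execute(cmd)
    wh_con.commit()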
View File

@@ -149,6 +149,27 @@ class SchedulerTransform:
self.wh_cursor.execute(query)
self.wh_con.commit()
# ad hoc fix for null values, need better solution by changing the load script
query = """
UPDATE {table} stg
SET stg.ref_flow_path = null
WHERE stg.ref_flow_path = 'null' and stg.app_id = {app_id}
""".format(table=t.get("table"), app_id=self.app_id)
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
# Update sub flow id from mapping table
query = """
UPDATE {table} stg
JOIN flow_source_id_map fm
ON stg.app_id = fm.app_id AND stg.ref_flow_path = fm.source_id_string
SET stg.ref_flow_id = fm.flow_id WHERE stg.app_id = {app_id}
""".format(table=t.get("table"), app_id=self.app_id)
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
# Insert new job into job map to generate job id
query = """
INSERT INTO job_source_id_map (app_id, source_id_string)

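Note: the transform resolves the extractor's ref_flow_path string to a numeric ref_flow_id through flow_source_id_map (app_id + source_id_string -> flow_id); the preceding UPDATE clears the literal string 'null' that the staging load leaves for missing values, so only real subflow rows take part in the join. A per-row sketch of the same resolution, assuming a DB-API cursor with %s placeholders (the Jython scripts here build their SQL with str.format instead); the helper is illustrative, not part of the commit:

def resolve_ref_flow_id(cursor, app_id, ref_flow_path):
    # Illustrative: map a "project:flow" source id string to the surrogate flow_id
    # recorded in flow_source_id_map; missing or unknown paths stay unresolved.
    if not ref_flow_path or ref_flow_path == 'null':
        return None
    cursor.execute(
        "SELECT flow_id FROM flow_source_id_map WHERE app_id = %s AND source_id_string = %s",
        (app_id, ref_flow_path))
    row = cursor.fetchone()
    return row[0] if row else None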
View File

@@ -27,6 +27,7 @@ public class AzkabanJobRecord extends AbstractRecord {
String jobName;
String jobPath;
String jobType;
String refFlowPath;
Character isCurrent;
Long whExecId;
@@ -51,9 +52,81 @@ public class AzkabanJobRecord extends AbstractRecord {
allFields.add(jobName);
allFields.add(jobPath);
allFields.add(jobType);
allFields.add(refFlowPath);
allFields.add(isCurrent);
allFields.add(whExecId);
return allFields;
}
public Integer getAppId() {
return appId;
}
public void setAppId(Integer appId) {
this.appId = appId;
}
public String getFlowPath() {
return flowPath;
}
public void setFlowPath(String flowPath) {
this.flowPath = flowPath;
}
public Integer getSourceVersion() {
return sourceVersion;
}
public void setSourceVersion(Integer sourceVersion) {
this.sourceVersion = sourceVersion;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public String getJobPath() {
return jobPath;
}
public void setJobPath(String jobPath) {
this.jobPath = jobPath;
}
public String getJobType() {
return jobType;
}
public void setJobType(String jobType) {
this.jobType = jobType;
}
public String getRefFlowPath() {
return refFlowPath;
}
public void setRefFlowPath(String refFlowPath) {
this.refFlowPath = refFlowPath;
}
public Character getIsCurrent() {
return isCurrent;
}
public void setIsCurrent(Character isCurrent) {
this.isCurrent = isCurrent;
}
public Long getWhExecId() {
return whExecId;
}
public void setWhExecId(Long whExecId) {
this.whExecId = whExecId;
}
}

View File

@@ -20,12 +20,20 @@ public class StringUtil {
public static String toDbString(Object object) {
if (object != null) {
-return "'" + object.toString().replace("\'", "\\\'").replace("\"", "\\\"") + "'";
+return "'" + object.toString().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"") + "'";
} else {
return "null";
}
}
public static String toCsvString(Object object) {
if (object != null) {
return "\"" + object.toString().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"") + "\"";
} else {
return "\\N";
}
}
public static String replace(String s, String target, Object replacement) {
if (replacement != null) {
return s.replace(target, "'" + replacement.toString().replace("\'", "\\\'").replace("\"", "\\\"") + "'");
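Note: the new toCsvString helper quotes a value for the staging CSV, escaping backslashes before quotes (otherwise the escape characters it has just added would themselves be escaped) and emitting \N, the marker MySQL's LOAD DATA treats as NULL, when the value is missing; the parallel change to toDbString adds the same backslash escaping to quoted SQL literals. A rough Python equivalent of the CSV helper (illustrative only, not part of the commit):

def to_csv_string(value):
    # Illustrative Python counterpart of StringUtil.toCsvString: backslashes first,
    # then quotes, wrapped in double quotes; None becomes \N (NULL for LOAD DATA).
    if value is None:
        return "\\N"
    s = str(value).replace("\\", "\\\\").replace("'", "\\'").replace('"', '\\"')
    return '"' + s + '"'

print(to_csv_string('a "quoted" path\\name'))  # -> "a \"quoted\" path\\name"
print(to_csv_string(None))                     # -> \N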