Update Azkaban_Execution job to fetch cronExpression in flow scheduling

This commit is contained in:
Yi Wang 2016-10-06 13:35:06 -07:00
commit c9f4f18d9c
7 changed files with 28 additions and 16 deletions

View File

@@ -95,8 +95,8 @@ CREATE TABLE flow_job (
job_type_id SMALLINT COMMENT 'type id of the job', job_type_id SMALLINT COMMENT 'type id of the job',
job_type VARCHAR(63) COMMENT 'type of the job', job_type VARCHAR(63) COMMENT 'type of the job',
ref_flow_id INT UNSIGNED NULL COMMENT 'the reference flow id of the job if the job is a subflow', ref_flow_id INT UNSIGNED NULL COMMENT 'the reference flow id of the job if the job is a subflow',
pre_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run before this job', pre_jobs VARCHAR(20000) CHAR SET latin1 COMMENT 'comma separated job ids that run before this job',
post_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run after this job', post_jobs VARCHAR(20000) CHAR SET latin1 COMMENT 'comma separated job ids that run after this job',
is_current CHAR(1) COMMENT 'determine if it is a current job', is_current CHAR(1) COMMENT 'determine if it is a current job',
is_first CHAR(1) COMMENT 'determine if it is the first job', is_first CHAR(1) COMMENT 'determine if it is the first job',
is_last CHAR(1) COMMENT 'determine if it is the last job', is_last CHAR(1) COMMENT 'determine if it is the last job',
@@ -126,8 +126,8 @@ CREATE TABLE stg_flow_job (
job_type VARCHAR(63) COMMENT 'type of the job', job_type VARCHAR(63) COMMENT 'type of the job',
ref_flow_id INT UNSIGNED NULL COMMENT 'the reference flow id of the job if the job is a subflow', ref_flow_id INT UNSIGNED NULL COMMENT 'the reference flow id of the job if the job is a subflow',
ref_flow_path VARCHAR(1024) COMMENT 'the reference flow path of the job if the job is a subflow', ref_flow_path VARCHAR(1024) COMMENT 'the reference flow path of the job if the job is a subflow',
pre_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run before this job', pre_jobs VARCHAR(20000) CHAR SET latin1 COMMENT 'comma separated job ids that run before this job',
post_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run after this job', post_jobs VARCHAR(20000) CHAR SET latin1 COMMENT 'comma separated job ids that run after this job',
is_current CHAR(1) COMMENT 'determine if it is a current job', is_current CHAR(1) COMMENT 'determine if it is a current job',
is_first CHAR(1) COMMENT 'determine if it is the first job', is_first CHAR(1) COMMENT 'determine if it is the first job',
is_last CHAR(1) COMMENT 'determine if it is the last job', is_last CHAR(1) COMMENT 'determine if it is the last job',
@@ -366,6 +366,7 @@ CREATE TABLE flow_schedule (
COMMENT 'flow id', COMMENT 'flow id',
unit VARCHAR(31) COMMENT 'unit of time', unit VARCHAR(31) COMMENT 'unit of time',
frequency INT COMMENT 'frequency of the unit', frequency INT COMMENT 'frequency of the unit',
cron_expression VARCHAR(127) COMMENT 'cron expression',
is_active CHAR(1) COMMENT 'determine if it is an active schedule', is_active CHAR(1) COMMENT 'determine if it is an active schedule',
included_instances VARCHAR(127) COMMENT 'included instance', included_instances VARCHAR(127) COMMENT 'included instance',
excluded_instances VARCHAR(127) COMMENT 'excluded instance', excluded_instances VARCHAR(127) COMMENT 'excluded instance',
@@ -389,6 +390,7 @@ CREATE TABLE stg_flow_schedule (
flow_path VARCHAR(1024) COMMENT 'flow path from top level', flow_path VARCHAR(1024) COMMENT 'flow path from top level',
unit VARCHAR(31) COMMENT 'unit of time', unit VARCHAR(31) COMMENT 'unit of time',
frequency INT COMMENT 'frequency of the unit', frequency INT COMMENT 'frequency of the unit',
cron_expression VARCHAR(127) COMMENT 'cron expression',
included_instances VARCHAR(127) COMMENT 'included instance', included_instances VARCHAR(127) COMMENT 'included instance',
excluded_instances VARCHAR(127) COMMENT 'excluded instance', excluded_instances VARCHAR(127) COMMENT 'excluded instance',
effective_start_time INT UNSIGNED COMMENT 'effective start time of the flow execution', effective_start_time INT UNSIGNED COMMENT 'effective start time of the flow execution',
@@ -438,7 +440,7 @@ CREATE TABLE stg_flow_owner_permission (
DEFAULT CHARSET = utf8 DEFAULT CHARSET = utf8
COMMENT = 'Scheduler owner table' PARTITION BY HASH (app_id) PARTITIONS 8; COMMENT = 'Scheduler owner table' PARTITION BY HASH (app_id) PARTITIONS 8;
CREATE TABLE job_execution_ext_reference ( CREATE TABLE job_execution_ext_reference (
app_id smallint(5) UNSIGNED COMMENT 'application id of the flow' NOT NULL, app_id smallint(5) UNSIGNED COMMENT 'application id of the flow' NOT NULL,
job_exec_id bigint(20) UNSIGNED COMMENT 'job execution id either inherit or generated' NOT NULL, job_exec_id bigint(20) UNSIGNED COMMENT 'job execution id either inherit or generated' NOT NULL,
attempt_id smallint(6) COMMENT 'job execution attempt id' DEFAULT '0', attempt_id smallint(6) COMMENT 'job execution attempt id' DEFAULT '0',
@@ -463,11 +465,11 @@ PARTITION BY HASH(app_id)
PARTITION p7) PARTITION p7)
; ;
CREATE INDEX idx_job_execution_ext_ref__ext_ref_id USING BTREE CREATE INDEX idx_job_execution_ext_ref__ext_ref_id USING BTREE
ON job_execution_ext_reference(ext_ref_id); ON job_execution_ext_reference(ext_ref_id);
CREATE TABLE stg_job_execution_ext_reference ( CREATE TABLE stg_job_execution_ext_reference (
app_id smallint(5) UNSIGNED COMMENT 'application id of the flow' NOT NULL, app_id smallint(5) UNSIGNED COMMENT 'application id of the flow' NOT NULL,
job_exec_id bigint(20) UNSIGNED COMMENT 'job execution id either inherit or generated' NOT NULL, job_exec_id bigint(20) UNSIGNED COMMENT 'job execution id either inherit or generated' NOT NULL,
attempt_id smallint(6) COMMENT 'job execution attempt id' DEFAULT '0', attempt_id smallint(6) COMMENT 'job execution attempt id' DEFAULT '0',

View File

@@ -224,13 +224,18 @@ class AzkabanExtract:
# print json.dumps(row[json_column], indent=4) # print json.dumps(row[json_column], indent=4)
if row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["isRecurring"] == 'true': if row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["isRecurring"] == 'true':
unit = row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["period"][-1:] unit, frequency, cron_expr = None, None, None
unit = self._period_unit_table[unit] period = row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["period"]
frequency = int(row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["period"][:-1]) if period is not None and period != "null" and period[-1:] in self._period_unit_table:
unit = self._period_unit_table[period[-1:]]
frequency = int(row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["period"][:-1])
if "cronExpression" in row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]:
cron_expr = row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["cronExpression"]
schedule_record = AzkabanFlowScheduleRecord(self.app_id, schedule_record = AzkabanFlowScheduleRecord(self.app_id,
row[json_column]["actions"][0]["actionJson"]["projectName"] + ':' + row[json_column]["actions"][0]["actionJson"]["flowName"], row[json_column]["actions"][0]["actionJson"]["projectName"] + ':' + row[json_column]["actions"][0]["actionJson"]["flowName"],
unit, unit,
frequency, frequency,
cron_expr,
long(row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["firstCheckTime"]) / 1000, long(row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["firstCheckTime"]) / 1000,
int(time.mktime(datetime.date(2099,12,31).timetuple())), int(time.mktime(datetime.date(2099,12,31).timetuple())),
'0', '0',

View File

@@ -163,6 +163,7 @@ class OozieExtract:
row['app_path'], row['app_path'],
row['time_unit'], row['time_unit'],
int(row['frequency']), int(row['frequency']),
None,
row['start_time'], row['start_time'],
row['end_time'], row['end_time'],
row['ref_id'], row['ref_id'],

View File

@@ -136,15 +136,16 @@ class SchedulerLoad:
self.wh_con.commit() self.wh_con.commit()
cmd = """ cmd = """
INSERT INTO flow_schedule (app_id, flow_id, unit, frequency, included_instances, excluded_instances, effective_start_time, effective_end_time, is_active, ref_id, INSERT INTO flow_schedule (app_id, flow_id, unit, frequency, cron_expression, included_instances, excluded_instances, effective_start_time, effective_end_time, is_active, ref_id,
created_time, modified_time, wh_etl_exec_id) created_time, modified_time, wh_etl_exec_id)
SELECT app_id, flow_id, unit, frequency, included_instances, excluded_instances, effective_start_time, effective_end_time, 'Y', ref_id, SELECT app_id, flow_id, unit, frequency, cron_expression, included_instances, excluded_instances, effective_start_time, effective_end_time, 'Y', ref_id,
unix_timestamp(NOW()) created_time, NULL modified_time, wh_etl_exec_id unix_timestamp(NOW()) created_time, NULL modified_time, wh_etl_exec_id
FROM stg_flow_schedule s FROM stg_flow_schedule s
WHERE s.app_id = {app_id} AND s.flow_id IS NOT NULL WHERE s.app_id = {app_id} AND s.flow_id IS NOT NULL
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
unit = s.unit, unit = s.unit,
frequency = s.frequency, frequency = s.frequency,
cron_expression = s.cron_expression,
is_active = 'Y', is_active = 'Y',
ref_id = s.ref_id, ref_id = s.ref_id,
included_instances = s.included_instances, included_instances = s.included_instances,

View File

@@ -33,7 +33,7 @@ class SchedulerTransform:
"owners": {"columns": "app_id, flow_path, owner_id, wh_etl_exec_id", "owners": {"columns": "app_id, flow_path, owner_id, wh_etl_exec_id",
"file": "owner.csv", "file": "owner.csv",
"table": "stg_flow_owner_permission"}, "table": "stg_flow_owner_permission"},
"schedules": {"columns": "app_id, flow_path, unit, frequency, effective_start_time, effective_end_time, ref_id, wh_etl_exec_id", "schedules": {"columns": "app_id, flow_path, unit, frequency, cron_expression, effective_start_time, effective_end_time, ref_id, wh_etl_exec_id",
"file": "schedule.csv", "file": "schedule.csv",
"table": "stg_flow_schedule"}, "table": "stg_flow_schedule"},
"flow_execs": {"columns": "app_id, flow_name, flow_path, flow_exec_uuid, source_version, flow_exec_status, attempt_id, executed_by, start_time, end_time, wh_etl_exec_id", "flow_execs": {"columns": "app_id, flow_name, flow_path, flow_exec_uuid, source_version, flow_exec_status, attempt_id, executed_by, start_time, end_time, wh_etl_exec_id",

View File

@@ -25,17 +25,19 @@ public class AzkabanFlowScheduleRecord extends AbstractRecord {
String flowPath; String flowPath;
String unit; String unit;
Integer frequency; Integer frequency;
String cronExpression;
Long effectiveStartTime; Long effectiveStartTime;
Long effectiveEndTime; Long effectiveEndTime;
String refId; String refId;
Long whExecId; Long whExecId;
public AzkabanFlowScheduleRecord(Integer appId, String flowPath, String unit, Integer frequency, public AzkabanFlowScheduleRecord(Integer appId, String flowPath, String unit, Integer frequency,
Long effectiveStartTime, Long effectiveEndTime, String refId, Long whExecId) { String cronExpression, Long effectiveStartTime, Long effectiveEndTime, String refId, Long whExecId) {
this.appId = appId; this.appId = appId;
this.flowPath = flowPath; this.flowPath = flowPath;
this.unit = unit; this.unit = unit;
this.frequency = frequency; this.frequency = frequency;
this.cronExpression = cronExpression;
this.effectiveStartTime = effectiveStartTime; this.effectiveStartTime = effectiveStartTime;
this.effectiveEndTime = effectiveEndTime; this.effectiveEndTime = effectiveEndTime;
this.refId = refId; this.refId = refId;
@@ -49,6 +51,7 @@ public class AzkabanFlowScheduleRecord extends AbstractRecord {
allFields.add(flowPath); allFields.add(flowPath);
allFields.add(unit); allFields.add(unit);
allFields.add(frequency); allFields.add(frequency);
allFields.add(cronExpression);
allFields.add(effectiveStartTime); allFields.add(effectiveStartTime);
allFields.add(effectiveEndTime); allFields.add(effectiveEndTime);
allFields.add(refId); allFields.add(refId);

View File

@@ -18,7 +18,7 @@ package wherehows.common.schemas;
*/ */
public class OozieFlowScheduleRecord extends AzkabanFlowScheduleRecord { public class OozieFlowScheduleRecord extends AzkabanFlowScheduleRecord {
public OozieFlowScheduleRecord(Integer appId, String flowPath, String frequency, Integer interval, public OozieFlowScheduleRecord(Integer appId, String flowPath, String frequency, Integer interval,
Long effectiveStartTime, Long effectiveEndTime, String refId, Long whExecId) { String cronExpression, Long effectiveStartTime, Long effectiveEndTime, String refId, Long whExecId) {
super(appId, flowPath, frequency, interval, effectiveStartTime, effectiveEndTime, refId, whExecId); super(appId, flowPath, frequency, interval, cronExpression, effectiveStartTime, effectiveEndTime, refId, whExecId);
} }
} }