diff --git a/data-model/DDL/ETL_DDL/executor_metadata.sql b/data-model/DDL/ETL_DDL/executor_metadata.sql
index 3b7f4f2fc2..c6da6daa7c 100644
--- a/data-model/DDL/ETL_DDL/executor_metadata.sql
+++ b/data-model/DDL/ETL_DDL/executor_metadata.sql
@@ -95,8 +95,8 @@ CREATE TABLE flow_job (
   job_type_id SMALLINT COMMENT 'type id of the job',
   job_type VARCHAR(63) COMMENT 'type of the job',
   ref_flow_id INT UNSIGNED NULL COMMENT 'the reference flow id of the job if the job is a subflow',
-  pre_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run before this job',
-  post_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run after this job',
+  pre_jobs VARCHAR(20000) CHAR SET latin1 COMMENT 'comma separated job ids that run before this job',
+  post_jobs VARCHAR(20000) CHAR SET latin1 COMMENT 'comma separated job ids that run after this job',
   is_current CHAR(1) COMMENT 'determine if it is a current job',
   is_first CHAR(1) COMMENT 'determine if it is the first job',
   is_last CHAR(1) COMMENT 'determine if it is the last job',
@@ -126,8 +126,8 @@ CREATE TABLE stg_flow_job (
   job_type VARCHAR(63) COMMENT 'type of the job',
   ref_flow_id INT UNSIGNED NULL COMMENT 'the reference flow id of the job if the job is a subflow',
   ref_flow_path VARCHAR(1024) COMMENT 'the reference flow path of the job if the job is a subflow',
-  pre_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run before this job',
-  post_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run after this job',
+  pre_jobs VARCHAR(20000) CHAR SET latin1 COMMENT 'comma separated job ids that run before this job',
+  post_jobs VARCHAR(20000) CHAR SET latin1 COMMENT 'comma separated job ids that run after this job',
   is_current CHAR(1) COMMENT 'determine if it is a current job',
   is_first CHAR(1) COMMENT 'determine if it is the first job',
   is_last CHAR(1) COMMENT 'determine if it is the last job',
@@ -366,6 +366,7 @@ CREATE TABLE flow_schedule (
     COMMENT 'flow id',
   unit VARCHAR(31) COMMENT 'unit of time',
   frequency INT COMMENT 'frequency of the unit',
+  cron_expression VARCHAR(127) COMMENT 'cron expression',
   is_active CHAR(1) COMMENT 'determine if it is an active schedule',
   included_instances VARCHAR(127) COMMENT 'included instance',
   excluded_instances VARCHAR(127) COMMENT 'excluded instance',
@@ -389,6 +390,7 @@ CREATE TABLE stg_flow_schedule (
   flow_path VARCHAR(1024) COMMENT 'flow path from top level',
   unit VARCHAR(31) COMMENT 'unit of time',
   frequency INT COMMENT 'frequency of the unit',
+  cron_expression VARCHAR(127) COMMENT 'cron expression',
   included_instances VARCHAR(127) COMMENT 'included instance',
   excluded_instances VARCHAR(127) COMMENT 'excluded instance',
   effective_start_time INT UNSIGNED COMMENT 'effective start time of the flow execution',
@@ -438,7 +440,7 @@ CREATE TABLE stg_flow_owner_permission (
   DEFAULT CHARSET = utf8
   COMMENT = 'Scheduler owner table'
   PARTITION BY HASH (app_id) PARTITIONS 8;

-CREATE TABLE job_execution_ext_reference (
+CREATE TABLE job_execution_ext_reference (
   app_id smallint(5) UNSIGNED COMMENT 'application id of the flow' NOT NULL,
   job_exec_id bigint(20) UNSIGNED COMMENT 'job execution id either inherit or generated' NOT NULL,
   attempt_id smallint(6) COMMENT 'job execution attempt id' DEFAULT '0',
@@ -463,11 +465,11 @@ PARTITION BY HASH(app_id)
     PARTITION p7) ;

-CREATE INDEX idx_job_execution_ext_ref__ext_ref_id USING BTREE
+CREATE INDEX idx_job_execution_ext_ref__ext_ref_id USING BTREE
   ON job_execution_ext_reference(ext_ref_id);

-CREATE TABLE stg_job_execution_ext_reference (
+CREATE TABLE stg_job_execution_ext_reference (
   app_id smallint(5) UNSIGNED COMMENT 'application id of the flow' NOT NULL,
   job_exec_id bigint(20) UNSIGNED COMMENT 'job execution id either inherit or generated' NOT NULL,
   attempt_id smallint(6) COMMENT 'job execution attempt id' DEFAULT '0',
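The DDL change above only touches the CREATE TABLE statements, so a warehouse that was provisioned from the old DDL would need equivalent ALTERs. The sketch below is an assumption, not part of this patch: one way to apply the new cron_expression column and the widened pre_jobs/post_jobs columns from a Python/Jython script, given a hypothetical open DB-API connection wh_con to the WhereHows MySQL database. The stg_flow_job columns would need the same MODIFY statements.

# Hypothetical migration helper, not part of the patch. Assumes wh_con is an
# open MySQL DB-API connection to the WhereHows database.
MIGRATION_STATEMENTS = [
    "ALTER TABLE flow_job MODIFY pre_jobs VARCHAR(20000) CHARACTER SET latin1 "
    "COMMENT 'comma separated job ids that run before this job'",
    "ALTER TABLE flow_job MODIFY post_jobs VARCHAR(20000) CHARACTER SET latin1 "
    "COMMENT 'comma separated job ids that run after this job'",
    "ALTER TABLE flow_schedule ADD COLUMN cron_expression VARCHAR(127) "
    "COMMENT 'cron expression' AFTER frequency",
    "ALTER TABLE stg_flow_schedule ADD COLUMN cron_expression VARCHAR(127) "
    "COMMENT 'cron expression' AFTER frequency",
]

def migrate(wh_con):
    cursor = wh_con.cursor()
    for stmt in MIGRATION_STATEMENTS:
        cursor.execute(stmt)  # apply each ALTER in turn
    wh_con.commit()
    cursor.close()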
diff --git a/metadata-etl/src/main/resources/jython/AzkabanExtract.py b/metadata-etl/src/main/resources/jython/AzkabanExtract.py
index d3522d2878..88711d96cc 100644
--- a/metadata-etl/src/main/resources/jython/AzkabanExtract.py
+++ b/metadata-etl/src/main/resources/jython/AzkabanExtract.py
@@ -224,13 +224,18 @@ class AzkabanExtract:
         # print json.dumps(row[json_column], indent=4)
         if row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["isRecurring"] == 'true':
-          unit = row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["period"][-1:]
-          unit = self._period_unit_table[unit]
-          frequency = int(row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["period"][:-1])
+          unit, frequency, cron_expr = None, None, None
+          period = row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["period"]
+          if period is not None and period != "null" and period[-1:] in self._period_unit_table:
+            unit = self._period_unit_table[period[-1:]]
+            frequency = int(row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["period"][:-1])
+          if "cronExpression" in row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]:
+            cron_expr = row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["cronExpression"]
           schedule_record = AzkabanFlowScheduleRecord(self.app_id,
                                                       row[json_column]["actions"][0]["actionJson"]["projectName"] + ':' + row[json_column]["actions"][0]["actionJson"]["flowName"],
                                                       unit,
                                                       frequency,
+                                                      cron_expr,
                                                       long(row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["firstCheckTime"]) / 1000,
                                                       int(time.mktime(datetime.date(2099,12,31).timetuple())),
                                                       '0',
diff --git a/metadata-etl/src/main/resources/jython/OozieExtract.py b/metadata-etl/src/main/resources/jython/OozieExtract.py
index c10a8f1407..e183a14f1f 100644
--- a/metadata-etl/src/main/resources/jython/OozieExtract.py
+++ b/metadata-etl/src/main/resources/jython/OozieExtract.py
@@ -163,6 +163,7 @@ class OozieExtract:
       row['app_path'],
       row['time_unit'],
       int(row['frequency']),
+      None,
       row['start_time'],
       row['end_time'],
       row['ref_id'],
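The AzkabanExtract change above handles triggers whose checkerJson carries a Quartz cronExpression instead of, or alongside, the old period string. Below is a standalone sketch of that branching, outside the extractor, with an illustrative period-to-unit mapping; AzkabanExtract keeps its own mapping in self._period_unit_table, and the exact contents shown here are an assumption.

# Standalone sketch of the schedule parsing introduced above (illustrative only).
PERIOD_UNIT_TABLE = {'s': 'SECOND', 'm': 'MINUTE', 'h': 'HOUR',
                     'd': 'DAY', 'M': 'MONTH'}  # assumed mapping

def parse_schedule(checker_json):
    """Return (unit, frequency, cron_expr) from an Azkaban trigger checkerJson.

    A simple period such as '1d' yields unit/frequency; a schedule defined
    only by a Quartz cron expression may have no usable period, in which
    case unit and frequency stay None and only cron_expr is set.
    """
    unit, frequency, cron_expr = None, None, None
    period = checker_json.get("period")
    if period is not None and period != "null" and period[-1:] in PERIOD_UNIT_TABLE:
        unit = PERIOD_UNIT_TABLE[period[-1:]]
        frequency = int(period[:-1])
    if "cronExpression" in checker_json:
        cron_expr = checker_json["cronExpression"]
    return unit, frequency, cron_expr

# Example: a cron-only schedule yields (None, None, '0 0 2 ? * *')
print(parse_schedule({"isRecurring": "true", "cronExpression": "0 0 2 ? * *"}))

Leaving unit and frequency as None for cron-only triggers mirrors the Oozie side, where OozieExtract passes None for the new cron expression argument because Oozie coordinators are described by time_unit and frequency alone.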
diff --git a/metadata-etl/src/main/resources/jython/SchedulerLoad.py b/metadata-etl/src/main/resources/jython/SchedulerLoad.py
index 0548d55aef..3ec63c9554 100644
--- a/metadata-etl/src/main/resources/jython/SchedulerLoad.py
+++ b/metadata-etl/src/main/resources/jython/SchedulerLoad.py
@@ -136,15 +136,16 @@ class SchedulerLoad:
     self.wh_con.commit()
     cmd = """
-          INSERT INTO flow_schedule (app_id, flow_id, unit, frequency, included_instances, excluded_instances, effective_start_time, effective_end_time, is_active, ref_id,
+          INSERT INTO flow_schedule (app_id, flow_id, unit, frequency, cron_expression, included_instances, excluded_instances, effective_start_time, effective_end_time, is_active, ref_id,
            created_time, modified_time, wh_etl_exec_id)
-          SELECT app_id, flow_id, unit, frequency, included_instances, excluded_instances, effective_start_time, effective_end_time, 'Y', ref_id,
+          SELECT app_id, flow_id, unit, frequency, cron_expression, included_instances, excluded_instances, effective_start_time, effective_end_time, 'Y', ref_id,
            unix_timestamp(NOW()) created_time, NULL modified_time, wh_etl_exec_id
           FROM stg_flow_schedule s
           WHERE s.app_id = {app_id} AND s.flow_id IS NOT NULL
           ON DUPLICATE KEY UPDATE
           unit = s.unit,
           frequency = s.frequency,
+          cron_expression = s.cron_expression,
           is_active = 'Y',
           ref_id = s.ref_id,
           included_instances = s.included_instances,
diff --git a/metadata-etl/src/main/resources/jython/SchedulerTransform.py b/metadata-etl/src/main/resources/jython/SchedulerTransform.py
index 17c2cf8460..1f91868e28 100644
--- a/metadata-etl/src/main/resources/jython/SchedulerTransform.py
+++ b/metadata-etl/src/main/resources/jython/SchedulerTransform.py
@@ -33,7 +33,7 @@ class SchedulerTransform:
     "owners": {"columns": "app_id, flow_path, owner_id, wh_etl_exec_id",
                "file": "owner.csv",
                "table": "stg_flow_owner_permission"},
-    "schedules": {"columns": "app_id, flow_path, unit, frequency, effective_start_time, effective_end_time, ref_id, wh_etl_exec_id",
+    "schedules": {"columns": "app_id, flow_path, unit, frequency, cron_expression, effective_start_time, effective_end_time, ref_id, wh_etl_exec_id",
                   "file": "schedule.csv",
                   "table": "stg_flow_schedule"},
     "flow_execs": {"columns": "app_id, flow_name, flow_path, flow_exec_uuid, source_version, flow_exec_status, attempt_id, executed_by, start_time, end_time, wh_etl_exec_id",
diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/AzkabanFlowScheduleRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/AzkabanFlowScheduleRecord.java
index ea56222e85..35b695edcb 100644
--- a/wherehows-common/src/main/java/wherehows/common/schemas/AzkabanFlowScheduleRecord.java
+++ b/wherehows-common/src/main/java/wherehows/common/schemas/AzkabanFlowScheduleRecord.java
@@ -25,17 +25,19 @@ public class AzkabanFlowScheduleRecord extends AbstractRecord {
   String flowPath;
   String unit;
   Integer frequency;
+  String cronExpression;
   Long effectiveStartTime;
   Long effectiveEndTime;
   String refId;
   Long whExecId;

   public AzkabanFlowScheduleRecord(Integer appId, String flowPath, String unit, Integer frequency,
-      Long effectiveStartTime, Long effectiveEndTime, String refId, Long whExecId) {
+      String cronExpression, Long effectiveStartTime, Long effectiveEndTime, String refId, Long whExecId) {
     this.appId = appId;
     this.flowPath = flowPath;
     this.unit = unit;
     this.frequency = frequency;
+    this.cronExpression = cronExpression;
     this.effectiveStartTime = effectiveStartTime;
     this.effectiveEndTime = effectiveEndTime;
     this.refId = refId;
@@ -49,6 +51,7 @@ public class AzkabanFlowScheduleRecord extends AbstractRecord {
     allFields.add(flowPath);
     allFields.add(unit);
     allFields.add(frequency);
+    allFields.add(cronExpression);
     allFields.add(effectiveStartTime);
     allFields.add(effectiveEndTime);
     allFields.add(refId);
diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/OozieFlowScheduleRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/OozieFlowScheduleRecord.java
index 1a4cd0e16c..ba68faf5d4 100644
--- a/wherehows-common/src/main/java/wherehows/common/schemas/OozieFlowScheduleRecord.java
+++ b/wherehows-common/src/main/java/wherehows/common/schemas/OozieFlowScheduleRecord.java
@@ -18,7 +18,7 @@ package wherehows.common.schemas;
  */
 public class OozieFlowScheduleRecord extends AzkabanFlowScheduleRecord {
   public OozieFlowScheduleRecord(Integer appId, String flowPath, String frequency, Integer interval,
-      Long effectiveStartTime, Long effectiveEndTime, String refId, Long whExecId) {
-    super(appId, flowPath, frequency, interval, effectiveStartTime, effectiveEndTime, refId, whExecId);
+      String cronExpression, Long effectiveStartTime, Long effectiveEndTime, String refId, Long whExecId) {
+    super(appId, flowPath, frequency, interval, cronExpression, effectiveStartTime, effectiveEndTime, refId, whExecId);
   }
 }
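With both representations stored, anything that reads flow_schedule presumably has to cope with rows where only cron_expression is set (cron-based Azkaban triggers), rows where only unit and frequency are set (Oozie coordinators and period-based Azkaban triggers), and rows where neither is usable. A hypothetical consumer-side helper, not part of this patch, illustrating that fallback; only the column names come from the DDL above.

# Hypothetical helper for code that reads flow_schedule rows.
def describe_schedule(row):
    # row: dict-like with 'cron_expression', 'frequency', 'unit' keys
    if row.get("cron_expression"):
        return "cron: %s" % row["cron_expression"]
    if row.get("frequency") is not None and row.get("unit"):
        return "every %s %s" % (row["frequency"], row["unit"])
    return "no recognizable schedule"

print(describe_schedule({"cron_expression": "0 0 2 ? * *"}))  # -> cron: 0 0 2 ? * *
print(describe_schedule({"frequency": 1, "unit": "DAY"}))     # -> every 1 DAY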