diff --git a/metadata-etl/src/main/resources/jython/AppworxExtract.py b/metadata-etl/src/main/resources/jython/AppworxExtract.py index 076afb20c1..386a4e990a 100644 --- a/metadata-etl/src/main/resources/jython/AppworxExtract.py +++ b/metadata-etl/src/main/resources/jython/AppworxExtract.py @@ -20,20 +20,13 @@ from wherehows.common.schemas import AppworxFlowExecRecord from wherehows.common.schemas import AppworxJobExecRecord from wherehows.common.schemas import AppworxFlowScheduleRecord from wherehows.common.schemas import AppworxFlowOwnerRecord -from wherehows.common.enums import AzkabanPermission from wherehows.common import Constant from wherehows.common.enums import SchedulerType from com.ziclix.python.sql import zxJDBC -import os -import DbUtil -import sys -import gzip -import StringIO -import json -import datetime -import time -import re from org.slf4j import LoggerFactory +import os, sys, re +import DbUtil + class AppworxExtract: @@ -57,11 +50,11 @@ class AppworxExtract: args[Constant.WH_DB_DRIVER_KEY]) self.wh_cursor = self.wh_con.cursor() self.aw_con = self.get_connection(args[Constant.AW_DB_URL_KEY], - args[Constant.AW_DB_PORT_KEY], - args[Constant.AW_DB_NAME_KEY], - args[Constant.AW_DB_USERNAME_KEY], - args[Constant.AW_DB_PASSWORD_KEY], - args[Constant.AW_DB_DRIVER_KEY]) + args[Constant.AW_DB_PORT_KEY], + args[Constant.AW_DB_NAME_KEY], + args[Constant.AW_DB_USERNAME_KEY], + args[Constant.AW_DB_PASSWORD_KEY], + args[Constant.AW_DB_DRIVER_KEY]) self.aw_cursor = self.aw_con.cursor() self.lookback_period = args[Constant.AW_EXEC_ETL_LOOKBACK_KEY] self.app_folder = args[Constant.WH_APP_FOLDER_KEY] @@ -85,8 +78,8 @@ class AppworxExtract: rows = DbUtil.dict_cursor(self.wh_cursor) if rows: for row in rows: - self.last_execution_unix_time = row['last_time'] - break; + self.last_execution_unix_time = long(row['last_time']) + break except: self.logger.error("Get the last execution time from job_execution failed") self.last_execution_unix_time = None @@ -108,52 +101,42 @@ class AppworxExtract: self.wh_con.close() def collect_flow_jobs(self, flow_file, job_file, dag_file): - self.logger.info("collect flow&jobs") + self.logger.info("collect flow&jobs [last_execution_unix_time=%s lookback_period=%s]" + % (self.last_execution_unix_time, self.lookback_period)) timezone = "ALTER SESSION SET TIME_ZONE = 'US/Pacific'" self.aw_cursor.execute(timezone) schema = "ALTER SESSION SET CURRENT_SCHEMA=APPWORX" self.aw_cursor.execute(schema) if self.last_execution_unix_time: - query = \ - """SELECT J.*, R.RUNS - FROM SO_JOB_TABLE J JOIN ( - SELECT SO_JOB_SEQ, COUNT(*) as RUNS - FROM - ( SELECT SO_JOB_SEQ FROM SO_JOB_HISTORY - WHERE cast((FROM_TZ(CAST(SO_JOB_FINISHED as timestamp), 'US/Pacific') at time zone 'GMT') as date) - >= (TO_DATE('1970-01-01','YYYY-MM-DD') + (%d - 3600) / 86400) - UNION ALL - SELECT SO_JOB_SEQ FROM SO_JOB_QUEUE - WHERE SO_STATUS_NAME IN ('RUNNING', 'FINISHED') - ) - GROUP BY SO_JOB_SEQ - ) R ON J.SO_JOB_SEQ = R.SO_JOB_SEQ - WHERE SO_COMMAND_TYPE = 'CHAIN' - """ % long(self.last_execution_unix_time) + time_filter = "(DATE '1970-01-01' - INTERVAL '8' HOUR) + (%d - 3600) / 86400" % long(self.last_execution_unix_time) else: - query = \ - """SELECT J.*, R.RUNS + time_filter = "SYSDATE - %d" % int(self.lookback_period) + flow_query = \ + """SELECT J.SO_JOB_SEQ, J.SO_APPLICATION, J.SO_MODULE, R.LAST_CHAIN_ID FROM SO_JOB_TABLE J JOIN ( - SELECT SO_JOB_SEQ, COUNT(*) as RUNS - FROM - ( SELECT SO_JOB_SEQ FROM SO_JOB_HISTORY - WHERE SO_JOB_FINISHED >= SYSDATE - %d + SELECT SO_JOB_SEQ, 
MAX(SO_CHAIN_ID) as LAST_CHAIN_ID + FROM + ( SELECT SO_JOB_SEQ, SO_CHAIN_ID FROM SO_JOB_HISTORY + WHERE SO_JOB_FINISHED >= %s + AND SO_CHILD_COUNT > 0 UNION ALL - SELECT SO_JOB_SEQ FROM SO_JOB_QUEUE - WHERE SO_STATUS_NAME IN ('RUNNING', 'FINISHED') + SELECT SO_JOB_SEQ, SO_CHAIN_ID FROM SO_JOB_QUEUE + WHERE SO_STATUS_NAME IN ('INITIATED', 'RUNNING', 'FINISHED') + AND SO_CHILD_COUNT > 0 ) GROUP BY SO_JOB_SEQ ) R ON J.SO_JOB_SEQ = R.SO_JOB_SEQ WHERE SO_COMMAND_TYPE = 'CHAIN' - """ % int(self.lookback_period) + ORDER BY 2,3 + """ % time_filter job_query = \ """SELECT d.SO_TASK_NAME, d.SO_CHAIN_ORDER, d.SO_PREDECESSORS as PREDECESSORS, d.SO_DET_SEQ as JOB_ID, t.* FROM SO_CHAIN_DETAIL d JOIN SO_JOB_TABLE t ON d.SO_JOB_SEQ = t.SO_JOB_SEQ - WHERE d.SO_CHAIN_SEQ = %d + WHERE d.SO_CHAIN_SEQ = %d ORDER BY d.SO_CHAIN_ORDER """ - self.aw_cursor.execute(query) + self.aw_cursor.execute(flow_query) rows = DbUtil.dict_cursor(self.aw_cursor) flow_writer = FileWriter(flow_file) job_writer = FileWriter(job_file) @@ -221,7 +204,8 @@ class AppworxExtract: dag_writer.close() def collect_flow_execs(self, flow_exec_file, job_exec_file, look_back_period): - self.logger.info("collect flow&job executions") + self.logger.info("collect flow&job executions [last_execution_unix_time=%s lookback_period=%s]" + % (self.last_execution_unix_time, self.lookback_period)) flow_exec_writer = FileWriter(flow_exec_file) job_exec_writer = FileWriter(job_exec_file) timezone = "ALTER SESSION SET TIME_ZONE = 'US/Pacific'" @@ -237,11 +221,15 @@ class AppworxExtract: ROUND((cast((FROM_TZ(CAST(H.SO_JOB_FINISHED as timestamp), 'US/Pacific') at time zone 'GMT') as date) - to_date('01-JAN-1970','DD-MON-YYYY'))* (86400)) as JOB_FINISHED, U.SO_USER_NAME FROM SO_JOB_TABLE J - JOIN SO_JOB_HISTORY H ON J.SO_JOB_SEQ = H.SO_JOB_SEQ + JOIN ( + SELECT * FROM SO_JOB_HISTORY WHERE SO_JOB_FINISHED >= DATE '1970-01-01' - interval '8' hour + (%d - 3600) / 86400 + AND SO_CHILD_COUNT > 0 + UNION ALL + SELECT * FROM SO_JOB_QUEUE WHERE SO_STATUS_NAME IN ('INITIATED', 'RUNNING', 'FINISHED') + AND SO_CHILD_COUNT > 0 + ) H ON J.SO_JOB_SEQ = H.SO_JOB_SEQ LEFT JOIN SO_USER_TABLE U ON H.SO_USER_SEQ = U.SO_USER_SEQ - WHERE cast((FROM_TZ(CAST(H.SO_JOB_FINISHED as timestamp), 'US/Pacific') at time zone 'GMT') as date) >= - (TO_DATE('1970-01-01','YYYY-MM-DD') + (%d - 3600) / 86400) and - J.SO_COMMAND_TYPE = 'CHAIN' """ % self.last_execution_unix_time + WHERE J.SO_COMMAND_TYPE = 'CHAIN' """ % long(self.last_execution_unix_time) else: flow_cmd = \ """SELECT J.SO_JOB_SEQ, J.SO_MODULE, J.SO_APPLICATION, H.SO_STATUS_NAME, H.SO_JOBID, H.SO_CHAIN_ID, @@ -250,11 +238,21 @@ class AppworxExtract: ROUND((cast((FROM_TZ(CAST(H.SO_JOB_FINISHED as timestamp), 'US/Pacific') at time zone 'GMT') as date) - to_date('01-JAN-1970','DD-MON-YYYY'))* (86400)) as JOB_FINISHED, U.SO_USER_NAME FROM SO_JOB_TABLE J - JOIN SO_JOB_HISTORY H ON J.SO_JOB_SEQ = H.SO_JOB_SEQ + JOIN ( + SELECT * FROM SO_JOB_HISTORY WHERE SO_JOB_FINISHED >= SYSDATE - %d + AND SO_CHILD_COUNT > 0 + UNION ALL + SELECT * FROM SO_JOB_QUEUE WHERE SO_STATUS_NAME IN ('INITIATED', 'RUNNING', 'FINISHED') + AND SO_CHILD_COUNT > 0 + ) H ON J.SO_JOB_SEQ = H.SO_JOB_SEQ LEFT JOIN SO_USER_TABLE U ON H.SO_USER_SEQ = U.SO_USER_SEQ - WHERE H.SO_JOB_FINISHED >= SYSDATE - %d and - J.SO_COMMAND_TYPE = 'CHAIN' """ % int(self.lookback_period) + WHERE J.SO_COMMAND_TYPE = 'CHAIN' """ % int(self.lookback_period) + ''' SO_CHAIN_ID = :flow_exec_id will find all job executions under the top level flow + + select SO_EXECUTE_ORDER, SO_JOBID, 
SO_PARENTS_JOBID, SO_DIRECT_PARENT_JOBID, SO_CHAIN_ID
+      from so_job_history where SO_JOBID = SO_CHAIN_ID or SO_PARENTS_JOBID <> SO_CHAIN_ID
+    '''
     if self.last_execution_unix_time:
       job_cmd = \
         """SELECT D.SO_TASK_NAME, U.SO_USER_NAME, H.SO_STATUS_NAME, H.SO_JOBID, D.SO_DET_SEQ as JOB_ID,
@@ -265,9 +263,8 @@ class AppworxExtract:
                FROM SO_JOB_HISTORY H
                JOIN SO_CHAIN_DETAIL D ON D.SO_CHAIN_SEQ = H.SO_CHAIN_SEQ AND D.SO_DET_SEQ = H.SO_DET_SEQ
                LEFT JOIN SO_USER_TABLE U ON H.SO_USER_SEQ = U.SO_USER_SEQ
-               WHERE cast((FROM_TZ(CAST(H.SO_JOB_FINISHED as timestamp), 'US/Pacific') at time zone 'GMT') as date) >=
-                     (TO_DATE('1970-01-01','YYYY-MM-DD') + (%d - 3600) / 86400) and
-                     H.SO_PARENTS_JOBID = %d and H.SO_CHAIN_ID = %d"""
+               WHERE --H.SO_JOB_FINISHED >= DATE '1970-01-01' - interval '8' hour + (%d - 3600) / 86400 and
+                     H.SO_CHAIN_ID = %d"""
     else:
       job_cmd = \
         """SELECT D.SO_TASK_NAME, U.SO_USER_NAME, H.SO_STATUS_NAME, H.SO_JOBID, D.SO_DET_SEQ as JOB_ID,
@@ -279,9 +276,12 @@ class AppworxExtract:
               JOIN SO_CHAIN_DETAIL D ON D.SO_CHAIN_SEQ = H.SO_CHAIN_SEQ AND D.SO_DET_SEQ = H.SO_DET_SEQ
               LEFT JOIN SO_USER_TABLE U ON H.SO_USER_SEQ = U.SO_USER_SEQ
               WHERE H.SO_JOB_FINISHED >= SYSDATE - %d and
-                    H.SO_PARENTS_JOBID = %d and H.SO_CHAIN_ID = %d"""
+                    H.SO_CHAIN_ID = %d"""
 
-    self.aw_cursor.execute(flow_cmd)
+    try:
+      self.aw_cursor.execute(flow_cmd)
+    except Exception as e:
+      self.logger.error(str(e) + "\n" + flow_cmd)
 
     rows = DbUtil.dict_cursor(self.aw_cursor)
     row_count = 0
@@ -295,6 +295,7 @@ class AppworxExtract:
         flow_exec_id = int(so_flow_id)
       except Exception as e:
         self.logger.error(e)
+      self.logger.debug("processing flow_exec_id: %8d" % flow_exec_id)
 
       flow_exec_record = AppworxFlowExecRecord(self.app_id,
                                                long(row['SO_JOB_SEQ']),
@@ -304,17 +305,17 @@ class AppworxExtract:
                                                flow_exec_id,
                                                row['SO_STATUS_NAME'],
                                                flow_attempt,
-                                               row['SO_USER_NAME'],
+                                               row['SO_USER_NAME'] if row['SO_USER_NAME'] else '',
                                                long(row['JOB_STARTED']),
-                                               long(row['JOB_FINISHED']),
+                                               long(row['JOB_FINISHED'] if row['JOB_FINISHED'] else 0),
                                                self.wh_exec_id)
       flow_exec_writer.append(flow_exec_record)
 
       new_appworx_cursor = self.aw_con.cursor()
       if self.last_execution_unix_time:
-        new_appworx_cursor.execute(job_cmd % (self.last_execution_unix_time, flow_exec_id, long(row['SO_CHAIN_ID'])))
+        new_appworx_cursor.execute(job_cmd % (long(self.last_execution_unix_time), flow_exec_id))
       else:
-        new_appworx_cursor.execute(job_cmd % (int(self.lookback_period), flow_exec_id, long(row['SO_CHAIN_ID'])))
+        new_appworx_cursor.execute(job_cmd % (int(self.lookback_period), flow_exec_id))
 
       job_rows = DbUtil.dict_cursor(new_appworx_cursor)
       for job in job_rows:
diff --git a/metadata-etl/src/main/resources/jython/AppworxLineageExtract.py b/metadata-etl/src/main/resources/jython/AppworxLineageExtract.py
index 546d2d4321..97152be7a3 100644
--- a/metadata-etl/src/main/resources/jython/AppworxLineageExtract.py
+++ b/metadata-etl/src/main/resources/jython/AppworxLineageExtract.py
@@ -14,23 +14,16 @@
 from wherehows.common import Constant
 from com.ziclix.python.sql import zxJDBC
-import ConfigParser, os
-import DbUtil
-from pyparsing import *
-import sys
-import hashlib
-import gzip
-import StringIO
-import json
-import datetime
-import time
-import re
 from org.slf4j import LoggerFactory
 from AppworxLogParser import AppworxLogParser
 from PigLogParser import PigLogParser
 from BteqLogParser import BteqLogParser
+import sys, os, re, time
+import DbUtil
+import hashlib
 import ParseUtil
+
 
 class AppworxLineageExtract:
 
   def __init__(self, args):
@@ -46,6 +39,7 @@ class 
AppworxLineageExtract: self.local_script_path = args[Constant.AW_LOCAL_SCRIPT_PATH] self.remote_script_path = args[Constant.AW_REMOTE_SCRIPT_PATH] self.aw_archive_dir = args[Constant.AW_ARCHIVE_DIR] + # self.aw_log_url = args[Constant.AW_LOG_URL] self.bteq_source_target_override = args[Constant.AW_BTEQ_SOURCE_TARGET_OVERRIDE] self.metric_override = args[Constant.AW_METRIC_OVERRIDE] self.skip_already_parsed = args[Constant.AW_SKIP_ALREADY_PARSED] @@ -106,17 +100,19 @@ class AppworxLineageExtract: def get_log_file_name(self, module_name, days_offset=1): if self.last_execution_unix_time: - query = \ + query = \ """select je.*, fj.job_type, fl.flow_path, - CONCAT('%s',CONCAT(CONCAT(DATE_FORMAT(FROM_UNIXTIME(je.start_time), '%%Y%%m%%d'),'/o'), - CONCAT(je.job_exec_id, '.', LPAD(je.attempt_id, 2, 0), '.gz'))) as gzipped_file_name + CONCAT('o', je.job_exec_id, '.', LPAD(je.attempt_id, 2, 0)) as log_file_name, + CONCAT('%s', DATE_FORMAT(FROM_UNIXTIME(je.end_time), '%%Y%%m%%d'), '/', + CONCAT('o', je.job_exec_id, '.', LPAD(je.attempt_id, 2, 0)), '.gz') as gzipped_file_name from job_execution je JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and fj.is_current = 'Y' and je.job_id = fj.job_id JOIN flow fl on fj.app_id = fl.app_id and fj.flow_id = fl.flow_id WHERE je.app_id = %d - and je.end_time >= UNIX_TIMESTAMP(DATE_SUB(from_unixtime(%d), INTERVAL 1 HOUR)) - and UPPER(fj.job_type) = '%s' + and je.end_time >= %d - 3660 + and fj.job_type = '%s' + ORDER BY je.flow_exec_id DESC, je.job_exec_id """ self.logger.info(query % (self.aw_archive_dir, self.app_id, long(self.last_execution_unix_time), module_name)) self.aw_cursor.execute(query % @@ -124,19 +120,21 @@ class AppworxLineageExtract: else: query = \ """select je.*, fj.job_type, fl.flow_path, - CONCAT('%s',CONCAT(CONCAT(DATE_FORMAT(FROM_UNIXTIME(je.start_time), '%%Y%%m%%d'),'/o'), - CONCAT(je.job_exec_id, '.', LPAD(je.attempt_id, 2, 0), '.gz'))) as gzipped_file_name + CONCAT('o', je.job_exec_id, '.', LPAD(je.attempt_id, 2, 0)) as log_file_name, + CONCAT('%s', DATE_FORMAT(FROM_UNIXTIME(je.end_time), '%%Y%%m%%d'), '/', + CONCAT('o', je.job_exec_id, '.', LPAD(je.attempt_id, 2, 0)), '.gz') as gzipped_file_name from job_execution je JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and fj.is_current = 'Y' and je.job_id = fj.job_id JOIN flow fl on fj.app_id = fl.app_id and fj.flow_id = fl.flow_id - WHERE je.app_id = %d and - from_unixtime(je.end_time) >= CURRENT_DATE - INTERVAL %d HOUR and UPPER(fj.job_type) = '%s' + WHERE je.app_id = %d and je.job_exec_status in ('FINISHED', 'SUCCEEDED', 'OK') + and je.end_time >= unix_timestamp(CURRENT_DATE - INTERVAL %d DAY) and fj.job_type = '%s' + ORDER BY je.flow_exec_id DESC, je.job_exec_id """ self.logger.info(query % (self.aw_archive_dir, self.app_id, int(self.look_back_days), module_name)) self.aw_cursor.execute(query % (self.aw_archive_dir, self.app_id, int(self.look_back_days), module_name)) job_rows = DbUtil.copy_dict_cursor(self.aw_cursor) - self.logger.info(str(len(job_rows))) + self.logger.info("%d job executions will be scanned for lineage" % len(job_rows)) return job_rows @@ -426,7 +424,7 @@ class AppworxLineageExtract: self.logger.error(str(sys.exc_info()[0])) def process_li_shell_gw(self): - self.logger.info("process process_li_shell_gw") + self.logger.info("process li_shell_gw") parameter_query = \ """ SELECT param_value @@ -643,7 +641,7 @@ class AppworxLineageExtract: def process_li_hadoop(self): - self.logger.info("process li_pig") + self.logger.info("process 
li_hadoop") rows = self.get_log_file_name(module_name='LI_HADOOP') for row in rows: try: @@ -693,8 +691,7 @@ class AppworxLineageExtract: source_database_id = int(self.db_lookup(tab['source_database'])) full_table_name = \ - tab['source_schema'] + \ - ('.' if tab['source_schema'] is not None else '') + \ + (tab['source_schema'] + '.' if tab['source_schema'] is not None else '') + \ tab['source_table_name'] self.aw_cursor.execute(update_staging_lineage_query % (int(row['app_id']), int(row['flow_exec_id']), int(row['job_exec_id']), @@ -704,7 +701,7 @@ class AppworxLineageExtract: 'source', int(srl), 0, 0, 'JDBC Read', 0, int(row['wh_etl_exec_id']) )) source_srl_no = srl - srl = srl + 1 + srl += 1 self.aw_cursor.execute(update_staging_lineage_query % (int(row['app_id']), int(row['flow_exec_id']), int(row['job_exec_id']), row['flow_path'], row['job_name'], int(row['start_time']), int(row['end_time']), @@ -714,10 +711,10 @@ class AppworxLineageExtract: tab['end_partition'] if tab['frequency'] != 'snapshot' else None, tab['frequency'], tab['frequency'], 'HDFS', - 'target', int(srl), int(source_srl_no), int(source_srl_no), 'JDBC Write', + 'target', int(srl), int(source_srl_no), int(source_srl_no), 'HDFS Write', int(tab['record_count']), int(row['wh_etl_exec_id']) )) - srl = srl + 1 + srl += 1 elif results['script_type'] == 'CMD': db_id = self.db_lookup(results['cluster']) diff --git a/metadata-etl/src/main/resources/jython/AppworxLogParser.py b/metadata-etl/src/main/resources/jython/AppworxLogParser.py index 08cf8773e6..e0e93522a5 100644 --- a/metadata-etl/src/main/resources/jython/AppworxLogParser.py +++ b/metadata-etl/src/main/resources/jython/AppworxLogParser.py @@ -12,31 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # -from wherehows.common.writers import FileWriter -from wherehows.common.schemas import AppworxFlowRecord -from wherehows.common.schemas import AppworxJobRecord -from wherehows.common.schemas import AppworxFlowDagRecord -from wherehows.common.schemas import AppworxFlowExecRecord -from wherehows.common.schemas import AppworxJobExecRecord -from wherehows.common.schemas import AppworxFlowScheduleRecord -from wherehows.common.schemas import AppworxFlowOwnerRecord -from wherehows.common.enums import AzkabanPermission -from wherehows.common import Constant -from wherehows.common.enums import SchedulerType -from com.ziclix.python.sql import zxJDBC from pyparsing import * -import ParseUtil -import os -import ast -import DbUtil -import sys -import gzip -import StringIO -import json -import datetime -import time -import re from org.slf4j import LoggerFactory +import os, sys, gzip, re, ast +import ParseUtil + ParserElement.enablePackrat() diff --git a/metadata-etl/src/main/resources/jython/BteqLogParser.py b/metadata-etl/src/main/resources/jython/BteqLogParser.py index 2cc770e8d5..dcb6589013 100644 --- a/metadata-etl/src/main/resources/jython/BteqLogParser.py +++ b/metadata-etl/src/main/resources/jython/BteqLogParser.py @@ -12,31 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# -from wherehows.common.writers import FileWriter -from wherehows.common.schemas import AppworxFlowRecord -from wherehows.common.schemas import AppworxJobRecord -from wherehows.common.schemas import AppworxFlowDagRecord -from wherehows.common.schemas import AppworxFlowExecRecord -from wherehows.common.schemas import AppworxJobExecRecord -from wherehows.common.schemas import AppworxFlowScheduleRecord -from wherehows.common.schemas import AppworxFlowOwnerRecord -from wherehows.common.enums import AzkabanPermission -from wherehows.common import Constant -from wherehows.common.enums import SchedulerType -from com.ziclix.python.sql import zxJDBC from metadata.etl.sqlParser import SqlParser from pyparsing import * -import ParseUtil -import ConfigParser, os -import DbUtil -import sys -import gzip -import StringIO -import json -import datetime -import time -import re from org.slf4j import LoggerFactory +import sys, os, json +import ParseUtil +import ConfigParser + class BteqLogParser: diff --git a/metadata-etl/src/main/resources/jython/ParseUtil.py b/metadata-etl/src/main/resources/jython/ParseUtil.py index 11b91ebe88..f3e511c1d1 100644 --- a/metadata-etl/src/main/resources/jython/ParseUtil.py +++ b/metadata-etl/src/main/resources/jython/ParseUtil.py @@ -12,29 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # -from wherehows.common.writers import FileWriter -from wherehows.common.schemas import AppworxFlowRecord -from wherehows.common.schemas import AppworxJobRecord -from wherehows.common.schemas import AppworxFlowDagRecord -from wherehows.common.schemas import AppworxFlowExecRecord -from wherehows.common.schemas import AppworxJobExecRecord -from wherehows.common.schemas import AppworxFlowScheduleRecord -from wherehows.common.schemas import AppworxFlowOwnerRecord -from wherehows.common.enums import AzkabanPermission -from wherehows.common import Constant -from wherehows.common.enums import SchedulerType -from com.ziclix.python.sql import zxJDBC from pyparsing import * -import os -import DbUtil -import sys -import gzip -import StringIO -import json -import datetime -import time -import re -from org.slf4j import LoggerFactory +import time, re + def get_db_table_abstraction(table_name): parsed_table_name = table_name.split('.') diff --git a/metadata-etl/src/main/resources/jython/PigLogParser.py b/metadata-etl/src/main/resources/jython/PigLogParser.py index cf19391d65..277af472ab 100644 --- a/metadata-etl/src/main/resources/jython/PigLogParser.py +++ b/metadata-etl/src/main/resources/jython/PigLogParser.py @@ -12,30 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# -from wherehows.common.writers import FileWriter -from wherehows.common.schemas import AppworxFlowRecord -from wherehows.common.schemas import AppworxJobRecord -from wherehows.common.schemas import AppworxFlowDagRecord -from wherehows.common.schemas import AppworxFlowExecRecord -from wherehows.common.schemas import AppworxJobExecRecord -from wherehows.common.schemas import AppworxFlowScheduleRecord -from wherehows.common.schemas import AppworxFlowOwnerRecord -from wherehows.common.enums import AzkabanPermission -from wherehows.common import Constant -from wherehows.common.enums import SchedulerType -from com.ziclix.python.sql import zxJDBC -from pyparsing import * -import ParseUtil -import os -import DbUtil -import sys -import gzip -import StringIO -import json -import datetime -import time -import re from org.slf4j import LoggerFactory +import os, sys, gzip, re +import ParseUtil + class PigLogParser: diff --git a/metadata-etl/src/main/resources/jython/ScriptCollect.py b/metadata-etl/src/main/resources/jython/ScriptCollect.py index 5ec741ed86..b47eddbf29 100644 --- a/metadata-etl/src/main/resources/jython/ScriptCollect.py +++ b/metadata-etl/src/main/resources/jython/ScriptCollect.py @@ -14,9 +14,9 @@ from wherehows.common import Constant from com.ziclix.python.sql import zxJDBC +from org.slf4j import LoggerFactory import DbUtil import sys -from org.slf4j import LoggerFactory class ScriptCollect():
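-- 
Note on the shared time-filter arithmetic (a reviewer sketch, not part of the patch):
each rewritten Appworx query bounds SO_JOB_FINISHED, an Oracle DATE stored in
US/Pacific wall time, by a Unix-epoch watermark via

    (DATE '1970-01-01' - INTERVAL '8' HOUR) + (%d - 3600) / 86400

i.e. the GMT epoch shifted by a fixed -8h offset, with one hour of slack. Since PDT
is only -7h, the fixed -8h bound is always at or before the true Pacific wall time,
so the filter can over-include rows but never miss them. A minimal Jython/Python 2
equivalent for sanity-checking the bound; the helper names are illustrative only:

    from datetime import datetime, timedelta

    def oracle_time_filter(last_execution_unix_time=None, lookback_days=None):
        # Mirrors the time_filter fragment built in collect_flow_jobs():
        # incremental runs bound by the epoch watermark, full runs by SYSDATE.
        if last_execution_unix_time is not None:
            return ("(DATE '1970-01-01' - INTERVAL '8' HOUR) + (%d - 3600) / 86400"
                    % long(last_execution_unix_time))
        return "SYSDATE - %d" % int(lookback_days)

    def watermark_as_pacific_date(last_execution_unix_time):
        # What the Oracle expression evaluates to: the epoch shifted to UTC-8,
        # plus (t - 3600) seconds expressed as a date.
        pacific_epoch = datetime(1970, 1, 1) - timedelta(hours=8)
        return pacific_epoch + timedelta(seconds=long(last_execution_unix_time) - 3600)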