Update Appworx Execution and Lineage jobs (#321)

This commit is contained in:
Yi (Alan) Wang 2017-02-14 09:22:18 -08:00 committed by Mars Lan
parent 87719c03af
commit 5219d26b8e
7 changed files with 101 additions and 181 deletions

View File

@ -20,20 +20,13 @@ from wherehows.common.schemas import AppworxFlowExecRecord
from wherehows.common.schemas import AppworxJobExecRecord from wherehows.common.schemas import AppworxJobExecRecord
from wherehows.common.schemas import AppworxFlowScheduleRecord from wherehows.common.schemas import AppworxFlowScheduleRecord
from wherehows.common.schemas import AppworxFlowOwnerRecord from wherehows.common.schemas import AppworxFlowOwnerRecord
from wherehows.common.enums import AzkabanPermission
from wherehows.common import Constant from wherehows.common import Constant
from wherehows.common.enums import SchedulerType from wherehows.common.enums import SchedulerType
from com.ziclix.python.sql import zxJDBC from com.ziclix.python.sql import zxJDBC
import os
import DbUtil
import sys
import gzip
import StringIO
import json
import datetime
import time
import re
from org.slf4j import LoggerFactory from org.slf4j import LoggerFactory
import os, sys, re
import DbUtil
class AppworxExtract: class AppworxExtract:
@ -57,11 +50,11 @@ class AppworxExtract:
args[Constant.WH_DB_DRIVER_KEY]) args[Constant.WH_DB_DRIVER_KEY])
self.wh_cursor = self.wh_con.cursor() self.wh_cursor = self.wh_con.cursor()
self.aw_con = self.get_connection(args[Constant.AW_DB_URL_KEY], self.aw_con = self.get_connection(args[Constant.AW_DB_URL_KEY],
args[Constant.AW_DB_PORT_KEY], args[Constant.AW_DB_PORT_KEY],
args[Constant.AW_DB_NAME_KEY], args[Constant.AW_DB_NAME_KEY],
args[Constant.AW_DB_USERNAME_KEY], args[Constant.AW_DB_USERNAME_KEY],
args[Constant.AW_DB_PASSWORD_KEY], args[Constant.AW_DB_PASSWORD_KEY],
args[Constant.AW_DB_DRIVER_KEY]) args[Constant.AW_DB_DRIVER_KEY])
self.aw_cursor = self.aw_con.cursor() self.aw_cursor = self.aw_con.cursor()
self.lookback_period = args[Constant.AW_EXEC_ETL_LOOKBACK_KEY] self.lookback_period = args[Constant.AW_EXEC_ETL_LOOKBACK_KEY]
self.app_folder = args[Constant.WH_APP_FOLDER_KEY] self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
@ -85,8 +78,8 @@ class AppworxExtract:
rows = DbUtil.dict_cursor(self.wh_cursor) rows = DbUtil.dict_cursor(self.wh_cursor)
if rows: if rows:
for row in rows: for row in rows:
self.last_execution_unix_time = row['last_time'] self.last_execution_unix_time = long(row['last_time'])
break; break
except: except:
self.logger.error("Get the last execution time from job_execution failed") self.logger.error("Get the last execution time from job_execution failed")
self.last_execution_unix_time = None self.last_execution_unix_time = None
@ -108,44 +101,34 @@ class AppworxExtract:
self.wh_con.close() self.wh_con.close()
def collect_flow_jobs(self, flow_file, job_file, dag_file): def collect_flow_jobs(self, flow_file, job_file, dag_file):
self.logger.info("collect flow&jobs") self.logger.info("collect flow&jobs [last_execution_unix_time=%s lookback_period=%s]"
% (self.last_execution_unix_time, self.lookback_period))
timezone = "ALTER SESSION SET TIME_ZONE = 'US/Pacific'" timezone = "ALTER SESSION SET TIME_ZONE = 'US/Pacific'"
self.aw_cursor.execute(timezone) self.aw_cursor.execute(timezone)
schema = "ALTER SESSION SET CURRENT_SCHEMA=APPWORX" schema = "ALTER SESSION SET CURRENT_SCHEMA=APPWORX"
self.aw_cursor.execute(schema) self.aw_cursor.execute(schema)
if self.last_execution_unix_time: if self.last_execution_unix_time:
query = \ time_filter = "(DATE '1970-01-01' - INTERVAL '8' HOUR) + (%d - 3600) / 86400" % long(self.last_execution_unix_time)
"""SELECT J.*, R.RUNS
FROM SO_JOB_TABLE J JOIN (
SELECT SO_JOB_SEQ, COUNT(*) as RUNS
FROM
( SELECT SO_JOB_SEQ FROM SO_JOB_HISTORY
WHERE cast((FROM_TZ(CAST(SO_JOB_FINISHED as timestamp), 'US/Pacific') at time zone 'GMT') as date)
>= (TO_DATE('1970-01-01','YYYY-MM-DD') + (%d - 3600) / 86400)
UNION ALL
SELECT SO_JOB_SEQ FROM SO_JOB_QUEUE
WHERE SO_STATUS_NAME IN ('RUNNING', 'FINISHED')
)
GROUP BY SO_JOB_SEQ
) R ON J.SO_JOB_SEQ = R.SO_JOB_SEQ
WHERE SO_COMMAND_TYPE = 'CHAIN'
""" % long(self.last_execution_unix_time)
else: else:
query = \ time_filter = "SYSDATE - %d" % int(self.lookback_period)
"""SELECT J.*, R.RUNS flow_query = \
"""SELECT J.SO_JOB_SEQ, J.SO_APPLICATION, J.SO_MODULE, R.LAST_CHAIN_ID
FROM SO_JOB_TABLE J JOIN ( FROM SO_JOB_TABLE J JOIN (
SELECT SO_JOB_SEQ, COUNT(*) as RUNS SELECT SO_JOB_SEQ, MAX(SO_CHAIN_ID) as LAST_CHAIN_ID
FROM FROM
( SELECT SO_JOB_SEQ FROM SO_JOB_HISTORY ( SELECT SO_JOB_SEQ, SO_CHAIN_ID FROM SO_JOB_HISTORY
WHERE SO_JOB_FINISHED >= SYSDATE - %d WHERE SO_JOB_FINISHED >= %s
AND SO_CHILD_COUNT > 0
UNION ALL UNION ALL
SELECT SO_JOB_SEQ FROM SO_JOB_QUEUE SELECT SO_JOB_SEQ, SO_CHAIN_ID FROM SO_JOB_QUEUE
WHERE SO_STATUS_NAME IN ('RUNNING', 'FINISHED') WHERE SO_STATUS_NAME IN ('INITIATED', 'RUNNING', 'FINISHED')
AND SO_CHILD_COUNT > 0
) )
GROUP BY SO_JOB_SEQ GROUP BY SO_JOB_SEQ
) R ON J.SO_JOB_SEQ = R.SO_JOB_SEQ ) R ON J.SO_JOB_SEQ = R.SO_JOB_SEQ
WHERE SO_COMMAND_TYPE = 'CHAIN' WHERE SO_COMMAND_TYPE = 'CHAIN'
""" % int(self.lookback_period) ORDER BY 2,3
""" % time_filter
job_query = \ job_query = \
"""SELECT d.SO_TASK_NAME, d.SO_CHAIN_ORDER, d.SO_PREDECESSORS as PREDECESSORS, d.SO_DET_SEQ as JOB_ID, """SELECT d.SO_TASK_NAME, d.SO_CHAIN_ORDER, d.SO_PREDECESSORS as PREDECESSORS, d.SO_DET_SEQ as JOB_ID,
t.* FROM SO_CHAIN_DETAIL d t.* FROM SO_CHAIN_DETAIL d
@ -153,7 +136,7 @@ class AppworxExtract:
WHERE d.SO_CHAIN_SEQ = %d WHERE d.SO_CHAIN_SEQ = %d
ORDER BY d.SO_CHAIN_ORDER ORDER BY d.SO_CHAIN_ORDER
""" """
self.aw_cursor.execute(query) self.aw_cursor.execute(flow_query)
rows = DbUtil.dict_cursor(self.aw_cursor) rows = DbUtil.dict_cursor(self.aw_cursor)
flow_writer = FileWriter(flow_file) flow_writer = FileWriter(flow_file)
job_writer = FileWriter(job_file) job_writer = FileWriter(job_file)
@ -221,7 +204,8 @@ class AppworxExtract:
dag_writer.close() dag_writer.close()
def collect_flow_execs(self, flow_exec_file, job_exec_file, look_back_period): def collect_flow_execs(self, flow_exec_file, job_exec_file, look_back_period):
self.logger.info("collect flow&job executions") self.logger.info("collect flow&job executions [last_execution_unix_time=%s lookback_period=%s]"
% (self.last_execution_unix_time, self.lookback_period))
flow_exec_writer = FileWriter(flow_exec_file) flow_exec_writer = FileWriter(flow_exec_file)
job_exec_writer = FileWriter(job_exec_file) job_exec_writer = FileWriter(job_exec_file)
timezone = "ALTER SESSION SET TIME_ZONE = 'US/Pacific'" timezone = "ALTER SESSION SET TIME_ZONE = 'US/Pacific'"
@ -237,11 +221,15 @@ class AppworxExtract:
ROUND((cast((FROM_TZ(CAST(H.SO_JOB_FINISHED as timestamp), 'US/Pacific') at time zone 'GMT') as date) - ROUND((cast((FROM_TZ(CAST(H.SO_JOB_FINISHED as timestamp), 'US/Pacific') at time zone 'GMT') as date) -
to_date('01-JAN-1970','DD-MON-YYYY'))* (86400)) as JOB_FINISHED, to_date('01-JAN-1970','DD-MON-YYYY'))* (86400)) as JOB_FINISHED,
U.SO_USER_NAME FROM SO_JOB_TABLE J U.SO_USER_NAME FROM SO_JOB_TABLE J
JOIN SO_JOB_HISTORY H ON J.SO_JOB_SEQ = H.SO_JOB_SEQ JOIN (
SELECT * FROM SO_JOB_HISTORY WHERE SO_JOB_FINISHED >= DATE '1970-01-01' - interval '8' hour + (%d - 3600) / 86400
AND SO_CHILD_COUNT > 0
UNION ALL
SELECT * FROM SO_JOB_QUEUE WHERE SO_STATUS_NAME IN ('INITIATED', 'RUNNING', 'FINISHED')
AND SO_CHILD_COUNT > 0
) H ON J.SO_JOB_SEQ = H.SO_JOB_SEQ
LEFT JOIN SO_USER_TABLE U ON H.SO_USER_SEQ = U.SO_USER_SEQ LEFT JOIN SO_USER_TABLE U ON H.SO_USER_SEQ = U.SO_USER_SEQ
WHERE cast((FROM_TZ(CAST(H.SO_JOB_FINISHED as timestamp), 'US/Pacific') at time zone 'GMT') as date) >= WHERE J.SO_COMMAND_TYPE = 'CHAIN' """ % long(self.last_execution_unix_time)
(TO_DATE('1970-01-01','YYYY-MM-DD') + (%d - 3600) / 86400) and
J.SO_COMMAND_TYPE = 'CHAIN' """ % self.last_execution_unix_time
else: else:
flow_cmd = \ flow_cmd = \
"""SELECT J.SO_JOB_SEQ, J.SO_MODULE, J.SO_APPLICATION, H.SO_STATUS_NAME, H.SO_JOBID, H.SO_CHAIN_ID, """SELECT J.SO_JOB_SEQ, J.SO_MODULE, J.SO_APPLICATION, H.SO_STATUS_NAME, H.SO_JOBID, H.SO_CHAIN_ID,
@ -250,11 +238,21 @@ class AppworxExtract:
ROUND((cast((FROM_TZ(CAST(H.SO_JOB_FINISHED as timestamp), 'US/Pacific') at time zone 'GMT') as date) - ROUND((cast((FROM_TZ(CAST(H.SO_JOB_FINISHED as timestamp), 'US/Pacific') at time zone 'GMT') as date) -
to_date('01-JAN-1970','DD-MON-YYYY'))* (86400)) as JOB_FINISHED, to_date('01-JAN-1970','DD-MON-YYYY'))* (86400)) as JOB_FINISHED,
U.SO_USER_NAME FROM SO_JOB_TABLE J U.SO_USER_NAME FROM SO_JOB_TABLE J
JOIN SO_JOB_HISTORY H ON J.SO_JOB_SEQ = H.SO_JOB_SEQ JOIN (
SELECT * FROM SO_JOB_HISTORY WHERE SO_JOB_FINISHED >= SYSDATE - %d
AND SO_CHILD_COUNT > 0
UNION ALL
SELECT * FROM SO_JOB_QUEUE WHERE SO_STATUS_NAME IN ('INITIATED', 'RUNNING', 'FINISHED')
AND SO_CHILD_COUNT > 0
) H ON J.SO_JOB_SEQ = H.SO_JOB_SEQ
LEFT JOIN SO_USER_TABLE U ON H.SO_USER_SEQ = U.SO_USER_SEQ LEFT JOIN SO_USER_TABLE U ON H.SO_USER_SEQ = U.SO_USER_SEQ
WHERE H.SO_JOB_FINISHED >= SYSDATE - %d and WHERE J.SO_COMMAND_TYPE = 'CHAIN' """ % int(self.lookback_period)
J.SO_COMMAND_TYPE = 'CHAIN' """ % int(self.lookback_period)
''' SO_CHAIN_ID = :flow_exec_id will find all job executions under the top level flow
select SO_EXECUTE_ORDER, SO_JOBID, SO_PARENTS_JOBID, SO_DIRECT_PARENT_JOBID, SO_CHAIN_ID
from so_job_history where SO_JOBID = SO_CHAIN_ID or SO_PARENTS_JOBID <> SO_CHAIN_ID
'''
if self.last_execution_unix_time: if self.last_execution_unix_time:
job_cmd = \ job_cmd = \
"""SELECT D.SO_TASK_NAME, U.SO_USER_NAME, H.SO_STATUS_NAME, H.SO_JOBID, D.SO_DET_SEQ as JOB_ID, """SELECT D.SO_TASK_NAME, U.SO_USER_NAME, H.SO_STATUS_NAME, H.SO_JOBID, D.SO_DET_SEQ as JOB_ID,
@ -265,9 +263,8 @@ class AppworxExtract:
FROM SO_JOB_HISTORY H FROM SO_JOB_HISTORY H
JOIN SO_CHAIN_DETAIL D ON D.SO_CHAIN_SEQ = H.SO_CHAIN_SEQ AND D.SO_DET_SEQ = H.SO_DET_SEQ JOIN SO_CHAIN_DETAIL D ON D.SO_CHAIN_SEQ = H.SO_CHAIN_SEQ AND D.SO_DET_SEQ = H.SO_DET_SEQ
LEFT JOIN SO_USER_TABLE U ON H.SO_USER_SEQ = U.SO_USER_SEQ LEFT JOIN SO_USER_TABLE U ON H.SO_USER_SEQ = U.SO_USER_SEQ
WHERE cast((FROM_TZ(CAST(H.SO_JOB_FINISHED as timestamp), 'US/Pacific') at time zone 'GMT') as date) >= WHERE --H.SO_JOB_FINISHED >= DATE '1970-01-01' - interval '8' hour + (%d - 3600) / 86400) and
(TO_DATE('1970-01-01','YYYY-MM-DD') + (%d - 3600) / 86400) and H.SO_CHAIN_ID = %d"""
H.SO_PARENTS_JOBID = %d and H.SO_CHAIN_ID = %d"""
else: else:
job_cmd = \ job_cmd = \
"""SELECT D.SO_TASK_NAME, U.SO_USER_NAME, H.SO_STATUS_NAME, H.SO_JOBID, D.SO_DET_SEQ as JOB_ID, """SELECT D.SO_TASK_NAME, U.SO_USER_NAME, H.SO_STATUS_NAME, H.SO_JOBID, D.SO_DET_SEQ as JOB_ID,
@ -279,9 +276,12 @@ class AppworxExtract:
JOIN SO_CHAIN_DETAIL D ON D.SO_CHAIN_SEQ = H.SO_CHAIN_SEQ AND D.SO_DET_SEQ = H.SO_DET_SEQ JOIN SO_CHAIN_DETAIL D ON D.SO_CHAIN_SEQ = H.SO_CHAIN_SEQ AND D.SO_DET_SEQ = H.SO_DET_SEQ
LEFT JOIN SO_USER_TABLE U ON H.SO_USER_SEQ = U.SO_USER_SEQ LEFT JOIN SO_USER_TABLE U ON H.SO_USER_SEQ = U.SO_USER_SEQ
WHERE H.SO_JOB_FINISHED >= SYSDATE - %d and WHERE H.SO_JOB_FINISHED >= SYSDATE - %d and
H.SO_PARENTS_JOBID = %d and H.SO_CHAIN_ID = %d""" H.SO_CHAIN_ID = %d"""
self.aw_cursor.execute(flow_cmd) try:
self.aw_cursor.execute(flow_cmd)
except Exception as e:
self.logger.error(e + "\n" + flow_cmd)
rows = DbUtil.dict_cursor(self.aw_cursor) rows = DbUtil.dict_cursor(self.aw_cursor)
row_count = 0 row_count = 0
@ -295,6 +295,7 @@ class AppworxExtract:
flow_exec_id = int(so_flow_id) flow_exec_id = int(so_flow_id)
except Exception as e: except Exception as e:
self.logger.error(e) self.logger.error(e)
self.logger.debug("processing flow_exec_id: %8d" % flow_exec_id)
flow_exec_record = AppworxFlowExecRecord(self.app_id, flow_exec_record = AppworxFlowExecRecord(self.app_id,
long(row['SO_JOB_SEQ']), long(row['SO_JOB_SEQ']),
@ -304,17 +305,17 @@ class AppworxExtract:
flow_exec_id, flow_exec_id,
row['SO_STATUS_NAME'], row['SO_STATUS_NAME'],
flow_attempt, flow_attempt,
row['SO_USER_NAME'], row['SO_USER_NAME'] if row['SO_USER_NAME'] else '',
long(row['JOB_STARTED']), long(row['JOB_STARTED']),
long(row['JOB_FINISHED']), long(row['JOB_FINISHED'] if row['JOB_FINISHED'] else 0),
self.wh_exec_id) self.wh_exec_id)
flow_exec_writer.append(flow_exec_record) flow_exec_writer.append(flow_exec_record)
new_appworx_cursor = self.aw_con.cursor() new_appworx_cursor = self.aw_con.cursor()
if self.last_execution_unix_time: if self.last_execution_unix_time:
new_appworx_cursor.execute(job_cmd % (self.last_execution_unix_time, flow_exec_id, long(row['SO_CHAIN_ID']))) new_appworx_cursor.execute(job_cmd % (long(self.last_execution_unix_time), flow_exec_id))
else: else:
new_appworx_cursor.execute(job_cmd % (int(self.lookback_period), flow_exec_id, long(row['SO_CHAIN_ID']))) new_appworx_cursor.execute(job_cmd % (int(self.lookback_period), flow_exec_id))
job_rows = DbUtil.dict_cursor(new_appworx_cursor) job_rows = DbUtil.dict_cursor(new_appworx_cursor)
for job in job_rows: for job in job_rows:

View File

@ -14,23 +14,16 @@
from wherehows.common import Constant from wherehows.common import Constant
from com.ziclix.python.sql import zxJDBC from com.ziclix.python.sql import zxJDBC
import ConfigParser, os
import DbUtil
from pyparsing import *
import sys
import hashlib
import gzip
import StringIO
import json
import datetime
import time
import re
from org.slf4j import LoggerFactory from org.slf4j import LoggerFactory
from AppworxLogParser import AppworxLogParser from AppworxLogParser import AppworxLogParser
from PigLogParser import PigLogParser from PigLogParser import PigLogParser
from BteqLogParser import BteqLogParser from BteqLogParser import BteqLogParser
import sys, os, re, time
import DbUtil
import hashlib
import ParseUtil import ParseUtil
class AppworxLineageExtract: class AppworxLineageExtract:
def __init__(self, args): def __init__(self, args):
@ -46,6 +39,7 @@ class AppworxLineageExtract:
self.local_script_path = args[Constant.AW_LOCAL_SCRIPT_PATH] self.local_script_path = args[Constant.AW_LOCAL_SCRIPT_PATH]
self.remote_script_path = args[Constant.AW_REMOTE_SCRIPT_PATH] self.remote_script_path = args[Constant.AW_REMOTE_SCRIPT_PATH]
self.aw_archive_dir = args[Constant.AW_ARCHIVE_DIR] self.aw_archive_dir = args[Constant.AW_ARCHIVE_DIR]
# self.aw_log_url = args[Constant.AW_LOG_URL]
self.bteq_source_target_override = args[Constant.AW_BTEQ_SOURCE_TARGET_OVERRIDE] self.bteq_source_target_override = args[Constant.AW_BTEQ_SOURCE_TARGET_OVERRIDE]
self.metric_override = args[Constant.AW_METRIC_OVERRIDE] self.metric_override = args[Constant.AW_METRIC_OVERRIDE]
self.skip_already_parsed = args[Constant.AW_SKIP_ALREADY_PARSED] self.skip_already_parsed = args[Constant.AW_SKIP_ALREADY_PARSED]
@ -106,17 +100,19 @@ class AppworxLineageExtract:
def get_log_file_name(self, module_name, days_offset=1): def get_log_file_name(self, module_name, days_offset=1):
if self.last_execution_unix_time: if self.last_execution_unix_time:
query = \ query = \
"""select je.*, fj.job_type, fl.flow_path, """select je.*, fj.job_type, fl.flow_path,
CONCAT('%s',CONCAT(CONCAT(DATE_FORMAT(FROM_UNIXTIME(je.start_time), '%%Y%%m%%d'),'/o'), CONCAT('o', je.job_exec_id, '.', LPAD(je.attempt_id, 2, 0)) as log_file_name,
CONCAT(je.job_exec_id, '.', LPAD(je.attempt_id, 2, 0), '.gz'))) as gzipped_file_name CONCAT('%s', DATE_FORMAT(FROM_UNIXTIME(je.end_time), '%%Y%%m%%d'), '/',
CONCAT('o', je.job_exec_id, '.', LPAD(je.attempt_id, 2, 0)), '.gz') as gzipped_file_name
from job_execution je from job_execution je
JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and fj.is_current = 'Y' and JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and fj.is_current = 'Y' and
je.job_id = fj.job_id je.job_id = fj.job_id
JOIN flow fl on fj.app_id = fl.app_id and fj.flow_id = fl.flow_id JOIN flow fl on fj.app_id = fl.app_id and fj.flow_id = fl.flow_id
WHERE je.app_id = %d WHERE je.app_id = %d
and je.end_time >= UNIX_TIMESTAMP(DATE_SUB(from_unixtime(%d), INTERVAL 1 HOUR)) and je.end_time >= %d - 3660
and UPPER(fj.job_type) = '%s' and fj.job_type = '%s'
ORDER BY je.flow_exec_id DESC, je.job_exec_id
""" """
self.logger.info(query % (self.aw_archive_dir, self.app_id, long(self.last_execution_unix_time), module_name)) self.logger.info(query % (self.aw_archive_dir, self.app_id, long(self.last_execution_unix_time), module_name))
self.aw_cursor.execute(query % self.aw_cursor.execute(query %
@ -124,19 +120,21 @@ class AppworxLineageExtract:
else: else:
query = \ query = \
"""select je.*, fj.job_type, fl.flow_path, """select je.*, fj.job_type, fl.flow_path,
CONCAT('%s',CONCAT(CONCAT(DATE_FORMAT(FROM_UNIXTIME(je.start_time), '%%Y%%m%%d'),'/o'), CONCAT('o', je.job_exec_id, '.', LPAD(je.attempt_id, 2, 0)) as log_file_name,
CONCAT(je.job_exec_id, '.', LPAD(je.attempt_id, 2, 0), '.gz'))) as gzipped_file_name CONCAT('%s', DATE_FORMAT(FROM_UNIXTIME(je.end_time), '%%Y%%m%%d'), '/',
CONCAT('o', je.job_exec_id, '.', LPAD(je.attempt_id, 2, 0)), '.gz') as gzipped_file_name
from job_execution je from job_execution je
JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and fj.is_current = 'Y' and JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and fj.is_current = 'Y' and
je.job_id = fj.job_id je.job_id = fj.job_id
JOIN flow fl on fj.app_id = fl.app_id and fj.flow_id = fl.flow_id JOIN flow fl on fj.app_id = fl.app_id and fj.flow_id = fl.flow_id
WHERE je.app_id = %d and WHERE je.app_id = %d and je.job_exec_status in ('FINISHED', 'SUCCEEDED', 'OK')
from_unixtime(je.end_time) >= CURRENT_DATE - INTERVAL %d HOUR and UPPER(fj.job_type) = '%s' and je.end_time >= unix_timestamp(CURRENT_DATE - INTERVAL %d DAY) and fj.job_type = '%s'
ORDER BY je.flow_exec_id DESC, je.job_exec_id
""" """
self.logger.info(query % (self.aw_archive_dir, self.app_id, int(self.look_back_days), module_name)) self.logger.info(query % (self.aw_archive_dir, self.app_id, int(self.look_back_days), module_name))
self.aw_cursor.execute(query % (self.aw_archive_dir, self.app_id, int(self.look_back_days), module_name)) self.aw_cursor.execute(query % (self.aw_archive_dir, self.app_id, int(self.look_back_days), module_name))
job_rows = DbUtil.copy_dict_cursor(self.aw_cursor) job_rows = DbUtil.copy_dict_cursor(self.aw_cursor)
self.logger.info(str(len(job_rows))) self.logger.info("%d job executions will be scanned for lineage" % len(job_rows))
return job_rows return job_rows
@ -426,7 +424,7 @@ class AppworxLineageExtract:
self.logger.error(str(sys.exc_info()[0])) self.logger.error(str(sys.exc_info()[0]))
def process_li_shell_gw(self): def process_li_shell_gw(self):
self.logger.info("process process_li_shell_gw") self.logger.info("process li_shell_gw")
parameter_query = \ parameter_query = \
""" """
SELECT param_value SELECT param_value
@ -643,7 +641,7 @@ class AppworxLineageExtract:
def process_li_hadoop(self): def process_li_hadoop(self):
self.logger.info("process li_pig") self.logger.info("process li_hadoop")
rows = self.get_log_file_name(module_name='LI_HADOOP') rows = self.get_log_file_name(module_name='LI_HADOOP')
for row in rows: for row in rows:
try: try:
@ -693,8 +691,7 @@ class AppworxLineageExtract:
source_database_id = int(self.db_lookup(tab['source_database'])) source_database_id = int(self.db_lookup(tab['source_database']))
full_table_name = \ full_table_name = \
tab['source_schema'] + \ (tab['source_schema'] + '.' if tab['source_schema'] is not None else '') + \
('.' if tab['source_schema'] is not None else '') + \
tab['source_table_name'] tab['source_table_name']
self.aw_cursor.execute(update_staging_lineage_query % self.aw_cursor.execute(update_staging_lineage_query %
(int(row['app_id']), int(row['flow_exec_id']), int(row['job_exec_id']), (int(row['app_id']), int(row['flow_exec_id']), int(row['job_exec_id']),
@ -704,7 +701,7 @@ class AppworxLineageExtract:
'source', int(srl), 0, 0, 'JDBC Read', 'source', int(srl), 0, 0, 'JDBC Read',
0, int(row['wh_etl_exec_id']) )) 0, int(row['wh_etl_exec_id']) ))
source_srl_no = srl source_srl_no = srl
srl = srl + 1 srl += 1
self.aw_cursor.execute(update_staging_lineage_query % self.aw_cursor.execute(update_staging_lineage_query %
(int(row['app_id']), int(row['flow_exec_id']), int(row['job_exec_id']), (int(row['app_id']), int(row['flow_exec_id']), int(row['job_exec_id']),
row['flow_path'], row['job_name'], int(row['start_time']), int(row['end_time']), row['flow_path'], row['job_name'], int(row['start_time']), int(row['end_time']),
@ -714,10 +711,10 @@ class AppworxLineageExtract:
tab['end_partition'] if tab['frequency'] != 'snapshot' else None, tab['end_partition'] if tab['frequency'] != 'snapshot' else None,
tab['frequency'], tab['frequency'], tab['frequency'], tab['frequency'],
'HDFS', 'HDFS',
'target', int(srl), int(source_srl_no), int(source_srl_no), 'JDBC Write', 'target', int(srl), int(source_srl_no), int(source_srl_no), 'HDFS Write',
int(tab['record_count']), int(row['wh_etl_exec_id']) )) int(tab['record_count']), int(row['wh_etl_exec_id']) ))
srl = srl + 1 srl += 1
elif results['script_type'] == 'CMD': elif results['script_type'] == 'CMD':
db_id = self.db_lookup(results['cluster']) db_id = self.db_lookup(results['cluster'])

View File

@ -12,31 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# #
from wherehows.common.writers import FileWriter
from wherehows.common.schemas import AppworxFlowRecord
from wherehows.common.schemas import AppworxJobRecord
from wherehows.common.schemas import AppworxFlowDagRecord
from wherehows.common.schemas import AppworxFlowExecRecord
from wherehows.common.schemas import AppworxJobExecRecord
from wherehows.common.schemas import AppworxFlowScheduleRecord
from wherehows.common.schemas import AppworxFlowOwnerRecord
from wherehows.common.enums import AzkabanPermission
from wherehows.common import Constant
from wherehows.common.enums import SchedulerType
from com.ziclix.python.sql import zxJDBC
from pyparsing import * from pyparsing import *
import ParseUtil
import os
import ast
import DbUtil
import sys
import gzip
import StringIO
import json
import datetime
import time
import re
from org.slf4j import LoggerFactory from org.slf4j import LoggerFactory
import os, sys, gzip, re, ast
import ParseUtil
ParserElement.enablePackrat() ParserElement.enablePackrat()

View File

@ -12,31 +12,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# #
from wherehows.common.writers import FileWriter
from wherehows.common.schemas import AppworxFlowRecord
from wherehows.common.schemas import AppworxJobRecord
from wherehows.common.schemas import AppworxFlowDagRecord
from wherehows.common.schemas import AppworxFlowExecRecord
from wherehows.common.schemas import AppworxJobExecRecord
from wherehows.common.schemas import AppworxFlowScheduleRecord
from wherehows.common.schemas import AppworxFlowOwnerRecord
from wherehows.common.enums import AzkabanPermission
from wherehows.common import Constant
from wherehows.common.enums import SchedulerType
from com.ziclix.python.sql import zxJDBC
from metadata.etl.sqlParser import SqlParser from metadata.etl.sqlParser import SqlParser
from pyparsing import * from pyparsing import *
import ParseUtil
import ConfigParser, os
import DbUtil
import sys
import gzip
import StringIO
import json
import datetime
import time
import re
from org.slf4j import LoggerFactory from org.slf4j import LoggerFactory
import sys, os, json
import ParseUtil
import ConfigParser
class BteqLogParser: class BteqLogParser:

View File

@ -12,29 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# #
from wherehows.common.writers import FileWriter
from wherehows.common.schemas import AppworxFlowRecord
from wherehows.common.schemas import AppworxJobRecord
from wherehows.common.schemas import AppworxFlowDagRecord
from wherehows.common.schemas import AppworxFlowExecRecord
from wherehows.common.schemas import AppworxJobExecRecord
from wherehows.common.schemas import AppworxFlowScheduleRecord
from wherehows.common.schemas import AppworxFlowOwnerRecord
from wherehows.common.enums import AzkabanPermission
from wherehows.common import Constant
from wherehows.common.enums import SchedulerType
from com.ziclix.python.sql import zxJDBC
from pyparsing import * from pyparsing import *
import os import time, re
import DbUtil
import sys
import gzip
import StringIO
import json
import datetime
import time
import re
from org.slf4j import LoggerFactory
def get_db_table_abstraction(table_name): def get_db_table_abstraction(table_name):
parsed_table_name = table_name.split('.') parsed_table_name = table_name.split('.')

View File

@ -12,30 +12,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# #
from wherehows.common.writers import FileWriter
from wherehows.common.schemas import AppworxFlowRecord
from wherehows.common.schemas import AppworxJobRecord
from wherehows.common.schemas import AppworxFlowDagRecord
from wherehows.common.schemas import AppworxFlowExecRecord
from wherehows.common.schemas import AppworxJobExecRecord
from wherehows.common.schemas import AppworxFlowScheduleRecord
from wherehows.common.schemas import AppworxFlowOwnerRecord
from wherehows.common.enums import AzkabanPermission
from wherehows.common import Constant
from wherehows.common.enums import SchedulerType
from com.ziclix.python.sql import zxJDBC
from pyparsing import *
import ParseUtil
import os
import DbUtil
import sys
import gzip
import StringIO
import json
import datetime
import time
import re
from org.slf4j import LoggerFactory from org.slf4j import LoggerFactory
import os, sys, gzip, re
import ParseUtil
class PigLogParser: class PigLogParser:

View File

@ -14,9 +14,9 @@
from wherehows.common import Constant from wherehows.common import Constant
from com.ziclix.python.sql import zxJDBC from com.ziclix.python.sql import zxJDBC
from org.slf4j import LoggerFactory
import DbUtil import DbUtil
import sys import sys
from org.slf4j import LoggerFactory
class ScriptCollect(): class ScriptCollect():