267 lines
12 KiB
Python

#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
from wherehows.common.writers import FileWriter
from wherehows.common.schemas import AzkabanFlowRecord
from wherehows.common.schemas import AzkabanJobRecord
from wherehows.common.schemas import AzkabanFlowDagRecord
from wherehows.common.schemas import AzkabanFlowExecRecord
from wherehows.common.schemas import AzkabanJobExecRecord
from wherehows.common.schemas import AzkabanFlowScheduleRecord
from wherehows.common.schemas import AzkabanFlowOwnerRecord
from wherehows.common.enums import AzkabanPermission
from wherehows.common.utils import AzkabanJobExecUtil
from wherehows.common import Constant
from wherehows.common.enums import SchedulerType
from com.ziclix.python.sql import zxJDBC
from org.slf4j import LoggerFactory
import os, sys, json, gzip
import StringIO
import datetime, time
import DbUtil
class AzkabanExtract:
_period_unit_table = {'d': 'DAY',
'M': 'MONTH',
'h': 'HOUR',
'm': 'MINUTE',
'w': 'WEEK'}
def __init__(self, args):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.app_id = int(args[Constant.JOB_REF_ID_KEY])
self.wh_exec_id = long(args[Constant.WH_EXEC_ID_KEY])
self.az_con = zxJDBC.connect(args[Constant.AZ_DB_URL_KEY],
args[Constant.AZ_DB_USERNAME_KEY],
args[Constant.AZ_DB_PASSWORD_KEY],
args[Constant.AZ_DB_DRIVER_KEY])
self.az_cursor = self.az_con.cursor()
self.lookback_period = args[Constant.AZ_EXEC_ETL_LOOKBACK_MINS_KEY]
self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
self.metadata_folder = self.app_folder + "/" + str(SchedulerType.AZKABAN) + "/" + str(self.app_id)
if not os.path.exists(self.metadata_folder):
try:
os.makedirs(self.metadata_folder)
except Exception as e:
self.logger.error(e)
def run(self):
self.logger.info("Begin Azkaban Extract")
try:
self.collect_flow_jobs(self.metadata_folder + "/flow.csv", self.metadata_folder + "/job.csv", self.metadata_folder + "/dag.csv")
self.collect_flow_owners(self.metadata_folder + "/owner.csv")
self.collect_flow_schedules(self.metadata_folder + "/schedule.csv")
self.collect_flow_execs(self.metadata_folder + "/flow_exec.csv", self.metadata_folder + "/job_exec.csv", self.lookback_period)
finally:
self.az_cursor.close()
self.az_con.close()
def collect_flow_jobs(self, flow_file, job_file, dag_file):
self.logger.info("collect flow&jobs")
query = "SELECT distinct f.*, p.name as project_name FROM project_flows f inner join projects p on f.project_id = p.id and f.version = p.version where p.active = 1"
self.az_cursor.execute(query)
rows = DbUtil.dict_cursor(self.az_cursor)
flow_writer = FileWriter(flow_file)
job_writer = FileWriter(job_file)
dag_writer = FileWriter(dag_file)
row_count = 0
for row in rows:
row['version'] = 0 if (row["version"] is None) else row["version"]
json_column = 'json'
unzipped_content = gzip.GzipFile(mode='r', fileobj=StringIO.StringIO(row[json_column].tostring())).read()
try:
row[json_column] = json.loads(unzipped_content)
except:
pass
flow_path = row['project_name'] + ":" + row['flow_id']
flow_record = AzkabanFlowRecord(self.app_id,
row['flow_id'],
row['project_name'],
flow_path,
0,
row['modified_time'] / 1000,
row["version"],
'Y',
self.wh_exec_id)
flow_writer.append(flow_record)
# get flow jobs
nodes = row[json_column]['nodes']
for node in nodes:
job_record = AzkabanJobRecord(self.app_id,
flow_path,
row["version"],
node['id'],
flow_path + '/' + node['id'],
node['jobType'],
'Y',
self.wh_exec_id)
if node['jobType'] == 'flow':
job_record.setRefFlowPath(row['project_name'] + ":" + node['embeddedFlowId'])
job_writer.append(job_record)
# job dag
edges = row[json_column]['edges']
for edge in edges:
dag_edge = AzkabanFlowDagRecord(self.app_id,
flow_path,
row['version'],
flow_path + '/' + edge['source'],
flow_path + '/' + edge['target'],
self.wh_exec_id)
dag_writer.append(dag_edge)
row_count += 1
if row_count % 1000 == 0:
flow_writer.flush()
job_writer.flush()
dag_writer.flush()
flow_writer.close()
job_writer.close()
dag_writer.close()
def collect_flow_execs(self, flow_exec_file, job_exec_file, look_back_period):
self.logger.info( "collect flow&job executions")
flow_exec_writer = FileWriter(flow_exec_file)
job_exec_writer = FileWriter(job_exec_file)
cmd = """select * from execution_flows where end_time > UNIX_TIMESTAMP(now() - INTERVAL %d MINUTE) * 1000 """ % (int(look_back_period))
self.az_cursor.execute(cmd)
rows = DbUtil.dict_cursor(self.az_cursor)
row_count = 0
for row in rows:
json_column = 'flow_data'
unzipped_content = gzip.GzipFile(mode='r', fileobj=StringIO.StringIO(row[json_column].tostring())).read()
try:
row[json_column] = json.loads(unzipped_content)
except Exception as e:
self.logger.error(e)
pass
flow_data = row[json_column]
flow_path = flow_data['projectName'] + ":" + flow_data['flowId']
flow_exec_record = AzkabanFlowExecRecord(self.app_id,
flow_data['flowId'],
flow_path,
row['version'],
row['exec_id'],
flow_data['status'],
flow_data['attempt'],
row['submit_user'],
long(row['start_time']) / 1000,
long(row['end_time']) / 1000,
self.wh_exec_id)
flow_exec_writer.append(flow_exec_record)
nodes = flow_data['nodes']
job_exec_records = []
for node in nodes:
job_exec_record = AzkabanJobExecRecord(self.app_id,
flow_path,
row['version'],
row['exec_id'],
node['id'],
flow_path + "/" + node['id'],
None,
node['status'],
node['attempt'],
long(node['startTime']) / 1000,
long(node['endTime']) / 1000,
self.wh_exec_id)
job_exec_records.append(job_exec_record)
AzkabanJobExecUtil.sortAndSet(job_exec_records)
for r in job_exec_records:
job_exec_writer.append(r)
row_count += 1
if row_count % 10000 == 0:
flow_exec_writer.flush()
job_exec_writer.flush()
flow_exec_writer.close()
job_exec_writer.close()
def collect_flow_schedules(self, schedule_file):
# load flow scheduling info from table triggers
self.logger.info("collect flow schedule")
schedule_writer = FileWriter(schedule_file)
query = "select * from triggers"
self.az_cursor.execute(query)
rows = DbUtil.dict_cursor(self.az_cursor)
for row in rows:
json_column = 'data'
if row[json_column] != None:
unzipped_content = gzip.GzipFile(mode='r', fileobj=StringIO.StringIO(row[json_column].tostring())).read()
try:
row[json_column] = json.loads(unzipped_content)
except Exception as e:
self.logger.error(e)
pass
if not "projectId" in row[json_column]["actions"][0]["actionJson"]:
continue
# print json.dumps(row[json_column], indent=4)
if row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["isRecurring"] == 'true':
unit, frequency, cron_expr = None, None, None
period = row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["period"]
if period is not None and period != "null" and period[-1:] in self._period_unit_table:
unit = self._period_unit_table[period[-1:]]
frequency = int(row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["period"][:-1])
if "cronExpression" in row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]:
cron_expr = row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["cronExpression"]
schedule_record = AzkabanFlowScheduleRecord(self.app_id,
row[json_column]["actions"][0]["actionJson"]["projectName"] + ':' + row[json_column]["actions"][0]["actionJson"]["flowName"],
unit,
frequency,
cron_expr,
long(row[json_column]["triggerCondition"]["checkers"][0]["checkerJson"]["firstCheckTime"]) / 1000,
int(time.mktime(datetime.date(2099,12,31).timetuple())),
'0',
self.wh_exec_id
)
schedule_writer.append(schedule_record)
schedule_writer.close()
def collect_flow_owners(self, owner_file):
# load user info from table project_permissions
self.logger.info("collect owner&permissions")
user_writer = FileWriter(owner_file)
query = "select f.flow_id, p.name as project_name, p.version as project_verison, pp.name as owner, pp.permissions, pp.isGroup " \
"from project_flows f join project_permissions pp on f.project_id = pp.project_id join projects p on f.project_id = p.id where p.active = 1"
self.az_cursor.execute(query)
rows = DbUtil.dict_cursor(self.az_cursor)
for row in rows:
record = AzkabanFlowOwnerRecord(self.app_id,
row['project_name'] + ':' + row["flow_id"],
row["owner"],
AzkabanPermission(row["permissions"]).toFlatString(),
'GROUP' if row['isGroup'] == 1 else 'LDAP',
self.wh_exec_id)
user_writer.append(record)
user_writer.close()
if __name__ == "__main__":
props = sys.argv[1]
az = AzkabanExtract(props)
az.run()