Fix bugs. Reinforce logging. Format jython scripts. Add missing table DDL.

This commit is contained in:
SunZhaonan 2016-02-03 19:22:18 -08:00
parent 32e2547035
commit c3da00003e
35 changed files with 282 additions and 242 deletions

View File

@ -54,13 +54,14 @@ public class EtlJobActor extends UntypedActor {
if (msg.getEtlJobName().affectFlow()) {
ActorRegistry.treeBuilderActor.tell("flow", getSelf());
}
} catch (Exception e) {
} catch (Throwable e) { // catch all throwable at the highest level.
Logger.error("ETL job {} got a problem", msg.toDebugString());
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
Logger.error(sw.toString());
e.printStackTrace();
EtlJobDao.endRun(msg.getWhEtlExecId(), EtlJobStatus.ERROR, e.getMessage());
EtlJobDao.endRun(msg.getWhEtlExecId(), EtlJobStatus.ERROR, sw.toString().substring(0, Math.min(500, sw.toString().length())));
}
}
}
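The actor now captures the full stack trace into a StringWriter, logs it, and stores a bounded prefix in the ETL run record. A minimal standalone sketch of that capture-and-truncate pattern; the class and method names here are illustrative, not part of this commit:

import java.io.PrintWriter;
import java.io.StringWriter;

public class StackTraceCaptureDemo {

  /** Render a throwable's full stack trace as a String. */
  static String stackTraceToString(Throwable t) {
    StringWriter sw = new StringWriter();
    t.printStackTrace(new PrintWriter(sw, true));
    return sw.toString();
  }

  /** Truncate a message to fit a bounded database column, guarding short inputs. */
  static String truncate(String message, int maxLength) {
    return message.length() <= maxLength ? message : message.substring(0, maxLength);
  }

  public static void main(String[] args) {
    try {
      throw new IllegalStateException("simulated ETL failure");
    } catch (Throwable t) {                     // catch-all, mirroring the actor
      String trace = stackTraceToString(t);
      System.err.println(trace);                // stand-in for Logger.error(...)
      System.err.println("stored prefix: " + truncate(trace, 500).length() + " chars");
    }
  }
}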

View File

@ -13,5 +13,3 @@ scp target/universal/backend-service-1.0-SNAPSHOT/lib/schemaFetch.jar ${TARGET_S
scp target/universal/backend-service-1.0-SNAPSHOT/lib/wherehows-common-1.0.jar ${TARGET_SERVER}:~/backendServer/lib/;
scp target/universal/backend-service-1.0-SNAPSHOT/bin/* ${TARGET_SERVER}:~/backendServer/bin/;

View File

@ -72,3 +72,30 @@ CREATE TABLE stg_dataset_owner_unmatched (
KEY dataset_index (dataset_urn),
KEY db_name_index (db_name)
);
CREATE TABLE `dir_external_user_info` (
`app_id` smallint(5) unsigned NOT NULL,
`user_id` varchar(50) NOT NULL,
`urn` varchar(200) DEFAULT NULL,
`full_name` varchar(200) DEFAULT NULL,
`display_name` varchar(200) DEFAULT NULL,
`title` varchar(200) DEFAULT NULL,
`employee_number` int(10) unsigned DEFAULT NULL,
`manager_urn` varchar(200) DEFAULT NULL,
`manager_user_id` varchar(50) DEFAULT NULL,
`manager_employee_number` int(10) unsigned DEFAULT NULL,
`default_group_name` varchar(100) DEFAULT NULL,
`email` varchar(200) DEFAULT NULL,
`department_id` int(10) unsigned DEFAULT '0',
`department_name` varchar(200) DEFAULT NULL,
`start_date` date DEFAULT NULL,
`mobile_phone` varchar(50) DEFAULT NULL,
`is_active` char(1) DEFAULT 'Y',
`org_hierarchy` varchar(500) DEFAULT NULL,
`org_hierarchy_depth` tinyint(3) unsigned DEFAULT NULL,
`created_time` int(10) unsigned DEFAULT NULL COMMENT 'the create time in epoch',
`modified_time` int(10) unsigned DEFAULT NULL COMMENT 'the modified time in epoch',
`wh_etl_exec_id` bigint(20) DEFAULT NULL COMMENT 'wherehows etl execution id that modified this record',
PRIMARY KEY (`user_id`,`app_id`),
KEY `email` (`email`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
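The new dir_external_user_info table keys rows on (user_id, app_id) and stores its time columns as epoch seconds. A hedged JDBC sketch of writing one row; the connection settings, sample values, and the upsert strategy are illustrative assumptions, not part of the commit:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class DirExternalUserInfoWriter {
  public static void main(String[] args) throws Exception {
    // Placeholder connection settings; adjust to the actual WhereHows database.
    try (Connection conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/wherehows", "wherehows", "wherehows")) {
      String sql = "INSERT INTO dir_external_user_info"
          + " (app_id, user_id, full_name, email, is_active, created_time, modified_time, wh_etl_exec_id)"
          + " VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
          + " ON DUPLICATE KEY UPDATE full_name = VALUES(full_name), email = VALUES(email),"
          + " is_active = VALUES(is_active), modified_time = VALUES(modified_time),"
          + " wh_etl_exec_id = VALUES(wh_etl_exec_id)";
      long nowEpoch = System.currentTimeMillis() / 1000L;   // time columns are epoch seconds
      try (PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setInt(1, 300);                  // app_id (illustrative)
        ps.setString(2, "jdoe");            // user_id, part of the composite primary key
        ps.setString(3, "Jane Doe");
        ps.setString(4, "jdoe@example.com");
        ps.setString(5, "Y");
        ps.setLong(6, nowEpoch);            // created_time
        ps.setLong(7, nowEpoch);            // modified_time
        ps.setLong(8, 12345L);              // wh_etl_exec_id (illustrative)
        ps.executeUpdate();
      }
    }
  }
}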

View File

@ -23,13 +23,10 @@ import org.slf4j.LoggerFactory;
* Created by zsun on 12/14/15.
*/
public class HiveViewDependency {
final Logger logger = LoggerFactory.getLogger(getClass());
LineageInfo lineageInfoTool;
final static Logger logger = LoggerFactory.getLogger(HiveViewDependency.class);
static LineageInfo lineageInfoTool = new LineageInfo();
public HiveViewDependency() {
lineageInfoTool = new LineageInfo();
}
public String[] getViewDependency(String hiveQl) {
public static String[] getViewDependency(String hiveQl) {
try {
lineageInfoTool.getLineageInfo(hiveQl);
TreeSet<String> inputs = lineageInfoTool.getInputTableList();
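With getViewDependency now static, callers no longer construct HiveViewDependency. A brief usage sketch (the view text is made up); note that the shared static LineageInfo keeps state between getLineageInfo and getInputTableList, so this assumes single-threaded use:

public class HiveViewDependencyDemo {
  public static void main(String[] args) {
    String viewDdl = "CREATE VIEW v AS SELECT a.id, b.name "
        + "FROM db.tbl_a a JOIN db.tbl_b b ON a.id = b.id";
    // Static call against the refactored helper above.
    for (String table : HiveViewDependency.getViewDependency(viewDdl)) {
      System.out.println("depends on: " + table);
    }
  }
}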

View File

@ -76,16 +76,24 @@ public class AzJobChecker {
return getRecentFinishedJobFromFlow(beginTimeStamp, currentTimeStamp);
}
/**
* Overloaded version of getRecentFinishedJobFromFlow.
* @param timeFrameMinutes look-back window, in minutes
* @param endTimeStamp end of the window, in milliseconds
* @return recently finished Azkaban job executions within the window
* @throws IOException
* @throws SQLException
*/
public List<AzkabanJobExecRecord> getRecentFinishedJobFromFlow(int timeFrameMinutes, long endTimeStamp)
throws IOException, SQLException {
long beginTimeStamp = endTimeStamp - 60 * timeFrameMinutes;
return getRecentFinishedJobFromFlow(beginTimeStamp * 1000, endTimeStamp * 1000); // convert to milli seconds
long beginTimeStamp = endTimeStamp - 60 * timeFrameMinutes * 1000; // convert minutes to milli seconds
return getRecentFinishedJobFromFlow(beginTimeStamp, endTimeStamp);
}
/**
* Read the blob from "flow_data", do a topological sort on the nodes. Give them the sort id.
* @param startTimeStamp the begin timestamp
* @param endTimeStamp the end timestamp
* @param startTimeStamp the begin timestamp, in milliseconds
* @param endTimeStamp the end timestamp, in milliseconds
* @return finished Azkaban job executions between the two timestamps
*/
public List<AzkabanJobExecRecord> getRecentFinishedJobFromFlow(long startTimeStamp, long endTimeStamp)
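The overload fix above addresses a unit bug: the look-back window is converted from minutes to milliseconds before it is subtracted from the millisecond end timestamp. Equivalent arithmetic as a standalone sketch using java.util.concurrent.TimeUnit; the helper name is illustrative:

import java.util.concurrent.TimeUnit;

public class TimeWindowDemo {

  /** Start of a [begin, end] window in milliseconds, given a look-back window in minutes. */
  static long beginOfWindowMillis(long endTimeStampMillis, int timeFrameMinutes) {
    return endTimeStampMillis - TimeUnit.MINUTES.toMillis(timeFrameMinutes);
  }

  public static void main(String[] args) {
    long end = System.currentTimeMillis();
    long begin = beginOfWindowMillis(end, 10);   // the last 10 minutes
    System.out.println("query window: " + begin + " -> " + end + " (" + (end - begin) + " ms)");
  }
}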

View File

@ -15,7 +15,6 @@ package metadata.etl.lineage;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import java.util.Arrays;
import org.codehaus.jettison.json.JSONException;
import wherehows.common.DatasetPath;
import wherehows.common.schemas.AzkabanJobExecRecord;

View File

@ -69,6 +69,7 @@ public class AzLineageExtractorMaster {
* Entry point.
* Extract lineage for all recently finished Azkaban jobs and write it to the database staging table.
* @param timeFrame in minutes
* @param endTimeStamp end of the time frame, in milliseconds
* @throws Exception
*/
public void run(int timeFrame, long endTimeStamp)

View File

@ -69,7 +69,7 @@ public class HadoopNameNodeExtractor {
String gssFileName = possition + "/gss-jaas.conf";
File gssFile = new File(gssFileName);
if (gssFile.exists()) {
logger.info("find gss-jaas.conf file in : {}", gssFile.getAbsolutePath());
logger.debug("find gss-jaas.conf file in : {}", gssFile.getAbsolutePath());
System.setProperty("java.security.auth.login.config", gssFile.getAbsolutePath());
break;
} else {
@ -80,7 +80,7 @@ public class HadoopNameNodeExtractor {
String krb5FileName = possition + "/krb5.conf";
File krb5File = new File(krb5FileName);
if (krb5File.exists()) {
logger.info("find krb5.conf file in : {}", krb5File.getAbsolutePath());
logger.debug("find krb5.conf file in : {}", krb5File.getAbsolutePath());
System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
break;
} else {

View File

@ -24,10 +24,6 @@ from wherehows.common.enums import AzkabanPermission
from wherehows.common.utils import AzkabanJobExecUtil
from wherehows.common import Constant
from wherehows.common.enums import SchedulerType
__author__ = 'zechen'
from com.ziclix.python.sql import zxJDBC
import os
import DbUtil
@ -37,6 +33,7 @@ import StringIO
import json
import datetime
import time
from org.slf4j import LoggerFactory
class AzkabanExtract:
@ -47,6 +44,7 @@ class AzkabanExtract:
'w': 'WEEK'}
def __init__(self, args):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.app_id = int(args[Constant.APP_ID_KEY])
self.wh_exec_id = long(args[Constant.WH_EXEC_ID_KEY])
self.az_con = zxJDBC.connect(args[Constant.AZ_DB_URL_KEY],
@ -62,9 +60,10 @@ class AzkabanExtract:
try:
os.makedirs(self.metadata_folder)
except Exception as e:
print e
self.logger.error(e)
def run(self):
self.logger.info("Begin Azkaban Extract")
try:
self.collect_flow_jobs(self.metadata_folder + "/flow.csv", self.metadata_folder + "/job.csv", self.metadata_folder + "/dag.csv")
self.collect_flow_owners(self.metadata_folder + "/owner.csv")
@ -75,7 +74,7 @@ class AzkabanExtract:
self.az_con.close()
def collect_flow_jobs(self, flow_file, job_file, dag_file):
print "collect flow&jobs"
self.logger.info("collect flow&jobs")
query = "SELECT distinct f.*, p.name as project_name FROM project_flows f inner join projects p on f.project_id = p.id and f.version = p.version where p.active = 1"
self.az_cursor.execute(query)
rows = DbUtil.dict_cursor(self.az_cursor)
@ -145,7 +144,7 @@ class AzkabanExtract:
dag_writer.close()
def collect_flow_execs(self, flow_exec_file, job_exec_file, look_back_period):
print "collect flow&job executions"
self.logger.info( "collect flow&job executions")
flow_exec_writer = FileWriter(flow_exec_file)
job_exec_writer = FileWriter(job_exec_file)
@ -159,7 +158,7 @@ class AzkabanExtract:
try:
row[json_column] = json.loads(unzipped_content)
except Exception as e:
print e
self.logger.error(e)
pass
flow_data = row[json_column]
flow_path = flow_data['projectName'] + ":" + flow_data['flowId']
@ -205,7 +204,7 @@ class AzkabanExtract:
def collect_flow_schedules(self, schedule_file):
# load flow scheduling info from table triggers
print "collect flow schedule"
self.logger.info("collect flow schedule")
schedule_writer = FileWriter(schedule_file)
query = "select * from triggers"
self.az_cursor.execute(query)
@ -217,7 +216,7 @@ class AzkabanExtract:
try:
row[json_column] = json.loads(unzipped_content)
except Exception as e:
print e
self.logger.error(e)
pass
if not "projectId" in row[json_column]["actions"][0]["actionJson"]:
@ -242,7 +241,7 @@ class AzkabanExtract:
def collect_flow_owners(self, owner_file):
# load user info from table project_permissions
print "collect owner&permissions"
self.logger.info("collect owner&permissions")
user_writer = FileWriter(owner_file)
query = "select f.flow_id, p.name as project_name, p.version as project_verison, pp.name as owner, pp.permissions, pp.isGroup " \
"from project_flows f join project_permissions pp on f.project_id = pp.project_id join projects p on f.project_id = p.id where p.active = 1"

View File

@ -13,14 +13,10 @@
#
from jython.SchedulerLoad import SchedulerLoad
__author__ = 'zechen'
import sys
class AzkabanLoad(SchedulerLoad):
def __init__(self, args):
SchedulerLoad.__init__(self, args)
@ -33,11 +29,11 @@ class AzkabanLoad(SchedulerLoad):
SET f.is_active = 'N'
WHERE s.flow_id IS NULL AND f.app_id = {app_id}
""".format(app_id=self.app_id)
print cmd
self.wh_cursor.execute(cmd)
self.wh_con.commit()
SchedulerLoad.load_flows(self)
if __name__ == "__main__":
props = sys.argv[1]
az = AzkabanLoad(props)

View File

@ -14,12 +14,8 @@
from jython.SchedulerTransform import SchedulerTransform
from wherehows.common.enums import SchedulerType
__author__ = 'zechen'
import sys
class AzkabanTransform(SchedulerTransform):
SchedulerTransform._tables["flows"]["columns"] = "app_id, flow_name, flow_group, flow_path, flow_level, source_modified_time, source_version, is_active, wh_etl_exec_id"
SchedulerTransform._tables["jobs"]["columns"] = "app_id, flow_path, source_version, job_name, job_path, job_type, ref_flow_path, is_current, wh_etl_exec_id"

View File

@ -75,6 +75,7 @@ class DatasetTreeBuilder:
self.build_trie()
self.write_to_file()
if __name__ == "__main__":
d = DatasetTreeBuilder(sys.argv[1])
d.run()

View File

@ -12,8 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
__author__ = 'zechen'
def dict_cursor(cursor):
description = [x[0] for x in cursor.description]

View File

@ -78,6 +78,7 @@ class FlowTreeBuilder:
self.build_trie()
self.write_to_file()
if __name__ == "__main__":
ftb = FlowTreeBuilder(sys.argv[1])
ftb.run()

View File

@ -12,8 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
__author__ = 'zechen'
from org.slf4j import LoggerFactory
from wherehows.common import Constant
from com.ziclix.python.sql import zxJDBC
import sys
@ -21,6 +20,7 @@ import sys
class GitLoad:
def __init__(self, args):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
args[Constant.WH_DB_USERNAME_KEY],
args[Constant.WH_DB_PASSWORD_KEY],
@ -47,7 +47,7 @@ class GitLoad:
from stg_source_code_commit_info s
where s.app_id = {app_id}
""".format(app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()

View File

@ -12,8 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
__author__ = 'zechen'
from org.slf4j import LoggerFactory
from wherehows.common import Constant
from com.ziclix.python.sql import zxJDBC
import sys
@ -40,6 +39,7 @@ class OwnerTransform:
"""
def __init__(self, args):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
args[Constant.WH_DB_USERNAME_KEY],
args[Constant.WH_DB_PASSWORD_KEY],
@ -62,7 +62,7 @@ class OwnerTransform:
# Clear staging table
query = self._clear_staging_tempalte.format(table=t.get("table"))
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -73,7 +73,7 @@ class OwnerTransform:
columns=t.get("columns"),
app_id=self.app_id,
wh_etl_exec_id=self.wh_etl_exec_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()

View File

@ -12,13 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
__author__ = 'zsun'
import sys
from org.slf4j import LoggerFactory
from com.ziclix.python.sql import zxJDBC
from wherehows.common import Constant
class HdfsLoad:
def __init__(self):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
def load_metadata(self):
"""
Load dataset metadata into final table
@ -30,16 +33,11 @@ class HdfsLoad:
LOAD DATA LOCAL INFILE '{source_file}'
INTO TABLE stg_dict_dataset
FIELDS TERMINATED BY '\Z' ESCAPED BY '\\'
FIELDS TERMINATED BY '\Z' ESCAPED BY '\0'
(`name`, `schema`, properties, fields, urn, source, sample_partition_full_path, source_created_time, source_modified_time)
SET db_id = {db_id},
-- TODO storage_type = 'Avro',
wh_etl_exec_id = {wh_etl_exec_id};
-- SHOW WARNINGS LIMIT 20;
-- SELECT COUNT(*) FROM stg_dict_dataset;
-- clear
DELETE FROM stg_dict_dataset
where db_id = {db_id}
@ -125,7 +123,7 @@ class HdfsLoad:
analyze table dict_dataset;
'''.format(source_file=self.input_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
for state in load_cmd.split(";"):
print state
self.logger.debug(state)
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()
@ -256,7 +254,7 @@ class HdfsLoad:
analyze table dict_field_detail;
'''.format(source_file=self.input_field_file, db_id=self.db_id)
for state in load_field_cmd.split(";"):
print state
self.logger.debug(state)
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()
@ -299,7 +297,7 @@ class HdfsLoad:
WHERE s.db_id = {db_id} AND d.ref_id = 0;
'''.format(source_file=self.input_sample_file, db_id=self.db_id)
for state in load_sample_cmd.split(";"):
print state
self.logger.debug(state)
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()

View File

@ -12,16 +12,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
__author__ = 'zsun'
import json
import csv, sys
from org.slf4j import LoggerFactory
from wherehows.common.writers import FileWriter
from wherehows.common.schemas import DatasetSchemaRecord, DatasetFieldRecord
from wherehows.common import Constant
class HdfsTransform:
def __init__(self):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
def transform(self, raw_metadata, metadata_output, field_metadata_output):
# sys.setdefaultencoding("UTF-8")
@ -109,7 +111,7 @@ class HdfsTransform:
if effective_type_index_in_type >= 0 and type(f['type'][effective_type_index_in_type]) == dict:
if f['type'][effective_type_index_in_type].has_key('items') and type(
f['type'][effective_type_index_in_type]['items']) == list:
f['type'][effective_type_index_in_type]['items']) == list:
for item in f['type'][effective_type_index_in_type]['items']:
if type(item) == dict and item.has_key('fields'):
@ -117,7 +119,7 @@ class HdfsTransform:
fields_json_to_csv(output_list_, current_field_path, item['fields'])
elif f['type'][effective_type_index_in_type].has_key('fields'):
# if f['type'][effective_type_index_in_type].has_key('namespace'):
# o_field_namespace = f['type'][effective_type_index_in_type]['namespace']
# o_field_namespace = f['type'][effective_type_index_in_type]['namespace']
current_field_path = o_field_name if parent_field_path == '' else parent_field_path + '.' + o_field_name
fields_json_to_csv(output_list_, current_field_path, f['type'][effective_type_index_in_type]['fields'])
@ -127,7 +129,7 @@ class HdfsTransform:
try:
j = json.loads(line)
except:
print " Invalid JSON:\n%s" % line
self.logger.error(" Invalid JSON:\n%s" % line)
continue
i += 1
@ -151,7 +153,7 @@ class HdfsTransform:
elif o_properties.has_key('uri'):
o_urn = o_properties['uri']
else:
print '*** Warning: "uri" is not found in %s' % j['name']
self.logger.info('*** Warning: "uri" is not found in %s' % j['name'])
o_urn = ''
if o_urn.find('hdfs://') == 0:
@ -196,14 +198,15 @@ class HdfsTransform:
o_properties['source'] = j['attributes']['source']
if o_properties.has_key('source') and (
not o_properties.has_key('instance') or o_properties['instance'] != 'Espresso'):
not o_properties.has_key('instance') or o_properties['instance'] != 'Espresso'):
o_source = o_properties['source']
elif o_properties.has_key('instance'):
o_source = o_properties['instance']
else:
o_source = 'HDFS'
print "%4i (%6i): %4i fields found in [%s]@%s within %s" % (i, len(j), len(o_fields), o_name, o_urn, o_source)
self.logger.info(
"%4i (%6i): %4i fields found in [%s]@%s within %s" % (i, len(j), len(o_fields), o_name, o_urn, o_source))
dataset_schema_record = DatasetSchemaRecord(o_name, json.dumps(j, sort_keys=True),
json.dumps(o_properties, sort_keys=True), json.dumps(o_fields), o_urn,
@ -224,6 +227,7 @@ if __name__ == "__main__":
t = HdfsTransform()
t.transform(args[Constant.HDFS_SCHEMA_LOCAL_PATH_KEY], args[Constant.HDFS_SCHEMA_RESULT_KEY], args[Constant.HDFS_FIELD_RESULT_KEY])
t.transform(args[Constant.HDFS_SCHEMA_LOCAL_PATH_KEY], args[Constant.HDFS_SCHEMA_RESULT_KEY],
args[Constant.HDFS_FIELD_RESULT_KEY])

View File

@ -12,11 +12,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
from org.slf4j import LoggerFactory
from com.ziclix.python.sql import zxJDBC
import sys, os, re, json
import datetime
from wherehows.common import Constant
class TableInfo:
""" Class to define the variable name """
table_name = 'name'
@ -44,6 +46,7 @@ class TableInfo:
table_type, location, view_expended_text, input_format, output_format, is_compressed,
is_storedassubdirectories, etl_source]
class HiveExtract:
"""
Extract hive metadata from hive metastore. store it in a json file
@ -53,6 +56,9 @@ class HiveExtract:
table_dict = {} # fullname : index
serde_param_columns = []
def __init__(self):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
def get_table_info_from_v2(self, database_name):
"""
get table, column info from table columns_v2
@ -186,8 +192,8 @@ class HiveExtract:
previous_db_name = row_value[0]
previous_tb_name = row_value[1]
print "%s %6d tables processed for database %12s from COLUMN_V2" % (
datetime.datetime.now(), table_idx + 1, row_value[0])
self.logger.info("%s %6d tables processed for database %12s from COLUMN_V2" % (
datetime.datetime.now(), table_idx + 1, row_value[0]))
def format_table_metadata_serde(self, rows, schema):
"""
@ -231,8 +237,8 @@ class HiveExtract:
table_idx += 1
self.table_dict[full_name] = table_idx
print "%s %6d tables processed for database %12s from SERDE_PARAM" % (
datetime.datetime.now(), table_idx + 1, row_value[0])
self.logger.info("%s %6d tables processed for database %12s from SERDE_PARAM" % (
datetime.datetime.now(), table_idx + 1, row_value[0]))
def run(self, schema_output_file, sample_output_file):
"""
@ -252,7 +258,7 @@ class HiveExtract:
# sample_file_writer = FileWriter(sample_output_file)
for database_name in self.databases:
print "Collecting hive tables in database : " + database_name
self.logger.info("Collecting hive tables in database : " + database_name)
# tables from schemaLiteral
rows = []
begin = datetime.datetime.now().strftime("%H:%M:%S")
@ -260,7 +266,7 @@ class HiveExtract:
if len(rows) > 0:
self.format_table_metadata_serde(rows, schema)
end = datetime.datetime.now().strftime("%H:%M:%S")
print "Get table info from Serde %12s [%s -> %s]\n" % (database_name, str(begin), str(end))
self.logger.info("Get table info from Serde %12s [%s -> %s]\n" % (database_name, str(begin), str(end)))
# tables from Column V2
rows = []
@ -269,7 +275,7 @@ class HiveExtract:
if len(rows) > 0:
self.format_table_metadata_v2(rows, schema)
end = datetime.datetime.now().strftime("%H:%M:%S")
print "Get table info from COLUMN_V2 %12s [%s -> %s]\n" % (database_name, str(begin), str(end))
self.logger.info("Get table info from COLUMN_V2 %12s [%s -> %s]\n" % (database_name, str(begin), str(end)))
schema_json_file.write(json.dumps(schema, indent=None) + '\n')
@ -303,8 +309,6 @@ if __name__ == "__main__":
try:
e.databases = e.get_all_databases()
print 'Process databases : '
print e.databases
e.run(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY], None)
finally:
e.conn_hms.close()

View File

@ -13,11 +13,15 @@
#
import sys
from org.slf4j import LoggerFactory
from com.ziclix.python.sql import zxJDBC
from wherehows.common import Constant
class HiveLoad:
def __init__(self):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
def load_metadata(self):
cursor = self.conn_mysql.cursor()
load_cmd = """
@ -96,7 +100,7 @@ class HiveLoad:
""".format(source_file=self.input_schema_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
for state in load_cmd.split(";"):
print state
self.logger.debug(state)
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()
@ -108,7 +112,7 @@ class HiveLoad:
"""
cursor = self.conn_mysql.cursor()
load_cmd = """
DELETE FROM stg_dict_field_detail where db_id = {db_id};
DELETE FROM stg_dict_field_detail WHERE db_id = {db_id};
LOAD DATA LOCAL INFILE '{source_file}'
INTO TABLE stg_dict_field_detail
@ -117,7 +121,7 @@ class HiveLoad:
@data_size, @precision, @scale, @is_nullable, @is_indexed, @is_partitioned, @default_value, @namespace, description,
@dummy
)
set
SET
parent_path=nullif(@parent_path,'null')
, data_size=nullif(@data_size,'null')
, data_precision=nullif(@precision,'null')
@ -130,18 +134,19 @@ class HiveLoad:
, db_id = {db_id}
;
analyze table stg_dict_field_detail;
ANALYZE TABLE stg_dict_field_detail;
""".format(source_file=self.input_field_file, db_id=self.db_id)
# didn't load into final table for now
for state in load_cmd.split(";"):
print state
self.logger.debug(state)
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()
if __name__ == "__main__":
args = sys.argv[1]

View File

@ -13,9 +13,10 @@
#
import json
import pprint, datetime
import datetime
import sys, os
import time
from org.slf4j import LoggerFactory
from wherehows.common.writers import FileWriter
from wherehows.common.schemas import DatasetSchemaRecord, DatasetFieldRecord
from wherehows.common import Constant
@ -23,7 +24,11 @@ from HiveExtract import TableInfo
from org.apache.hadoop.hive.ql.tools import LineageInfo
from metadata.etl.dataset.hive import HiveViewDependency
class HiveTransform:
def __init__(self):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
def transform(self, input, hive_metadata, hive_field_metadata):
"""
convert from json to csv
@ -32,8 +37,6 @@ class HiveTransform:
:param hive_field_metadata: output data file for hive field metadata
:return:
"""
pp = pprint.PrettyPrinter(indent=1)
f_json = open(input)
all_data = json.load(f_json)
f_json.close()
@ -73,8 +76,7 @@ class HiveTransform:
try:
schema_data = json.loads(table[TableInfo.schema_literal])
except ValueError:
print "Schema json error for table : "
print table
self.logger.error("Schema json error for table : \n" + str(table))
schema_json = schema_data
# process each field
@ -128,7 +130,7 @@ class HiveTransform:
schema_file_writer.flush()
field_file_writer.flush()
print "%20s contains %6d tables" % (one_db_info['database'], i)
self.logger.info("%20s contains %6d tables" % (one_db_info['database'], i))
schema_file_writer.close()
field_file_writer.close()

View File

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
from org.slf4j import LoggerFactory
from javax.naming.directory import InitialDirContext
from javax.naming import Context
from javax.naming.directory import SearchControls
@ -25,6 +26,7 @@ from java.io import FileWriter
class LdapExtract:
def __init__(self, args):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.args = args
self.app_id = int(args[Constant.APP_ID_KEY])
self.group_app_id = int(args[Constant.LDAP_GROUP_APP_ID_KEY])
@ -35,7 +37,7 @@ class LdapExtract:
try:
os.makedirs(self.metadata_folder)
except Exception as e:
print e
self.logger.error(e)
self.ldap_user = set()
self.group_map = dict()
@ -63,7 +65,8 @@ class LdapExtract:
# load the java Hashtable out of the ldap server
# Query starting point and query target
search_target = '(objectClass=person)'
return_attributes_standard = ['user_id', 'distinct_name', 'name', 'display_name', 'title', 'employee_number', 'manager', 'mail', 'department_number', 'department', 'start_date', 'mobile']
return_attributes_standard = ['user_id', 'distinct_name', 'name', 'display_name', 'title', 'employee_number',
'manager', 'mail', 'department_number', 'department', 'start_date', 'mobile']
return_attributes_actual = self.split_property(self.args[Constant.LDAP_SEARCH_RETURN_ATTRS_KEY])
return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual))
@ -104,7 +107,7 @@ class LdapExtract:
ldap_user_tuple.append(self.wh_exec_id)
ldap_records.append(ldap_user_tuple)
print "%d records found in ldap search" % (len(self.ldap_user))
self.logger.info("%d records found in ldap search" % (len(self.ldap_user)))
csv_writer = csv.writer(open(file, "w"), delimiter='\x1a', quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
csv_writer.writerows(ldap_records)
@ -159,7 +162,7 @@ class LdapExtract:
sort_id += 1
else:
pass
print "%d records found in group accounts" % (len(self.group_map))
self.logger.info("%d records found in group accounts" % (len(self.group_map)))
csv_writer = csv.writer(open(file, "w"), delimiter='\x1a', quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
csv_writer.writerows(ldap_records)

View File

@ -12,8 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
__author__ = 'zechen'
from org.slf4j import LoggerFactory
from wherehows.common import Constant
from com.ziclix.python.sql import zxJDBC
import sys
@ -21,6 +20,7 @@ import sys
class LdapLoad:
def __init__(self, args):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
args[Constant.WH_DB_USERNAME_KEY],
args[Constant.WH_DB_PASSWORD_KEY],
@ -70,7 +70,7 @@ class LdapLoad:
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
"""
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -83,7 +83,7 @@ class LdapLoad:
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
"""
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -96,7 +96,7 @@ class LdapLoad:
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
"""
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()

View File

@ -12,8 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
__author__ = 'zechen'
from org.slf4j import LoggerFactory
from wherehows.common import Constant
from com.ziclix.python.sql import zxJDBC
import sys
@ -94,6 +93,7 @@ class LdapTransform:
"""
def __init__(self, args):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
args[Constant.WH_DB_USERNAME_KEY],
args[Constant.WH_DB_PASSWORD_KEY],
@ -127,7 +127,7 @@ class LdapTransform:
# Load file into staging table
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"), columns=t.get("columns"))
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -137,21 +137,21 @@ class LdapTransform:
if 'nullif_columns' in t:
for column in t['nullif_columns']:
query = self._update_column_to_null_template.format(table=t.get("table"), column=column, column_value=t['nullif_columns'][column], app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
def update_manager_info(self):
t = self._tables["ldap_user"]
query = self._update_manager_info.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
def update_hierarchy_info(self):
t = self._tables["ldap_user"]
query = self._get_manager_edge.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
pair = dict()
hierarchy = dict()
@ -177,7 +177,7 @@ class LdapTransform:
if count % 1000 == 0:
query = self._update_hierarchy_info.format(table=t.get("table"), app_id=self.app_id, user_ids=",".join(user_ids), org_hierarchy_long_string=org_hierarchy_long_string,
org_hierarchy_depth_long_string=org_hierarchy_depth_long_string)
# print query
# self.logger.debug(query)
self.wh_cursor.executemany(query)
user_ids = []
org_hierarchy_long_string = ""
@ -185,7 +185,7 @@ class LdapTransform:
query = self._update_hierarchy_info.format(table=t.get("table"), app_id=self.app_id, user_ids=",".join(user_ids), org_hierarchy_long_string=org_hierarchy_long_string,
org_hierarchy_depth_long_string=org_hierarchy_depth_long_string)
# print query
# self.logger.debug(query)
self.wh_cursor.executemany(query)
self.wh_con.commit()

View File

@ -22,21 +22,16 @@ from wherehows.common.schemas import OozieJobExecRecord
from wherehows.common.schemas import OozieFlowScheduleRecord
from wherehows.common.schemas import OozieFlowDagRecord
from wherehows.common.enums import SchedulerType
__author__ = 'zechen'
from com.ziclix.python.sql import zxJDBC
from org.slf4j import LoggerFactory
import os
import DbUtil
import sys
class OozieExtract:
def __init__(self, args):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.app_id = int(args[Constant.APP_ID_KEY])
self.wh_exec_id = long(args[Constant.WH_EXEC_ID_KEY])
self.oz_con = zxJDBC.connect(args[Constant.OZ_DB_URL_KEY],
@ -52,7 +47,7 @@ class OozieExtract:
try:
os.makedirs(self.metadata_folder)
except Exception as e:
print e
self.logger.error(e)
self.get_oozie_version()
@ -60,7 +55,7 @@ class OozieExtract:
query = "select data from OOZIE_SYS where name = 'oozie.version'"
self.oz_cursor.execute(query)
self.oz_version = self.oz_cursor.fetchone()
print "Oozie version: ", self.oz_version[0]
self.logger.info("Oozie version: ", self.oz_version[0])
def run(self):
try:
@ -76,7 +71,7 @@ class OozieExtract:
self.oz_con.close()
def collect_flow_jobs(self, flow_file, job_file, dag_file):
print "collect flow&jobs"
self.logger.info("collect flow&jobs")
flow_writer = FileWriter(flow_file)
job_writer = FileWriter(job_file)
dag_writer = FileWriter(dag_file)
@ -136,7 +131,7 @@ class OozieExtract:
flow_writer.close()
def collect_flow_owners(self, owner_file):
print "collect owners"
self.logger.info("collect owners")
owner_writer = FileWriter(owner_file)
query = "SELECT DISTINCT app_name, app_path, user_name from WF_JOBS"
self.oz_cursor.execute(query)
@ -151,7 +146,7 @@ class OozieExtract:
owner_writer.close()
def collect_flow_schedules(self, schedule_file):
print "collect flow schedule"
self.logger.info("collect flow schedule")
schedule_writer = FileWriter(schedule_file)
query = """
SELECT DISTINCT cj.id as ref_id, cj.frequency, cj.time_unit,
@ -177,7 +172,7 @@ class OozieExtract:
schedule_writer.close()
def collect_flow_execs(self, flow_exec_file, lookback_period):
print "collect flow execs"
self.logger.info("collect flow execs")
flow_exec_writer = FileWriter(flow_exec_file)
query = "select id, app_name, app_path, unix_timestamp(start_time) as start_time, unix_timestamp(end_time) as end_time, run, status, user_name from WF_JOBS where end_time > now() - INTERVAL %d MINUTE" % (int(lookback_period))
self.oz_cursor.execute(query)
@ -200,7 +195,7 @@ class OozieExtract:
flow_exec_writer.close()
def collect_job_execs(self, job_exec_file, lookback_period):
print "collect job execs"
self.logger.info("collect job execs")
job_exec_writer = FileWriter(job_exec_file)
query = """
select a.id as job_exec_id, a.name as job_name, j.id as flow_exec_id, a.status, a.user_retry_count,

View File

@ -13,17 +13,14 @@
#
from jython.SchedulerLoad import SchedulerLoad
__author__ = 'zechen'
import sys
class OozieLoad(SchedulerLoad):
def __init__(self, args):
SchedulerLoad.__init__(self, args)
if __name__ == "__main__":
props = sys.argv[1]
oz = OozieLoad(props)

View File

@ -14,14 +14,10 @@
from jython.SchedulerTransform import SchedulerTransform
from wherehows.common.enums import SchedulerType
__author__ = 'zechen'
import sys
class OozieTransform(SchedulerTransform):
def __init__(self, args):
SchedulerTransform.__init__(self, args, SchedulerType.OOZIE)
@ -32,7 +28,7 @@ class OozieTransform(SchedulerTransform):
SET sj.is_last = 'N'
WHERE sj.job_type = ':FORK:' AND sj.app_id = {app_id}
""".format(app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -45,7 +41,7 @@ class OozieTransform(SchedulerTransform):
WHERE sf.app_id = {app_id}
AND NOT EXISTS (SELECT * FROM flow_execution_id_map where app_id = sf.app_id AND source_exec_uuid = sf.flow_exec_uuid)
""".format(app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -55,7 +51,7 @@ class OozieTransform(SchedulerTransform):
ON stg.app_id = fm.app_id AND stg.flow_exec_uuid = fm.source_exec_uuid
SET stg.flow_exec_id = fm.flow_exec_id WHERE stg.app_id = {app_id}
""".format(app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -68,7 +64,7 @@ class OozieTransform(SchedulerTransform):
WHERE sj.app_id = {app_id}
AND NOT EXISTS (SELECT * FROM job_execution_id_map where app_id = sj.app_id AND source_exec_uuid = sj.job_exec_uuid)
""".format(app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -78,7 +74,7 @@ class OozieTransform(SchedulerTransform):
ON stg.app_id = jm.app_id AND stg.job_exec_uuid = jm.source_exec_uuid
SET stg.job_exec_id = jm.job_exec_id WHERE stg.app_id = {app_id}
""".format(app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -88,10 +84,11 @@ class OozieTransform(SchedulerTransform):
ON stg.app_id = fm.app_id AND stg.flow_exec_uuid = fm.source_exec_uuid
SET stg.flow_exec_id = fm.flow_exec_id WHERE stg.app_id = {app_id}
""".format(app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
if __name__ == "__main__":
props = sys.argv[1]
oz = OozieTransform(props)

View File

@ -12,17 +12,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
from org.slf4j import LoggerFactory
from wherehows.common import Constant
__author__ = 'zechen'
from com.ziclix.python.sql import zxJDBC
import sys
class OwnerLoad:
def __init__(self, args):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
args[Constant.WH_DB_USERNAME_KEY],
args[Constant.WH_DB_PASSWORD_KEY],
@ -51,7 +49,7 @@ class OwnerLoad:
wh_etl_exec_id = {wh_etl_exec_id},
modified_time = unix_timestamp(NOW())
""".format(wh_etl_exec_id=self.wh_exec_id)
print cmd
self.logger.debug(cmd)
self.wh_cursor.execute(cmd)
self.wh_con.commit()
@ -77,7 +75,7 @@ class OwnerLoad:
for l in range(1, 6):
cmd = template.format(wh_etl_exec_id=self.wh_exec_id, lvl=l)
print cmd
self.logger.debug(cmd)
self.wh_cursor.execute(cmd)
self.wh_con.commit()

View File

@ -12,8 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
__author__ = 'zechen'
from org.slf4j import LoggerFactory
from wherehows.common import Constant
from com.ziclix.python.sql import zxJDBC
import sys
@ -21,9 +20,7 @@ import sys
class OwnerTransform:
_tables = {"dataset_owner": {"columns": "dataset_urn, owner_id, sort_id, namespace, db_name, source_time",
"file": "dataset_owner.csv",
"table": "stg_dataset_owner"}
}
"file": "dataset_owner.csv", "table": "stg_dataset_owner"}}
_clear_staging_tempalte = """
DELETE FROM {table}
@ -84,6 +81,7 @@ class OwnerTransform:
"""
def __init__(self, args):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
args[Constant.WH_DB_USERNAME_KEY],
args[Constant.WH_DB_PASSWORD_KEY],
@ -109,46 +107,47 @@ class OwnerTransform:
# Clear staging table
query = self._clear_staging_tempalte.format(table=t.get("table"))
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Load file into staging table
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"), columns=t.get("columns"))
print query
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"),
columns=t.get("columns"))
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
def update_dataset_id(self):
t = self._tables["dataset_owner"]
query = self._update_dataset_id_template.format(table=t.get("table"))
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
def update_database_id(self):
t = self._tables["dataset_owner"]
query = self._update_database_id_template.format(table=t.get("table"))
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
def update_app_id(self):
t = self._tables["dataset_owner"]
query = self._update_app_id_template.format(table=t.get("table"))
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
query = self._update_group_app_id_template.format(table=t.get("table"))
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
def update_owner_type(self):
t = self._tables["dataset_owner"]
query = self._update_owner_type_template.format(table=t.get("table"))
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -156,7 +155,7 @@ class OwnerTransform:
t = self._tables["dataset_owner"]
for l in range(1, 6):
query = self._update_parent_flag.format(table=t.get("table"), lvl=l)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()

View File

@ -13,22 +13,17 @@
#
from wherehows.common import Constant
__author__ = 'zechen'
from com.ziclix.python.sql import zxJDBC
import sys
from org.slf4j import LoggerFactory
class SchedulerLoad:
def __init__(self, args):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.app_id = int(args[Constant.APP_ID_KEY])
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
args[Constant.WH_DB_USERNAME_KEY],
args[Constant.WH_DB_PASSWORD_KEY],
args[Constant.WH_DB_DRIVER_KEY])
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY], args[Constant.WH_DB_USERNAME_KEY],
args[Constant.WH_DB_PASSWORD_KEY], args[Constant.WH_DB_DRIVER_KEY])
self.wh_cursor = self.wh_con.cursor()
def run(self):
@ -64,7 +59,7 @@ class SchedulerLoad:
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
""".format(app_id=self.app_id)
print cmd
self.logger.debug(cmd)
self.wh_cursor.execute(cmd)
self.wh_con.commit()
@ -76,7 +71,7 @@ class SchedulerLoad:
SET j.is_current = 'N'
WHERE (s.job_id IS NULL OR s.dag_version > j.dag_version) AND j.app_id = {app_id}
""".format(app_id=self.app_id)
print cmd
self.logger.debug(cmd)
self.wh_cursor.execute(cmd)
self.wh_con.commit()
@ -103,7 +98,7 @@ class SchedulerLoad:
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
""".format(app_id=self.app_id)
print cmd
self.logger.debug(cmd)
self.wh_cursor.execute(cmd)
self.wh_con.commit()
@ -113,7 +108,7 @@ class SchedulerLoad:
SET is_current = 'N'
WHERE f.app_id = {app_id}
""".format(app_id=self.app_id)
print cmd
self.logger.debug(cmd)
self.wh_cursor.execute(cmd)
cmd = """
@ -127,7 +122,7 @@ class SchedulerLoad:
is_current = 'Y',
wh_etl_exec_id = s.wh_etl_exec_id
""".format(app_id=self.app_id)
print cmd
self.logger.debug(cmd)
self.wh_cursor.execute(cmd)
self.wh_con.commit()
@ -159,7 +154,7 @@ class SchedulerLoad:
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
""".format(app_id=self.app_id)
print cmd
self.logger.debug(cmd)
self.wh_cursor.execute(cmd)
self.wh_con.commit()
@ -177,7 +172,7 @@ class SchedulerLoad:
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
""".format(app_id=self.app_id)
print cmd
self.logger.debug(cmd)
self.wh_cursor.execute(cmd)
self.wh_con.commit()
@ -198,7 +193,7 @@ class SchedulerLoad:
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
""".format(app_id=self.app_id)
print cmd
self.logger.debug(cmd)
self.wh_cursor.execute(cmd)
self.wh_con.commit()
@ -221,7 +216,7 @@ class SchedulerLoad:
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
""".format(app_id=self.app_id)
print cmd
self.logger.debug(cmd)
self.wh_cursor.execute(cmd)
self.wh_con.commit()

View File

@ -13,13 +13,10 @@
#
from wherehows.common.enums import SchedulerType
__author__ = 'zechen'
from wherehows.common import Constant
from com.ziclix.python.sql import zxJDBC
import sys
from org.slf4j import LoggerFactory
class SchedulerTransform:
@ -74,6 +71,7 @@ class SchedulerTransform:
"""
def __init__(self, args, scheduler_type):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.app_id = int(args[Constant.APP_ID_KEY])
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
args[Constant.WH_DB_USERNAME_KEY],
@ -101,13 +99,14 @@ class SchedulerTransform:
# Clear staging table
query = self._clear_staging_tempalte.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Load file into staging table
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"), columns=t.get("columns"))
print query
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"),
columns=t.get("columns"))
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -118,13 +117,13 @@ class SchedulerTransform:
WHERE sf.app_id = {app_id}
AND NOT EXISTS (SELECT * FROM flow_source_id_map where app_id = sf.app_id AND source_id_string = sf.flow_path)
""".format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Update flow id from mapping table
query = self._get_flow_id_template.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -133,19 +132,20 @@ class SchedulerTransform:
# Clearing staging table
query = self._clear_staging_tempalte.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Load file into staging table
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"), columns=t.get("columns"))
print query
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"),
columns=t.get("columns"))
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Update flow id from mapping table
query = self._get_flow_id_template.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -155,7 +155,7 @@ class SchedulerTransform:
SET stg.ref_flow_path = null
WHERE stg.ref_flow_path = 'null' and stg.app_id = {app_id}
""".format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -166,7 +166,7 @@ class SchedulerTransform:
ON stg.app_id = fm.app_id AND stg.ref_flow_path = fm.source_id_string
SET stg.ref_flow_id = fm.flow_id WHERE stg.app_id = {app_id}
""".format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -177,13 +177,13 @@ class SchedulerTransform:
WHERE sj.app_id = {app_id}
AND NOT EXISTS (SELECT * FROM job_source_id_map where app_id = sj.app_id AND source_id_string = sj.job_path)
""".format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Update job id from mapping table
query = self._get_job_id_template.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -195,7 +195,7 @@ class SchedulerTransform:
SET sj.job_type_id = jtm.job_type_id
WHERE sj.app_id = {app_id}
""".format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -204,18 +204,19 @@ class SchedulerTransform:
# Clearing staging table
query = self._clear_staging_tempalte.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"), columns=t.get("columns"))
print query
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"),
columns=t.get("columns"))
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Update flow id from mapping table
query = self._get_flow_id_template.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -225,7 +226,7 @@ class SchedulerTransform:
SET sj.source_job_id = jm.job_id
WHERE sj.app_id = {app_id}
""".format(app_id=self.app_id, table=t.get("table"))
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -235,14 +236,14 @@ class SchedulerTransform:
SET sj.target_job_id = jm.job_id
WHERE sj.app_id = {app_id}
""".format(app_id=self.app_id, table=t.get("table"))
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Update pre_jobs and post_jobs in stg_flow_jobs table
# need increase group concat max length to avoid overflow
query = "SET group_concat_max_len=40960"
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -255,7 +256,7 @@ class SchedulerTransform:
SET sj.post_jobs = d.post_jobs
WHERE sj.app_id = {app_id};
""".format(app_id=self.app_id, table=t.get("table"))
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -268,7 +269,7 @@ class SchedulerTransform:
SET sj.pre_jobs = d.pre_jobs
WHERE sj.app_id = {app_id};
""".format(app_id=self.app_id, table=t.get("table"))
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -289,7 +290,7 @@ class SchedulerTransform:
self.wh_con.commit()
query = self._clear_staging_tempalte.format(table="stg_flow_dag", app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -303,7 +304,7 @@ class SchedulerTransform:
FROM stg_flow sf
WHERE sf.app_id = {app_id} AND NOT EXISTS (SELECT * FROM {table} t WHERE t.app_id = sf.app_id AND t.flow_id = sf.flow_id AND t.source_version = sf.source_version)
""".format(app_id=self.app_id, table=t.get("table"))
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -314,7 +315,7 @@ class SchedulerTransform:
SET s.dag_version = CASE WHEN f.dag_md5 IS NULL THEN 0 WHEN s.dag_md5 != f.dag_md5 THEN f.dag_version + 1 ELSE f.dag_version END
WHERE s.app_id = {app_id}
""".format(app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -326,7 +327,7 @@ class SchedulerTransform:
SET sj.dag_version = dag.dag_version
WHERE sj.app_id = {app_id}
""".format(app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -336,19 +337,20 @@ class SchedulerTransform:
# Clear staging table
query = self._clear_staging_tempalte.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Load file into staging table
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"), columns=t.get("columns"))
print query
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"),
columns=t.get("columns"))
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Update flow id from mapping table
query = self._get_flow_id_template.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -357,19 +359,20 @@ class SchedulerTransform:
# Clear staging table
query = self._clear_staging_tempalte.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Load file into staging table
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"), columns=t.get("columns"))
print query
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"),
columns=t.get("columns"))
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Update flow id from mapping table
query = self._get_flow_id_template.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -381,7 +384,7 @@ class SchedulerTransform:
SET f.is_scheduled = CASE WHEN fs.flow_id IS NULL THEN 'N' ELSE 'Y' END
WHERE f.app_id = {app_id}
""".format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -390,19 +393,20 @@ class SchedulerTransform:
# Clear staging table
query = self._clear_staging_tempalte.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Load file into staging table
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"), columns=t.get("columns"))
print query
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"),
columns=t.get("columns"))
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Update flow id from mapping table
query = self._get_flow_id_template.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
@ -411,28 +415,30 @@ class SchedulerTransform:
# Clear staging table
query = self._clear_staging_tempalte.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Load file into staging table
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"), columns=t.get("columns"))
print query
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"),
columns=t.get("columns"))
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Update flow id from mapping table
query = self._get_flow_id_template.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
# Update job id from mapping table
query = self._get_job_id_template.format(table=t.get("table"), app_id=self.app_id)
print query
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
if __name__ == "__main__":
props = sys.argv[1]
st = SchedulerTransform(props, SchedulerType.GENERIC)

View File

@ -19,9 +19,13 @@ import commands
from wherehows.common.schemas import SampleDataRecord
from wherehows.common.writers import FileWriter
from wherehows.common import Constant
from org.slf4j import LoggerFactory
class TeradataExtract:
def __init__(self):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
def get_view_info(self, database_name, view_name):
'''
:param database_name:
@ -342,7 +346,8 @@ class TeradataExtract:
if full_name not in table_dict:
schema[db_idx]['views'].append(
{'name': row[1], 'type': 'View', 'createTime': row[3], 'lastAlterTime': row[4], 'accessCount': row[5],
'referenceTables': ref_table_list, 'viewSqlText': row[2].replace("\r", "\n"), 'columns': [], 'original_name' : full_name})
'referenceTables': ref_table_list, 'viewSqlText': row[2].replace("\r", "\n"), 'columns': [],
'original_name': full_name})
table_dict[full_name] = len(schema[db_idx]['views']) - 1
table_idx = table_dict[full_name]
schema[db_idx]['views'][table_idx]['columns'].append(
@ -352,8 +357,8 @@ class TeradataExtract:
if row[7]:
schema[db_idx]['views'][table_idx]['columns'][column_idx]['columnFormat'] = row[7].strip()
print "%s %6d views with %6d columns processed for %12s" % (
datetime.datetime.now(), table_idx + 1, len(rows), row[0])
self.logger.info("%s %6d views with %6d columns processed for %12s" % (
datetime.datetime.now(), table_idx + 1, len(rows), row[0]))
def format_table_metadata(self, rows, schema):
'''
@ -382,14 +387,14 @@ class TeradataExtract:
# full_name = row[0] + '.' + row[1]
original_name = row[0] + '.' + row[20]
if original_name not in extra_table_info:
print 'ERROR : {0} not in extra_table_info!'.format(original_name)
self.logger.error('ERROR : {0} not in extra_table_info!'.format(original_name))
continue
if full_name not in table_dict:
schema[db_idx]['tables'].append(
{'name': row[1], 'type': 'Table', 'createTime': row[2], 'lastAlterTime': row[3], 'lastAccessTime': row[4],
'accessCount': row[5], 'owner': row[19], 'sizeInMbytes': extra_table_info[original_name]['size_in_mb'],
'partitions': extra_table_info[original_name]['partitions'],
'indices': extra_table_info[original_name]['indices'], 'columns': [], 'original_name' : original_name})
'indices': extra_table_info[original_name]['indices'], 'columns': [], 'original_name': original_name})
table_idx += 1
table_dict[full_name] = table_idx
# print "%6d: %s: %s" % (table_idx, full_name, str(schema[db_idx]['tables'][table_idx]))
@ -408,8 +413,8 @@ class TeradataExtract:
schema[db_idx]['tables'][table_idx]['columns'][column_idx]['statistics'] = {
'uniqueValueCount': row[12] & 0xffff, 'lastStatsCollectTime': str(row[13])}
print "%s %6d tables with %6d columns processed for %12s" % (
datetime.datetime.now(), table_idx + 1, len(rows), row[0])
self.logger.info("%s %6d tables with %6d columns processed for %12s" % (
datetime.datetime.now(), table_idx + 1, len(rows), row[0]))
def get_sample_data(self, database_name, table_name):
"""
@ -445,13 +450,13 @@ class TeradataExtract:
row_data.append(new_value)
rows_data.append(row_data)
except Exception, e:
print 'sql : ' + sql
self.logger.error('sql : ' + sql)
if len(rows) == 0:
print "dataset {0} is empty".format(fullname)
self.logger.error("dataset {0} is empty".format(fullname))
else:
print "dataset {0} is not accessible.".format(fullname)
print 'result : ' + str(rows)
print e
self.logger.error("dataset {0} is not accessible.".format(fullname))
self.logger.error('result : ' + str(rows))
self.logger.error(e)
pass
ref_urn = 'teradata:///' + fullname.replace('.', '/').replace('"', '')
@ -480,7 +485,7 @@ class TeradataExtract:
if database_name is None and table_name is None: # default route: process everything
for database_name in self.databases:
print "Collecting tables in database : " + database_name
self.logger.info("Collecting tables in database : " + database_name)
# table info
rows = []
begin = datetime.datetime.now().strftime("%H:%M:%S")
@ -539,7 +544,8 @@ if __name__ == "__main__":
e = TeradataExtract()
e.conn_td = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
try:
e.conn_td.cursor().execute("SET QUERY_BAND = 'script=%s; pid=%d; ' FOR SESSION;" % ('TeradataExtract.py', os.getpid()))
e.conn_td.cursor().execute(
"SET QUERY_BAND = 'script=%s; pid=%d; ' FOR SESSION;" % ('TeradataExtract.py', os.getpid()))
e.conn_td.commit()
e.log_file = args[Constant.TD_LOG_KEY]
e.databases = args[Constant.TD_TARGET_DATABASES_KEY].split(',')

View File

@ -15,9 +15,13 @@
import sys
from com.ziclix.python.sql import zxJDBC
from wherehows.common import Constant
from org.slf4j import LoggerFactory
class TeradataLoad:
def __init__(self):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
def load_metadata(self):
cursor = self.conn_mysql.cursor()
load_cmd = '''
@ -97,7 +101,7 @@ class TeradataLoad:
'''.format(source_file=self.input_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
for state in load_cmd.split(";"):
print state
self.logger.debug(state)
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()
@ -219,7 +223,7 @@ class TeradataLoad:
'''.format(source_file=self.input_field_file, db_id=self.db_id)
for state in load_cmd.split(";"):
print state
self.logger.debug(state)
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()
@ -264,7 +268,7 @@ class TeradataLoad:
cursor = self.conn_mysql.cursor()
for state in load_cmd.split(";"):
print state
self.logger.debug(state)
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()

View File

@ -13,15 +13,19 @@
#
import json
import pprint, datetime
import datetime
import sys, os
import time
from wherehows.common.writers import FileWriter
from wherehows.common.schemas import DatasetSchemaRecord, DatasetFieldRecord
from wherehows.common import Constant
from org.slf4j import LoggerFactory
class TeradataTransform:
def __init__(self):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
def transform(self, input, td_metadata, td_field_metadata):
'''
convert from json to csv
@ -30,8 +34,6 @@ class TeradataTransform:
:param td_field_metadata: output data file for teradata field metadata
:return:
'''
pp = pprint.PrettyPrinter(indent=1)
f_json = open(input)
data = json.load(f_json)
f_json.close()
@ -44,9 +46,9 @@ class TeradataTransform:
for k in d.keys():
if k not in ['tables', 'views']:
continue
print "%s %4d %s" % (datetime.datetime.now().strftime("%H:%M:%S"), len(d[k]), k)
self.logger.info("%s %4d %s" % (datetime.datetime.now().strftime("%H:%M:%S"), len(d[k]), k))
for t in d[k]:
print "%4d %s" % (i, t['name'])
self.logger.info("%4d %s" % (i, t['name']))
if t['name'] == 'HDFStoTD_2464_ERR_1':
continue
i += 1
@ -80,7 +82,6 @@ class TeradataTransform:
field_detail_list = []
sort_id = 0
for c in t['columns']:
# pp.pprint(c)
# output['fields'].append(
# { 'name' : t['name'].encode('latin-1'),
# 'type' : None if c['data_type'] is None else c['data_type'].encode('latin-1'),
@ -117,7 +118,7 @@ class TeradataTransform:
schema_file_writer.flush()
field_file_writer.flush()
print "%20s contains %6d %s" % (d['database'], i, k)
self.logger.info("%20s contains %6d %s" % (d['database'], i, k))
schema_file_writer.close()
field_file_writer.close()

View File

@ -23,8 +23,12 @@ log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1
# Redirect log messages to a log file, support file rolling.
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/tmp/wherehows.log
log4j.appender.file.File=/var/tmp/wherehows/wherehows.log
log4j.appender.file.MaxFileSize=5MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.logger.com.jayway.jsonpath=WARN
log4j.logger.org.apache=WARN