Optimize dataset load scripts, improve speed (#350)

- When loading dataset fields into the staging table, populate the dataset_id field first, then use it in the later JOINs.
- When joining two big tables such as dict_field_detail, pre-select the needed rows to reduce the JOIN size and DB resource usage.
- Refactor some SQL code.
- Modify the logback settings to better capture log time.
- Remove unnecessary config from the backend application.conf.
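
The two SQL-level changes, condensed from the diffs below (table and column names are the ones these scripts already use; {db_id} is the placeholder the scripts substitute via .format):

  -- 1) resolve dataset_id once in the staging table, then drop unmatched rows,
  --    so later statements join on the integer id instead of the urn string
  update stg_dict_field_detail sf, dict_dataset d
    set sf.dataset_id = d.id
  where sf.urn = d.urn and sf.db_id = {db_id};
  delete from stg_dict_field_detail
  where db_id = {db_id} and dataset_id is null;

  -- 2) pre-select only the needed rows/columns of both big tables before the RIGHT JOIN
  select x.field_id
  from (select dataset_id, field_name, parent_path
        from stg_dict_field_detail where db_id = {db_id}) s
  right join (select dataset_id, field_id, field_name, parent_path
              from dict_field_detail
              where dataset_id in (select dataset_id from stg_dict_field_detail where db_id = {db_id})) x
    on s.dataset_id = x.dataset_id
   and s.field_name = x.field_name
   and s.parent_path = x.parent_path
  where s.field_name is null;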
Yi (Alan) Wang 2017-03-22 10:23:30 -07:00 committed by GitHub
parent d9438dc5bc
commit b6e644fbb1
8 changed files with 185 additions and 200 deletions

Changed file: backend application.conf

@@ -41,7 +41,6 @@ db.wherehows.driver = com.mysql.jdbc.Driver
 db.wherehows.url = ${WHZ_DB_URL}
 db.wherehows.username = ${WHZ_DB_USERNAME}
 db.wherehows.password = ${WHZ_DB_PASSWORD}
-db.wherehows.host = ${WHZ_DB_HOST}
 # You can expose this datasource via JNDI if needed (Useful for JPA)
 # db.default.jndiName=DefaultDS

Changed file: logback configuration

@@ -19,7 +19,7 @@
 <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
 <encoder>
-<pattern>%coloredLevel - %logger - %message%n%xException</pattern>
+<pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n</pattern>
 </encoder>
 </appender>
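
The old %coloredLevel pattern printed no timestamp; with the new pattern every line starts with the date and time, so step durations can be read off the log. An illustrative line (logger name, line number and db_id are examples, not taken from an actual run):

  2017-03-22 10:23:30 INFO  HdfsLoad[1]:210 - finish loading hdfs metadata db_id=3 to dict_dataset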

Changed file: HdfsLoad jython load script

@@ -27,7 +27,6 @@ class HdfsLoad:
 Load dataset metadata into final table
 :return: nothing
 """
-cursor = self.conn_mysql.cursor()
 load_cmd = '''
 DELETE FROM stg_dict_dataset WHERE db_id = {db_id};

@@ -197,15 +196,11 @@ class HdfsLoad:
 ;
 '''.format(source_file=self.input_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
-for state in load_cmd.split(";"):
-  self.logger.debug(state)
-  cursor.execute(state)
-self.conn_mysql.commit()
-cursor.close()
+self.executeCommands(load_cmd)
 self.logger.info("finish loading hdfs metadata db_id={db_id} to dict_dataset".format(db_id=self.db_id))
 def load_field(self):
-cursor = self.conn_mysql.cursor()
 load_field_cmd = '''
 DELETE FROM stg_dict_field_detail where db_id = {db_id};

@@ -242,24 +237,25 @@ class HdfsLoad:
 analyze table field_comments;
+-- update stg_dict_field_detail dataset_id
+update stg_dict_field_detail sf, dict_dataset d
+set sf.dataset_id = d.id where sf.urn = d.urn
+and sf.db_id = {db_id};
+delete from stg_dict_field_detail
+where db_id = {db_id} and dataset_id is null; -- remove if not match to dataset
 -- delete old record if it does not exist in this load batch anymore (but have the dataset id)
-create temporary table if not exists t_deleted_fields (primary key (field_id))
-select x.field_id
-from stg_dict_field_detail s
-join dict_dataset i
-on s.urn = i.urn
-and s.db_id = {db_id}
-right join dict_field_detail x
-on i.id = x.dataset_id
-and s.field_name = x.field_name
-and s.parent_path = x.parent_path
-where s.field_name is null
-and x.dataset_id in (
-select d.id dataset_id
-from stg_dict_field_detail k join dict_dataset d
-on k.urn = d.urn
-and k.db_id = {db_id}
-)
+create temporary table if not exists t_deleted_fields (primary key (field_id)) ENGINE=MyISAM
+SELECT x.field_id
+FROM (select dataset_id, field_name, parent_path from stg_dict_field_detail where db_id = {db_id}) s
+RIGHT JOIN
+( select dataset_id, field_id, field_name, parent_path from dict_field_detail
+where dataset_id in (select dataset_id from stg_dict_field_detail where db_id = {db_id})
+) x
+ON s.dataset_id = x.dataset_id
+AND s.field_name = x.field_name
+AND s.parent_path = x.parent_path
+WHERE s.field_name is null
 ; -- run time : ~2min
 delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);

@@ -269,12 +265,10 @@ class HdfsLoad:
 (
 select x.field_id, s.*
 from (select * from stg_dict_field_detail where db_id = {db_id}) s
-join dict_dataset d
-on s.urn = d.urn
 join dict_field_detail x
 on s.field_name = x.field_name
 and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
-and d.id = x.dataset_id
+and s.dataset_id = x.dataset_id
 where (x.sort_id <> s.sort_id
 or x.parent_sort_id <> s.parent_sort_id
 or x.data_type <> s.data_type

@@ -307,12 +301,11 @@ class HdfsLoad:
 modified
 )
 select
-d.id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path,
+sf.dataset_id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path,
 sf.field_name, sf.namespace, sf.data_type, sf.data_size, sf.is_nullable, sf.default_value, now()
-from stg_dict_field_detail sf join dict_dataset d
-on sf.urn = d.urn
+from stg_dict_field_detail sf
 left join dict_field_detail t
-on d.id = t.dataset_id
+on sf.dataset_id = t.dataset_id
 and sf.field_name = t.field_name
 and sf.parent_path = t.parent_path
 where db_id = {db_id} and t.field_id is null

@@ -325,13 +318,12 @@ class HdfsLoad:
 -- insert
 insert into stg_dict_dataset_field_comment
-select t.field_id field_id, fc.id comment_id, d.id dataset_id, {db_id}
-from stg_dict_field_detail sf join dict_dataset d
-on sf.urn = d.urn
+select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id}
+from stg_dict_field_detail sf
 join field_comments fc
 on sf.description = fc.comment
 join dict_field_detail t
-on d.id = t.dataset_id
+on sf.dataset_id = t.dataset_id
 and sf.field_name = t.field_name
 and sf.parent_path = t.parent_path
 where sf.db_id = {db_id};

@@ -357,15 +349,12 @@ class HdfsLoad:
 DELETE FROM stg_dict_field_detail where db_id = {db_id};
 '''.format(source_file=self.input_field_file, db_id=self.db_id)
-for state in load_field_cmd.split(";"):
-  self.logger.debug(state)
-  cursor.execute(state)
-self.conn_mysql.commit()
-cursor.close()
+self.executeCommands(load_field_cmd)
 self.logger.info("finish loading hdfs metadata db_id={db_id} to dict_field_detail".format(db_id=self.db_id))
 def load_sample(self):
-cursor = self.conn_mysql.cursor()
 load_sample_cmd = '''
 DELETE FROM stg_dict_dataset_sample where db_id = {db_id};

@@ -375,11 +364,11 @@ class HdfsLoad:
 (urn,ref_urn,data)
 SET db_id = {db_id};
--- update reference id in stagging table
+-- update reference id in staging table
 UPDATE stg_dict_dataset_sample s
 JOIN dict_dataset d ON s.ref_urn = d.urn
 SET s.ref_id = d.id
-WHERE s.db_id = {db_id} AND s.ref_urn > '' AND s.ref_urn <> 'null';
+WHERE s.db_id = {db_id} AND s.ref_urn > '';
 -- first insert ref_id as 0
 INSERT INTO dict_dataset_sample

@@ -401,14 +390,18 @@ class HdfsLoad:
 SET d.ref_id = s.ref_id
 WHERE s.db_id = {db_id} AND d.ref_id = 0;
 '''.format(source_file=self.input_sample_file, db_id=self.db_id)
-for state in load_sample_cmd.split(";"):
-  self.logger.debug(state)
-  cursor.execute(state)
-self.conn_mysql.commit()
-cursor.close()
+self.executeCommands(load_sample_cmd)
 self.logger.info("finish loading hdfs sample data db_id={db_id} to dict_dataset_sample".format(db_id=self.db_id))
+def executeCommands(self, commands):
+  for cmd in commands.split(";"):
+    self.logger.debug(cmd)
+    self.conn_cursor.execute(cmd)
+  self.conn_mysql.commit()
 if __name__ == "__main__":
 args = sys.argv[1]

@@ -426,10 +419,11 @@ if __name__ == "__main__":
 l.db_id = args[Constant.DB_ID_KEY]
 l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
 l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
+l.conn_cursor = l.conn_mysql.cursor()
 if Constant.INNODB_LOCK_WAIT_TIMEOUT in args:
 lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
-l.conn_mysql.cursor().execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
+l.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
 try:
 l.load_metadata()

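Condensed view of the cursor refactor in the script above (the same shape is applied in HiveLoad, OracleLoad and TeradataLoad below); a sketch assembled from the diff, not the full file:

  # __main__: one connection and one shared cursor per run
  l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
  l.conn_cursor = l.conn_mysql.cursor()

  # each load_* method builds a multi-statement SQL string and hands it off
  def load_field(self):
    load_field_cmd = '''
      DELETE FROM stg_dict_field_detail where db_id = {db_id};
      ...
    '''.format(db_id=self.db_id)
    self.executeCommands(load_field_cmd)

  # shared helper: split on ";", execute each statement, commit once at the end
  def executeCommands(self, commands):
    for cmd in commands.split(";"):
      self.logger.debug(cmd)
      self.conn_cursor.execute(cmd)
    self.conn_mysql.commit()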
Changed file: HiveLoad jython load script

@@ -23,7 +23,6 @@ class HiveLoad:
 self.logger = LoggerFactory.getLogger("%s[%s]" % (self.__class__.__name__, wh_etl_exec_id))
 def load_metadata(self):
-cursor = self.conn_mysql.cursor()
 load_cmd = """
 DELETE FROM stg_dict_dataset WHERE db_id = {db_id};

@@ -97,18 +96,15 @@ class HiveLoad:
 ;
 """.format(source_file=self.input_schema_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
-for state in load_cmd.split(";"):
-  self.logger.info(state)
-  cursor.execute(state)
-self.conn_mysql.commit()
-cursor.close()
+self.executeCommands(load_cmd)
+self.logger.info("Load dataset metadata.")
 def load_field(self):
 """
 Load fields
 :return:
 """
-cursor = self.conn_mysql.cursor()
 load_cmd = """
 DELETE FROM stg_dict_field_detail WHERE db_id = {db_id};

@@ -125,30 +121,26 @@ class HiveLoad:
 , description=nullif(@description,'null')
 , last_modified=now();
--- update dataset_id
+-- update stg_dict_field_detail dataset_id
 update stg_dict_field_detail sf, dict_dataset d
 set sf.dataset_id = d.id where sf.urn = d.urn
 and sf.db_id = {db_id};
+delete from stg_dict_field_detail
+where db_id = {db_id} and dataset_id is null; -- remove if not match to dataset
 -- delete old record if it does not exist in this load batch anymore (but have the dataset id)
 -- join with dict_dataset to avoid right join using index. (using index will slow down the query)
-create temporary table if not exists t_deleted_fields (primary key (field_id))
-select x.field_id
-from stg_dict_field_detail s
-join dict_dataset i
-on s.urn = i.urn
-and s.db_id = {db_id}
-right join dict_field_detail x
-on i.id = x.dataset_id
-and s.field_name = x.field_name
-and s.parent_path = x.parent_path
-where s.field_name is null
-and x.dataset_id in (
-select d.id dataset_id
-from stg_dict_field_detail k join dict_dataset d
-on k.urn = d.urn
-and k.db_id = {db_id}
-)
+create temporary table if not exists t_deleted_fields (primary key (field_id)) ENGINE=MyISAM
+SELECT x.field_id
+FROM (select dataset_id, field_name, parent_path from stg_dict_field_detail where db_id = {db_id}) s
+RIGHT JOIN
+( select dataset_id, field_id, field_name, parent_path from dict_field_detail
+where dataset_id in (select dataset_id from stg_dict_field_detail where db_id = {db_id})
+) x
+ON s.dataset_id = x.dataset_id
+AND s.field_name = x.field_name
+AND s.parent_path = x.parent_path
+WHERE s.field_name is null
 ; -- run time : ~2min
 delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);

@@ -190,7 +182,7 @@ class HiveLoad:
 -- insert new ones
 CREATE TEMPORARY TABLE IF NOT EXISTS t_existed_field
-( primary key (urn, sort_id, db_id) )
+( primary key (urn, sort_id, db_id) ) ENGINE=MyISAM
 AS (
 SELECT sf.urn, sf.sort_id, sf.db_id, count(*) field_count
 FROM stg_dict_field_detail sf

@@ -203,8 +195,7 @@ class HiveLoad:
 group by 1,2,3
 );
-
 insert ignore into dict_field_detail (
 dataset_id, fields_layout_id, sort_id, parent_sort_id, parent_path,
 field_name, namespace, data_type, data_size, is_nullable, default_value,
 modified

@@ -239,7 +230,6 @@ class HiveLoad:
 where field_id in (select field_id from stg_dict_dataset_field_comment)
 and is_default = 1 ) and db_id = {db_id};
-
 -- doesn't have this comment before, insert into it and set as default
 insert ignore into dict_dataset_field_comment
 select sd.field_id, sd.comment_id, sd.dataset_id, 1 from stg_dict_dataset_field_comment sd

@@ -249,7 +239,6 @@ class HiveLoad:
 where d.comment_id is null
 and sd.db_id = {db_id};
-
 insert into field_comments (
 user_id, comment, created, modified, comment_crc32_checksum
 )

@@ -266,20 +255,15 @@ class HiveLoad:
 """.format(source_file=self.input_field_file, db_id=self.db_id)
-# didn't load into final table for now
-for state in load_cmd.split(";"):
-  self.logger.info(state)
-  cursor.execute(state)
-self.conn_mysql.commit()
-cursor.close()
+self.executeCommands(load_cmd)
+self.logger.info("Load dataset fields.")
 def load_dataset_instance(self):
 """
 Load dataset instance
 :return:
 """
-cursor = self.conn_mysql.cursor()
 load_cmd = """
 DELETE FROM stg_dict_dataset_instance WHERE db_id = {db_id};

@@ -337,21 +321,15 @@ class HiveLoad:
 ;
 """.format(source_file=self.input_instance_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
-# didn't load into final table for now
-for state in load_cmd.split(";"):
-  self.logger.info(state)
-  cursor.execute(state)
-self.conn_mysql.commit()
-cursor.close()
+self.executeCommands(load_cmd)
+self.logger.info("Load dataset instance.")
 def load_dataset_dependencies(self):
 """
-Load dataset instance
+Load dataset dependencies
 :return:
 """
-cursor = self.conn_mysql.cursor()
 load_cmd = """
 DELETE FROM stg_cfg_object_name_map;
 LOAD DATA LOCAL INFILE '{source_file}'

@@ -372,7 +350,7 @@ class HiveLoad:
 -- create to be deleted table
 DROP TEMPORARY table IF EXISTS t_deleted_depend;
-CREATE TEMPORARY TABLE t_deleted_depend
+CREATE TEMPORARY TABLE t_deleted_depend ENGINE=MyISAM
 AS (
 SELECT DISTINCT c.obj_name_map_id
 FROM cfg_object_name_map c LEFT JOIN stg_cfg_object_name_map s

@@ -420,12 +398,15 @@ class HiveLoad:
 """.format(source_file=self.input_dependency_file)
 # didn't load into final table for now
-for state in load_cmd.split(";"):
-  self.logger.info(state)
-  cursor.execute(state)
-self.conn_mysql.commit()
-cursor.close()
+self.executeCommands(load_cmd)
+self.logger.info("Load dataset dependencies.")
+def executeCommands(self, commands):
+  for cmd in commands.split(";"):
+    self.logger.debug(cmd)
+    self.conn_cursor.execute(cmd)
+  self.conn_mysql.commit()
 if __name__ == "__main__":

@@ -446,10 +427,11 @@ if __name__ == "__main__":
 l.db_id = args[Constant.DB_ID_KEY]
 l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
 l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
+l.conn_cursor = l.conn_mysql.cursor()
 if Constant.INNODB_LOCK_WAIT_TIMEOUT in args:
 lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
-l.conn_mysql.cursor().execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
+l.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
 try:
 l.load_metadata()

Changed file: OracleLoad jython load script

@@ -15,7 +15,7 @@
 from com.ziclix.python.sql import zxJDBC
 from wherehows.common import Constant
 from org.slf4j import LoggerFactory
-import sys, os, datetime
+import sys, datetime
 class OracleLoad:

@@ -159,24 +159,25 @@ class OracleLoad:
 analyze table {field_comments};
+-- update stg_dict_field_detail dataset_id
+update stg_dict_field_detail sf, dict_dataset d
+set sf.dataset_id = d.id where sf.urn = d.urn
+and sf.db_id = {db_id};
+delete from stg_dict_field_detail
+where db_id = {db_id} and dataset_id is null; -- remove if not match to dataset
 -- delete old record if it does not exist in this load batch anymore (but have the dataset id)
-create temporary table if not exists t_deleted_fields (primary key (field_id))
-select x.field_id
-from stg_dict_field_detail s
-join {dict_dataset} i
-on s.urn = i.urn
-and s.db_id = {db_id}
-right join {dict_field_detail} x
-on i.id = x.dataset_id
-and s.field_name = x.field_name
-and s.parent_path = x.parent_path
-where s.field_name is null
-and x.dataset_id in (
-select d.id dataset_id
-from stg_dict_field_detail k join {dict_dataset} d
-on k.urn = d.urn
-and k.db_id = {db_id}
-)
+create temporary table if not exists t_deleted_fields (primary key (field_id)) ENGINE=MyISAM
+SELECT x.field_id
+FROM (select dataset_id, field_name, parent_path from stg_dict_field_detail where db_id = {db_id}) s
+RIGHT JOIN
+( select dataset_id, field_id, field_name, parent_path from dict_field_detail
+where dataset_id in (select dataset_id from stg_dict_field_detail where db_id = {db_id})
+) x
+ON s.dataset_id = x.dataset_id
+AND s.field_name = x.field_name
+AND s.parent_path = x.parent_path
+WHERE s.field_name is null
 ; -- run time : ~2min
 delete from {dict_field_detail} where field_id in (select field_id from t_deleted_fields);

@@ -186,12 +187,10 @@ class OracleLoad:
 (
 select x.field_id, s.*
 from stg_dict_field_detail s
-join {dict_dataset} d
-on s.urn = d.urn
 join {dict_field_detail} x
 on s.field_name = x.field_name
 and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
-and d.id = x.dataset_id
+and s.dataset_id = x.dataset_id
 where s.db_id = {db_id}
 and (x.sort_id <> s.sort_id
 or x.parent_sort_id <> s.parent_sort_id

@@ -224,12 +223,11 @@ class OracleLoad:
 field_name, namespace, data_type, data_size, is_nullable, default_value, modified
 )
 select
-d.id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path,
+sf.dataset_id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path,
 sf.field_name, sf.namespace, sf.data_type, sf.data_size, sf.is_nullable, sf.default_value, now()
-from stg_dict_field_detail sf join {dict_dataset} d
-on sf.urn = d.urn
+from stg_dict_field_detail sf
 left join {dict_field_detail} t
-on d.id = t.dataset_id
+on sf.dataset_id = t.dataset_id
 and sf.field_name = t.field_name
 and sf.parent_path = t.parent_path
 where db_id = {db_id} and t.field_id is null

@@ -242,13 +240,12 @@ class OracleLoad:
 -- insert
 insert into stg_dict_dataset_field_comment
-select t.field_id field_id, fc.id comment_id, d.id dataset_id, {db_id}
-from stg_dict_field_detail sf join {dict_dataset} d
-on sf.urn = d.urn
+select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id}
+from stg_dict_field_detail sf
 join {field_comments} fc
 on sf.description = fc.comment
 join {dict_field_detail} t
-on d.id = t.dataset_id
+on sf.dataset_id = t.dataset_id
 and sf.field_name = t.field_name
 and sf.parent_path = t.parent_path
 where sf.db_id = {db_id};

@@ -290,7 +287,7 @@ class OracleLoad:
 (urn,ref_urn,data)
 SET db_id = {db_id};
--- update reference id in stagging table
+-- update reference id in staging table
 UPDATE stg_dict_dataset_sample s
 LEFT JOIN {dict_dataset} d ON s.ref_urn = d.urn
 SET s.ref_id = d.id

Changed file: TeradataLoad jython load script

@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #
-import sys
+import sys, datetime
 from com.ziclix.python.sql import zxJDBC
 from distutils.util import strtobool
 from wherehows.common import Constant

@@ -24,7 +24,6 @@ class TeradataLoad:
 self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
 def load_metadata(self):
-cursor = self.conn_mysql.cursor()
 load_cmd = '''
 DELETE FROM stg_dict_dataset WHERE db_id = {db_id};

@@ -173,14 +172,11 @@ class TeradataLoad:
 ;
 '''.format(source_file=self.input_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
-for state in load_cmd.split(";"):
-  self.logger.debug(state)
-  cursor.execute(state)
-self.conn_mysql.commit()
-cursor.close()
+self.executeCommands(load_cmd)
+self.logger.info("Finish loading metadata")
 def load_field(self):
-cursor = self.conn_mysql.cursor()
 load_cmd = '''
 DELETE FROM stg_dict_field_detail where db_id = {db_id};

@@ -206,24 +202,25 @@ class TeradataLoad:
 and (char_length(trim(description)) = 0
 or description in ('null', 'N/A', 'nothing', 'empty', 'none'));
+-- update stg_dict_field_detail dataset_id
+update stg_dict_field_detail sf, dict_dataset d
+set sf.dataset_id = d.id where sf.urn = d.urn
+and sf.db_id = {db_id};
+delete from stg_dict_field_detail
+where db_id = {db_id} and dataset_id is null; -- remove if not match to dataset
 -- delete old record if it does not exist in this load batch anymore (but have the dataset id)
-create temporary table if not exists t_deleted_fields (primary key (field_id))
-select x.field_id
-from stg_dict_field_detail s
-join dict_dataset i
-on s.urn = i.urn
-and s.db_id = {db_id}
-right join dict_field_detail x
-on i.id = x.dataset_id
-and s.field_name = x.field_name
-and s.parent_path = x.parent_path
-where s.field_name is null
-and x.dataset_id in (
-select d.id dataset_id
-from stg_dict_field_detail k join dict_dataset d
-on k.urn = d.urn
-and k.db_id = {db_id}
-)
+create temporary table if not exists t_deleted_fields (primary key (field_id)) ENGINE=MyISAM
+SELECT x.field_id
+FROM (select dataset_id, field_name, parent_path from stg_dict_field_detail where db_id = {db_id}) s
+RIGHT JOIN
+( select dataset_id, field_id, field_name, parent_path from dict_field_detail
+where dataset_id in (select dataset_id from stg_dict_field_detail where db_id = {db_id})
+) x
+ON s.dataset_id = x.dataset_id
+AND s.field_name = x.field_name
+AND s.parent_path = x.parent_path
+WHERE s.field_name is null
 ; -- run time : ~2min
 delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);

@@ -232,12 +229,11 @@ class TeradataLoad:
 update dict_field_detail t join
 (
 select x.field_id, s.*
-from stg_dict_field_detail s join dict_dataset d
-on s.urn = d.urn
+from stg_dict_field_detail s
 join dict_field_detail x
 on s.field_name = x.field_name
 and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
-and d.id = x.dataset_id
+and s.dataset_id = x.dataset_id
 where s.db_id = {db_id}
 and (x.sort_id <> s.sort_id
 or x.parent_sort_id <> s.parent_sort_id

@@ -274,7 +270,7 @@ class TeradataLoad:
 modified
 )
 select
-d.id,
+s.dataset_id,
 0 as fields_layout_id,
 s.sort_id,
 0 parent_sort_id,

@@ -285,10 +281,9 @@ class TeradataLoad:
 s.data_scale,
 s.is_nullable,
 now()
-from stg_dict_field_detail s join dict_dataset d
-on s.urn = d.urn
+from stg_dict_field_detail s
 left join dict_field_detail f
-on d.id = f.dataset_id
+on s.dataset_id = f.dataset_id
 and s.field_name = f.field_name
 where db_id = {db_id} and f.field_id is null;

@@ -299,13 +294,12 @@ class TeradataLoad:
 -- insert
 insert into stg_dict_dataset_field_comment
-select t.field_id field_id, fc.id comment_id, d.id dataset_id, {db_id}
-from stg_dict_field_detail sf join dict_dataset d
-on sf.urn = d.urn
+select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id}
+from stg_dict_field_detail sf
 join field_comments fc
 on sf.description = fc.comment
 join dict_field_detail t
-on d.id = t.dataset_id
+on sf.dataset_id = t.dataset_id
 and sf.field_name = t.field_name
 and sf.parent_path = t.parent_path
 where sf.db_id = {db_id};

@@ -328,11 +322,9 @@ class TeradataLoad:
 and sd.db_id = {db_id};
 '''.format(source_file=self.input_field_file, db_id=self.db_id)
-for state in load_cmd.split(";"):
-  self.logger.debug(state)
-  cursor.execute(state)
-self.conn_mysql.commit()
-cursor.close()
+self.executeCommands(load_cmd)
+self.logger.info("Finish loading fields ")
 def load_sample(self):
 load_cmd = '''

@@ -343,11 +335,20 @@ class TeradataLoad:
 (urn,ref_urn,data)
 SET db_id = {db_id};
--- update reference id in stagging table
+-- update dataset id in staging table
 UPDATE stg_dict_dataset_sample s
-LEFT JOIN dict_dataset d ON s.ref_urn = d.urn
-SET s.ref_id = d.id
-WHERE s.db_id = {db_id};
+JOIN dict_dataset d ON s.db_id = {db_id} and s.urn = d.urn
+SET s.dataset_id = d.id;
+-- update reference id in staging table
+UPDATE stg_dict_dataset_sample s
+JOIN (
+select dataset_id, id from
+(select dataset_id, ref_urn from stg_dict_dataset_sample
+where db_id = 3 and ref_urn > '') st
+join dict_dataset on urn = ref_urn) d
+ON s.dataset_id = d.dataset_id
+SET s.ref_id = d.id;
 -- first insert ref_id as 0
 INSERT INTO dict_dataset_sample

@@ -357,27 +358,24 @@ class TeradataLoad:
 `data`,
 created
 )
-select d.id as dataset_id, s.urn, s.ref_id, s.data, now()
-from stg_dict_dataset_sample s left join dict_dataset d on d.urn = s.urn
-where s.db_id = {db_id}
-on duplicate key update
-`data`=s.data, modified=now();
--- update reference id in final table
-UPDATE dict_dataset_sample d
-RIGHT JOIN stg_dict_dataset_sample s ON d.urn = s.urn
-SET d.ref_id = s.ref_id
-WHERE s.db_id = {db_id} AND d.ref_id = 0;
+SELECT * from
+(select dataset_id, urn, ref_id s_ref_id, `data` s_data, now()
+from stg_dict_dataset_sample WHERE db_id = {db_id}) s
+ON DUPLICATE KEY UPDATE
+ref_id = COALESCE(s_ref_id, ref_id),
+`data`=s_data,
+modified=now();
 '''.format(source_file=self.input_sampledata_file, db_id=self.db_id)
-cursor = self.conn_mysql.cursor()
-for state in load_cmd.split(";"):
-  self.logger.debug(state)
-  cursor.execute(state)
-self.conn_mysql.commit()
-cursor.close()
+self.executeCommands(load_cmd)
+self.logger.info("Finish loading samples ")
+def executeCommands(self, commands):
+  for cmd in commands.split(";"):
+    self.logger.debug(cmd)
+    self.conn_cursor.execute(cmd)
+  self.conn_mysql.commit()
 if __name__ == "__main__":

@@ -405,10 +403,11 @@ if __name__ == "__main__":
 l.db_id = args[Constant.DB_ID_KEY]
 l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
 l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
+l.conn_cursor = l.conn_mysql.cursor()
 if Constant.INNODB_LOCK_WAIT_TIMEOUT in args:
 lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
-l.conn_mysql.cursor().execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
+l.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
 try:
 l.load_metadata()

Changed file: TeradataMetadataEtlTest java test

@@ -34,4 +34,18 @@ public class TeradataMetadataEtlTest {
 TeradataMetadataEtl t = new TeradataMetadataEtl(3, 0L);
 t.extract();
 }
+
+@Test(groups = {"needConfig"})
+public void testTransform()
+    throws Exception {
+  TeradataMetadataEtl t = new TeradataMetadataEtl(3, 0L);
+  t.transform();
+}
+
+@Test(groups = {"needConfig"})
+public void testLoad()
+    throws Exception {
+  TeradataMetadataEtl t = new TeradataMetadataEtl(3, 0L);
+  t.load();
+}
 }

Changed file: logback configuration (second logback file with the same pattern change)

@@ -19,7 +19,7 @@
 <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
 <encoder>
-<pattern>%coloredLevel - %logger - %message%n%xException</pattern>
+<pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n</pattern>
 </encoder>
 </appender>