diff --git a/backend-service/conf/application.conf b/backend-service/conf/application.conf
index a151aa6ef3..a3958a1aaf 100644
--- a/backend-service/conf/application.conf
+++ b/backend-service/conf/application.conf
@@ -41,7 +41,6 @@
 db.wherehows.driver = com.mysql.jdbc.Driver
 db.wherehows.url = ${WHZ_DB_URL}
 db.wherehows.username = ${WHZ_DB_USERNAME}
 db.wherehows.password = ${WHZ_DB_PASSWORD}
-db.wherehows.host = ${WHZ_DB_HOST}
 # You can expose this datasource via JNDI if needed (Useful for JPA)
 # db.default.jndiName=DefaultDS
diff --git a/backend-service/conf/logback.xml b/backend-service/conf/logback.xml
index 72854595d0..81da7aa6ac 100644
--- a/backend-service/conf/logback.xml
+++ b/backend-service/conf/logback.xml
@@ -19,7 +19,7 @@
-        %coloredLevel - %logger - %message%n%xException
+        %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
diff --git a/metadata-etl/src/main/resources/jython/HdfsLoad.py b/metadata-etl/src/main/resources/jython/HdfsLoad.py
index b3e6d65c28..1b84689bbb 100644
--- a/metadata-etl/src/main/resources/jython/HdfsLoad.py
+++ b/metadata-etl/src/main/resources/jython/HdfsLoad.py
@@ -27,7 +27,6 @@ class HdfsLoad:
     Load dataset metadata into final table
     :return: nothing
     """
-    cursor = self.conn_mysql.cursor()
     load_cmd = '''
     DELETE FROM stg_dict_dataset WHERE db_id = {db_id};
@@ -197,15 +196,11 @@ class HdfsLoad:
     ;
     '''.format(source_file=self.input_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
-    for state in load_cmd.split(";"):
-      self.logger.debug(state)
-      cursor.execute(state)
-      self.conn_mysql.commit()
-    cursor.close()
+    self.executeCommands(load_cmd)
     self.logger.info("finish loading hdfs metadata db_id={db_id} to dict_dataset".format(db_id=self.db_id))
 
+
   def load_field(self):
-    cursor = self.conn_mysql.cursor()
     load_field_cmd = '''
     DELETE FROM stg_dict_field_detail where db_id = {db_id};
@@ -242,24 +237,25 @@ class HdfsLoad:
 
     analyze table field_comments;
 
+    -- update stg_dict_field_detail dataset_id
+    update stg_dict_field_detail sf, dict_dataset d
+      set sf.dataset_id = d.id where sf.urn = d.urn
+      and sf.db_id = {db_id};
+
+    delete from stg_dict_field_detail
+      where db_id = {db_id} and dataset_id is null;  -- remove if not match to dataset
+
     -- delete old record if it does not exist in this load batch anymore (but have the dataset id)
-    create temporary table if not exists t_deleted_fields (primary key (field_id))
-    select x.field_id
-    from stg_dict_field_detail s
-      join dict_dataset i
-        on s.urn = i.urn
-        and s.db_id = {db_id}
-      right join dict_field_detail x
-        on i.id = x.dataset_id
-        and s.field_name = x.field_name
-        and s.parent_path = x.parent_path
-    where s.field_name is null
-      and x.dataset_id in (
-        select d.id dataset_id
-        from stg_dict_field_detail k join dict_dataset d
-          on k.urn = d.urn
-          and k.db_id = {db_id}
-      )
+    create temporary table if not exists t_deleted_fields (primary key (field_id)) ENGINE=MyISAM
+      SELECT x.field_id
+      FROM (select dataset_id, field_name, parent_path from stg_dict_field_detail where db_id = {db_id}) s
+      RIGHT JOIN
+        ( select dataset_id, field_id, field_name, parent_path from dict_field_detail
+          where dataset_id in (select dataset_id from stg_dict_field_detail where db_id = {db_id})
+        ) x
+        ON s.dataset_id = x.dataset_id
+        AND s.field_name = x.field_name
+        AND s.parent_path = x.parent_path
+      WHERE s.field_name is null
     ; -- run time : ~2min
 
     delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);
@@ -269,12 +265,10 @@ class HdfsLoad:
     (
       select x.field_id, s.*
       from (select * from stg_dict_field_detail where db_id = {db_id}) s
-        join dict_dataset d
-          on s.urn = d.urn
        join dict_field_detail x
          on s.field_name = x.field_name
         and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
-        and d.id = x.dataset_id
+        and s.dataset_id = x.dataset_id
     where (x.sort_id <> s.sort_id
        or x.parent_sort_id <> s.parent_sort_id
       or x.data_type <> s.data_type
@@ -307,12 +301,11 @@ class HdfsLoad:
       modified
     )
     select
-      d.id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path,
+      sf.dataset_id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path,
       sf.field_name, sf.namespace, sf.data_type, sf.data_size, sf.is_nullable, sf.default_value, now()
-    from stg_dict_field_detail sf join dict_dataset d
-      on sf.urn = d.urn
+    from stg_dict_field_detail sf
     left join dict_field_detail t
-      on d.id = t.dataset_id
+      on sf.dataset_id = t.dataset_id
       and sf.field_name = t.field_name
      and sf.parent_path = t.parent_path
     where db_id = {db_id} and t.field_id is null
@@ -325,13 +318,12 @@ class HdfsLoad:
 
     -- insert
     insert into stg_dict_dataset_field_comment
-    select t.field_id field_id, fc.id comment_id, d.id dataset_id, {db_id}
-    from stg_dict_field_detail sf join dict_dataset d
-      on sf.urn = d.urn
+    select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id}
+    from stg_dict_field_detail sf
      join field_comments fc
        on sf.description = fc.comment
      join dict_field_detail t
-       on d.id = t.dataset_id
+       on sf.dataset_id = t.dataset_id
        and sf.field_name = t.field_name
       and sf.parent_path = t.parent_path
     where sf.db_id = {db_id};
@@ -357,15 +349,12 @@ class HdfsLoad:
     DELETE FROM stg_dict_field_detail where db_id = {db_id};
     '''.format(source_file=self.input_field_file, db_id=self.db_id)
-    for state in load_field_cmd.split(";"):
-      self.logger.debug(state)
-      cursor.execute(state)
-      self.conn_mysql.commit()
-    cursor.close()
+
+    self.executeCommands(load_field_cmd)
     self.logger.info("finish loading hdfs metadata db_id={db_id} to dict_field_detail".format(db_id=self.db_id))
 
+
   def load_sample(self):
-    cursor = self.conn_mysql.cursor()
     load_sample_cmd = '''
     DELETE FROM stg_dict_dataset_sample where db_id = {db_id};
@@ -375,11 +364,11 @@ class HdfsLoad:
       (urn,ref_urn,data)
       SET db_id = {db_id};
 
-    -- update reference id in stagging table
-    UPDATE stg_dict_dataset_sample s
+    -- update reference id in staging table
+    UPDATE stg_dict_dataset_sample s
     JOIN dict_dataset d ON s.ref_urn = d.urn
     SET s.ref_id = d.id
-    WHERE s.db_id = {db_id} AND s.ref_urn > '' AND s.ref_urn <> 'null';
+    WHERE s.db_id = {db_id} AND s.ref_urn > '';
 
     -- first insert ref_id as 0
     INSERT INTO dict_dataset_sample
@@ -401,14 +390,18 @@ class HdfsLoad:
     SET d.ref_id = s.ref_id
     WHERE s.db_id = {db_id} AND d.ref_id = 0;
     '''.format(source_file=self.input_sample_file, db_id=self.db_id)
-    for state in load_sample_cmd.split(";"):
-      self.logger.debug(state)
-      cursor.execute(state)
-      self.conn_mysql.commit()
-    cursor.close()
+
+    self.executeCommands(load_sample_cmd)
     self.logger.info("finish loading hdfs sample data db_id={db_id} to dict_dataset_sample".format(db_id=self.db_id))
 
+
+  def executeCommands(self, commands):
+    for cmd in commands.split(";"):
+      self.logger.debug(cmd)
+      self.conn_cursor.execute(cmd)
+      self.conn_mysql.commit()
+
+
 if __name__ == "__main__":
   args = sys.argv[1]
@@ -426,10 +419,11 @@ if __name__ == "__main__":
   l.db_id = args[Constant.DB_ID_KEY]
   l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
   l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
+  l.conn_cursor = l.conn_mysql.cursor()
 
   if Constant.INNODB_LOCK_WAIT_TIMEOUT in args:
     lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
-    l.conn_mysql.cursor().execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
+    l.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
 
   try:
     l.load_metadata()
diff --git a/metadata-etl/src/main/resources/jython/HiveLoad.py b/metadata-etl/src/main/resources/jython/HiveLoad.py
index 72421d0b24..6ee4d480a5 100644
--- a/metadata-etl/src/main/resources/jython/HiveLoad.py
+++ b/metadata-etl/src/main/resources/jython/HiveLoad.py
@@ -23,7 +23,6 @@ class HiveLoad:
     self.logger = LoggerFactory.getLogger("%s[%s]" % (self.__class__.__name__, wh_etl_exec_id))
 
   def load_metadata(self):
-    cursor = self.conn_mysql.cursor()
     load_cmd = """
     DELETE FROM stg_dict_dataset WHERE db_id = {db_id};
@@ -97,18 +96,15 @@ class HiveLoad:
     ;
     """.format(source_file=self.input_schema_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
-    for state in load_cmd.split(";"):
-      self.logger.info(state)
-      cursor.execute(state)
-      self.conn_mysql.commit()
-    cursor.close()
+    self.executeCommands(load_cmd)
+    self.logger.info("Load dataset metadata.")
 
+
   def load_field(self):
     """
     Load fields
     :return:
     """
-    cursor = self.conn_mysql.cursor()
     load_cmd = """
     DELETE FROM stg_dict_field_detail WHERE db_id = {db_id};
@@ -125,30 +121,26 @@ class HiveLoad:
       , description=nullif(@description,'null')
       , last_modified=now();
 
-    -- update dataset_id
+    -- update stg_dict_field_detail dataset_id
     update stg_dict_field_detail sf, dict_dataset d
       set sf.dataset_id = d.id where sf.urn = d.urn
       and sf.db_id = {db_id};
 
+    delete from stg_dict_field_detail
+      where db_id = {db_id} and dataset_id is null;  -- remove if not match to dataset
 
     -- delete old record if it does not exist in this load batch anymore (but have the dataset id)
     -- join with dict_dataset to avoid right join using index. (using index will slow down the query)
-    create temporary table if not exists t_deleted_fields (primary key (field_id))
-    select x.field_id
-    from stg_dict_field_detail s
-      join dict_dataset i
-        on s.urn = i.urn
-        and s.db_id = {db_id}
-      right join dict_field_detail x
-        on i.id = x.dataset_id
-        and s.field_name = x.field_name
-        and s.parent_path = x.parent_path
-    where s.field_name is null
-      and x.dataset_id in (
-        select d.id dataset_id
-        from stg_dict_field_detail k join dict_dataset d
-          on k.urn = d.urn
-          and k.db_id = {db_id}
-      )
+    create temporary table if not exists t_deleted_fields (primary key (field_id)) ENGINE=MyISAM
+      SELECT x.field_id
+      FROM (select dataset_id, field_name, parent_path from stg_dict_field_detail where db_id = {db_id}) s
+      RIGHT JOIN
+        ( select dataset_id, field_id, field_name, parent_path from dict_field_detail
+          where dataset_id in (select dataset_id from stg_dict_field_detail where db_id = {db_id})
+        ) x
+        ON s.dataset_id = x.dataset_id
+        AND s.field_name = x.field_name
+        AND s.parent_path = x.parent_path
+      WHERE s.field_name is null
     ; -- run time : ~2min
 
     delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);
@@ -190,7 +182,7 @@ class HiveLoad:
 
     -- insert new ones
     CREATE TEMPORARY TABLE IF NOT EXISTS t_existed_field
-    ( primary key (urn, sort_id, db_id) )
+    ( primary key (urn, sort_id, db_id) ) ENGINE=MyISAM
     AS (
       SELECT sf.urn, sf.sort_id, sf.db_id, count(*) field_count
       FROM stg_dict_field_detail sf
@@ -203,8 +195,7 @@ class HiveLoad:
       group by 1,2,3
     );
 
-
-    insert ignore into dict_field_detail (
+    insert ignore into dict_field_detail (
       dataset_id, fields_layout_id, sort_id, parent_sort_id, parent_path,
       field_name, namespace, data_type, data_size, is_nullable, default_value, modified
@@ -239,7 +230,6 @@ class HiveLoad:
       where field_id in (select field_id from stg_dict_dataset_field_comment)
       and is_default = 1 ) and db_id = {db_id};
 
-
     -- doesn't have this comment before, insert into it and set as default
     insert ignore into dict_dataset_field_comment
     select sd.field_id, sd.comment_id, sd.dataset_id, 1 from stg_dict_dataset_field_comment sd
@@ -249,7 +239,6 @@ class HiveLoad:
     where d.comment_id is null
       and sd.db_id = {db_id};
 
-
     insert into field_comments (
       user_id, comment, created, modified, comment_crc32_checksum
     )
@@ -266,20 +255,15 @@ class HiveLoad:
     """.format(source_file=self.input_field_file, db_id=self.db_id)
 
-    # didn't load into final table for now
+    self.executeCommands(load_cmd)
+    self.logger.info("Load dataset fields.")
 
-    for state in load_cmd.split(";"):
-      self.logger.info(state)
-      cursor.execute(state)
-      self.conn_mysql.commit()
-    cursor.close()
 
   def load_dataset_instance(self):
     """
     Load dataset instance
     :return:
     """
-    cursor = self.conn_mysql.cursor()
     load_cmd = """
     DELETE FROM stg_dict_dataset_instance WHERE db_id = {db_id};
@@ -337,21 +321,15 @@ class HiveLoad:
     ;
     """.format(source_file=self.input_instance_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
+    self.executeCommands(load_cmd)
+    self.logger.info("Load dataset instance.")
 
-    # didn't load into final table for now
-
-    for state in load_cmd.split(";"):
-      self.logger.info(state)
-      cursor.execute(state)
-      self.conn_mysql.commit()
-    cursor.close()
 
   def load_dataset_dependencies(self):
     """
-    Load dataset instance
+    Load dataset dependencies
     :return:
     """
-    cursor = self.conn_mysql.cursor()
     load_cmd = """
     DELETE FROM stg_cfg_object_name_map;
     LOAD DATA LOCAL INFILE '{source_file}'
@@ -372,7 +350,7 @@ class HiveLoad:
     -- create to be deleted table
     DROP TEMPORARY table IF EXISTS t_deleted_depend;
-    CREATE TEMPORARY TABLE t_deleted_depend
+    CREATE TEMPORARY TABLE t_deleted_depend ENGINE=MyISAM
     AS (
       SELECT DISTINCT c.obj_name_map_id
       FROM cfg_object_name_map c LEFT JOIN stg_cfg_object_name_map s
@@ -420,12 +398,15 @@ class HiveLoad:
     """.format(source_file=self.input_dependency_file)
 
     # didn't load into final table for now
+    self.executeCommands(load_cmd)
+    self.logger.info("Load dataset dependencies.")
 
-    for state in load_cmd.split(";"):
-      self.logger.info(state)
-      cursor.execute(state)
+
+  def executeCommands(self, commands):
+    for cmd in commands.split(";"):
+      self.logger.debug(cmd)
+      self.conn_cursor.execute(cmd)
       self.conn_mysql.commit()
-    cursor.close()
 
 
 if __name__ == "__main__":
@@ -446,10 +427,11 @@ if __name__ == "__main__":
   l.db_id = args[Constant.DB_ID_KEY]
   l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
  l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
+  l.conn_cursor = l.conn_mysql.cursor()
 
   if Constant.INNODB_LOCK_WAIT_TIMEOUT in args:
     lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
-    l.conn_mysql.cursor().execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
+    l.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
 
   try:
     l.load_metadata()
diff --git a/metadata-etl/src/main/resources/jython/OracleLoad.py b/metadata-etl/src/main/resources/jython/OracleLoad.py
index 0785293920..8895a12aca 100644
--- a/metadata-etl/src/main/resources/jython/OracleLoad.py
+++ b/metadata-etl/src/main/resources/jython/OracleLoad.py
@@ -15,7 +15,7 @@
 from com.ziclix.python.sql import zxJDBC
 from wherehows.common import Constant
 from org.slf4j import LoggerFactory
-import sys, os, datetime
+import sys, datetime
 
 
 class OracleLoad:
@@ -159,24 +159,25 @@ class OracleLoad:
 
     analyze table {field_comments};
 
+    -- update stg_dict_field_detail dataset_id
+    update stg_dict_field_detail sf, dict_dataset d
+      set sf.dataset_id = d.id where sf.urn = d.urn
+      and sf.db_id = {db_id};
+
+    delete from stg_dict_field_detail
+      where db_id = {db_id} and dataset_id is null;  -- remove if not match to dataset
+
     -- delete old record if it does not exist in this load batch anymore (but have the dataset id)
-    create temporary table if not exists t_deleted_fields (primary key (field_id))
-    select x.field_id
-    from stg_dict_field_detail s
-      join {dict_dataset} i
-        on s.urn = i.urn
-        and s.db_id = {db_id}
-      right join {dict_field_detail} x
-        on i.id = x.dataset_id
-        and s.field_name = x.field_name
-        and s.parent_path = x.parent_path
-    where s.field_name is null
-      and x.dataset_id in (
-        select d.id dataset_id
-        from stg_dict_field_detail k join {dict_dataset} d
-          on k.urn = d.urn
-          and k.db_id = {db_id}
-      )
+    create temporary table if not exists t_deleted_fields (primary key (field_id)) ENGINE=MyISAM
+      SELECT x.field_id
+      FROM (select dataset_id, field_name, parent_path from stg_dict_field_detail where db_id = {db_id}) s
+      RIGHT JOIN
+        ( select dataset_id, field_id, field_name, parent_path from dict_field_detail
+          where dataset_id in (select dataset_id from stg_dict_field_detail where db_id = {db_id})
+        ) x
+        ON s.dataset_id = x.dataset_id
+        AND s.field_name = x.field_name
+        AND s.parent_path = x.parent_path
+      WHERE s.field_name is null
     ; -- run time : ~2min
 
     delete from {dict_field_detail} where field_id in (select field_id from t_deleted_fields);
@@ -186,12 +187,10 @@ class OracleLoad:
     (
       select x.field_id, s.*
       from stg_dict_field_detail s
-        join {dict_dataset} d
-          on s.urn = d.urn
       join {dict_field_detail} x
         on s.field_name = x.field_name
        and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
-        and d.id = x.dataset_id
+        and s.dataset_id = x.dataset_id
     where s.db_id = {db_id}
      and (x.sort_id <> s.sort_id
       or x.parent_sort_id <> s.parent_sort_id
@@ -224,12 +223,11 @@ class OracleLoad:
       field_name, namespace, data_type, data_size, is_nullable, default_value, modified
     )
     select
-      d.id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path,
+      sf.dataset_id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path,
       sf.field_name, sf.namespace, sf.data_type, sf.data_size, sf.is_nullable, sf.default_value, now()
-    from stg_dict_field_detail sf join {dict_dataset} d
-      on sf.urn = d.urn
+    from stg_dict_field_detail sf
     left join {dict_field_detail} t
-      on d.id = t.dataset_id
+      on sf.dataset_id = t.dataset_id
       and sf.field_name = t.field_name
      and sf.parent_path = t.parent_path
     where db_id = {db_id} and t.field_id is null
@@ -242,13 +240,12 @@ class OracleLoad:
 
     -- insert
     insert into stg_dict_dataset_field_comment
-    select t.field_id field_id, fc.id comment_id, d.id dataset_id, {db_id}
-    from stg_dict_field_detail sf join {dict_dataset} d
-      on sf.urn = d.urn
+    select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id}
+    from stg_dict_field_detail sf
      join {field_comments} fc
        on sf.description = fc.comment
      join {dict_field_detail} t
-       on d.id = t.dataset_id
+       on sf.dataset_id = t.dataset_id
        and sf.field_name = t.field_name
       and sf.parent_path = t.parent_path
     where sf.db_id = {db_id};
@@ -290,7 +287,7 @@ class OracleLoad:
       (urn,ref_urn,data)
       SET db_id = {db_id};
 
-    -- update reference id in stagging table
+    -- update reference id in staging table
     UPDATE stg_dict_dataset_sample s
     LEFT JOIN {dict_dataset} d ON s.ref_urn = d.urn
     SET s.ref_id = d.id
diff --git a/metadata-etl/src/main/resources/jython/TeradataLoad.py b/metadata-etl/src/main/resources/jython/TeradataLoad.py
index 520cb3d42b..91f740d098 100644
--- a/metadata-etl/src/main/resources/jython/TeradataLoad.py
+++ b/metadata-etl/src/main/resources/jython/TeradataLoad.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #
 
-import sys
+import sys, datetime
 from com.ziclix.python.sql import zxJDBC
 from distutils.util import strtobool
 from wherehows.common import Constant
@@ -24,7 +24,6 @@ class TeradataLoad:
     self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
 
   def load_metadata(self):
-    cursor = self.conn_mysql.cursor()
     load_cmd = '''
     DELETE FROM stg_dict_dataset WHERE db_id = {db_id};
@@ -173,14 +172,11 @@ class TeradataLoad:
     ;
     '''.format(source_file=self.input_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
-    for state in load_cmd.split(";"):
-      self.logger.debug(state)
-      cursor.execute(state)
-      self.conn_mysql.commit()
-    cursor.close()
+    self.executeCommands(load_cmd)
+    self.logger.info("Finish loading metadata")
+
 
   def load_field(self):
-    cursor = self.conn_mysql.cursor()
     load_cmd = '''
     DELETE FROM stg_dict_field_detail where db_id = {db_id};
@@ -206,24 +202,25 @@ class TeradataLoad:
       and (char_length(trim(description)) = 0
        or description in ('null', 'N/A', 'nothing', 'empty', 'none'));
 
+    -- update stg_dict_field_detail dataset_id
+    update stg_dict_field_detail sf, dict_dataset d
+      set sf.dataset_id = d.id where sf.urn = d.urn
+      and sf.db_id = {db_id};
+
+    delete from stg_dict_field_detail
+      where db_id = {db_id} and dataset_id is null;  -- remove if not match to dataset
+
     -- delete old record if it does not exist in this load batch anymore (but have the dataset id)
-    create temporary table if not exists t_deleted_fields (primary key (field_id))
-    select x.field_id
-    from stg_dict_field_detail s
-      join dict_dataset i
-        on s.urn = i.urn
-        and s.db_id = {db_id}
-      right join dict_field_detail x
-        on i.id = x.dataset_id
-        and s.field_name = x.field_name
-        and s.parent_path = x.parent_path
-    where s.field_name is null
-      and x.dataset_id in (
-        select d.id dataset_id
-        from stg_dict_field_detail k join dict_dataset d
-          on k.urn = d.urn
-          and k.db_id = {db_id}
-      )
+    create temporary table if not exists t_deleted_fields (primary key (field_id)) ENGINE=MyISAM
+      SELECT x.field_id
+      FROM (select dataset_id, field_name, parent_path from stg_dict_field_detail where db_id = {db_id}) s
+      RIGHT JOIN
+        ( select dataset_id, field_id, field_name, parent_path from dict_field_detail
+          where dataset_id in (select dataset_id from stg_dict_field_detail where db_id = {db_id})
+        ) x
+        ON s.dataset_id = x.dataset_id
+        AND s.field_name = x.field_name
+        AND s.parent_path = x.parent_path
+      WHERE s.field_name is null
     ; -- run time : ~2min
 
     delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);
@@ -232,12 +229,11 @@ class TeradataLoad:
     update dict_field_detail t join
     (
       select x.field_id, s.*
-      from stg_dict_field_detail s join dict_dataset d
-        on s.urn = d.urn
+      from stg_dict_field_detail s
       join dict_field_detail x
         on s.field_name = x.field_name
        and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
-       and d.id = x.dataset_id
+       and s.dataset_id = x.dataset_id
     where s.db_id = {db_id}
      and (x.sort_id <> s.sort_id
       or x.parent_sort_id <> s.parent_sort_id
@@ -274,7 +270,7 @@ class TeradataLoad:
       modified
     )
     select
-      d.id,
+      s.dataset_id,
       0 as fields_layout_id,
       s.sort_id,
       0 parent_sort_id,
@@ -285,10 +281,9 @@ class TeradataLoad:
       s.data_scale,
       s.is_nullable,
       now()
-    from stg_dict_field_detail s join dict_dataset d
-      on s.urn = d.urn
+    from stg_dict_field_detail s
     left join dict_field_detail f
-      on d.id = f.dataset_id
+      on s.dataset_id = f.dataset_id
       and s.field_name = f.field_name
     where db_id = {db_id} and f.field_id is null;
@@ -299,13 +294,12 @@ class TeradataLoad:
     -- insert
     insert into stg_dict_dataset_field_comment
-    select t.field_id field_id, fc.id comment_id, d.id dataset_id, {db_id}
-    from stg_dict_field_detail sf join dict_dataset d
-      on sf.urn = d.urn
+    select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id}
+    from stg_dict_field_detail sf
      join field_comments fc
        on sf.description = fc.comment
      join dict_field_detail t
-       on d.id = t.dataset_id
+       on sf.dataset_id = t.dataset_id
        and sf.field_name = t.field_name
       and sf.parent_path = t.parent_path
     where sf.db_id = {db_id};
@@ -328,11 +322,9 @@ class TeradataLoad:
       and sd.db_id = {db_id};
     '''.format(source_file=self.input_field_file, db_id=self.db_id)
-    for state in load_cmd.split(";"):
-      self.logger.debug(state)
-      cursor.execute(state)
-      self.conn_mysql.commit()
-    cursor.close()
+    self.executeCommands(load_cmd)
+    self.logger.info("Finish loading fields ")
+
 
   def load_sample(self):
     load_cmd = '''
@@ -343,11 +335,20 @@ class TeradataLoad:
       (urn,ref_urn,data)
       SET db_id = {db_id};
 
-    -- update reference id in stagging table
-    UPDATE stg_dict_dataset_sample s
-    LEFT JOIN dict_dataset d ON s.ref_urn = d.urn
-    SET s.ref_id = d.id
-    WHERE s.db_id = {db_id};
+    -- update dataset id in staging table
+    UPDATE stg_dict_dataset_sample s
+    JOIN dict_dataset d ON s.db_id = {db_id} and s.urn = d.urn
+    SET s.dataset_id = d.id;
+
+    -- update reference id in staging table
+    UPDATE stg_dict_dataset_sample s
+    JOIN (
+      select dataset_id, id from
+        (select dataset_id, ref_urn from stg_dict_dataset_sample
+         where db_id = {db_id} and ref_urn > '') st
+      join dict_dataset on urn = ref_urn) d
+    ON s.dataset_id = d.dataset_id
+    SET s.ref_id = d.id;
 
     -- first insert ref_id as 0
     INSERT INTO dict_dataset_sample
@@ -357,27 +358,24 @@ class TeradataLoad:
       `data`,
       created
     )
-    select d.id as dataset_id, s.urn, s.ref_id, s.data, now()
-    from stg_dict_dataset_sample s left join dict_dataset d on d.urn = s.urn
-    where s.db_id = {db_id}
-    on duplicate key update
-      `data`=s.data, modified=now();
-
-
-    -- update reference id in final table
-    UPDATE dict_dataset_sample d
-    RIGHT JOIN stg_dict_dataset_sample s ON d.urn = s.urn
-    SET d.ref_id = s.ref_id
-    WHERE s.db_id = {db_id} AND d.ref_id = 0;
-
+    SELECT * from
+      (select dataset_id, urn, ref_id s_ref_id, `data` s_data, now()
+       from stg_dict_dataset_sample WHERE db_id = {db_id}) s
+    ON DUPLICATE KEY UPDATE
+      ref_id = COALESCE(s_ref_id, ref_id),
+      `data`=s_data,
+      modified=now();
     '''.format(source_file=self.input_sampledata_file, db_id=self.db_id)
 
-    cursor = self.conn_mysql.cursor()
-    for state in load_cmd.split(";"):
-      self.logger.debug(state)
-      cursor.execute(state)
+    self.executeCommands(load_cmd)
+    self.logger.info("Finish loading samples ")
+
+
+  def executeCommands(self, commands):
+    for cmd in commands.split(";"):
+      self.logger.debug(cmd)
+      self.conn_cursor.execute(cmd)
       self.conn_mysql.commit()
-    cursor.close()
 
 
 if __name__ == "__main__":
@@ -405,10 +403,11 @@ if __name__ == "__main__":
   l.db_id = args[Constant.DB_ID_KEY]
   l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
   l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
+  l.conn_cursor = l.conn_mysql.cursor()
 
   if Constant.INNODB_LOCK_WAIT_TIMEOUT in args:
     lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
-    l.conn_mysql.cursor().execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
+    l.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
 
   try:
     l.load_metadata()
diff --git a/metadata-etl/src/test/java/metadata/etl/dataset/teradata/TeradataMetadataEtlTest.java b/metadata-etl/src/test/java/metadata/etl/dataset/teradata/TeradataMetadataEtlTest.java
index 9a514083e4..6719ee6e8a 100644
--- a/metadata-etl/src/test/java/metadata/etl/dataset/teradata/TeradataMetadataEtlTest.java
+++ b/metadata-etl/src/test/java/metadata/etl/dataset/teradata/TeradataMetadataEtlTest.java
@@ -34,4 +34,18 @@ public class TeradataMetadataEtlTest {
     TeradataMetadataEtl t = new TeradataMetadataEtl(3, 0L);
     t.extract();
   }
+
+  @Test(groups = {"needConfig"})
+  public void testTransform()
+      throws Exception {
+    TeradataMetadataEtl t = new TeradataMetadataEtl(3, 0L);
+    t.transform();
+  }
+
+  @Test(groups = {"needConfig"})
+  public void testLoad()
+      throws Exception {
+    TeradataMetadataEtl t = new TeradataMetadataEtl(3, 0L);
+    t.load();
+  }
 }
diff --git a/web/conf/logback.xml b/web/conf/logback.xml
index 72854595d0..81da7aa6ac 100644
--- a/web/conf/logback.xml
+++ b/web/conf/logback.xml
@@ -19,7 +19,7 @@
-        %coloredLevel - %logger - %message%n%xException
+        %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
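
The recurring change in HdfsLoad.py, HiveLoad.py, and TeradataLoad.py above is the replacement of each method's cursor loop with a shared executeCommands() helper that reuses one cursor (conn_cursor) created in the __main__ block. The sketch below illustrates that pattern outside Jython; sqlite3 is only a stand-in for the zxJDBC connection, the CommandRunner name and the empty-fragment guard are additions for the sketch, and it assumes (as the loaders do) that no statement contains a literal semicolon inside a string value.

    import logging
    import sqlite3  # stand-in for the zxJDBC/MySQL connection used by the real loaders

    logging.basicConfig(level=logging.DEBUG)


    class CommandRunner(object):
      """Illustrative stand-in for the executeCommands() helper added to each loader."""

      def __init__(self, conn):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.conn_mysql = conn               # the loaders call this attribute conn_mysql
        self.conn_cursor = conn.cursor()     # one cursor, created once (as in __main__)

      def executeCommands(self, commands):
        # Split the multi-statement script on ';' and run each piece on the shared cursor.
        for cmd in commands.split(";"):
          if not cmd.strip():
            continue                         # guard added for this sketch only
          self.logger.debug(cmd)
          self.conn_cursor.execute(cmd)
          self.conn_mysql.commit()           # the patch commits after each statement


    if __name__ == "__main__":
      runner = CommandRunner(sqlite3.connect(":memory:"))
      runner.executeCommands('''
        CREATE TABLE stg_demo (db_id INTEGER, urn TEXT);
        INSERT INTO stg_demo VALUES (3, 'hdfs:///data/example');
        DELETE FROM stg_demo WHERE db_id = 3
      ''')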
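
Because every load_* method now goes through executeCommands(), the __main__ block of each loader must create the shared cursor immediately after opening the connection. A condensed sketch of that setup for HdfsLoad is below, assuming it sits at the bottom of HdfsLoad.py as in the patch; only the conn_cursor creation and the innodb_lock_wait_timeout handling are taken from the diff, while the quoted property names for the JDBC settings and input files are hypothetical placeholders for the real keys in wherehows.common.Constant.

    import sys
    from com.ziclix.python.sql import zxJDBC
    from wherehows.common import Constant

    if __name__ == "__main__":
      args = sys.argv[1]                     # property dict handed in by the ETL job

      l = HdfsLoad()
      l.db_id = args[Constant.DB_ID_KEY]
      l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
      l.input_file = args['hdfs.schema_result_file']          # hypothetical key names for
      l.input_field_file = args['hdfs.field_result_file']     # the three staging files
      l.input_sample_file = args['hdfs.sample_result_file']

      JDBC_DRIVER = 'com.mysql.jdbc.Driver'
      l.conn_mysql = zxJDBC.connect(args['wherehows.db.jdbc.url'],       # hypothetical
                                    args['wherehows.db.username'],       # property names
                                    args['wherehows.db.password'],
                                    JDBC_DRIVER)
      # New in this patch: one cursor shared by every executeCommands() call.
      l.conn_cursor = l.conn_mysql.cursor()

      if Constant.INNODB_LOCK_WAIT_TIMEOUT in args:
        lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
        l.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)

      try:
        l.load_metadata()
        l.load_field()
        l.load_sample()
      finally:
        l.conn_cursor.close()                # cleanup assumed, not shown in the diff
        l.conn_mysql.close()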