diff --git a/metadata-etl/src/main/resources/jython/HdfsLoad.py b/metadata-etl/src/main/resources/jython/HdfsLoad.py
index 1b84689bbb..f5947fd6ef 100644
--- a/metadata-etl/src/main/resources/jython/HdfsLoad.py
+++ b/metadata-etl/src/main/resources/jython/HdfsLoad.py
@@ -221,22 +221,6 @@ class HdfsLoad:
       and (char_length(trim(description)) = 0
        or description in ('null', 'N/A', 'nothing', 'empty', 'none'));
 
-    insert into field_comments (
-      user_id, comment, created, modified, comment_crc32_checksum
-    )
-    select 0 user_id, description, now() created, now() modified, crc32(description) from
-    (
-      select sf.description
-      from stg_dict_field_detail sf left join field_comments fc
-        on sf.description = fc.comment
-      where sf.description is not null
-        and fc.id is null
-        and sf.db_id = {db_id}
-      group by 1 order by 1
-    ) d;
-
-    analyze table field_comments;
-
     -- update stg_dict_field_detail dataset_id
     update stg_dict_field_detail sf, dict_dataset d
       set sf.dataset_id = d.id where sf.urn = d.urn
@@ -254,7 +238,7 @@ class HdfsLoad:
       ) x
        ON s.dataset_id = x.dataset_id
       AND s.field_name = x.field_name
-     AND s.parent_path = x.parent_path
+     AND s.parent_path <=> x.parent_path
     WHERE s.field_name is null
     ; -- run time : ~2min
@@ -266,9 +250,9 @@ class HdfsLoad:
       select x.field_id, s.*
       from (select * from stg_dict_field_detail where db_id = {db_id}) s
        join dict_field_detail x
-        on s.field_name = x.field_name
-       and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
-       and s.dataset_id = x.dataset_id
+        on s.dataset_id = x.dataset_id
+       and s.field_name = x.field_name
+       and s.parent_path <=> x.parent_path
       where (x.sort_id <> s.sort_id
        or x.parent_sort_id <> s.parent_sort_id
        or x.data_type <> s.data_type
@@ -307,25 +291,42 @@ class HdfsLoad:
     left join dict_field_detail t
       on sf.dataset_id = t.dataset_id
      and sf.field_name = t.field_name
-     and sf.parent_path = t.parent_path
+     and sf.parent_path <=> t.parent_path
     where db_id = {db_id} and t.field_id is null
     ;
 
     analyze table dict_field_detail;
 
-    -- delete old record in stagging
+    -- delete old record in staging field comment map
     delete from stg_dict_dataset_field_comment where db_id = {db_id};
 
-    -- insert
-    insert into stg_dict_dataset_field_comment
-    select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id}
+    -- insert new field comments
+    insert into field_comments (
+      user_id, comment, created, modified, comment_crc32_checksum
+    )
+    select 0 user_id, description, now() created, now() modified, crc32(description) from
+    (
+      select sf.description
+      from stg_dict_field_detail sf left join field_comments fc
+        on sf.description = fc.comment
+      where sf.description is not null
+        and fc.id is null
+        and sf.db_id = {db_id}
+      group by 1 order by 1
+    ) d;
+
+    analyze table field_comments;
+
+    -- insert field to comment map to staging
+    insert ignore into stg_dict_dataset_field_comment
+    select t.field_id field_id, fc.id comment_id, sf.dataset_id, {db_id}
     from stg_dict_field_detail sf
      join field_comments fc
       on sf.description = fc.comment
      join dict_field_detail t
       on sf.dataset_id = t.dataset_id
      and sf.field_name = t.field_name
-     and sf.parent_path = t.parent_path
+     and sf.parent_path <=> t.parent_path
     where sf.db_id = {db_id};
 
     -- have default comment, insert it set default to 0
@@ -335,7 +336,6 @@ class HdfsLoad:
       where field_id in (select field_id from stg_dict_dataset_field_comment)
        and is_default = 1
     ) and db_id = {db_id};
-
     -- doesn't have this comment before, insert into it and set as default
     insert ignore into dict_dataset_field_comment
     select sd.field_id, sd.comment_id, sd.dataset_id, 1
     from stg_dict_dataset_field_comment sd
@@ -344,10 +344,6 @@ class HdfsLoad:
       and d.comment_id = sd.comment_id
     where d.comment_id is null
       and sd.db_id = {db_id};
-
-
-    DELETE FROM stg_dict_field_detail where db_id = {db_id};
-
     '''.format(source_file=self.input_field_file, db_id=self.db_id)
 
     self.executeCommands(load_field_cmd)
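Note on the recurring join change in this patch: parent_path is NULL for top-level fields, and under plain "=" the comparison NULL = NULL evaluates to NULL rather than true, so top-level fields never matched the delete/update/insert joins. MySQL's "<=>" is the null-safe equality operator. A minimal sketch of the difference, written as a SQL-in-Python string like the loaders themselves (illustrative only, assuming a MySQL session):

    # Illustrative truth table for the null-safe operator; not part of the patch.
    null_safe_demo = '''
        SELECT NULL = NULL,     -- NULL: row is dropped by an ON/WHERE filter
               NULL <=> NULL,   -- 1: NULLs compare as equal
               'a' <=> NULL,    -- 0
               'a' <=> 'a';     -- 1
    '''

The old coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*') workaround covered only the update join, not the delete and insert joins; "<=>" makes all three consistent.
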
diff --git a/metadata-etl/src/main/resources/jython/HiveLoad.py b/metadata-etl/src/main/resources/jython/HiveLoad.py
index 6ee4d480a5..cc00cad5fd 100644
--- a/metadata-etl/src/main/resources/jython/HiveLoad.py
+++ b/metadata-etl/src/main/resources/jython/HiveLoad.py
@@ -139,7 +139,7 @@ class HiveLoad:
       ) x
        ON s.dataset_id = x.dataset_id
       AND s.field_name = x.field_name
-     AND s.parent_path = x.parent_path
+     AND s.parent_path <=> x.parent_path
     WHERE s.field_name is null
     ; -- run time : ~2min
@@ -152,7 +152,7 @@ class HiveLoad:
       from stg_dict_field_detail s
        join dict_field_detail x
         on s.field_name = x.field_name
-       and s.parent_path = x.parent_path
+       and s.parent_path <=> x.parent_path
        and s.dataset_id = x.dataset_id
       where s.db_id = {db_id}
       and (x.sort_id <> s.sort_id
@@ -189,7 +189,7 @@ class HiveLoad:
       JOIN dict_field_detail t
        ON sf.dataset_id = t.dataset_id
       AND sf.field_name = t.field_name
-     AND sf.parent_path = t.parent_path
+     AND sf.parent_path <=> t.parent_path
       WHERE sf.db_id = {db_id} and sf.dataset_id IS NOT NULL
       group by 1,2,3
@@ -208,19 +208,38 @@ class HiveLoad:
       and (sf.urn, sf.sort_id, sf.db_id) not in (select urn, sort_id, db_id from t_existed_field)
     ;
 
-    -- delete old record in stagging
+    analyze table dict_field_detail;
+
+    -- delete old record in staging field comment map
     delete from stg_dict_dataset_field_comment where db_id = {db_id};
 
-    -- insert
+    -- insert new field comments
+    insert into field_comments (
+      user_id, comment, created, modified, comment_crc32_checksum
+    )
+    select 0 user_id, description, now() created, now() modified, crc32(description) from
+    (
+      select sf.description
+      from stg_dict_field_detail sf left join field_comments fc
+        on sf.description = fc.comment
+      where sf.description is not null
+        and fc.id is null
+        and sf.db_id = {db_id}
+      group by 1 order by 1
+    ) d;
+
+    analyze table field_comments;
+
+    -- insert field to comment map to staging
     insert ignore into stg_dict_dataset_field_comment
-    select t.field_id field_id, fc.id comment_id, sf.dataset_id, {db_id}
+    select t.field_id field_id, fc.id comment_id, sf.dataset_id, {db_id}
     from stg_dict_field_detail sf
      join field_comments fc
       on sf.description = fc.comment
      join dict_field_detail t
       on sf.dataset_id = t.dataset_id
-      and sf.field_name = t.field_name
-      and sf.parent_path = t.parent_path
+      and sf.field_name = t.field_name
+      and sf.parent_path <=> t.parent_path
     where sf.db_id = {db_id};
 
     -- have default comment, insert it set default to 0
@@ -238,21 +257,6 @@ class HiveLoad:
       and d.comment_id = sd.comment_id
     where d.comment_id is null
       and sd.db_id = {db_id};
-
-    insert into field_comments (
-      user_id, comment, created, modified, comment_crc32_checksum
-    )
-    select 0 user_id, description, now() created, now() modified, crc32(description) from
-    (
-      select sf.description
-      from stg_dict_field_detail sf left join field_comments fc
-        on sf.description = fc.comment
-      where sf.description is not null
-        and fc.id is null
-        and sf.db_id = {db_id}
-      group by 1 order by 1
-    ) d
-
     """.format(source_file=self.input_field_file, db_id=self.db_id)
 
     self.executeCommands(load_cmd)
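HiveLoad gets the same null-safe joins, and both loaders now insert missing field_comments rows after dict_field_detail is synced and immediately before the staging comment map is rebuilt; previously HiveLoad inserted new comments only at the very end, after the map was built, so a first-seen comment could not be mapped until a later run. The map load itself uses INSERT IGNORE, which downgrades duplicate-key errors to warnings and makes re-runs idempotent. A sketch (illustrative values; assumes the staging table's unique key covers the inserted identifier columns):

    # Illustrative only: with INSERT IGNORE a repeated mapping is skipped,
    # not fatal, assuming a unique key over (field_id, comment_id, dataset_id).
    idempotent_map_demo = '''
        INSERT IGNORE INTO stg_dict_dataset_field_comment
            (field_id, comment_id, dataset_id, db_id)
        VALUES (101, 7, 55, 3),
               (101, 7, 55, 3);  -- second tuple is ignored, no error raised
    '''
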
diff --git a/metadata-etl/src/main/resources/jython/OracleExtract.py b/metadata-etl/src/main/resources/jython/OracleExtract.py
index 6ac702aed0..72c55d16a2 100644
--- a/metadata-etl/src/main/resources/jython/OracleExtract.py
+++ b/metadata-etl/src/main/resources/jython/OracleExtract.py
@@ -307,7 +307,7 @@ class OracleExtract:
     return None
 
   def trim_newline(self, line):
-    return None if line is None else line.replace('\n', ' ').replace('\r', ' ').encode('ascii', 'ignore')
+    return line.replace('\n', ' ').replace('\r', ' ').strip().encode('ascii', 'ignore') if line else None
 
   def write_csv(self, csv_filename, csv_columns, data_list):
     csvfile = open(csv_filename, 'wb')
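The trim_newline change tightens CSV hygiene in two ways: it strips leading and trailing whitespace after flattening newlines, and it widens the guard from "line is None" to any falsy line, so an empty string now comes back as None instead of an empty byte string. A standalone approximation under Jython/Python 2 semantics (where encode() on an ASCII str returns a str):

    # Illustrative stand-in for the new OracleExtract.trim_newline behavior.
    def trim_newline(line):
        # None and '' both map to None now; '' previously passed through.
        return line.replace('\n', ' ').replace('\r', ' ').strip() \
                   .encode('ascii', 'ignore') if line else None

    assert trim_newline(None) is None
    assert trim_newline('') is None             # old version returned ''
    assert trim_newline(' a\r\nb\n') == 'a  b'  # CR/LF become spaces, then strip
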
diff --git a/metadata-etl/src/main/resources/jython/OracleLoad.py b/metadata-etl/src/main/resources/jython/OracleLoad.py
index 8895a12aca..9ad726cc7e 100644
--- a/metadata-etl/src/main/resources/jython/OracleLoad.py
+++ b/metadata-etl/src/main/resources/jython/OracleLoad.py
@@ -42,12 +42,6 @@ class OracleLoad:
     self.logger.info("Load Oracle Metadata into {}, db_id {}, wh_exec_id {}"
                      .format(JDBC_URL, self.db_id, self.wh_etl_exec_id))
 
-    self.dict_dataset_table = 'dict_dataset'
-    self.field_comments_table = 'field_comments'
-    self.dict_field_table = 'dict_field_detail'
-    self.dict_field_comment_table = 'dict_dataset_field_comment'
-    self.dict_dataset_sample_table = 'dict_dataset_sample'
-
   def load_tables(self):
     load_tables_cmd = '''
@@ -64,7 +58,7 @@ class OracleLoad:
       wh_etl_exec_id = {wh_etl_exec_id};
 
     -- insert into final table
-    INSERT INTO {dict_dataset}
+    INSERT INTO dict_dataset
     ( `name`,
       `schema`,
       schema_type,
@@ -106,13 +100,11 @@ class OracleLoad:
       modified_time=UNIX_TIMESTAMP(now()),
       wh_etl_exec_id=s.wh_etl_exec_id
     ;
-    analyze table {dict_dataset};
-    '''.format(source_file=self.input_table_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id,
-               dict_dataset=self.dict_dataset_table)
+    analyze table dict_dataset;
+    '''.format(source_file=self.input_table_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
 
     self.executeCommands(load_tables_cmd)
-    self.logger.info("finish loading oracle table metadata from {} to {}"
-                     .format(self.input_table_file, self.dict_dataset_table))
+    self.logger.info("finish loading oracle table metadata from {}".format(self.input_table_file))
 
   def load_fields(self):
@@ -134,31 +126,12 @@ class OracleLoad:
     -- show warnings limit 20;
     analyze table stg_dict_field_detail;
 
-    update stg_dict_field_detail
-    set default_value = trim(default_value) where db_id = {db_id};
-
     update stg_dict_field_detail
     set description = null
     where db_id = {db_id}
      and (char_length(trim(description)) = 0
       or description in ('null', 'N/A', 'nothing', 'empty', 'none'));
 
-    insert into {field_comments} (
-      user_id, comment, created, modified, comment_crc32_checksum
-    )
-    select 0 user_id, description, now() created, now() modified, crc32(description) from
-    (
-      select sf.description
-      from stg_dict_field_detail sf left join {field_comments} fc
-        on sf.description = fc.comment
-      where sf.description is not null
-        and fc.id is null
-        and sf.db_id = {db_id}
-      group by 1 order by 1
-    ) d;
-
-    analyze table {field_comments};
-
     -- update stg_dict_field_detail dataset_id
     update stg_dict_field_detail sf, dict_dataset d
       set sf.dataset_id = d.id where sf.urn = d.urn
@@ -176,21 +149,21 @@ class OracleLoad:
       ) x
        ON s.dataset_id = x.dataset_id
       AND s.field_name = x.field_name
-     AND s.parent_path = x.parent_path
+     AND s.parent_path <=> x.parent_path
     WHERE s.field_name is null
     ; -- run time : ~2min
 
-    delete from {dict_field_detail} where field_id in (select field_id from t_deleted_fields);
+    delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);
 
     -- update the old record if some thing changed
-    update {dict_field_detail} t join
+    update dict_field_detail t join
     (
       select x.field_id, s.*
       from stg_dict_field_detail s
-       join {dict_field_detail} x
-        on s.field_name = x.field_name
-       and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
-       and s.dataset_id = x.dataset_id
+       join dict_field_detail x
+        on s.dataset_id = x.dataset_id
+       and s.field_name = x.field_name
+       and s.parent_path <=> x.parent_path
      where s.db_id = {db_id}
       and (x.sort_id <> s.sort_id
        or x.parent_sort_id <> s.parent_sort_id
@@ -218,7 +191,7 @@ class OracleLoad:
       t.modified = now()
     ;
 
-    insert into {dict_field_detail} (
+    insert into dict_field_detail (
       dataset_id, fields_layout_id, sort_id, parent_sort_id, parent_path,
       field_name, namespace, data_type, data_size, is_nullable, default_value, modified
     )
@@ -226,54 +199,66 @@ class OracleLoad:
       sf.dataset_id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path,
       sf.field_name, sf.namespace, sf.data_type, sf.data_size, sf.is_nullable, sf.default_value, now()
     from stg_dict_field_detail sf
-    left join {dict_field_detail} t
+    left join dict_field_detail t
      on sf.dataset_id = t.dataset_id
     and sf.field_name = t.field_name
-    and sf.parent_path = t.parent_path
+    and sf.parent_path <=> t.parent_path
     where db_id = {db_id} and t.field_id is null
     ;
 
-    analyze table {dict_field_detail};
+    analyze table dict_field_detail;
 
-    -- delete old record in stagging
+    -- delete old record in staging field comment map
     delete from stg_dict_dataset_field_comment where db_id = {db_id};
 
-    -- insert
-    insert into stg_dict_dataset_field_comment
-    select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id}
+    -- insert new field comments
+    insert into field_comments (
+      user_id, comment, created, modified, comment_crc32_checksum
+    )
+    select 0 user_id, description, now() created, now() modified, crc32(description) from
+    (
+      select sf.description
+      from stg_dict_field_detail sf left join field_comments fc
+        on sf.description = fc.comment
+      where sf.description is not null
+        and fc.id is null
+        and sf.db_id = {db_id}
+      group by 1 order by 1
+    ) d;
+
+    analyze table field_comments;
+
+    -- insert field to comment map to staging
+    insert ignore into stg_dict_dataset_field_comment
+    select t.field_id field_id, fc.id comment_id, sf.dataset_id, {db_id}
    from stg_dict_field_detail sf
-     join {field_comments} fc
+     join field_comments fc
       on sf.description = fc.comment
-     join {dict_field_detail} t
+     join dict_field_detail t
       on sf.dataset_id = t.dataset_id
      and sf.field_name = t.field_name
-     and sf.parent_path = t.parent_path
+     and sf.parent_path <=> t.parent_path
     where sf.db_id = {db_id};
 
     -- have default comment, insert it set default to 0
-    insert ignore into {dict_dataset_field_comment}
+    insert ignore into dict_dataset_field_comment
     select field_id, comment_id, dataset_id, 0 is_default
     from stg_dict_dataset_field_comment
     where field_id in (
-      select field_id from {dict_dataset_field_comment}
+      select field_id from dict_dataset_field_comment
       where field_id in (select field_id from stg_dict_dataset_field_comment)
        and is_default = 1
     ) and db_id = {db_id};
 
-    -- doesn't have this comment before, insert into it and set as default
-    insert ignore into {dict_dataset_field_comment}
-    select sd.field_id, sd.comment_id, sd.dataset_id, 1
-    from stg_dict_dataset_field_comment sd
-    left join {dict_dataset_field_comment} d
+    -- doesn't have this comment before, insert into it and set as default
+    insert ignore into dict_dataset_field_comment
+    select sd.field_id, sd.comment_id, sd.dataset_id, 1
+    from stg_dict_dataset_field_comment sd
+    left join dict_dataset_field_comment d
       on d.field_id = sd.field_id
      and d.comment_id = sd.comment_id
     where d.comment_id is null
       and sd.db_id = {db_id};
-    '''.format(source_file=self.input_field_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id,
-               dict_dataset=self.dict_dataset_table, dict_field_detail=self.dict_field_table,
-               field_comments=self.field_comments_table, dict_dataset_field_comment=self.dict_field_comment_table)
+    '''.format(source_file=self.input_field_file, db_id=self.db_id)
 
     self.executeCommands(load_fields_cmd)
-    self.logger.info("finish loading oracle table fields from {} to {}"
-                     .format(self.input_field_file, self.dict_field_table))
+    self.logger.info("finish loading oracle table fields from {}".format(self.input_field_file))
 
   def load_sample(self):
@@ -289,12 +274,12 @@ class OracleLoad:
 
     -- update reference id in staging table
     UPDATE stg_dict_dataset_sample s
-    LEFT JOIN {dict_dataset} d ON s.ref_urn = d.urn
+    LEFT JOIN dict_dataset d ON s.ref_urn = d.urn
     SET s.ref_id = d.id WHERE s.db_id = {db_id};
 
     -- first insert ref_id as 0
-    INSERT INTO {dict_dataset_sample}
+    INSERT INTO dict_dataset_sample
     ( `dataset_id`,
       `urn`,
       `ref_id`,
@@ -302,22 +287,20 @@ class OracleLoad:
       created
     )
     select d.id as dataset_id, s.urn, s.ref_id, s.data, now()
-    from stg_dict_dataset_sample s left join {dict_dataset} d on d.urn = s.urn
+    from stg_dict_dataset_sample s left join dict_dataset d on d.urn = s.urn
     where s.db_id = {db_id}
     on duplicate key update
      `data`=s.data, modified=now();
 
     -- update reference id in final table
-    UPDATE {dict_dataset_sample} d
+    UPDATE dict_dataset_sample d
     RIGHT JOIN stg_dict_dataset_sample s ON d.urn = s.urn
     SET d.ref_id = s.ref_id
     WHERE s.db_id = {db_id} AND d.ref_id = 0;
-    '''.format(source_file=self.input_sample_file, db_id=self.db_id,
-               dict_dataset=self.dict_dataset_table, dict_dataset_sample=self.dict_dataset_sample_table)
+    '''.format(source_file=self.input_sample_file, db_id=self.db_id)
 
     self.executeCommands(load_sample_cmd)
-    self.logger.info("finish loading oracle sample data from {} to {}"
-                     .format(self.input_sample_file, self.dict_dataset_sample_table))
+    self.logger.info("finish loading oracle sample data from {}".format(self.input_sample_file))
 
   def executeCommands(self, commands):
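OracleLoad also drops the single-valued indirection that routed fixed table names through instance attributes and str.format placeholders ({dict_dataset}, {field_comments}, ...), inlining the literal names instead. Besides shortening the format() calls, this removes a standing hazard of templating large SQL bodies: every brace is significant to format(), so literal braces must be doubled or the call raises a KeyError. A minimal illustration (names hypothetical, not from the patch):

    # Illustrative: each placeholder kept in a big SQL template forces
    # every literal brace in that template to be escaped by doubling.
    template = "select '{{literal braces}}' as tag from {table}"
    print(template.format(table='dict_dataset'))
    # -> select '{literal braces}' as tag from dict_dataset
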
diff --git a/metadata-etl/src/main/resources/jython/TeradataLoad.py b/metadata-etl/src/main/resources/jython/TeradataLoad.py
index 91f740d098..99e0e106e8 100644
--- a/metadata-etl/src/main/resources/jython/TeradataLoad.py
+++ b/metadata-etl/src/main/resources/jython/TeradataLoad.py
@@ -189,10 +189,9 @@ class TeradataLoad:
       @dummy
     )
     set
-      data_precision=nullif(@precision,'')
-    , data_scale=nullif(@scale,'')
-    , db_id = {db_id}
-    ;
+      data_precision=nullif(@precision,''),
+      data_scale=nullif(@scale,''),
+      db_id = {db_id};
 
     analyze table stg_dict_field_detail;
 
@@ -219,7 +218,7 @@ class TeradataLoad:
       ) x
        ON s.dataset_id = x.dataset_id
       AND s.field_name = x.field_name
-     AND s.parent_path = x.parent_path
+     AND s.parent_path <=> x.parent_path
     WHERE s.field_name is null
     ; -- run time : ~2min
@@ -231,9 +230,9 @@ class TeradataLoad:
      select x.field_id, s.*
      from stg_dict_field_detail s
       join dict_field_detail x
-        on s.field_name = x.field_name
-       and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
-       and s.dataset_id = x.dataset_id
+        on s.dataset_id = x.dataset_id
+       and s.field_name = x.field_name
+       and s.parent_path <=> x.parent_path
      where s.db_id = {db_id}
       and (x.sort_id <> s.sort_id
        or x.parent_sort_id <> s.parent_sort_id
@@ -289,19 +288,36 @@ class TeradataLoad:
 
     analyze table dict_field_detail;
 
-    -- delete old record in stagging
+    -- delete old record in staging field comment map
     delete from stg_dict_dataset_field_comment where db_id = {db_id};
 
-    -- insert
-    insert into stg_dict_dataset_field_comment
-    select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id}
+    -- insert new field comments
+    insert into field_comments (
+      user_id, comment, created, modified, comment_crc32_checksum
+    )
+    select 0 user_id, description, now() created, now() modified, crc32(description) from
+    (
+      select sf.description
+      from stg_dict_field_detail sf left join field_comments fc
+        on sf.description = fc.comment
+      where sf.description is not null
+        and fc.id is null
+        and sf.db_id = {db_id}
+      group by 1 order by 1
+    ) d;
+
+    analyze table field_comments;
+
+    -- insert field to comment map to staging
+    insert ignore into stg_dict_dataset_field_comment
+    select t.field_id field_id, fc.id comment_id, sf.dataset_id, {db_id}
     from stg_dict_field_detail sf
      join field_comments fc
       on sf.description = fc.comment
      join dict_field_detail t
       on sf.dataset_id = t.dataset_id
-      and sf.field_name = t.field_name
-      and sf.parent_path = t.parent_path
+      and sf.field_name = t.field_name
+      and sf.parent_path <=> t.parent_path
     where sf.db_id = {db_id};
 
     -- have default comment, insert it set default to 0
@@ -311,7 +327,6 @@ class TeradataLoad:
       where field_id in (select field_id from stg_dict_dataset_field_comment)
        and is_default = 1
     ) and db_id = {db_id};
-
     -- doesn't have this comment before, insert into it and set as default
     insert ignore into dict_dataset_field_comment
     select sd.field_id, sd.comment_id, sd.dataset_id, 1
     from stg_dict_dataset_field_comment sd
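With TeradataLoad updated as well, all four loaders now share one field-comment sequence, and the comment insert always precedes the map that references it. Paraphrased outline (descriptive step names, not function names):

    # Descriptive outline of the shared load_fields flow after this patch.
    FIELD_COMMENT_STEPS = [
        'normalize blank/placeholder descriptions to NULL in staging',
        'sync dict_field_detail: delete, update, insert with <=> on parent_path',
        'insert missing field_comments rows (crc32 checksum of the text)',
        'analyze table field_comments',
        'rebuild stg_dict_dataset_field_comment with INSERT IGNORE',
        'map comments as non-default where a default exists, else insert as default',
    ]

The comment_crc32_checksum column stores a short, indexable surrogate for the comment text; since CRC32 can collide, lookups should still confirm equality on the full string, as these joins do (they match on sf.description = fc.comment).
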