diff --git a/metadata-etl/src/main/resources/jython/HdfsLoad.py b/metadata-etl/src/main/resources/jython/HdfsLoad.py index 14d0ac465b..de290e4477 100644 --- a/metadata-etl/src/main/resources/jython/HdfsLoad.py +++ b/metadata-etl/src/main/resources/jython/HdfsLoad.py @@ -77,8 +77,39 @@ class HdfsLoad: end where db_id = {db_id} and parent_name is null ; + + -- load into stg_dict_dataset_instance + DELETE FROM stg_dict_dataset_instance WHERE db_id = {db_id}; + INSERT INTO stg_dict_dataset_instance + ( dataset_urn, + db_id, + deployment_tier, + data_center, + server_cluster, + slice, + status_id, + native_name, + logical_name, + `version`, + instance_created_time, + created_time, + wh_etl_exec_id, + abstract_dataset_urn, + schema_text + ) + select s.urn, {db_id}, d.deployment_tier, d.data_center, d.cluster, + '*', 0, s.name, s.name, 0, s.source_created_time, s.created_time, + {wh_etl_exec_id}, s.urn, s.schema + from stg_dict_dataset s JOIN cfg_database d on s.db_id = d.db_id + where s.db_id = {db_id} + on duplicate key update + deployment_tier=d.deployment_tier, data_center=d.data_center, + server_cluster=d.cluster, native_name=s.name, logical_name=s.name, + instance_created_time=s.source_created_time, created_time=s.created_time, + wh_etl_exec_id={wh_etl_exec_id}, abstract_dataset_urn=s.urn, schema_text=s.schema; + -- insert into final table - INSERT INTO dict_dataset + INSERT INTO dict_dataset ( `name`, `schema`, schema_type, @@ -121,6 +152,48 @@ class HdfsLoad: modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id ; analyze table dict_dataset; + + -- update dataset_id of instance table + update stg_dict_dataset_instance sdi, dict_dataset d + set sdi.dataset_id = d.id where sdi.abstract_dataset_urn = d.urn + and sdi.db_id = {db_id}; + + -- insert into final instance table + INSERT INTO dict_dataset_instance + ( dataset_id, + db_id, + deployment_tier, + data_center, + server_cluster, + slice, + status_id, + native_name, + logical_name, + version, + version_sort_id, + schema_text, + ddl_text, + instance_created_time, + created_time, + wh_etl_exec_id + ) + select s.dataset_id, s.db_id, s.deployment_tier, s.data_center, + s.server_cluster, s.slice, s.status_id, s.native_name, s.logical_name, s.version, + case when s.version regexp '[0-9]+\.[0-9]+\.[0-9]+' + then cast(substring_index(s.version, '.', 1) as unsigned) * 100000000 + + cast(substring_index(substring_index(s.version, '.', 2), '.', -1) as unsigned) * 10000 + + cast(substring_index(s.version, '.', -1) as unsigned) + else 0 + end version_sort_id, s.schema_text, s.ddl_text, + s.instance_created_time, s.created_time, s.wh_etl_exec_id + from stg_dict_dataset_instance s + where s.db_id = {db_id} + on duplicate key update + deployment_tier=s.deployment_tier, data_center=s.data_center, server_cluster=s.server_cluster, slice=s.slice, + status_id=s.status_id, native_name=s.native_name, logical_name=s.logical_name, version=s.version, + schema_text=s.schema_text, ddl_text=s.ddl_text, + instance_created_time=s.instance_created_time, created_time=s.created_time, wh_etl_exec_id=s.wh_etl_exec_id + ; '''.format(source_file=self.input_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id) for state in load_cmd.split(";"): self.logger.debug(state) diff --git a/metadata-etl/src/main/resources/jython/TeradataLoad.py b/metadata-etl/src/main/resources/jython/TeradataLoad.py index 1958dcd4a2..247dfcc7fb 100644 --- a/metadata-etl/src/main/resources/jython/TeradataLoad.py +++ b/metadata-etl/src/main/resources/jython/TeradataLoad.py @@ -54,6 +54,36 @@ class TeradataLoad: set parent_name = substring_index(substring_index(urn, '/', 4), '/', -1) /* teradata parent_name is it's schema name*/ where db_id = {db_id} and parent_name is null; + -- load into stg_dict_dataset_instance + DELETE FROM stg_dict_dataset_instance WHERE db_id = {db_id}; + INSERT INTO stg_dict_dataset_instance + ( dataset_urn, + db_id, + deployment_tier, + data_center, + server_cluster, + slice, + status_id, + native_name, + logical_name, + `version`, + instance_created_time, + created_time, + wh_etl_exec_id, + abstract_dataset_urn, + schema_text + ) + select s.urn, {db_id}, d.deployment_tier, d.data_center, d.cluster, + '*', 0, s.name, s.name, 0, s.source_created_time, s.created_time, + {wh_etl_exec_id}, s.urn, s.schema + from stg_dict_dataset s JOIN cfg_database d on s.db_id = d.db_id + where s.db_id = {db_id} + on duplicate key update + deployment_tier=d.deployment_tier, data_center=d.data_center, + server_cluster=d.cluster, native_name=s.name, logical_name=s.name, + instance_created_time=s.source_created_time, created_time=s.created_time, + wh_etl_exec_id={wh_etl_exec_id}, abstract_dataset_urn=s.urn, schema_text=s.schema; + -- insert into final table INSERT INTO dict_dataset ( `name`, @@ -98,6 +128,48 @@ class TeradataLoad: modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id ; analyze table dict_dataset; + + -- update dataset_id of instance table + update stg_dict_dataset_instance sdi, dict_dataset d + set sdi.dataset_id = d.id where sdi.abstract_dataset_urn = d.urn + and sdi.db_id = {db_id}; + + -- insert into final instance table + INSERT INTO dict_dataset_instance + ( dataset_id, + db_id, + deployment_tier, + data_center, + server_cluster, + slice, + status_id, + native_name, + logical_name, + version, + version_sort_id, + schema_text, + ddl_text, + instance_created_time, + created_time, + wh_etl_exec_id + ) + select s.dataset_id, s.db_id, s.deployment_tier, s.data_center, + s.server_cluster, s.slice, s.status_id, s.native_name, s.logical_name, s.version, + case when s.version regexp '[0-9]+\.[0-9]+\.[0-9]+' + then cast(substring_index(s.version, '.', 1) as unsigned) * 100000000 + + cast(substring_index(substring_index(s.version, '.', 2), '.', -1) as unsigned) * 10000 + + cast(substring_index(s.version, '.', -1) as unsigned) + else 0 + end version_sort_id, s.schema_text, s.ddl_text, + s.instance_created_time, s.created_time, s.wh_etl_exec_id + from stg_dict_dataset_instance s + where s.db_id = {db_id} + on duplicate key update + deployment_tier=s.deployment_tier, data_center=s.data_center, server_cluster=s.server_cluster, slice=s.slice, + status_id=s.status_id, native_name=s.native_name, logical_name=s.logical_name, version=s.version, + schema_text=s.schema_text, ddl_text=s.ddl_text, + instance_created_time=s.instance_created_time, created_time=s.created_time, wh_etl_exec_id=s.wh_etl_exec_id + ; '''.format(source_file=self.input_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id) for state in load_cmd.split(";"):