load the teradata and hadoop data into table dict_dataset_instance

jbai 2016-07-29 10:59:33 -07:00
parent 9d2c803f0c
commit ea1ac0da9f
2 changed files with 146 additions and 1 deletion


@@ -77,8 +77,39 @@ class HdfsLoad:
end
where db_id = {db_id} and parent_name is null
;
-- load into stg_dict_dataset_instance
DELETE FROM stg_dict_dataset_instance WHERE db_id = {db_id};
INSERT INTO stg_dict_dataset_instance
( dataset_urn,
db_id,
deployment_tier,
data_center,
server_cluster,
slice,
status_id,
native_name,
logical_name,
`version`,
instance_created_time,
created_time,
wh_etl_exec_id,
abstract_dataset_urn,
schema_text
)
select s.urn, {db_id}, d.deployment_tier, d.data_center, d.cluster,
'*', 0, s.name, s.name, 0, s.source_created_time, s.created_time,
{wh_etl_exec_id}, s.urn, s.schema
from stg_dict_dataset s JOIN cfg_database d on s.db_id = d.db_id
where s.db_id = {db_id}
on duplicate key update
deployment_tier=d.deployment_tier, data_center=d.data_center,
server_cluster=d.cluster, native_name=s.name, logical_name=s.name,
instance_created_time=s.source_created_time, created_time=s.created_time,
wh_etl_exec_id={wh_etl_exec_id}, abstract_dataset_urn=s.urn, schema_text=s.schema;
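The statement above is MySQL's INSERT ... ON DUPLICATE KEY UPDATE upsert, which only takes the update branch when a new row collides with a unique index on the staging table. A minimal sketch of the kind of key this assumes (the real staging DDL lives in the repo's schema scripts, so the key name and column set here are assumptions):

-- hypothetical key: without a unique index covering the staged identity,
-- ON DUPLICATE KEY UPDATE never fires and duplicate rows accumulate
ALTER TABLE stg_dict_dataset_instance
  ADD UNIQUE KEY uk_stg_ddi_urn_db (dataset_urn, db_id);

Since the preceding DELETE clears the {db_id} partition first, the update branch mainly guards against duplicate URNs arriving within a single extraction run.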
-- insert into final table
INSERT INTO dict_dataset
( `name`,
`schema`,
schema_type,
@@ -121,6 +152,48 @@ class HdfsLoad:
modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
;
analyze table dict_dataset;
-- update dataset_id of instance table
update stg_dict_dataset_instance sdi, dict_dataset d
set sdi.dataset_id = d.id where sdi.abstract_dataset_urn = d.urn
and sdi.db_id = {db_id};
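This comma-style multi-table UPDATE backfills the surrogate dataset_id generated by the dict_dataset insert. An equivalent form with explicit JOIN syntax, shown only for readability (behavior is identical in MySQL):

update stg_dict_dataset_instance sdi
join dict_dataset d on sdi.abstract_dataset_urn = d.urn
set sdi.dataset_id = d.id
where sdi.db_id = {db_id};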
-- insert into final instance table
INSERT INTO dict_dataset_instance
( dataset_id,
db_id,
deployment_tier,
data_center,
server_cluster,
slice,
status_id,
native_name,
logical_name,
version,
version_sort_id,
schema_text,
ddl_text,
instance_created_time,
created_time,
wh_etl_exec_id
)
select s.dataset_id, s.db_id, s.deployment_tier, s.data_center,
s.server_cluster, s.slice, s.status_id, s.native_name, s.logical_name, s.version,
case when s.version regexp '[0-9]+\.[0-9]+\.[0-9]+'
then cast(substring_index(s.version, '.', 1) as unsigned) * 100000000 +
cast(substring_index(substring_index(s.version, '.', 2), '.', -1) as unsigned) * 10000 +
cast(substring_index(s.version, '.', -1) as unsigned)
else 0
end version_sort_id, s.schema_text, s.ddl_text,
s.instance_created_time, s.created_time, s.wh_etl_exec_id
from stg_dict_dataset_instance s
where s.db_id = {db_id}
on duplicate key update
deployment_tier=s.deployment_tier, data_center=s.data_center, server_cluster=s.server_cluster, slice=s.slice,
status_id=s.status_id, native_name=s.native_name, logical_name=s.logical_name, version=s.version,
schema_text=s.schema_text, ddl_text=s.ddl_text,
instance_created_time=s.instance_created_time, created_time=s.created_time, wh_etl_exec_id=s.wh_etl_exec_id
;
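The CASE expression above packs a dotted three-part version into a single sortable integer: major * 100000000 + minor * 10000 + patch, which assumes minor and patch stay below 10000. A worked example on a hypothetical version string:

-- '2.10.3' -> 2*100000000 + 10*10000 + 3 = 200100003
select cast(substring_index('2.10.3', '.', 1) as unsigned) * 100000000 +
       cast(substring_index(substring_index('2.10.3', '.', 2), '.', -1) as unsigned) * 10000 +
       cast(substring_index('2.10.3', '.', -1) as unsigned) as version_sort_id;

Versions that fail the dotted-triple pattern all fall through to 0, so they sort together below any parsed version.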
'''.format(source_file=self.input_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
for state in load_cmd.split(";"):
    self.logger.debug(state)


@@ -54,6 +54,36 @@ class TeradataLoad:
set parent_name = substring_index(substring_index(urn, '/', 4), '/', -1) /* teradata parent_name is its schema name */
where db_id = {db_id} and parent_name is null;
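The nested substring_index calls pull the fourth '/'-delimited segment out of the URN, which for Teradata URNs is the schema name. A worked example with a made-up URN:

-- inner call keeps everything before the 4th '/': 'teradata:///dwh_stg'
-- outer call keeps the last segment of that:      'dwh_stg'
select substring_index(substring_index('teradata:///dwh_stg/some_table', '/', 4), '/', -1) as parent_name;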
-- load into stg_dict_dataset_instance
DELETE FROM stg_dict_dataset_instance WHERE db_id = {db_id};
INSERT INTO stg_dict_dataset_instance
( dataset_urn,
db_id,
deployment_tier,
data_center,
server_cluster,
slice,
status_id,
native_name,
logical_name,
`version`,
instance_created_time,
created_time,
wh_etl_exec_id,
abstract_dataset_urn,
schema_text
)
select s.urn, {db_id}, d.deployment_tier, d.data_center, d.cluster,
'*', 0, s.name, s.name, 0, s.source_created_time, s.created_time,
{wh_etl_exec_id}, s.urn, s.schema
from stg_dict_dataset s JOIN cfg_database d on s.db_id = d.db_id
where s.db_id = {db_id}
on duplicate key update
deployment_tier=d.deployment_tier, data_center=d.data_center,
server_cluster=d.cluster, native_name=s.name, logical_name=s.name,
instance_created_time=s.source_created_time, created_time=s.created_time,
wh_etl_exec_id={wh_etl_exec_id}, abstract_dataset_urn=s.urn, schema_text=s.schema;
-- insert into final table
INSERT INTO dict_dataset
( `name`,
@@ -98,6 +128,48 @@ class TeradataLoad:
modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
;
analyze table dict_dataset;
-- update dataset_id of instance table
update stg_dict_dataset_instance sdi, dict_dataset d
set sdi.dataset_id = d.id where sdi.abstract_dataset_urn = d.urn
and sdi.db_id = {db_id};
-- insert into final instance table
INSERT INTO dict_dataset_instance
( dataset_id,
db_id,
deployment_tier,
data_center,
server_cluster,
slice,
status_id,
native_name,
logical_name,
version,
version_sort_id,
schema_text,
ddl_text,
instance_created_time,
created_time,
wh_etl_exec_id
)
select s.dataset_id, s.db_id, s.deployment_tier, s.data_center,
s.server_cluster, s.slice, s.status_id, s.native_name, s.logical_name, s.version,
case when s.version regexp '[0-9]+\.[0-9]+\.[0-9]+'
then cast(substring_index(s.version, '.', 1) as unsigned) * 100000000 +
cast(substring_index(substring_index(s.version, '.', 2), '.', -1) as unsigned) * 10000 +
cast(substring_index(s.version, '.', -1) as unsigned)
else 0
end version_sort_id, s.schema_text, s.ddl_text,
s.instance_created_time, s.created_time, s.wh_etl_exec_id
from stg_dict_dataset_instance s
where s.db_id = {db_id}
on duplicate key update
deployment_tier=s.deployment_tier, data_center=s.data_center, server_cluster=s.server_cluster, slice=s.slice,
status_id=s.status_id, native_name=s.native_name, logical_name=s.logical_name, version=s.version,
schema_text=s.schema_text, ddl_text=s.ddl_text,
instance_created_time=s.instance_created_time, created_time=s.created_time, wh_etl_exec_id=s.wh_etl_exec_id
;
'''.format(source_file=self.input_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
for state in load_cmd.split(";"):