[Issue 1017] fix bugs to handle dataset renaming and deletion properly - in Hive/Oracle/Teradata/Hdfs (#1040)

Authored by richardxin on 2018-03-15 13:31:30 -07:00; committed by Mars Lan
parent bd86a3501b
commit 978cbab4fa
5 changed files with 36 additions and 9 deletions

@@ -55,6 +55,7 @@ CREATE TABLE `stg_dict_dataset` (
 -- dataset table
 CREATE TABLE `dict_dataset` (
   `id` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
+  `db_id` SMALLINT(6) UNSIGNED NOT NULL DEFAULT 0,
   `name` VARCHAR(200) NOT NULL,
   `schema` MEDIUMTEXT CHARACTER SET utf8,
   `schema_type` VARCHAR(50) DEFAULT 'JSON'
@@ -87,7 +88,7 @@ CREATE TABLE `dict_dataset` (
   `modified_time` INT UNSIGNED COMMENT 'latest wherehows modified',
   `wh_etl_exec_id` BIGINT COMMENT 'wherehows etl execution id that modified this record',
   PRIMARY KEY (`id`),
-  UNIQUE KEY `uq_dataset_urn` (`urn`)
+  UNIQUE KEY `uq_dataset_db_id_urn` (`db_id`,`urn`)
 )
   ENGINE = InnoDB
   DEFAULT CHARSET = latin1;
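The DDL change alone only affects fresh installs; an already-deployed dict_dataset keeps the old single-column key. A minimal migration sketch for an existing instance (an assumption, not part of this commit; column and key names are taken from the DDL above):

    -- hypothetical one-time migration for a live deployment
    ALTER TABLE `dict_dataset`
      ADD COLUMN `db_id` SMALLINT(6) UNSIGNED NOT NULL DEFAULT 0 AFTER `id`,
      DROP KEY `uq_dataset_urn`,
      ADD UNIQUE KEY `uq_dataset_db_id_urn` (`db_id`, `urn`);

Existing rows come out of this with db_id = 0, so a backfill from whatever source mapping the deployment uses would be needed before the per-db_id delete logic below can be trusted.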

@@ -131,7 +131,8 @@ class HdfsLoad:
 source_created_time,
 source_modified_time,
 created_time,
-wh_etl_exec_id
+wh_etl_exec_id,
+db_id
 )
 select s.name, s.schema, s.schema_type, s.fields,
 s.properties, s.urn,
@@ -140,7 +141,7 @@ class HdfsLoad:
 s.dataset_type, s.hive_serdes_class, s.is_partitioned,
 s.partition_layout_pattern_id, s.sample_partition_full_path,
 s.source_created_time, s.source_modified_time, UNIX_TIMESTAMP(now()),
-s.wh_etl_exec_id
+s.wh_etl_exec_id, s.db_id
 from stg_dict_dataset s
 where s.db_id = {db_id}
 on duplicate key update
@@ -152,6 +153,12 @@ class HdfsLoad:
 source_created_time=s.source_created_time, source_modified_time=s.source_modified_time,
 modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
 ;
+-- handle deleted or renamed datasets
+DELETE ds from dict_dataset ds
+where ds.db_id = {db_id} AND NOT EXISTS (select 1 from stg_dict_dataset where urn = ds.urn)
+;
 analyze table dict_dataset;
 -- update dataset_id of instance table
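This is the core of the rename fix: a renamed dataset reappears in stg_dict_dataset under its new urn only, so the upsert above creates a fresh row (no collision on (db_id, urn)) and the new DELETE then drops the row still holding the old urn. A toy walkthrough with made-up values (db_id 3, hypothetical HDFS paths):

    -- staging after a rename: only the new urn is present
    --   stg_dict_dataset: (db_id=3, urn='hdfs:///data/tracking_v2', ...)
    --   dict_dataset:     (db_id=3, urn='hdfs:///data/tracking', ...)
    -- step 1: the upsert inserts (3, 'hdfs:///data/tracking_v2') as a new row
    -- step 2: the cleanup removes the stale row holding the old urn
    DELETE ds FROM dict_dataset ds
    WHERE ds.db_id = 3
      AND NOT EXISTS (SELECT 1 FROM stg_dict_dataset s WHERE s.urn = ds.urn);

Note that the staging lookup matches urn across all of stg_dict_dataset; if staging ever holds rows for several sources at once, adding `and s.db_id = ds.db_id` to the subquery would scope it more tightly (an observation about the pattern, not a change this commit makes).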

@@ -99,7 +99,8 @@ class HiveLoad:
 source_created_time,
 source_modified_time,
 created_time,
-wh_etl_exec_id
+wh_etl_exec_id,
+db_id
 )
 select s.name, s.schema, s.schema_type, s.fields,
 s.properties, s.urn,
@@ -108,7 +109,7 @@ class HiveLoad:
 s.dataset_type, s.hive_serdes_class, s.is_partitioned,
 s.partition_layout_pattern_id, s.sample_partition_full_path,
 s.source_created_time, s.source_modified_time, UNIX_TIMESTAMP(now()),
-s.wh_etl_exec_id
+s.wh_etl_exec_id, s.db_id
 from stg_dict_dataset s
 where s.db_id = {db_id}
 on duplicate key update
@@ -120,6 +121,11 @@ class HiveLoad:
 source_created_time=s.source_created_time, source_modified_time=s.source_modified_time,
 modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
 ;
+-- handle deleted or renamed hive tables
+DELETE ds from dict_dataset ds
+where ds.db_id = {db_id} AND NOT EXISTS (select 1 from stg_dict_dataset where urn = ds.urn)
+;
 """.format(source_file=self.input_schema_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
 self.executeCommands(load_cmd)
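The db_id column has to ride along in the upsert for the new composite key to mean anything: under the old uq_dataset_urn(urn), the same table surfaced by two database instances collapsed into one row, with each load overwriting the other. A small sketch of the changed behavior, using made-up values and a column list trimmed to the ones relevant here:

    -- under uq_dataset_db_id_urn (db_id, urn) these produce two rows;
    -- under the old uq_dataset_urn (urn), the second statement would have
    -- updated the row written by the first
    INSERT INTO dict_dataset (db_id, name, urn)
    VALUES (1, 'page_views', 'hive:///tracking/page_views')
    ON DUPLICATE KEY UPDATE name = VALUES(name);

    INSERT INTO dict_dataset (db_id, name, urn)
    VALUES (2, 'page_views', 'hive:///tracking/page_views')
    ON DUPLICATE KEY UPDATE name = VALUES(name);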

@@ -89,7 +89,8 @@ class OracleLoad:
 source_created_time,
 source_modified_time,
 created_time,
-wh_etl_exec_id
+wh_etl_exec_id,
+db_id
 )
 select s.name, s.schema, s.schema_type, s.fields, s.properties, s.urn,
 s.source, s.location_prefix, s.parent_name,
@@ -97,7 +98,7 @@ class OracleLoad:
 s.dataset_type, s.hive_serdes_class, s.is_partitioned,
 s.partition_layout_pattern_id, s.sample_partition_full_path,
 s.source_created_time, s.source_modified_time, UNIX_TIMESTAMP(now()),
-s.wh_etl_exec_id
+s.wh_etl_exec_id, s.db_id
 from stg_dict_dataset s
 where s.db_id = {db_id}
 on duplicate key update
@@ -109,6 +110,11 @@ class OracleLoad:
 source_created_time=s.source_created_time, source_modified_time=s.source_modified_time,
 modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
 ;
+-- handle deleted or renamed datasets
+DELETE ds from dict_dataset ds
+where ds.db_id = {db_id} AND NOT EXISTS (select 1 from stg_dict_dataset where urn = ds.urn)
+;
 analyze table dict_dataset;
 '''.format(source_file=self.input_table_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
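A bulk upsert followed by a bulk delete can leave InnoDB's index statistics stale, which is presumably why the loaders run `analyze table dict_dataset` afterward: the NOT EXISTS anti-join on the next run relies on the optimizer still picking the unique index. A quick way to inspect the effect, using standard MySQL rather than anything WhereHows-specific:

    ANALYZE TABLE dict_dataset;    -- refresh index cardinality estimates
    SHOW INDEX FROM dict_dataset;  -- the Cardinality column reflects the new stats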

@@ -109,7 +109,8 @@ class TeradataLoad:
 source_created_time,
 source_modified_time,
 created_time,
-wh_etl_exec_id
+wh_etl_exec_id,
+db_id
 )
 select s.name, s.schema, s.schema_type, s.fields,
 s.properties, s.urn,
@@ -118,7 +119,7 @@ class TeradataLoad:
 s.dataset_type, s.hive_serdes_class, s.is_partitioned,
 s.partition_layout_pattern_id, s.sample_partition_full_path,
 s.source_created_time, s.source_modified_time, UNIX_TIMESTAMP(now()),
-s.wh_etl_exec_id
+s.wh_etl_exec_id, s.db_id
 from stg_dict_dataset s
 where s.db_id = {db_id}
 on duplicate key update
@@ -130,6 +131,12 @@ class TeradataLoad:
 source_created_time=s.source_created_time, source_modified_time=s.source_modified_time,
 modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
 ;
+-- handle deleted or renamed datasets
+DELETE ds from dict_dataset ds
+where ds.db_id = {db_id} AND NOT EXISTS (select 1 from stg_dict_dataset where urn = ds.urn)
+;
 analyze table dict_dataset;
 -- update dataset_id of instance table