Fix null comparison in SQL, refactor some queries (#408)

This commit is contained in:
Yi (Alan) Wang 2017-04-04 13:02:46 -07:00 committed by GitHub
parent 488929ad93
commit e6978b97a7
5 changed files with 137 additions and 139 deletions

View File

@ -221,22 +221,6 @@ class HdfsLoad:
and (char_length(trim(description)) = 0
or description in ('null', 'N/A', 'nothing', 'empty', 'none'));
insert into field_comments (
user_id, comment, created, modified, comment_crc32_checksum
)
select 0 user_id, description, now() created, now() modified, crc32(description) from
(
select sf.description
from stg_dict_field_detail sf left join field_comments fc
on sf.description = fc.comment
where sf.description is not null
and fc.id is null
and sf.db_id = {db_id}
group by 1 order by 1
) d;
analyze table field_comments;
-- update stg_dict_field_detail dataset_id
update stg_dict_field_detail sf, dict_dataset d
set sf.dataset_id = d.id where sf.urn = d.urn
@ -254,7 +238,7 @@ class HdfsLoad:
) x
ON s.dataset_id = x.dataset_id
AND s.field_name = x.field_name
AND s.parent_path = x.parent_path
AND s.parent_path <=> x.parent_path
WHERE s.field_name is null
; -- run time : ~2min
@ -266,9 +250,9 @@ class HdfsLoad:
select x.field_id, s.*
from (select * from stg_dict_field_detail where db_id = {db_id}) s
join dict_field_detail x
on s.field_name = x.field_name
and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
and s.dataset_id = x.dataset_id
on s.dataset_id = x.dataset_id
and s.field_name = x.field_name
and s.parent_path <=> x.parent_path
where (x.sort_id <> s.sort_id
or x.parent_sort_id <> s.parent_sort_id
or x.data_type <> s.data_type
@ -307,25 +291,42 @@ class HdfsLoad:
left join dict_field_detail t
on sf.dataset_id = t.dataset_id
and sf.field_name = t.field_name
and sf.parent_path = t.parent_path
and sf.parent_path <=> t.parent_path
where db_id = {db_id} and t.field_id is null
;
analyze table dict_field_detail;
-- delete old record in stagging
-- delete old record in staging field comment map
delete from stg_dict_dataset_field_comment where db_id = {db_id};
-- insert
insert into stg_dict_dataset_field_comment
select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id}
-- insert new field comments
insert into field_comments (
user_id, comment, created, modified, comment_crc32_checksum
)
select 0 user_id, description, now() created, now() modified, crc32(description) from
(
select sf.description
from stg_dict_field_detail sf left join field_comments fc
on sf.description = fc.comment
where sf.description is not null
and fc.id is null
and sf.db_id = {db_id}
group by 1 order by 1
) d;
analyze table field_comments;
-- insert field to comment map to staging
insert ignore into stg_dict_dataset_field_comment
select t.field_id field_id, fc.id comment_id, sf.dataset_id, {db_id}
from stg_dict_field_detail sf
join field_comments fc
on sf.description = fc.comment
join dict_field_detail t
on sf.dataset_id = t.dataset_id
and sf.field_name = t.field_name
and sf.parent_path = t.parent_path
and sf.parent_path <=> t.parent_path
where sf.db_id = {db_id};
-- have default comment, insert it set default to 0
@ -335,7 +336,6 @@ class HdfsLoad:
where field_id in (select field_id from stg_dict_dataset_field_comment)
and is_default = 1 ) and db_id = {db_id};
-- doesn't have this comment before, insert into it and set as default
insert ignore into dict_dataset_field_comment
select sd.field_id, sd.comment_id, sd.dataset_id, 1 from stg_dict_dataset_field_comment sd
@ -344,10 +344,6 @@ class HdfsLoad:
and d.comment_id = sd.comment_id
where d.comment_id is null
and sd.db_id = {db_id};
DELETE FROM stg_dict_field_detail where db_id = {db_id};
'''.format(source_file=self.input_field_file, db_id=self.db_id)
self.executeCommands(load_field_cmd)

View File

@ -139,7 +139,7 @@ class HiveLoad:
) x
ON s.dataset_id = x.dataset_id
AND s.field_name = x.field_name
AND s.parent_path = x.parent_path
AND s.parent_path <=> x.parent_path
WHERE s.field_name is null
; -- run time : ~2min
@ -152,7 +152,7 @@ class HiveLoad:
from stg_dict_field_detail s
join dict_field_detail x
on s.field_name = x.field_name
and s.parent_path = x.parent_path
and s.parent_path <=> x.parent_path
and s.dataset_id = x.dataset_id
where s.db_id = {db_id}
and (x.sort_id <> s.sort_id
@ -189,7 +189,7 @@ class HiveLoad:
JOIN dict_field_detail t
ON sf.dataset_id = t.dataset_id
AND sf.field_name = t.field_name
AND sf.parent_path = t.parent_path
AND sf.parent_path <=> t.parent_path
WHERE sf.db_id = {db_id}
and sf.dataset_id IS NOT NULL
group by 1,2,3
@ -208,19 +208,38 @@ class HiveLoad:
and (sf.urn, sf.sort_id, sf.db_id) not in (select urn, sort_id, db_id from t_existed_field)
;
-- delete old record in stagging
analyze table dict_field_detail;
-- delete old record in staging field comment map
delete from stg_dict_dataset_field_comment where db_id = {db_id};
-- insert
-- insert new field comments
insert into field_comments (
user_id, comment, created, modified, comment_crc32_checksum
)
select 0 user_id, description, now() created, now() modified, crc32(description) from
(
select sf.description
from stg_dict_field_detail sf left join field_comments fc
on sf.description = fc.comment
where sf.description is not null
and fc.id is null
and sf.db_id = {db_id}
group by 1 order by 1
) d;
analyze table field_comments;
-- insert field to comment map to staging
insert ignore into stg_dict_dataset_field_comment
select t.field_id field_id, fc.id comment_id, sf.dataset_id, {db_id}
select t.field_id field_id, fc.id comment_id, sf.dataset_id, {db_id}
from stg_dict_field_detail sf
join field_comments fc
on sf.description = fc.comment
join dict_field_detail t
on sf.dataset_id = t.dataset_id
and sf.field_name = t.field_name
and sf.parent_path = t.parent_path
and sf.field_name = t.field_name
and sf.parent_path <=> t.parent_path
where sf.db_id = {db_id};
-- have default comment, insert it set default to 0
@ -238,21 +257,6 @@ class HiveLoad:
and d.comment_id = sd.comment_id
where d.comment_id is null
and sd.db_id = {db_id};
insert into field_comments (
user_id, comment, created, modified, comment_crc32_checksum
)
select 0 user_id, description, now() created, now() modified, crc32(description) from
(
select sf.description
from stg_dict_field_detail sf left join field_comments fc
on sf.description = fc.comment
where sf.description is not null
and fc.id is null
and sf.db_id = {db_id}
group by 1 order by 1
) d
""".format(source_file=self.input_field_file, db_id=self.db_id)
self.executeCommands(load_cmd)

View File

@ -307,7 +307,7 @@ class OracleExtract:
return None
def trim_newline(self, line):
return None if line is None else line.replace('\n', ' ').replace('\r', ' ').encode('ascii', 'ignore')
return line.replace('\n', ' ').replace('\r', ' ').strip().encode('ascii', 'ignore') if line else None
def write_csv(self, csv_filename, csv_columns, data_list):
csvfile = open(csv_filename, 'wb')

View File

@ -42,12 +42,6 @@ class OracleLoad:
self.logger.info("Load Oracle Metadata into {}, db_id {}, wh_exec_id {}"
.format(JDBC_URL, self.db_id, self.wh_etl_exec_id))
self.dict_dataset_table = 'dict_dataset'
self.field_comments_table = 'field_comments'
self.dict_field_table = 'dict_field_detail'
self.dict_field_comment_table = 'dict_dataset_field_comment'
self.dict_dataset_sample_table = 'dict_dataset_sample'
def load_tables(self):
load_tables_cmd = '''
@ -64,7 +58,7 @@ class OracleLoad:
wh_etl_exec_id = {wh_etl_exec_id};
-- insert into final table
INSERT INTO {dict_dataset}
INSERT INTO dict_dataset
( `name`,
`schema`,
schema_type,
@ -106,13 +100,11 @@ class OracleLoad:
modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
;
analyze table {dict_dataset};
'''.format(source_file=self.input_table_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id,
dict_dataset=self.dict_dataset_table)
analyze table dict_dataset;
'''.format(source_file=self.input_table_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
self.executeCommands(load_tables_cmd)
self.logger.info("finish loading oracle table metadata from {} to {}"
.format(self.input_table_file, self.dict_dataset_table))
self.logger.info("finish loading oracle table metadata from {}".format(self.input_table_file))
def load_fields(self):
@ -134,31 +126,12 @@ class OracleLoad:
-- show warnings limit 20;
analyze table stg_dict_field_detail;
update stg_dict_field_detail
set default_value = trim(default_value) where db_id = {db_id};
update stg_dict_field_detail
set description = null
where db_id = {db_id}
and (char_length(trim(description)) = 0
or description in ('null', 'N/A', 'nothing', 'empty', 'none'));
insert into {field_comments} (
user_id, comment, created, modified, comment_crc32_checksum
)
select 0 user_id, description, now() created, now() modified, crc32(description) from
(
select sf.description
from stg_dict_field_detail sf left join {field_comments} fc
on sf.description = fc.comment
where sf.description is not null
and fc.id is null
and sf.db_id = {db_id}
group by 1 order by 1
) d;
analyze table {field_comments};
-- update stg_dict_field_detail dataset_id
update stg_dict_field_detail sf, dict_dataset d
set sf.dataset_id = d.id where sf.urn = d.urn
@ -176,21 +149,21 @@ class OracleLoad:
) x
ON s.dataset_id = x.dataset_id
AND s.field_name = x.field_name
AND s.parent_path = x.parent_path
AND s.parent_path <=> x.parent_path
WHERE s.field_name is null
; -- run time : ~2min
delete from {dict_field_detail} where field_id in (select field_id from t_deleted_fields);
delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);
-- update the old record if some thing changed
update {dict_field_detail} t join
update dict_field_detail t join
(
select x.field_id, s.*
from stg_dict_field_detail s
join {dict_field_detail} x
on s.field_name = x.field_name
and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
and s.dataset_id = x.dataset_id
join dict_field_detail x
on s.dataset_id = x.dataset_id
and s.field_name = x.field_name
and s.parent_path <=> x.parent_path
where s.db_id = {db_id}
and (x.sort_id <> s.sort_id
or x.parent_sort_id <> s.parent_sort_id
@ -218,7 +191,7 @@ class OracleLoad:
t.modified = now()
;
insert into {dict_field_detail} (
insert into dict_field_detail (
dataset_id, fields_layout_id, sort_id, parent_sort_id, parent_path,
field_name, namespace, data_type, data_size, is_nullable, default_value, modified
)
@ -226,54 +199,66 @@ class OracleLoad:
sf.dataset_id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path,
sf.field_name, sf.namespace, sf.data_type, sf.data_size, sf.is_nullable, sf.default_value, now()
from stg_dict_field_detail sf
left join {dict_field_detail} t
left join dict_field_detail t
on sf.dataset_id = t.dataset_id
and sf.field_name = t.field_name
and sf.parent_path = t.parent_path
and sf.parent_path <=> t.parent_path
where db_id = {db_id} and t.field_id is null
;
analyze table {dict_field_detail};
analyze table dict_field_detail;
-- delete old record in stagging
-- delete old record in staging field comment map
delete from stg_dict_dataset_field_comment where db_id = {db_id};
-- insert
insert into stg_dict_dataset_field_comment
select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id}
-- insert new field comments
insert into field_comments (
user_id, comment, created, modified, comment_crc32_checksum
)
select 0 user_id, description, now() created, now() modified, crc32(description) from
(
select sf.description
from stg_dict_field_detail sf left join field_comments fc
on sf.description = fc.comment
where sf.description is not null
and fc.id is null
and sf.db_id = {db_id}
group by 1 order by 1
) d;
analyze table field_comments;
-- insert field to comment map to staging
insert ignore into stg_dict_dataset_field_comment
select t.field_id field_id, fc.id comment_id, sf.dataset_id, {db_id}
from stg_dict_field_detail sf
join {field_comments} fc
join field_comments fc
on sf.description = fc.comment
join {dict_field_detail} t
join dict_field_detail t
on sf.dataset_id = t.dataset_id
and sf.field_name = t.field_name
and sf.parent_path = t.parent_path
and sf.parent_path <=> t.parent_path
where sf.db_id = {db_id};
-- have default comment, insert it set default to 0
insert ignore into {dict_dataset_field_comment}
insert ignore into dict_dataset_field_comment
select field_id, comment_id, dataset_id, 0 is_default from stg_dict_dataset_field_comment where field_id in (
select field_id from {dict_dataset_field_comment}
select field_id from dict_dataset_field_comment
where field_id in (select field_id from stg_dict_dataset_field_comment)
and is_default = 1 ) and db_id = {db_id};
-- doesn't have this comment before, insert into it and set as default
insert ignore into {dict_dataset_field_comment}
select sd.field_id, sd.comment_id, sd.dataset_id, 1
from stg_dict_dataset_field_comment sd
left join {dict_dataset_field_comment} d
insert ignore into dict_dataset_field_comment
select sd.field_id, sd.comment_id, sd.dataset_id, 1 from stg_dict_dataset_field_comment sd
left join dict_dataset_field_comment d
on d.field_id = sd.field_id
and d.comment_id = sd.comment_id
where d.comment_id is null
and sd.db_id = {db_id};
'''.format(source_file=self.input_field_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id,
dict_dataset=self.dict_dataset_table, dict_field_detail=self.dict_field_table,
field_comments=self.field_comments_table, dict_dataset_field_comment=self.dict_field_comment_table)
'''.format(source_file=self.input_field_file, db_id=self.db_id)
self.executeCommands(load_fields_cmd)
self.logger.info("finish loading oracle table fields from {} to {}"
.format(self.input_field_file, self.dict_field_table))
self.logger.info("finish loading oracle table fields from {}".format(self.input_field_file))
def load_sample(self):
@ -289,12 +274,12 @@ class OracleLoad:
-- update reference id in staging table
UPDATE stg_dict_dataset_sample s
LEFT JOIN {dict_dataset} d ON s.ref_urn = d.urn
LEFT JOIN dict_dataset d ON s.ref_urn = d.urn
SET s.ref_id = d.id
WHERE s.db_id = {db_id};
-- first insert ref_id as 0
INSERT INTO {dict_dataset_sample}
INSERT INTO dict_dataset_sample
( `dataset_id`,
`urn`,
`ref_id`,
@ -302,22 +287,20 @@ class OracleLoad:
created
)
select d.id as dataset_id, s.urn, s.ref_id, s.data, now()
from stg_dict_dataset_sample s left join {dict_dataset} d on d.urn = s.urn
from stg_dict_dataset_sample s left join dict_dataset d on d.urn = s.urn
where s.db_id = {db_id}
on duplicate key update
`data`=s.data, modified=now();
-- update reference id in final table
UPDATE {dict_dataset_sample} d
UPDATE dict_dataset_sample d
RIGHT JOIN stg_dict_dataset_sample s ON d.urn = s.urn
SET d.ref_id = s.ref_id
WHERE s.db_id = {db_id} AND d.ref_id = 0;
'''.format(source_file=self.input_sample_file, db_id=self.db_id,
dict_dataset=self.dict_dataset_table, dict_dataset_sample=self.dict_dataset_sample_table)
'''.format(source_file=self.input_sample_file, db_id=self.db_id)
self.executeCommands(load_sample_cmd)
self.logger.info("finish loading oracle sample data from {} to {}"
.format(self.input_sample_file, self.dict_dataset_sample_table))
self.logger.info("finish loading oracle sample data from {}".format(self.input_sample_file))
def executeCommands(self, commands):

View File

@ -189,10 +189,9 @@ class TeradataLoad:
@dummy
)
set
data_precision=nullif(@precision,'')
, data_scale=nullif(@scale,'')
, db_id = {db_id}
;
data_precision=nullif(@precision,''),
data_scale=nullif(@scale,''),
db_id = {db_id};
analyze table stg_dict_field_detail;
@ -219,7 +218,7 @@ class TeradataLoad:
) x
ON s.dataset_id = x.dataset_id
AND s.field_name = x.field_name
AND s.parent_path = x.parent_path
AND s.parent_path <=> x.parent_path
WHERE s.field_name is null
; -- run time : ~2min
@ -231,9 +230,9 @@ class TeradataLoad:
select x.field_id, s.*
from stg_dict_field_detail s
join dict_field_detail x
on s.field_name = x.field_name
and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
and s.dataset_id = x.dataset_id
on s.dataset_id = x.dataset_id
and s.field_name = x.field_name
and s.parent_path <=> x.parent_path
where s.db_id = {db_id}
and (x.sort_id <> s.sort_id
or x.parent_sort_id <> s.parent_sort_id
@ -289,19 +288,36 @@ class TeradataLoad:
analyze table dict_field_detail;
-- delete old record in stagging
-- delete old record in staging field comment map
delete from stg_dict_dataset_field_comment where db_id = {db_id};
-- insert
insert into stg_dict_dataset_field_comment
select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id}
-- insert new field comments
insert into field_comments (
user_id, comment, created, modified, comment_crc32_checksum
)
select 0 user_id, description, now() created, now() modified, crc32(description) from
(
select sf.description
from stg_dict_field_detail sf left join field_comments fc
on sf.description = fc.comment
where sf.description is not null
and fc.id is null
and sf.db_id = {db_id}
group by 1 order by 1
) d;
analyze table field_comments;
-- insert field to comment map to staging
insert ignore into stg_dict_dataset_field_comment
select t.field_id field_id, fc.id comment_id, sf.dataset_id, {db_id}
from stg_dict_field_detail sf
join field_comments fc
on sf.description = fc.comment
join dict_field_detail t
on sf.dataset_id = t.dataset_id
and sf.field_name = t.field_name
and sf.parent_path = t.parent_path
and sf.field_name = t.field_name
and sf.parent_path <=> t.parent_path
where sf.db_id = {db_id};
-- have default comment, insert it set default to 0
@ -311,7 +327,6 @@ class TeradataLoad:
where field_id in (select field_id from stg_dict_dataset_field_comment)
and is_default = 1 ) and db_id = {db_id};
-- doesn't have this comment before, insert into it and set as default
insert ignore into dict_dataset_field_comment
select sd.field_id, sd.comment_id, sd.dataset_id, 1 from stg_dict_dataset_field_comment sd