#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
import os
import sys
from com.ziclix.python.sql import zxJDBC
from org.slf4j import LoggerFactory
from wherehows.common import Constant

import FileUtil

class HiveLoad:
  def __init__(self, wh_etl_exec_id='0'):
    self.logger = LoggerFactory.getLogger("%s[%s]" % (self.__class__.__name__, wh_etl_exec_id))

    # set up connection
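    # NOTE: `args` below is the module-level job-properties dict assigned in
    # __main__ (args = sys.argv[1]); it is read here as a global rather than
    # passed in as a constructor parameter.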
					
						
    username = args[Constant.WH_DB_USERNAME_KEY]
    password = args[Constant.WH_DB_PASSWORD_KEY]
    JDBC_DRIVER = args[Constant.WH_DB_DRIVER_KEY]
    JDBC_URL = args[Constant.WH_DB_URL_KEY]
    self.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
    self.conn_cursor = self.conn_mysql.cursor()

    if Constant.INNODB_LOCK_WAIT_TIMEOUT in args:
      lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
      self.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)

    temp_dir = FileUtil.etl_temp_dir(args, "HIVE")
    self.input_schema_file = os.path.join(temp_dir, args[Constant.HIVE_SCHEMA_CSV_FILE_KEY])
    self.input_field_file = os.path.join(temp_dir, args[Constant.HIVE_FIELD_METADATA_KEY])
    self.input_instance_file = os.path.join(temp_dir, args[Constant.HIVE_INSTANCE_CSV_FILE_KEY])
    self.input_dependency_file = os.path.join(temp_dir, args[Constant.HIVE_DEPENDENCY_CSV_FILE_KEY])

    self.db_id = args[Constant.JOB_REF_ID_KEY]
    self.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]

  def load_metadata(self):
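    """
    Load the extracted Hive schema CSV into stg_dict_dataset, clear out
    empty/temp table names, then upsert the staged rows into dict_dataset.
    :return:
    """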
					
						
    load_cmd = """
        DELETE FROM stg_dict_dataset WHERE db_id = {db_id};

        LOAD DATA LOCAL INFILE '{source_file}'
        INTO TABLE stg_dict_dataset
        FIELDS TERMINATED BY '\Z' ESCAPED BY '\0'
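        -- '\Z' is MySQL's escape for the SUB (0x1a) control character used as
        -- the field separator in the extracted file; '\0' (NUL) is the escape character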
					
						
        (`name`, `schema`, properties, fields, urn, source, dataset_type, storage_type, @sample_partition_full_path, source_created_time, @source_modified_time)
        SET db_id = {db_id},
            source_modified_time=nullif(@source_modified_time,''),
            sample_partition_full_path=nullif(@sample_partition_full_path,''),
            is_active = TRUE,
            wh_etl_exec_id = {wh_etl_exec_id};

        -- SELECT COUNT(*) FROM stg_dict_dataset
        -- clear out rows with empty or temp table names
        DELETE FROM stg_dict_dataset
        WHERE db_id = {db_id}
          AND (length(`name`) = 0
           OR `name` like 'tmp\_%'
           OR `name` like 't\_%')
        ;

        update stg_dict_dataset
        set location_prefix = substring_index(substring_index(urn, '/', 4), '/', -2) /* hive location_prefix is its schema name */
        WHERE db_id = {db_id} and location_prefix is null;

        update stg_dict_dataset
        set parent_name = substring_index(substring_index(urn, '/', 4), '/', -1) /* hive parent_name is its schema name */
        where db_id = {db_id} and parent_name is null;
					
						

        -- insert into final table
        INSERT INTO dict_dataset
        ( `name`,
          `schema`,
          schema_type,
          fields,
          properties,
          urn,
          source,
          location_prefix,
          parent_name,
          storage_type,
          ref_dataset_id,
          is_active,
          dataset_type,
          hive_serdes_class,
          is_partitioned,
          partition_layout_pattern_id,
          sample_partition_full_path,
          source_created_time,
          source_modified_time,
          created_time,
          wh_etl_exec_id
        )
        select s.name, s.schema, s.schema_type, s.fields,
          s.properties, s.urn,
          s.source, s.location_prefix, s.parent_name,
          s.storage_type, s.ref_dataset_id, s.is_active,
          s.dataset_type, s.hive_serdes_class, s.is_partitioned,
          s.partition_layout_pattern_id, s.sample_partition_full_path,
          s.source_created_time, s.source_modified_time, UNIX_TIMESTAMP(now()),
          s.wh_etl_exec_id
        from stg_dict_dataset s
        where s.db_id = {db_id}
        on duplicate key update
          `name`=s.name, `schema`=s.schema, schema_type=s.schema_type, fields=s.fields,
          properties=s.properties, source=s.source, location_prefix=s.location_prefix, parent_name=s.parent_name,
          storage_type=s.storage_type, ref_dataset_id=s.ref_dataset_id, is_active=s.is_active,
          dataset_type=s.dataset_type, hive_serdes_class=s.hive_serdes_class, is_partitioned=s.is_partitioned,
          partition_layout_pattern_id=s.partition_layout_pattern_id, sample_partition_full_path=s.sample_partition_full_path,
          source_created_time=s.source_created_time, source_modified_time=s.source_modified_time,
          modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
        ;
        """.format(source_file=self.input_schema_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)

    self.executeCommands(load_cmd)
    self.logger.info("Loaded dataset metadata.")
					
						

  def load_field(self):
    """
    Load fields and field comments
    :return:
    """
					
						
    load_cmd = """
        DELETE FROM stg_dict_field_detail WHERE db_id = {db_id};

        LOAD DATA LOCAL INFILE '{source_file}'
        INTO TABLE stg_dict_field_detail
        FIELDS TERMINATED BY '\Z'
					
						
        (urn, sort_id, parent_sort_id, parent_path, field_name, data_type,
         @is_nullable, @default_value, @data_size, @namespace, @description)
        SET db_id = {db_id}
        , is_nullable=nullif(@is_nullable,'null')
        , default_value=nullif(@default_value,'null')
        , data_size=nullif(@data_size,'null')
        , namespace=nullif(@namespace,'null')
        , description=nullif(@description,'null')
        , last_modified=now();

        -- update stg_dict_field_detail dataset_id
        update stg_dict_field_detail sf, dict_dataset d
        set sf.dataset_id = d.id where sf.urn = d.urn
        and sf.db_id = {db_id};

        delete from stg_dict_field_detail
        where db_id = {db_id} and dataset_id is null;  -- remove fields not matched to a dataset

        -- delete old records that no longer exist in this load batch (but do have a dataset id)
        -- join with dict_dataset to avoid a right join using the index (using the index will slow down the query)
        create temporary table if not exists t_deleted_fields (primary key (field_id)) ENGINE=MyISAM
          SELECT x.field_id
          FROM (select dataset_id, field_name, parent_path from stg_dict_field_detail where db_id = {db_id}) s
          RIGHT JOIN
            ( select dataset_id, field_id, field_name, parent_path from dict_field_detail
              where dataset_id in (select dataset_id from stg_dict_field_detail where db_id = {db_id})
            ) x
            ON s.dataset_id = x.dataset_id
            AND s.field_name = x.field_name
            AND s.parent_path <=> x.parent_path
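            -- (<=> above is the NULL-safe equality operator: NULL <=> NULL is true,
            --  so fields with a NULL parent_path still match)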
					
						
          WHERE s.field_name is null
        ; -- run time: ~2 min

        delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);

        -- update old records when something changed, e.g. the sort_id
        update dict_field_detail t join
        (
          select x.field_id, s.*
          from stg_dict_field_detail s
               join dict_field_detail x
            on s.field_name = x.field_name
           and s.parent_path <=> x.parent_path
           and s.dataset_id = x.dataset_id
          where s.db_id = {db_id}
            and (x.sort_id <> s.sort_id
                or x.parent_sort_id <> s.parent_sort_id
                or x.data_type <> s.data_type
                or x.data_size <> s.data_size or (x.data_size is null XOR s.data_size is null)
                or x.data_precision <> s.data_precision or (x.data_precision is null XOR s.data_precision is null)
                or x.is_nullable <> s.is_nullable or (x.is_nullable is null XOR s.is_nullable is null)
                or x.is_partitioned <> s.is_partitioned or (x.is_partitioned is null XOR s.is_partitioned is null)
                or x.is_distributed <> s.is_distributed or (x.is_distributed is null XOR s.is_distributed is null)
                or x.default_value <> s.default_value or (x.default_value is null XOR s.default_value is null)
                or x.namespace <> s.namespace or (x.namespace is null XOR s.namespace is null)
            )
        ) p
          on t.field_id = p.field_id
        set t.sort_id = p.sort_id,
            t.parent_sort_id = p.parent_sort_id,
            t.data_type = p.data_type,
            t.data_size = p.data_size,
            t.data_precision = p.data_precision,
            t.is_nullable = p.is_nullable,
            t.is_partitioned = p.is_partitioned,
            t.is_distributed = p.is_distributed,
            t.default_value = p.default_value,
            t.namespace = p.namespace,
            t.modified = now();
					
						

        -- insert new ones
        CREATE TEMPORARY TABLE IF NOT EXISTS t_existed_field
        ( primary key (urn, sort_id, db_id) ) ENGINE=MyISAM
        AS (
        SELECT sf.urn, sf.sort_id, sf.db_id, count(*) field_count
        FROM stg_dict_field_detail sf
        JOIN dict_field_detail t
          ON sf.dataset_id = t.dataset_id
         AND sf.field_name = t.field_name
         AND sf.parent_path <=> t.parent_path
        WHERE sf.db_id = {db_id}
          and sf.dataset_id IS NOT NULL
        group by 1,2,3
        );
					
						

        insert ignore into dict_field_detail (
          dataset_id, fields_layout_id, sort_id, parent_sort_id, parent_path,
          field_name, namespace, data_type, data_size, is_nullable, default_value,
          modified
        )
        select
          sf.dataset_id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path,
          sf.field_name, sf.namespace, sf.data_type, sf.data_size, sf.is_nullable, sf.default_value, now()
        from stg_dict_field_detail sf
        where sf.db_id = {db_id} and sf.dataset_id is not null
          and (sf.urn, sf.sort_id, sf.db_id) not in (select urn, sort_id, db_id from t_existed_field)
        ;
					
						

        analyze table dict_field_detail;

        -- delete old records in the staging field comment map
        delete from stg_dict_dataset_field_comment where db_id = {db_id};

        -- insert new field comments
        insert into field_comments (
          user_id, comment, created, modified, comment_crc32_checksum
        )
        select 0 user_id, description, now() created, now() modified, crc32(description) from
        (
          select sf.description
          from stg_dict_field_detail sf left join field_comments fc
            on sf.description = fc.comment
          where sf.description is not null
            and fc.id is null
            and sf.db_id = {db_id}
          group by 1 order by 1
        ) d;

        analyze table field_comments;

        -- insert the field-to-comment map into staging
        insert ignore into stg_dict_dataset_field_comment
        select t.field_id field_id, fc.id comment_id, sf.dataset_id, {db_id}
          from stg_dict_field_detail sf
          join field_comments fc
            on sf.description = fc.comment
          join dict_field_detail t
            on sf.dataset_id = t.dataset_id
           and sf.field_name = t.field_name
           and sf.parent_path <=> t.parent_path
        where sf.db_id = {db_id};

        -- field already has a default comment: add this mapping with is_default = 0
        insert ignore into dict_dataset_field_comment
        select field_id, comment_id, dataset_id, 0 is_default from stg_dict_dataset_field_comment where field_id in (
          select field_id from dict_dataset_field_comment
          where field_id in (select field_id from stg_dict_dataset_field_comment)
        and is_default = 1 ) and db_id = {db_id};

        -- field did not have this comment before: insert it and set it as the default
        insert ignore into dict_dataset_field_comment
        select sd.field_id, sd.comment_id, sd.dataset_id, 1 from stg_dict_dataset_field_comment sd
        left join dict_dataset_field_comment d
          on d.field_id = sd.field_id
         and d.comment_id = sd.comment_id
        where d.comment_id is null
        and sd.db_id = {db_id};
        """.format(source_file=self.input_field_file, db_id=self.db_id)

    self.executeCommands(load_cmd)
    self.logger.info("Loaded dataset fields.")
					
						

  def load_dataset_instance(self):
    """
    Load dataset instance
    :return:
    """
    load_cmd = """
					
						
        DELETE FROM stg_dict_dataset_instance WHERE db_id = {db_id};

        LOAD DATA LOCAL INFILE '{source_file}'
        INTO TABLE stg_dict_dataset_instance
        FIELDS TERMINATED BY '\x1a' ESCAPED BY '\0'
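        -- '\x1a' is the same SUB (0x1a) separator as '\Z' above, written here
        -- as a Python escape instead of a MySQL one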
					
						
        (dataset_urn, deployment_tier, data_center, server_cluster, slice,
         is_active, native_name, logical_name, version, instance_created_time,
         schema_text, ddl_text, abstract_dataset_urn)
        SET db_id = {db_id},
            created_time=unix_timestamp(now()),
            wh_etl_exec_id = {wh_etl_exec_id};

        -- update dataset_id
        update stg_dict_dataset_instance sdi, dict_dataset d
        set sdi.dataset_id = d.id where sdi.abstract_dataset_urn = d.urn
        and sdi.db_id = {db_id};

        INSERT INTO dict_dataset_instance
        ( dataset_id,
          db_id,
          deployment_tier,
          data_center,
          server_cluster,
          slice,
          is_active,
          native_name,
          logical_name,
          version,
          version_sort_id,
          schema_text,
          ddl_text,
          instance_created_time,
          created_time,
          wh_etl_exec_id
        )
        select s.dataset_id, s.db_id, s.deployment_tier, c.data_center, c.cluster,
          s.slice, s.is_active, s.native_name, s.logical_name, s.version,
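          -- encode 'major.minor.patch' as major*100000000 + minor*10000 + patch
          -- so versions sort numerically, e.g. 1.2.3 -> 100020003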
					
						
          case when s.version regexp '[0-9]+\.[0-9]+\.[0-9]+'
            then cast(substring_index(s.version, '.', 1) as unsigned) * 100000000 +
                 cast(substring_index(substring_index(s.version, '.', 2), '.', -1) as unsigned) * 10000 +
                 cast(substring_index(s.version, '.', -1) as unsigned)
          else 0
          end version_sort_id, s.schema_text, s.ddl_text,
          s.instance_created_time, s.created_time, s.wh_etl_exec_id
        from stg_dict_dataset_instance s join dict_dataset d on s.dataset_id = d.id
        join cfg_database c on c.db_id = {db_id}
        where s.db_id = {db_id}
        on duplicate key update
          deployment_tier=s.deployment_tier, data_center=s.data_center, server_cluster=s.server_cluster, slice=s.slice,
          is_active=s.is_active, native_name=s.native_name, logical_name=s.logical_name, version=s.version,
          schema_text=s.schema_text, ddl_text=s.ddl_text,
          instance_created_time=s.instance_created_time, created_time=s.created_time, wh_etl_exec_id=s.wh_etl_exec_id
          ;
        """.format(source_file=self.input_instance_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)

    self.executeCommands(load_cmd)
    self.logger.info("Loaded dataset instances.")
					
						

  def load_dataset_dependencies(self):
    """
    Load dataset dependencies
    :return:
    """
    load_cmd = """
					
						
        DELETE FROM stg_cfg_object_name_map;
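        -- note: unlike the other staging tables, this one is cleared wholesale
        -- (there is no db_id scoping in this load)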
					
						
        LOAD DATA LOCAL INFILE '{source_file}'
        INTO TABLE stg_cfg_object_name_map
        FIELDS TERMINATED BY '\x1a' ESCAPED BY '\0'
        (object_type, object_sub_type, object_name, object_urn, map_phrase, is_identical_map,
         mapped_object_type, mapped_object_sub_type, mapped_object_name, mapped_object_urn, description, @last_modified)
        SET last_modified=now();

        -- update source dataset_id
        UPDATE stg_cfg_object_name_map s, dict_dataset d
        SET s.object_dataset_id = d.id WHERE s.object_urn = d.urn;

        -- update mapped dataset_id
        UPDATE stg_cfg_object_name_map s, dict_dataset d
        SET s.mapped_object_dataset_id = d.id WHERE s.mapped_object_urn = d.urn;

        -- build the list of rows to be deleted
        DROP TEMPORARY table IF EXISTS t_deleted_depend;

        CREATE TEMPORARY TABLE t_deleted_depend ENGINE=MyISAM
        AS (
        SELECT DISTINCT c.obj_name_map_id
          FROM cfg_object_name_map c LEFT JOIN stg_cfg_object_name_map s
          ON c.object_dataset_id = s.object_dataset_id
            and CASE WHEN c.mapped_object_dataset_id is not null
                    THEN c.mapped_object_dataset_id = s.mapped_object_dataset_id
                    ELSE c.mapped_object_name = s.mapped_object_name
                END
          WHERE s.object_name is not null
            and c.object_dataset_id is not null
            and c.map_phrase = 'depends on'
            and c.object_type in ('dalids', 'hive'));
					
						

        -- delete old dependencies
        DELETE FROM cfg_object_name_map where obj_name_map_id in (
          SELECT obj_name_map_id FROM t_deleted_depend
        );

        -- insert new dependencies
        INSERT IGNORE INTO cfg_object_name_map
        (
          object_type,
          object_sub_type,
          object_name,
          object_dataset_id,
          map_phrase,
          is_identical_map,
          mapped_object_type,
          mapped_object_sub_type,
          mapped_object_name,
          mapped_object_dataset_id,
          description,
          last_modified
        )
        SELECT s.object_type, s.object_sub_type, s.object_name, s.object_dataset_id, s.map_phrase, s.is_identical_map,
          s.mapped_object_type, s.mapped_object_sub_type, s.mapped_object_name, s.mapped_object_dataset_id,
          s.description, s.last_modified
          FROM stg_cfg_object_name_map s LEFT JOIN cfg_object_name_map c
          ON s.object_dataset_id is not null and s.object_dataset_id = c.object_dataset_id
            and CASE WHEN s.mapped_object_dataset_id is not null
                    THEN s.mapped_object_dataset_id = c.mapped_object_dataset_id
                    ELSE s.mapped_object_name = c.mapped_object_name
                END
          WHERE c.object_name is null;
        """.format(source_file=self.input_dependency_file)

    # didn't load into a final table for now
    self.executeCommands(load_cmd)
    self.logger.info("Loaded dataset dependencies.")


  def executeCommands(self, commands):
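    # naive statement splitter: fine for these commands because none of them
    # contain a literal ';' inside a string; each statement is committed
    # individually rather than as one transaction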
					
						
    for cmd in commands.split(";"):
      if not cmd.strip():
        continue  # skip the empty tail after the final ';'
      self.logger.debug(cmd)
      self.conn_cursor.execute(cmd)
      self.conn_mysql.commit()
					
						


if __name__ == "__main__":
  args = sys.argv[1]
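  # sys.argv[1] is the job-property dictionary supplied by the ETL job runner;
  # HiveLoad.__init__ reads the DB connection settings and file paths from it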
					
						

  l = HiveLoad(args[Constant.WH_EXEC_ID_KEY])

  try:
    l.load_metadata()
    l.load_dataset_instance()
    l.load_dataset_dependencies()
    l.load_field()
  except Exception as e:
    l.logger.error(str(e))
  finally:
    l.conn_mysql.close()