From 6c57e3024072e60907a2c69e44ac7af8de1a51e3 Mon Sep 17 00:00:00 2001 From: Yi Wang Date: Fri, 24 Feb 2017 11:08:18 -0800 Subject: [PATCH 1/4] Fix bugs found by AppCheck in issue #328 --- .../app/controllers/DatasetController.java | 2 +- .../dao/DatasetColumnCommentRowMapper.java | 25 +++++++++---------- web/app/dao/LineageDAO.java | 5 ++-- .../schemas/DatasetConstraintRecord.java | 2 +- 4 files changed, 16 insertions(+), 18 deletions(-) diff --git a/backend-service/app/controllers/DatasetController.java b/backend-service/app/controllers/DatasetController.java index b066e1eb85..e0f354ef69 100644 --- a/backend-service/app/controllers/DatasetController.java +++ b/backend-service/app/controllers/DatasetController.java @@ -181,7 +181,7 @@ public class DatasetController extends Controller { String dataset_type = uri_parts[0]; String dataset_path = uri_parts[1].substring(2); // start from the 3rd slash if (dataset_path.indexOf(".") > 0) { - dataset_path.replace(".", "/"); + dataset_path = dataset_path.replace(".", "/"); } if (dataset_path != null) { diff --git a/web/app/dao/DatasetColumnCommentRowMapper.java b/web/app/dao/DatasetColumnCommentRowMapper.java index d29ca52745..23fa1380b8 100644 --- a/web/app/dao/DatasetColumnCommentRowMapper.java +++ b/web/app/dao/DatasetColumnCommentRowMapper.java @@ -12,6 +12,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ package dao; + import models.DatasetColumnComment; import org.apache.commons.lang3.StringUtils; import org.springframework.jdbc.core.RowMapper; @@ -19,19 +20,18 @@ import org.springframework.jdbc.core.RowMapper; import java.sql.ResultSet; import java.sql.SQLException; -public class DatasetColumnCommentRowMapper implements RowMapper -{ - public static String ID_COLUMN = "id"; - public static String AUTHOR_COLUMN = "author"; - public static String TEXT_COLUMN = "text"; - public static String CREATED_TIME_COLUMN = "created"; - public static String MODIFIED_TIME_COLUMN = "modified"; - public static String FIELD_ID_COLUMN = "field_id"; - public static String IS_DEFAULT_COLUMN = "is_default"; + +public class DatasetColumnCommentRowMapper implements RowMapper { + private static String ID_COLUMN = "id"; + private static String AUTHOR_COLUMN = "author"; + private static String TEXT_COLUMN = "text"; + private static String CREATED_TIME_COLUMN = "created"; + private static String MODIFIED_TIME_COLUMN = "modified"; + private static String FIELD_ID_COLUMN = "field_id"; + private static String IS_DEFAULT_COLUMN = "is_default"; @Override - public DatasetColumnComment mapRow(ResultSet rs, int rowNum) throws SQLException - { + public DatasetColumnComment mapRow(ResultSet rs, int rowNum) throws SQLException { Long id = rs.getLong(ID_COLUMN); String author = rs.getString(AUTHOR_COLUMN); String text = rs.getString(TEXT_COLUMN); @@ -40,8 +40,7 @@ public class DatasetColumnCommentRowMapper implements RowMapper additionalReferences) { - this.additionalReferences = this.additionalReferences; + this.additionalReferences = additionalReferences; } public Long getModifiedTime() { From b6223001b8e59024b164ed13adbc696832d18b95 Mon Sep 17 00:00:00 2001 From: Yi Wang Date: Tue, 28 Feb 2017 15:48:50 -0800 Subject: [PATCH 2/4] Fix issues from Oracle MetadataChangeEvent integration --- .../app/models/daos/DatasetInfoDao.java | 29 ++++++++++++++++--- .../models/kafka/MetadataChangeProcessor.java | 7 +++++ .../DDL/ETL_DDL/dataset_info_metadata.sql | 1 - 3 files changed, 32 insertions(+), 5 deletions(-) diff --git 
a/backend-service/app/models/daos/DatasetInfoDao.java b/backend-service/app/models/daos/DatasetInfoDao.java index 13c8337ca3..152203d16b 100644 --- a/backend-service/app/models/daos/DatasetInfoDao.java +++ b/backend-service/app/models/daos/DatasetInfoDao.java @@ -22,6 +22,8 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import utils.Urn; import org.springframework.dao.DataAccessException; import org.springframework.dao.EmptyResultDataAccessException; @@ -174,7 +176,7 @@ public class DatasetInfoDao { public static final String GET_USER_BY_USER_ID = "SELECT * FROM " + EXTERNAL_USER_TABLE + " WHERE user_id = :user_id"; public static final String GET_APP_ID_BY_GROUP_ID = - "SELECT app_id FROM " + EXTERNAL_GROUP_TABLE + " WHERE group_id = :group_id GROUP BY group_id"; + "SELECT app_id FROM " + EXTERNAL_GROUP_TABLE + " WHERE group_id = :group_id GROUP BY app_id"; public static final String GET_DATASET_CONSTRAINT_BY_DATASET_ID = "SELECT * FROM " + DATASET_CONSTRAINT_TABLE + " WHERE dataset_id = :dataset_id"; @@ -223,6 +225,20 @@ public class DatasetInfoDao { PreparedStatementUtil.prepareInsertTemplateWithColumn("REPLACE", DATASET_INVENTORY_TABLE, DatasetInventoryItemRecord.getInventoryItemColumns()); + private static final String UrnRegex = "urn:li:dataset:\\(urn:li:dataPlatform:(\\w*),\\s?([^\\s]*),\\s?(\\w*)\\)"; + + private static Pattern UrnPattern = Pattern.compile(UrnRegex); + + private static String urnToUri(String urn) { + // from urn:li:dataset:(urn:li:dataPlatform:p, nativeName, dataOrigin) to p:///nativeName + Matcher m = UrnPattern.matcher(urn); + if(m.matches()) { + String platform = m.group(1) + "://"; + String datasetName = m.group(2).replace(".", "/"); + return platform + (datasetName.startsWith("/") ? "" : "/") + datasetName; + } + return null; + } private static Object[] findIdAndUrn(Integer datasetId) throws SQLException { @@ -268,7 +284,7 @@ public class DatasetInfoDao { final JsonNode urnNode = root.path("urn"); if (!urnNode.isMissingNode() && !urnNode.isNull()) { try { - final Object[] idUrn = findIdAndUrn(urnNode.asText()); + final Object[] idUrn = findIdAndUrn(urnToUri(urnNode.asText())); if (idUrn[0] != null && idUrn[1] != null) { return idUrn; } @@ -867,7 +883,7 @@ public class DatasetInfoDao { record.setIsGroup(ownerTypeString != null && ownerTypeString.equalsIgnoreCase("group") ? 
"Y" : "N"); if (datasetId == 0 || appId == 0) { - String sql = PreparedStatementUtil.prepareInsertTemplateWithColumn(DATASET_OWNER_UNMATCHED_TABLE, + String sql = PreparedStatementUtil.prepareInsertTemplateWithColumn("REPLACE", DATASET_OWNER_UNMATCHED_TABLE, record.getDbColumnForUnmatchedOwner()); OWNER_UNMATCHED_WRITER.execute(sql, record.getValuesForUnmatchedOwner()); } else { @@ -1197,8 +1213,13 @@ public class DatasetInfoDao { if (idUrn[0] != null && idUrn[1] != null) { datasetId = (Integer) idUrn[0]; urn = (String) idUrn[1]; - } else { + } else if (root.get("datasetProperties") != null && root.get("datasetProperties").get("uri") != null) { urn = root.path("datasetProperties").path("uri").asText(); + } else if (root.get("urn") != null) { + urn = urnToUri(root.get("urn").asText()); + } else { + Logger.info("Can't identify dataset URN, abort process."); + return; } ObjectMapper om = new ObjectMapper(); diff --git a/backend-service/app/models/kafka/MetadataChangeProcessor.java b/backend-service/app/models/kafka/MetadataChangeProcessor.java index e781d83192..b4d2ce92c2 100644 --- a/backend-service/app/models/kafka/MetadataChangeProcessor.java +++ b/backend-service/app/models/kafka/MetadataChangeProcessor.java @@ -53,6 +53,13 @@ public class MetadataChangeProcessor { && datasetIdentifier == null) { Logger.info("Can't identify dataset from uri/urn/datasetIdentifier, abort process. " + record.toString()); return null; + } else if (urn != null) { + Logger.debug("URN: " + urn); + } else if (datasetProperties != null && datasetProperties.get("uri") != null) { + Logger.debug("URI: " + datasetProperties.get("uri")); + } else { + Logger.debug( + "Dataset Identifier: " + datasetIdentifier.get("dataPlatformUrn") + datasetIdentifier.get("nativeName")); } final JsonNode rootNode = new ObjectMapper().readTree(record.toString()); diff --git a/data-model/DDL/ETL_DDL/dataset_info_metadata.sql b/data-model/DDL/ETL_DDL/dataset_info_metadata.sql index aabc648f1f..7b5e3c21a0 100644 --- a/data-model/DDL/ETL_DDL/dataset_info_metadata.sql +++ b/data-model/DDL/ETL_DDL/dataset_info_metadata.sql @@ -175,7 +175,6 @@ CREATE TABLE dataset_schema_info ( `dataset_id` INT UNSIGNED NOT NULL, `dataset_urn` VARCHAR(200) NOT NULL, `is_backward_compatible` BOOLEAN DEFAULT NULL, - `is_latest_revision` BOOLEAN NOT NULL, `create_time` BIGINT NOT NULL, `revision` INT UNSIGNED DEFAULT NULL, `version` VARCHAR(20) DEFAULT NULL, From 5390d9d2b6cb81584dbe2b319346755ca3d6acbf Mon Sep 17 00:00:00 2001 From: Yi Wang Date: Fri, 17 Mar 2017 15:05:30 -0700 Subject: [PATCH 3/4] Optimize dataset load scripts, improve speed --- backend-service/conf/application.conf | 1 - backend-service/conf/logback.xml | 2 +- .../src/main/resources/jython/HdfsLoad.py | 94 ++++++------- .../src/main/resources/jython/HiveLoad.py | 86 +++++------- .../src/main/resources/jython/OracleLoad.py | 57 ++++---- .../src/main/resources/jython/TeradataLoad.py | 129 +++++++++--------- .../teradata/TeradataMetadataEtlTest.java | 14 ++ web/conf/logback.xml | 2 +- 8 files changed, 185 insertions(+), 200 deletions(-) diff --git a/backend-service/conf/application.conf b/backend-service/conf/application.conf index a151aa6ef3..a3958a1aaf 100644 --- a/backend-service/conf/application.conf +++ b/backend-service/conf/application.conf @@ -41,7 +41,6 @@ db.wherehows.driver = com.mysql.jdbc.Driver db.wherehows.url = ${WHZ_DB_URL} db.wherehows.username = ${WHZ_DB_USERNAME} db.wherehows.password = ${WHZ_DB_PASSWORD} -db.wherehows.host = ${WHZ_DB_HOST} # You can expose this datasource via 
JNDI if needed (Useful for JPA) # db.default.jndiName=DefaultDS diff --git a/backend-service/conf/logback.xml b/backend-service/conf/logback.xml index 72854595d0..81da7aa6ac 100644 --- a/backend-service/conf/logback.xml +++ b/backend-service/conf/logback.xml @@ -19,7 +19,7 @@ - %coloredLevel - %logger - %message%n%xException + %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n diff --git a/metadata-etl/src/main/resources/jython/HdfsLoad.py b/metadata-etl/src/main/resources/jython/HdfsLoad.py index b3e6d65c28..1b84689bbb 100644 --- a/metadata-etl/src/main/resources/jython/HdfsLoad.py +++ b/metadata-etl/src/main/resources/jython/HdfsLoad.py @@ -27,7 +27,6 @@ class HdfsLoad: Load dataset metadata into final table :return: nothing """ - cursor = self.conn_mysql.cursor() load_cmd = ''' DELETE FROM stg_dict_dataset WHERE db_id = {db_id}; @@ -197,15 +196,11 @@ class HdfsLoad: ; '''.format(source_file=self.input_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id) - for state in load_cmd.split(";"): - self.logger.debug(state) - cursor.execute(state) - self.conn_mysql.commit() - cursor.close() + self.executeCommands(load_cmd) self.logger.info("finish loading hdfs metadata db_id={db_id} to dict_dataset".format(db_id=self.db_id)) + def load_field(self): - cursor = self.conn_mysql.cursor() load_field_cmd = ''' DELETE FROM stg_dict_field_detail where db_id = {db_id}; @@ -242,24 +237,25 @@ class HdfsLoad: analyze table field_comments; + -- update stg_dict_field_detail dataset_id + update stg_dict_field_detail sf, dict_dataset d + set sf.dataset_id = d.id where sf.urn = d.urn + and sf.db_id = {db_id}; + delete from stg_dict_field_detail + where db_id = {db_id} and dataset_id is null; -- remove if not match to dataset + -- delete old record if it does not exist in this load batch anymore (but have the dataset id) - create temporary table if not exists t_deleted_fields (primary key (field_id)) - select x.field_id - from stg_dict_field_detail s - join dict_dataset i - on s.urn = i.urn - and s.db_id = {db_id} - right join dict_field_detail x - on i.id = x.dataset_id - and s.field_name = x.field_name - and s.parent_path = x.parent_path - where s.field_name is null - and x.dataset_id in ( - select d.id dataset_id - from stg_dict_field_detail k join dict_dataset d - on k.urn = d.urn - and k.db_id = {db_id} - ) + create temporary table if not exists t_deleted_fields (primary key (field_id)) ENGINE=MyISAM + SELECT x.field_id + FROM (select dataset_id, field_name, parent_path from stg_dict_field_detail where db_id = {db_id}) s + RIGHT JOIN + ( select dataset_id, field_id, field_name, parent_path from dict_field_detail + where dataset_id in (select dataset_id from stg_dict_field_detail where db_id = {db_id}) + ) x + ON s.dataset_id = x.dataset_id + AND s.field_name = x.field_name + AND s.parent_path = x.parent_path + WHERE s.field_name is null ; -- run time : ~2min delete from dict_field_detail where field_id in (select field_id from t_deleted_fields); @@ -269,12 +265,10 @@ class HdfsLoad: ( select x.field_id, s.* from (select * from stg_dict_field_detail where db_id = {db_id}) s - join dict_dataset d - on s.urn = d.urn join dict_field_detail x on s.field_name = x.field_name and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*') - and d.id = x.dataset_id + and s.dataset_id = x.dataset_id where (x.sort_id <> s.sort_id or x.parent_sort_id <> s.parent_sort_id or x.data_type <> s.data_type @@ -307,12 +301,11 @@ class HdfsLoad: modified ) select - d.id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path, + 
sf.dataset_id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path, sf.field_name, sf.namespace, sf.data_type, sf.data_size, sf.is_nullable, sf.default_value, now() - from stg_dict_field_detail sf join dict_dataset d - on sf.urn = d.urn + from stg_dict_field_detail sf left join dict_field_detail t - on d.id = t.dataset_id + on sf.dataset_id = t.dataset_id and sf.field_name = t.field_name and sf.parent_path = t.parent_path where db_id = {db_id} and t.field_id is null @@ -325,13 +318,12 @@ class HdfsLoad: -- insert insert into stg_dict_dataset_field_comment - select t.field_id field_id, fc.id comment_id, d.id dataset_id, {db_id} - from stg_dict_field_detail sf join dict_dataset d - on sf.urn = d.urn + select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id} + from stg_dict_field_detail sf join field_comments fc on sf.description = fc.comment join dict_field_detail t - on d.id = t.dataset_id + on sf.dataset_id = t.dataset_id and sf.field_name = t.field_name and sf.parent_path = t.parent_path where sf.db_id = {db_id}; @@ -357,15 +349,12 @@ class HdfsLoad: DELETE FROM stg_dict_field_detail where db_id = {db_id}; '''.format(source_file=self.input_field_file, db_id=self.db_id) - for state in load_field_cmd.split(";"): - self.logger.debug(state) - cursor.execute(state) - self.conn_mysql.commit() - cursor.close() + + self.executeCommands(load_field_cmd) self.logger.info("finish loading hdfs metadata db_id={db_id} to dict_field_detail".format(db_id=self.db_id)) + def load_sample(self): - cursor = self.conn_mysql.cursor() load_sample_cmd = ''' DELETE FROM stg_dict_dataset_sample where db_id = {db_id}; @@ -375,11 +364,11 @@ class HdfsLoad: (urn,ref_urn,data) SET db_id = {db_id}; - -- update reference id in stagging table - UPDATE stg_dict_dataset_sample s + -- update reference id in staging table + UPDATE stg_dict_dataset_sample s JOIN dict_dataset d ON s.ref_urn = d.urn SET s.ref_id = d.id - WHERE s.db_id = {db_id} AND s.ref_urn > '' AND s.ref_urn <> 'null'; + WHERE s.db_id = {db_id} AND s.ref_urn > ''; -- first insert ref_id as 0 INSERT INTO dict_dataset_sample @@ -401,14 +390,18 @@ class HdfsLoad: SET d.ref_id = s.ref_id WHERE s.db_id = {db_id} AND d.ref_id = 0; '''.format(source_file=self.input_sample_file, db_id=self.db_id) - for state in load_sample_cmd.split(";"): - self.logger.debug(state) - cursor.execute(state) - self.conn_mysql.commit() - cursor.close() + + self.executeCommands(load_sample_cmd) self.logger.info("finish loading hdfs sample data db_id={db_id} to dict_dataset_sample".format(db_id=self.db_id)) + def executeCommands(self, commands): + for cmd in commands.split(";"): + self.logger.debug(cmd) + self.conn_cursor.execute(cmd) + self.conn_mysql.commit() + + if __name__ == "__main__": args = sys.argv[1] @@ -426,10 +419,11 @@ if __name__ == "__main__": l.db_id = args[Constant.DB_ID_KEY] l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY] l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER) + l.conn_cursor = l.conn_mysql.cursor() if Constant.INNODB_LOCK_WAIT_TIMEOUT in args: lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT] - l.conn_mysql.cursor().execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time) + l.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time) try: l.load_metadata() diff --git a/metadata-etl/src/main/resources/jython/HiveLoad.py b/metadata-etl/src/main/resources/jython/HiveLoad.py index 72421d0b24..6ee4d480a5 100644 --- a/metadata-etl/src/main/resources/jython/HiveLoad.py +++ 
b/metadata-etl/src/main/resources/jython/HiveLoad.py @@ -23,7 +23,6 @@ class HiveLoad: self.logger = LoggerFactory.getLogger("%s[%s]" % (self.__class__.__name__, wh_etl_exec_id)) def load_metadata(self): - cursor = self.conn_mysql.cursor() load_cmd = """ DELETE FROM stg_dict_dataset WHERE db_id = {db_id}; @@ -97,18 +96,15 @@ class HiveLoad: ; """.format(source_file=self.input_schema_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id) - for state in load_cmd.split(";"): - self.logger.info(state) - cursor.execute(state) - self.conn_mysql.commit() - cursor.close() + self.executeCommands(load_cmd) + self.logger.info("Load dataset metadata.") + def load_field(self): """ Load fields :return: """ - cursor = self.conn_mysql.cursor() load_cmd = """ DELETE FROM stg_dict_field_detail WHERE db_id = {db_id}; @@ -125,30 +121,26 @@ class HiveLoad: , description=nullif(@description,'null') , last_modified=now(); - -- update dataset_id + -- update stg_dict_field_detail dataset_id update stg_dict_field_detail sf, dict_dataset d set sf.dataset_id = d.id where sf.urn = d.urn and sf.db_id = {db_id}; + delete from stg_dict_field_detail + where db_id = {db_id} and dataset_id is null; -- remove if not match to dataset -- delete old record if it does not exist in this load batch anymore (but have the dataset id) -- join with dict_dataset to avoid right join using index. (using index will slow down the query) - create temporary table if not exists t_deleted_fields (primary key (field_id)) - select x.field_id - from stg_dict_field_detail s - join dict_dataset i - on s.urn = i.urn - and s.db_id = {db_id} - right join dict_field_detail x - on i.id = x.dataset_id - and s.field_name = x.field_name - and s.parent_path = x.parent_path - where s.field_name is null - and x.dataset_id in ( - select d.id dataset_id - from stg_dict_field_detail k join dict_dataset d - on k.urn = d.urn - and k.db_id = {db_id} - ) + create temporary table if not exists t_deleted_fields (primary key (field_id)) ENGINE=MyISAM + SELECT x.field_id + FROM (select dataset_id, field_name, parent_path from stg_dict_field_detail where db_id = {db_id}) s + RIGHT JOIN + ( select dataset_id, field_id, field_name, parent_path from dict_field_detail + where dataset_id in (select dataset_id from stg_dict_field_detail where db_id = {db_id}) + ) x + ON s.dataset_id = x.dataset_id + AND s.field_name = x.field_name + AND s.parent_path = x.parent_path + WHERE s.field_name is null ; -- run time : ~2min delete from dict_field_detail where field_id in (select field_id from t_deleted_fields); @@ -190,7 +182,7 @@ class HiveLoad: -- insert new ones CREATE TEMPORARY TABLE IF NOT EXISTS t_existed_field - ( primary key (urn, sort_id, db_id) ) + ( primary key (urn, sort_id, db_id) ) ENGINE=MyISAM AS ( SELECT sf.urn, sf.sort_id, sf.db_id, count(*) field_count FROM stg_dict_field_detail sf @@ -203,8 +195,7 @@ class HiveLoad: group by 1,2,3 ); - - insert ignore into dict_field_detail ( + insert ignore into dict_field_detail ( dataset_id, fields_layout_id, sort_id, parent_sort_id, parent_path, field_name, namespace, data_type, data_size, is_nullable, default_value, modified @@ -239,7 +230,6 @@ class HiveLoad: where field_id in (select field_id from stg_dict_dataset_field_comment) and is_default = 1 ) and db_id = {db_id}; - -- doesn't have this comment before, insert into it and set as default insert ignore into dict_dataset_field_comment select sd.field_id, sd.comment_id, sd.dataset_id, 1 from stg_dict_dataset_field_comment sd @@ -249,7 +239,6 @@ class HiveLoad: where 
d.comment_id is null and sd.db_id = {db_id}; - insert into field_comments ( user_id, comment, created, modified, comment_crc32_checksum ) @@ -266,20 +255,15 @@ class HiveLoad: """.format(source_file=self.input_field_file, db_id=self.db_id) - # didn't load into final table for now + self.executeCommands(load_cmd) + self.logger.info("Load dataset fields.") - for state in load_cmd.split(";"): - self.logger.info(state) - cursor.execute(state) - self.conn_mysql.commit() - cursor.close() def load_dataset_instance(self): """ Load dataset instance :return: """ - cursor = self.conn_mysql.cursor() load_cmd = """ DELETE FROM stg_dict_dataset_instance WHERE db_id = {db_id}; @@ -337,21 +321,15 @@ class HiveLoad: ; """.format(source_file=self.input_instance_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id) + self.executeCommands(load_cmd) + self.logger.info("Load dataset instance.") - # didn't load into final table for now - - for state in load_cmd.split(";"): - self.logger.info(state) - cursor.execute(state) - self.conn_mysql.commit() - cursor.close() def load_dataset_dependencies(self): """ - Load dataset instance + Load dataset dependencies :return: """ - cursor = self.conn_mysql.cursor() load_cmd = """ DELETE FROM stg_cfg_object_name_map; LOAD DATA LOCAL INFILE '{source_file}' @@ -372,7 +350,7 @@ class HiveLoad: -- create to be deleted table DROP TEMPORARY table IF EXISTS t_deleted_depend; - CREATE TEMPORARY TABLE t_deleted_depend + CREATE TEMPORARY TABLE t_deleted_depend ENGINE=MyISAM AS ( SELECT DISTINCT c.obj_name_map_id FROM cfg_object_name_map c LEFT JOIN stg_cfg_object_name_map s @@ -420,12 +398,15 @@ class HiveLoad: """.format(source_file=self.input_dependency_file) # didn't load into final table for now + self.executeCommands(load_cmd) + self.logger.info("Load dataset dependencies.") - for state in load_cmd.split(";"): - self.logger.info(state) - cursor.execute(state) + + def executeCommands(self, commands): + for cmd in commands.split(";"): + self.logger.debug(cmd) + self.conn_cursor.execute(cmd) self.conn_mysql.commit() - cursor.close() if __name__ == "__main__": @@ -446,10 +427,11 @@ if __name__ == "__main__": l.db_id = args[Constant.DB_ID_KEY] l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY] l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER) + l.conn_cursor = l.conn_mysql.cursor() if Constant.INNODB_LOCK_WAIT_TIMEOUT in args: lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT] - l.conn_mysql.cursor().execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time) + l.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time) try: l.load_metadata() diff --git a/metadata-etl/src/main/resources/jython/OracleLoad.py b/metadata-etl/src/main/resources/jython/OracleLoad.py index 0785293920..8895a12aca 100644 --- a/metadata-etl/src/main/resources/jython/OracleLoad.py +++ b/metadata-etl/src/main/resources/jython/OracleLoad.py @@ -15,7 +15,7 @@ from com.ziclix.python.sql import zxJDBC from wherehows.common import Constant from org.slf4j import LoggerFactory -import sys, os, datetime +import sys, datetime class OracleLoad: @@ -159,24 +159,25 @@ class OracleLoad: analyze table {field_comments}; + -- update stg_dict_field_detail dataset_id + update stg_dict_field_detail sf, dict_dataset d + set sf.dataset_id = d.id where sf.urn = d.urn + and sf.db_id = {db_id}; + delete from stg_dict_field_detail + where db_id = {db_id} and dataset_id is null; -- remove if not match to dataset + -- delete old record if it does not exist in this load batch anymore 
(but have the dataset id) - create temporary table if not exists t_deleted_fields (primary key (field_id)) - select x.field_id - from stg_dict_field_detail s - join {dict_dataset} i - on s.urn = i.urn - and s.db_id = {db_id} - right join {dict_field_detail} x - on i.id = x.dataset_id - and s.field_name = x.field_name - and s.parent_path = x.parent_path - where s.field_name is null - and x.dataset_id in ( - select d.id dataset_id - from stg_dict_field_detail k join {dict_dataset} d - on k.urn = d.urn - and k.db_id = {db_id} - ) + create temporary table if not exists t_deleted_fields (primary key (field_id)) ENGINE=MyISAM + SELECT x.field_id + FROM (select dataset_id, field_name, parent_path from stg_dict_field_detail where db_id = {db_id}) s + RIGHT JOIN + ( select dataset_id, field_id, field_name, parent_path from dict_field_detail + where dataset_id in (select dataset_id from stg_dict_field_detail where db_id = {db_id}) + ) x + ON s.dataset_id = x.dataset_id + AND s.field_name = x.field_name + AND s.parent_path = x.parent_path + WHERE s.field_name is null ; -- run time : ~2min delete from {dict_field_detail} where field_id in (select field_id from t_deleted_fields); @@ -186,12 +187,10 @@ class OracleLoad: ( select x.field_id, s.* from stg_dict_field_detail s - join {dict_dataset} d - on s.urn = d.urn join {dict_field_detail} x on s.field_name = x.field_name and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*') - and d.id = x.dataset_id + and s.dataset_id = x.dataset_id where s.db_id = {db_id} and (x.sort_id <> s.sort_id or x.parent_sort_id <> s.parent_sort_id @@ -224,12 +223,11 @@ class OracleLoad: field_name, namespace, data_type, data_size, is_nullable, default_value, modified ) select - d.id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path, + sf.dataset_id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path, sf.field_name, sf.namespace, sf.data_type, sf.data_size, sf.is_nullable, sf.default_value, now() - from stg_dict_field_detail sf join {dict_dataset} d - on sf.urn = d.urn + from stg_dict_field_detail sf left join {dict_field_detail} t - on d.id = t.dataset_id + on sf.dataset_id = t.dataset_id and sf.field_name = t.field_name and sf.parent_path = t.parent_path where db_id = {db_id} and t.field_id is null @@ -242,13 +240,12 @@ class OracleLoad: -- insert insert into stg_dict_dataset_field_comment - select t.field_id field_id, fc.id comment_id, d.id dataset_id, {db_id} - from stg_dict_field_detail sf join {dict_dataset} d - on sf.urn = d.urn + select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id} + from stg_dict_field_detail sf join {field_comments} fc on sf.description = fc.comment join {dict_field_detail} t - on d.id = t.dataset_id + on sf.dataset_id = t.dataset_id and sf.field_name = t.field_name and sf.parent_path = t.parent_path where sf.db_id = {db_id}; @@ -290,7 +287,7 @@ class OracleLoad: (urn,ref_urn,data) SET db_id = {db_id}; - -- update reference id in stagging table + -- update reference id in staging table UPDATE stg_dict_dataset_sample s LEFT JOIN {dict_dataset} d ON s.ref_urn = d.urn SET s.ref_id = d.id diff --git a/metadata-etl/src/main/resources/jython/TeradataLoad.py b/metadata-etl/src/main/resources/jython/TeradataLoad.py index 520cb3d42b..91f740d098 100644 --- a/metadata-etl/src/main/resources/jython/TeradataLoad.py +++ b/metadata-etl/src/main/resources/jython/TeradataLoad.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# -import sys +import sys, datetime from com.ziclix.python.sql import zxJDBC from distutils.util import strtobool from wherehows.common import Constant @@ -24,7 +24,6 @@ class TeradataLoad: self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__) def load_metadata(self): - cursor = self.conn_mysql.cursor() load_cmd = ''' DELETE FROM stg_dict_dataset WHERE db_id = {db_id}; @@ -173,14 +172,11 @@ class TeradataLoad: ; '''.format(source_file=self.input_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id) - for state in load_cmd.split(";"): - self.logger.debug(state) - cursor.execute(state) - self.conn_mysql.commit() - cursor.close() + self.executeCommands(load_cmd) + self.logger.info("Finish loading metadata") + def load_field(self): - cursor = self.conn_mysql.cursor() load_cmd = ''' DELETE FROM stg_dict_field_detail where db_id = {db_id}; @@ -206,24 +202,25 @@ class TeradataLoad: and (char_length(trim(description)) = 0 or description in ('null', 'N/A', 'nothing', 'empty', 'none')); + -- update stg_dict_field_detail dataset_id + update stg_dict_field_detail sf, dict_dataset d + set sf.dataset_id = d.id where sf.urn = d.urn + and sf.db_id = {db_id}; + delete from stg_dict_field_detail + where db_id = {db_id} and dataset_id is null; -- remove if not match to dataset + -- delete old record if it does not exist in this load batch anymore (but have the dataset id) - create temporary table if not exists t_deleted_fields (primary key (field_id)) - select x.field_id - from stg_dict_field_detail s - join dict_dataset i - on s.urn = i.urn - and s.db_id = {db_id} - right join dict_field_detail x - on i.id = x.dataset_id - and s.field_name = x.field_name - and s.parent_path = x.parent_path - where s.field_name is null - and x.dataset_id in ( - select d.id dataset_id - from stg_dict_field_detail k join dict_dataset d - on k.urn = d.urn - and k.db_id = {db_id} - ) + create temporary table if not exists t_deleted_fields (primary key (field_id)) ENGINE=MyISAM + SELECT x.field_id + FROM (select dataset_id, field_name, parent_path from stg_dict_field_detail where db_id = {db_id}) s + RIGHT JOIN + ( select dataset_id, field_id, field_name, parent_path from dict_field_detail + where dataset_id in (select dataset_id from stg_dict_field_detail where db_id = {db_id}) + ) x + ON s.dataset_id = x.dataset_id + AND s.field_name = x.field_name + AND s.parent_path = x.parent_path + WHERE s.field_name is null ; -- run time : ~2min delete from dict_field_detail where field_id in (select field_id from t_deleted_fields); @@ -232,12 +229,11 @@ class TeradataLoad: update dict_field_detail t join ( select x.field_id, s.* - from stg_dict_field_detail s join dict_dataset d - on s.urn = d.urn + from stg_dict_field_detail s join dict_field_detail x on s.field_name = x.field_name and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*') - and d.id = x.dataset_id + and s.dataset_id = x.dataset_id where s.db_id = {db_id} and (x.sort_id <> s.sort_id or x.parent_sort_id <> s.parent_sort_id @@ -274,7 +270,7 @@ class TeradataLoad: modified ) select - d.id, + s.dataset_id, 0 as fields_layout_id, s.sort_id, 0 parent_sort_id, @@ -285,10 +281,9 @@ class TeradataLoad: s.data_scale, s.is_nullable, now() - from stg_dict_field_detail s join dict_dataset d - on s.urn = d.urn + from stg_dict_field_detail s left join dict_field_detail f - on d.id = f.dataset_id + on s.dataset_id = f.dataset_id and s.field_name = f.field_name where db_id = {db_id} and f.field_id is null; @@ -299,13 +294,12 @@ class TeradataLoad: 
-- insert insert into stg_dict_dataset_field_comment - select t.field_id field_id, fc.id comment_id, d.id dataset_id, {db_id} - from stg_dict_field_detail sf join dict_dataset d - on sf.urn = d.urn + select t.field_id field_id, fc.id comment_id, sf.dataset_id dataset_id, {db_id} + from stg_dict_field_detail sf join field_comments fc on sf.description = fc.comment join dict_field_detail t - on d.id = t.dataset_id + on sf.dataset_id = t.dataset_id and sf.field_name = t.field_name and sf.parent_path = t.parent_path where sf.db_id = {db_id}; @@ -328,11 +322,9 @@ class TeradataLoad: and sd.db_id = {db_id}; '''.format(source_file=self.input_field_file, db_id=self.db_id) - for state in load_cmd.split(";"): - self.logger.debug(state) - cursor.execute(state) - self.conn_mysql.commit() - cursor.close() + self.executeCommands(load_cmd) + self.logger.info("Finish loading fields ") + def load_sample(self): load_cmd = ''' @@ -343,11 +335,20 @@ class TeradataLoad: (urn,ref_urn,data) SET db_id = {db_id}; - -- update reference id in stagging table - UPDATE stg_dict_dataset_sample s - LEFT JOIN dict_dataset d ON s.ref_urn = d.urn - SET s.ref_id = d.id - WHERE s.db_id = {db_id}; + -- update dataset id in staging table + UPDATE stg_dict_dataset_sample s + JOIN dict_dataset d ON s.db_id = {db_id} and s.urn = d.urn + SET s.dataset_id = d.id; + + -- update reference id in staging table + UPDATE stg_dict_dataset_sample s + JOIN ( + select dataset_id, id from + (select dataset_id, ref_urn from stg_dict_dataset_sample + where db_id = {db_id} and ref_urn > '') st + join dict_dataset on urn = ref_urn) d + ON s.dataset_id = d.dataset_id + SET s.ref_id = d.id; -- first insert ref_id as 0 INSERT INTO dict_dataset_sample @@ -357,27 +358,24 @@ class TeradataLoad: `data`, created ) - select d.id as dataset_id, s.urn, s.ref_id, s.data, now() - from stg_dict_dataset_sample s left join dict_dataset d on d.urn = s.urn - where s.db_id = {db_id} - on duplicate key update - `data`=s.data, modified=now(); - - - -- update reference id in final table - UPDATE dict_dataset_sample d - RIGHT JOIN stg_dict_dataset_sample s ON d.urn = s.urn - SET d.ref_id = s.ref_id - WHERE s.db_id = {db_id} AND d.ref_id = 0; - + SELECT * from + (select dataset_id, urn, ref_id s_ref_id, `data` s_data, now() + from stg_dict_dataset_sample WHERE db_id = {db_id}) s + ON DUPLICATE KEY UPDATE + ref_id = COALESCE(s_ref_id, ref_id), + `data`=s_data, + modified=now(); '''.format(source_file=self.input_sampledata_file, db_id=self.db_id) - cursor = self.conn_mysql.cursor() - for state in load_cmd.split(";"): - self.logger.debug(state) - cursor.execute(state) + self.executeCommands(load_cmd) + self.logger.info("Finish loading samples ") + + + def executeCommands(self, commands): + for cmd in commands.split(";"): + self.logger.debug(cmd) + self.conn_cursor.execute(cmd) self.conn_mysql.commit() - cursor.close() if __name__ == "__main__": @@ -405,10 +403,11 @@ if __name__ == "__main__": l.db_id = args[Constant.DB_ID_KEY] l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY] l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER) + l.conn_cursor = l.conn_mysql.cursor() if Constant.INNODB_LOCK_WAIT_TIMEOUT in args: lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT] - l.conn_mysql.cursor().execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time) + l.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time) try: l.load_metadata() diff --git a/metadata-etl/src/test/java/metadata/etl/dataset/teradata/TeradataMetadataEtlTest.java 
b/metadata-etl/src/test/java/metadata/etl/dataset/teradata/TeradataMetadataEtlTest.java index 9a514083e4..6719ee6e8a 100644 --- a/metadata-etl/src/test/java/metadata/etl/dataset/teradata/TeradataMetadataEtlTest.java +++ b/metadata-etl/src/test/java/metadata/etl/dataset/teradata/TeradataMetadataEtlTest.java @@ -34,4 +34,18 @@ public class TeradataMetadataEtlTest { TeradataMetadataEtl t = new TeradataMetadataEtl(3, 0L); t.extract(); } + + @Test(groups = {"needConfig"}) + public void testTransform() + throws Exception { + TeradataMetadataEtl t = new TeradataMetadataEtl(3, 0L); + t.transform(); + } + + @Test(groups = {"needConfig"}) + public void testLoad() + throws Exception { + TeradataMetadataEtl t = new TeradataMetadataEtl(3, 0L); + t.load(); + } } diff --git a/web/conf/logback.xml b/web/conf/logback.xml index 72854595d0..81da7aa6ac 100644 --- a/web/conf/logback.xml +++ b/web/conf/logback.xml @@ -19,7 +19,7 @@ - %coloredLevel - %logger - %message%n%xException + %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n From adba53247464b14f5aa9fb9202508c8cd8ac3a0a Mon Sep 17 00:00:00 2001 From: Yi Wang Date: Thu, 23 Mar 2017 22:08:54 -0700 Subject: [PATCH 4/4] Modify compliance purge entity record to support logical type and is_subject --- .../schemas/DatasetComplianceRecord.java | 7 +++--- ...ord.java => DatasetPurgeEntityRecord.java} | 22 +++++++++++++++++-- 2 files changed, 23 insertions(+), 6 deletions(-) rename wherehows-common/src/main/java/wherehows/common/schemas/{DatasetEntityRecord.java => DatasetPurgeEntityRecord.java} (70%) diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetComplianceRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetComplianceRecord.java index d377685259..f06220169a 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetComplianceRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetComplianceRecord.java @@ -14,7 +14,6 @@ package wherehows.common.schemas; import java.util.List; -import java.util.Map; public class DatasetComplianceRecord extends AbstractRecord { @@ -22,7 +21,7 @@ public class DatasetComplianceRecord extends AbstractRecord { Integer datasetId; String datasetUrn; String complianceType; - List compliancePurgeEntities; + List compliancePurgeEntities; Long modifiedTime; @Override @@ -63,11 +62,11 @@ public class DatasetComplianceRecord extends AbstractRecord { this.complianceType = complianceType; } - public List getCompliancePurgeEntities() { + public List getCompliancePurgeEntities() { return compliancePurgeEntities; } - public void setCompliancePurgeEntities(List compliancePurgeEntities) { + public void setCompliancePurgeEntities(List compliancePurgeEntities) { this.compliancePurgeEntities = compliancePurgeEntities; } diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetEntityRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetPurgeEntityRecord.java similarity index 70% rename from wherehows-common/src/main/java/wherehows/common/schemas/DatasetEntityRecord.java rename to wherehows-common/src/main/java/wherehows/common/schemas/DatasetPurgeEntityRecord.java index 0b208c1fed..988811cdb7 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetEntityRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetPurgeEntityRecord.java @@ -16,17 +16,19 @@ package wherehows.common.schemas; import java.util.List; -public class DatasetEntityRecord extends AbstractRecord { +public class 
DatasetPurgeEntityRecord extends AbstractRecord { String identifierType; String identifierField; + String logicalType; + Boolean isSubject; @Override public List fillAllFields() { return null; } - public DatasetEntityRecord() { + public DatasetPurgeEntityRecord() { } public String getIdentifierType() { @@ -44,4 +46,20 @@ public class DatasetEntityRecord extends AbstractRecord { public void setIdentifierField(String identifierField) { this.identifierField = identifierField; } + + public String getLogicalType() { + return logicalType; + } + + public void setLogicalType(String logicalType) { + this.logicalType = logicalType; + } + + public Boolean getIsSubject() { + return isSubject; + } + + public void setIsSubject(Boolean isSubject) { + this.isSubject = isSubject; + } }
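
For reference, the urnToUri() helper introduced in PATCH 2/4 converts a dataset URN of the form urn:li:dataset:(urn:li:dataPlatform:p, nativeName, dataOrigin) into the p:///nativeName URI form that findIdAndUrn() uses to look up the existing dataset id. The sketch below restates that mapping as a stand-alone Java program so the regex can be exercised in isolation; the sample URN is illustrative only and is not taken from this patch series.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class UrnToUriExample {
      // Same pattern as the UrnRegex constant added to DatasetInfoDao in PATCH 2/4
      private static final Pattern URN_PATTERN = Pattern.compile(
          "urn:li:dataset:\\(urn:li:dataPlatform:(\\w*),\\s?([^\\s]*),\\s?(\\w*)\\)");

      // Mirrors DatasetInfoDao.urnToUri(): "<platform>://" plus "/<nativeName>", with '.' replaced by '/'
      static String urnToUri(String urn) {
        Matcher m = URN_PATTERN.matcher(urn);
        if (m.matches()) {
          String platform = m.group(1) + "://";
          String datasetName = m.group(2).replace(".", "/");
          return platform + (datasetName.startsWith("/") ? "" : "/") + datasetName;
        }
        return null; // URN does not match the expected format
      }

      public static void main(String[] args) {
        // Hypothetical Oracle dataset URN, for illustration only
        System.out.println(urnToUri("urn:li:dataset:(urn:li:dataPlatform:oracle,SCHEMA.TABLE_NAME,PROD)"));
        // prints: oracle:///SCHEMA/TABLE_NAME
      }
    }

The startsWith("/") check keeps native names that already begin with a slash (for example HDFS paths) from picking up an extra separator, so both styles normalize to the same platform:///path form.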
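
PATCH 4/4 renames DatasetEntityRecord to DatasetPurgeEntityRecord and adds a logical type and a subject flag to each compliance purge entity. A minimal sketch of how a purge entity might now be populated; every field value below is a hypothetical placeholder, not something defined by the patch.

    import java.util.Collections;
    import java.util.List;
    import wherehows.common.schemas.DatasetPurgeEntityRecord;

    public class PurgeEntityExample {
      public static void main(String[] args) {
        DatasetPurgeEntityRecord entity = new DatasetPurgeEntityRecord();
        entity.setIdentifierType("MEMBER_ID");   // hypothetical identifier type
        entity.setIdentifierField("member_sk");  // hypothetical column name
        entity.setLogicalType("ID");             // new field in PATCH 4/4; value is illustrative
        entity.setIsSubject(true);               // new field in PATCH 4/4

        // List shape that DatasetComplianceRecord.setCompliancePurgeEntities(...) accepts
        List<DatasetPurgeEntityRecord> purgeEntities = Collections.singletonList(entity);
        System.out.println(purgeEntities.size() + " purge entity configured");
      }
    }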