diff --git a/backend-service/app/actors/KafkaConsumerMaster.java b/backend-service/app/actors/KafkaConsumerMaster.java
index 081ff8d083..f3668d1727 100644
--- a/backend-service/app/actors/KafkaConsumerMaster.java
+++ b/backend-service/app/actors/KafkaConsumerMaster.java
@@ -28,16 +28,21 @@ import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
+import metadata.etl.models.EtlJobName;
+import models.daos.ClusterDao;
+import models.daos.EtlJobDao;
 import org.apache.avro.generic.GenericData;
 import msgs.KafkaResponseMsg;
 import play.Logger;
+import play.Play;
 import utils.JdbcUtil;
 import utils.KafkaConfig;
 import utils.KafkaConfig.Topic;
 import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
 import wherehows.common.schemas.AbstractRecord;
+import wherehows.common.utils.ClusterUtil;
 import wherehows.common.writers.DatabaseWriter;
@@ -48,6 +53,7 @@ import wherehows.common.writers.DatabaseWriter;
  */
 public class KafkaConsumerMaster extends UntypedActor {
 
+  private static List<Integer> _kafkaJobList;
   private static ConsumerConnector _consumer;
   private static Properties _kafkaConfig;
   private static Map _kafkaTopics;
@@ -64,13 +70,32 @@ public class KafkaConsumerMaster extends UntypedActor {
   @Override
   public void preStart() throws Exception {
-    Logger.info("Start the KafkaConsumerMaster actor...");
+    _kafkaJobList = Play.application().configuration().getIntList("kafka.consumer.etl.jobid", null);
+    if (_kafkaJobList == null || _kafkaJobList.size() == 0) {
+      Logger.error("Kafka job id error, kafkaJobList: " + _kafkaJobList);
+      getContext().stop(getSelf());
+    }
+    Logger.info("Start the KafkaConsumerMaster... Kafka etl job id list: " + _kafkaJobList);
+
+    // handle 1 kafka connection
+    Map kafkaEtlJob = EtlJobDao.getEtlJobById(_kafkaJobList.get(0));
+    final int kafkaJobRefId = Integer.parseInt(kafkaEtlJob.get("ref_id").toString());
+    final String kafkaJobName = kafkaEtlJob.get("wh_etl_job_name").toString();
+
+    if (!kafkaJobName.equals(EtlJobName.KAFKA_CONSUMER_ETL.name())) {
+      Logger.error("Kafka job info error: job name '" + kafkaJobName + "' not equal "
+          + EtlJobName.KAFKA_CONSUMER_ETL.name());
+      getContext().stop(getSelf());
+    }
 
     // get Kafka configurations from database
-    KafkaConfig.updateKafkaProperties();
+    KafkaConfig.updateKafkaProperties(kafkaJobRefId);
     _kafkaConfig = KafkaConfig.getProperties();
     _kafkaTopics = KafkaConfig.getTopics();
 
+    // get list of cluster information from database and update ClusterUtil
+    ClusterUtil.updateClusterInfo(ClusterDao.getClusterInfo());
+
     for (String topic : _kafkaTopics.keySet()) {
       // get the processor class and method
       final Class processorClass = Class.forName(_kafkaTopics.get(topic).processor);
@@ -125,7 +150,7 @@ public class KafkaConsumerMaster extends UntypedActor {
       final AbstractRecord record = kafkaMsg.getRecord();
 
       if (record != null && _kafkaTopics.containsKey(topic)) {
-        Logger.info("Writing to DB kafka event record: " + topic);
+        Logger.debug("Writing to DB kafka event record: " + topic);
 
         final DatabaseWriter dbWriter = _topicDbWriter.get(topic);
         try {
@@ -145,6 +170,7 @@ public class KafkaConsumerMaster extends UntypedActor {
   @Override
   public void postStop() {
+    Logger.info("Terminating KafkaConsumerMaster...");
     if (_consumer != null) {
       _consumer.shutdown();
       _kafkaConfig.clear();
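The rewritten preStart() ties the consumer to a configured ETL job: it reads kafka.consumer.etl.jobid from application.conf, checks that the referenced wh_etl_job row really is a KAFKA_CONSUMER_ETL job, and only then pulls the Kafka properties and the cluster list. A condensed sketch of that sequence, assuming the Play 2.x configuration API and the DAO signatures shown in this diff; the helper class name and the early returns are illustrative, not part of the patch.

```java
// Sketch only: mirrors the startup validation added to preStart() above.
import java.util.List;
import java.util.Map;
import metadata.etl.models.EtlJobName;
import models.daos.ClusterDao;
import models.daos.EtlJobDao;
import play.Logger;
import play.Play;
import utils.KafkaConfig;
import wherehows.common.utils.ClusterUtil;

public class KafkaConsumerStartupSketch {

  /** Returns true when a valid KAFKA_CONSUMER_ETL job id is configured and its properties were loaded. */
  public static boolean loadKafkaJob() throws Exception {
    List<Integer> jobIds = Play.application().configuration().getIntList("kafka.consumer.etl.jobid", null);
    if (jobIds == null || jobIds.isEmpty()) {
      Logger.error("kafka.consumer.etl.jobid is missing or empty, nothing to consume");
      return false;
    }

    Map kafkaEtlJob = EtlJobDao.getEtlJobById(jobIds.get(0));   // only the first id is used
    String jobName = kafkaEtlJob.get("wh_etl_job_name").toString();
    if (!EtlJobName.KAFKA_CONSUMER_ETL.name().equals(jobName)) {
      Logger.error("Job " + jobIds.get(0) + " is a " + jobName + " job, expected KAFKA_CONSUMER_ETL");
      return false;
    }

    int refId = Integer.parseInt(kafkaEtlJob.get("ref_id").toString());
    KafkaConfig.updateKafkaProperties(refId);                    // topics, processors, db tables
    ClusterUtil.updateClusterInfo(ClusterDao.getClusterInfo());  // cfg_cluster rows for matchClusterCode
    return true;
  }
}
```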
diff --git a/backend-service/app/models/daos/ClusterDao.java b/backend-service/app/models/daos/ClusterDao.java
new file mode 100644
index 0000000000..5a862ead06
--- /dev/null
+++ b/backend-service/app/models/daos/ClusterDao.java
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2015 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+package models.daos;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import utils.JdbcUtil;
+import wherehows.common.schemas.ClusterInfo;
+
+
+public class ClusterDao {
+
+  private static final String GET_CLUSTER_INFO = "SELECT * FROM cfg_cluster";
+
+  public static List<ClusterInfo> getClusterInfo() throws Exception {
+
+    List<Map<String, Object>> rows = JdbcUtil.wherehowsJdbcTemplate.queryForList(GET_CLUSTER_INFO);
+
+    List<ClusterInfo> clusters = new ArrayList<>();
+    for (Map<String, Object> row : rows) {
+      String clusterCode = row.get("cluster_code").toString();
+      // skip the default placeholder row
+      if (clusterCode.equals("[all]")) {
+        continue;
+      }
+
+      int clusterId = Integer.parseInt(row.get("cluster_id").toString());
+      String clusterShortName = row.get("cluster_short_name").toString();
+      String datacenterCode = row.get("data_center_code").toString();
+      String clusterType = row.get("cluster_type").toString();
+      String deploymentTierCode = row.get("deployment_tier_code").toString();
+
+      clusters.add(
+          new ClusterInfo(clusterId, clusterCode, clusterShortName, datacenterCode, clusterType, deploymentTierCode));
+    }
+
+    return clusters;
+  }
+}
diff --git a/backend-service/app/utils/KafkaConfig.java b/backend-service/app/utils/KafkaConfig.java
index 420882653e..b3722bebcf 100644
--- a/backend-service/app/utils/KafkaConfig.java
+++ b/backend-service/app/utils/KafkaConfig.java
@@ -18,6 +18,7 @@ import java.util.Map;
 import java.util.Properties;
 import metadata.etl.models.EtlJobName;
 import models.daos.EtlJobPropertyDao;
+import play.Logger;
 
 
 /**
@@ -29,8 +30,7 @@ public class KafkaConfig {
   // Map of 
   private static Map _topics = new HashMap<>();
 
-  public static final EtlJobName KAFKA_GOBBLIN_JOBNAME = EtlJobName.KAFKA_CONSUMER_GOBBLIN_ETL;
-  public static final Integer KAFKA_GOBBLIN_JOB_REFID = 50;
+  public static final EtlJobName KAFKA_JOBNAME = EtlJobName.KAFKA_CONSUMER_ETL;
 
   /**
    * Class for storing Kafka Topic info
@@ -53,15 +53,19 @@ public class KafkaConfig {
    * Update Kafka properties and topics from etl_job_properies table
    * @throws Exception
    */
-  public static void updateKafkaProperties() throws Exception {
-    Properties props = EtlJobPropertyDao.getJobProperties(KAFKA_GOBBLIN_JOBNAME, KAFKA_GOBBLIN_JOB_REFID);
+  public static void updateKafkaProperties(int kafkaJobRefId) throws Exception {
+    Properties props = EtlJobPropertyDao.getJobProperties(KAFKA_JOBNAME, kafkaJobRefId);
+    if (props == null || props.size() < 5) {
+      Logger.error("Fail to update Kafka job properties for " + KAFKA_JOBNAME.name()
+          + ", job ref id: " + kafkaJobRefId);
+      return;
+    } else {
+      Logger.info("Get Kafka job properties for " + KAFKA_JOBNAME.name() + ", job ref id: " + kafkaJobRefId);
+    }
 
-    String[] topics = ((String) props.get("kafka.topics")).split("\\s*,\\s*");
-    props.remove("kafka.topics");
-    String[] processors = ((String) props.get("kafka.processors")).split("\\s*,\\s*");
-    props.remove("kafka.processors");
-    String[] dbTables = ((String) props.get("kafka.db.tables")).split("\\s*,\\s*");
-    props.remove("kafka.db.tables");
+    String[] topics = ((String) props.remove("kafka.topics")).split("\\s*,\\s*");
+    String[] processors = ((String) props.remove("kafka.processors")).split("\\s*,\\s*");
+    String[] dbTables = ((String) props.remove("kafka.db.tables")).split("\\s*,\\s*");
 
     _props.clear();
     _props.putAll(props);
diff --git a/backend-service/conf/application.conf b/backend-service/conf/application.conf
index dac3fab5fc..1583fd55a9 100644
--- a/backend-service/conf/application.conf
+++ b/backend-service/conf/application.conf
@@ -66,4 +66,6 @@ logger.application=DEBUG
 # if have this varialbe, only the id in this list will be scheduled
 # scheduler.jobid.whitelist=[1,2,3,4,5,6,7,8,9]
 scheduler.check.interval=10
+# start the following list of kafka consumer etl jobs
+# kafka.consumer.etl.jobid=[44]
 application.global=shared.Global
\ No newline at end of file
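updateKafkaProperties(int) now takes the job ref id, sanity-checks the returned Properties, and derives the topic/processor/table lists by removing and splitting comma-separated keys in one step. Below is a standalone sketch of that remove-and-split idiom with an explicit guard (the cast in the patch would throw a NullPointerException if a key were missing); the helper name and sample property values are placeholders.

```java
// Sketch of the remove-and-split idiom used by updateKafkaProperties(int) above.
import java.util.Properties;

public class PropertySplitSketch {

  /** Removes a comma-separated property and returns its parts, or an empty array if it is absent. */
  static String[] removeAndSplit(Properties props, String key) {
    Object value = props.remove(key);            // remove() hands back the old value in one call
    if (value == null) {
      return new String[0];                      // the patch assumes the key exists; guard here instead
    }
    return value.toString().split("\\s*,\\s*");  // trims whitespace around each comma
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("kafka.topics", "topic_a, topic_b");       // placeholder topic names
    String[] topics = removeAndSplit(props, "kafka.topics");
    System.out.println(topics.length + " topics, remaining props: " + props);
  }
}
```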
diff --git a/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingCompactionProcessor.java b/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingCompactionProcessor.java
index 7aad77c8db..524902f209 100644
--- a/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingCompactionProcessor.java
+++ b/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingCompactionProcessor.java
@@ -13,12 +13,15 @@
  */
 package metadata.etl.kafka;
 
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.avro.generic.GenericData;
+import wherehows.common.schemas.ClusterInfo;
 import wherehows.common.schemas.GobblinTrackingCompactionRecord;
 import wherehows.common.schemas.Record;
+import wherehows.common.utils.ClusterUtil;
 
 
 /**
@@ -31,7 +34,6 @@ public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {
   private final String UrnRegex = "^(\\/\\w+\\/\\w+\\/[\\w-]+)\\/([\\w-]+)\\/(\\d+.*)$";
   private final Pattern UrnPattern = Pattern.compile(UrnRegex);
 
-
   /**
    * Process a Gobblin tracking event compaction record
    * @param record
@@ -53,7 +55,8 @@ public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {
       final Map metadata = (Map) record.get("metadata");
 
       final String jobContext = "Gobblin:" + name;
-      final String cluster = parseClusterIdentifier(metadata.get("clusterIdentifier")).get("cluster");
+      final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
+      // final String cluster = parseClusterIdentifier(metadata.get("clusterIdentifier")).get("cluster");
       final String projectName = metadata.get("azkabanProjectName");
       final String flowId = metadata.get("azkabanFlowId");
       final String jobId = metadata.get("azkabanJobId");
diff --git a/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingDistcpNgProcessor.java b/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingDistcpNgProcessor.java
index f1b49d3539..86f6c75769 100644
--- a/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingDistcpNgProcessor.java
+++ b/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingDistcpNgProcessor.java
@@ -13,12 +13,15 @@
  */
 package metadata.etl.kafka;
 
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.avro.generic.GenericData;
+import wherehows.common.schemas.ClusterInfo;
 import wherehows.common.schemas.GobblinTrackingDistcpNgRecord;
 import wherehows.common.schemas.Record;
+import wherehows.common.utils.ClusterUtil;
 
 
 /**
@@ -51,7 +54,7 @@ public class GobblinTrackingDistcpNgProcessor extends KafkaConsumerProcessor {
       final Map metadata = (Map) record.get("metadata");
 
       final String jobContext = "DistcpNG:" + name;
-      final String cluster = parseClusterIdentifier(metadata.get("clusterIdentifier")).get("cluster");
+      final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
       final String projectName = metadata.get("azkabanProjectName");
       final String flowId = metadata.get("azkabanFlowId");
       final String jobId = metadata.get("azkabanJobId");
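Both Gobblin processors now delegate cluster resolution to ClusterUtil.matchClusterCode; dataset and partition parsing still goes through the UrnPattern shown in the context lines above. A quick check of what its three capture groups yield — the sample path is hypothetical, and the interpretation of the groups (dataset path, partition grain, partition) is assumed from the surrounding code.

```java
// Small check of the UrnPattern shown in GobblinTrackingCompactionProcessor above.
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class UrnPatternSketch {
  public static void main(String[] args) {
    Pattern urnPattern = Pattern.compile("^(\\/\\w+\\/\\w+\\/[\\w-]+)\\/([\\w-]+)\\/(\\d+.*)$");
    Matcher m = urnPattern.matcher("/data/tracking/SomeEvent/hourly/2016/05/01/00");  // hypothetical path
    if (m.find()) {
      System.out.println(m.group(1)); // /data/tracking/SomeEvent
      System.out.println(m.group(2)); // hourly
      System.out.println(m.group(3)); // 2016/05/01/00
    }
  }
}
```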
diff --git a/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingLumosProcessor.java b/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingLumosProcessor.java
index 7436d9525d..aab3e0580a 100644
--- a/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingLumosProcessor.java
+++ b/metadata-etl/src/main/java/metadata/etl/kafka/GobblinTrackingLumosProcessor.java
@@ -19,6 +19,7 @@ import java.util.regex.Pattern;
 import org.apache.avro.generic.GenericData;
 import wherehows.common.schemas.GobblinTrackingLumosRecord;
 import wherehows.common.schemas.Record;
+import wherehows.common.utils.ClusterUtil;
 
 
 public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
@@ -59,7 +60,7 @@ public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
       logger.info("Processing Gobblin tracking event record: " + name + ", timestamp: " + timestamp);
 
       final String jobContext = "Lumos:" + name;
-      final String cluster = parseClusterIdentifier(metadata.get("clusterIdentifier")).get("cluster");
+      final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
       final String projectName = metadata.get("azkabanProjectName");
       final String flowId = metadata.get("azkabanFlowId");
       final String jobId = metadata.get("azkabanJobId");
diff --git a/metadata-etl/src/main/java/metadata/etl/kafka/KafkaConsumerProcessor.java b/metadata-etl/src/main/java/metadata/etl/kafka/KafkaConsumerProcessor.java
index d1d5bd06ac..787a26b4d0 100644
--- a/metadata-etl/src/main/java/metadata/etl/kafka/KafkaConsumerProcessor.java
+++ b/metadata-etl/src/main/java/metadata/etl/kafka/KafkaConsumerProcessor.java
@@ -13,10 +13,6 @@
  */
 package metadata.etl.kafka;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import org.apache.avro.generic.GenericData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,11 +27,6 @@ public abstract class KafkaConsumerProcessor {
 
   protected static final Logger logger = LoggerFactory.getLogger(KafkaConsumerProcessor.class);
 
-  // for 'ltx1-holdemrm01.grid.linkedin.com:8032', extract 'ltx1', 'holdem'
-  private final String ClusterIdentifierRegex = "^(\\w{4})-(\\w+)\\w{2}\\d{2}\\.grid.*$";
-  private final Pattern ClusterIdentifierPattern = Pattern.compile(ClusterIdentifierRegex);
-
-
   /**
    * Abstract method 'process' to be implemented by specific processor
    * input Kafka record, process information and write to DB.
@@ -59,24 +50,4 @@
     }
   }
 
-  /**
-   * Parse cluster identifier to get datacenter and cluster
-   * if length > 10 and in the form of 'ltx1-holdemrm01.grid.linkedin.com:8032', extract 'ltx1', 'holdem'
-   * otherwise, put the original String as cluster
-   * @param text String
-   * @return Map
-   */
-  protected Map parseClusterIdentifier(String text) {
-    final Map map = new HashMap<>(4);
-    if (text.length() > 10) {
-      final Matcher m = ClusterIdentifierPattern.matcher(text);
-      if (m.find()) {
-        map.put("datacenter", m.group(1));
-        map.put("cluster", m.group(2));
-      }
-    } else {
-      map.put("cluster", text);
-    }
-    return map;
-  }
 }
diff --git a/metadata-etl/src/main/java/metadata/etl/kafka/MetastoreAuditProcessor.java b/metadata-etl/src/main/java/metadata/etl/kafka/MetastoreAuditProcessor.java
index 9aec08c498..bef1e21d70 100644
--- a/metadata-etl/src/main/java/metadata/etl/kafka/MetastoreAuditProcessor.java
+++ b/metadata-etl/src/main/java/metadata/etl/kafka/MetastoreAuditProcessor.java
@@ -16,6 +16,7 @@ package metadata.etl.kafka;
 import org.apache.avro.generic.GenericData;
 import wherehows.common.schemas.MetastoreAuditRecord;
 import wherehows.common.schemas.Record;
+import wherehows.common.utils.ClusterUtil;
 
 
 public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
@@ -35,28 +36,28 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
       logger.info("Processing Metastore Audit event record.");
 
       final GenericData.Record auditHeader = (GenericData.Record) record.get("auditHeader");
-      final String server = utf8ToString(auditHeader.get("server"));
+      final String server = ClusterUtil.matchClusterCode(utf8ToString(auditHeader.get("server")));
       final String instance = utf8ToString(auditHeader.get("instance"));
       final String appName = utf8ToString(auditHeader.get("appName"));
 
       String eventName;
       GenericData.Record content;
-      final Object oldOne;
-      final Object newOne;
+      final Object oldInfo;
+      final Object newInfo;
 
       // check if it is MetastoreTableAuditEvent
       if (record.get("metastoreTableAuditContent") != null) {
         eventName = "MetastoreTableAuditEvent";
         content = (GenericData.Record) record.get("metastoreTableAuditContent");
-        oldOne = content.get("oldTable");
-        newOne = content.get("newTable");
+        oldInfo = content.get("oldTable");
+        newInfo = content.get("newTable");
       }
       // check if it is MetastorePartitionAuditEvent
       else if (record.get("metastorePartitionAuditContent") != null) {
         eventName = "MetastorePartitionAuditEvent";
         content = (GenericData.Record) record.get("metastorePartitionAuditContent");
-        oldOne = content.get("oldPartition");
-        newOne = content.get("newPartition");
+        oldInfo = content.get("oldPartition");
+        newInfo = content.get("newPartition");
       }
       else {
         throw new IllegalArgumentException("Unknown Metastore Audit event: " + record);
@@ -70,7 +71,7 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
       final String isDataDeleted = utf8ToString(content.get("isDataDeleted"));
 
       // use newOne, if null, use oldOne
-      final GenericData.Record rec = newOne != null ? (GenericData.Record) newOne : (GenericData.Record) oldOne;
+      final GenericData.Record rec = newInfo != null ? (GenericData.Record) newInfo : (GenericData.Record) oldInfo;
       final String dbName = utf8ToString(rec.get("dbName"));
       final String tableName = utf8ToString(rec.get("tableName"));
       final String partition = utf8ToString(rec.get("values"));
@@ -84,8 +85,8 @@
       // set null partition to '?' for primary key
       eventRecord.setTableInfo(dbName, tableName, (partition != null ? partition : "?"),
           location, owner, createTime, lastAccessTime);
-      eventRecord.setOldOne(utf8ToString(oldOne));
-      eventRecord.setNewOne(utf8ToString(newOne));
+      eventRecord.setOldInfo(utf8ToString(oldInfo));
+      eventRecord.setNewInfo(utf8ToString(newInfo));
     }
     return eventRecord;
   }
diff --git a/metadata-etl/src/main/java/metadata/etl/models/EtlJobName.java b/metadata-etl/src/main/java/metadata/etl/models/EtlJobName.java
index bfc8013e44..a8aaf6747f 100644
--- a/metadata-etl/src/main/java/metadata/etl/models/EtlJobName.java
+++ b/metadata-etl/src/main/java/metadata/etl/models/EtlJobName.java
@@ -29,7 +29,7 @@ public enum EtlJobName {
   ELASTICSEARCH_EXECUTION_INDEX_ETL(EtlType.OPERATION, RefIdType.APP),
   TREEBUILDER_EXECUTION_DATASET_ETL(EtlType.OPERATION, RefIdType.APP),
   ORACLE_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.APP),
-  KAFKA_CONSUMER_GOBBLIN_ETL(EtlType.DATASET, RefIdType.APP),
+  KAFKA_CONSUMER_ETL(EtlType.OPERATION, RefIdType.DB),
   ;
 
   EtlType etlType;
diff --git a/metadata-etl/src/main/resources/jython/OracleExtract.py b/metadata-etl/src/main/resources/jython/OracleExtract.py
index 7c35effd64..6ac702aed0 100644
--- a/metadata-etl/src/main/resources/jython/OracleExtract.py
+++ b/metadata-etl/src/main/resources/jython/OracleExtract.py
@@ -199,7 +199,6 @@ class OracleExtract:
     '''
     schema_dict = {"fields": []}
     table_record = {}
-    field_record = {}
 
    table_idx = 0
    field_idx = 0
@@ -211,14 +210,18 @@ class OracleExtract:
       # This is a new table. Let's push the previous table record into output_list
       if 'urn' in table_record:
         schema_dict["num_fields"] = field_idx
-        table_record['columns'] = schema_dict
+        table_record["columns"] = json.dumps(schema_dict)
         self.table_output_list.append(table_record)
+      properties = {
+        "indexes": self.table_dict[table_name_key].get("indexes"),
+        "partition_column": self.table_dict[table_name_key].get("partition_column")
+      }
       table_record = {
         "name": row[1],
-        "columns": {},
+        "columns": None,
         "schema_type": "JSON",
-        "properties": self.table_dict[table_name_key],
+        "properties": json.dumps(properties),
         "urn": table_urn,
         "source": "Oracle",
         "location_prefix": row[0],
@@ -249,7 +252,7 @@ class OracleExtract:
 
     # finish all remaining rows
     schema_dict["num_fields"] = field_idx
-    table_record['columns'] = schema_dict
+    table_record["columns"] = json.dumps(schema_dict)
     self.table_output_list.append(table_record)
 
     self.logger.info("%d Table records generated" % table_idx)
@@ -304,7 +307,17 @@ class OracleExtract:
     return None
 
   def trim_newline(self, line):
-    return None if line is None else line.replace('\n', ' ').replace('\r', ' ')
+    return None if line is None else line.replace('\n', ' ').replace('\r', ' ').encode('ascii', 'ignore')
+
+  def write_csv(self, csv_filename, csv_columns, data_list):
+    csvfile = open(csv_filename, 'wb')
+    os.chmod(csv_filename, 0644)
+    writer = csv.DictWriter(csvfile, fieldnames=csv_columns, delimiter='\x1A', lineterminator='\n',
+                            quoting=csv.QUOTE_NONE, quotechar='\1', escapechar='\0')
+    writer.writeheader()
+    for data in data_list:
+      writer.writerow(data)
+    csvfile.close()
 
   def run(self, database_name, table_name, table_output_file, field_output_file, sample_output_file,
           sample=False):
@@ -323,34 +336,25 @@ class OracleExtract:
     begin = datetime.datetime.now().strftime("%H:%M:%S")
     # table info
     rows = self.get_table_info(None, None)
+    self.get_extra_table_info()
     self.format_table_metadata(rows)
     end = datetime.datetime.now().strftime("%H:%M:%S")
self.logger.info("Collecting table info [%s -> %s]" % (str(begin), str(end))) csv_columns = ['name', 'columns', 'schema_type', 'properties', 'urn', 'source', 'location_prefix', 'parent_name', 'storage_type', 'dataset_type', 'is_partitioned'] - csvfile = open(table_output_file, 'wb') - os.chmod(table_output_file, 0666) - writer = csv.DictWriter(csvfile, fieldnames=csv_columns, delimiter='\x1A', lineterminator='\n') - writer.writeheader() - for data in self.table_output_list: - writer.writerow(data) - csvfile.close + self.write_csv(table_output_file, csv_columns, self.table_output_list) csv_columns = ['dataset_urn', 'sort_id', 'name', 'data_type', 'nullable', 'size', 'precision', 'scale', 'default_value', 'doc'] - csvfile = open(field_output_file, 'wb') - os.chmod(field_output_file, 0666) - writer = csv.DictWriter(csvfile, fieldnames=csv_columns, delimiter='\x1A', lineterminator='\n') - writer.writeheader() - for data in self.field_output_list: - writer.writerow(data) - csvfile.close + self.write_csv(field_output_file, csv_columns, self.field_output_list) if sample: csvfile = open(sample_output_file, 'wb') os.chmod(sample_output_file, 0666) - writer = csv.DictWriter(csvfile, fieldnames=csv_columns, delimiter='\x1A', lineterminator='\n') + writer = csv.DictWriter(csvfile, fieldnames=csv_columns, delimiter='\x1A', lineterminator='\n', + quoting=csv.QUOTE_NONE, quotechar='\1', escapechar='\0') + self.logger.info("Writing to CSV file {}".format(sample_output_file)) # collect sample data for onedatabase in schema: diff --git a/metadata-etl/src/main/resources/jython/OracleLoad.py b/metadata-etl/src/main/resources/jython/OracleLoad.py index 45a27d655d..0785293920 100644 --- a/metadata-etl/src/main/resources/jython/OracleLoad.py +++ b/metadata-etl/src/main/resources/jython/OracleLoad.py @@ -15,7 +15,7 @@ from com.ziclix.python.sql import zxJDBC from wherehows.common import Constant from org.slf4j import LoggerFactory -import datetime +import sys, os, datetime class OracleLoad: @@ -39,10 +39,19 @@ class OracleLoad: lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT] self.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time) + self.logger.info("Load Oracle Metadata into {}, db_id {}, wh_exec_id {}" + .format(JDBC_URL, self.db_id, self.wh_etl_exec_id)) + + self.dict_dataset_table = 'dict_dataset' + self.field_comments_table = 'field_comments' + self.dict_field_table = 'dict_field_detail' + self.dict_field_comment_table = 'dict_dataset_field_comment' + self.dict_dataset_sample_table = 'dict_dataset_sample' + def load_tables(self): load_tables_cmd = ''' - DELETE FROM stg_dict_dataset WHERE db_id = '{db_id}'; + DELETE FROM stg_dict_dataset WHERE db_id = {db_id}; -- load into stg table LOAD DATA LOCAL INFILE '{source_file}' @@ -55,7 +64,7 @@ class OracleLoad: wh_etl_exec_id = {wh_etl_exec_id}; -- insert into final table - INSERT INTO dict_dataset + INSERT INTO {dict_dataset} ( `name`, `schema`, schema_type, @@ -97,16 +106,18 @@ class OracleLoad: modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id ; - analyze table dict_dataset; - '''.format(source_file=self.input_table_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id) + analyze table {dict_dataset}; + '''.format(source_file=self.input_table_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id, + dict_dataset=self.dict_dataset_table) self.executeCommands(load_tables_cmd) - self.logger.info("finish loading oracle table metadata") + self.logger.info("finish loading oracle table metadata from {} to {}" 
diff --git a/metadata-etl/src/main/resources/jython/OracleLoad.py b/metadata-etl/src/main/resources/jython/OracleLoad.py
index 45a27d655d..0785293920 100644
--- a/metadata-etl/src/main/resources/jython/OracleLoad.py
+++ b/metadata-etl/src/main/resources/jython/OracleLoad.py
@@ -15,7 +15,7 @@
 from com.ziclix.python.sql import zxJDBC
 from wherehows.common import Constant
 from org.slf4j import LoggerFactory
-import datetime
+import sys, os, datetime
 
 
 class OracleLoad:
@@ -39,10 +39,19 @@ class OracleLoad:
     lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
     self.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
 
+    self.logger.info("Load Oracle Metadata into {}, db_id {}, wh_exec_id {}"
+                     .format(JDBC_URL, self.db_id, self.wh_etl_exec_id))
+
+    self.dict_dataset_table = 'dict_dataset'
+    self.field_comments_table = 'field_comments'
+    self.dict_field_table = 'dict_field_detail'
+    self.dict_field_comment_table = 'dict_dataset_field_comment'
+    self.dict_dataset_sample_table = 'dict_dataset_sample'
+
   def load_tables(self):
     load_tables_cmd = '''
-    DELETE FROM stg_dict_dataset WHERE db_id = '{db_id}';
+    DELETE FROM stg_dict_dataset WHERE db_id = {db_id};
 
    -- load into stg table
    LOAD DATA LOCAL INFILE '{source_file}'
@@ -55,7 +64,7 @@ class OracleLoad:
      wh_etl_exec_id = {wh_etl_exec_id};
    -- insert into final table
-    INSERT INTO dict_dataset
+    INSERT INTO {dict_dataset}
    (
      `name`,
      `schema`,
      schema_type,
@@ -97,16 +106,18 @@ class OracleLoad:
      modified_time=UNIX_TIMESTAMP(now()),
      wh_etl_exec_id=s.wh_etl_exec_id
    ;
-    analyze table dict_dataset;
-    '''.format(source_file=self.input_table_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
+    analyze table {dict_dataset};
+    '''.format(source_file=self.input_table_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id,
+               dict_dataset=self.dict_dataset_table)
 
     self.executeCommands(load_tables_cmd)
-    self.logger.info("finish loading oracle table metadata")
+    self.logger.info("finish loading oracle table metadata from {} to {}"
+                     .format(self.input_table_file, self.dict_dataset_table))
 
 
   def load_fields(self):
     load_fields_cmd = '''
-    DELETE FROM stg_dict_field_detail where db_id = '{db_id}';
+    DELETE FROM stg_dict_field_detail where db_id = {db_id};
 
    LOAD DATA LOCAL INFILE '{source_file}'
    INTO TABLE stg_dict_field_detail
@@ -132,13 +143,13 @@ class OracleLoad:
      and (char_length(trim(description)) = 0
        or description in ('null', 'N/A', 'nothing', 'empty', 'none'));
 
-    insert into field_comments (
+    insert into {field_comments} (
      user_id, comment, created, modified, comment_crc32_checksum
    )
    select 0 user_id, description, now() created, now() modified, crc32(description) from
    (
      select sf.description
-      from stg_dict_field_detail sf left join field_comments fc
+      from stg_dict_field_detail sf left join {field_comments} fc
        on sf.description = fc.comment
      where sf.description is not null
        and fc.id is null
@@ -146,40 +157,41 @@ class OracleLoad:
      group by 1 order by 1
    ) d;
 
-    analyze table field_comments;
+    analyze table {field_comments};
 
    -- delete old record if it does not exist in this load batch anymore (but have the dataset id)
    create temporary table if not exists t_deleted_fields (primary key (field_id))
    select x.field_id
    from stg_dict_field_detail s
-      join dict_dataset i
+      join {dict_dataset} i
        on s.urn = i.urn
        and s.db_id = {db_id}
-      right join dict_field_detail x
+      right join {dict_field_detail} x
        on i.id = x.dataset_id
        and s.field_name = x.field_name
        and s.parent_path = x.parent_path
    where s.field_name is null
      and x.dataset_id in (
        select d.id dataset_id
-        from stg_dict_field_detail k join dict_dataset d
+        from stg_dict_field_detail k join {dict_dataset} d
          on k.urn = d.urn
         and k.db_id = {db_id}
      )
    ; -- run time : ~2min
 
-    delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);
+    delete from {dict_field_detail} where field_id in (select field_id from t_deleted_fields);
 
    -- update the old record if some thing changed
-    update dict_field_detail t join
+    update {dict_field_detail} t join
    (
      select x.field_id, s.*
-      from stg_dict_field_detail s join dict_dataset d
+      from stg_dict_field_detail s
+      join {dict_dataset} d
        on s.urn = d.urn
-        join dict_field_detail x
-          on s.field_name = x.field_name
-          and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
-          and d.id = x.dataset_id
+      join {dict_field_detail} x
+        on s.field_name = x.field_name
+        and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
+        and d.id = x.dataset_id
      where s.db_id = {db_id}
        and (x.sort_id <> s.sort_id
         or x.parent_sort_id <> s.parent_sort_id
@@ -207,24 +219,23 @@
      t.modified = now()
    ;
 
-    insert into dict_field_detail (
+    insert into {dict_field_detail} (
      dataset_id, fields_layout_id, sort_id, parent_sort_id, parent_path,
-      field_name, namespace, data_type, data_size, is_nullable, default_value,
-      modified
+      field_name, namespace, data_type, data_size, is_nullable, default_value, modified
    )
    select d.id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path,
      sf.field_name, sf.namespace, sf.data_type, sf.data_size, sf.is_nullable, sf.default_value, now()
-    from stg_dict_field_detail sf join dict_dataset d
+    from stg_dict_field_detail sf join {dict_dataset} d
      on sf.urn = d.urn
-    left join dict_field_detail t
+    left join {dict_field_detail} t
      on d.id = t.dataset_id
-      and sf.field_name = t.field_name
-      and sf.parent_path = t.parent_path
+     and sf.field_name = t.field_name
+     and sf.parent_path = t.parent_path
    where db_id = {db_id}
      and t.field_id is null
    ;
-    analyze table dict_field_detail;
+    analyze table {dict_field_detail};
 
    -- delete old record in stagging
    delete from stg_dict_dataset_field_comment where db_id = {db_id};
 
@@ -232,36 +243,40 @@ class OracleLoad:
    -- insert
    insert into stg_dict_dataset_field_comment
    select t.field_id field_id, fc.id comment_id, d.id dataset_id, {db_id}
-    from stg_dict_field_detail sf join dict_dataset d
+    from stg_dict_field_detail sf join {dict_dataset} d
      on sf.urn = d.urn
-      join field_comments fc
+      join {field_comments} fc
        on sf.description = fc.comment
-      join dict_field_detail t
+      join {dict_field_detail} t
        on d.id = t.dataset_id
       and sf.field_name = t.field_name
       and sf.parent_path = t.parent_path
    where sf.db_id = {db_id};
 
    -- have default comment, insert it set default to 0
-    insert ignore into dict_dataset_field_comment
+    insert ignore into {dict_dataset_field_comment}
    select field_id, comment_id, dataset_id, 0 is_default from stg_dict_dataset_field_comment
    where field_id in (
-      select field_id from dict_dataset_field_comment
+      select field_id from {dict_dataset_field_comment}
      where field_id in (select field_id from stg_dict_dataset_field_comment)
      and is_default = 1 )
    and db_id = {db_id};
 
    -- doesn't have this comment before, insert into it and set as default
-    insert ignore into dict_dataset_field_comment
-    select sd.field_id, sd.comment_id, sd.dataset_id, 1 from stg_dict_dataset_field_comment sd
-    left join dict_dataset_field_comment d
-      on d.field_id = sd.field_id
-      and d.comment_id = sd.comment_id
+    insert ignore into {dict_dataset_field_comment}
+    select sd.field_id, sd.comment_id, sd.dataset_id, 1
+    from stg_dict_dataset_field_comment sd
+    left join {dict_dataset_field_comment} d
+      on d.field_id = sd.field_id
+      and d.comment_id = sd.comment_id
    where d.comment_id is null
-      and sd.db_id = {db_id};
-    '''.format(source_file=self.input_field_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
+      and sd.db_id = {db_id};
+    '''.format(source_file=self.input_field_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id,
+               dict_dataset=self.dict_dataset_table, dict_field_detail=self.dict_field_table,
+               field_comments=self.field_comments_table, dict_dataset_field_comment=self.dict_field_comment_table)
 
     self.executeCommands(load_fields_cmd)
-    self.logger.info("finish loading oracle table fields")
+    self.logger.info("finish loading oracle table fields from {} to {}"
+                     .format(self.input_field_file, self.dict_field_table))
 
 
   def load_sample(self):
@@ -277,12 +292,12 @@ class OracleLoad:
 
    -- update reference id in stagging table
    UPDATE stg_dict_dataset_sample s
-    LEFT JOIN dict_dataset d ON s.ref_urn = d.urn
+    LEFT JOIN {dict_dataset} d ON s.ref_urn = d.urn
    SET s.ref_id = d.id
    WHERE s.db_id = {db_id};
 
    -- first insert ref_id as 0
-    INSERT INTO dict_dataset_sample
+    INSERT INTO {dict_dataset_sample}
    (
      `dataset_id`,
      `urn`,
      `ref_id`,
@@ -290,20 +305,22 @@ class OracleLoad:
      created
    )
    select d.id as dataset_id, s.urn, s.ref_id, s.data, now()
-    from stg_dict_dataset_sample s left join dict_dataset d on d.urn = s.urn
+    from stg_dict_dataset_sample s left join {dict_dataset} d on d.urn = s.urn
    where s.db_id = {db_id}
    on duplicate key update
      `data`=s.data, modified=now();
 
    -- update reference id in final table
-    UPDATE dict_dataset_sample d
+    UPDATE {dict_dataset_sample} d
    RIGHT JOIN stg_dict_dataset_sample s ON d.urn = s.urn
    SET d.ref_id = s.ref_id
    WHERE s.db_id = {db_id} AND d.ref_id = 0;
-    '''.format(source_file=self.input_sample_file, db_id=self.db_id)
+    '''.format(source_file=self.input_sample_file, db_id=self.db_id,
+               dict_dataset=self.dict_dataset_table, dict_dataset_sample=self.dict_dataset_sample_table)
 
     self.executeCommands(load_sample_cmd)
-    self.logger.info("finish loading oracle sample data")
+    self.logger.info("finish loading oracle sample data from {} to {}"
+                     .format(self.input_sample_file, self.dict_dataset_sample_table))
 
 
   def executeCommands(self, commands):
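OracleLoad.py now routes every statement batch through executeCommands with the MySQL table names injected via str.format. The body of executeCommands is not shown in this diff; the sketch below is only an assumed JDBC-style equivalent (split on ';' and execute in order), not the project's implementation.

```java
// Hypothetical JDBC counterpart of the executeCommands(...) helper the loader calls.
// Naive ';' splitting works for these generated statements but would break on
// literals that themselves contain semicolons.
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

public class SqlBatchRunnerSketch {

  static void executeCommands(Connection conn, String commands) throws SQLException {
    try (Statement stmt = conn.createStatement()) {
      for (String command : commands.split(";")) {
        if (!command.trim().isEmpty()) {
          stmt.execute(command);   // one statement at a time, in order
        }
      }
    }
  }
}
```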
diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/ClusterInfo.java b/wherehows-common/src/main/java/wherehows/common/schemas/ClusterInfo.java
new file mode 100644
index 0000000000..54aec1ffc8
--- /dev/null
+++ b/wherehows-common/src/main/java/wherehows/common/schemas/ClusterInfo.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2015 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+package wherehows.common.schemas;
+
+
+public class ClusterInfo {
+
+  int clusterId;
+  String clusterCode;
+  String clusterShortName;
+  String datacenterCode;
+  String clusterType;
+  String deploymentTierCode;
+
+  public ClusterInfo(int clusterId, String clusterCode, String clusterShortName, String datacenterCode,
+      String clusterType, String deploymentTierCode) {
+    this.clusterId = clusterId;
+    this.clusterCode = clusterCode;
+    this.clusterShortName = clusterShortName;
+    this.datacenterCode = datacenterCode;
+    this.clusterType = clusterType;
+    this.deploymentTierCode = deploymentTierCode;
+  }
+
+  public int getClusterId() {
+    return clusterId;
+  }
+
+  public String getClusterCode() {
+    return clusterCode;
+  }
+
+  public String getClusterShortName() {
+    return clusterShortName;
+  }
+
+  public String getDatacenterCode() {
+    return datacenterCode;
+  }
+
+  public String getClusterType() {
+    return clusterType;
+  }
+
+  public String getDeploymentTierCode() {
+    return deploymentTierCode;
+  }
+}
diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/MetastoreAuditRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/MetastoreAuditRecord.java
index 9f80101e2b..832681b785 100644
--- a/wherehows-common/src/main/java/wherehows/common/schemas/MetastoreAuditRecord.java
+++ b/wherehows-common/src/main/java/wherehows/common/schemas/MetastoreAuditRecord.java
@@ -227,19 +227,19 @@ public class MetastoreAuditRecord extends AbstractRecord {
     this.lastAccessTime = lastAccessTime;
   }
 
-  public String getOldOne() {
+  public String getOldInfo() {
     return oldInfo;
   }
 
-  public void setOldOne(String oldOne) {
-    this.oldInfo = oldOne;
+  public void setOldInfo(String oldInfo) {
+    this.oldInfo = oldInfo;
   }
 
-  public String getNewOne() {
+  public String getNewInfo() {
     return newInfo;
   }
 
-  public void setNewOne(String newOne) {
-    this.newInfo = newOne;
+  public void setNewInfo(String newInfo) {
+    this.newInfo = newInfo;
   }
 }
diff --git a/wherehows-common/src/main/java/wherehows/common/utils/ClusterUtil.java b/wherehows-common/src/main/java/wherehows/common/utils/ClusterUtil.java
new file mode 100644
index 0000000000..af6f590f7d
--- /dev/null
+++ b/wherehows-common/src/main/java/wherehows/common/utils/ClusterUtil.java
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2015 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+package wherehows.common.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import wherehows.common.schemas.ClusterInfo;
+
+
+public class ClusterUtil {
+
+  private static final List<ClusterInfo> CLUSTER_LIST = new ArrayList<>();
+
+  public static void updateClusterInfo(List<ClusterInfo> clusterList) {
+    if (clusterList != null && clusterList.size() > 0) {
+      CLUSTER_LIST.clear();
+      CLUSTER_LIST.addAll(clusterList);
+    }
+  }
+
+  /**
+   * match the clusterName to the list of cluster info, return the matching clusterCode
+   * @param clusterName String
+   * @return clusterCode
+   */
+  public static String matchClusterCode(String clusterName) {
+    if (clusterName == null) {
+      return null;
+    }
+
+    // first use cluster code to find match
+    for (ClusterInfo cluster : CLUSTER_LIST) {
+      if (clusterName.contains(cluster.getClusterCode())) {
+        return cluster.getClusterCode();
+      }
+    }
+
+    // second round use cluster short name to find match
+    for (ClusterInfo cluster : CLUSTER_LIST) {
+      if (clusterName.contains(cluster.getClusterShortName())) {
+        return cluster.getClusterCode();
+      }
+    }
+
+    // no match found, return original string
+    return clusterName;
+  }
+
+  public static List<ClusterInfo> getClusterList() {
+    return CLUSTER_LIST;
+  }
+}
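Taken together: KafkaConsumerMaster seeds ClusterUtil once from cfg_cluster (via ClusterDao), and every processor then normalizes free-form cluster identifiers through matchClusterCode, falling back to the original string when nothing matches. A small usage sketch; the ClusterInfo values are made up, only the call sequence mirrors the patch.

```java
// Usage sketch tying the new classes together (seed once at startup, match per event).
import java.util.Arrays;
import wherehows.common.schemas.ClusterInfo;
import wherehows.common.utils.ClusterUtil;

public class ClusterUtilUsageSketch {
  public static void main(String[] args) {
    // Normally seeded with ClusterUtil.updateClusterInfo(ClusterDao.getClusterInfo());
    // the row values here are invented for illustration.
    ClusterUtil.updateClusterInfo(Arrays.asList(
        new ClusterInfo(1, "ltx1-holdem", "holdem", "ltx1", "grid", "prod")));

    // Raw identifiers from Gobblin / Metastore audit events collapse to the cluster code ...
    System.out.println(ClusterUtil.matchClusterCode("ltx1-holdemrm01.grid.linkedin.com:8032")); // ltx1-holdem
    // ... and anything unknown falls through unchanged.
    System.out.println(ClusterUtil.matchClusterCode("some-other-host:8032")); // some-other-host:8032
  }
}
```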