Merge pull request #194 from alyiwang/master

Get cluster info from cfg_cluster and format kafka events cluster field
Eric Sun 2016-08-03 20:24:07 -07:00 committed by GitHub
commit ef584552be
15 changed files with 334 additions and 131 deletions

View File

@@ -28,16 +28,21 @@ import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
+import metadata.etl.models.EtlJobName;
+import models.daos.ClusterDao;
+import models.daos.EtlJobDao;
 import org.apache.avro.generic.GenericData;
 import msgs.KafkaResponseMsg;
 import play.Logger;
+import play.Play;
 import utils.JdbcUtil;
 import utils.KafkaConfig;
 import utils.KafkaConfig.Topic;
 import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
 import wherehows.common.schemas.AbstractRecord;
+import wherehows.common.utils.ClusterUtil;
 import wherehows.common.writers.DatabaseWriter;
@@ -48,6 +53,7 @@ import wherehows.common.writers.DatabaseWriter;
  */
 public class KafkaConsumerMaster extends UntypedActor {
+  private static List<Integer> _kafkaJobList;
   private static ConsumerConnector _consumer;
   private static Properties _kafkaConfig;
   private static Map<String, Topic> _kafkaTopics;
@@ -64,13 +70,32 @@ public class KafkaConsumerMaster extends UntypedActor {
   @Override
   public void preStart() throws Exception {
-    Logger.info("Start the KafkaConsumerMaster actor...");
+    _kafkaJobList = Play.application().configuration().getIntList("kafka.consumer.etl.jobid", null);;
+    if (_kafkaJobList == null || _kafkaJobList.size() == 0) {
+      Logger.error("Kafka job id error, kafkaJobList: " + _kafkaJobList);
+      getContext().stop(getSelf());
+    }
+    Logger.info("Start the KafkaConsumerMaster... Kafka etl job id list: " + _kafkaJobList);
+    // handle 1 kafka connection
+    Map<String, Object> kafkaEtlJob = EtlJobDao.getEtlJobById(_kafkaJobList.get(0));
+    final int kafkaJobRefId = Integer.parseInt(kafkaEtlJob.get("ref_id").toString());
+    final String kafkaJobName = kafkaEtlJob.get("wh_etl_job_name").toString();
+    if (!kafkaJobName.equals(EtlJobName.KAFKA_CONSUMER_ETL.name())) {
+      Logger.error("Kafka job info error: job name '" + kafkaJobName + "' not equal "
+          + EtlJobName.KAFKA_CONSUMER_ETL.name());
+      getContext().stop(getSelf());
+    }
     // get Kafka configurations from database
-    KafkaConfig.updateKafkaProperties();
+    KafkaConfig.updateKafkaProperties(kafkaJobRefId);
     _kafkaConfig = KafkaConfig.getProperties();
     _kafkaTopics = KafkaConfig.getTopics();
+    // get list of cluster information from database and update ClusterUtil
+    ClusterUtil.updateClusterInfo(ClusterDao.getClusterInfo());
     for (String topic : _kafkaTopics.keySet()) {
       // get the processor class and method
       final Class processorClass = Class.forName(_kafkaTopics.get(topic).processor);
@@ -125,7 +150,7 @@ public class KafkaConsumerMaster extends UntypedActor {
     final AbstractRecord record = kafkaMsg.getRecord();
     if (record != null && _kafkaTopics.containsKey(topic)) {
-      Logger.info("Writing to DB kafka event record: " + topic);
+      Logger.debug("Writing to DB kafka event record: " + topic);
       final DatabaseWriter dbWriter = _topicDbWriter.get(topic);
       try {
@@ -145,6 +170,7 @@ public class KafkaConsumerMaster extends UntypedActor {
   @Override
   public void postStop() {
+    Logger.info("Terminating KafkaConsumerMaster...");
     if (_consumer != null) {
       _consumer.shutdown();
       _kafkaConfig.clear();

View File

@@ -0,0 +1,51 @@
+/**
+ * Copyright 2015 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+package models.daos;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import utils.JdbcUtil;
+import wherehows.common.schemas.ClusterInfo;
+
+
+public class ClusterDao {
+
+  private static final String GET_CLUSTER_INFO = "SELECT * FROM cfg_cluster";
+
+  public static List<ClusterInfo> getClusterInfo() throws Exception {
+    List<Map<String, Object>> rows = JdbcUtil.wherehowsJdbcTemplate.queryForList(GET_CLUSTER_INFO);
+    List<ClusterInfo> clusters = new ArrayList<>();
+    for (Map<String, Object> row : rows) {
+      String clusterCode = row.get("cluster_code").toString();
+      // skip the default placeholder row
+      if (clusterCode.equals("[all]")) {
+        continue;
+      }
+      int clusterId = Integer.parseInt(row.get("cluster_id").toString());
+      String clusterShortName = row.get("cluster_short_name").toString();
+      String datacenterCode = row.get("data_center_code").toString();
+      String clusterType = row.get("cluster_type").toString();
+      String deploymentTierCode = row.get("deployment_tier_code").toString();
+
+      clusters.add(
+          new ClusterInfo(clusterId, clusterCode, clusterShortName, datacenterCode, clusterType, deploymentTierCode));
+    }
+    return clusters;
+  }
+}
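
For reference, the DAO above relies on only a handful of cfg_cluster columns. The sketch below spells them out by rebuilding one ClusterInfo from a stubbed row map, the same way getClusterInfo() does; the column names come from the DAO itself, while the sample values and the class name CfgClusterRowSketch are invented for illustration.

    import java.util.HashMap;
    import java.util.Map;
    import wherehows.common.schemas.ClusterInfo;

    public class CfgClusterRowSketch {
      public static void main(String[] args) {
        // Columns read from each cfg_cluster row (sample values are invented).
        Map<String, Object> row = new HashMap<>();
        row.put("cluster_id", 1);
        row.put("cluster_code", "ltx1-holdem");
        row.put("cluster_short_name", "holdem");
        row.put("data_center_code", "ltx1");
        row.put("cluster_type", "grid");
        row.put("deployment_tier_code", "prod");

        // Rows whose cluster_code is the "[all]" placeholder are skipped by the DAO.
        ClusterInfo info = new ClusterInfo(
            Integer.parseInt(row.get("cluster_id").toString()),
            row.get("cluster_code").toString(),
            row.get("cluster_short_name").toString(),
            row.get("data_center_code").toString(),
            row.get("cluster_type").toString(),
            row.get("deployment_tier_code").toString());
        System.out.println(info.getClusterCode() + " / " + info.getDatacenterCode());
      }
    }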

View File

@@ -18,6 +18,7 @@ import java.util.Map;
 import java.util.Properties;
 import metadata.etl.models.EtlJobName;
 import models.daos.EtlJobPropertyDao;
+import play.Logger;
 
 /**
@@ -29,8 +30,7 @@ public class KafkaConfig {
   // Map of <topic_name, topic_content>
   private static Map<String, Topic> _topics = new HashMap<>();
-  public static final EtlJobName KAFKA_GOBBLIN_JOBNAME = EtlJobName.KAFKA_CONSUMER_GOBBLIN_ETL;
-  public static final Integer KAFKA_GOBBLIN_JOB_REFID = 50;
+  public static final EtlJobName KAFKA_JOBNAME = EtlJobName.KAFKA_CONSUMER_ETL;
 
   /**
    * Class for storing Kafka Topic info
@@ -53,15 +53,19 @@ public class KafkaConfig {
    * Update Kafka properties and topics from etl_job_properies table
    * @throws Exception
    */
-  public static void updateKafkaProperties() throws Exception {
-    Properties props = EtlJobPropertyDao.getJobProperties(KAFKA_GOBBLIN_JOBNAME, KAFKA_GOBBLIN_JOB_REFID);
+  public static void updateKafkaProperties(int kafkaJobRefId) throws Exception {
+    Properties props = EtlJobPropertyDao.getJobProperties(KAFKA_JOBNAME, kafkaJobRefId);
+    if (props == null || props.size() < 5) {
+      Logger.error("Fail to update Kafka job properties for " + KAFKA_JOBNAME.name()
+          + ", job ref id: " + kafkaJobRefId);
+      return;
+    } else {
+      Logger.info("Get Kafka job properties for " + KAFKA_JOBNAME.name() + ", job ref id: " + kafkaJobRefId);
+    }
-    String[] topics = ((String) props.get("kafka.topics")).split("\\s*,\\s*");
-    props.remove("kafka.topics");
-    String[] processors = ((String) props.get("kafka.processors")).split("\\s*,\\s*");
-    props.remove("kafka.processors");
-    String[] dbTables = ((String) props.get("kafka.db.tables")).split("\\s*,\\s*");
-    props.remove("kafka.db.tables");
+    String[] topics = ((String) props.remove("kafka.topics")).split("\\s*,\\s*");
+    String[] processors = ((String) props.remove("kafka.processors")).split("\\s*,\\s*");
+    String[] dbTables = ((String) props.remove("kafka.db.tables")).split("\\s*,\\s*");
 
     _props.clear();
     _props.putAll(props);
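
For context, the three list-valued job properties are parsed with a whitespace-tolerant comma split and removed from the Properties object before the remainder is cached. Below is a minimal, self-contained sketch of that parsing; the property values are hypothetical (the real ones are loaded through EtlJobPropertyDao), and only the split-and-remove pattern mirrors updateKafkaProperties(int).

    import java.util.Properties;

    public class KafkaPropertySplitSketch {
      public static void main(String[] args) {
        // Hypothetical property values; the real ones come from the database via
        // EtlJobPropertyDao.getJobProperties(KAFKA_JOBNAME, kafkaJobRefId).
        Properties props = new Properties();
        props.put("kafka.topics", "MetastoreAuditEvent, GobblinTrackingEvent_lumos ,GobblinTrackingEvent_distcp");
        props.put("kafka.processors",
            "metadata.etl.kafka.MetastoreAuditProcessor, metadata.etl.kafka.GobblinTrackingLumosProcessor, metadata.etl.kafka.GobblinTrackingDistcpNgProcessor");
        props.put("kafka.db.tables", "stg_kafka_metastore_audit, stg_kafka_gobblin_lumos, stg_kafka_gobblin_distcp");

        // Same split-and-remove pattern as updateKafkaProperties(int): the regex
        // tolerates spaces around the commas, and remove() keeps the list-valued
        // keys out of the plain consumer Properties that end up in _props.
        String[] topics = ((String) props.remove("kafka.topics")).split("\\s*,\\s*");
        String[] processors = ((String) props.remove("kafka.processors")).split("\\s*,\\s*");
        String[] dbTables = ((String) props.remove("kafka.db.tables")).split("\\s*,\\s*");

        for (int i = 0; i < topics.length; i++) {
          System.out.println(topics[i] + " -> " + processors[i] + " -> " + dbTables[i]);
        }
      }
    }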

View File

@@ -66,4 +66,6 @@ logger.application=DEBUG
 # if have this varialbe, only the id in this list will be scheduled
 # scheduler.jobid.whitelist=[1,2,3,4,5,6,7,8,9]
 scheduler.check.interval=10
+# start the following list of kafka consumer etl jobs
+# kafka.consumer.etl.jobid=[44]
 application.global=shared.Global

View File

@@ -13,12 +13,15 @@
  */
 package metadata.etl.kafka;
 
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.avro.generic.GenericData;
+import wherehows.common.schemas.ClusterInfo;
 import wherehows.common.schemas.GobblinTrackingCompactionRecord;
 import wherehows.common.schemas.Record;
+import wherehows.common.utils.ClusterUtil;
 
 /**
@@ -31,7 +34,6 @@ public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {
   private final String UrnRegex = "^(\\/\\w+\\/\\w+\\/[\\w-]+)\\/([\\w-]+)\\/(\\d+.*)$";
   private final Pattern UrnPattern = Pattern.compile(UrnRegex);
 
   /**
    * Process a Gobblin tracking event compaction record
    * @param record
@@ -53,7 +55,8 @@ public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {
     final Map<String, String> metadata = (Map<String, String>) record.get("metadata");
     final String jobContext = "Gobblin:" + name;
-    final String cluster = parseClusterIdentifier(metadata.get("clusterIdentifier")).get("cluster");
+    final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
+    // final String cluster = parseClusterIdentifier(metadata.get("clusterIdentifier")).get("cluster");
     final String projectName = metadata.get("azkabanProjectName");
     final String flowId = metadata.get("azkabanFlowId");
     final String jobId = metadata.get("azkabanJobId");

View File

@@ -13,12 +13,15 @@
  */
 package metadata.etl.kafka;
 
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.avro.generic.GenericData;
+import wherehows.common.schemas.ClusterInfo;
 import wherehows.common.schemas.GobblinTrackingDistcpNgRecord;
 import wherehows.common.schemas.Record;
+import wherehows.common.utils.ClusterUtil;
 
 /**
@@ -51,7 +54,7 @@ public class GobblinTrackingDistcpNgProcessor extends KafkaConsumerProcessor {
     final Map<String, String> metadata = (Map<String, String>) record.get("metadata");
     final String jobContext = "DistcpNG:" + name;
-    final String cluster = parseClusterIdentifier(metadata.get("clusterIdentifier")).get("cluster");
+    final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
     final String projectName = metadata.get("azkabanProjectName");
     final String flowId = metadata.get("azkabanFlowId");
     final String jobId = metadata.get("azkabanJobId");

View File

@@ -19,6 +19,7 @@ import java.util.regex.Pattern;
 import org.apache.avro.generic.GenericData;
 import wherehows.common.schemas.GobblinTrackingLumosRecord;
 import wherehows.common.schemas.Record;
+import wherehows.common.utils.ClusterUtil;
 
 public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
@@ -59,7 +60,7 @@ public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
     logger.info("Processing Gobblin tracking event record: " + name + ", timestamp: " + timestamp);
     final String jobContext = "Lumos:" + name;
-    final String cluster = parseClusterIdentifier(metadata.get("clusterIdentifier")).get("cluster");
+    final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
     final String projectName = metadata.get("azkabanProjectName");
     final String flowId = metadata.get("azkabanFlowId");
     final String jobId = metadata.get("azkabanJobId");

View File

@@ -13,10 +13,6 @@
  */
 package metadata.etl.kafka;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import org.apache.avro.generic.GenericData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,11 +27,6 @@ public abstract class KafkaConsumerProcessor {
   protected static final Logger logger = LoggerFactory.getLogger(KafkaConsumerProcessor.class);
 
-  // for 'ltx1-holdemrm01.grid.linkedin.com:8032', extract 'ltx1', 'holdem'
-  private final String ClusterIdentifierRegex = "^(\\w{4})-(\\w+)\\w{2}\\d{2}\\.grid.*$";
-  private final Pattern ClusterIdentifierPattern = Pattern.compile(ClusterIdentifierRegex);
 
   /**
    * Abstract method 'process' to be implemented by specific processor
    * input Kafka record, process information and write to DB.
@@ -59,24 +50,4 @@
     }
   }
 
-  /**
-   * Parse cluster identifier to get datacenter and cluster
-   * if length > 10 and in the form of 'ltx1-holdemrm01.grid.linkedin.com:8032', extract 'ltx1', 'holdem'
-   * otherwise, put the original String as cluster
-   * @param text String
-   * @return Map<String, String>
-   */
-  protected Map<String, String> parseClusterIdentifier(String text) {
-    final Map<String, String> map = new HashMap<>(4);
-    if (text.length() > 10) {
-      final Matcher m = ClusterIdentifierPattern.matcher(text);
-      if (m.find()) {
-        map.put("datacenter", m.group(1));
-        map.put("cluster", m.group(2));
-      }
-    } else {
-      map.put("cluster", text);
-    }
-    return map;
-  }
 }
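
For comparison, here is what the removed helper did with the host string cited in its own comment, as a standalone sketch (the class name OldClusterRegexSketch is just for illustration). Identifiers that did not fit the fixed '<dc>-<cluster>xxNN.grid...' shape produced no cluster at all, which is the gap the new cfg_cluster-driven ClusterUtil.matchClusterCode lookup closes.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class OldClusterRegexSketch {
      public static void main(String[] args) {
        // The pattern removed in this commit; per its comment, for
        // 'ltx1-holdemrm01.grid.linkedin.com:8032' it extracted 'ltx1' and 'holdem'.
        Pattern clusterIdentifierPattern = Pattern.compile("^(\\w{4})-(\\w+)\\w{2}\\d{2}\\.grid.*$");
        Matcher m = clusterIdentifierPattern.matcher("ltx1-holdemrm01.grid.linkedin.com:8032");
        if (m.find()) {
          // prints: datacenter=ltx1, cluster=holdem
          System.out.println("datacenter=" + m.group(1) + ", cluster=" + m.group(2));
        }
      }
    }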

View File

@@ -16,6 +16,7 @@ package metadata.etl.kafka;
 import org.apache.avro.generic.GenericData;
 import wherehows.common.schemas.MetastoreAuditRecord;
 import wherehows.common.schemas.Record;
+import wherehows.common.utils.ClusterUtil;
 
 public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
@@ -35,28 +36,28 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
     logger.info("Processing Metastore Audit event record.");
     final GenericData.Record auditHeader = (GenericData.Record) record.get("auditHeader");
-    final String server = utf8ToString(auditHeader.get("server"));
+    final String server = ClusterUtil.matchClusterCode(utf8ToString(auditHeader.get("server")));
     final String instance = utf8ToString(auditHeader.get("instance"));
     final String appName = utf8ToString(auditHeader.get("appName"));
 
     String eventName;
     GenericData.Record content;
-    final Object oldOne;
-    final Object newOne;
+    final Object oldInfo;
+    final Object newInfo;
 
     // check if it is MetastoreTableAuditEvent
     if (record.get("metastoreTableAuditContent") != null) {
       eventName = "MetastoreTableAuditEvent";
       content = (GenericData.Record) record.get("metastoreTableAuditContent");
-      oldOne = content.get("oldTable");
-      newOne = content.get("newTable");
+      oldInfo = content.get("oldTable");
+      newInfo = content.get("newTable");
     }
     // check if it is MetastorePartitionAuditEvent
     else if (record.get("metastorePartitionAuditContent") != null) {
       eventName = "MetastorePartitionAuditEvent";
       content = (GenericData.Record) record.get("metastorePartitionAuditContent");
-      oldOne = content.get("oldPartition");
-      newOne = content.get("newPartition");
+      oldInfo = content.get("oldPartition");
+      newInfo = content.get("newPartition");
     }
     else {
       throw new IllegalArgumentException("Unknown Metastore Audit event: " + record);
@@ -70,7 +71,7 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
     final String isDataDeleted = utf8ToString(content.get("isDataDeleted"));
     // use newOne, if null, use oldOne
-    final GenericData.Record rec = newOne != null ? (GenericData.Record) newOne : (GenericData.Record) oldOne;
+    final GenericData.Record rec = newInfo != null ? (GenericData.Record) newInfo : (GenericData.Record) oldInfo;
     final String dbName = utf8ToString(rec.get("dbName"));
     final String tableName = utf8ToString(rec.get("tableName"));
     final String partition = utf8ToString(rec.get("values"));
@@ -84,8 +85,8 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
       // set null partition to '?' for primary key
       eventRecord.setTableInfo(dbName, tableName, (partition != null ? partition : "?"),
           location, owner, createTime, lastAccessTime);
-      eventRecord.setOldOne(utf8ToString(oldOne));
-      eventRecord.setNewOne(utf8ToString(newOne));
+      eventRecord.setOldInfo(utf8ToString(oldInfo));
+      eventRecord.setNewInfo(utf8ToString(newInfo));
     }
     return eventRecord;
   }

View File

@@ -29,7 +29,7 @@ public enum EtlJobName {
   ELASTICSEARCH_EXECUTION_INDEX_ETL(EtlType.OPERATION, RefIdType.APP),
   TREEBUILDER_EXECUTION_DATASET_ETL(EtlType.OPERATION, RefIdType.APP),
   ORACLE_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.APP),
-  KAFKA_CONSUMER_GOBBLIN_ETL(EtlType.DATASET, RefIdType.APP),
+  KAFKA_CONSUMER_ETL(EtlType.OPERATION, RefIdType.DB),
   ;
 
   EtlType etlType;

View File

@@ -199,7 +199,6 @@ class OracleExtract:
     '''
     schema_dict = {"fields": []}
     table_record = {}
-    field_record = {}
     table_idx = 0
     field_idx = 0
@@ -211,14 +210,18 @@ class OracleExtract:
         # This is a new table. Let's push the previous table record into output_list
         if 'urn' in table_record:
           schema_dict["num_fields"] = field_idx
-          table_record['columns'] = schema_dict
+          table_record["columns"] = json.dumps(schema_dict)
           self.table_output_list.append(table_record)
+        properties = {
+          "indexes": self.table_dict[table_name_key].get("indexes"),
+          "partition_column": self.table_dict[table_name_key].get("partition_column")
+        }
         table_record = {
          "name": row[1],
-         "columns": {},
+         "columns": None,
          "schema_type": "JSON",
-         "properties": self.table_dict[table_name_key],
+         "properties": json.dumps(properties),
         "urn": table_urn,
         "source": "Oracle",
         "location_prefix": row[0],
@@ -249,7 +252,7 @@ class OracleExtract:
     # finish all remaining rows
     schema_dict["num_fields"] = field_idx
-    table_record['columns'] = schema_dict
+    table_record["columns"] = json.dumps(schema_dict)
     self.table_output_list.append(table_record)
     self.logger.info("%d Table records generated" % table_idx)
@@ -304,7 +307,17 @@ class OracleExtract:
     return None
 
   def trim_newline(self, line):
-    return None if line is None else line.replace('\n', ' ').replace('\r', ' ')
+    return None if line is None else line.replace('\n', ' ').replace('\r', ' ').encode('ascii', 'ignore')
+
+  def write_csv(self, csv_filename, csv_columns, data_list):
+    csvfile = open(csv_filename, 'wb')
+    os.chmod(csv_filename, 0644)
+    writer = csv.DictWriter(csvfile, fieldnames=csv_columns, delimiter='\x1A', lineterminator='\n',
+                            quoting=csv.QUOTE_NONE, quotechar='\1', escapechar='\0')
+    writer.writeheader()
+    for data in data_list:
+      writer.writerow(data)
+    csvfile.close()
 
   def run(self, database_name, table_name, table_output_file, field_output_file, sample_output_file, sample=False):
@@ -323,34 +336,25 @@ class OracleExtract:
     begin = datetime.datetime.now().strftime("%H:%M:%S")
     # table info
     rows = self.get_table_info(None, None)
+    self.get_extra_table_info()
     self.format_table_metadata(rows)
     end = datetime.datetime.now().strftime("%H:%M:%S")
     self.logger.info("Collecting table info [%s -> %s]" % (str(begin), str(end)))
 
     csv_columns = ['name', 'columns', 'schema_type', 'properties', 'urn', 'source', 'location_prefix',
                    'parent_name', 'storage_type', 'dataset_type', 'is_partitioned']
-    csvfile = open(table_output_file, 'wb')
-    os.chmod(table_output_file, 0666)
-    writer = csv.DictWriter(csvfile, fieldnames=csv_columns, delimiter='\x1A', lineterminator='\n')
-    writer.writeheader()
-    for data in self.table_output_list:
-      writer.writerow(data)
-    csvfile.close
+    self.write_csv(table_output_file, csv_columns, self.table_output_list)
 
     csv_columns = ['dataset_urn', 'sort_id', 'name', 'data_type', 'nullable',
                    'size', 'precision', 'scale', 'default_value', 'doc']
-    csvfile = open(field_output_file, 'wb')
-    os.chmod(field_output_file, 0666)
-    writer = csv.DictWriter(csvfile, fieldnames=csv_columns, delimiter='\x1A', lineterminator='\n')
-    writer.writeheader()
-    for data in self.field_output_list:
-      writer.writerow(data)
-    csvfile.close
+    self.write_csv(field_output_file, csv_columns, self.field_output_list)
 
     if sample:
       csvfile = open(sample_output_file, 'wb')
       os.chmod(sample_output_file, 0666)
-      writer = csv.DictWriter(csvfile, fieldnames=csv_columns, delimiter='\x1A', lineterminator='\n')
+      writer = csv.DictWriter(csvfile, fieldnames=csv_columns, delimiter='\x1A', lineterminator='\n',
                               quoting=csv.QUOTE_NONE, quotechar='\1', escapechar='\0')
+      self.logger.info("Writing to CSV file {}".format(sample_output_file))
 
       # collect sample data
       for onedatabase in schema:

View File

@@ -15,7 +15,7 @@
 from com.ziclix.python.sql import zxJDBC
 from wherehows.common import Constant
 from org.slf4j import LoggerFactory
-import datetime
+import sys, os, datetime
 
 class OracleLoad:
@@ -39,10 +39,19 @@ class OracleLoad:
     lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
     self.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
+    self.logger.info("Load Oracle Metadata into {}, db_id {}, wh_exec_id {}"
+                     .format(JDBC_URL, self.db_id, self.wh_etl_exec_id))
+
+    self.dict_dataset_table = 'dict_dataset'
+    self.field_comments_table = 'field_comments'
+    self.dict_field_table = 'dict_field_detail'
+    self.dict_field_comment_table = 'dict_dataset_field_comment'
+    self.dict_dataset_sample_table = 'dict_dataset_sample'
 
   def load_tables(self):
     load_tables_cmd = '''
-    DELETE FROM stg_dict_dataset WHERE db_id = '{db_id}';
+    DELETE FROM stg_dict_dataset WHERE db_id = {db_id};
 
     -- load into stg table
     LOAD DATA LOCAL INFILE '{source_file}'
@@ -55,7 +64,7 @@ class OracleLoad:
     wh_etl_exec_id = {wh_etl_exec_id};
 
     -- insert into final table
-    INSERT INTO dict_dataset
+    INSERT INTO {dict_dataset}
     ( `name`,
     `schema`,
     schema_type,
@@ -97,16 +106,18 @@ class OracleLoad:
     modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
     ;
 
-    analyze table dict_dataset;
-    '''.format(source_file=self.input_table_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
+    analyze table {dict_dataset};
+    '''.format(source_file=self.input_table_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id,
+               dict_dataset=self.dict_dataset_table)
 
     self.executeCommands(load_tables_cmd)
-    self.logger.info("finish loading oracle table metadata")
+    self.logger.info("finish loading oracle table metadata from {} to {}"
+                     .format(self.input_table_file, self.dict_dataset_table))
 
   def load_fields(self):
     load_fields_cmd = '''
-    DELETE FROM stg_dict_field_detail where db_id = '{db_id}';
+    DELETE FROM stg_dict_field_detail where db_id = {db_id};
 
     LOAD DATA LOCAL INFILE '{source_file}'
     INTO TABLE stg_dict_field_detail
@@ -132,13 +143,13 @@ class OracleLoad:
     and (char_length(trim(description)) = 0
     or description in ('null', 'N/A', 'nothing', 'empty', 'none'));
 
-    insert into field_comments (
+    insert into {field_comments} (
     user_id, comment, created, modified, comment_crc32_checksum
     )
     select 0 user_id, description, now() created, now() modified, crc32(description) from
     (
     select sf.description
-    from stg_dict_field_detail sf left join field_comments fc
+    from stg_dict_field_detail sf left join {field_comments} fc
     on sf.description = fc.comment
     where sf.description is not null
     and fc.id is null
@@ -146,40 +157,41 @@ class OracleLoad:
     group by 1 order by 1
     ) d;
 
-    analyze table field_comments;
+    analyze table {field_comments};
 
     -- delete old record if it does not exist in this load batch anymore (but have the dataset id)
     create temporary table if not exists t_deleted_fields (primary key (field_id))
     select x.field_id
     from stg_dict_field_detail s
-    join dict_dataset i
+    join {dict_dataset} i
     on s.urn = i.urn
    and s.db_id = {db_id}
-    right join dict_field_detail x
+    right join {dict_field_detail} x
    on i.id = x.dataset_id
    and s.field_name = x.field_name
    and s.parent_path = x.parent_path
    where s.field_name is null
    and x.dataset_id in (
    select d.id dataset_id
-    from stg_dict_field_detail k join dict_dataset d
+    from stg_dict_field_detail k join {dict_dataset} d
    on k.urn = d.urn
    and k.db_id = {db_id}
    )
    ; -- run time : ~2min
 
-    delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);
+    delete from {dict_field_detail} where field_id in (select field_id from t_deleted_fields);
 
    -- update the old record if some thing changed
-    update dict_field_detail t join
+    update {dict_field_detail} t join
    (
    select x.field_id, s.*
-    from stg_dict_field_detail s join dict_dataset d
+    from stg_dict_field_detail s
+      join {dict_dataset} d
    on s.urn = d.urn
-    join dict_field_detail x
+      join {dict_field_detail} x
    on s.field_name = x.field_name
    and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
    and d.id = x.dataset_id
    where s.db_id = {db_id}
    and (x.sort_id <> s.sort_id
    or x.parent_sort_id <> s.parent_sort_id
@@ -207,24 +219,23 @@ class OracleLoad:
    t.modified = now()
    ;
 
-    insert into dict_field_detail (
+    insert into {dict_field_detail} (
    dataset_id, fields_layout_id, sort_id, parent_sort_id, parent_path,
-    field_name, namespace, data_type, data_size, is_nullable, default_value,
-    modified
+    field_name, namespace, data_type, data_size, is_nullable, default_value, modified
    )
    select
    d.id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path,
    sf.field_name, sf.namespace, sf.data_type, sf.data_size, sf.is_nullable, sf.default_value, now()
-    from stg_dict_field_detail sf join dict_dataset d
+    from stg_dict_field_detail sf join {dict_dataset} d
    on sf.urn = d.urn
-    left join dict_field_detail t
+    left join {dict_field_detail} t
    on d.id = t.dataset_id
    and sf.field_name = t.field_name
    and sf.parent_path = t.parent_path
    where db_id = {db_id} and t.field_id is null
    ;
 
-    analyze table dict_field_detail;
+    analyze table {dict_field_detail};
 
    -- delete old record in stagging
    delete from stg_dict_dataset_field_comment where db_id = {db_id};
@@ -232,36 +243,40 @@ class OracleLoad:
    -- insert
    insert into stg_dict_dataset_field_comment
    select t.field_id field_id, fc.id comment_id, d.id dataset_id, {db_id}
-    from stg_dict_field_detail sf join dict_dataset d
+    from stg_dict_field_detail sf join {dict_dataset} d
    on sf.urn = d.urn
-    join field_comments fc
+    join {field_comments} fc
    on sf.description = fc.comment
-    join dict_field_detail t
+    join {dict_field_detail} t
    on d.id = t.dataset_id
    and sf.field_name = t.field_name
    and sf.parent_path = t.parent_path
    where sf.db_id = {db_id};
 
    -- have default comment, insert it set default to 0
-    insert ignore into dict_dataset_field_comment
+    insert ignore into {dict_dataset_field_comment}
    select field_id, comment_id, dataset_id, 0 is_default from stg_dict_dataset_field_comment where field_id in (
-    select field_id from dict_dataset_field_comment
+    select field_id from {dict_dataset_field_comment}
    where field_id in (select field_id from stg_dict_dataset_field_comment)
    and is_default = 1 ) and db_id = {db_id};
 
    -- doesn't have this comment before, insert into it and set as default
-    insert ignore into dict_dataset_field_comment
-    select sd.field_id, sd.comment_id, sd.dataset_id, 1 from stg_dict_dataset_field_comment sd
-    left join dict_dataset_field_comment d
-    on d.field_id = sd.field_id
-    and d.comment_id = sd.comment_id
+    insert ignore into {dict_dataset_field_comment}
+    select sd.field_id, sd.comment_id, sd.dataset_id, 1
+    from stg_dict_dataset_field_comment sd
+      left join {dict_dataset_field_comment} d
+        on d.field_id = sd.field_id
+        and d.comment_id = sd.comment_id
    where d.comment_id is null
    and sd.db_id = {db_id};
-    '''.format(source_file=self.input_field_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
+    '''.format(source_file=self.input_field_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id,
+               dict_dataset=self.dict_dataset_table, dict_field_detail=self.dict_field_table,
+               field_comments=self.field_comments_table, dict_dataset_field_comment=self.dict_field_comment_table)
 
     self.executeCommands(load_fields_cmd)
-    self.logger.info("finish loading oracle table fields")
+    self.logger.info("finish loading oracle table fields from {} to {}"
+                     .format(self.input_field_file, self.dict_field_table))
 
   def load_sample(self):
@@ -277,12 +292,12 @@ class OracleLoad:
    -- update reference id in stagging table
    UPDATE stg_dict_dataset_sample s
-    LEFT JOIN dict_dataset d ON s.ref_urn = d.urn
+    LEFT JOIN {dict_dataset} d ON s.ref_urn = d.urn
    SET s.ref_id = d.id
    WHERE s.db_id = {db_id};
 
    -- first insert ref_id as 0
-    INSERT INTO dict_dataset_sample
+    INSERT INTO {dict_dataset_sample}
    ( `dataset_id`,
    `urn`,
    `ref_id`,
@@ -290,20 +305,22 @@ class OracleLoad:
    created
    )
    select d.id as dataset_id, s.urn, s.ref_id, s.data, now()
-    from stg_dict_dataset_sample s left join dict_dataset d on d.urn = s.urn
+    from stg_dict_dataset_sample s left join {dict_dataset} d on d.urn = s.urn
    where s.db_id = {db_id}
    on duplicate key update
    `data`=s.data, modified=now();
 
    -- update reference id in final table
-    UPDATE dict_dataset_sample d
+    UPDATE {dict_dataset_sample} d
    RIGHT JOIN stg_dict_dataset_sample s ON d.urn = s.urn
    SET d.ref_id = s.ref_id
    WHERE s.db_id = {db_id} AND d.ref_id = 0;
-    '''.format(source_file=self.input_sample_file, db_id=self.db_id)
+    '''.format(source_file=self.input_sample_file, db_id=self.db_id,
+               dict_dataset=self.dict_dataset_table, dict_dataset_sample=self.dict_dataset_sample_table)
 
     self.executeCommands(load_sample_cmd)
-    self.logger.info("finish loading oracle sample data")
+    self.logger.info("finish loading oracle sample data from {} to {}"
+                     .format(self.input_sample_file, self.dict_dataset_sample_table))
 
   def executeCommands(self, commands):

View File

@@ -0,0 +1,57 @@
+/**
+ * Copyright 2015 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+package wherehows.common.schemas;
+
+
+public class ClusterInfo {
+
+  int clusterId;
+  String clusterCode;
+  String clusterShortName;
+  String datacenterCode;
+  String clusterType;
+  String deploymentTierCode;
+
+  public ClusterInfo(int clusterId, String clusterCode, String clusterShortName, String datacenterCode,
+      String clusterType, String deploymentTierCode) {
+    this.clusterId = clusterId;
+    this.clusterCode = clusterCode;
+    this.clusterShortName = clusterShortName;
+    this.datacenterCode = datacenterCode;
+    this.clusterType = clusterType;
+    this.deploymentTierCode = deploymentTierCode;
+  }
+
+  public int getClusterId() {
+    return clusterId;
+  }
+
+  public String getClusterCode() {
+    return clusterCode;
+  }
+
+  public String getClusterShortName() {
+    return clusterShortName;
+  }
+
+  public String getDatacenterCode() {
+    return datacenterCode;
+  }
+
+  public String getClusterType() {
+    return clusterType;
+  }
+
+  public String getDeploymentTierCode() {
+    return deploymentTierCode;
+  }
+}

View File

@@ -227,19 +227,19 @@ public class MetastoreAuditRecord extends AbstractRecord {
     this.lastAccessTime = lastAccessTime;
   }
 
-  public String getOldOne() {
+  public String getOldInfo() {
     return oldInfo;
   }
 
-  public void setOldOne(String oldOne) {
-    this.oldInfo = oldOne;
+  public void setOldInfo(String oldInfo) {
+    this.oldInfo = oldInfo;
   }
 
-  public String getNewOne() {
+  public String getNewInfo() {
     return newInfo;
   }
 
-  public void setNewOne(String newOne) {
-    this.newInfo = newOne;
+  public void setNewInfo(String newInfo) {
+    this.newInfo = newInfo;
   }
 }

View File

@@ -0,0 +1,63 @@
+/**
+ * Copyright 2015 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+package wherehows.common.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import wherehows.common.schemas.ClusterInfo;
+
+
+public class ClusterUtil {
+
+  private static final List<ClusterInfo> CLUSTER_LIST = new ArrayList<>();
+
+  public static void updateClusterInfo(List<ClusterInfo> clusterList) {
+    if (clusterList != null && clusterList.size() > 0) {
+      CLUSTER_LIST.clear();
+      CLUSTER_LIST.addAll(clusterList);
+    }
+  }
+
+  /**
+   * match the clusterName to the list of cluster info, return the matching clusterCode
+   * @param clusterName String
+   * @return clusterCode
+   */
+  public static String matchClusterCode(String clusterName) {
+    if (clusterName == null) {
+      return null;
+    }
+    // first use cluster code to find match
+    for (ClusterInfo cluster : CLUSTER_LIST) {
+      if (clusterName.contains(cluster.getClusterCode())) {
+        return cluster.getClusterCode();
+      }
+    }
+    // second round use cluster short name to find match
+    for (ClusterInfo cluster : CLUSTER_LIST) {
+      if (clusterName.contains(cluster.getClusterShortName())) {
+        return cluster.getClusterCode();
+      }
+    }
+    // no match found, return original string
+    return clusterName;
+  }
+
+  public static List<ClusterInfo> getClusterList() {
+    return CLUSTER_LIST;
+  }
+}
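
A small usage sketch of the matching order implemented above: the cluster code is tried first, then the short name, and an unmatched identifier falls through unchanged. The ClusterInfo row and the class name ClusterUtilUsageSketch are invented for illustration; in production the list comes from ClusterDao.getClusterInfo().

    import java.util.Arrays;
    import wherehows.common.schemas.ClusterInfo;
    import wherehows.common.utils.ClusterUtil;

    public class ClusterUtilUsageSketch {
      public static void main(String[] args) {
        // Stand-in for ClusterDao.getClusterInfo(); real rows come from cfg_cluster.
        ClusterUtil.updateClusterInfo(Arrays.asList(
            new ClusterInfo(1, "ltx1-holdem", "holdem", "ltx1", "grid", "prod")));

        // 1) identifier contains the cluster code -> the code is returned
        System.out.println(ClusterUtil.matchClusterCode("ltx1-holdemrm01.grid.linkedin.com:8032")); // ltx1-holdem
        // 2) identifier only contains the short name -> still resolved to the code
        System.out.println(ClusterUtil.matchClusterCode("holdem-war-room"));                        // ltx1-holdem
        // 3) no match -> the original string is passed through unchanged
        System.out.println(ClusterUtil.matchClusterCode("eat1-nertz.prod"));                        // eat1-nertz.prod
      }
    }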