Mirror of https://github.com/datahub-project/datahub.git (synced 2025-12-29 02:48:24 +00:00)
Remove now-obsolete Kafka-related code from wherehows-backend (#738)
parent fbf518d9ee
commit 082e882268
@@ -42,7 +42,4 @@ public class ActorRegistry {
  public static ActorRef treeBuilderActor =
      actorSystem.actorOf(Props.create(TreeBuilderActor.class), "TreeBuilderActor");

  public static ActorRef kafkaConsumerMaster =
      actorSystem.actorOf(Props.create(KafkaConsumerMaster.class), "KafkaMaster");
}
@@ -1,151 +0,0 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package actors;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import models.daos.ClusterDao;
import models.kafka.KafkaConfig;
import models.kafka.KafkaConfig.Topic;
import msgs.KafkaCommMsg;
import play.Logger;
import play.Play;
import play.db.DB;
import wherehows.common.PathAnalyzer;
import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
import wherehows.common.utils.ClusterUtil;
import wherehows.common.utils.JobsUtil;


/**
 * Akka actor responsible for managing Kafka workers
 */
public class KafkaConsumerMaster extends UntypedActor {

  public static final String ETL_JOBS_DIR = Play.application().configuration().getString("etl.jobs.dir");

  private static final String KAFKA_JOB_TYPE = "kafka";

  // Map of kafka job name to properties
  private static Map<String, Properties> _kafkaJobList;
  // Map of kafka job name to configs
  private static Map<String, KafkaConfig> _kafkaConfigs = new HashMap<>();

  @Override
  public void preStart() throws Exception {
    _kafkaJobList = JobsUtil.getEnabledJobsByType(ETL_JOBS_DIR, KAFKA_JOB_TYPE);
    Logger.info("Kafka jobs: {}", _kafkaJobList.keySet());

    if (_kafkaJobList.size() == 0) {
      context().stop(getSelf());
      return;
    }
    Logger.info("Start KafkaConsumerMaster...");

    for (Map.Entry<String, Properties> entry : _kafkaJobList.entrySet()) {
      final String kafkaJobName = entry.getKey();
      try {
        // handle 1 kafka connection
        final KafkaConfig kafkaConfig = new KafkaConfig();
        kafkaConfig.updateKafkaProperties(kafkaJobName, entry.getValue());
        final Properties kafkaProps = kafkaConfig.getProperties();
        final Map<String, Topic> kafkaTopics = kafkaConfig.getTopics();

        kafkaConfig.updateTopicProcessor();

        // create Kafka consumer connector
        Logger.info("Create Kafka Consumer with config: " + kafkaProps.toString());
        final SchemaRegistryClient schemaRegistryClient =
            new CachedSchemaRegistryClient((String) kafkaProps.get("schemaRegistryUrl"));
        final ConsumerConfig cfg = new ConsumerConfig(kafkaProps);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(cfg);

        // create Kafka message streams
        final Map<String, Integer> topicCountMap = new HashMap<>();
        for (Topic topic : kafkaTopics.values()) {
          topicCountMap.put(topic.topic, topic.numOfWorkers);
        }

        final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumerConnector.createMessageStreams(topicCountMap);

        // add config to kafka config map
        kafkaConfig.setSchemaRegistryClient(schemaRegistryClient);
        kafkaConfig.setConsumer(consumerConnector);
        _kafkaConfigs.put(kafkaJobName, kafkaConfig);

        // create workers to handle each message stream
        for (String topic : kafkaTopics.keySet()) {
          final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

          int threadNumber = 0;
          for (final KafkaStream<byte[], byte[]> stream : streams) {
            ActorRef childActor = getContext().actorOf(
                Props.create(KafkaConsumerWorker.class, topic, threadNumber, stream, schemaRegistryClient,
                    kafkaConfig.getProcessorClass(topic), kafkaConfig.getProcessorMethod(topic),
                    kafkaConfig.getDbWriter(topic)));

            childActor.tell("Start", getSelf());
            threadNumber++;
          }
        }
        Logger.info("Initiate Kafka consumer job " + kafkaJobName + " with topics " + kafkaTopics.keySet());
      } catch (Exception e) {
        Logger.error("Initiating Kafka properties on startup fail, job name: " + kafkaJobName, e);
      }
    }

    try {
      // get list of cluster information from database and update ClusterUtil
      ClusterUtil.updateClusterInfo(ClusterDao.getClusterInfo());
    } catch (Exception e) {
      Logger.error("Fail to fetch cluster info from DB ", e);
    }

    try {
      // initialize PathAnalyzer
      PathAnalyzer.initialize(DB.getConnection("wherehows"));
    } catch (Exception e) {
      Logger.error("Fail to initialize PathAnalyzer from DB wherehows.", e);
    }
  }

  @Override
  public void onReceive(Object message) throws Exception {
    if (message instanceof KafkaCommMsg) {
      final KafkaCommMsg kafkaCommMsg = (KafkaCommMsg) message;
      Logger.debug(kafkaCommMsg.toString());
    } else {
      unhandled(message);
    }
  }

  @Override
  public void postStop() {
    Logger.info("Terminating KafkaConsumerMaster...");
    for (KafkaConfig config : _kafkaConfigs.values()) {
      config.close();
    }
  }
}
@@ -1,107 +0,0 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package actors;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.SQLException;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.common.serialization.Deserializer;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;

import akka.actor.UntypedActor;
import msgs.KafkaCommMsg;
import play.Logger;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
import wherehows.common.kafka.serializers.KafkaAvroDeserializer;
import wherehows.common.schemas.AbstractRecord;
import wherehows.common.writers.DatabaseWriter;


/**
 * Akka actor for listening to a Kafka topic and waiting for messages.
 */
public class KafkaConsumerWorker extends UntypedActor {
  private final String _topic;
  private final int _threadId;
  private final KafkaStream<byte[], byte[]> _kafkaStream;
  private final SchemaRegistryClient _schemaRegistryRestfulClient;
  private final Object _processorClass;
  private final Method _processorMethod;
  private final DatabaseWriter _dbWriter;
  private int _receivedRecordCount;
  private int _processedRecordCount;

  public KafkaConsumerWorker(String topic, int threadNumber,
      KafkaStream<byte[], byte[]> stream, SchemaRegistryClient schemaRegistryRestfulClient,
      Object processorClass, Method processorMethod, DatabaseWriter dbWriter) {
    this._topic = topic;
    this._threadId = threadNumber;
    this._kafkaStream = stream;
    this._schemaRegistryRestfulClient = schemaRegistryRestfulClient;
    this._processorClass = processorClass;
    this._processorMethod = processorMethod;
    this._dbWriter = dbWriter;
    this._receivedRecordCount = 0; // number of received kafka messages
    this._processedRecordCount = 0; // number of processed records
  }

  @Override
  public void onReceive(Object message) throws Exception {
    if (message.equals("Start")) {
      Logger.info("Starting Thread: " + _threadId + " for topic: " + _topic);
      final ConsumerIterator<byte[], byte[]> it = _kafkaStream.iterator();
      final Deserializer<Object> avroDeserializer = new KafkaAvroDeserializer(_schemaRegistryRestfulClient);

      while (it.hasNext()) { // block for next input
        _receivedRecordCount++;

        try {
          MessageAndMetadata<byte[], byte[]> msg = it.next();
          GenericData.Record kafkaMsgRecord = (GenericData.Record) avroDeserializer.deserialize(_topic, msg.message());
          // Logger.debug("Kafka worker ThreadId " + _threadId + " Topic " + _topic + " record: " + rec);

          // invoke processor
          final AbstractRecord record = (AbstractRecord) _processorMethod.invoke(
              _processorClass, kafkaMsgRecord, _topic);

          // save record to database
          if (record != null) {
            _dbWriter.append(record);
            // _dbWriter.close();
            _dbWriter.insert();
            _processedRecordCount++;
          }
        } catch (InvocationTargetException ite) {
          Logger.error("Processing topic " + _topic + " record error: " + ite.getCause()
              + " - " + ite.getTargetException());
        } catch (SQLException | IOException e) {
          Logger.error("Error while inserting event record: ", e);
        } catch (Throwable ex) {
          Logger.error("Error in notify order. ", ex);
        }

        if (_receivedRecordCount % 1000 == 0) {
          Logger.debug(_topic + " received " + _receivedRecordCount + " processed " + _processedRecordCount);
        }
      }
      Logger.info("Shutting down Thread: " + _threadId + " for topic: " + _topic);
    } else {
      unhandled(message);
    }
  }
}
@@ -1,206 +0,0 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package models.kafka;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import controllers.Application;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import models.daos.DatasetDao;
import models.daos.DatasetInfoDao;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import wherehows.common.schemas.DatasetRecord;
import wherehows.common.schemas.Record;
import wherehows.common.utils.StringUtil;
import play.libs.Json;
import play.Logger;
import wherehows.dao.table.DatasetClassificationDao;
import wherehows.models.table.DatasetClassification;


public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {

  private static final String DALI_LIMITED_RETENTION_AUDITOR = "DaliLimitedRetentionAuditor";
  private static final String DALI_AUTOPURGED_AUDITOR = "DaliAutoPurgeAuditor";
  private static final String DS_IGNORE_IDPC_AUDITOR = "DsIgnoreIDPCAuditor";
  private static final String METADATA_FILE_CLASSIFIER = "MetadataFileClassifier";
  private static final String DATASET_URN_PREFIX = "hdfs://";
  private static final String DATASET_OWNER_SOURCE = "IDPC";

  private DatasetClassificationDao datasetClassificationDao;

  public GobblinTrackingAuditProcessor() {
    this.datasetClassificationDao = Application.DAO_FACTORY.getDatasetClassificationDao();
  }

  // TODO: Make these regex patterns part of job file
  private static final Pattern LOCATION_PREFIX_PATTERN = Pattern.compile("/[^/]+(/[^/]+)?");

  private static final Pattern SHORT_NAME_PATTERN = Pattern.compile("(/[^/]+/[^/]+)$");

  private static final List<Pattern> PARENT_PATTERNS =
      ImmutableList.<Pattern>builder().add(Pattern.compile("/data/external/gobblin/(.+)"))
          .add(Pattern.compile("/data/(databases|dbchange|external)/.+"))
          .add(Pattern.compile("/([^/]*data)/tracking/.+"))
          .add(Pattern.compile("/([^/]*data)/derived/.+"))
          .add(Pattern.compile("/(data)/service/.+"))
          .add(Pattern.compile("/([^/]+)/.+"))
          .build();

  private static final List<Pattern> BLACKLISTED_DATASET_PATTERNS =
      ImmutableList.<Pattern>builder().add(Pattern.compile("(\\b|_)temporary(\\b|_)"))
          .add(Pattern.compile("(\\b|_)temp(\\b|_)"))
          .add(Pattern.compile("(\\b|_)tmp(\\b|_)"))
          .add(Pattern.compile("(\\b|_)staging(\\b|_)"))
          .add(Pattern.compile("(\\b|_)stg(\\b|_)"))
          .add(Pattern.compile("_distcp_"))
          .add(Pattern.compile("/output/"))
          .build();

  /**
   * Process a Gobblin tracking event audit record
   * @param record
   * @param topic
   * @return null
   * @throws Exception
   */
  public Record process(GenericData.Record record, String topic) throws Exception {

    if (record == null || record.get("name") == null) {
      return null;
    }

    final String name = record.get("name").toString();
    // only handle "DaliLimitedRetentionAuditor","DaliAutoPurgeAuditor" and "DsIgnoreIDPCAuditor"
    if (name.equals(DALI_LIMITED_RETENTION_AUDITOR) || name.equals(DALI_AUTOPURGED_AUDITOR) || name.equals(
        DS_IGNORE_IDPC_AUDITOR)) {
      // TODO: Re-enable this once it's fixed.
      //updateKafkaDatasetOwner(record);
    } else if (name.equals(METADATA_FILE_CLASSIFIER)) {
      updateHdfsDatasetSchema(record);
    }

    return null;
  }

  private void updateKafkaDatasetOwner(GenericData.Record record) throws Exception {
    Long timestamp = (Long) record.get("timestamp");
    Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));

    String hasError = metadata.get("HasError");
    if (!hasError.equalsIgnoreCase("true")) {
      String datasetPath = metadata.get("DatasetPath");
      String datasetUrn = DATASET_URN_PREFIX + (datasetPath.startsWith("/") ? "" : "/") + datasetPath;
      String ownerUrns = metadata.get("OwnerURNs");
      DatasetInfoDao.updateKafkaDatasetOwner(datasetUrn, ownerUrns, DATASET_OWNER_SOURCE, timestamp);
    }
  }

  private void updateHdfsDatasetSchema(GenericData.Record record) throws Exception {
    Long timestamp = (Long) record.get("timestamp");
    Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));

    String datasetName = metadata.get("dataset");
    if (StringUtils.isEmpty(datasetName) || isDatasetNameBlacklisted(datasetName)) {
      Logger.info("Skipped processing metadata event for dataset {}", datasetName);
      return;
    }

    DatasetRecord dataset = new DatasetRecord();
    String urn = DATASET_URN_PREFIX + datasetName;
    dataset.setName(getShortName(datasetName));
    dataset.setUrn(urn);
    dataset.setSchema(metadata.get("schema"));
    dataset.setSchemaType("JSON");
    dataset.setSource("Hdfs");
    dataset.setParentName(getParentName(datasetName));
    dataset.setDatasetType("hdfs");
    dataset.setIsActive(true);
    dataset.setSourceModifiedTime(getsourceModifiedTime(metadata.get("modificationTime")));

    Matcher matcher = LOCATION_PREFIX_PATTERN.matcher(datasetName);
    if (matcher.lookingAt()) {
      dataset.setLocationPrefix(matcher.group());
    }

    ObjectNode properties = Json.newObject();
    properties.put("owner", metadata.get("owner"));
    properties.put("group", metadata.get("group"));
    properties.put("file_permission", metadata.get("permission"));
    properties.put("codec", metadata.get("codec"));
    properties.put("storage", metadata.get("storage"));
    properties.put("cluster", metadata.get("cluster"));
    properties.put("abstract_path", metadata.get("abstractPath"));
    dataset.setProperties(new ObjectMapper().writeValueAsString(properties));

    Logger.info("Updating dataset {}", datasetName);
    DatasetDao.setDatasetRecord(dataset);

    String classificationResult = metadata.get("classificationResult");
    if (classificationResult != null && !classificationResult.equals("null")) {
      updateDatasetClassificationResult(urn, classificationResult);
    } else {
      logger.warn("skip insertion since classification result is empty");
    }
  }

  private void updateDatasetClassificationResult(String urn, String classificationResult) {
    try {
      DatasetClassification record = new DatasetClassification(urn, classificationResult, new Date());
      datasetClassificationDao.update(record);
    } catch (Exception e) {
      logger.info("unable to update classification result due to {}", e.getMessage());
    }
  }

  private boolean isDatasetNameBlacklisted(String datasetName) {
    for (Pattern pattern : BLACKLISTED_DATASET_PATTERNS) {
      if (pattern.matcher(datasetName).find()) {
        return true;
      }
    }
    return false;
  }

  private String getShortName(String datasetName) {
    Matcher matcher = SHORT_NAME_PATTERN.matcher(datasetName);
    if (matcher.find()) {
      return matcher.group();
    }
    return "";
  }

  private String getParentName(String datasetName) {
    for (Pattern pattern : PARENT_PATTERNS) {
      Matcher matcher = pattern.matcher(datasetName);
      if (matcher.matches()) {
        return matcher.group();
      }
    }
    return "";
  }

  private String getsourceModifiedTime(String hdfsModifiedTime) {
    if (hdfsModifiedTime == null) {
      return null;
    }
    return Long.toString(Long.parseLong(hdfsModifiedTime) / 1000);
  }
}
@@ -1,105 +0,0 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package models.kafka;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.GobblinTrackingCompactionRecord;
import wherehows.common.schemas.Record;
import wherehows.common.utils.ClusterUtil;
import wherehows.common.utils.StringUtil;


/**
 * Kafka message processor for Gobblin tracking event compaction topic
 */
public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {

  // for data tracking urn, such as '/data/tracking/ab-c/daily-dedup/2016/...'
  // cut into 3 pieces: dataset + partitionType + partition
  private final String UrnRegex = "^(\\/\\w+\\/\\w+\\/[\\w-]+)\\/([\\w-]+)\\/(\\d+.*)$";
  private final Pattern UrnPattern = Pattern.compile(UrnRegex);

  /**
   * Process a Gobblin tracking event compaction record
   * @param record
   * @param topic
   * @return Record
   * @throws Exception
   */
  @Override
  public Record process(GenericData.Record record, String topic)
      throws Exception {
    GobblinTrackingCompactionRecord eventRecord = null;

    // only handle namespace "compaction.tracking.events"
    if (record != null && record.get("namespace") != null && record.get("name") != null
        && "compaction.tracking.events".equals(record.get("namespace").toString())) {
      final String name = record.get("name").toString();

      // for event name "CompactionCompleted" or "CompactionRecordCounts"
      if (name.equals("CompactionCompleted") || name.equals("CompactionRecordCounts")) {
        // logger.info("Processing Gobblin tracking event record: " + name);
        final long timestamp = (long) record.get("timestamp");
        final Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));

        final String jobContext = "Gobblin:" + name;
        final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
        // final String cluster = parseClusterIdentifier(metadata.get("clusterIdentifier")).get("cluster");
        final String projectName = metadata.get("azkabanProjectName");
        final String flowId = metadata.get("azkabanFlowId");
        final String jobId = metadata.get("azkabanJobId");
        final int execId = Integer.parseInt(metadata.get("azkabanExecId"));

        // final String metricContextId = metadata.get("metricContextID");
        // final String metricContextName = metadata.get("metricContextName");
        final String dedupeStatus = metadata.get("dedupeStatus");

        String dataset = null;
        String partitionType = null;
        String partitionName = null;
        long recordCount = 0;
        long lateRecordCount = 0;

        if (name.equals("CompactionCompleted")) {
          dataset = metadata.get("datasetUrn");
          partitionName = metadata.get("partition");
          recordCount = StringUtil.parseLong(metadata.get("recordCount"));
        }
        // name = "CompactionRecordCounts"
        else {
          final Matcher m = UrnPattern.matcher(metadata.get("DatasetOutputPath"));
          if (m.find()) {
            dataset = m.group(1);
            partitionType = m.group(2);
            partitionName = m.group(3);
          }

          recordCount = StringUtil.parseLong(metadata.get("RegularRecordCount"));
          lateRecordCount = StringUtil.parseLong(metadata.get("LateRecordCount"));
        }

        eventRecord =
            new GobblinTrackingCompactionRecord(timestamp, jobContext, cluster, projectName, flowId, jobId, execId);
        eventRecord.setDatasetUrn(dataset, partitionType, partitionName);
        eventRecord.setRecordCount(recordCount);
        eventRecord.setLateRecordCount(lateRecordCount);
        eventRecord.setDedupeStatus(dedupeStatus);
      }
    }
    return eventRecord;
  }
}
@@ -1,96 +0,0 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package models.kafka;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.GobblinTrackingDistcpNgRecord;
import wherehows.common.schemas.Record;
import wherehows.common.utils.ClusterUtil;
import wherehows.common.utils.StringUtil;


/**
 * Kafka message processor for Gobblin tracking event distcp_ng topic
 */
public class GobblinTrackingDistcpNgProcessor extends KafkaConsumerProcessor {

  // for data path hdfs://ltx1-holdemnn01.grid.linkedin.com:9000/data/tracking/abc/hourly/2016/07/10/00/part-2560590.avro
  // cut into 4 pieces: cluster + dataset + partitionType + partition
  private final String PathRegex = "^\\w+:\\/\\/(\\w+-\\w+\\.grid.*:\\d+)\\/(.*)\\/([\\w-]+)\\/(\\d+.*\\/\\d+)\\/\\w.*$";
  private final Pattern PathPattern = Pattern.compile(PathRegex);

  /**
   * Process a Gobblin tracking event distcp_ng event record
   * @param record GenericData.Record
   * @param topic
   * @return Record
   * @throws Exception
   */
  @Override
  public Record process(GenericData.Record record, String topic)
      throws Exception {
    GobblinTrackingDistcpNgRecord eventRecord = null;

    // handle namespace "gobblin.copy.CopyDataPublisher"
    if (record != null && record.get("namespace") != null && record.get("name") != null
        && "gobblin.copy.CopyDataPublisher".equals(record.get("namespace").toString())) {
      final String name = record.get("name").toString();

      if (name.equals("DatasetPublished")) { // || name.equals("FilePublished")) {
        // logger.info("Processing Gobblin tracking event record: " + name);
        final long timestamp = (long) record.get("timestamp");
        final Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));

        final String jobContext = "DistcpNG:" + name;
        final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
        final String projectName = metadata.get("azkabanProjectName");
        final String flowId = metadata.get("azkabanFlowId");
        final String jobId = metadata.get("azkabanJobId");
        final int execId = StringUtil.parseInteger(metadata.get("azkabanExecId"));
        // final String metricContextId = metadata.get("metricContextID");
        // final String metricContextName = metadata.get("metricContextName");

        final long upstreamTimestamp = StringUtil.parseLong(metadata.get("upstreamTimestamp"));
        final long originTimestamp = StringUtil.parseLong(metadata.get("originTimestamp"));
        final String sourcePath = metadata.get("SourcePath");
        final String targetPath = metadata.get("TargetPath");

        final String dataset = metadata.get("datasetUrn");
        String partitionType = null;
        String partitionName = null;

        if (name.equals("DatasetPublished")) {
          partitionName = metadata.get("partition");
        }
        // name "FilePublished"
        else {
          final Matcher m = PathPattern.matcher(targetPath);
          if (m.find()) {
            partitionType = m.group(3);
            partitionName = m.group(4);
          }
        }

        eventRecord =
            new GobblinTrackingDistcpNgRecord(timestamp, jobContext, cluster, projectName, flowId, jobId, execId);
        eventRecord.setDatasetUrn(dataset, partitionType, partitionName);
        eventRecord.setEventInfo(upstreamTimestamp, originTimestamp, sourcePath, targetPath);
      }
    }
    return eventRecord;
  }
}
@@ -1,168 +0,0 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package models.kafka;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.GobblinTrackingLumosRecord;
import wherehows.common.schemas.Record;
import wherehows.common.utils.ClusterUtil;
import wherehows.common.utils.StringUtil;


public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {

  // dissect datasourceColo 'prod-lva1' into two parts: developmentEnv and datacenter
  private final String DatasourceColoRegex = "(\\w+)-(\\w+)";
  private final Pattern DatasourceColoPattern = Pattern.compile(DatasourceColoRegex);

  // get partition from directory
  private final String DirectoryPartitionRegex = "^.*\\/(\\d+-\\w+-\\d+)\\/.*$";
  private final Pattern DirectoryPartitionPattern = Pattern.compile(DirectoryPartitionRegex);

  // regular partition pattern, 146xxxx-ww-dddd
  final String RegularPartitionRegex = "146\\d{7,10}-\\w+-\\d+";
  final Pattern RegularPartitionPattern = Pattern.compile(RegularPartitionRegex);

  // get Epoch time from Partition, 146xxxxxxxxxx-ww-dddd
  private final String PartitionEpochRegex = "(\\d+)-\\w+-\\d+";
  private final Pattern PartitionEpochPattern = Pattern.compile(PartitionEpochRegex);

  /**
   * Process a Gobblin tracking event lumos record
   * @param record
   * @param topic
   * @return Record
   * @throws Exception
   */
  @Override
  public Record process(GenericData.Record record, String topic)
      throws Exception {
    GobblinTrackingLumosRecord eventRecord = null;

    if (record != null && record.get("namespace") != null && record.get("name") != null) {
      final String name = record.get("name").toString();

      // only handle "DeltaPublished" and "SnapshotPublished"
      if (name.equals("DeltaPublished") || name.equals("SnapshotPublished")) {
        final long timestamp = (long) record.get("timestamp");
        final Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));
        // logger.info("Processing Gobblin tracking event record: " + name + ", timestamp: " + timestamp);

        final String jobContext = "Lumos:" + name;
        final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
        final String projectName = metadata.get("azkabanProjectName");
        final String flowId = metadata.get("azkabanFlowId");
        final String jobId = metadata.get("azkabanJobId");
        final int execId = Integer.parseInt(metadata.get("azkabanExecId"));
        // final String metricContextId = metadata.get("metricContextID");
        // final String metricContextName = metadata.get("metricContextName");

        final String dataset = metadata.get("datasetUrn");
        final String targetDirectory = metadata.get("TargetDirectory");

        final String datasourceColo = metadata.get("DatasourceColo");
        final String sourceDatabase = metadata.get("Database");
        final String sourceTable = metadata.get("Table");
        String datacenter = null;
        String devEnv = null;
        final Matcher sourceColoMatcher = DatasourceColoPattern.matcher(datasourceColo);
        if (sourceColoMatcher.find()) {
          datacenter = sourceColoMatcher.group(2);
          devEnv = sourceColoMatcher.group(1);
        } else {
          datacenter = datasourceColo;
        }

        final long recordCount = StringUtil.parseLong(metadata.get("recordCount"));

        final String partitionType = "snapshot";
        final String partition = metadata.get("partition");
        String partitionName = null;
        String subpartitionType = null;
        String subpartitionName = null;

        final long dropdate = StringUtil.parseLong(metadata.get("Dropdate"));
        long maxDataDateEpoch3 = dropdate;
        long maxDataKey = 0; // if field is null, default value 0
        if (!isPartitionRegular(partition)) {
          maxDataKey = StringUtil.parseLong(getPartitionEpoch(partition));
        }

        // handle name 'SnapshotPublished'
        if (name.equals("SnapshotPublished")) {
          partitionName = partition;
          if (dropdate < 1460000000000L) {
            maxDataDateEpoch3 = StringUtil.parseLong(getPartitionEpoch(targetDirectory));
          }
        }
        // handle name 'DeltaPublished'
        else {
          partitionName = partitionFromTargetDirectory(targetDirectory);
          subpartitionType = "_delta";
          subpartitionName = partition;
          if (dropdate < 1460000000000L) {
            maxDataDateEpoch3 = StringUtil.parseLong(getPartitionEpoch(subpartitionName));
          }
        }

        eventRecord =
            new GobblinTrackingLumosRecord(timestamp, cluster, jobContext, projectName, flowId, jobId, execId);
        eventRecord.setDatasetUrn(dataset, targetDirectory, partitionType, partitionName, subpartitionType,
            subpartitionName);
        eventRecord.setMaxDataDate(maxDataDateEpoch3, maxDataKey);
        eventRecord.setSource(datacenter, devEnv, sourceDatabase, sourceTable);
        eventRecord.setRecordCount(recordCount);
      }
    }
    return eventRecord;
  }

  /**
   * get partition name from targetDirectory for delta published
   * @param targetDirectory String
   * @return String partitionName or null
   */
  private String partitionFromTargetDirectory(String targetDirectory) {
    final Matcher m = DirectoryPartitionPattern.matcher(targetDirectory);
    if (m.find()) {
      return m.group(1);
    }
    return null;
  }

  /**
   * get epoch time from partition first part
   * @param partition String
   * @return String
   */
  private String getPartitionEpoch(String partition) {
    final Matcher m = PartitionEpochPattern.matcher(partition);
    if (m.find()) {
      return m.group(1);
    }
    return null;
  }

  /**
   * check if partition is in the form of 146xxxxxxxxxx-ww-dddd
   * @param partition
   * @return boolean
   */
  private boolean isPartitionRegular(String partition) {
    return RegularPartitionPattern.matcher(partition).find();
  }
}
@@ -1,45 +0,0 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package models.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import models.daos.LineageDao;
import org.apache.avro.generic.GenericData;
import play.Logger;
import wherehows.common.schemas.Record;


public class JobExecutionLineageProcessor extends KafkaConsumerProcessor {

  /**
   * Process a JobExecutionLineageEvent record
   * @param record GenericData.Record
   * @param topic String
   * @throws Exception
   * @return null
   */
  public Record process(GenericData.Record record, String topic)
      throws Exception {
    if (record != null) {
      // Logger.info("Processing Job Execution Lineage Event record. ");

      final GenericData.Record auditHeader = (GenericData.Record) record.get("auditHeader");

      final JsonNode rootNode = new ObjectMapper().readTree(record.toString());
      LineageDao.updateJobExecutionLineage(rootNode);
    }
    return null;
  }
}
@@ -1,162 +0,0 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package models.kafka;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kafka.javaapi.consumer.ConsumerConnector;
import metadata.etl.models.EtlJobName;
import org.apache.avro.generic.GenericData;
import play.Logger;
import utils.JdbcUtil;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
import wherehows.common.writers.DatabaseWriter;


/**
 * Utilities for Kafka configurations and topics
 */
public class KafkaConfig {

  /**
   * Class for storing Kafka Topic info
   */
  public static class Topic {
    public String topic;
    public int numOfWorkers; // number of kafka consumer workers
    public String processor; // processor class to invoke
    public String dbTable; // Database table to write to

    Topic(String topic, int numOfWorkers, String processor, String dbTable) {
      this.topic = topic;
      this.numOfWorkers = numOfWorkers;
      this.processor = processor;
      this.dbTable = dbTable;
    }
  }

  private final Properties _props = new Properties();
  // Map of <topic_name, topic_content>
  private final Map<String, Topic> _topics = new HashMap<>();
  private final Map<String, Object> _topicProcessorClass = new HashMap<>();
  private final Map<String, Method> _topicProcessorMethod = new HashMap<>();
  private final Map<String, DatabaseWriter> _topicDbWriter = new HashMap<>();

  private ConsumerConnector _consumer;
  private SchemaRegistryClient _schemaRegistryClient;

  /**
   * Update Kafka properties and topics from job files
   * @throws Exception
   */
  public void updateKafkaProperties(String jobName, Properties props)
      throws Exception {
    String[] topics = props.getProperty("kafka.topics").split("\\s*,\\s*");
    String[] processors = props.getProperty("kafka.processors").split("\\s*,\\s*");
    String[] dbTables = props.getProperty("kafka.db.tables").split("\\s*,\\s*");

    _props.clear();
    _props.putAll(props);

    _topics.clear();
    for (int i = 0; i < topics.length; i++) {
      // use 1 Kafka worker to handle each topic
      _topics.put(topics[i], new Topic(topics[i], 1, processors[i], dbTables[i]));
    }
  }

  /**
   * update processor class, method and db writer for each topic
   */
  public void updateTopicProcessor() {
    for (String topic : _topics.keySet()) {
      try {
        // get the processor class and method
        final Class processorClass = Class.forName(_topics.get(topic).processor);
        _topicProcessorClass.put(topic, processorClass.newInstance());

        final Method method = processorClass.getDeclaredMethod("process", GenericData.Record.class, String.class);
        _topicProcessorMethod.put(topic, method);

        // get the database writer
        final DatabaseWriter dw = new DatabaseWriter(JdbcUtil.wherehowsJdbcTemplate, _topics.get(topic).dbTable);
        _topicDbWriter.put(topic, dw);
      } catch (Exception e) {
        Logger.error("Fail to create Processor for topic: " + topic, e);
        _topicProcessorClass.remove(topic);
        _topicProcessorMethod.remove(topic);
        _topicDbWriter.remove(topic);
      }
    }
  }

  public Object getProcessorClass(String topic) {
    return _topicProcessorClass.get(topic);
  }

  public Method getProcessorMethod(String topic) {
    return _topicProcessorMethod.get(topic);
  }

  public DatabaseWriter getDbWriter(String topic) {
    return _topicDbWriter.get(topic);
  }

  public Map<String, DatabaseWriter> getTopicDbWriters() {
    return _topicDbWriter;
  }

  /**
   * close the Config
   */
  public void close() {
    if (_consumer != null) {
      _consumer.shutdown();
    }
  }

  /**
   * get Kafka configuration
   * @return
   */
  public Properties getProperties() {
    return _props;
  }

  /**
   * get Kafka topics
   * @return
   */
  public Map<String, Topic> getTopics() {
    return _topics;
  }

  public ConsumerConnector getConsumer() {
    return _consumer;
  }

  public void setConsumer(ConsumerConnector consumer) {
    _consumer = consumer;
  }

  public SchemaRegistryClient getSchemaRegistryClient() {
    return _schemaRegistryClient;
  }

  public void setSchemaRegistryClient(SchemaRegistryClient schemaRegistryClient) {
    _schemaRegistryClient = schemaRegistryClient;
  }
}
@@ -1,40 +0,0 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package models.kafka;

import org.apache.avro.generic.GenericData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import wherehows.common.schemas.Record;


/**
 * Abstract class for Kafka consumer message processor.
 *
 */
public abstract class KafkaConsumerProcessor {

  protected static final Logger logger = LoggerFactory.getLogger(KafkaConsumerProcessor.class);

  /**
   * Abstract method 'process' to be implemented by specific processor
   * input Kafka record, process information and write to DB.
   * @param record GenericData.Record
   * @param topic
   * @return wherehows.common.schemas.Record
   * @throws Exception
   */
  public abstract Record process(GenericData.Record record, String topic) throws Exception;

}
@@ -1,165 +0,0 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package models.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import models.daos.DatasetInfoDao;
import org.apache.avro.generic.GenericData;
import play.Logger;
import wherehows.common.schemas.Record;


public class MetadataChangeProcessor {

  private final String[] CHANGE_ITEMS =
      {"schema", "owners", "datasetProperties", "references", "partitionSpec", "deploymentInfo", "tags",
          "constraints", "indices", "capacity", "privacyCompliancePolicy", "securitySpecification"};

  /**
   * Process a MetadataChangeEvent record
   * @param record GenericData.Record
   * @param topic String
   * @throws Exception
   * @return null
   */
  public Record process(GenericData.Record record, String topic)
      throws Exception {
    if (record != null) {
      Logger.debug("Processing Metadata Change Event record. ");

      final GenericData.Record auditHeader = (GenericData.Record) record.get("auditHeader");
      if (auditHeader == null || auditHeader.get("time") == null) {
        Logger.info("MetadataChangeEvent without auditHeader, abort process. " + record.toString());
        return null;
      }

      final GenericData.Record datasetIdentifier = (GenericData.Record) record.get("datasetIdentifier");
      final GenericData.Record datasetProperties = (GenericData.Record) record.get("datasetProperties");
      final String urn = String.valueOf(record.get("urn"));

      if (urn == null && (datasetProperties == null || datasetProperties.get("uri") == null)
          && datasetIdentifier == null) {
        Logger.info("Can't identify dataset from uri/urn/datasetIdentifier, abort process. " + record.toString());
        return null;
      } else if (urn != null) {
        Logger.debug("URN: " + urn);
      } else if (datasetProperties != null && datasetProperties.get("uri") != null) {
        Logger.debug("URI: " + datasetProperties.get("uri"));
      } else {
        Logger.debug(
            "Dataset Identifier: " + datasetIdentifier.get("dataPlatformUrn") + datasetIdentifier.get("nativeName"));
      }

      final JsonNode rootNode = new ObjectMapper().readTree(record.toString());

      for (String itemName : CHANGE_ITEMS) {
        // check if the corresponding change field has content
        if (record.get(itemName) == null) {
          continue;
        }

        switch (itemName) {
          case "schema":
            try {
              DatasetInfoDao.updateDatasetSchema(rootNode);
            } catch (Exception ex) {
              Logger.debug("Metadata change exception: schema ", ex);
            }
            break;
          case "owners":
            try {
              DatasetInfoDao.updateDatasetOwner(rootNode);
            } catch (Exception ex) {
              Logger.debug("Metadata change exception: owner ", ex);
            }
            break;
          case "datasetProperties":
            try {
              DatasetInfoDao.updateDatasetCaseSensitivity(rootNode);
            } catch (Exception ex) {
              Logger.debug("Metadata change exception: case sensitivity ", ex);
            }
            break;
          case "references":
            try {
              DatasetInfoDao.updateDatasetReference(rootNode);
            } catch (Exception ex) {
              Logger.debug("Metadata change exception: reference ", ex);
            }
            break;
          case "partitionSpec":
            try {
              DatasetInfoDao.updateDatasetPartition(rootNode);
            } catch (Exception ex) {
              Logger.debug("Metadata change exception: partition ", ex);
            }
            break;
          case "deploymentInfo":
            try {
              DatasetInfoDao.updateDatasetDeployment(rootNode);
            } catch (Exception ex) {
              Logger.debug("Metadata change exception: deployment ", ex);
            }
            break;
          case "tags":
            try {
              DatasetInfoDao.updateDatasetTags(rootNode);
            } catch (Exception ex) {
              Logger.debug("Metadata change exception: tag ", ex);
            }
            break;
          case "constraints":
            try {
              DatasetInfoDao.updateDatasetConstraint(rootNode);
            } catch (Exception ex) {
              Logger.debug("Metadata change exception: constraint ", ex);
            }
            break;
          case "indices":
            try {
              DatasetInfoDao.updateDatasetIndex(rootNode);
            } catch (Exception ex) {
              Logger.debug("Metadata change exception: index ", ex);
            }
            break;
          case "capacity":
            try {
              DatasetInfoDao.updateDatasetCapacity(rootNode);
            } catch (Exception ex) {
              Logger.debug("Metadata change exception: capacity ", ex);
            }
            break;
          case "privacyCompliancePolicy":
            try {
              DatasetInfoDao.updateDatasetCompliance(rootNode);
            } catch (Exception ex) {
              Logger.debug("Metadata change exception: compliance ", ex);
            }
            break;
          case "securitySpecification":
            try {
              DatasetInfoDao.updateDatasetSecurity(rootNode);
            } catch (Exception ex) {
              Logger.debug("Metadata change exception: security ", ex);
            }
            break;
          default:
            break;
        }
      }
    }
    return null;
  }
}
@@ -1,44 +0,0 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package models.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import models.daos.DatasetInfoDao;
import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.Record;


public class MetadataInventoryProcessor extends KafkaConsumerProcessor {

  /**
   * Process a MetadataInventoryEvent record
   * @param record GenericData.Record
   * @param topic String
   * @throws Exception
   * @return null
   */
  public Record process(GenericData.Record record, String topic)
      throws Exception {
    if (record != null) {
      // Logger.info("Processing Metadata Inventory Event record. ");

      final GenericData.Record auditHeader = (GenericData.Record) record.get("auditHeader");

      final JsonNode rootNode = new ObjectMapper().readTree(record.toString());
      DatasetInfoDao.updateDatasetInventory(rootNode);
    }
    return null;
  }
}
@@ -1,94 +0,0 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package models.kafka;

import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.MetastoreAuditRecord;
import wherehows.common.schemas.Record;
import wherehows.common.utils.ClusterUtil;
import wherehows.common.utils.StringUtil;


public class MetastoreAuditProcessor extends KafkaConsumerProcessor {

  /**
   * Process a Metastore Table/Partition Audit event record
   * @param record
   * @param topic
   * @return Record
   * @throws Exception
   */
  @Override
  public Record process(GenericData.Record record, String topic) throws Exception {
    MetastoreAuditRecord eventRecord = null;

    // handle MetastoreTableAuditEvent and MetastorePartitionAuditEvent
    if (record != null) {
      // logger.info("Processing Metastore Audit event record.");

      final GenericData.Record auditHeader = (GenericData.Record) record.get("auditHeader");
      final String server = ClusterUtil.matchClusterCode(String.valueOf(auditHeader.get("server")));
      final String instance = String.valueOf(auditHeader.get("instance"));
      final String appName = String.valueOf(auditHeader.get("appName"));

      String eventName;
      GenericData.Record content;
      final Object oldInfo;
      final Object newInfo;

      // check if it is MetastoreTableAuditEvent
      if (record.get("metastoreTableAuditContent") != null) {
        eventName = "MetastoreTableAuditEvent";
        content = (GenericData.Record) record.get("metastoreTableAuditContent");
        oldInfo = content.get("oldTable");
        newInfo = content.get("newTable");
      }
      // check if it is MetastorePartitionAuditEvent
      else if (record.get("metastorePartitionAuditContent") != null) {
        eventName = "MetastorePartitionAuditEvent";
        content = (GenericData.Record) record.get("metastorePartitionAuditContent");
        oldInfo = content.get("oldPartition");
        newInfo = content.get("newPartition");
      }
      else {
        throw new IllegalArgumentException("Unknown Metastore Audit event: " + record);
      }

      final String eventType = String.valueOf(content.get("eventType"));
      final String metastoreThriftUri = String.valueOf(content.get("metastoreThriftUri"));
      final String metastoreVersion = StringUtil.toStringReplaceNull(content.get("metastoreVersion"), null);
      final long timestamp = (long) content.get("timestamp");
      final String isSuccessful = String.valueOf(content.get("isSuccessful"));
      final String isDataDeleted = String.valueOf(content.get("isDataDeleted"));

      // use newOne, if null, use oldOne
      final GenericData.Record rec = newInfo != null ? (GenericData.Record) newInfo : (GenericData.Record) oldInfo;
      final String dbName = String.valueOf(rec.get("dbName"));
      final String tableName = String.valueOf(rec.get("tableName"));
      // set null / "null" partition to '?' for primary key
      final String partition = StringUtil.toStringReplaceNull(rec.get("values"), "?");
      final String location = StringUtil.toStringReplaceNull(rec.get("location"), null);
      final String owner = StringUtil.toStringReplaceNull(rec.get("owner"), null);
      final long createTime = (long) rec.get("createTime");
      final long lastAccessTime = (long) rec.get("lastAccessTime");

      eventRecord = new MetastoreAuditRecord(server, instance, appName, eventName, eventType, timestamp);
      eventRecord.setEventInfo(metastoreThriftUri, metastoreVersion, isSuccessful, isDataDeleted);
      eventRecord.setTableInfo(dbName, tableName, partition, location, owner, createTime, lastAccessTime);
      // eventRecord.setOldInfo(StringUtil.toStringReplaceNull(oldInfo, null));
      // eventRecord.setNewInfo(StringUtil.toStringReplaceNull(newInfo, null));
    }
    return eventRecord;
  }
}
@@ -1,82 +0,0 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package msgs;

import java.util.HashMap;
import java.util.Map;


/**
 * Generic communication message between KafkaConsumerMaster and KafkaConsumerWorker
 */
public class KafkaCommMsg {

  // Message type: AUDIT, AUDIT_RESPONSE, FLUSH, FLUSH_RESPONSE, HEARTBEAT, etc
  private String msgType;
  // Kafka topic of the worker
  private String topic;
  // Kafka worker thread number
  private int thread;
  // Message content
  private Map<String, Object> content;

  public KafkaCommMsg(String msgType, String topic, int thread) {
    this.msgType = msgType;
    this.topic = topic;
    this.thread = thread;
    this.content = new HashMap<>();
  }

  public String getMsgType() {
    return msgType;
  }

  public void setMsgType(String msgType) {
    this.msgType = msgType;
  }

  public String getTopic() {
    return topic;
  }

  public void setTopic(String topic) {
    this.topic = topic;
  }

  public int getThread() {
    return thread;
  }

  public void setThread(int thread) {
    this.thread = thread;
  }

  public Map<String, Object> getContent() {
    return content;
  }

  public void setContent(Map<String, Object> content) {
    this.content = content;
  }

  public void putContent(String key, Object value) {
    this.content.put(key, value);
  }

  @Override
  public String toString() {
    return "KafkaCommMsg [type=" + msgType + ", topic=" + topic + ", thread=" + thread + ", content="
        + content.toString() + "]";
  }
}