Modify KafkaConsumerMaster to handle more than one kafka config, add error handling

Yi Wang 2016-08-04 13:07:19 -07:00
parent ef584552be
commit c0cfe1f5ca
2 changed files with 161 additions and 98 deletions
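
KafkaConsumerMaster previously built a single ConsumerConnector from one statically loaded
KafkaConfig. It now loops over every job id listed under kafka.consumer.etl.jobid, builds a
per-job KafkaConfig instance (properties, topics, processor bindings, DB writers, consumer
connector, schema registry client), and keeps the instances in a map keyed by job id so that
postStop() can close each one. A failure while initializing one job is logged and no longer
aborts the remaining jobs or the cluster-info refresh.

A minimal sketch of the Play application.conf entry that feeds the new loop; the key is the one
read in preStart(), while the job ids are hypothetical and must match rows in the WhereHows ETL
job table:

    # hypothetical ids of two independent Kafka consumer ETL jobs
    kafka.consumer.etl.jobid = [40, 41]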

KafkaConsumerMaster.java

@@ -17,7 +17,6 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.List;
@@ -31,12 +30,10 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import metadata.etl.models.EtlJobName;
 import models.daos.ClusterDao;
 import models.daos.EtlJobDao;
-import org.apache.avro.generic.GenericData;
 import msgs.KafkaResponseMsg;
 import play.Logger;
 import play.Play;
-import utils.JdbcUtil;
 import utils.KafkaConfig;
 import utils.KafkaConfig.Topic;
 import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient;
@@ -47,111 +44,104 @@ import wherehows.common.writers.DatabaseWriter;
 /**
- * Akka actor responsible to manage Kafka workers and use kafka topic processor
- * to deal with response messages
- *
+ * Akka actor responsible for managing Kafka workers
  */
 public class KafkaConsumerMaster extends UntypedActor {
 
   // List of kafka job IDs
   private static List<Integer> _kafkaJobList;
-  private static ConsumerConnector _consumer;
-  private static Properties _kafkaConfig;
-  private static Map<String, Topic> _kafkaTopics;
-  private final Map<String, Object> _topicProcessorClass;
-  private final Map<String, Method> _topicProcessorMethod;
-  private final Map<String, DatabaseWriter> _topicDbWriter;
-
-  public KafkaConsumerMaster() {
-    this._topicProcessorClass = new HashMap<>();
-    this._topicProcessorMethod = new HashMap<>();
-    this._topicDbWriter = new HashMap<>();
-  }
+  // map of kafka job id to configs
+  private static Map<Integer, KafkaConfig> _kafkaConfigs = new HashMap<>();
+  // map of topic name to DB writer
+  private static Map<String, DatabaseWriter> _topicDbWriters = new HashMap<>();
 
   @Override
-  public void preStart() throws Exception {
-    _kafkaJobList = Play.application().configuration().getIntList("kafka.consumer.etl.jobid", null);;
+  public void preStart()
+      throws Exception {
+    _kafkaJobList = Play.application().configuration().getIntList("kafka.consumer.etl.jobid", null);
     if (_kafkaJobList == null || _kafkaJobList.size() == 0) {
-      context().stop(getSelf());
+      Logger.error("Kafka job id error, kafkaJobList: " + _kafkaJobList);
+      getContext().stop(getSelf());
       return;
     }
-    Logger.info("Start the KafkaConsumerMaster... Kafka etl job id list: " + _kafkaJobList);
+    Logger.info("Start KafkaConsumerMaster... Kafka job id list: " + _kafkaJobList);
 
-    // handle 1 kafka connection
-    Map<String, Object> kafkaEtlJob = EtlJobDao.getEtlJobById(_kafkaJobList.get(0));
-    final int kafkaJobRefId = Integer.parseInt(kafkaEtlJob.get("ref_id").toString());
-    final String kafkaJobName = kafkaEtlJob.get("wh_etl_job_name").toString();
+    for (final int kafkaJobId : _kafkaJobList) {
+      try {
+        // handle 1 kafka connection
+        Map<String, Object> kafkaEtlJob = EtlJobDao.getEtlJobById(kafkaJobId);
+        final int kafkaJobRefId = Integer.parseInt(kafkaEtlJob.get("ref_id").toString());
+        final String kafkaJobName = kafkaEtlJob.get("wh_etl_job_name").toString();
 
         if (!kafkaJobName.equals(EtlJobName.KAFKA_CONSUMER_ETL.name())) {
           Logger.error("Kafka job info error: job name '" + kafkaJobName + "' not equal "
               + EtlJobName.KAFKA_CONSUMER_ETL.name());
           getContext().stop(getSelf());
         }
 
-    // get Kafka configurations from database
-    KafkaConfig.updateKafkaProperties(kafkaJobRefId);
-    _kafkaConfig = KafkaConfig.getProperties();
-    _kafkaTopics = KafkaConfig.getTopics();
-
-    // get list of cluster information from database and update ClusterUtil
-    ClusterUtil.updateClusterInfo(ClusterDao.getClusterInfo());
-
-    for (String topic : _kafkaTopics.keySet()) {
-      // get the processor class and method
-      final Class processorClass = Class.forName(_kafkaTopics.get(topic).processor);
-      _topicProcessorClass.put(topic, processorClass.newInstance());
-
-      final Method method = processorClass.getDeclaredMethod("process", GenericData.Record.class, String.class);
-      _topicProcessorMethod.put(topic, method);
-
-      // get the database writer
-      final DatabaseWriter dw = new DatabaseWriter(JdbcUtil.wherehowsJdbcTemplate, _kafkaTopics.get(topic).dbTable);
-      _topicDbWriter.put(topic, dw);
-    }
-
-    // create Kafka consumer connector
-    final SchemaRegistryClient schemaRegistryClient =
-        new CachedSchemaRegistryClient((String) _kafkaConfig.get("schemaRegistryUrl"));
-    Logger.info("Create Kafka Consumer Config: " + _kafkaConfig.toString());
-    final ConsumerConfig cfg = new ConsumerConfig(_kafkaConfig);
-    _consumer = Consumer.createJavaConsumerConnector(cfg);
-
-    // create Kafka message streams
-    final Map<String, Integer> topicCountMap = new HashMap<>();
-    for (Topic topic : _kafkaTopics.values()) {
-      topicCountMap.put(topic.topic, topic.numOfWorkers);
-    }
-    final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
-        _consumer.createMessageStreams(topicCountMap);
-    for (String topic : _kafkaTopics.keySet()) {
-      final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
-      int threadNumber = 0;
-      for (final KafkaStream<byte[], byte[]> stream : streams) {
-        ActorRef childActor =
-            getContext().actorOf(
-                Props.create(KafkaConsumerWorker.class,
-                    topic, threadNumber, stream, schemaRegistryClient,
-                    _topicProcessorClass.get(topic), _topicProcessorMethod.get(topic)));
-        childActor.tell("Start", getSelf());
-        threadNumber++;
-      }
-    }
+        // get Kafka configurations from database
+        final KafkaConfig kafkaConfig = new KafkaConfig();
+        kafkaConfig.updateKafkaProperties(EtlJobName.valueOf(kafkaJobName), kafkaJobRefId);
+        final Properties kafkaProps = kafkaConfig.getProperties();
+        final Map<String, Topic> kafkaTopics = kafkaConfig.getTopics();
+
+        kafkaConfig.updateTopicProcessor();
+        _topicDbWriters.putAll(kafkaConfig.getTopicDbWriters());
+
+        // create Kafka consumer connector
+        Logger.info("Create Kafka Consumer with config: " + kafkaProps.toString());
+        final SchemaRegistryClient schemaRegistryClient =
+            new CachedSchemaRegistryClient((String) kafkaProps.get("schemaRegistryUrl"));
+        final ConsumerConfig cfg = new ConsumerConfig(kafkaProps);
+        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(cfg);
+
+        // create Kafka message streams
+        final Map<String, Integer> topicCountMap = new HashMap<>();
+        for (Topic topic : kafkaTopics.values()) {
+          topicCountMap.put(topic.topic, topic.numOfWorkers);
+        }
+        final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
+            consumerConnector.createMessageStreams(topicCountMap);
+
+        // add config to kafka config map
+        kafkaConfig.setSchemaRegistryClient(schemaRegistryClient);
+        kafkaConfig.setConsumer(consumerConnector);
+        _kafkaConfigs.put(kafkaJobId, kafkaConfig);
+
+        // create workers to handle each message stream
+        for (String topic : kafkaTopics.keySet()) {
+          final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+          int threadNumber = 0;
+          for (final KafkaStream<byte[], byte[]> stream : streams) {
+            ActorRef childActor = getContext().actorOf(
+                Props.create(KafkaConsumerWorker.class, topic, threadNumber, stream, schemaRegistryClient,
+                    kafkaConfig.getProcessorClass(topic), kafkaConfig.getProcessorMethod(topic)));
+            childActor.tell("Start", getSelf());
+            threadNumber++;
+          }
+        }
+        Logger.info("Initiate Kafka consumer job " + kafkaJobId + " with topics " + kafkaTopics.keySet());
+      } catch (Exception e) {
+        Logger.error("Initiating Kafka properties on startup fail, job id: " + kafkaJobId, e);
+      }
+    }
+
+    try {
+      // get list of cluster information from database and update ClusterUtil
+      ClusterUtil.updateClusterInfo(ClusterDao.getClusterInfo());
+    } catch (Exception e) {
+      Logger.error("Fail to fetch cluster info from DB ", e);
+    }
   }
 
   @Override
-  public void onReceive(Object message) throws Exception {
+  public void onReceive(Object message)
+      throws Exception {
     if (message instanceof KafkaResponseMsg) {
       final KafkaResponseMsg kafkaMsg = (KafkaResponseMsg) message;
       final String topic = kafkaMsg.getTopic();
       final AbstractRecord record = kafkaMsg.getRecord();
 
-      if (record != null && _kafkaTopics.containsKey(topic)) {
+      if (record != null && _topicDbWriters.containsKey(topic)) {
         Logger.debug("Writing to DB kafka event record: " + topic);
-        final DatabaseWriter dbWriter = _topicDbWriter.get(topic);
+        final DatabaseWriter dbWriter = _topicDbWriters.get(topic);
 
         try {
           dbWriter.append(record);
@@ -171,10 +161,8 @@ public class KafkaConsumerMaster extends UntypedActor {
   @Override
   public void postStop() {
     Logger.info("Terminating KafkaConsumerMaster...");
-    if (_consumer != null) {
-      _consumer.shutdown();
-      _kafkaConfig.clear();
-      _kafkaTopics.clear();
+    for (KafkaConfig config : _kafkaConfigs.values()) {
+      config.close();
     }
   }
 }
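
For reference, each job's settings come from the wh_etl_job_properties table via
KafkaConfig.updateKafkaProperties (second file below); whatever KafkaConfig does not consume
itself is handed straight to Kafka's ConsumerConfig. A sketch of one job's entries, with all
topic, class, and host values hypothetical: only the kafka.topics, kafka.processors, and
schemaRegistryUrl keys are visible in this diff, and the per-topic DB table and worker-count
keys are parsed in a section not shown here.

    # consumed by KafkaConfig itself (values hypothetical)
    kafka.topics       = MetastoreAuditEvent,GobblinTrackingEvent
    kafka.processors   = metadata.etl.kafka.MetastoreAuditProcessor,metadata.etl.kafka.GobblinTrackingProcessor
    schemaRegistryUrl  = http://schema-registry.example.com:8081
    # standard Kafka high-level consumer settings, passed through to ConsumerConfig
    zookeeper.connect  = zookeeper.example.com:2181
    group.id           = wherehows-kafka-consumer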

KafkaConfig.java

@@ -13,12 +13,17 @@
  */
 package utils;
 
+import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import kafka.javaapi.consumer.ConsumerConnector;
 import metadata.etl.models.EtlJobName;
 import models.daos.EtlJobPropertyDao;
+import org.apache.avro.generic.GenericData;
 import play.Logger;
+import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
+import wherehows.common.writers.DatabaseWriter;
 
 /**
@@ -26,12 +31,6 @@ import play.Logger;
  */
 public class KafkaConfig {
-  private static Properties _props = new Properties();
-
-  // Map of <topic_name, topic_content>
-  private static Map<String, Topic> _topics = new HashMap<>();
-
-  public static final EtlJobName KAFKA_JOBNAME = EtlJobName.KAFKA_CONSUMER_ETL;
 
   /**
    * Class for storing Kafka Topic info
    */
@@ -49,19 +48,30 @@
     }
   }
 
+  private final Properties _props = new Properties();
+  // Map of <topic_name, topic_content>
+  private final Map<String, Topic> _topics = new HashMap<>();
+
+  private final Map<String, Object> _topicProcessorClass = new HashMap<>();
+  private final Map<String, Method> _topicProcessorMethod = new HashMap<>();
+  private final Map<String, DatabaseWriter> _topicDbWriter = new HashMap<>();
+
+  private ConsumerConnector _consumer;
+  private SchemaRegistryClient _schemaRegistryClient;
+
   /**
-   * Update Kafka properties and topics from etl_job_properies table
+   * Update Kafka properties and topics from wh_etl_job_properties table
    * @throws Exception
    */
-  public static void updateKafkaProperties(int kafkaJobRefId) throws Exception {
-    Properties props = EtlJobPropertyDao.getJobProperties(KAFKA_JOBNAME, kafkaJobRefId);
-    if (props == null || props.size() < 5) {
-      Logger.error("Fail to update Kafka job properties for " + KAFKA_JOBNAME.name()
-          + ", job ref id: " + kafkaJobRefId);
+  public void updateKafkaProperties(EtlJobName jobName, int jobRefId)
+      throws Exception {
+    Properties props;
+    try {
+      props = EtlJobPropertyDao.getJobProperties(jobName, jobRefId);
+    } catch (Exception e) {
+      Logger.error("Fail to update Kafka job properties for " + jobName.name() + ", ref id: " + jobRefId);
       return;
-    } else {
-      Logger.info("Get Kafka job properties for " + KAFKA_JOBNAME.name() + ", job ref id: " + kafkaJobRefId);
     }
+    Logger.info("Get Kafka job properties for " + jobName.name() + ", job ref id: " + jobRefId);
 
     String[] topics = ((String) props.remove("kafka.topics")).split("\\s*,\\s*");
     String[] processors = ((String) props.remove("kafka.processors")).split("\\s*,\\s*");
@@ -77,11 +87,61 @@
     }
   }
 
+  /**
+   * update processor class, method and db writer for each topic
+   */
+  public void updateTopicProcessor() {
+    for (String topic : _topics.keySet()) {
+      try {
+        // get the processor class and method
+        final Class processorClass = Class.forName(_topics.get(topic).processor);
+        _topicProcessorClass.put(topic, processorClass.newInstance());
+
+        final Method method = processorClass.getDeclaredMethod("process", GenericData.Record.class, String.class);
+        _topicProcessorMethod.put(topic, method);
+
+        // get the database writer
+        final DatabaseWriter dw = new DatabaseWriter(JdbcUtil.wherehowsJdbcTemplate, _topics.get(topic).dbTable);
+        _topicDbWriter.put(topic, dw);
+      } catch (Exception e) {
+        Logger.error("Fail to create Processor for topic: " + topic, e);
+        _topicProcessorClass.remove(topic);
+        _topicProcessorMethod.remove(topic);
+        _topicDbWriter.remove(topic);
+      }
+    }
+  }
+
+  public Object getProcessorClass(String topic) {
+    return _topicProcessorClass.get(topic);
+  }
+
+  public Method getProcessorMethod(String topic) {
+    return _topicProcessorMethod.get(topic);
+  }
+
+  public DatabaseWriter getDbWriter(String topic) {
+    return _topicDbWriter.get(topic);
+  }
+
+  public Map<String, DatabaseWriter> getTopicDbWriters() {
+    return _topicDbWriter;
+  }
+
+  /**
+   * close the Config
+   */
+  public void close() {
+    if (_consumer != null) {
+      _consumer.shutdown();
+    }
+  }
+
   /**
    * get Kafka configuration
    * @return
    */
-  public static Properties getProperties() {
+  public Properties getProperties() {
     return _props;
   }
@@ -89,8 +149,23 @@
    * get Kafka topics
    * @return
    */
-  public static Map<String, Topic> getTopics() {
+  public Map<String, Topic> getTopics() {
     return _topics;
   }
+
+  public ConsumerConnector getConsumer() {
+    return _consumer;
+  }
+
+  public void setConsumer(ConsumerConnector consumer) {
+    _consumer = consumer;
+  }
+
+  public SchemaRegistryClient getSchemaRegistryClient() {
+    return _schemaRegistryClient;
+  }
+
+  public void setSchemaRegistryClient(SchemaRegistryClient schemaRegistryClient) {
+    _schemaRegistryClient = schemaRegistryClient;
+  }
 }
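
updateTopicProcessor() above locates each processor reflectively: Class.forName(...).newInstance()
requires a public no-argument constructor, and the getDeclaredMethod("process",
GenericData.Record.class, String.class) lookup pins down the method name and parameter types.
A minimal sketch of a class that this lookup would accept; the package, class name, and Object
return type are illustrative assumptions, since the diff only fixes the parameters:

    package metadata.etl.kafka; // hypothetical package

    import org.apache.avro.generic.GenericData;

    public class ExampleTopicProcessor {

      // newInstance() in updateTopicProcessor() relies on a public no-arg constructor
      public ExampleTopicProcessor() {
      }

      // Matches the reflective lookup: name "process", params (GenericData.Record, String).
      // A real processor would convert the Avro message into the record type that
      // DatabaseWriter.append(...) persists in KafkaConsumerMaster.onReceive.
      public Object process(GenericData.Record record, String topic) {
        return null; // translate and return a record here
      }
    }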