From c0cfe1f5ca177d08247ed4836e1a459fde58391d Mon Sep 17 00:00:00 2001
From: Yi Wang
Date: Thu, 4 Aug 2016 13:07:19 -0700
Subject: [PATCH] Modify KafkaConsumerMaster to handle more than one kafka config, add error handling

---
 .../app/actors/KafkaConsumerMaster.java    | 152 ++++++++----------
 backend-service/app/utils/KafkaConfig.java | 107 ++++++++++--
 2 files changed, 161 insertions(+), 98 deletions(-)

diff --git a/backend-service/app/actors/KafkaConsumerMaster.java b/backend-service/app/actors/KafkaConsumerMaster.java
index f3668d1727..b04f7fe490 100644
--- a/backend-service/app/actors/KafkaConsumerMaster.java
+++ b/backend-service/app/actors/KafkaConsumerMaster.java
@@ -17,7 +17,6 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.List;
@@ -31,12 +30,10 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import metadata.etl.models.EtlJobName;
 import models.daos.ClusterDao;
 import models.daos.EtlJobDao;
-import org.apache.avro.generic.GenericData;
 import msgs.KafkaResponseMsg;
 import play.Logger;
 import play.Play;
-import utils.JdbcUtil;
 import utils.KafkaConfig;
 import utils.KafkaConfig.Topic;
 import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient;
@@ -47,111 +44,104 @@ import wherehows.common.writers.DatabaseWriter;
 
 /**
- * Akka actor responsible to manage Kafka workers and use kafka topic processor
- * to deal with response messages
- *
+ * Akka actor responsible for managing Kafka workers
  */
 public class KafkaConsumerMaster extends UntypedActor {
 
+  // List of kafka job IDs
   private static List<Integer> _kafkaJobList;
-  private static ConsumerConnector _consumer;
-  private static Properties _kafkaConfig;
-  private static Map<String, Topic> _kafkaTopics;
-
-  private final Map<String, Object> _topicProcessorClass;
-  private final Map<String, Method> _topicProcessorMethod;
-  private final Map<String, DatabaseWriter> _topicDbWriter;
-
-  public KafkaConsumerMaster() {
-    this._topicProcessorClass = new HashMap<>();
-    this._topicProcessorMethod = new HashMap<>();
-    this._topicDbWriter = new HashMap<>();
-  }
+  // map of kafka job id to configs
+  private static Map<Integer, KafkaConfig> _kafkaConfigs = new HashMap<>();
+  // map of topic name to DB writer
+  private static Map<String, DatabaseWriter> _topicDbWriters = new HashMap<>();
 
   @Override
-  public void preStart() throws Exception {
-    _kafkaJobList = Play.application().configuration().getIntList("kafka.consumer.etl.jobid", null);;
+  public void preStart()
+      throws Exception {
+    _kafkaJobList = Play.application().configuration().getIntList("kafka.consumer.etl.jobid", null);
     if (_kafkaJobList == null || _kafkaJobList.size() == 0) {
+      context().stop(getSelf());
       Logger.error("Kafka job id error, kafkaJobList: " + _kafkaJobList);
-      getContext().stop(getSelf());
+      return;
     }
-    Logger.info("Start the KafkaConsumerMaster... Kafka etl job id list: " + _kafkaJobList);
+    Logger.info("Start KafkaConsumerMaster... Kafka job id list: " + _kafkaJobList);
 
-    // handle 1 kafka connection
-    Map<String, Object> kafkaEtlJob = EtlJobDao.getEtlJobById(_kafkaJobList.get(0));
-    final int kafkaJobRefId = Integer.parseInt(kafkaEtlJob.get("ref_id").toString());
-    final String kafkaJobName = kafkaEtlJob.get("wh_etl_job_name").toString();
+    for (final int kafkaJobId : _kafkaJobList) {
+      try {
+        // handle one Kafka connection per job
+        Map<String, Object> kafkaEtlJob = EtlJobDao.getEtlJobById(kafkaJobId);
+        final int kafkaJobRefId = Integer.parseInt(kafkaEtlJob.get("ref_id").toString());
+        final String kafkaJobName = kafkaEtlJob.get("wh_etl_job_name").toString();
 
-    if (!kafkaJobName.equals(EtlJobName.KAFKA_CONSUMER_ETL.name())) {
-      Logger.error("Kafka job info error: job name '" + kafkaJobName + "' not equal "
-          + EtlJobName.KAFKA_CONSUMER_ETL.name());
-      getContext().stop(getSelf());
-    }
+        // get Kafka configurations from database
+        final KafkaConfig kafkaConfig = new KafkaConfig();
+        kafkaConfig.updateKafkaProperties(EtlJobName.valueOf(kafkaJobName), kafkaJobRefId);
+        final Properties kafkaProps = kafkaConfig.getProperties();
+        final Map<String, Topic> kafkaTopics = kafkaConfig.getTopics();
 
-    // get Kafka configurations from database
-    KafkaConfig.updateKafkaProperties(kafkaJobRefId);
-    _kafkaConfig = KafkaConfig.getProperties();
-    _kafkaTopics = KafkaConfig.getTopics();
+        kafkaConfig.updateTopicProcessor();
+        _topicDbWriters.putAll(kafkaConfig.getTopicDbWriters());
 
-    // get list of cluster information from database and update ClusterUtil
-    ClusterUtil.updateClusterInfo(ClusterDao.getClusterInfo());
+        // create Kafka consumer connector
+        Logger.info("Create Kafka Consumer with config: " + kafkaProps.toString());
+        final SchemaRegistryClient schemaRegistryClient =
+            new CachedSchemaRegistryClient((String) kafkaProps.get("schemaRegistryUrl"));
+        final ConsumerConfig cfg = new ConsumerConfig(kafkaProps);
+        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(cfg);
 
-    for (String topic : _kafkaTopics.keySet()) {
-      // get the processor class and method
-      final Class processorClass = Class.forName(_kafkaTopics.get(topic).processor);
-      _topicProcessorClass.put(topic, processorClass.newInstance());
+        // create Kafka message streams
+        final Map<String, Integer> topicCountMap = new HashMap<>();
+        for (Topic topic : kafkaTopics.values()) {
+          topicCountMap.put(topic.topic, topic.numOfWorkers);
+        }
 
-      final Method method = processorClass.getDeclaredMethod("process", GenericData.Record.class, String.class);
-      _topicProcessorMethod.put(topic, method);
+        final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
+            consumerConnector.createMessageStreams(topicCountMap);
 
-      // get the database writer
-      final DatabaseWriter dw = new DatabaseWriter(JdbcUtil.wherehowsJdbcTemplate, _kafkaTopics.get(topic).dbTable);
-      _topicDbWriter.put(topic, dw);
-    }
+        // add config to kafka config map
+        kafkaConfig.setSchemaRegistryClient(schemaRegistryClient);
+        kafkaConfig.setConsumer(consumerConnector);
+        _kafkaConfigs.put(kafkaJobId, kafkaConfig);
 
-    // create Kafka consumer connector
-    final SchemaRegistryClient schemaRegistryClient =
-        new CachedSchemaRegistryClient((String) _kafkaConfig.get("schemaRegistryUrl"));
-    Logger.info("Create Kafka Consumer Config: " + _kafkaConfig.toString());
-    final ConsumerConfig cfg = new ConsumerConfig(_kafkaConfig);
-    _consumer = Consumer.createJavaConsumerConnector(cfg);
+        // create workers to handle each message stream
+        for (String topic : kafkaTopics.keySet()) {
+          final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
-    // create Kafka message streams
-    final Map<String, Integer> topicCountMap = new HashMap<>();
-    for (Topic topic : _kafkaTopics.values()) {
-      topicCountMap.put(topic.topic, topic.numOfWorkers);
-    }
+          int threadNumber = 0;
+          for (final KafkaStream<byte[], byte[]> stream : streams) {
+            ActorRef childActor = getContext().actorOf(
+                Props.create(KafkaConsumerWorker.class, topic, threadNumber, stream, schemaRegistryClient,
+                    kafkaConfig.getProcessorClass(topic), kafkaConfig.getProcessorMethod(topic)));
 
-    final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
-        _consumer.createMessageStreams(topicCountMap);
-
-    for (String topic : _kafkaTopics.keySet()) {
-      final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
-
-      int threadNumber = 0;
-      for (final KafkaStream<byte[], byte[]> stream : streams) {
-        ActorRef childActor =
-            getContext().actorOf(
-                Props.create(KafkaConsumerWorker.class,
-                    topic, threadNumber, stream, schemaRegistryClient,
-                    _topicProcessorClass.get(topic), _topicProcessorMethod.get(topic)));
-
-        childActor.tell("Start", getSelf());
-        threadNumber++;
+            childActor.tell("Start", getSelf());
+            threadNumber++;
+          }
+        }
+        Logger.info("Initiated Kafka consumer job " + kafkaJobId + " with topics " + kafkaTopics.keySet());
+      } catch (Exception e) {
+        Logger.error("Failed to initiate Kafka consumer job on startup, job id: " + kafkaJobId, e);
       }
     }
+
+    try {
+      // get list of cluster information from database and update ClusterUtil
+      ClusterUtil.updateClusterInfo(ClusterDao.getClusterInfo());
+    } catch (Exception e) {
+      Logger.error("Failed to fetch cluster info from DB", e);
+    }
   }
 
   @Override
-  public void onReceive(Object message) throws Exception {
+  public void onReceive(Object message)
+      throws Exception {
     if (message instanceof KafkaResponseMsg) {
       final KafkaResponseMsg kafkaMsg = (KafkaResponseMsg) message;
       final String topic = kafkaMsg.getTopic();
       final AbstractRecord record = kafkaMsg.getRecord();
 
-      if (record != null && _kafkaTopics.containsKey(topic)) {
+      if (record != null && _topicDbWriters.containsKey(topic)) {
         Logger.debug("Writing to DB kafka event record: " + topic);
-        final DatabaseWriter dbWriter = _topicDbWriter.get(topic);
+        final DatabaseWriter dbWriter = _topicDbWriters.get(topic);
 
         try {
           dbWriter.append(record);
@@ -171,10 +161,8 @@ public class KafkaConsumerMaster extends UntypedActor {
   @Override
   public void postStop() {
     Logger.info("Terminating KafkaConsumerMaster...");
-    if (_consumer != null) {
-      _consumer.shutdown();
-      _kafkaConfig.clear();
-      _kafkaTopics.clear();
+    for (KafkaConfig config : _kafkaConfigs.values()) {
+      config.close();
     }
   }
 }
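Note: the createMessageStreams() call above is the Kafka 0.8 high-level consumer API that this actor builds on: the caller requests a number of streams per topic and dedicates one worker to each stream. Below is a minimal standalone sketch of that pattern, not part of the patch; the topic name, group id, and ZooKeeper address are made-up placeholders.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class StreamPerWorkerSketch {
      public static void main(String[] args) {
        // Placeholder connection settings -- not taken from the patch.
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "sketch-group");

        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Request 2 streams for one topic; each stream is consumed by its own worker,
        // mirroring how KafkaConsumerMaster hands each stream to a KafkaConsumerWorker actor.
        Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic =
            connector.createMessageStreams(Collections.singletonMap("sketch-topic", 2));

        ExecutorService workers = Executors.newFixedThreadPool(2);
        for (final KafkaStream<byte[], byte[]> stream : streamsByTopic.get("sketch-topic")) {
          workers.submit(new Runnable() {
            @Override
            public void run() {
              ConsumerIterator<byte[], byte[]> it = stream.iterator();
              while (it.hasNext()) {
                byte[] message = it.next().message();
                // a real worker would decode the Avro payload and call its topic processor here
              }
            }
          });
        }
      }
    }
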
diff --git a/backend-service/app/utils/KafkaConfig.java b/backend-service/app/utils/KafkaConfig.java
index b3722bebcf..3b8c8062c7 100644
--- a/backend-service/app/utils/KafkaConfig.java
+++ b/backend-service/app/utils/KafkaConfig.java
@@ -13,12 +13,17 @@
  */
 package utils;
 
+import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import kafka.javaapi.consumer.ConsumerConnector;
 import metadata.etl.models.EtlJobName;
 import models.daos.EtlJobPropertyDao;
+import org.apache.avro.generic.GenericData;
 import play.Logger;
+import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
+import wherehows.common.writers.DatabaseWriter;
 
 
 /**
@@ -26,12 +31,6 @@ import play.Logger;
  */
 public class KafkaConfig {
 
-  private static Properties _props = new Properties();
-  // Map of <topic_name, Topic>
-  private static Map<String, Topic> _topics = new HashMap<>();
-
-  public static final EtlJobName KAFKA_JOBNAME = EtlJobName.KAFKA_CONSUMER_ETL;
-
   /**
    * Class for storing Kafka Topic info
    */
@@ -49,19 +48,30 @@
     }
   }
 
+  private final Properties _props = new Properties();
+  // Map of <topic_name, Topic>
+  private final Map<String, Topic> _topics = new HashMap<>();
+  private final Map<String, Object> _topicProcessorClass = new HashMap<>();
+  private final Map<String, Method> _topicProcessorMethod = new HashMap<>();
+  private final Map<String, DatabaseWriter> _topicDbWriter = new HashMap<>();
+
+  private ConsumerConnector _consumer;
+  private SchemaRegistryClient _schemaRegistryClient;
+
   /**
-   * Update Kafka properties and topics from etl_job_properies table
+   * Update Kafka properties and topics from wh_etl_job_properties table
    * @throws Exception
   */
-  public static void updateKafkaProperties(int kafkaJobRefId) throws Exception {
-    Properties props = EtlJobPropertyDao.getJobProperties(KAFKA_JOBNAME, kafkaJobRefId);
-    if (props == null || props.size() < 5) {
-      Logger.error("Fail to update Kafka job properties for " + KAFKA_JOBNAME.name()
-          + ", job ref id: " + kafkaJobRefId);
+  public void updateKafkaProperties(EtlJobName jobName, int jobRefId)
+      throws Exception {
+    Properties props;
+    try {
+      props = EtlJobPropertyDao.getJobProperties(jobName, jobRefId);
+    } catch (Exception e) {
+      Logger.error("Failed to update Kafka job properties for " + jobName.name() + ", ref id: " + jobRefId);
       return;
-    } else {
-      Logger.info("Get Kafka job properties for " + KAFKA_JOBNAME.name() + ", job ref id: " + kafkaJobRefId);
     }
+    Logger.info("Got Kafka job properties for " + jobName.name() + ", job ref id: " + jobRefId);
 
     String[] topics = ((String) props.remove("kafka.topics")).split("\\s*,\\s*");
     String[] processors = ((String) props.remove("kafka.processors")).split("\\s*,\\s*");
@@ -77,11 +87,61 @@
     }
   }
 
+  /**
+   * Update processor class, method and DB writer for each topic
+   */
+  public void updateTopicProcessor() {
+    for (String topic : _topics.keySet()) {
+      try {
+        // get the processor class and method
+        final Class processorClass = Class.forName(_topics.get(topic).processor);
+        _topicProcessorClass.put(topic, processorClass.newInstance());
+
+        final Method method = processorClass.getDeclaredMethod("process", GenericData.Record.class, String.class);
+        _topicProcessorMethod.put(topic, method);
+
+        // get the database writer
+        final DatabaseWriter dw = new DatabaseWriter(JdbcUtil.wherehowsJdbcTemplate, _topics.get(topic).dbTable);
+        _topicDbWriter.put(topic, dw);
+      } catch (Exception e) {
+        Logger.error("Failed to create processor for topic: " + topic, e);
+        _topicProcessorClass.remove(topic);
+        _topicProcessorMethod.remove(topic);
+        _topicDbWriter.remove(topic);
+      }
+    }
+  }
+
+  public Object getProcessorClass(String topic) {
+    return _topicProcessorClass.get(topic);
+  }
+
+  public Method getProcessorMethod(String topic) {
+    return _topicProcessorMethod.get(topic);
+  }
+
+  public DatabaseWriter getDbWriter(String topic) {
+    return _topicDbWriter.get(topic);
+  }
+
+  public Map<String, DatabaseWriter> getTopicDbWriters() {
+    return _topicDbWriter;
+  }
+
+  /**
+   * Close the config and shut down its consumer connector
+   */
+  public void close() {
+    if (_consumer != null) {
+      _consumer.shutdown();
+    }
+  }
+
   /**
    * get Kafka configuration
    * @return
   */
-  public static Properties getProperties() {
+  public Properties getProperties() {
     return _props;
   }
 
@@ -89,8 +149,23 @@
    * get Kafka topics
    * @return
   */
-  public static Map<String, Topic> getTopics() {
+  public Map<String, Topic> getTopics() {
     return _topics;
   }
 
+  public ConsumerConnector getConsumer() {
+    return _consumer;
+  }
+
+  public void setConsumer(ConsumerConnector consumer) {
+    _consumer = consumer;
+  }
+
+  public SchemaRegistryClient getSchemaRegistryClient() {
+    return _schemaRegistryClient;
+  }
+
+  public void setSchemaRegistryClient(SchemaRegistryClient schemaRegistryClient) {
+    _schemaRegistryClient = schemaRegistryClient;
+  }
 }
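The processor wiring set up in KafkaConfig.updateTopicProcessor() is consumed reflectively: a worker invokes the cached process(GenericData.Record, String) Method on the stored processor instance for every decoded record. The following sketch shows that call site in isolation; DummyProcessor is a hypothetical stand-in for a real processor class named in the kafka.processors job property.

    import java.lang.reflect.Method;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;

    public class ProcessorInvocationSketch {
      // Hypothetical processor -- real ones are configured per topic in the job properties.
      public static class DummyProcessor {
        public void process(GenericData.Record record, String topic) {
          System.out.println(topic + ": " + record);
        }
      }

      public static void main(String[] args) throws Exception {
        // Mirrors updateTopicProcessor(): load the class by name, instantiate it,
        // and look up the process(GenericData.Record, String) method.
        Class processorClass = Class.forName(DummyProcessor.class.getName());
        Object processor = processorClass.newInstance();
        Method process = processorClass.getDeclaredMethod("process", GenericData.Record.class, String.class);

        // What a KafkaConsumerWorker would do for each decoded Avro record.
        Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Dummy\",\"fields\":[]}");
        process.invoke(processor, new GenericData.Record(schema), "sketch-topic");
      }
    }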