feat(kafka-config): Add ability to configure other Kafka props (#1745)

* Integrate spring-kafka & spring-boot for security props

- Upgrade spring-kafka to 2.2.14
- Use KafkaListener and KafkaTemplate to enable KafkaAutoConfiguration
- Integrate spring-boot's KafkaProperties into spring-kafka's config

* Cleanup imports

* Add DataHub kafka env vars

* Remove kafka-streams dependency

* Add KafkaProperties to gms; Add docs

* Add to Adoption

* Remove KAFKA_BOOTSTRAP_SERVER default

Co-authored-by: jsotelo <javier.sotelo@viasat.com>
Co-authored-by: Kerem Sahin <ksahin@linkedin.com>
Javier Sotelo 2020-07-27 05:22:51 -07:00 committed by GitHub
parent 0a8acb8ae2
commit ad4c80a753
18 changed files with 239 additions and 240 deletions

.gitignore

@@ -12,6 +12,22 @@
**/src/mainGenerated*
**/src/testGenerated*
# Java
.java-version
# Python
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
.mypy_cache/
.dmypy.json
dmypy.json
MANIFEST
# Added by mp-maker
**/build
/config
@@ -19,4 +35,4 @@
/out
# Mac OS
**/.DS_Store
**/.DS_Store


@@ -99,6 +99,7 @@ Here is a list of companies currently building POC or seriously evaluating DataHub
* [ThoughtWorks](https://www.thoughtworks.com)
* [University of Phoenix](https://www.phoenix.edu)
* [Vectice](https://www.vectice.com)
* [Viasat](https://viasat.com)
## Select Articles & Talks
* [DataHub: A Generalized Metadata Search & Discovery Tool](https://engineering.linkedin.com/blog/2019/data-hub)


@@ -52,7 +52,6 @@ project.ext.externalDependency = [
'kafkaClients': 'org.apache.kafka:kafka-clients:2.3.0',
'kafkaSchemaRegistry': 'io.confluent:kafka-schema-registry-client:3.3.1',
'kafkaSerializers': 'io.confluent:kafka-avro-serializer:3.3.1',
'kafkaStreams': 'org.apache.kafka:kafka-streams:2.3.0',
'logbackClassic': 'ch.qos.logback:logback-classic:1.2.3',
'lombok': 'org.projectlombok:lombok:1.18.12',
'mariadbConnector': 'org.mariadb.jdbc:mariadb-java-client:2.6.0',
@@ -72,9 +71,10 @@ project.ext.externalDependency = [
'springContext': 'org.springframework:spring-context:5.2.3.RELEASE',
'springCore': 'org.springframework:spring-core:5.2.3.RELEASE',
'springWeb': 'org.springframework:spring-web:5.2.3.RELEASE',
'springBootAutoconfigure': 'org.springframework.boot:spring-boot-autoconfigure:2.1.2.RELEASE',
'springBootStarterWeb': 'org.springframework.boot:spring-boot-starter-web:2.1.2.RELEASE',
'springBootStarterJetty': 'org.springframework.boot:spring-boot-starter-jetty:2.1.2.RELEASE',
'springKafka': 'org.springframework.kafka:spring-kafka:2.2.3.RELEASE',
'springKafka': 'org.springframework.kafka:spring-kafka:2.2.14.RELEASE',
'springActuator': 'org.springframework.boot:spring-boot-starter-actuator:2.1.2.RELEASE',
'testng': 'org.testng:testng:6.9.9'
]

docs/how/kafka-config.md

@@ -0,0 +1,21 @@
# How to configure Kafka?
With the exception of `KAFKA_BOOTSTRAP_SERVER` and `KAFKA_SCHEMAREGISTRY_URL`, Kafka is configured via [spring-boot](https://spring.io/projects/spring-boot), specifically with [KafkaProperties](https://docs.spring.io/spring-boot/docs/current/api/org/springframework/boot/autoconfigure/kafka/KafkaProperties.html). See [Integration Properties](https://docs.spring.io/spring-boot/docs/current/reference/html/appendix-application-properties.html#integration-properties) prefixed with `spring.kafka`.
Below is an example of how SASL/GSSAPI properties can be configured via environment variables:
```bash
export KAFKA_BOOTSTRAP_SERVER=broker:29092
export KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
export SPRING_KAFKA_PROPERTIES_SASL_KERBEROS_SERVICE_NAME=kafka
export SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL=SASL_PLAINTEXT
export SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG="com.sun.security.auth.module.Krb5LoginModule required principal='principal@REALM' useKeyTab=true storeKey=true keyTab='/keytab';"
```
These properties can be specified via `application.properties` or `application.yml` files, as command-line switches, or as environment variables. See Spring's [Externalized Configuration](https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-external-config) for details on how this works.
See [Kafka Connect Security](https://docs.confluent.io/current/connect/security.html) for more ways to connect.
DataHub components that connect to Kafka are currently:
- mce-consumer-job
- mae-consumer-job
- gms


@@ -9,8 +9,10 @@ dependencies {
compile externalDependency.lombok
compile externalDependency.servletApi
compile externalDependency.springBeans
compile externalDependency.springBootAutoconfigure
compile externalDependency.springContext
compile externalDependency.springCore
compile externalDependency.springKafka
compile externalDependency.springWeb
compile spec.product.pegasus.restliSpringBridge


@@ -2,32 +2,41 @@ package com.linkedin.common.factory;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import java.util.Arrays;
import java.util.Map;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaEventProducerFactory {
@Value("${KAFKA_BOOTSTRAP_SERVER:localhost:9092}")
@Value("${KAFKA_BOOTSTRAP_SERVER}")
private String kafkaBootstrapServers;
@Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
private String kafkaSchemaRegistryUrl;
@Bean(name = "kafkaEventProducer")
protected Producer<String, IndexedRecord> createInstance() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
protected Producer<String, IndexedRecord> createInstance(KafkaProperties properties) {
KafkaProperties.Producer producerProps = properties.getProducer();
producerProps.setKeySerializer(StringSerializer.class);
producerProps.setValueSerializer(KafkaAvroSerializer.class);
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
producerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092
Map<String, Object> props = properties.buildProducerProperties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
return new KafkaProducer(props);


@@ -32,7 +32,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.springframework.web.HttpRequestHandler;
public class ParallelRestliHttpRequestHandler implements HttpRequestHandler {
private RAPServlet _r2Servlet;


@@ -28,7 +28,6 @@ dependencies {
compile externalDependency.neo4jJavaDriver
compile externalDependency.kafkaAvroSerde
compile externalDependency.kafkaStreams
compile externalDependency.kafkaSerializers
compile (externalDependency.springBootStarterWeb) {


@@ -3,10 +3,9 @@ package com.linkedin.metadata.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.elasticsearch.rest.RestClientAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
@SpringBootApplication(exclude = {RestClientAutoConfiguration.class, KafkaAutoConfiguration.class})
@SpringBootApplication(exclude = {RestClientAutoConfiguration.class})
public class MaeConsumerApplication {
public static void main(String[] args) {


@@ -8,7 +8,10 @@ import java.util.Arrays;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
@@ -28,9 +31,12 @@ import com.linkedin.mxe.MetadataAuditEvent;
import lombok.extern.slf4j.Slf4j;
import com.linkedin.mxe.Topics;
@Slf4j
@Component
@EnableKafka
public class MetadataAuditEventsProcessor {
// Doc Type should be updated when ElasticSearch Version is upgraded.
private static final String DOC_TYPE = "doc";
@@ -46,7 +52,9 @@ public class MetadataAuditEventsProcessor {
this.graphWriterDAO = graphWriterDAO;
}
public void processSingleMAE(final GenericRecord record) {
@KafkaListener(id = "mae-consumer-job-client", topics = "${KAFKA_TOPIC_NAME:" + Topics.METADATA_AUDIT_EVENT + "}")
public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
final GenericRecord record = consumerRecord.value();
log.debug("Got MAE");
try {


@@ -0,0 +1,56 @@
package com.linkedin.metadata.kafka.config;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.time.Duration;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@Slf4j
@Configuration
public class KafkaConfig {
@Value("${KAFKA_BOOTSTRAP_SERVER}")
private String kafkaBootstrapServer;
@Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
private String kafkaSchemaRegistryUrl;
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(KafkaProperties properties) {
KafkaProperties.Consumer consumerProps = properties.getConsumer();
// Specify (de)serializers for record keys and for record values.
consumerProps.setKeyDeserializer(StringDeserializer.class);
consumerProps.setValueDeserializer(KafkaAvroDeserializer.class);
// Consumer offsets are committed automatically every 10 seconds.
consumerProps.setEnableAutoCommit(true);
consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));
Map<String, Object> props = properties.buildConsumerProperties();
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaBootstrapServer != null && kafkaBootstrapServer.length() > 0) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
} // else we rely on KafkaProperties which defaults to localhost:9092
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
log.info("KafkaListenerContainerFactory built successfully");
return factory;
}
}


@@ -1,68 +0,0 @@
package com.linkedin.metadata.kafka.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import com.linkedin.metadata.kafka.MetadataAuditEventsProcessor;
import com.linkedin.mxe.Topics;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Value("${KAFKA_BOOTSTRAP_SERVER:localhost:9092}")
private String kafkaBootstrapServer;
@Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
private String kafkaSchemaRegistryUrl;
@Value("${KAFKA_TOPIC_NAME:" + Topics.METADATA_AUDIT_EVENT + "}")
private String kafkaTopicName;
private final MetadataAuditEventsProcessor eventsProcessor;
public KafkaStreamsConfig(MetadataAuditEventsProcessor eventsProcessor) {
this.eventsProcessor = eventsProcessor;
}
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mae-consumer-job");
props.put(StreamsConfig.CLIENT_ID_CONFIG, "mae-consumer-job-client");
// Where to find Kafka broker(s).
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
// Specify default (de)serializers for record keys and for record values.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
// Records will be flushed every 10 seconds.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
// Disable record caches.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
return new KafkaStreamsConfiguration(props);
}
@Bean
public KStream<String, GenericRecord> kStream(StreamsBuilder kStreamBuilder) {
KStream<String, GenericRecord> messages = kStreamBuilder.stream(kafkaTopicName);
messages.foreach((k, v) -> eventsProcessor.processSingleMAE(v));
return messages;
}
}


@@ -24,7 +24,6 @@ dependencies {
compile spec.product.pegasus.restliCommon
compile externalDependency.elasticSearchRest
compile externalDependency.kafkaAvroSerde
compile externalDependency.kafkaStreams
compile externalDependency.kafkaSerializers
compile (externalDependency.springBootStarterWeb) {
exclude module: "spring-boot-starter-tomcat"


@@ -3,10 +3,9 @@ package com.linkedin.metadata.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.elasticsearch.rest.RestClientAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
@SpringBootApplication(exclude = {RestClientAutoConfiguration.class, KafkaAutoConfiguration.class})
@SpringBootApplication(exclude = {RestClientAutoConfiguration.class})
public class MceConsumerApplication {
public static void main(String[] args) {


@@ -1,18 +1,5 @@
package com.linkedin.metadata.kafka;
import java.io.IOException;
import java.net.URISyntaxException;
import javax.annotation.Nonnull;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.EventUtils;
@@ -23,25 +10,41 @@ import com.linkedin.metadata.snapshot.Snapshot;
import com.linkedin.mxe.FailedMetadataChangeEvent;
import com.linkedin.mxe.MetadataChangeEvent;
import com.linkedin.mxe.Topics;
import java.io.IOException;
import java.net.URISyntaxException;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@EnableKafka
public class MetadataChangeEventsProcessor {
private BaseRemoteWriterDAO remoteWriterDAO;
@Autowired
private KafkaProducer<String, GenericRecord> kafkaProducer;
private KafkaTemplate<String, GenericRecord> kafkaTemplate;
@Value("${KAFKA_FMCE_TOPIC_NAME:" + Topics.FAILED_METADATA_CHANGE_EVENT + "}")
private String fmceTopicName;
public MetadataChangeEventsProcessor(BaseRemoteWriterDAO remoteWriterDAO) {
public MetadataChangeEventsProcessor(BaseRemoteWriterDAO remoteWriterDAO,
KafkaTemplate<String, GenericRecord> kafkaTemplate) {
this.remoteWriterDAO = remoteWriterDAO;
this.kafkaTemplate = kafkaTemplate;
}
public void processSingleMCE(final GenericRecord record) {
@KafkaListener(id = "mce-consumer-job-client",
topics = "${KAFKA_MCE_TOPIC_NAME:" + Topics.METADATA_CHANGE_EVENT + "}")
public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
final GenericRecord record = consumerRecord.value();
log.debug("Got MCE");
log.debug("Record ", record);
@@ -72,7 +75,7 @@ public class MetadataChangeEventsProcessor {
final GenericRecord genericFailedMCERecord = EventUtils.pegasusToAvroFailedMCE(failedMetadataChangeEvent);
log.debug("Sending FailedMessages to topic - {}", fmceTopicName);
log.info("Error while processing MCE: FailedMetadataChangeEvent - {}", failedMetadataChangeEvent);
this.kafkaProducer.send(new ProducerRecord<>(fmceTopicName, genericFailedMCERecord));
this.kafkaTemplate.send(fmceTopicName, genericFailedMCERecord);
} catch (IOException e) {
log.error("Error while sending FailedMetadataChangeEvent: Exception - {}, FailedMetadataChangeEvent - {}",
e.getStackTrace(), failedMetadataChangeEvent);


@@ -0,0 +1,89 @@
package com.linkedin.metadata.kafka.config;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ErrorHandler;
@Slf4j
@Configuration
public class KafkaConfig {
@Value("${KAFKA_BOOTSTRAP_SERVER}")
private String kafkaBootstrapServers;
@Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
private String kafkaSchemaRegistryUrl;
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(KafkaProperties properties) {
KafkaProperties.Consumer consumerProps = properties.getConsumer();
// Specify (de)serializers for record keys and for record values.
consumerProps.setKeyDeserializer(StringDeserializer.class);
consumerProps.setValueDeserializer(KafkaAvroDeserializer.class);
// Consumer offsets are committed automatically every 10 seconds.
consumerProps.setEnableAutoCommit(true);
consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092
Map<String, Object> props = properties.buildConsumerProperties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
log.info("KafkaListenerContainerFactory built successfully");
return factory;
}
@Bean
public KafkaTemplate<String, GenericRecord> kafkaTemplate(KafkaProperties properties) {
KafkaProperties.Producer producerProps = properties.getProducer();
producerProps.setKeySerializer(StringSerializer.class);
producerProps.setValueSerializer(KafkaAvroSerializer.class);
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
producerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092
Map<String, Object> props = properties.buildProducerProperties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
KafkaTemplate<String, GenericRecord> template =
new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(properties.buildProducerProperties()));
log.info("KafkaTemplate built successfully");
return template;
}
@Bean
public ErrorHandler errorHandler() {
return (e, r) -> log.error("Exception caught during Deserialization, topic: {}, partition: {}, offset: {}",
r.topic(), r.partition(), r.offset(), e);
}
}


@@ -1,58 +0,0 @@
package com.linkedin.metadata.kafka.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer;
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${KAFKA_BOOTSTRAP_SERVER:localhost:9092}")
private String kafkaBootstrapServer;
@Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
private String kafkaSchemaRegistryUrl;
/**
* KafkaProducer Properties to produce FailedMetadataChangeEvent
*
* @return Properties producerConfig
*/
@Bean
public Map<String, Object> getProducerConfig() {
final Map<String, Object> producerConfigMap = new HashMap<>();
producerConfigMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
com.linkedin.util.Configuration.getEnvironmentVariable("KAFKA_BOOTSTRAP_SERVER", kafkaBootstrapServer));
producerConfigMap.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
com.linkedin.util.Configuration.getEnvironmentVariable("KAFKA_SCHEMAREGISTRY_URL", kafkaSchemaRegistryUrl));
producerConfigMap.put(ProducerConfig.CLIENT_ID_CONFIG, "failed-mce-producer");
producerConfigMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfigMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GenericAvroSerializer.class);
return producerConfigMap;
}
@Bean
public ProducerFactory<String, GenericRecord> producerFactory() {
return new DefaultKafkaProducerFactory<>(getProducerConfig());
}
@Bean
public KafkaProducer<String, GenericRecord> kafkaProducer() {
return new KafkaProducer<>(getProducerConfig());
}
}


@@ -1,75 +0,0 @@
package com.linkedin.metadata.kafka.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import com.linkedin.metadata.kafka.MetadataChangeEventsProcessor;
import com.linkedin.mxe.Topics;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import lombok.extern.slf4j.Slf4j;
@Configuration
@EnableKafka
@EnableKafkaStreams
@Slf4j
public class KafkaStreamsConfig {
@Value("${KAFKA_BOOTSTRAP_SERVER:localhost:9092}")
private String kafkaBootstrapServer;
@Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
private String kafkaSchemaRegistryUrl;
@Value("${KAFKA_MCE_TOPIC_NAME:" + Topics.METADATA_CHANGE_EVENT + "}")
private String mceTopicName;
private final MetadataChangeEventsProcessor eventsProcessor;
public KafkaStreamsConfig(MetadataChangeEventsProcessor eventsProcessor) {
this.eventsProcessor = eventsProcessor;
}
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
// Give the Streams application a unique name. The name must be unique in the Kafka cluster
// against which the application is run.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mce-consuming-job");
props.put(StreamsConfig.CLIENT_ID_CONFIG, "mce-consuming-job-client");
// Where to find Kafka broker(s).
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
// Specify default (de)serializers for record keys and for record values.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
// Continue handling events after exception
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
// Records will be flushed every 10 seconds.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
// Disable record caches.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
return new KafkaStreamsConfiguration(props);
}
@Bean
public KStream<String, GenericRecord> kStream(StreamsBuilder kStreamBuilder) {
final KStream<String, GenericRecord> messages = kStreamBuilder.stream(mceTopicName);
messages.foreach((k, v) -> eventsProcessor.processSingleMCE(v));
return messages;
}
}