feat: add support for AWS glue schema registry (#3083)

Dexter Lee 2021-08-11 21:40:37 -07:00 committed by GitHub
parent 8c9c696cdd
commit 2df9d4fed6
17 changed files with 387 additions and 200 deletions

View File

@ -35,6 +35,7 @@ project.ext.externalDependency = [
'assertJ': 'org.assertj:assertj-core:3.11.1',
'avro_1_7': 'org.apache.avro:avro:1.7.7',
'avroCompiler_1_7': 'org.apache.avro:avro-compiler:1.7.7',
'awsGlueSchemaRegistrySerde': 'software.amazon.glue:schema-registry-serde:1.1.1',
'cacheApi' : 'javax.cache:cache-api:1.1.0',
'commonsIo': 'commons-io:commons-io:2.4',
'commonsLang': 'commons-lang:commons-lang:2.6',

View File

@ -283,3 +283,85 @@ kafka:
```
Run `helm install datahub datahub/ --values datahub/quickstart-values.yaml` to apply the changes.
### AWS Glue Schema Registry
You can use the AWS Glue schema registry instead of the Kafka schema registry. To do so, first provision an AWS Glue schema
registry in the "Schema Registry" tab of the AWS Glue console (or via the AWS CLI, as sketched below).
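If you prefer the CLI, a minimal sketch for provisioning the registry is shown below; the registry name, description, and
region are placeholders, so substitute your own values.
```
# Create a Glue schema registry for DataHub topics (name, description, and region are examples).
aws glue create-registry \
  --registry-name datahub-schema-registry \
  --description "Schema registry for DataHub Kafka topics" \
  --region us-west-2
```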
Once the registry is provisioned, update the helm chart values as follows.
```
kafka:
  bootstrap:
    ...
  zookeeper:
    ...
  schemaregistry:
    type: AWS_GLUE
    glue:
      region: <<AWS region of registry>>
      registry: <<name of registry>>
```
Note that it will use the name of the topic as the schema name in the registry.
Before you update the pods, you need to give the k8s worker nodes the correct permissions to access the schema registry.
The minimum required permissions look like this:
```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "glue:GetRegistry",
                "glue:ListRegistries",
                "glue:CreateSchema",
                "glue:UpdateSchema",
                "glue:GetSchema",
                "glue:ListSchemas",
                "glue:RegisterSchemaVersion",
                "glue:GetSchemaByDefinition",
                "glue:GetSchemaVersion",
                "glue:GetSchemaVersionsDiff",
                "glue:ListSchemaVersions",
                "glue:CheckSchemaVersionValidity",
                "glue:PutSchemaVersionMetadata",
                "glue:QuerySchemaVersionMetadata"
            ],
            "Resource": [
                "arn:aws:glue:*:795586375822:schema/*",
                "arn:aws:glue:us-west-2:795586375822:registry/demo-shared"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "glue:GetSchemaVersion"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```
The second statement requires "*" as the resource because of an issue in the AWS Glue schema registry library.
Refer to [this issue](https://github.com/awslabs/aws-glue-schema-registry/issues/68) for any updates.
Glue currently does not support AWS Signature V4, so we cannot use service accounts to grant access to the schema
registry. The workaround is to attach the above permissions to the EKS worker node's IAM role. Refer to
[this issue](https://github.com/awslabs/aws-glue-schema-registry/issues/69) for any updates.
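As a sketch of that workaround, assuming the policy above is saved locally as a JSON file; the role and policy names
below are placeholders, so use your node group's actual instance role.
```
# Attach the policy above (saved as glue-schema-registry-policy.json) to the
# EKS worker node's IAM role; role and policy names are placeholders.
aws iam put-role-policy \
  --role-name <eks-node-instance-role> \
  --policy-name datahub-glue-schema-registry \
  --policy-document file://glue-schema-registry-policy.json
```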
Run `helm install datahub datahub/ --values datahub/quickstart-values.yaml` to apply the changes.
Note that you will see the log message "Schema Version Id is null. Trying to register the schema" on every request. This
log is misleading and can be ignored: schemas are cached, so a new version is not registered on every request and there
is no performance impact. This has been fixed by [this PR](https://github.com/awslabs/aws-glue-schema-registry/pull/64), but
the fix has not been released yet. We will bump the dependency version once a new release is out.
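Once metadata events start flowing, you can optionally confirm that schemas are being registered under the topic names.
A minimal check, assuming the AWS CLI is configured with read access to the registry; the registry name is a placeholder.
```
# List schemas registered under the Glue registry (registry name is an example).
aws glue list-schemas --registry-id RegistryName=<name of registry>
```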

View File

@ -4,6 +4,7 @@ dependencies {
compile project(':metadata-io')
compile project(':gms:impl')
compile project(':metadata-dao-impl:kafka-producer')
compile externalDependency.awsGlueSchemaRegistrySerde
compile externalDependency.elasticSearchRest
compile externalDependency.httpClient
compile externalDependency.gson

View File

@ -0,0 +1,83 @@
package com.linkedin.gms.factory.kafka;
import com.linkedin.gms.factory.kafka.schemaregistry.AwsGlueSchemaRegistryFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@Slf4j
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
@Import({KafkaSchemaRegistryFactory.class, AwsGlueSchemaRegistryFactory.class})
public class KafkaEventConsumerFactory {
@Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}")
private String kafkaBootstrapServers;
@Value("${SCHEMA_REGISTRY_TYPE:KAFKA}")
private String schemaRegistryType;
@Autowired
@Lazy
@Qualifier("kafkaSchemaRegistry")
private SchemaRegistryConfig kafkaSchemaRegistryConfig;
@Autowired
@Lazy
@Qualifier("awsGlueSchemaRegistry")
private SchemaRegistryConfig awsGlueSchemaRegistryConfig;
@Bean(name = "kafkaEventConsumer")
protected KafkaListenerContainerFactory<?> createInstance(KafkaProperties properties) {
KafkaProperties.Consumer consumerProps = properties.getConsumer();
// Specify (de)serializers for record keys and for record values.
consumerProps.setKeyDeserializer(StringDeserializer.class);
// Offsets will be auto-committed every 10 seconds.
consumerProps.setEnableAutoCommit(true);
consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092
SchemaRegistryConfig schemaRegistryConfig;
if (schemaRegistryType.equals(KafkaSchemaRegistryFactory.TYPE)) {
schemaRegistryConfig = kafkaSchemaRegistryConfig;
} else {
schemaRegistryConfig = awsGlueSchemaRegistryConfig;
}
consumerProps.setValueDeserializer(schemaRegistryConfig.getDeserializer());
Map<String, Object> props = properties.buildConsumerProperties();
props.putAll(schemaRegistryConfig.getProperties());
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
log.info("Event-based KafkaListenerContainerFactory built successfully");
return factory;
}
}

View File

@ -1,44 +1,69 @@
package com.linkedin.gms.factory.common;
package com.linkedin.gms.factory.kafka;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import com.linkedin.gms.factory.kafka.schemaregistry.AwsGlueSchemaRegistryFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import java.util.Arrays;
import java.util.Map;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
@Import({KafkaSchemaRegistryFactory.class, AwsGlueSchemaRegistryFactory.class})
public class KafkaEventProducerFactory {
@Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}")
private String kafkaBootstrapServers;
@Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
private String kafkaSchemaRegistryUrl;
@Value("${SCHEMA_REGISTRY_TYPE:KAFKA}")
private String schemaRegistryType;
@Autowired
@Lazy
@Qualifier("kafkaSchemaRegistry")
private SchemaRegistryConfig kafkaSchemaRegistryConfig;
@Autowired
@Lazy
@Qualifier("awsGlueSchemaRegistry")
private SchemaRegistryConfig awsGlueSchemaRegistryConfig;
@Bean(name = "kafkaEventProducer")
protected Producer<String, IndexedRecord> createInstance(KafkaProperties properties) {
KafkaProperties.Producer producerProps = properties.getProducer();
producerProps.setKeySerializer(StringSerializer.class);
producerProps.setValueSerializer(KafkaAvroSerializer.class);
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
producerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092
Map<String, Object> props = properties.buildProducerProperties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
SchemaRegistryConfig schemaRegistryConfig;
if (schemaRegistryType.equals(KafkaSchemaRegistryFactory.TYPE)) {
schemaRegistryConfig = kafkaSchemaRegistryConfig;
} else {
schemaRegistryConfig = awsGlueSchemaRegistryConfig;
}
return new KafkaProducer(props);
Map<String, Object> props = properties.buildProducerProperties();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer().getName());
props.putAll(schemaRegistryConfig.getProperties());
return new KafkaProducer<>(props);
}
}

View File

@ -0,0 +1,55 @@
package com.linkedin.gms.factory.kafka;
import com.linkedin.gms.factory.kafka.schemaregistry.AwsGlueSchemaRegistryFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory;
import java.time.Duration;
import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@Slf4j
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
@Import({KafkaSchemaRegistryFactory.class, AwsGlueSchemaRegistryFactory.class})
public class SimpleKafkaConsumerFactory {
@Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}")
private String kafkaBootstrapServers;
@Bean(name = "simpleKafkaConsumer")
protected KafkaListenerContainerFactory<?> createInstance(KafkaProperties properties) {
KafkaProperties.Consumer consumerProps = properties.getConsumer();
// Specify (de)serializers for record keys and for record values.
consumerProps.setKeyDeserializer(StringDeserializer.class);
consumerProps.setValueDeserializer(StringDeserializer.class);
// Offsets will be auto-committed every 10 seconds.
consumerProps.setEnableAutoCommit(true);
consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties()));
log.info("Simple KafkaListenerContainerFactory built successfully");
return factory;
}
}

View File

@ -0,0 +1,41 @@
package com.linkedin.gms.factory.kafka.schemaregistry;
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class AwsGlueSchemaRegistryFactory {
public static final String TYPE = "AWS_GLUE";
@Value("${AWS_GLUE_SCHEMA_REGISTRY_REGION:us-east-1}")
private String awsRegion;
@Value("${AWS_GLUE_SCHEMA_REGISTRY_NAME:#{null}}")
private Optional<String> registryName;
@Bean(name = "awsGlueSchemaRegistry")
@Nonnull
protected SchemaRegistryConfig getInstance() {
Map<String, Object> props = new HashMap<>();
props.put(AWSSchemaRegistryConstants.AWS_REGION, awsRegion);
props.put(AWSSchemaRegistryConstants.DATA_FORMAT, "AVRO");
props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
registryName.ifPresent(s -> props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, s));
log.info("Creating AWS Glue registry");
return new SchemaRegistryConfig(GlueSchemaRegistryKafkaSerializer.class, GlueSchemaRegistryKafkaDeserializer.class,
props);
}
}

View File

@ -0,0 +1,33 @@
package com.linkedin.gms.factory.kafka.schemaregistry;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class KafkaSchemaRegistryFactory {
public static final String TYPE = "KAFKA";
@Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
private String kafkaSchemaRegistryUrl;
@Bean(name = "kafkaSchemaRegistry")
@Nonnull
protected SchemaRegistryConfig getInstance() {
Map<String, Object> props = new HashMap<>();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
log.info("Creating kafka registry");
return new SchemaRegistryConfig(KafkaAvroSerializer.class, KafkaAvroDeserializer.class, props);
}
}

View File

@ -0,0 +1,12 @@
package com.linkedin.gms.factory.kafka.schemaregistry;
import java.util.Map;
import lombok.Data;
@Data
public class SchemaRegistryConfig {
private final Class<?> serializer;
private final Class<?> deserializer;
private final Map<String, Object> properties;
}

View File

@ -38,7 +38,7 @@ public class DataHubUsageEventsProcessor {
}
@KafkaListener(id = "${DATAHUB_USAGE_EVENT_KAFKA_CONSUMER_GROUP_ID:datahub-usage-event-consumer-job-client}", topics =
"${DATAHUB_USAGE_EVENT_NAME:" + Topics.DATAHUB_USAGE_EVENT + "}", containerFactory = "stringSerializedKafkaListener")
"${DATAHUB_USAGE_EVENT_NAME:" + Topics.DATAHUB_USAGE_EVENT + "}", containerFactory = "simpleKafkaConsumer")
public void consume(final ConsumerRecord<String, String> consumerRecord) {
final String record = consumerRecord.value();
log.debug("Got DHUE");

View File

@ -5,6 +5,7 @@ import com.linkedin.data.element.DataElement;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.gms.factory.common.GraphServiceFactory;
import com.linkedin.gms.factory.common.SystemMetadataServiceFactory;
import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory;
import com.linkedin.gms.factory.search.SearchServiceFactory;
import com.linkedin.gms.factory.usage.UsageServiceFactory;
import com.linkedin.metadata.EventUtils;
@ -58,7 +59,7 @@ import static com.linkedin.metadata.dao.Neo4jUtil.createRelationshipFilter;
@Component
@Conditional(MetadataChangeLogProcessorCondition.class)
@Import({GraphServiceFactory.class, SearchServiceFactory.class, UsageServiceFactory.class,
SystemMetadataServiceFactory.class})
SystemMetadataServiceFactory.class, KafkaEventConsumerFactory.class})
@EnableKafka
public class MetadataAuditEventsProcessor {
@ -82,7 +83,7 @@ public class MetadataAuditEventsProcessor {
}
@KafkaListener(id = "${KAFKA_CONSUMER_GROUP_ID:mae-consumer-job-client}", topics = "${KAFKA_TOPIC_NAME:"
+ Topics.METADATA_AUDIT_EVENT + "}", containerFactory = "avroSerializedKafkaListener")
+ Topics.METADATA_AUDIT_EVENT + "}", containerFactory = "kafkaEventConsumer")
public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
final GenericRecord record = consumerRecord.value();
log.debug("Got MAE");

View File

@ -7,6 +7,7 @@ import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.gms.factory.common.GraphServiceFactory;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory;
import com.linkedin.gms.factory.search.SearchServiceFactory;
import com.linkedin.gms.factory.timeseries.TimeseriesAspectServiceFactory;
import com.linkedin.metadata.EventUtils;
@ -56,7 +57,7 @@ import static com.linkedin.metadata.dao.Neo4jUtil.createRelationshipFilter;
@Component
@Conditional(MetadataChangeLogProcessorCondition.class)
@Import({GraphServiceFactory.class, SearchServiceFactory.class, TimeseriesAspectServiceFactory.class,
EntityRegistryFactory.class})
EntityRegistryFactory.class, KafkaEventConsumerFactory.class})
@EnableKafka
public class MetadataChangeLogProcessor {
@ -79,7 +80,7 @@ public class MetadataChangeLogProcessor {
@KafkaListener(id = "${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}", topics = {
"${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}",
"${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_TIMESERIES
+ "}"}, containerFactory = "avroSerializedKafkaListener")
+ "}"}, containerFactory = "kafkaEventConsumer")
public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
final GenericRecord record = consumerRecord.value();
log.debug("Got Generic MCL");

View File

@ -1,66 +0,0 @@
package com.linkedin.metadata.kafka.config;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@Slf4j
@Configuration
public class MaeKafkaConfig {
@Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}")
private String kafkaBootstrapServer;
@Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
private String kafkaSchemaRegistryUrl;
@Bean(name = "avroSerializedKafkaListener")
public KafkaListenerContainerFactory<?> avroSerializedKafkaListenerContainerFactory(KafkaProperties properties) {
return createKafkaListenerContainerFactory(properties, KafkaAvroDeserializer.class);
}
@Bean(name = "stringSerializedKafkaListener")
public KafkaListenerContainerFactory<?> stringSerializedKafkaListenerContainerFactory(KafkaProperties properties) {
return createKafkaListenerContainerFactory(properties, StringDeserializer.class);
}
public KafkaListenerContainerFactory<?> createKafkaListenerContainerFactory(KafkaProperties properties, Class<?> valueDeserializer) {
KafkaProperties.Consumer consumerProps = properties.getConsumer();
// Specify (de)serializers for record keys and for record values.
consumerProps.setKeyDeserializer(StringDeserializer.class);
consumerProps.setValueDeserializer(valueDeserializer);
// Records will be flushed every 10 seconds.
consumerProps.setEnableAutoCommit(true);
consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));
Map<String, Object> props = properties.buildConsumerProperties();
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaBootstrapServer != null && kafkaBootstrapServer.length() > 0) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(kafkaBootstrapServer.split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
log.info("KafkaListenerContainerFactory built successfully");
return factory;
}
}

View File

@ -20,6 +20,9 @@ dependencies {
compile project(':metadata-events:mxe-utils-avro-1.7')
compile project(':metadata-io')
compile project(':gms:client')
compile (project(':gms:factories')) {
exclude group: 'org.neo4j.test'
}
compile spec.product.pegasus.restliClient
compile spec.product.pegasus.restliCommon
compile externalDependency.elasticSearchRest

View File

@ -1,8 +1,11 @@
package com.linkedin.metadata.kafka;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.Entity;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory;
import com.linkedin.gms.factory.kafka.KafkaEventProducerFactory;
import com.linkedin.metadata.EventUtils;
import com.linkedin.metadata.kafka.config.EntityClientConfig;
import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition;
import com.linkedin.metadata.snapshot.Snapshot;
import com.linkedin.mxe.FailedMetadataChangeEvent;
@ -11,15 +14,19 @@ import com.linkedin.mxe.Topics;
import com.linkedin.r2.RemoteInvocationException;
import java.io.IOException;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@ -27,23 +34,18 @@ import org.springframework.stereotype.Component;
@Component
@Conditional(MetadataChangeProposalProcessorCondition.class)
@EnableKafka
@Import({EntityClientConfig.class, KafkaEventConsumerFactory.class, KafkaEventProducerFactory.class})
@RequiredArgsConstructor
public class MetadataChangeEventsProcessor {
private EntityClient entityClient;
private KafkaTemplate<String, GenericRecord> kafkaTemplate;
private final EntityClient entityClient;
private final Producer<String, IndexedRecord> kafkaProducer;
@Value("${KAFKA_FMCE_TOPIC_NAME:" + Topics.FAILED_METADATA_CHANGE_EVENT + "}")
private String fmceTopicName;
public MetadataChangeEventsProcessor(
@Nonnull final EntityClient entityClient,
@Nonnull final KafkaTemplate<String, GenericRecord> kafkaTemplate) {
this.entityClient = entityClient;
this.kafkaTemplate = kafkaTemplate;
}
@KafkaListener(id = "${KAFKA_CONSUMER_GROUP_ID:mce-consumer-job-client}", topics = "${KAFKA_MCE_TOPIC_NAME:"
+ Topics.METADATA_CHANGE_EVENT + "}", containerFactory = "mceKafkaContainerFactory")
+ Topics.METADATA_CHANGE_EVENT + "}", containerFactory = "kafkaEventConsumer")
public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
final GenericRecord record = consumerRecord.value();
log.debug("Record ", record);
@ -69,7 +71,7 @@ public class MetadataChangeEventsProcessor {
final GenericRecord genericFailedMCERecord = EventUtils.pegasusToAvroFailedMCE(failedMetadataChangeEvent);
log.debug("Sending FailedMessages to topic - {}", fmceTopicName);
log.info("Error while processing MCE: FailedMetadataChangeEvent - {}", failedMetadataChangeEvent);
this.kafkaTemplate.send(fmceTopicName, genericFailedMCERecord);
kafkaProducer.send(new ProducerRecord<>(fmceTopicName, genericFailedMCERecord));
} catch (IOException e) {
log.error("Error while sending FailedMetadataChangeEvent: Exception - {}, FailedMetadataChangeEvent - {}",
e.getStackTrace(), failedMetadataChangeEvent);
@ -85,7 +87,8 @@ public class MetadataChangeEventsProcessor {
return fmce;
}
private void processProposedSnapshot(@Nonnull MetadataChangeEvent metadataChangeEvent) throws RemoteInvocationException {
private void processProposedSnapshot(@Nonnull MetadataChangeEvent metadataChangeEvent)
throws RemoteInvocationException {
final Snapshot snapshotUnion = metadataChangeEvent.getProposedSnapshot();
final Entity entity = new Entity().setValue(snapshotUnion);
entityClient.updateWithSystemMetadata(entity, metadataChangeEvent.getSystemMetadata());

View File

@ -1,22 +1,29 @@
package com.linkedin.metadata.kafka;
import com.linkedin.entity.client.AspectClient;
import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory;
import com.linkedin.gms.factory.kafka.KafkaEventProducerFactory;
import com.linkedin.metadata.EventUtils;
import com.linkedin.metadata.kafka.config.EntityClientConfig;
import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition;
import com.linkedin.mxe.FailedMetadataChangeProposal;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.Topics;
import java.io.IOException;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@ -24,24 +31,19 @@ import org.springframework.stereotype.Component;
@Component
@Conditional(MetadataChangeProposalProcessorCondition.class)
@EnableKafka
@Import({EntityClientConfig.class, KafkaEventConsumerFactory.class, KafkaEventProducerFactory.class})
@RequiredArgsConstructor
public class MetadataChangeProposalsProcessor {
private AspectClient aspectClient;
private KafkaTemplate<String, GenericRecord> kafkaTemplate;
private final AspectClient aspectClient;
private final Producer<String, IndexedRecord> kafkaProducer;
@Value("${FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.FAILED_METADATA_CHANGE_PROPOSAL + "}")
private String fmcpTopicName;
public MetadataChangeProposalsProcessor(
@Nonnull final AspectClient aspectClient,
@Nonnull final KafkaTemplate<String, GenericRecord> kafkaTemplate) {
this.aspectClient = aspectClient;
this.kafkaTemplate = kafkaTemplate;
}
@KafkaListener(id = "${METADATA_CHANGE_PROPOSAL_KAFKA_CONSUMER_GROUP_ID:generic-mce-consumer-job-client}", topics =
"${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL
+ "}", containerFactory = "mceKafkaContainerFactory")
+ "}", containerFactory = "kafkaEventConsumer")
public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
final GenericRecord record = consumerRecord.value();
log.debug("Record {}", record);
@ -64,7 +66,7 @@ public class MetadataChangeProposalsProcessor {
final GenericRecord genericFailedMCERecord = EventUtils.pegasusToAvroFailedMCP(failedMetadataChangeProposal);
log.debug("Sending FailedMessages to topic - {}", fmcpTopicName);
log.info("Error while processing FMCP: FailedMetadataChangeProposal - {}", failedMetadataChangeProposal);
this.kafkaTemplate.send(fmcpTopicName, genericFailedMCERecord);
kafkaProducer.send(new ProducerRecord<>(fmcpTopicName, genericFailedMCERecord));
} catch (IOException e) {
log.error("Error while sending FailedMetadataChangeProposal: Exception - {}, FailedMetadataChangeProposal - {}",
e.getStackTrace(), failedMetadataChangeProposal);

View File

@ -1,90 +0,0 @@
package com.linkedin.metadata.kafka.config;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ErrorHandler;
@Slf4j
@Configuration
public class MceKafkaConfig {
@Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}")
private String kafkaBootstrapServers;
@Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
private String kafkaSchemaRegistryUrl;
@Bean(name = "mceKafkaContainerFactory")
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(KafkaProperties properties) {
KafkaProperties.Consumer consumerProps = properties.getConsumer();
// Specify (de)serializers for record keys and for record values.
consumerProps.setKeyDeserializer(StringDeserializer.class);
consumerProps.setValueDeserializer(KafkaAvroDeserializer.class);
// Records will be flushed every 10 seconds.
consumerProps.setEnableAutoCommit(true);
consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092
Map<String, Object> props = properties.buildConsumerProperties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
log.info("KafkaListenerContainerFactory built successfully");
return factory;
}
@Bean
public KafkaTemplate<String, GenericRecord> kafkaTemplate(KafkaProperties properties) {
KafkaProperties.Producer producerProps = properties.getProducer();
producerProps.setKeySerializer(StringSerializer.class);
producerProps.setValueSerializer(KafkaAvroSerializer.class);
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
producerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092
Map<String, Object> props = properties.buildProducerProperties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
KafkaTemplate<String, GenericRecord> template =
new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
log.info("KafkaTemplate built successfully");
return template;
}
@Bean
public ErrorHandler errorHandler() {
return (e, r) -> log.error("Exception caught during Deserialization, topic: {}, partition: {}, offset: {}",
r.topic(), r.partition(), r.offset(), e);
}
}