From 2df9d4fed675141f9a4600bd032a96e441fc3516 Mon Sep 17 00:00:00 2001 From: Dexter Lee Date: Wed, 11 Aug 2021 21:40:37 -0700 Subject: [PATCH] feat: add support for AWS glue schema registry (#3083) --- build.gradle | 1 + docs/deploy/aws.md | 86 +++++++++++++++++- gms/factories/build.gradle | 1 + .../kafka/KafkaEventConsumerFactory.java | 83 +++++++++++++++++ .../KafkaEventProducerFactory.java | 47 +++++++--- .../kafka/SimpleKafkaConsumerFactory.java | 55 ++++++++++++ .../AwsGlueSchemaRegistryFactory.java | 41 +++++++++ .../KafkaSchemaRegistryFactory.java | 33 +++++++ .../schemaregistry/SchemaRegistryConfig.java | 12 +++ .../kafka/DataHubUsageEventsProcessor.java | 2 +- .../kafka/MetadataAuditEventsProcessor.java | 5 +- .../kafka/MetadataChangeLogProcessor.java | 5 +- .../metadata/kafka/config/MaeKafkaConfig.java | 66 -------------- metadata-jobs/mce-consumer/build.gradle | 3 + .../kafka/MetadataChangeEventsProcessor.java | 31 ++++--- .../MetadataChangeProposalsProcessor.java | 26 +++--- .../metadata/kafka/config/MceKafkaConfig.java | 90 ------------------- 17 files changed, 387 insertions(+), 200 deletions(-) create mode 100644 gms/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java rename gms/factories/src/main/java/com/linkedin/gms/factory/{common => kafka}/KafkaEventProducerFactory.java (51%) create mode 100644 gms/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java create mode 100644 gms/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/AwsGlueSchemaRegistryFactory.java create mode 100644 gms/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/KafkaSchemaRegistryFactory.java create mode 100644 gms/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SchemaRegistryConfig.java delete mode 100644 metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/MaeKafkaConfig.java delete mode 100644 metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MceKafkaConfig.java diff --git a/build.gradle b/build.gradle index 22e22ec069..912e3cd001 100644 --- a/build.gradle +++ b/build.gradle @@ -35,6 +35,7 @@ project.ext.externalDependency = [ 'assertJ': 'org.assertj:assertj-core:3.11.1', 'avro_1_7': 'org.apache.avro:avro:1.7.7', 'avroCompiler_1_7': 'org.apache.avro:avro-compiler:1.7.7', + 'awsGlueSchemaRegistrySerde': 'software.amazon.glue:schema-registry-serde:1.1.1', 'cacheApi' : 'javax.cache:cache-api:1.1.0', 'commonsIo': 'commons-io:commons-io:2.4', 'commonsLang': 'commons-lang:commons-lang:2.6', diff --git a/docs/deploy/aws.md b/docs/deploy/aws.md index 8bbf9bcc0e..f9695f6b89 100644 --- a/docs/deploy/aws.md +++ b/docs/deploy/aws.md @@ -247,7 +247,7 @@ You can also allow communication via HTTP (without SSL) by using the settings be indexPrefix: demo ``` -Lastly, you need to set the following env variable for **elasticsearchSetupJob**. +Lastly, you need to set the following env variable for **elasticsearchSetupJob**. ``` elasticsearchSetupJob: @@ -282,4 +282,86 @@ kafka: url: "http://prerequisites-cp-schema-registry:8081" ``` -Run `helm install datahub datahub/ --values datahub/quickstart-values.yaml` to apply the changes. +Run `helm install datahub datahub/ --values datahub/quickstart-values.yaml` to apply the changes. + +### AWS Glue Schema Registry + +You can use AWS Glue schema registry instead of the kafka schema registry. 
To do so, first provision an AWS Glue schema
+registry in the "Schema Registry" tab of the AWS Glue console.
+
+Once the registry is provisioned, you can change the helm chart as follows.
+
+```
+kafka:
+  bootstrap:
+    ...
+  zookeeper:
+    ...
+  schemaregistry:
+    type: AWS_GLUE
+    glue:
+      region: <>
+      registry: <>
+```
+
+Note that DataHub uses the name of the topic as the schema name in the registry.
+
+Before you update the pods, you need to give the k8s worker nodes the correct permissions to access the schema
+registry.
+
+The minimum permissions required look like this:
+
+```
+{
+    "Version": "2012-10-17",
+    "Statement": [
+        {
+            "Sid": "VisualEditor0",
+            "Effect": "Allow",
+            "Action": [
+                "glue:GetRegistry",
+                "glue:ListRegistries",
+                "glue:CreateSchema",
+                "glue:UpdateSchema",
+                "glue:GetSchema",
+                "glue:ListSchemas",
+                "glue:RegisterSchemaVersion",
+                "glue:GetSchemaByDefinition",
+                "glue:GetSchemaVersion",
+                "glue:GetSchemaVersionsDiff",
+                "glue:ListSchemaVersions",
+                "glue:CheckSchemaVersionValidity",
+                "glue:PutSchemaVersionMetadata",
+                "glue:QuerySchemaVersionMetadata"
+            ],
+            "Resource": [
+                "arn:aws:glue:*:795586375822:schema/*",
+                "arn:aws:glue:us-west-2:795586375822:registry/demo-shared"
+            ]
+        },
+        {
+            "Sid": "VisualEditor1",
+            "Effect": "Allow",
+            "Action": [
+                "glue:GetSchemaVersion"
+            ],
+            "Resource": [
+                "*"
+            ]
+        }
+    ]
+}
+```
+
+The second statement must use "*" as its resource because of an issue in the AWS Glue schema registry library.
+Refer to [this issue](https://github.com/awslabs/aws-glue-schema-registry/issues/68) for any updates.
+
+Glue currently doesn't support AWS Signature V4. As such, we cannot use service accounts to grant pods access to
+the schema registry. The workaround is to attach the above policy to the EKS worker nodes' IAM role. Refer
+to [this issue](https://github.com/awslabs/aws-glue-schema-registry/issues/69) for any updates.
+
+Run `helm install datahub datahub/ --values datahub/quickstart-values.yaml` to apply the changes.
+
+Note, you will see the log message "Schema Version Id is null. Trying to register the schema" on every request. This
+log is misleading and should be ignored. Schemas are cached, so a new version is not registered on every request
+(i.e. there is no performance impact). This has been fixed by
+[this PR](https://github.com/awslabs/aws-glue-schema-registry/pull/64), but the fix has not been released yet. We will
+update the library version once a new release is out.
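For reference, the factories added in this patch read the Glue settings from environment variables rather than from helm values directly: `SCHEMA_REGISTRY_TYPE` (defaults to `KAFKA`), `AWS_GLUE_SCHEMA_REGISTRY_REGION` (defaults to `us-east-1`), and `AWS_GLUE_SCHEMA_REGISTRY_NAME` (optional; the default registry is used when unset). Presumably the `schemaregistry` block above is mapped onto these variables by the helm chart; if you run GMS and the MAE/MCE consumers outside of helm, a minimal sketch of setting them directly (the region and registry name are placeholder values borrowed from the policy example above):

```
SCHEMA_REGISTRY_TYPE=AWS_GLUE
AWS_GLUE_SCHEMA_REGISTRY_REGION=us-west-2
AWS_GLUE_SCHEMA_REGISTRY_NAME=demo-shared
```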
diff --git a/gms/factories/build.gradle b/gms/factories/build.gradle index ed8e7a6e14..470aef8307 100644 --- a/gms/factories/build.gradle +++ b/gms/factories/build.gradle @@ -4,6 +4,7 @@ dependencies { compile project(':metadata-io') compile project(':gms:impl') compile project(':metadata-dao-impl:kafka-producer') + compile externalDependency.awsGlueSchemaRegistrySerde compile externalDependency.elasticSearchRest compile externalDependency.httpClient compile externalDependency.gson diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java new file mode 100644 index 0000000000..261d7ce325 --- /dev/null +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java @@ -0,0 +1,83 @@ +package com.linkedin.gms.factory.kafka; + +import com.linkedin.gms.factory.kafka.schemaregistry.AwsGlueSchemaRegistryFactory; +import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory; +import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig; +import java.time.Duration; +import java.util.Arrays; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Lazy; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + + +@Slf4j +@Configuration +@EnableConfigurationProperties(KafkaProperties.class) +@Import({KafkaSchemaRegistryFactory.class, AwsGlueSchemaRegistryFactory.class}) +public class KafkaEventConsumerFactory { + + @Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}") + private String kafkaBootstrapServers; + + @Value("${SCHEMA_REGISTRY_TYPE:KAFKA}") + private String schemaRegistryType; + + @Autowired + @Lazy + @Qualifier("kafkaSchemaRegistry") + private SchemaRegistryConfig kafkaSchemaRegistryConfig; + + @Autowired + @Lazy + @Qualifier("awsGlueSchemaRegistry") + private SchemaRegistryConfig awsGlueSchemaRegistryConfig; + + @Bean(name = "kafkaEventConsumer") + protected KafkaListenerContainerFactory createInstance(KafkaProperties properties) { + + KafkaProperties.Consumer consumerProps = properties.getConsumer(); + + // Specify (de)serializers for record keys and for record values. + consumerProps.setKeyDeserializer(StringDeserializer.class); + // Records will be flushed every 10 seconds. 
+ consumerProps.setEnableAutoCommit(true); + consumerProps.setAutoCommitInterval(Duration.ofSeconds(10)); + + // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS + if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) { + consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(","))); + } // else we rely on KafkaProperties which defaults to localhost:9092 + + SchemaRegistryConfig schemaRegistryConfig; + if (schemaRegistryType.equals(KafkaSchemaRegistryFactory.TYPE)) { + schemaRegistryConfig = kafkaSchemaRegistryConfig; + } else { + schemaRegistryConfig = awsGlueSchemaRegistryConfig; + } + + consumerProps.setValueDeserializer(schemaRegistryConfig.getDeserializer()); + Map props = properties.buildConsumerProperties(); + props.putAll(schemaRegistryConfig.getProperties()); + + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props)); + + log.info("Event-based KafkaListenerContainerFactory built successfully"); + + return factory; + } +} diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/common/KafkaEventProducerFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventProducerFactory.java similarity index 51% rename from gms/factories/src/main/java/com/linkedin/gms/factory/common/KafkaEventProducerFactory.java rename to gms/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventProducerFactory.java index 6b168eb731..15e4fff70a 100644 --- a/gms/factories/src/main/java/com/linkedin/gms/factory/common/KafkaEventProducerFactory.java +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventProducerFactory.java @@ -1,44 +1,69 @@ -package com.linkedin.gms.factory.common; +package com.linkedin.gms.factory.kafka; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import com.linkedin.gms.factory.kafka.schemaregistry.AwsGlueSchemaRegistryFactory; +import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory; +import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig; import java.util.Arrays; import java.util.Map; import org.apache.avro.generic.IndexedRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Lazy; + @Configuration @EnableConfigurationProperties(KafkaProperties.class) +@Import({KafkaSchemaRegistryFactory.class, AwsGlueSchemaRegistryFactory.class}) public class KafkaEventProducerFactory { @Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}") private String kafkaBootstrapServers; - @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}") - 
private String kafkaSchemaRegistryUrl; + @Value("${SCHEMA_REGISTRY_TYPE:KAFKA}") + private String schemaRegistryType; + + @Autowired + @Lazy + @Qualifier("kafkaSchemaRegistry") + private SchemaRegistryConfig kafkaSchemaRegistryConfig; + + @Autowired + @Lazy + @Qualifier("awsGlueSchemaRegistry") + private SchemaRegistryConfig awsGlueSchemaRegistryConfig; @Bean(name = "kafkaEventProducer") protected Producer createInstance(KafkaProperties properties) { KafkaProperties.Producer producerProps = properties.getProducer(); producerProps.setKeySerializer(StringSerializer.class); - producerProps.setValueSerializer(KafkaAvroSerializer.class); - // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) { producerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(","))); } // else we rely on KafkaProperties which defaults to localhost:9092 - Map props = properties.buildProducerProperties(); - props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl); + SchemaRegistryConfig schemaRegistryConfig; + if (schemaRegistryType.equals(KafkaSchemaRegistryFactory.TYPE)) { + schemaRegistryConfig = kafkaSchemaRegistryConfig; + } else { + schemaRegistryConfig = awsGlueSchemaRegistryConfig; + } - return new KafkaProducer(props); + Map props = properties.buildProducerProperties(); + + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer().getName()); + props.putAll(schemaRegistryConfig.getProperties()); + + return new KafkaProducer<>(props); } } diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java new file mode 100644 index 0000000000..88f0a8accf --- /dev/null +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java @@ -0,0 +1,55 @@ +package com.linkedin.gms.factory.kafka; + +import com.linkedin.gms.factory.kafka.schemaregistry.AwsGlueSchemaRegistryFactory; +import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory; +import java.time.Duration; +import java.util.Arrays; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + + +@Slf4j +@Configuration +@EnableConfigurationProperties(KafkaProperties.class) +@Import({KafkaSchemaRegistryFactory.class, AwsGlueSchemaRegistryFactory.class}) +public class SimpleKafkaConsumerFactory { + + @Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}") + private String kafkaBootstrapServers; + + @Bean(name = "simpleKafkaConsumer") + protected KafkaListenerContainerFactory createInstance(KafkaProperties properties) { + + KafkaProperties.Consumer consumerProps = properties.getConsumer(); + + // Specify (de)serializers for 
record keys and for record values. + consumerProps.setKeyDeserializer(StringDeserializer.class); + consumerProps.setValueDeserializer(StringDeserializer.class); + // Records will be flushed every 10 seconds. + consumerProps.setEnableAutoCommit(true); + consumerProps.setAutoCommitInterval(Duration.ofSeconds(10)); + + // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS + if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) { + consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(","))); + } // else we rely on KafkaProperties which defaults to localhost:9092 + + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties())); + + log.info("Simple KafkaListenerContainerFactory built successfully"); + + return factory; + } +} diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/AwsGlueSchemaRegistryFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/AwsGlueSchemaRegistryFactory.java new file mode 100644 index 0000000000..28d8b2ae55 --- /dev/null +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/AwsGlueSchemaRegistryFactory.java @@ -0,0 +1,41 @@ +package com.linkedin.gms.factory.kafka.schemaregistry; + +import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer; +import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer; +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import com.amazonaws.services.schemaregistry.utils.AvroRecordType; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Slf4j +@Configuration +public class AwsGlueSchemaRegistryFactory { + + public static final String TYPE = "AWS_GLUE"; + + @Value("${AWS_GLUE_SCHEMA_REGISTRY_REGION:us-east-1}") + private String awsRegion; + @Value("${AWS_GLUE_SCHEMA_REGISTRY_NAME:#{null}}") + private Optional registryName; + + @Bean(name = "awsGlueSchemaRegistry") + @Nonnull + protected SchemaRegistryConfig getInstance() { + Map props = new HashMap<>(); + props.put(AWSSchemaRegistryConstants.AWS_REGION, awsRegion); + props.put(AWSSchemaRegistryConstants.DATA_FORMAT, "AVRO"); + props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); + props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); + registryName.ifPresent(s -> props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, s)); + log.info("Creating AWS Glue registry"); + return new SchemaRegistryConfig(GlueSchemaRegistryKafkaSerializer.class, GlueSchemaRegistryKafkaDeserializer.class, + props); + } +} diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/KafkaSchemaRegistryFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/KafkaSchemaRegistryFactory.java new file mode 100644 index 0000000000..f2c97b1fd5 --- /dev/null +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/KafkaSchemaRegistryFactory.java @@ -0,0 +1,33 @@ +package 
com.linkedin.gms.factory.kafka.schemaregistry; + +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Slf4j +@Configuration +public class KafkaSchemaRegistryFactory { + + public static final String TYPE = "KAFKA"; + + @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}") + private String kafkaSchemaRegistryUrl; + + @Bean(name = "kafkaSchemaRegistry") + @Nonnull + protected SchemaRegistryConfig getInstance() { + Map props = new HashMap<>(); + props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl); + + log.info("Creating kafka registry"); + return new SchemaRegistryConfig(KafkaAvroSerializer.class, KafkaAvroDeserializer.class, props); + } +} diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SchemaRegistryConfig.java b/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SchemaRegistryConfig.java new file mode 100644 index 0000000000..1e2962bbda --- /dev/null +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SchemaRegistryConfig.java @@ -0,0 +1,12 @@ +package com.linkedin.gms.factory.kafka.schemaregistry; + +import java.util.Map; +import lombok.Data; + + +@Data +public class SchemaRegistryConfig { + private final Class serializer; + private final Class deserializer; + private final Map properties; +} diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java index 424d5b9e3f..a0eab40310 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java @@ -38,7 +38,7 @@ public class DataHubUsageEventsProcessor { } @KafkaListener(id = "${DATAHUB_USAGE_EVENT_KAFKA_CONSUMER_GROUP_ID:datahub-usage-event-consumer-job-client}", topics = - "${DATAHUB_USAGE_EVENT_NAME:" + Topics.DATAHUB_USAGE_EVENT + "}", containerFactory = "stringSerializedKafkaListener") + "${DATAHUB_USAGE_EVENT_NAME:" + Topics.DATAHUB_USAGE_EVENT + "}", containerFactory = "simpleKafkaConsumer") public void consume(final ConsumerRecord consumerRecord) { final String record = consumerRecord.value(); log.debug("Got DHUE"); diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java index 6e656e786b..bccef5b180 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java @@ -5,6 +5,7 @@ import com.linkedin.data.element.DataElement; import com.linkedin.data.template.RecordTemplate; import com.linkedin.gms.factory.common.GraphServiceFactory; import com.linkedin.gms.factory.common.SystemMetadataServiceFactory; +import 
com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory; import com.linkedin.gms.factory.search.SearchServiceFactory; import com.linkedin.gms.factory.usage.UsageServiceFactory; import com.linkedin.metadata.EventUtils; @@ -58,7 +59,7 @@ import static com.linkedin.metadata.dao.Neo4jUtil.createRelationshipFilter; @Component @Conditional(MetadataChangeLogProcessorCondition.class) @Import({GraphServiceFactory.class, SearchServiceFactory.class, UsageServiceFactory.class, - SystemMetadataServiceFactory.class}) + SystemMetadataServiceFactory.class, KafkaEventConsumerFactory.class}) @EnableKafka public class MetadataAuditEventsProcessor { @@ -82,7 +83,7 @@ public class MetadataAuditEventsProcessor { } @KafkaListener(id = "${KAFKA_CONSUMER_GROUP_ID:mae-consumer-job-client}", topics = "${KAFKA_TOPIC_NAME:" - + Topics.METADATA_AUDIT_EVENT + "}", containerFactory = "avroSerializedKafkaListener") + + Topics.METADATA_AUDIT_EVENT + "}", containerFactory = "kafkaEventConsumer") public void consume(final ConsumerRecord consumerRecord) { final GenericRecord record = consumerRecord.value(); log.debug("Got MAE"); diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java index 428f8e7094..44c8209a22 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java @@ -7,6 +7,7 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; import com.linkedin.gms.factory.common.GraphServiceFactory; import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory; +import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory; import com.linkedin.gms.factory.search.SearchServiceFactory; import com.linkedin.gms.factory.timeseries.TimeseriesAspectServiceFactory; import com.linkedin.metadata.EventUtils; @@ -56,7 +57,7 @@ import static com.linkedin.metadata.dao.Neo4jUtil.createRelationshipFilter; @Component @Conditional(MetadataChangeLogProcessorCondition.class) @Import({GraphServiceFactory.class, SearchServiceFactory.class, TimeseriesAspectServiceFactory.class, - EntityRegistryFactory.class}) + EntityRegistryFactory.class, KafkaEventConsumerFactory.class}) @EnableKafka public class MetadataChangeLogProcessor { @@ -79,7 +80,7 @@ public class MetadataChangeLogProcessor { @KafkaListener(id = "${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}", topics = { "${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}", "${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_TIMESERIES - + "}"}, containerFactory = "avroSerializedKafkaListener") + + "}"}, containerFactory = "kafkaEventConsumer") public void consume(final ConsumerRecord consumerRecord) { final GenericRecord record = consumerRecord.value(); log.debug("Got Generic MCL"); diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/MaeKafkaConfig.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/MaeKafkaConfig.java deleted file mode 100644 index b054f4086a..0000000000 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/MaeKafkaConfig.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.linkedin.metadata.kafka.config; - -import 
io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.time.Duration; -import java.util.Arrays; -import java.util.Map; -import lombok.extern.slf4j.Slf4j; -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerContainerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; - - -@Slf4j -@Configuration -public class MaeKafkaConfig { - @Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}") - private String kafkaBootstrapServer; - @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}") - private String kafkaSchemaRegistryUrl; - - @Bean(name = "avroSerializedKafkaListener") - public KafkaListenerContainerFactory avroSerializedKafkaListenerContainerFactory(KafkaProperties properties) { - return createKafkaListenerContainerFactory(properties, KafkaAvroDeserializer.class); - } - - @Bean(name = "stringSerializedKafkaListener") - public KafkaListenerContainerFactory stringSerializedKafkaListenerContainerFactory(KafkaProperties properties) { - return createKafkaListenerContainerFactory(properties, StringDeserializer.class); - } - - public KafkaListenerContainerFactory createKafkaListenerContainerFactory(KafkaProperties properties, Class valueDeserializer) { - KafkaProperties.Consumer consumerProps = properties.getConsumer(); - - // Specify (de)serializers for record keys and for record values. - consumerProps.setKeyDeserializer(StringDeserializer.class); - consumerProps.setValueDeserializer(valueDeserializer); - // Records will be flushed every 10 seconds. 
- consumerProps.setEnableAutoCommit(true); - consumerProps.setAutoCommitInterval(Duration.ofSeconds(10)); - - Map props = properties.buildConsumerProperties(); - - // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS - if (kafkaBootstrapServer != null && kafkaBootstrapServer.length() > 0) { - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(kafkaBootstrapServer.split(","))); - } // else we rely on KafkaProperties which defaults to localhost:9092 - - props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl); - - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props)); - - log.info("KafkaListenerContainerFactory built successfully"); - - return factory; - } -} diff --git a/metadata-jobs/mce-consumer/build.gradle b/metadata-jobs/mce-consumer/build.gradle index d6d60a27b4..a87b5aa78f 100644 --- a/metadata-jobs/mce-consumer/build.gradle +++ b/metadata-jobs/mce-consumer/build.gradle @@ -20,6 +20,9 @@ dependencies { compile project(':metadata-events:mxe-utils-avro-1.7') compile project(':metadata-io') compile project(':gms:client') + compile (project(':gms:factories')) { + exclude group: 'org.neo4j.test' + } compile spec.product.pegasus.restliClient compile spec.product.pegasus.restliCommon compile externalDependency.elasticSearchRest diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java index 190b3dfb3a..312c3465b2 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java @@ -1,8 +1,11 @@ package com.linkedin.metadata.kafka; -import com.linkedin.entity.client.EntityClient; import com.linkedin.entity.Entity; +import com.linkedin.entity.client.EntityClient; +import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory; +import com.linkedin.gms.factory.kafka.KafkaEventProducerFactory; import com.linkedin.metadata.EventUtils; +import com.linkedin.metadata.kafka.config.EntityClientConfig; import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition; import com.linkedin.metadata.snapshot.Snapshot; import com.linkedin.mxe.FailedMetadataChangeEvent; @@ -11,15 +14,19 @@ import com.linkedin.mxe.Topics; import com.linkedin.r2.RemoteInvocationException; import java.io.IOException; import javax.annotation.Nonnull; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Import; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @@ -27,23 +34,18 @@ import org.springframework.stereotype.Component; @Component 
@Conditional(MetadataChangeProposalProcessorCondition.class) @EnableKafka +@Import({EntityClientConfig.class, KafkaEventConsumerFactory.class, KafkaEventProducerFactory.class}) +@RequiredArgsConstructor public class MetadataChangeEventsProcessor { - private EntityClient entityClient; - private KafkaTemplate kafkaTemplate; + private final EntityClient entityClient; + private final Producer kafkaProducer; @Value("${KAFKA_FMCE_TOPIC_NAME:" + Topics.FAILED_METADATA_CHANGE_EVENT + "}") private String fmceTopicName; - public MetadataChangeEventsProcessor( - @Nonnull final EntityClient entityClient, - @Nonnull final KafkaTemplate kafkaTemplate) { - this.entityClient = entityClient; - this.kafkaTemplate = kafkaTemplate; - } - @KafkaListener(id = "${KAFKA_CONSUMER_GROUP_ID:mce-consumer-job-client}", topics = "${KAFKA_MCE_TOPIC_NAME:" - + Topics.METADATA_CHANGE_EVENT + "}", containerFactory = "mceKafkaContainerFactory") + + Topics.METADATA_CHANGE_EVENT + "}", containerFactory = "kafkaEventConsumer") public void consume(final ConsumerRecord consumerRecord) { final GenericRecord record = consumerRecord.value(); log.debug("Record ", record); @@ -69,7 +71,7 @@ public class MetadataChangeEventsProcessor { final GenericRecord genericFailedMCERecord = EventUtils.pegasusToAvroFailedMCE(failedMetadataChangeEvent); log.debug("Sending FailedMessages to topic - {}", fmceTopicName); log.info("Error while processing MCE: FailedMetadataChangeEvent - {}", failedMetadataChangeEvent); - this.kafkaTemplate.send(fmceTopicName, genericFailedMCERecord); + kafkaProducer.send(new ProducerRecord<>(fmceTopicName, genericFailedMCERecord)); } catch (IOException e) { log.error("Error while sending FailedMetadataChangeEvent: Exception - {}, FailedMetadataChangeEvent - {}", e.getStackTrace(), failedMetadataChangeEvent); @@ -85,7 +87,8 @@ public class MetadataChangeEventsProcessor { return fmce; } - private void processProposedSnapshot(@Nonnull MetadataChangeEvent metadataChangeEvent) throws RemoteInvocationException { + private void processProposedSnapshot(@Nonnull MetadataChangeEvent metadataChangeEvent) + throws RemoteInvocationException { final Snapshot snapshotUnion = metadataChangeEvent.getProposedSnapshot(); final Entity entity = new Entity().setValue(snapshotUnion); entityClient.updateWithSystemMetadata(entity, metadataChangeEvent.getSystemMetadata()); diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java index d6b35ac895..8db6b3b3ee 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java @@ -1,22 +1,29 @@ package com.linkedin.metadata.kafka; import com.linkedin.entity.client.AspectClient; +import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory; +import com.linkedin.gms.factory.kafka.KafkaEventProducerFactory; import com.linkedin.metadata.EventUtils; +import com.linkedin.metadata.kafka.config.EntityClientConfig; import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition; import com.linkedin.mxe.FailedMetadataChangeProposal; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.Topics; import java.io.IOException; import javax.annotation.Nonnull; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import 
org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Import; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @@ -24,24 +31,19 @@ import org.springframework.stereotype.Component; @Component @Conditional(MetadataChangeProposalProcessorCondition.class) @EnableKafka +@Import({EntityClientConfig.class, KafkaEventConsumerFactory.class, KafkaEventProducerFactory.class}) +@RequiredArgsConstructor public class MetadataChangeProposalsProcessor { - private AspectClient aspectClient; - private KafkaTemplate kafkaTemplate; + private final AspectClient aspectClient; + private final Producer kafkaProducer; @Value("${FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.FAILED_METADATA_CHANGE_PROPOSAL + "}") private String fmcpTopicName; - public MetadataChangeProposalsProcessor( - @Nonnull final AspectClient aspectClient, - @Nonnull final KafkaTemplate kafkaTemplate) { - this.aspectClient = aspectClient; - this.kafkaTemplate = kafkaTemplate; - } - @KafkaListener(id = "${METADATA_CHANGE_PROPOSAL_KAFKA_CONSUMER_GROUP_ID:generic-mce-consumer-job-client}", topics = "${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL - + "}", containerFactory = "mceKafkaContainerFactory") + + "}", containerFactory = "kafkaEventConsumer") public void consume(final ConsumerRecord consumerRecord) { final GenericRecord record = consumerRecord.value(); log.debug("Record {}", record); @@ -64,7 +66,7 @@ public class MetadataChangeProposalsProcessor { final GenericRecord genericFailedMCERecord = EventUtils.pegasusToAvroFailedMCP(failedMetadataChangeProposal); log.debug("Sending FailedMessages to topic - {}", fmcpTopicName); log.info("Error while processing FMCP: FailedMetadataChangeProposal - {}", failedMetadataChangeProposal); - this.kafkaTemplate.send(fmcpTopicName, genericFailedMCERecord); + kafkaProducer.send(new ProducerRecord<>(fmcpTopicName, genericFailedMCERecord)); } catch (IOException e) { log.error("Error while sending FailedMetadataChangeProposal: Exception - {}, FailedMetadataChangeProposal - {}", e.getStackTrace(), failedMetadataChangeProposal); diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MceKafkaConfig.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MceKafkaConfig.java deleted file mode 100644 index 15d264dc96..0000000000 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MceKafkaConfig.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.linkedin.metadata.kafka.config; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import java.time.Duration; -import java.util.Arrays; -import java.util.Map; -import lombok.extern.slf4j.Slf4j; -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.common.serialization.StringDeserializer; -import 
org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerContainerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.listener.ErrorHandler; - - -@Slf4j -@Configuration -public class MceKafkaConfig { - - @Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}") - private String kafkaBootstrapServers; - @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}") - private String kafkaSchemaRegistryUrl; - - @Bean(name = "mceKafkaContainerFactory") - public KafkaListenerContainerFactory kafkaListenerContainerFactory(KafkaProperties properties) { - KafkaProperties.Consumer consumerProps = properties.getConsumer(); - - // Specify (de)serializers for record keys and for record values. - consumerProps.setKeyDeserializer(StringDeserializer.class); - consumerProps.setValueDeserializer(KafkaAvroDeserializer.class); - // Records will be flushed every 10 seconds. - consumerProps.setEnableAutoCommit(true); - consumerProps.setAutoCommitInterval(Duration.ofSeconds(10)); - - // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS - if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) { - consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(","))); - } // else we rely on KafkaProperties which defaults to localhost:9092 - - Map props = properties.buildConsumerProperties(); - props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl); - - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props)); - - log.info("KafkaListenerContainerFactory built successfully"); - - return factory; - } - - @Bean - public KafkaTemplate kafkaTemplate(KafkaProperties properties) { - KafkaProperties.Producer producerProps = properties.getProducer(); - - producerProps.setKeySerializer(StringSerializer.class); - producerProps.setValueSerializer(KafkaAvroSerializer.class); - - // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS - if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) { - producerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(","))); - } // else we rely on KafkaProperties which defaults to localhost:9092 - - Map props = properties.buildProducerProperties(); - props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl); - - KafkaTemplate template = - new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props)); - - log.info("KafkaTemplate built successfully"); - - return template; - } - - @Bean - public ErrorHandler errorHandler() { - return (e, r) -> log.error("Exception caught during Deserialization, topic: {}, partition: {}, offset: {}", - r.topic(), r.partition(), r.offset(), e); - } -}
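
For anyone who wants to exercise the new serde outside of Spring (for example in a quick local test), here is a minimal sketch of a plain Kafka producer configured against Glue, mirroring the properties that the new `AwsGlueSchemaRegistryFactory` and `KafkaEventProducerFactory` assemble. The bootstrap server, region, and registry name are placeholder values, and this class is not part of the patch.

```java
package com.example.sketch;

import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import java.util.Properties;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class GlueProducerSketch {

  public static Producer<String, IndexedRecord> build() {
    Properties props = new Properties();
    // Placeholder bootstrap server; replace with your broker list.
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Value serializer comes from the Glue schema registry serde library.
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());

    // Same Glue settings that AwsGlueSchemaRegistryFactory populates in this patch.
    props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-west-2");                   // placeholder region
    props.put(AWSSchemaRegistryConstants.DATA_FORMAT, "AVRO");
    props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
    props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
    props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "demo-shared");              // placeholder registry name

    return new KafkaProducer<>(props);
  }
}
```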