From b97e7a03e8b74335688e3368b1d19b36ac12b659 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Wed, 17 May 2023 10:32:23 -0500 Subject: [PATCH] feat(system-update): remove datahub-update requirement on schema reg (#7999) --- build.gradle | 3 +- datahub-upgrade/build.gradle | 6 + .../upgrade/config/SystemUpdateConfig.java | 35 +++++- .../datahub/upgrade/system/SystemUpdate.java | 8 +- .../DatahubUpgradeNoSchemaRegistryTest.java | 69 +++++++++++ .../com/linkedin/metadata/EventUtils.java | 2 +- .../spring/MCLSpringTestConfiguration.java | 9 ++ .../kafka/DataHubKafkaProducerFactory.java | 16 ++- .../kafka/KafkaEventConsumerFactory.java | 114 ++++++++++++------ .../AwsGlueSchemaRegistryFactory.java | 2 +- .../DUHESchemaRegistryFactory.java | 32 +++++ .../InternalSchemaRegistryFactory.java | 2 +- .../KafkaSchemaRegistryFactory.java | 2 +- .../kafka/DataHubUpgradeKafkaListener.java | 14 ++- .../boot/kafka/MockDUHEDeserializer.java | 57 +++++++++ .../boot/kafka/MockDUHESerializer.java | 45 +++++++ 16 files changed, 354 insertions(+), 62 deletions(-) create mode 100644 datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java create mode 100644 metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/DUHESchemaRegistryFactory.java create mode 100644 metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHEDeserializer.java create mode 100644 metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHESerializer.java diff --git a/build.gradle b/build.gradle index 1d45360f3c..2c707c4da2 100644 --- a/build.gradle +++ b/build.gradle @@ -18,6 +18,7 @@ buildscript { ext.slf4jVersion = '1.7.36' ext.logbackClassic = '1.2.12' ext.hadoop3Version = '3.3.5' + ext.kafkaVersion = '2.3.0' ext.docker_registry = 'linkedin' @@ -135,7 +136,7 @@ project.ext.externalDependency = [ // avro-serde includes dependencies for `kafka-avro-serializer` `kafka-schema-registry-client` and `avro` 'kafkaAvroSerde': 'io.confluent:kafka-streams-avro-serde:5.5.1', 'kafkaAvroSerializer': 'io.confluent:kafka-avro-serializer:5.1.4', - 'kafkaClients': 'org.apache.kafka:kafka-clients:2.3.0', + 'kafkaClients': "org.apache.kafka:kafka-clients:$kafkaVersion", 'logbackClassic': "ch.qos.logback:logback-classic:$logbackClassic", 'slf4jApi': "org.slf4j:slf4j-api:$slf4jVersion", 'log4jCore': "org.apache.logging.log4j:log4j-core:$log4jVersion", diff --git a/datahub-upgrade/build.gradle b/datahub-upgrade/build.gradle index d6252e2f60..cd4a3ebcda 100644 --- a/datahub-upgrade/build.gradle +++ b/datahub-upgrade/build.gradle @@ -40,6 +40,12 @@ dependencies { } } + + // mock internal schema registry + implementation externalDependency.kafkaAvroSerde + implementation externalDependency.kafkaAvroSerializer + implementation "org.apache.kafka:kafka_2.12:$kafkaVersion" + implementation externalDependency.slf4jApi compileOnly externalDependency.lombok compile externalDependency.picocli diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java index 2d517347f1..826bd0180d 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java @@ -3,27 +3,37 @@ package com.linkedin.datahub.upgrade.config; import 
com.linkedin.datahub.upgrade.system.SystemUpdate; import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices; import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices; +import com.linkedin.gms.factory.common.TopicConventionFactory; +import com.linkedin.gms.factory.config.ConfigurationProvider; +import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory; +import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig; +import com.linkedin.metadata.config.kafka.KafkaConfiguration; +import com.linkedin.metadata.dao.producer.KafkaEventProducer; import com.linkedin.metadata.dao.producer.KafkaHealthChecker; import com.linkedin.metadata.version.GitVersion; import com.linkedin.mxe.TopicConvention; +import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.IndexedRecord; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +@Slf4j @Configuration public class SystemUpdateConfig { @Bean(name = "systemUpdate") public SystemUpdate systemUpdate(final BuildIndices buildIndices, final CleanIndices cleanIndices, - final Producer producer, - final TopicConvention convention, final GitVersion gitVersion, - final KafkaHealthChecker kafkaHealthChecker, @Qualifier("revision") String revision) { + @Qualifier("duheKafkaEventProducer") final KafkaEventProducer kafkaEventProducer, + final GitVersion gitVersion, @Qualifier("revision") String revision) { String version = String.format("%s-%s", gitVersion.getVersion(), revision); - return new SystemUpdate(buildIndices, cleanIndices, producer, convention, version, kafkaHealthChecker); + return new SystemUpdate(buildIndices, cleanIndices, kafkaEventProducer, version); } @Value("#{systemEnvironment['DATAHUB_REVISION'] ?: '0'}") @@ -33,4 +43,21 @@ public class SystemUpdateConfig { public String getRevision() { return revision; } + + @Autowired + @Qualifier(TopicConventionFactory.TOPIC_CONVENTION_BEAN) + private TopicConvention topicConvention; + + @Autowired + private KafkaHealthChecker kafkaHealthChecker; + + @Bean(name = "duheKafkaEventProducer") + protected KafkaEventProducer duheKafkaEventProducer(@Qualifier("configurationProvider") ConfigurationProvider provider, + KafkaProperties properties, + @Qualifier("duheSchemaRegistryConfig") SchemaRegistryConfig duheSchemaRegistryConfig) { + KafkaConfiguration kafkaConfiguration = provider.getKafka(); + Producer producer = new KafkaProducer<>( + DataHubKafkaProducerFactory.buildProducerProperties(duheSchemaRegistryConfig, kafkaConfiguration, properties)); + return new KafkaEventProducer(producer, topicConvention, kafkaHealthChecker); + } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/SystemUpdate.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/SystemUpdate.java index a2728bf64d..d7e490501a 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/SystemUpdate.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/SystemUpdate.java @@ -7,11 +7,7 @@ import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices; import 
com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices; import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep; import com.linkedin.metadata.dao.producer.KafkaEventProducer; -import com.linkedin.metadata.dao.producer.KafkaHealthChecker; -import com.linkedin.mxe.TopicConvention; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.generic.IndexedRecord; -import org.apache.kafka.clients.producer.Producer; import java.util.List; import java.util.stream.Collectors; @@ -25,9 +21,7 @@ public class SystemUpdate implements Upgrade { private final List _steps; public SystemUpdate(final BuildIndices buildIndicesJob, final CleanIndices cleanIndicesJob, - final Producer producer, - final TopicConvention convention, final String version, final KafkaHealthChecker kafkaHealthChecker) { - final KafkaEventProducer kafkaEventProducer = new KafkaEventProducer(producer, convention, kafkaHealthChecker); + final KafkaEventProducer kafkaEventProducer, final String version) { _preStartupUpgrades = List.of(buildIndicesJob); _steps = List.of(new DataHubStartupStep(kafkaEventProducer, version)); diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java new file mode 100644 index 0000000000..dd47e049dd --- /dev/null +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java @@ -0,0 +1,69 @@ +package com.linkedin.datahub.upgrade; + +import com.linkedin.datahub.upgrade.system.SystemUpdate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.Test; + +import javax.inject.Named; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNotNull; + +@ActiveProfiles("test") +@SpringBootTest(classes = {UpgradeCliApplication.class, UpgradeCliApplicationTestConfiguration.class}, + properties = { + "kafka.schemaRegistry.type=INTERNAL" + }) +public class DatahubUpgradeNoSchemaRegistryTest extends AbstractTestNGSpringContextTests { + + @Autowired + @Named("systemUpdate") + private SystemUpdate systemUpdate; + + @Test + public void testSystemUpdateInit() { + assertNotNull(systemUpdate); + } + + @Test + public void testSystemUpdateSend() { + UpgradeStepResult.Result result = systemUpdate.steps().stream() + .filter(s -> s.id().equals("DataHubStartupStep")) + .findFirst().get() + .executable().apply(new UpgradeContext() { + @Override + public Upgrade upgrade() { + return null; + } + + @Override + public List stepResults() { + return null; + } + + @Override + public UpgradeReport report() { + return null; + } + + @Override + public List args() { + return null; + } + + @Override + public Map> parsedArgs() { + return null; + } + }).result(); + assertEquals("SUCCEEDED", result.toString()); + } + +} diff --git a/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java b/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java index be0302061a..9c95d9f4aa 100644 --- a/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java +++ 
b/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java @@ -66,7 +66,7 @@ public class EventUtils { private static final Schema ORIGINAL_PE_AVRO_SCHEMA = getAvroSchemaFromResource("avro/com/linkedin/mxe/PlatformEvent.avsc"); - private static final Schema ORIGINAL_DUHE_AVRO_SCHEMA = + public static final Schema ORIGINAL_DUHE_AVRO_SCHEMA = getAvroSchemaFromResource("avro/com/linkedin/mxe/DataHubUpgradeHistoryEvent.avsc"); private static final Schema RENAMED_MCE_AVRO_SCHEMA = com.linkedin.pegasus2avro.mxe.MetadataChangeEvent.SCHEMA$; diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringTestConfiguration.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringTestConfiguration.java index a0c8e2f3d1..8196ed43df 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringTestConfiguration.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringTestConfiguration.java @@ -3,6 +3,7 @@ package com.linkedin.metadata.kafka.hook.spring; import com.datahub.authentication.Authentication; import com.datahub.metadata.ingestion.IngestionScheduler; import com.linkedin.entity.client.RestliEntityClient; +import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig; import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener; import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; import com.linkedin.metadata.models.registry.EntityRegistry; @@ -11,9 +12,11 @@ import com.linkedin.metadata.search.elasticsearch.ElasticSearchService; import com.linkedin.metadata.search.transformer.SearchDocumentTransformer; import com.linkedin.metadata.systemmetadata.SystemMetadataService; import com.linkedin.metadata.timeseries.TimeseriesAspectService; +import org.apache.avro.generic.GenericRecord; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; @Configuration @@ -52,6 +55,12 @@ public class MCLSpringTestConfiguration { @MockBean(name = "dataHubUpgradeKafkaListener") public DataHubUpgradeKafkaListener dataHubUpgradeKafkaListener; + @MockBean(name = "duheSchemaRegistryConfig") + public SchemaRegistryConfig schemaRegistryConfig; + + @MockBean(name = "duheKafkaConsumerFactory") + public DefaultKafkaConsumerFactory defaultKafkaConsumerFactory; + @MockBean public SchemaRegistryService schemaRegistryService; } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java index a7da2641d2..251d674909 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java @@ -31,12 +31,18 @@ import org.springframework.context.annotation.PropertySource; public class DataHubKafkaProducerFactory { @Autowired - private SchemaRegistryConfig schemaRegistryConfig; + @Qualifier("schemaRegistryConfig") + private SchemaRegistryConfig _schemaRegistryConfig; @Bean(name = "kafkaProducer") protected Producer createInstance(@Qualifier("configurationProvider") ConfigurationProvider provider, 
KafkaProperties properties) { KafkaConfiguration kafkaConfiguration = provider.getKafka(); + return new KafkaProducer<>(buildProducerProperties(_schemaRegistryConfig, kafkaConfiguration, properties)); + } + + public static Map buildProducerProperties(SchemaRegistryConfig schemaRegistryConfig, + KafkaConfiguration kafkaConfiguration, KafkaProperties properties) { KafkaProperties.Producer producerProps = properties.getProducer(); producerProps.setKeySerializer(StringSerializer.class); @@ -56,10 +62,10 @@ public class DataHubKafkaProducerFactory { // Override KafkaProperties with SchemaRegistryConfig only for non-empty values schemaRegistryConfig.getProperties().entrySet() - .stream() - .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty()) - .forEach(entry -> props.put(entry.getKey(), entry.getValue())); + .stream() + .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty()) + .forEach(entry -> props.put(entry.getKey(), entry.getValue())); - return new KafkaProducer<>(props); + return props; } } \ No newline at end of file diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java index c22cdf9216..5362ad0b11 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java @@ -6,9 +6,11 @@ import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFacto import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory; import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig; import com.linkedin.metadata.config.kafka.KafkaConfiguration; + import java.time.Duration; import java.util.Arrays; import java.util.Map; + import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -28,54 +30,90 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory; public class KafkaEventConsumerFactory { - private int kafkaEventConsumerConcurrency; + private int kafkaEventConsumerConcurrency; - @Bean(name = "kafkaConsumerFactory") - protected DefaultKafkaConsumerFactory createConsumerFactory( - @Qualifier("configurationProvider") ConfigurationProvider provider, - KafkaProperties baseKafkaProperties, SchemaRegistryConfig schemaRegistryConfig) { - kafkaEventConsumerConcurrency = provider.getKafka().getListener().getConcurrency(); + @Bean(name = "kafkaConsumerFactory") + protected DefaultKafkaConsumerFactory createConsumerFactory( + @Qualifier("configurationProvider") ConfigurationProvider provider, + KafkaProperties baseKafkaProperties, + SchemaRegistryConfig schemaRegistryConfig) { + kafkaEventConsumerConcurrency = provider.getKafka().getListener().getConcurrency(); - KafkaConfiguration kafkaConfiguration = provider.getKafka(); + KafkaConfiguration kafkaConfiguration = provider.getKafka(); + Map customizedProperties = buildCustomizedProperties(baseKafkaProperties, kafkaConfiguration, + schemaRegistryConfig); - KafkaProperties.Consumer consumerProps = baseKafkaProperties.getConsumer(); + return new DefaultKafkaConsumerFactory<>(customizedProperties); + } - // Specify (de)serializers for record keys and for record values. 
- consumerProps.setKeyDeserializer(StringDeserializer.class); - // Records will be flushed every 10 seconds. - consumerProps.setEnableAutoCommit(true); - consumerProps.setAutoCommitInterval(Duration.ofSeconds(10)); + @Bean(name = "duheKafkaConsumerFactory") + protected DefaultKafkaConsumerFactory duheKafkaConsumerFactory( + @Qualifier("configurationProvider") ConfigurationProvider provider, + KafkaProperties baseKafkaProperties, + @Qualifier("duheSchemaRegistryConfig") SchemaRegistryConfig schemaRegistryConfig) { - // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS - if (kafkaConfiguration.getBootstrapServers() != null && kafkaConfiguration.getBootstrapServers().length() > 0) { - consumerProps.setBootstrapServers(Arrays.asList(kafkaConfiguration.getBootstrapServers().split(","))); - } // else we rely on KafkaProperties which defaults to localhost:9092 + KafkaConfiguration kafkaConfiguration = provider.getKafka(); + Map customizedProperties = buildCustomizedProperties(baseKafkaProperties, kafkaConfiguration, + schemaRegistryConfig); - Map customizedProperties = baseKafkaProperties.buildConsumerProperties(); - customizedProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getDeserializer()); + return new DefaultKafkaConsumerFactory<>(customizedProperties); + } - // Override KafkaProperties with SchemaRegistryConfig only for non-empty values - schemaRegistryConfig.getProperties().entrySet() - .stream() - .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty()) - .forEach(entry -> customizedProperties.put(entry.getKey(), entry.getValue())); + private static Map buildCustomizedProperties(KafkaProperties baseKafkaProperties, + KafkaConfiguration kafkaConfiguration, + SchemaRegistryConfig schemaRegistryConfig) { + KafkaProperties.Consumer consumerProps = baseKafkaProperties.getConsumer(); - return new DefaultKafkaConsumerFactory<>(customizedProperties); - } + // Specify (de)serializers for record keys and for record values. + consumerProps.setKeyDeserializer(StringDeserializer.class); + // Records will be flushed every 10 seconds. + consumerProps.setEnableAutoCommit(true); + consumerProps.setAutoCommitInterval(Duration.ofSeconds(10)); - @Bean(name = "kafkaEventConsumer") - protected KafkaListenerContainerFactory createInstance( - @Qualifier("kafkaConsumerFactory") DefaultKafkaConsumerFactory kafkaConsumerFactory) { + // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS + if (kafkaConfiguration.getBootstrapServers() != null && kafkaConfiguration.getBootstrapServers().length() > 0) { + consumerProps.setBootstrapServers(Arrays.asList(kafkaConfiguration.getBootstrapServers().split(","))); + } // else we rely on KafkaProperties which defaults to localhost:9092 - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(kafkaConsumerFactory); - factory.setContainerCustomizer(new ThreadPoolContainerCustomizer()); - factory.setConcurrency(kafkaEventConsumerConcurrency); + Map customizedProperties = baseKafkaProperties.buildConsumerProperties(); + customizedProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getDeserializer()); - log.info(String.format("Event-based KafkaListenerContainerFactory built successfully. 
Consumer concurrency = %s", - kafkaEventConsumerConcurrency)); + // Override KafkaProperties with SchemaRegistryConfig only for non-empty values + schemaRegistryConfig.getProperties().entrySet() + .stream() + .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty()) + .forEach(entry -> customizedProperties.put(entry.getKey(), entry.getValue())); - return factory; - } + return customizedProperties; + } + + @Bean(name = "kafkaEventConsumer") + protected KafkaListenerContainerFactory createInstance( + @Qualifier("kafkaConsumerFactory") DefaultKafkaConsumerFactory kafkaConsumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(kafkaConsumerFactory); + factory.setContainerCustomizer(new ThreadPoolContainerCustomizer()); + factory.setConcurrency(kafkaEventConsumerConcurrency); + + log.info(String.format("Event-based KafkaListenerContainerFactory built successfully. Consumer concurrency = %s", + kafkaEventConsumerConcurrency)); + + return factory; + } + + @Bean(name = "duheKafkaEventConsumer") + protected KafkaListenerContainerFactory duheKafkaEventConsumer( + @Qualifier("duheKafkaConsumerFactory") DefaultKafkaConsumerFactory kafkaConsumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(kafkaConsumerFactory); + factory.setContainerCustomizer(new ThreadPoolContainerCustomizer()); + factory.setConcurrency(1); + + log.info("Event-based DUHE KafkaListenerContainerFactory built successfully. Consumer concurrency = 1"); + return factory; + } } \ No newline at end of file diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/AwsGlueSchemaRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/AwsGlueSchemaRegistryFactory.java index fcd6e88fa6..59f08e3733 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/AwsGlueSchemaRegistryFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/AwsGlueSchemaRegistryFactory.java @@ -31,7 +31,7 @@ public class AwsGlueSchemaRegistryFactory { @Value("${kafka.schemaRegistry.awsGlue.registryName}") private Optional registryName; - @Bean + @Bean("schemaRegistryConfig") @Nonnull protected SchemaRegistryConfig getInstance(ConfigurationProvider configurationProvider) { Map props = new HashMap<>(); diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/DUHESchemaRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/DUHESchemaRegistryFactory.java new file mode 100644 index 0000000000..440d7a3cf2 --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/DUHESchemaRegistryFactory.java @@ -0,0 +1,32 @@ +package com.linkedin.gms.factory.kafka.schemaregistry; + +import com.linkedin.gms.factory.config.ConfigurationProvider; +import com.linkedin.metadata.boot.kafka.MockDUHEDeserializer; +import com.linkedin.metadata.boot.kafka.MockDUHESerializer; +import com.linkedin.metadata.config.kafka.KafkaConfiguration; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + 
+import java.util.HashMap; +import java.util.Map; + +@Slf4j +@Configuration +public class DUHESchemaRegistryFactory { + /** + * Configure Kafka Producer/Consumer processes with a custom schema registry. + */ + @Bean("duheSchemaRegistryConfig") + protected SchemaRegistryConfig duheSchemaRegistryConfig(ConfigurationProvider provider) { + Map props = new HashMap<>(); + KafkaConfiguration kafkaConfiguration = provider.getKafka(); + + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfiguration + .getSchemaRegistry().getUrl()); + + log.info("DataHub System Update Registry"); + return new SchemaRegistryConfig(MockDUHESerializer.class, MockDUHEDeserializer.class, props); + } +} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java index d7ff458de3..b103b84307 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java @@ -30,7 +30,7 @@ public class InternalSchemaRegistryFactory { /** * Configure Kafka Producer/Consumer processes with a custom schema registry. */ - @Bean + @Bean("schemaRegistryConfig") @Nonnull protected SchemaRegistryConfig getInstance(@Qualifier("configurationProvider") ConfigurationProvider provider) { Map props = new HashMap<>(); diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/KafkaSchemaRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/KafkaSchemaRegistryFactory.java index e6eb980534..d0e11baab9 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/KafkaSchemaRegistryFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/KafkaSchemaRegistryFactory.java @@ -44,7 +44,7 @@ public class KafkaSchemaRegistryFactory { @Value("${kafka.schema.registry.security.protocol:}") private String securityProtocol; - @Bean + @Bean("schemaRegistryConfig") @Nonnull protected SchemaRegistryConfig getInstance(ConfigurationProvider configurationProvider) { Map props = new HashMap<>(); diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java index 8afc229c47..20889e6497 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/DataHubUpgradeKafkaListener.java @@ -12,12 +12,12 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.EnableKafka; import 
org.springframework.kafka.annotation.KafkaListener; @@ -30,7 +30,6 @@ import org.springframework.stereotype.Component; // We don't disable this on GMS since we want GMS to also wait until the system is ready to read in case of // backwards incompatible query logic dependent on system updates. @Component("dataHubUpgradeKafkaListener") -@RequiredArgsConstructor @Slf4j @EnableKafka public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency { @@ -56,6 +55,15 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, Bootstrap private final static AtomicBoolean IS_UPDATED = new AtomicBoolean(false); + public DataHubUpgradeKafkaListener(KafkaListenerEndpointRegistry registry, + @Qualifier("duheKafkaConsumerFactory") DefaultKafkaConsumerFactory defaultKafkaConsumerFactory, + GitVersion gitVersion, + ConfigurationProvider configurationProvider) { + this.registry = registry; + this._defaultKafkaConsumerFactory = defaultKafkaConsumerFactory; + this._gitVersion = gitVersion; + this._configurationProvider = configurationProvider; + } // Constructs a consumer to read determine final offset to assign, prevents re-reading whole topic to get the latest version @Override @@ -73,7 +81,7 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, Bootstrap } } - @KafkaListener(id = CONSUMER_GROUP, topics = {TOPIC_NAME}, containerFactory = "kafkaEventConsumer", concurrency = "1") + @KafkaListener(id = CONSUMER_GROUP, topics = {TOPIC_NAME}, containerFactory = "duheKafkaEventConsumer", concurrency = "1") public void checkSystemVersion(final ConsumerRecord consumerRecord) { final GenericRecord record = consumerRecord.value(); final String expectedVersion = String.format("%s-%s", _gitVersion.getVersion(), revision); diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHEDeserializer.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHEDeserializer.java new file mode 100644 index 0000000000..eef68b427e --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHEDeserializer.java @@ -0,0 +1,57 @@ +package com.linkedin.metadata.boot.kafka; + +import com.linkedin.metadata.EventUtils; +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Map; + +import static com.linkedin.metadata.boot.kafka.MockDUHESerializer.DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT; + +/** + * Used for early bootstrap to avoid contact with not yet existing schema registry + */ +@Slf4j +public class MockDUHEDeserializer extends KafkaAvroDeserializer { + + public MockDUHEDeserializer() { + this.schemaRegistry = buildMockSchemaRegistryClient(); + } + + public MockDUHEDeserializer(SchemaRegistryClient client) { + this.schemaRegistry = buildMockSchemaRegistryClient(); + } + + public MockDUHEDeserializer(SchemaRegistryClient client, Map props) { + super(client, props); + this.schemaRegistry = buildMockSchemaRegistryClient(); + } + + private static MockSchemaRegistryClient buildMockSchemaRegistryClient() { + MockSchemaRegistryClient schemaRegistry = new 
MockSchemaRegistryClient2(); + try { + schemaRegistry.register(DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT, + new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA)); + return schemaRegistry; + } catch (IOException | RestClientException e) { + throw new RuntimeException(e); + } + } + + public static class MockSchemaRegistryClient2 extends MockSchemaRegistryClient { + /** + * Previously used topics can have schema ids > 1 which fully match + * however we are replacing that registry so force schema id to 1 + */ + @Override + public synchronized ParsedSchema getSchemaById(int id) throws IOException, RestClientException { + return super.getSchemaById(1); + } + } +} diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHESerializer.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHESerializer.java new file mode 100644 index 0000000000..2ce66243dd --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHESerializer.java @@ -0,0 +1,45 @@ +package com.linkedin.metadata.boot.kafka; + +import com.linkedin.metadata.EventUtils; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Map; + +/** + * Used for early bootstrap to avoid contact with not yet existing schema registry + */ +@Slf4j +public class MockDUHESerializer extends KafkaAvroSerializer { + + static final String DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT = "DataHubUpgradeHistory_v1-value"; + + public MockDUHESerializer() { + this.schemaRegistry = buildMockSchemaRegistryClient(); + } + + public MockDUHESerializer(SchemaRegistryClient client) { + this.schemaRegistry = buildMockSchemaRegistryClient(); + } + + public MockDUHESerializer(SchemaRegistryClient client, Map props) { + super(client, props); + this.schemaRegistry = buildMockSchemaRegistryClient(); + } + + private static MockSchemaRegistryClient buildMockSchemaRegistryClient() { + MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient(); + try { + schemaRegistry.register(DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT, + new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA)); + return schemaRegistry; + } catch (IOException | RestClientException e) { + throw new RuntimeException(e); + } + } +}
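
Note (illustration, not part of the patch): the new MockDUHESerializer/MockDUHEDeserializer avoid the schema-registry dependency by backing Confluent's Avro (de)serializers with a MockSchemaRegistryClient that is pre-registered with the DataHubUpgradeHistoryEvent schema, so the system-update job can produce and consume DUHE records before a real registry endpoint exists. The standalone sketch below shows that round trip using the same Confluent API surface the patch itself calls (AvroSchema, MockSchemaRegistryClient.register, KafkaAvroSerializer/Deserializer). It is a simplified stand-in, not DataHub code: the inline schema, the "version" field, the MockRegistryRoundTrip class, and the sample value are hypothetical; only the subject name "DataHubUpgradeHistory_v1-value" comes from the patch.

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class MockRegistryRoundTrip {
    public static void main(String[] args) throws Exception {
        // Simplified stand-in for EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA (hypothetical field).
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"DataHubUpgradeHistoryEvent\","
                + "\"fields\":[{\"name\":\"version\",\"type\":\"string\"}]}");

        // Pre-register the schema under the subject the patch uses for the DUHE topic,
        // exactly as MockDUHESerializer.buildMockSchemaRegistryClient() does.
        MockSchemaRegistryClient registry = new MockSchemaRegistryClient();
        registry.register("DataHubUpgradeHistory_v1-value", new AvroSchema(schema));

        // Serializer and deserializer share the in-memory mock client instead of an HTTP registry.
        KafkaAvroSerializer serializer = new KafkaAvroSerializer(registry);
        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(registry);

        GenericRecord event = new GenericData.Record(schema);
        event.put("version", "v0.0.0-0"); // illustrative value only

        byte[] wireBytes = serializer.serialize("DataHubUpgradeHistory_v1", event);
        GenericRecord roundTripped =
            (GenericRecord) deserializer.deserialize("DataHubUpgradeHistory_v1", wireBytes);
        System.out.println(roundTripped.get("version"));
    }
}

On the deserializer side the patch additionally overrides getSchemaById to always return schema id 1 (MockSchemaRegistryClient2): records written earlier against a real registry may carry higher schema ids that the freshly built mock client would not recognize, so pinning to the single registered DUHE schema keeps those older records readable.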