Mirror of https://github.com/datahub-project/datahub.git (synced 2025-12-12 10:35:51 +00:00)

feat(system-update): remove datahub-update requirement on schema reg (#7999)

Parent: 151eab3628
Commit: b97e7a03e8
@@ -18,6 +18,7 @@ buildscript {
     ext.slf4jVersion = '1.7.36'
     ext.logbackClassic = '1.2.12'
     ext.hadoop3Version = '3.3.5'
+    ext.kafkaVersion = '2.3.0'

     ext.docker_registry = 'linkedin'
@@ -135,7 +136,7 @@ project.ext.externalDependency = [
     // avro-serde includes dependencies for `kafka-avro-serializer` `kafka-schema-registry-client` and `avro`
     'kafkaAvroSerde': 'io.confluent:kafka-streams-avro-serde:5.5.1',
     'kafkaAvroSerializer': 'io.confluent:kafka-avro-serializer:5.1.4',
-    'kafkaClients': 'org.apache.kafka:kafka-clients:2.3.0',
+    'kafkaClients': "org.apache.kafka:kafka-clients:$kafkaVersion",
     'logbackClassic': "ch.qos.logback:logback-classic:$logbackClassic",
     'slf4jApi': "org.slf4j:slf4j-api:$slf4jVersion",
     'log4jCore': "org.apache.logging.log4j:log4j-core:$log4jVersion",
@@ -40,6 +40,12 @@ dependencies {
     }
   }

+  // mock internal schema registry
+  implementation externalDependency.kafkaAvroSerde
+  implementation externalDependency.kafkaAvroSerializer
+  implementation "org.apache.kafka:kafka_2.12:$kafkaVersion"
+
   implementation externalDependency.slf4jApi
   compileOnly externalDependency.lombok
   compile externalDependency.picocli
@@ -3,27 +3,37 @@ package com.linkedin.datahub.upgrade.config;

 import com.linkedin.datahub.upgrade.system.SystemUpdate;
 import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
 import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
+import com.linkedin.gms.factory.common.TopicConventionFactory;
+import com.linkedin.gms.factory.config.ConfigurationProvider;
+import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
+import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
+import com.linkedin.metadata.config.kafka.KafkaConfiguration;
+import com.linkedin.metadata.dao.producer.KafkaEventProducer;
 import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
 import com.linkedin.metadata.version.GitVersion;
 import com.linkedin.mxe.TopicConvention;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;


 @Slf4j
 @Configuration
 public class SystemUpdateConfig {
   @Bean(name = "systemUpdate")
   public SystemUpdate systemUpdate(final BuildIndices buildIndices, final CleanIndices cleanIndices,
-                                   final Producer<String, ? extends IndexedRecord> producer,
-                                   final TopicConvention convention, final GitVersion gitVersion,
-                                   final KafkaHealthChecker kafkaHealthChecker, @Qualifier("revision") String revision) {
+                                   @Qualifier("duheKafkaEventProducer") final KafkaEventProducer kafkaEventProducer,
+                                   final GitVersion gitVersion, @Qualifier("revision") String revision) {

     String version = String.format("%s-%s", gitVersion.getVersion(), revision);
-    return new SystemUpdate(buildIndices, cleanIndices, producer, convention, version, kafkaHealthChecker);
+    return new SystemUpdate(buildIndices, cleanIndices, kafkaEventProducer, version);
   }

   @Value("#{systemEnvironment['DATAHUB_REVISION'] ?: '0'}")
@@ -33,4 +43,21 @@ public class SystemUpdateConfig {
   public String getRevision() {
     return revision;
   }
+
+  @Autowired
+  @Qualifier(TopicConventionFactory.TOPIC_CONVENTION_BEAN)
+  private TopicConvention topicConvention;
+
+  @Autowired
+  private KafkaHealthChecker kafkaHealthChecker;
+
+  @Bean(name = "duheKafkaEventProducer")
+  protected KafkaEventProducer duheKafkaEventProducer(@Qualifier("configurationProvider") ConfigurationProvider provider,
+                                                      KafkaProperties properties,
+                                                      @Qualifier("duheSchemaRegistryConfig") SchemaRegistryConfig duheSchemaRegistryConfig) {
+    KafkaConfiguration kafkaConfiguration = provider.getKafka();
+    Producer<String, IndexedRecord> producer = new KafkaProducer<>(
+        DataHubKafkaProducerFactory.buildProducerProperties(duheSchemaRegistryConfig, kafkaConfiguration, properties));
+    return new KafkaEventProducer(producer, topicConvention, kafkaHealthChecker);
+  }
 }
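Taken together, the two SystemUpdateConfig hunks above replace the raw Producer/TopicConvention/KafkaHealthChecker wiring with a dedicated "duheKafkaEventProducer" bean that is built from the DUHE-specific schema registry config, and systemUpdate selects it by qualifier. A minimal, self-contained sketch of that qualifier-based selection pattern (hypothetical names, not DataHub code) looks like this:

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

public class QualifierWiringSketch {

  // Stand-in for KafkaEventProducer; in DataHub both beans share one type.
  static class EventProducer {
    final String name;
    EventProducer(String name) { this.name = name; }
  }

  @Configuration
  static class Config {

    @Bean(name = "kafkaEventProducer")
    EventProducer kafkaEventProducer() { return new EventProducer("regular"); }

    @Bean(name = "duheKafkaEventProducer")
    EventProducer duheKafkaEventProducer() { return new EventProducer("bootstrap-only"); }

    // Mirrors systemUpdate(...) asking for @Qualifier("duheKafkaEventProducer").
    @Bean
    String systemUpdate(@Qualifier("duheKafkaEventProducer") EventProducer producer) {
      return "systemUpdate wired with the " + producer.name + " producer";
    }
  }

  public static void main(String[] args) {
    try (AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(Config.class)) {
      System.out.println(ctx.getBean("systemUpdate", String.class));
    }
  }
}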
@@ -7,11 +7,7 @@ import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
 import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
 import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
 import com.linkedin.metadata.dao.producer.KafkaEventProducer;
-import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
-import com.linkedin.mxe.TopicConvention;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.kafka.clients.producer.Producer;

 import java.util.List;
 import java.util.stream.Collectors;
@@ -25,9 +21,7 @@ public class SystemUpdate implements Upgrade {
   private final List<UpgradeStep> _steps;

   public SystemUpdate(final BuildIndices buildIndicesJob, final CleanIndices cleanIndicesJob,
-                      final Producer<String, ? extends IndexedRecord> producer,
-                      final TopicConvention convention, final String version, final KafkaHealthChecker kafkaHealthChecker) {
-    final KafkaEventProducer kafkaEventProducer = new KafkaEventProducer(producer, convention, kafkaHealthChecker);
+                      final KafkaEventProducer kafkaEventProducer, final String version) {

     _preStartupUpgrades = List.of(buildIndicesJob);
     _steps = List.of(new DataHubStartupStep(kafkaEventProducer, version));
@@ -0,0 +1,69 @@
package com.linkedin.datahub.upgrade;

import com.linkedin.datahub.upgrade.system.SystemUpdate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;

import javax.inject.Named;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;

@ActiveProfiles("test")
@SpringBootTest(classes = {UpgradeCliApplication.class, UpgradeCliApplicationTestConfiguration.class},
    properties = {
        "kafka.schemaRegistry.type=INTERNAL"
    })
public class DatahubUpgradeNoSchemaRegistryTest extends AbstractTestNGSpringContextTests {

  @Autowired
  @Named("systemUpdate")
  private SystemUpdate systemUpdate;

  @Test
  public void testSystemUpdateInit() {
    assertNotNull(systemUpdate);
  }

  @Test
  public void testSystemUpdateSend() {
    UpgradeStepResult.Result result = systemUpdate.steps().stream()
        .filter(s -> s.id().equals("DataHubStartupStep"))
        .findFirst().get()
        .executable().apply(new UpgradeContext() {
          @Override
          public Upgrade upgrade() {
            return null;
          }

          @Override
          public List<UpgradeStepResult> stepResults() {
            return null;
          }

          @Override
          public UpgradeReport report() {
            return null;
          }

          @Override
          public List<String> args() {
            return null;
          }

          @Override
          public Map<String, Optional<String>> parsedArgs() {
            return null;
          }
        }).result();
    assertEquals("SUCCEEDED", result.toString());
  }

}
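The test drives the DataHubStartupStep through an inline UpgradeContext stub that returns null for everything. If the same stub were needed in other tests, one option is a reusable no-op implementation; this sketch assumes UpgradeContext declares exactly the five methods overridden above:

package com.linkedin.datahub.upgrade;

import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of this PR: a reusable no-op context,
// equivalent to the anonymous class used in testSystemUpdateSend().
public class NoOpUpgradeContext implements UpgradeContext {

  @Override
  public Upgrade upgrade() {
    return null;
  }

  @Override
  public List<UpgradeStepResult> stepResults() {
    return null;
  }

  @Override
  public UpgradeReport report() {
    return null;
  }

  @Override
  public List<String> args() {
    return null;
  }

  @Override
  public Map<String, Optional<String>> parsedArgs() {
    return null;
  }
}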
@@ -66,7 +66,7 @@ public class EventUtils {
   private static final Schema ORIGINAL_PE_AVRO_SCHEMA =
       getAvroSchemaFromResource("avro/com/linkedin/mxe/PlatformEvent.avsc");

-  private static final Schema ORIGINAL_DUHE_AVRO_SCHEMA =
+  public static final Schema ORIGINAL_DUHE_AVRO_SCHEMA =
       getAvroSchemaFromResource("avro/com/linkedin/mxe/DataHubUpgradeHistoryEvent.avsc");

   private static final Schema RENAMED_MCE_AVRO_SCHEMA = com.linkedin.pegasus2avro.mxe.MetadataChangeEvent.SCHEMA$;
@@ -3,6 +3,7 @@ package com.linkedin.metadata.kafka.hook.spring;
 import com.datahub.authentication.Authentication;
 import com.datahub.metadata.ingestion.IngestionScheduler;
 import com.linkedin.entity.client.RestliEntityClient;
+import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
 import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
 import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
 import com.linkedin.metadata.models.registry.EntityRegistry;
@@ -11,9 +12,11 @@ import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
 import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
 import com.linkedin.metadata.systemmetadata.SystemMetadataService;
 import com.linkedin.metadata.timeseries.TimeseriesAspectService;
+import org.apache.avro.generic.GenericRecord;
 import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;


 @Configuration
@@ -52,6 +55,12 @@ public class MCLSpringTestConfiguration {
   @MockBean(name = "dataHubUpgradeKafkaListener")
   public DataHubUpgradeKafkaListener dataHubUpgradeKafkaListener;

+  @MockBean(name = "duheSchemaRegistryConfig")
+  public SchemaRegistryConfig schemaRegistryConfig;
+
+  @MockBean(name = "duheKafkaConsumerFactory")
+  public DefaultKafkaConsumerFactory<String, GenericRecord> defaultKafkaConsumerFactory;
+
   @MockBean
   public SchemaRegistryService schemaRegistryService;
 }
@@ -31,12 +31,18 @@ import org.springframework.context.annotation.PropertySource;
 public class DataHubKafkaProducerFactory {

   @Autowired
-  private SchemaRegistryConfig schemaRegistryConfig;
+  @Qualifier("schemaRegistryConfig")
+  private SchemaRegistryConfig _schemaRegistryConfig;

   @Bean(name = "kafkaProducer")
   protected Producer<String, IndexedRecord> createInstance(@Qualifier("configurationProvider") ConfigurationProvider
       provider, KafkaProperties properties) {
     KafkaConfiguration kafkaConfiguration = provider.getKafka();
+    return new KafkaProducer<>(buildProducerProperties(_schemaRegistryConfig, kafkaConfiguration, properties));
+  }
+
+  public static Map<String, Object> buildProducerProperties(SchemaRegistryConfig schemaRegistryConfig,
+                                                            KafkaConfiguration kafkaConfiguration, KafkaProperties properties) {
     KafkaProperties.Producer producerProps = properties.getProducer();

     producerProps.setKeySerializer(StringSerializer.class);
@@ -56,10 +62,10 @@ public class DataHubKafkaProducerFactory {

     // Override KafkaProperties with SchemaRegistryConfig only for non-empty values
     schemaRegistryConfig.getProperties().entrySet()
         .stream()
         .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty())
         .forEach(entry -> props.put(entry.getKey(), entry.getValue()));

-    return new KafkaProducer<>(props);
+    return props;
   }
 }
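Turning buildProducerProperties into a public static helper is what lets SystemUpdateConfig assemble producer properties for the DUHE producer without going through this Spring factory. The override rule it applies, overlay only non-empty schema-registry values on top of the Spring Kafka defaults, reduces to the following standalone sketch (hypothetical names, plain JDK):

import java.util.HashMap;
import java.util.Map;

public class OverrideNonEmpty {

  // Overlay only non-empty override values on top of the base properties.
  static Map<String, Object> merge(Map<String, Object> base, Map<String, Object> overrides) {
    Map<String, Object> merged = new HashMap<>(base);
    overrides.entrySet().stream()
        .filter(e -> e.getValue() != null && !e.getValue().toString().isEmpty())
        .forEach(e -> merged.put(e.getKey(), e.getValue()));
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> base = new HashMap<>();
    base.put("bootstrap.servers", "localhost:9092");

    Map<String, Object> overrides = new HashMap<>();
    overrides.put("schema.registry.url", "http://localhost:8081"); // non-empty -> applied
    overrides.put("basic.auth.user.info", "");                     // empty -> ignored

    System.out.println(merge(base, overrides));
  }
}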
@@ -6,9 +6,11 @@ import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
 import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory;
 import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
 import com.linkedin.metadata.config.kafka.KafkaConfiguration;

 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Map;

 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -28,54 +30,90 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 public class KafkaEventConsumerFactory {

   private int kafkaEventConsumerConcurrency;

   @Bean(name = "kafkaConsumerFactory")
   protected DefaultKafkaConsumerFactory<String, GenericRecord> createConsumerFactory(
       @Qualifier("configurationProvider") ConfigurationProvider provider,
-      KafkaProperties baseKafkaProperties, SchemaRegistryConfig schemaRegistryConfig) {
+      KafkaProperties baseKafkaProperties,
+      SchemaRegistryConfig schemaRegistryConfig) {
     kafkaEventConsumerConcurrency = provider.getKafka().getListener().getConcurrency();

     KafkaConfiguration kafkaConfiguration = provider.getKafka();
+    Map<String, Object> customizedProperties = buildCustomizedProperties(baseKafkaProperties, kafkaConfiguration,
+        schemaRegistryConfig);

-    KafkaProperties.Consumer consumerProps = baseKafkaProperties.getConsumer();
+    return new DefaultKafkaConsumerFactory<>(customizedProperties);
+  }

-    // Specify (de)serializers for record keys and for record values.
-    consumerProps.setKeyDeserializer(StringDeserializer.class);
-    // Records will be flushed every 10 seconds.
-    consumerProps.setEnableAutoCommit(true);
-    consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));
+  @Bean(name = "duheKafkaConsumerFactory")
+  protected DefaultKafkaConsumerFactory<String, GenericRecord> duheKafkaConsumerFactory(
+      @Qualifier("configurationProvider") ConfigurationProvider provider,
+      KafkaProperties baseKafkaProperties,
+      @Qualifier("duheSchemaRegistryConfig") SchemaRegistryConfig schemaRegistryConfig) {

-    // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
-    if (kafkaConfiguration.getBootstrapServers() != null && kafkaConfiguration.getBootstrapServers().length() > 0) {
-      consumerProps.setBootstrapServers(Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
-    } // else we rely on KafkaProperties which defaults to localhost:9092
+    KafkaConfiguration kafkaConfiguration = provider.getKafka();
+    Map<String, Object> customizedProperties = buildCustomizedProperties(baseKafkaProperties, kafkaConfiguration,
+        schemaRegistryConfig);

-    Map<String, Object> customizedProperties = baseKafkaProperties.buildConsumerProperties();
-    customizedProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getDeserializer());
+    return new DefaultKafkaConsumerFactory<>(customizedProperties);
+  }

-    // Override KafkaProperties with SchemaRegistryConfig only for non-empty values
-    schemaRegistryConfig.getProperties().entrySet()
-        .stream()
-        .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty())
-        .forEach(entry -> customizedProperties.put(entry.getKey(), entry.getValue()));
+  private static Map<String, Object> buildCustomizedProperties(KafkaProperties baseKafkaProperties,
+      KafkaConfiguration kafkaConfiguration,
+      SchemaRegistryConfig schemaRegistryConfig) {
+    KafkaProperties.Consumer consumerProps = baseKafkaProperties.getConsumer();

-    return new DefaultKafkaConsumerFactory<>(customizedProperties);
-  }
+    // Specify (de)serializers for record keys and for record values.
+    consumerProps.setKeyDeserializer(StringDeserializer.class);
+    // Records will be flushed every 10 seconds.
+    consumerProps.setEnableAutoCommit(true);
+    consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));

-  @Bean(name = "kafkaEventConsumer")
-  protected KafkaListenerContainerFactory<?> createInstance(
-      @Qualifier("kafkaConsumerFactory") DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory) {
+    // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
+    if (kafkaConfiguration.getBootstrapServers() != null && kafkaConfiguration.getBootstrapServers().length() > 0) {
+      consumerProps.setBootstrapServers(Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
+    } // else we rely on KafkaProperties which defaults to localhost:9092

-    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
-        new ConcurrentKafkaListenerContainerFactory<>();
-    factory.setConsumerFactory(kafkaConsumerFactory);
-    factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
-    factory.setConcurrency(kafkaEventConsumerConcurrency);
+    Map<String, Object> customizedProperties = baseKafkaProperties.buildConsumerProperties();
+    customizedProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getDeserializer());

-    log.info(String.format("Event-based KafkaListenerContainerFactory built successfully. Consumer concurrency = %s",
-        kafkaEventConsumerConcurrency));
+    // Override KafkaProperties with SchemaRegistryConfig only for non-empty values
+    schemaRegistryConfig.getProperties().entrySet()
+        .stream()
+        .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty())
+        .forEach(entry -> customizedProperties.put(entry.getKey(), entry.getValue()));

-    return factory;
-  }
+    return customizedProperties;
+  }

+  @Bean(name = "kafkaEventConsumer")
+  protected KafkaListenerContainerFactory<?> createInstance(
+      @Qualifier("kafkaConsumerFactory") DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory) {
+
+    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
+        new ConcurrentKafkaListenerContainerFactory<>();
+    factory.setConsumerFactory(kafkaConsumerFactory);
+    factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
+    factory.setConcurrency(kafkaEventConsumerConcurrency);
+
+    log.info(String.format("Event-based KafkaListenerContainerFactory built successfully. Consumer concurrency = %s",
+        kafkaEventConsumerConcurrency));
+
+    return factory;
+  }
+
+  @Bean(name = "duheKafkaEventConsumer")
+  protected KafkaListenerContainerFactory<?> duheKafkaEventConsumer(
+      @Qualifier("duheKafkaConsumerFactory") DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory) {
+
+    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
+        new ConcurrentKafkaListenerContainerFactory<>();
+    factory.setConsumerFactory(kafkaConsumerFactory);
+    factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
+    factory.setConcurrency(1);
+
+    log.info("Event-based DUHE KafkaListenerContainerFactory built successfully. Consumer concurrency = 1");
+    return factory;
+  }
 }
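The extracted buildCustomizedProperties keeps the earlier precedence rule: a non-empty KAFKA_BOOTSTRAP_SERVER-style value overrides the Spring Kafka default of localhost:9092. That comma-separated parsing can be illustrated with this standalone sketch (hypothetical names):

import java.util.Arrays;
import java.util.List;

public class BootstrapServersPrecedence {

  // Non-empty explicit value wins; otherwise fall back to the Spring Kafka default.
  static List<String> resolve(String explicitValue, List<String> springDefault) {
    if (explicitValue != null && explicitValue.length() > 0) {
      return Arrays.asList(explicitValue.split(","));
    }
    return springDefault; // KafkaProperties defaults to localhost:9092
  }

  public static void main(String[] args) {
    System.out.println(resolve("broker1:9092,broker2:9092", List.of("localhost:9092")));
    System.out.println(resolve("", List.of("localhost:9092")));
  }
}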
@@ -31,7 +31,7 @@ public class AwsGlueSchemaRegistryFactory {
   @Value("${kafka.schemaRegistry.awsGlue.registryName}")
   private Optional<String> registryName;

-  @Bean
+  @Bean("schemaRegistryConfig")
   @Nonnull
   protected SchemaRegistryConfig getInstance(ConfigurationProvider configurationProvider) {
     Map<String, Object> props = new HashMap<>();
@@ -0,0 +1,32 @@
package com.linkedin.gms.factory.kafka.schemaregistry;

import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.boot.kafka.MockDUHEDeserializer;
import com.linkedin.metadata.boot.kafka.MockDUHESerializer;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
public class DUHESchemaRegistryFactory {
  /**
   * Configure Kafka Producer/Consumer processes with a custom schema registry.
   */
  @Bean("duheSchemaRegistryConfig")
  protected SchemaRegistryConfig duheSchemaRegistryConfig(ConfigurationProvider provider) {
    Map<String, Object> props = new HashMap<>();
    KafkaConfiguration kafkaConfiguration = provider.getKafka();

    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfiguration
        .getSchemaRegistry().getUrl());

    log.info("DataHub System Update Registry");
    return new SchemaRegistryConfig(MockDUHESerializer.class, MockDUHEDeserializer.class, props);
  }
}
@@ -30,7 +30,7 @@ public class InternalSchemaRegistryFactory {
   /**
    * Configure Kafka Producer/Consumer processes with a custom schema registry.
    */
-  @Bean
+  @Bean("schemaRegistryConfig")
   @Nonnull
   protected SchemaRegistryConfig getInstance(@Qualifier("configurationProvider") ConfigurationProvider provider) {
     Map<String, Object> props = new HashMap<>();
@@ -44,7 +44,7 @@ public class KafkaSchemaRegistryFactory {
   @Value("${kafka.schema.registry.security.protocol:}")
   private String securityProtocol;

-  @Bean
+  @Bean("schemaRegistryConfig")
   @Nonnull
   protected SchemaRegistryConfig getInstance(ConfigurationProvider configurationProvider) {
     Map<String, Object> props = new HashMap<>();
@@ -12,12 +12,12 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;

-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.annotation.KafkaListener;
@@ -30,7 +30,6 @@ import org.springframework.stereotype.Component;
 // We don't disable this on GMS since we want GMS to also wait until the system is ready to read in case of
 // backwards incompatible query logic dependent on system updates.
 @Component("dataHubUpgradeKafkaListener")
-@RequiredArgsConstructor
 @Slf4j
 @EnableKafka
 public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency {
@@ -56,6 +55,15 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency {

   private final static AtomicBoolean IS_UPDATED = new AtomicBoolean(false);

+  public DataHubUpgradeKafkaListener(KafkaListenerEndpointRegistry registry,
+      @Qualifier("duheKafkaConsumerFactory") DefaultKafkaConsumerFactory<String, GenericRecord> defaultKafkaConsumerFactory,
+      GitVersion gitVersion,
+      ConfigurationProvider configurationProvider) {
+    this.registry = registry;
+    this._defaultKafkaConsumerFactory = defaultKafkaConsumerFactory;
+    this._gitVersion = gitVersion;
+    this._configurationProvider = configurationProvider;
+  }

   // Constructs a consumer to read determine final offset to assign, prevents re-reading whole topic to get the latest version
   @Override
@@ -73,7 +81,7 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency {
     }
   }

-  @KafkaListener(id = CONSUMER_GROUP, topics = {TOPIC_NAME}, containerFactory = "kafkaEventConsumer", concurrency = "1")
+  @KafkaListener(id = CONSUMER_GROUP, topics = {TOPIC_NAME}, containerFactory = "duheKafkaEventConsumer", concurrency = "1")
   public void checkSystemVersion(final ConsumerRecord<String, GenericRecord> consumerRecord) {
     final GenericRecord record = consumerRecord.value();
     final String expectedVersion = String.format("%s-%s", _gitVersion.getVersion(), revision);
@@ -0,0 +1,57 @@
package com.linkedin.metadata.boot.kafka;

import com.linkedin.metadata.EventUtils;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Map;

import static com.linkedin.metadata.boot.kafka.MockDUHESerializer.DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT;

/**
 * Used for early bootstrap to avoid contact with not yet existing schema registry
 */
@Slf4j
public class MockDUHEDeserializer extends KafkaAvroDeserializer {

  public MockDUHEDeserializer() {
    this.schemaRegistry = buildMockSchemaRegistryClient();
  }

  public MockDUHEDeserializer(SchemaRegistryClient client) {
    this.schemaRegistry = buildMockSchemaRegistryClient();
  }

  public MockDUHEDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
    super(client, props);
    this.schemaRegistry = buildMockSchemaRegistryClient();
  }

  private static MockSchemaRegistryClient buildMockSchemaRegistryClient() {
    MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient2();
    try {
      schemaRegistry.register(DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT,
          new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA));
      return schemaRegistry;
    } catch (IOException | RestClientException e) {
      throw new RuntimeException(e);
    }
  }

  public static class MockSchemaRegistryClient2 extends MockSchemaRegistryClient {
    /**
     * Previously used topics can have schema ids > 1 which fully match
     * however we are replacing that registry so force schema id to 1
     */
    @Override
    public synchronized ParsedSchema getSchemaById(int id) throws IOException, RestClientException {
      return super.getSchemaById(1);
    }
  }
}
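Both mock (de)serializers rely on Confluent's MockSchemaRegistryClient being pre-loaded with the DUHE schema entirely in memory, so no network registry is needed at bootstrap. A minimal register/look-up round trip (assuming kafka-schema-registry-client is on the classpath; the inline schema string is illustrative only, the real code registers EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA) looks like:

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;

public class MockRegistrySketch {
  public static void main(String[] args) throws Exception {
    MockSchemaRegistryClient registry = new MockSchemaRegistryClient();

    // Illustrative schema only; stands in for the DataHubUpgradeHistoryEvent Avro schema.
    ParsedSchema schema = new AvroSchema(
        "{\"type\":\"record\",\"name\":\"DataHubUpgradeHistoryEvent\","
            + "\"fields\":[{\"name\":\"version\",\"type\":\"string\"}]}");

    int id = registry.register("DataHubUpgradeHistory_v1-value", schema);
    System.out.println(id + " -> " + registry.getSchemaById(id).canonicalString());
  }
}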
@@ -0,0 +1,45 @@
package com.linkedin.metadata.boot.kafka;

import com.linkedin.metadata.EventUtils;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Map;

/**
 * Used for early bootstrap to avoid contact with not yet existing schema registry
 */
@Slf4j
public class MockDUHESerializer extends KafkaAvroSerializer {

  static final String DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT = "DataHubUpgradeHistory_v1-value";

  public MockDUHESerializer() {
    this.schemaRegistry = buildMockSchemaRegistryClient();
  }

  public MockDUHESerializer(SchemaRegistryClient client) {
    this.schemaRegistry = buildMockSchemaRegistryClient();
  }

  public MockDUHESerializer(SchemaRegistryClient client, Map<String, ?> props) {
    super(client, props);
    this.schemaRegistry = buildMockSchemaRegistryClient();
  }

  private static MockSchemaRegistryClient buildMockSchemaRegistryClient() {
    MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
    try {
      schemaRegistry.register(DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT,
          new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA));
      return schemaRegistry;
    } catch (IOException | RestClientException e) {
      throw new RuntimeException(e);
    }
  }
}