From fa4cc3ad31339d784702556eaa9ab1a01b2295f6 Mon Sep 17 00:00:00 2001 From: Javier Sotelo Date: Mon, 11 Apr 2022 22:02:47 -0700 Subject: [PATCH] fix(metadata-service): stop overriding kafka registry props with empty values (#4604) Co-authored-by: jsotelo --- .../gms/factory/kafka/DataHubKafkaProducerFactory.java | 7 ++++++- .../gms/factory/kafka/KafkaEventConsumerFactory.java | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java index 997ddc0c07..0549ed2fc2 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java @@ -65,7 +65,12 @@ public class DataHubKafkaProducerFactory { Map props = properties.buildProducerProperties(); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer().getName()); - props.putAll(schemaRegistryConfig.getProperties()); + + // Override KafkaProperties with SchemaRegistryConfig only for non-empty values + schemaRegistryConfig.getProperties().entrySet() + .stream() + .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty()) + .forEach(entry -> props.put(entry.getKey(), entry.getValue())); return new KafkaProducer<>(props); } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java index a57fe3999a..dbdb470916 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java @@ -76,7 +76,12 @@ public class KafkaEventConsumerFactory { consumerProps.setValueDeserializer(schemaRegistryConfig.getDeserializer()); Map props = properties.buildConsumerProperties(); - props.putAll(schemaRegistryConfig.getProperties()); + + // Override KafkaProperties with SchemaRegistryConfig only for non-empty values + schemaRegistryConfig.getProperties().entrySet() + .stream() + .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty()) + .forEach(entry -> props.put(entry.getKey(), entry.getValue())); ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();