mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-27 10:35:58 +00:00
fix(metadata-service): stop overriding kafka registry props with empty values (#4604)
Co-authored-by: jsotelo <javier.sotelo@viasat.com>
This commit is contained in:
parent
e7c5eb357c
commit
fa4cc3ad31
@ -65,7 +65,12 @@ public class DataHubKafkaProducerFactory {
|
|||||||
Map<String, Object> props = properties.buildProducerProperties();
|
Map<String, Object> props = properties.buildProducerProperties();
|
||||||
|
|
||||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer().getName());
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer().getName());
|
||||||
props.putAll(schemaRegistryConfig.getProperties());
|
|
||||||
|
// Override KafkaProperties with SchemaRegistryConfig only for non-empty values
|
||||||
|
schemaRegistryConfig.getProperties().entrySet()
|
||||||
|
.stream()
|
||||||
|
.filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty())
|
||||||
|
.forEach(entry -> props.put(entry.getKey(), entry.getValue()));
|
||||||
|
|
||||||
return new KafkaProducer<>(props);
|
return new KafkaProducer<>(props);
|
||||||
}
|
}
|
||||||
|
@ -76,7 +76,12 @@ public class KafkaEventConsumerFactory {
|
|||||||
|
|
||||||
consumerProps.setValueDeserializer(schemaRegistryConfig.getDeserializer());
|
consumerProps.setValueDeserializer(schemaRegistryConfig.getDeserializer());
|
||||||
Map<String, Object> props = properties.buildConsumerProperties();
|
Map<String, Object> props = properties.buildConsumerProperties();
|
||||||
props.putAll(schemaRegistryConfig.getProperties());
|
|
||||||
|
// Override KafkaProperties with SchemaRegistryConfig only for non-empty values
|
||||||
|
schemaRegistryConfig.getProperties().entrySet()
|
||||||
|
.stream()
|
||||||
|
.filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty())
|
||||||
|
.forEach(entry -> props.put(entry.getKey(), entry.getValue()));
|
||||||
|
|
||||||
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
|
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
|
||||||
new ConcurrentKafkaListenerContainerFactory<>();
|
new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user