diff --git a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py index 26b5410c78..dcaec4e457 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py +++ b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py @@ -80,8 +80,21 @@ class ConfluentSchemaRegistry(KafkaSchemaRegistryBase): # Subject name format when the schema registry subject name strategy is # (a) TopicNameStrategy(default strategy): - # (b) TopicRecordNameStrategy: -- + # there's a third case + # (c) TopicNameStrategy differing by environment name suffixes. + # e.g. "a.b.c.d-value" and "a.b.c.d.qa-value" + # For such instances, the wrong schema registry entries could be picked by the previous logic. for subject in self.known_schema_registry_subjects: - if subject.startswith(topic) and subject.endswith(subject_key_suffix): + if ( + self.source_config.disable_topic_record_naming_strategy + and subject == subject_key + ): + return subject + if ( + (not self.source_config.disable_topic_record_naming_strategy) + and subject.startswith(topic) + and subject.endswith(subject_key_suffix) + ): return subject return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index 6a0f8c4f2c..61f6103347 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -93,6 +93,10 @@ class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): default=False, description="Disables warnings reported for non-AVRO/Protobuf value or key schemas if set.", ) + disable_topic_record_naming_strategy: bool = pydantic.Field( + default=False, + description="Disables the utilization of the TopicRecordNameStrategy for Schema Registry subjects. 
For more information, visit: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas:~:text=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy", + ) @dataclass