feat(ingest): Add disable flag for TopicRecordNameStrategy (#8224)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
Segun Sogunle 2023-07-10 19:51:48 +01:00 committed by GitHub
parent e4736d7b46
commit d8888cf0b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 1 deletions

View File

@ -80,8 +80,21 @@ class ConfluentSchemaRegistry(KafkaSchemaRegistryBase):
# Subject name format when the schema registry subject name strategy is
# (a) TopicNameStrategy(default strategy): <topic name>-<key/value>
# (b) TopicRecordNameStrategy: <topic name>-<fully-qualified record name>-<key/value>
# there's a third case
# (c) TopicNameStrategy differing by environment name suffixes.
# e.g "a.b.c.d-value" and "a.b.c.d.qa-value"
# For such instances, the wrong schema registry entries could picked by the previous logic.
for subject in self.known_schema_registry_subjects:
if subject.startswith(topic) and subject.endswith(subject_key_suffix):
if (
self.source_config.disable_topic_record_naming_strategy
and subject == subject_key
):
return subject
if (
(not self.source_config.disable_topic_record_naming_strategy)
and subject.startswith(topic)
and subject.endswith(subject_key_suffix)
):
return subject
return None

View File

@ -93,6 +93,10 @@ class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
default=False,
description="Disables warnings reported for non-AVRO/Protobuf value or key schemas if set.",
)
disable_topic_record_naming_strategy: bool = pydantic.Field(
default=False,
description="Disables the utilization of the TopicRecordNameStrategy for Schema Registry subjects. For more information, visit: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas:~:text=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
)
@dataclass