From aeea2df86ef6ac3719983bdf193169ebb33ed9ed Mon Sep 17 00:00:00 2001 From: Antoine Balliet Date: Mon, 15 Jul 2024 17:35:01 +0200 Subject: [PATCH] feat: allow specify schemaRegistryTopicSuffixName (#17027) * feat: allow specify schemaRegistryTopicSuffixName * chore: use default value * add comment * fix doc --- .../source/messaging/common_broker_source.py | 17 +++++++++++++++-- .../connections/messaging/kafkaConnection.json | 6 ++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py index 24dc33471f9..722a7d17636 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py +++ b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py @@ -75,6 +75,7 @@ class CommonBrokerSource(MessagingServiceSource, ABC): ): super().__init__(config, metadata) self.generate_sample_data = self.config.sourceConfig.config.generateSampleData + self.service_connection = self.config.serviceConnection.root.config self.admin_client = self.connection.admin_client self.schema_registry_client = self.connection.schema_registry_client self.context.processed_schemas = {} @@ -236,15 +237,27 @@ class CommonBrokerSource(MessagingServiceSource, ABC): return None def _parse_topic_metadata(self, topic_name: str) -> Optional[Schema]: + + # To find topic in artifact registry, dafault is "-value" + # But suffix can be overridden using schemaRegistryTopicSuffixName + topic_schema_registry_name = ( + topic_name + self.service_connection.schemaRegistryTopicSuffixName + ) + try: if self.schema_registry_client: registered_schema = self.schema_registry_client.get_latest_version( - topic_name + "-value" + topic_schema_registry_name ) return registered_schema.schema except Exception as exc: logger.debug(traceback.format_exc()) - logger.warning(f"Failed to get schema for topic [{topic_name}]: {exc}") + logger.warning( + ( + f"Failed to get schema for topic [{topic_name}] " + f"(looking for {topic_schema_registry_name}) in registry: {exc}" + ) + ) self.status.warning( topic_name, f"failed to get schema: {exc} for topic {topic_name}" ) diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json index 7633f00c9bb..e5590befd00 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json @@ -82,6 +82,12 @@ "default": {}, "additionalProperties": true }, + "schemaRegistryTopicSuffixName": { + "title": "Schema Registry Topic Suffix Name", + "description": "Schema Registry Topic Suffix Name. The suffix to be appended to the topic name to get topic schema from registry.", + "type": "string", + "default": "-value" + }, "schemaRegistrySSL": { "title": "Schema Registry SSL", "description": "Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry connection.",