diff --git a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py index 456fa0987a0..498f93f9614 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py +++ b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py @@ -81,6 +81,10 @@ class CommonBrokerSource(MessagingServiceSource, ABC): self, topic_details: BrokerTopicDetails ) -> Iterable[CreateTopicRequest]: try: + schema_type_map = { + key.lower(): value.value + for key, value in SchemaType.__members__.items() + } logger.info(f"Fetching topic schema {topic_details.topic_name}") topic_schema = self._parse_topic_metadata(topic_details.topic_name) logger.info(f"Fetching topic config {topic_details.topic_name}") @@ -115,16 +119,18 @@ class CommonBrokerSource(MessagingServiceSource, ABC): topic_details.topic_name, topic_schema.schema_str ) - topic.messageSchema = ( - Topic( - schemaText=topic_schema.schema_str, - schemaType=SchemaType(topic_schema.schema_type).value, - schemaFields=schema_fields, + topic.messageSchema = Topic( + schemaText=topic_schema.schema_str, + schemaType=schema_type_map.get( + topic_schema.schema_type.lower(), SchemaType.Other.value ), + schemaFields=schema_fields, ) - if topic_schema.schema_type.lower() == SchemaType.Avro.value.lower(): - if self.generate_sample_data: - topic.sampleData = self._get_sample_data(topic.name) + if ( + topic_schema.schema_type.lower() == SchemaType.Avro.value.lower() + and self.generate_sample_data + ): + topic.sampleData = self._get_sample_data(topic.name) self.status.topic_scanned(topic.name.__root__) yield topic @@ -209,13 +215,19 @@ class CommonBrokerSource(MessagingServiceSource, ABC): else: if messages: for message in messages: - sample_data.append( - str( - self.consumer_client._serializer.decode_message( # pylint: disable=protected-access - message.value() + try: + sample_data.append( + str( + self.consumer_client._serializer.decode_message( # pylint: disable=protected-access + message.value() + ) ) ) - ) + except Exception as exc: + logger.warning( + f"Failed to decode sample data from topic {topic_name.__root__}: {exc}" + ) + self.consumer_client.unsubscribe() return TopicSampleData(messages=sample_data)