Messaging Broker Minor Fix (#9468)

This commit is contained in:
Mayur Singal 2022-12-21 22:11:26 +05:30 committed by GitHub
parent a7fd28b5d8
commit e10fff89b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -81,6 +81,10 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
self, topic_details: BrokerTopicDetails self, topic_details: BrokerTopicDetails
) -> Iterable[CreateTopicRequest]: ) -> Iterable[CreateTopicRequest]:
try: try:
schema_type_map = {
key.lower(): value.value
for key, value in SchemaType.__members__.items()
}
logger.info(f"Fetching topic schema {topic_details.topic_name}") logger.info(f"Fetching topic schema {topic_details.topic_name}")
topic_schema = self._parse_topic_metadata(topic_details.topic_name) topic_schema = self._parse_topic_metadata(topic_details.topic_name)
logger.info(f"Fetching topic config {topic_details.topic_name}") logger.info(f"Fetching topic config {topic_details.topic_name}")
@ -115,15 +119,17 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
topic_details.topic_name, topic_schema.schema_str topic_details.topic_name, topic_schema.schema_str
) )
topic.messageSchema = ( topic.messageSchema = Topic(
Topic(
schemaText=topic_schema.schema_str, schemaText=topic_schema.schema_str,
schemaType=SchemaType(topic_schema.schema_type).value, schemaType=schema_type_map.get(
schemaFields=schema_fields, topic_schema.schema_type.lower(), SchemaType.Other.value
), ),
schemaFields=schema_fields,
) )
if topic_schema.schema_type.lower() == SchemaType.Avro.value.lower(): if (
if self.generate_sample_data: topic_schema.schema_type.lower() == SchemaType.Avro.value.lower()
and self.generate_sample_data
):
topic.sampleData = self._get_sample_data(topic.name) topic.sampleData = self._get_sample_data(topic.name)
self.status.topic_scanned(topic.name.__root__) self.status.topic_scanned(topic.name.__root__)
@ -209,6 +215,7 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
else: else:
if messages: if messages:
for message in messages: for message in messages:
try:
sample_data.append( sample_data.append(
str( str(
self.consumer_client._serializer.decode_message( # pylint: disable=protected-access self.consumer_client._serializer.decode_message( # pylint: disable=protected-access
@ -216,6 +223,11 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
) )
) )
) )
except Exception as exc:
logger.warning(
f"Failed to decode sample data from topic {topic_name.__root__}: {exc}"
)
self.consumer_client.unsubscribe() self.consumer_client.unsubscribe()
return TopicSampleData(messages=sample_data) return TopicSampleData(messages=sample_data)