MINOR: Kafka handle test connection if Schema Registry not passed (#15250)

This commit is contained in:
Mayur Singal 2024-02-20 12:39:01 +05:30 committed by GitHub
parent 62c0cc7563
commit eac40862ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 25 additions and 8 deletions

View File

@ -226,14 +226,16 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
if topic_entity and self.generate_sample_data: if topic_entity and self.generate_sample_data:
topic_name = topic_details.topic_name topic_name = topic_details.topic_name
sample_data = [] sample_data = []
messages = None
try: try:
self.consumer_client.subscribe( if self.consumer_client:
[topic_name], on_assign=on_partitions_assignment_to_consumer self.consumer_client.subscribe(
) [topic_name], on_assign=on_partitions_assignment_to_consumer
logger.info( )
f"Broker consumer polling for sample messages in topic {topic_name}" logger.info(
) f"Broker consumer polling for sample messages in topic {topic_name}"
messages = self.consumer_client.consume(num_messages=10, timeout=10) )
messages = self.consumer_client.consume(num_messages=10, timeout=10)
except Exception as exc: except Exception as exc:
yield Either( yield Either(
left=StackTraceError( left=StackTraceError(

View File

@ -41,6 +41,12 @@ class InvalidKafkaCreds(Exception):
""" """
class SchemaRegistryException(Exception):
"""
Class to indicate invalid schema registry not initialized
"""
TIMEOUT_SECONDS = 10 TIMEOUT_SECONDS = 10
@ -134,9 +140,18 @@ def test_connection(
"Please validate credentials and check if you are using correct security protocol" "Please validate credentials and check if you are using correct security protocol"
) )
def schema_registry_test():
if client.schema_registry_client:
client.schema_registry_client.get_subjects()
else:
raise SchemaRegistryException(
"Schema Registry not initialized, please provide schema registry "
"credentials in case you want topic schema and sample data to be ingested"
)
test_fn = { test_fn = {
"GetTopics": custom_executor, "GetTopics": custom_executor,
"CheckSchemaRegistry": client.schema_registry_client.get_subjects, "CheckSchemaRegistry": schema_registry_test,
} }
test_connection_steps( test_connection_steps(