diff --git a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py index 8d43b55a0ed..953e05456ae 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py +++ b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py @@ -226,14 +226,16 @@ class CommonBrokerSource(MessagingServiceSource, ABC): if topic_entity and self.generate_sample_data: topic_name = topic_details.topic_name sample_data = [] + messages = None try: - self.consumer_client.subscribe( - [topic_name], on_assign=on_partitions_assignment_to_consumer - ) - logger.info( - f"Broker consumer polling for sample messages in topic {topic_name}" - ) - messages = self.consumer_client.consume(num_messages=10, timeout=10) + if self.consumer_client: + self.consumer_client.subscribe( + [topic_name], on_assign=on_partitions_assignment_to_consumer + ) + logger.info( + f"Broker consumer polling for sample messages in topic {topic_name}" + ) + messages = self.consumer_client.consume(num_messages=10, timeout=10) except Exception as exc: yield Either( left=StackTraceError( diff --git a/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py b/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py index 40855bc4b65..7310f82b424 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py +++ b/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py @@ -41,6 +41,12 @@ class InvalidKafkaCreds(Exception): """ +class SchemaRegistryException(Exception): + """ + Class to indicate invalid schema registry not initialized + """ + + TIMEOUT_SECONDS = 10 @@ -134,9 +140,18 @@ def test_connection( "Please validate credentials and check if you are using correct security protocol" ) + def schema_registry_test(): + if client.schema_registry_client: + client.schema_registry_client.get_subjects() + else: + raise SchemaRegistryException( + "Schema Registry not initialized, please provide schema registry " + "credentials in case you want topic schema and sample data to be ingested" + ) + test_fn = { "GetTopics": custom_executor, - "CheckSchemaRegistry": client.schema_registry_client.get_subjects, + "CheckSchemaRegistry": schema_registry_test, } test_connection_steps(