diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
index e57dc853a8..709ba431f0 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
@@ -148,7 +148,7 @@ def get_kafka_consumer(
 ) -> confluent_kafka.Consumer:
     consumer = confluent_kafka.Consumer(
         {
-            "group.id": "test",
+            "group.id": "datahub-kafka-ingestion",
             "bootstrap.servers": connection.bootstrap,
             **connection.consumer_config,
         }
@@ -164,6 +164,25 @@ def get_kafka_consumer(
     return consumer
 
 
+def get_kafka_admin_client(
+    connection: KafkaConsumerConnectionConfig,
+) -> AdminClient:
+    client = AdminClient(
+        {
+            "group.id": "datahub-kafka-ingestion",
+            "bootstrap.servers": connection.bootstrap,
+            **connection.consumer_config,
+        }
+    )
+    if CallableConsumerConfig.is_callable_config(connection.consumer_config):
+        # As per documentation, we need to explicitly call the poll method to make sure OAuth callback gets executed
+        # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration
+        logger.debug("Initiating polling for kafka admin client")
+        client.poll(timeout=30)
+        logger.debug("Initiated polling for kafka admin client")
+    return client
+
+
 @dataclass
 class KafkaSourceReport(StaleEntityRemovalSourceReport):
     topics_scanned: int = 0
@@ -278,13 +297,7 @@ class KafkaSource(StatefulIngestionSourceBase, TestableSource):
     def init_kafka_admin_client(self) -> None:
         try:
             # TODO: Do we require separate config than existing consumer_config ?
-            self.admin_client = AdminClient(
-                {
-                    "group.id": "test",
-                    "bootstrap.servers": self.source_config.connection.bootstrap,
-                    **self.source_config.connection.consumer_config,
-                }
-            )
+            self.admin_client = get_kafka_admin_client(self.source_config.connection)
         except Exception as e:
             logger.debug(e, exc_info=e)
             self.report.report_warning(
diff --git a/metadata-ingestion/tests/integration/kafka/test_kafka.py b/metadata-ingestion/tests/integration/kafka/test_kafka.py
index 597889c844..7462f17768 100644
--- a/metadata-ingestion/tests/integration/kafka/test_kafka.py
+++ b/metadata-ingestion/tests/integration/kafka/test_kafka.py
@@ -128,11 +128,32 @@ def test_kafka_oauth_callback(
 
     pipeline.run()
 
-    is_found: bool = False
-    with open(log_file, "r") as file:
-        for line_number, line in enumerate(file, 1):
-            if oauth.MESSAGE in line:
-                is_found = True
-                break
+    # Initialize flags to track oauth events
+    checks = {
+        "consumer_polling": False,
+        "consumer_oauth_callback": False,
+        "admin_polling": False,
+        "admin_oauth_callback": False,
+    }
 
-    assert is_found
+    # Read log file and check for oauth events
+    with open(log_file, "r") as file:
+        for line in file:
+            # Check for polling events
+            if "Initiating polling for kafka admin client" in line:
+                checks["admin_polling"] = True
+            elif "Initiating polling for kafka consumer" in line:
+                checks["consumer_polling"] = True
+
+            # Check for oauth callbacks
+            if oauth.MESSAGE in line:
+                if checks["consumer_polling"] and not checks["admin_polling"]:
+                    checks["consumer_oauth_callback"] = True
+                elif checks["consumer_polling"] and checks["admin_polling"]:
+                    checks["admin_oauth_callback"] = True
+
+    # Verify all oauth events occurred
+    assert checks["consumer_polling"], "Consumer polling was not initiated"
+    assert checks["consumer_oauth_callback"], "Consumer oauth callback not found"
+    assert checks["admin_polling"], "Admin polling was not initiated"
+    assert checks["admin_oauth_callback"], "Admin oauth callback not found"