diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index df6722bf8b..13e01e8e03 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -12,7 +12,6 @@ from confluent_kafka.admin import ( AdminClient, ConfigEntry, ConfigResource, - ResourceType, TopicMetadata, ) @@ -404,11 +403,10 @@ class KafkaSource(StatefulIngestionSourceBase): def fetch_topic_configurations(self, topics: List[str]) -> Dict[str, dict]: logger.info("Fetching config details for all topics") - configs: Dict[ ConfigResource, concurrent.futures.Future ] = self.admin_client.describe_configs( - resources=[ConfigResource(ResourceType.TOPIC, t) for t in topics], + resources=[ConfigResource(ConfigResource.Type.TOPIC, t) for t in topics], request_timeout=self.source_config.connection.client_timeout_seconds, ) logger.debug("Waiting for config details futures to complete")