fix(ingest/kafka): fix ResourceType import error for confluent_kafka<1.9.0 (#7046)

Fixes https://github.com/datahub-project/datahub/issues/7020
This commit is contained in:
Mayuri Nehate 2023-01-18 00:03:46 +05:30 committed by GitHub
parent 5020a627fc
commit 7607c04ffa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -12,7 +12,6 @@ from confluent_kafka.admin import (
AdminClient,
ConfigEntry,
ConfigResource,
ResourceType,
TopicMetadata,
)
@ -404,11 +403,10 @@ class KafkaSource(StatefulIngestionSourceBase):
def fetch_topic_configurations(self, topics: List[str]) -> Dict[str, dict]:
logger.info("Fetching config details for all topics")
configs: Dict[
ConfigResource, concurrent.futures.Future
] = self.admin_client.describe_configs(
resources=[ConfigResource(ResourceType.TOPIC, t) for t in topics],
resources=[ConfigResource(ConfigResource.Type.TOPIC, t) for t in topics],
request_timeout=self.source_config.connection.client_timeout_seconds,
)
logger.debug("Waiting for config details futures to complete")