diff --git a/metadata-ingestion/src/datahub/configuration/kafka.py b/metadata-ingestion/src/datahub/configuration/kafka.py index 4ecbd421b9..fa9b3a3c28 100644 --- a/metadata-ingestion/src/datahub/configuration/kafka.py +++ b/metadata-ingestion/src/datahub/configuration/kafka.py @@ -16,6 +16,8 @@ class _KafkaConnectionConfig(ConfigModel): description="Extra schema registry config serialized as JSON. These options will be passed into Kafka's SchemaRegistryClient. https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?#schemaregistryclient", ) + client_timeout_seconds: int = Field(default=60, description="The request timeout used when interacting with the Kafka APIs.") + @validator("bootstrap") def bootstrap_host_colon_port_comma(cls, val: str) -> str: for entry in val.split(","): diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index 83bc9889de..61f75fd3a4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -200,7 +200,9 @@ class KafkaSource(StatefulIngestionSourceBase): return cls(config, ctx) def get_workunits(self) -> Iterable[MetadataWorkUnit]: - topics = self.consumer.list_topics().topics + topics = self.consumer.list_topics( + timeout=self.source_config.connection.client_timeout_seconds + ).topics extra_topic_details = self.fetch_extra_topic_details(topics.keys()) for t, t_detail in topics.items():