From 0f8e2d945e1e920d406b39c66933abd24cd281e9 Mon Sep 17 00:00:00 2001 From: wangsaisai Date: Thu, 22 Dec 2022 23:39:30 +0800 Subject: [PATCH] fix(ingest): kafka ingest task hangs with an incorrect bootstrap server (#6820) --- metadata-ingestion/src/datahub/configuration/kafka.py | 2 ++ metadata-ingestion/src/datahub/ingestion/source/kafka.py | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/configuration/kafka.py b/metadata-ingestion/src/datahub/configuration/kafka.py index 4ecbd421b9..fa9b3a3c28 100644 --- a/metadata-ingestion/src/datahub/configuration/kafka.py +++ b/metadata-ingestion/src/datahub/configuration/kafka.py @@ -16,6 +16,8 @@ class _KafkaConnectionConfig(ConfigModel): description="Extra schema registry config serialized as JSON. These options will be passed into Kafka's SchemaRegistryClient. https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?#schemaregistryclient", ) + client_timeout_seconds: int = Field(default=60, description="The request timeout used when interacting with the Kafka APIs.") + @validator("bootstrap") def bootstrap_host_colon_port_comma(cls, val: str) -> str: for entry in val.split(","): diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index 83bc9889de..61f75fd3a4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -200,7 +200,9 @@ class KafkaSource(StatefulIngestionSourceBase): return cls(config, ctx) def get_workunits(self) -> Iterable[MetadataWorkUnit]: - topics = self.consumer.list_topics().topics + topics = self.consumer.list_topics( + timeout=self.source_config.connection.client_timeout_seconds + ).topics extra_topic_details = self.fetch_extra_topic_details(topics.keys()) for t, t_detail in topics.items():