fix(ingest): kafka ingest task hand up with error bootstrap server (#6820)

This commit is contained in:
wangsaisai 2022-12-22 23:39:30 +08:00 committed by GitHub
parent bdd346db27
commit 0f8e2d945e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 5 additions and 1 deletions

View File

@ -16,6 +16,8 @@ class _KafkaConnectionConfig(ConfigModel):
description="Extra schema registry config serialized as JSON. These options will be passed into Kafka's SchemaRegistryClient. https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?#schemaregistryclient",
)
client_timeout_seconds: int = Field(default=60, description="The request timeout used when interacting with the Kafka APIs.")
@validator("bootstrap")
def bootstrap_host_colon_port_comma(cls, val: str) -> str:
for entry in val.split(","):

View File

@ -200,7 +200,9 @@ class KafkaSource(StatefulIngestionSourceBase):
return cls(config, ctx)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
topics = self.consumer.list_topics().topics
topics = self.consumer.list_topics(
timeout=self.source_config.connection.client_timeout_seconds
).topics
extra_topic_details = self.fetch_extra_topic_details(topics.keys())
for t, t_detail in topics.items():