Fix #4817: Processing Redpanda topics configs runs into error (#4818)

This commit is contained in:
Sriharsha Chintalapani 2022-05-09 22:50:19 -07:00 committed by GitHub
parent 5a9abb6463
commit 10dd966019
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -49,6 +49,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import logger from metadata.ingestion.api.common import logger
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.utils import exit_after
from metadata.utils.connection_clients import KafkaClient from metadata.utils.connection_clients import KafkaClient
from metadata.utils.connections import get_connection, test_connection from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_topic from metadata.utils.filters import filter_by_topic
@ -140,16 +141,26 @@ class KafkaSource(Source[CreateTopicRequest]):
iter(topic_configResource.values()) iter(topic_configResource.values())
): ):
config_response = j.result(timeout=10) config_response = j.result(timeout=10)
if "max.message.bytes" in config_response:
topic.maximumMessageSize = config_response.get( topic.maximumMessageSize = config_response.get(
"max.message.bytes" "max.message.bytes", {}
).value ).value
if "min.insync.replicas" in config_response:
topic.minimumInSyncReplicas = config_response.get( topic.minimumInSyncReplicas = config_response.get(
"min.insync.replicas" "min.insync.replicas"
).value ).value
topic.retentionTime = config_response.get("retention.ms").value
if "retention.ms" in config_response:
topic.retentionTime = config_response.get(
"retention.ms"
).value
if "cleanup.policy" in config_response:
topic.cleanupPolicies = [ topic.cleanupPolicies = [
config_response.get("cleanup.policy").value config_response.get("cleanup.policy").value
] ]
topic_config = {} topic_config = {}
for key, conf_response in config_response.items(): for key, conf_response in config_response.items():
topic_config[key] = conf_response.value topic_config[key] = conf_response.value
@ -224,7 +235,6 @@ class KafkaSource(Source[CreateTopicRequest]):
for partition in partitions: for partition in partitions:
last_offset = a_consumer.get_watermark_offsets(partition) last_offset = a_consumer.get_watermark_offsets(partition)
offset = last_offset[1] offset = last_offset[1]
print(offset)
if offset > 0: if offset > 0:
partition.offset = offset - 10 if offset > 10 else offset partition.offset = offset - 10 if offset > 10 else offset
new_partitions.append(partition) new_partitions.append(partition)
@ -234,7 +244,7 @@ class KafkaSource(Source[CreateTopicRequest]):
return self.status return self.status
def close(self): def close(self):
if self.consumer_client: if self.generate_sample_data and self.consumer_client:
self.consumer_client.close() self.consumer_client.close()
def test_connection(self) -> None: def test_connection(self) -> None: