diff --git a/ingestion/src/metadata/ingestion/source/kafka.py b/ingestion/src/metadata/ingestion/source/kafka.py index 94af5b48342..d2619b4d36b 100644 --- a/ingestion/src/metadata/ingestion/source/kafka.py +++ b/ingestion/src/metadata/ingestion/source/kafka.py @@ -49,6 +49,7 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import logger from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.ometa.utils import exit_after from metadata.utils.connection_clients import KafkaClient from metadata.utils.connections import get_connection, test_connection from metadata.utils.filters import filter_by_topic @@ -140,16 +141,26 @@ class KafkaSource(Source[CreateTopicRequest]): iter(topic_configResource.values()) ): config_response = j.result(timeout=10) - topic.maximumMessageSize = config_response.get( - "max.message.bytes" - ).value - topic.minimumInSyncReplicas = config_response.get( - "min.insync.replicas" - ).value - topic.retentionTime = config_response.get("retention.ms").value - topic.cleanupPolicies = [ - config_response.get("cleanup.policy").value - ] + if "max.message.bytes" in config_response: + topic.maximumMessageSize = config_response.get( + "max.message.bytes", {} + ).value + + if "min.insync.replicas" in config_response: + topic.minimumInSyncReplicas = config_response.get( + "min.insync.replicas" + ).value + + if "retention.ms" in config_response: + topic.retentionTime = config_response.get( + "retention.ms" + ).value + + if "cleanup.policy" in config_response: + topic.cleanupPolicies = [ + config_response.get("cleanup.policy").value + ] + topic_config = {} for key, conf_response in config_response.items(): topic_config[key] = conf_response.value @@ -224,7 +235,6 @@ class KafkaSource(Source[CreateTopicRequest]): for partition in partitions: last_offset = a_consumer.get_watermark_offsets(partition) offset = last_offset[1] - print(offset) if offset > 0: partition.offset = offset - 10 if offset > 10 else offset new_partitions.append(partition) @@ -234,7 +244,7 @@ class KafkaSource(Source[CreateTopicRequest]): return self.status def close(self): - if self.consumer_client: + if self.generate_sample_data and self.consumer_client: self.consumer_client.close() def test_connection(self) -> None: