diff --git a/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py b/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py index 7310f82b424..1f1d76098a3 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py +++ b/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py @@ -12,6 +12,7 @@ """ Source connection handler """ +from copy import deepcopy from dataclasses import dataclass from typing import Optional, Union @@ -64,45 +65,39 @@ def get_connection( """ Create connection """ + consumer_config = deepcopy(connection.consumerConfig) or {} + schema_registry_config = deepcopy(connection.schemaRegistryConfig) or {} if connection.saslUsername or connection.saslPassword or connection.saslMechanism: - connection.consumerConfig = connection.consumerConfig or {} if connection.saslUsername: - connection.consumerConfig["sasl.username"] = connection.saslUsername + consumer_config["sasl.username"] = connection.saslUsername if connection.saslPassword: - connection.consumerConfig[ + consumer_config[ "sasl.password" ] = connection.saslPassword.get_secret_value() if connection.saslMechanism: - connection.consumerConfig["sasl.mechanism"] = connection.saslMechanism.value + consumer_config["sasl.mechanism"] = connection.saslMechanism.value if ( connection.consumerConfig.get("security.protocol") is None and connection.securityProtocol ): - connection.consumerConfig[ - "security.protocol" - ] = connection.securityProtocol.value + consumer_config["security.protocol"] = connection.securityProtocol.value if connection.basicAuthUserInfo: - connection.schemaRegistryConfig = connection.schemaRegistryConfig or {} - connection.schemaRegistryConfig[ + schema_registry_config[ "basic.auth.user.info" ] = connection.basicAuthUserInfo.get_secret_value() - admin_client_config = connection.consumerConfig + admin_client_config = consumer_config admin_client_config["bootstrap.servers"] = connection.bootstrapServers admin_client = AdminClient(admin_client_config) schema_registry_client = None consumer_client = None if connection.schemaRegistryURL: - connection.schemaRegistryConfig["url"] = connection.schemaRegistryURL - schema_registry_client = SchemaRegistryClient(connection.schemaRegistryConfig) - connection.schemaRegistryConfig["url"] = str(connection.schemaRegistryURL) - consumer_config = { - **connection.consumerConfig, - "bootstrap.servers": connection.bootstrapServers, - } + schema_registry_config["url"] = connection.schemaRegistryURL + schema_registry_client = SchemaRegistryClient(schema_registry_config) + consumer_config["bootstrap.servers"] = connection.bootstrapServers if "group.id" not in consumer_config: consumer_config["group.id"] = "openmetadata-consumer" if "auto.offset.reset" not in consumer_config: