MINOR: Fix kafka connection storage (#16313)

This commit is contained in:
Mayur Singal 2024-05-17 11:59:08 +05:30 committed by GitHub
parent 02d0a3e06e
commit 7b52f5a6b0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -12,6 +12,7 @@
""" """
Source connection handler Source connection handler
""" """
from copy import deepcopy
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional, Union from typing import Optional, Union
@ -64,45 +65,39 @@ def get_connection(
""" """
Create connection Create connection
""" """
consumer_config = deepcopy(connection.consumerConfig) or {}
schema_registry_config = deepcopy(connection.schemaRegistryConfig) or {}
if connection.saslUsername or connection.saslPassword or connection.saslMechanism: if connection.saslUsername or connection.saslPassword or connection.saslMechanism:
connection.consumerConfig = connection.consumerConfig or {}
if connection.saslUsername: if connection.saslUsername:
connection.consumerConfig["sasl.username"] = connection.saslUsername consumer_config["sasl.username"] = connection.saslUsername
if connection.saslPassword: if connection.saslPassword:
connection.consumerConfig[ consumer_config[
"sasl.password" "sasl.password"
] = connection.saslPassword.get_secret_value() ] = connection.saslPassword.get_secret_value()
if connection.saslMechanism: if connection.saslMechanism:
connection.consumerConfig["sasl.mechanism"] = connection.saslMechanism.value consumer_config["sasl.mechanism"] = connection.saslMechanism.value
if ( if (
connection.consumerConfig.get("security.protocol") is None connection.consumerConfig.get("security.protocol") is None
and connection.securityProtocol and connection.securityProtocol
): ):
connection.consumerConfig[ consumer_config["security.protocol"] = connection.securityProtocol.value
"security.protocol"
] = connection.securityProtocol.value
if connection.basicAuthUserInfo: if connection.basicAuthUserInfo:
connection.schemaRegistryConfig = connection.schemaRegistryConfig or {} schema_registry_config[
connection.schemaRegistryConfig[
"basic.auth.user.info" "basic.auth.user.info"
] = connection.basicAuthUserInfo.get_secret_value() ] = connection.basicAuthUserInfo.get_secret_value()
admin_client_config = connection.consumerConfig admin_client_config = consumer_config
admin_client_config["bootstrap.servers"] = connection.bootstrapServers admin_client_config["bootstrap.servers"] = connection.bootstrapServers
admin_client = AdminClient(admin_client_config) admin_client = AdminClient(admin_client_config)
schema_registry_client = None schema_registry_client = None
consumer_client = None consumer_client = None
if connection.schemaRegistryURL: if connection.schemaRegistryURL:
connection.schemaRegistryConfig["url"] = connection.schemaRegistryURL schema_registry_config["url"] = connection.schemaRegistryURL
schema_registry_client = SchemaRegistryClient(connection.schemaRegistryConfig) schema_registry_client = SchemaRegistryClient(schema_registry_config)
connection.schemaRegistryConfig["url"] = str(connection.schemaRegistryURL) consumer_config["bootstrap.servers"] = connection.bootstrapServers
consumer_config = {
**connection.consumerConfig,
"bootstrap.servers": connection.bootstrapServers,
}
if "group.id" not in consumer_config: if "group.id" not in consumer_config:
consumer_config["group.id"] = "openmetadata-consumer" consumer_config["group.id"] = "openmetadata-consumer"
if "auto.offset.reset" not in consumer_config: if "auto.offset.reset" not in consumer_config: