Feat: Enhance Kafka SSL configuration support with consumerConfigSSL (#19433)

This commit is contained in:
Ayush Shah 2025-01-27 10:15:48 +05:30 committed by GitHub
parent 31719a5bd7
commit e59ddecf28
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 93 additions and 32 deletions

View File

@ -22,21 +22,19 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.messaging.common_broker_source import CommonBrokerSource from metadata.ingestion.source.messaging.common_broker_source import CommonBrokerSource
from metadata.utils.ssl_manager import SSLManager from metadata.utils.ssl_manager import SSLManager, check_ssl_and_init
class KafkaSource(CommonBrokerSource): class KafkaSource(CommonBrokerSource):
def __init__(self, config: WorkflowSource, metadata: OpenMetadata): def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
self.ssl_manager = None self.ssl_manager = None
service_connection = cast(KafkaConnection, config.serviceConnection.root.config) self.service_connection = cast(
if service_connection.schemaRegistrySSL: KafkaConnection, config.serviceConnection.root.config
self.ssl_manager = SSLManager( )
ca=service_connection.schemaRegistrySSL.root.caCertificate, self.ssl_manager: SSLManager = check_ssl_and_init(self.service_connection)
key=service_connection.schemaRegistrySSL.root.sslKey, if self.ssl_manager:
cert=service_connection.schemaRegistrySSL.root.sslCertificate, self.service_connection = self.ssl_manager.setup_ssl(
) self.service_connection
service_connection = self.ssl_manager.setup_ssl(
config.serviceConnection.root.config
) )
super().__init__(config, metadata) super().__init__(config, metadata)

View File

@ -18,7 +18,7 @@ import tempfile
import traceback import traceback
from functools import singledispatch, singledispatchmethod from functools import singledispatch, singledispatchmethod
from ssl import CERT_REQUIRED, SSLContext from ssl import CERT_REQUIRED, SSLContext
from typing import Optional, Union, cast from typing import List, Optional, Union, cast
from pydantic import SecretStr from pydantic import SecretStr
@ -71,7 +71,9 @@ logger = utils_logger()
class SSLManager: class SSLManager:
"SSL Manager to manage SSL certificates for service connections" "SSL Manager to manage SSL certificates for service connections"
def __init__(self, ca=None, key=None, cert=None): def __init__(
self, ca=None, key=None, cert=None, *args, **kwargs
): # pylint: disable=keyword-arg-before-vararg
self.temp_files = [] self.temp_files = []
self.ca_file_path = None self.ca_file_path = None
self.cert_file_path = None self.cert_file_path = None
@ -82,6 +84,14 @@ class SSLManager:
self.cert_file_path = self.create_temp_file(cert) self.cert_file_path = self.create_temp_file(cert)
if key: if key:
self.key_file_path = self.create_temp_file(key) self.key_file_path = self.create_temp_file(key)
if args:
for arg in args:
if arg:
setattr(self, f"{arg}", self.create_temp_file(arg))
if kwargs:
for dict_key, value in kwargs.items():
if value:
setattr(self, f"{dict_key}", self.create_temp_file(value))
def create_temp_file(self, content: SecretStr): def create_temp_file(self, content: SecretStr):
with tempfile.NamedTemporaryFile(delete=False) as temp_file: with tempfile.NamedTemporaryFile(delete=False) as temp_file:
@ -202,8 +212,15 @@ class SSLManager:
return connection return connection
@setup_ssl.register(KafkaConnection) @setup_ssl.register(KafkaConnection)
def _(self, connection): def _(self, connection) -> KafkaConnection:
connection = cast(KafkaConnection, connection) connection = cast(KafkaConnection, connection)
if connection.consumerConfigSSL:
connection.consumerConfig = {
**connection.consumerConfig,
"ssl.ca.location": getattr(self, "ca_consumer_config", None),
"ssl.key.location": getattr(self, "key_consumer_config", None),
"ssl.certificate.location": getattr(self, "cert_consumer_config", None),
}
connection.schemaRegistryConfig["ssl.ca.location"] = self.ca_file_path connection.schemaRegistryConfig["ssl.ca.location"] = self.ca_file_path
connection.schemaRegistryConfig["ssl.key.location"] = self.key_file_path connection.schemaRegistryConfig["ssl.key.location"] = self.key_file_path
connection.schemaRegistryConfig[ connection.schemaRegistryConfig[
@ -232,7 +249,9 @@ class SSLManager:
@singledispatch @singledispatch
def check_ssl_and_init(_) -> Optional[SSLManager]: def check_ssl_and_init(
_, *args, **kwargs # pylint: disable=unused-argument
) -> Optional[Union[SSLManager, List[SSLManager]]]:
return None return None
@ -298,6 +317,38 @@ def _(connection):
return None return None
@check_ssl_and_init.register(KafkaConnection)
def _(connection, *args, **kwargs):
service_connection: KafkaConnection = cast(KafkaConnection, connection)
ssl_consumer_config: Optional[
verifySSLConfig.SslConfig
] = service_connection.consumerConfigSSL
ssl_schema_registry: Optional[
verifySSLConfig.SslConfig
] = service_connection.schemaRegistrySSL
ssl_consumer_config_dict = {}
if ssl_consumer_config:
ssl_consumer_config_dict = {
"ca_consumer_config": ssl_consumer_config.root.caCertificate,
"cert_consumer_config": ssl_consumer_config.root.sslCertificate,
"key_consumer_config": ssl_consumer_config.root.sslKey,
}
ssl_schema_registry_dict = {}
if ssl_schema_registry:
ssl_schema_registry_dict = {
"ca_schema_registry": ssl_schema_registry.root.caCertificate,
"cert_schema_registry": ssl_schema_registry.root.sslCertificate,
"key_schema_registry": ssl_schema_registry.root.sslKey,
}
if ssl_consumer_config_dict or ssl_schema_registry_dict:
return SSLManager(**ssl_consumer_config_dict, **ssl_schema_registry_dict)
return None
@check_ssl_and_init.register(PostgresConnection) @check_ssl_and_init.register(PostgresConnection)
@check_ssl_and_init.register(RedshiftConnection) @check_ssl_and_init.register(RedshiftConnection)
@check_ssl_and_init.register(GreenplumConnection) @check_ssl_and_init.register(GreenplumConnection)

View File

@ -88,6 +88,11 @@
"type": "string", "type": "string",
"default": "-value" "default": "-value"
}, },
"consumerConfigSSL": {
"title": "Consumer Config SSL",
"description": "Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config connection.",
"$ref": "../../../../security/ssl/verifySSLConfig.json#/definitions/sslConfig"
},
"schemaRegistrySSL": { "schemaRegistrySSL": {
"title": "Schema Registry SSL", "title": "Schema Registry SSL",
"description": "Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry connection.", "description": "Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry connection.",

View File

@ -445,6 +445,7 @@ export const ADVANCED_PROPERTIES = [
'sslConfig', 'sslConfig',
'sslMode', 'sslMode',
'schemaRegistrySSL', 'schemaRegistrySSL',
'consumerConfigSSL',
'verify', 'verify',
]; ];

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2024 Collate. * Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
@ -10,9 +10,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
/**
/**
* Kafka Connection Config * Kafka Connection Config
*/ */
export interface KafkaConnection { export interface KafkaConnection {
@ -30,6 +28,11 @@ export interface KafkaConnection {
* https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md * https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
*/ */
consumerConfig?: { [key: string]: any }; consumerConfig?: { [key: string]: any };
/**
* Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config
* connection.
*/
consumerConfigSSL?: Config;
/** /**
* sasl.mechanism Consumer Config property * sasl.mechanism Consumer Config property
*/ */
@ -73,24 +76,14 @@ export interface KafkaConnection {
} }
/** /**
* sasl.mechanism Consumer Config property * Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config
*
* SASL Mechanism consumer config property
*/
export enum SaslMechanismType {
Gssapi = "GSSAPI",
Oauthbearer = "OAUTHBEARER",
Plain = "PLAIN",
ScramSHA256 = "SCRAM-SHA-256",
ScramSHA512 = "SCRAM-SHA-512",
}
/**
* Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry
* connection. * connection.
* *
* Client SSL configuration * Client SSL configuration
* *
* Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry
* connection.
*
* OpenMetadata Client configured to validate SSL certificates. * OpenMetadata Client configured to validate SSL certificates.
*/ */
export interface Config { export interface Config {
@ -108,6 +101,19 @@ export interface Config {
sslKey?: string; sslKey?: string;
} }
/**
* sasl.mechanism Consumer Config property
*
* SASL Mechanism consumer config property
*/
export enum SaslMechanismType {
Gssapi = "GSSAPI",
Oauthbearer = "OAUTHBEARER",
Plain = "PLAIN",
ScramSHA256 = "SCRAM-SHA-256",
ScramSHA512 = "SCRAM-SHA-512",
}
/** /**
* security.protocol consumer config property * security.protocol consumer config property
*/ */