diff --git a/ingestion/src/metadata/ingestion/source/messaging/kafka/metadata.py b/ingestion/src/metadata/ingestion/source/messaging/kafka/metadata.py index f6f11d070fd..72873ace2dc 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/kafka/metadata.py +++ b/ingestion/src/metadata/ingestion/source/messaging/kafka/metadata.py @@ -22,21 +22,19 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.messaging.common_broker_source import CommonBrokerSource -from metadata.utils.ssl_manager import SSLManager +from metadata.utils.ssl_manager import SSLManager, check_ssl_and_init class KafkaSource(CommonBrokerSource): def __init__(self, config: WorkflowSource, metadata: OpenMetadata): self.ssl_manager = None - service_connection = cast(KafkaConnection, config.serviceConnection.root.config) - if service_connection.schemaRegistrySSL: - self.ssl_manager = SSLManager( - ca=service_connection.schemaRegistrySSL.root.caCertificate, - key=service_connection.schemaRegistrySSL.root.sslKey, - cert=service_connection.schemaRegistrySSL.root.sslCertificate, - ) - service_connection = self.ssl_manager.setup_ssl( - config.serviceConnection.root.config + self.service_connection = cast( + KafkaConnection, config.serviceConnection.root.config + ) + self.ssl_manager: SSLManager = check_ssl_and_init(self.service_connection) + if self.ssl_manager: + self.service_connection = self.ssl_manager.setup_ssl( + self.service_connection ) super().__init__(config, metadata) diff --git a/ingestion/src/metadata/utils/ssl_manager.py b/ingestion/src/metadata/utils/ssl_manager.py index 8ba6aadd326..ea90ae1847f 100644 --- a/ingestion/src/metadata/utils/ssl_manager.py +++ b/ingestion/src/metadata/utils/ssl_manager.py @@ -18,7 +18,7 @@ import tempfile import traceback from functools import singledispatch, singledispatchmethod from ssl import CERT_REQUIRED, SSLContext -from typing import Optional, Union, cast +from typing import List, Optional, Union, cast from pydantic import SecretStr @@ -71,7 +71,9 @@ logger = utils_logger() class SSLManager: "SSL Manager to manage SSL certificates for service connections" - def __init__(self, ca=None, key=None, cert=None): + def __init__( + self, ca=None, key=None, cert=None, *args, **kwargs + ): # pylint: disable=keyword-arg-before-vararg self.temp_files = [] self.ca_file_path = None self.cert_file_path = None @@ -82,6 +84,14 @@ class SSLManager: self.cert_file_path = self.create_temp_file(cert) if key: self.key_file_path = self.create_temp_file(key) + if args: + for arg in args: + if arg: + setattr(self, f"{arg}", self.create_temp_file(arg)) + if kwargs: + for dict_key, value in kwargs.items(): + if value: + setattr(self, f"{dict_key}", self.create_temp_file(value)) def create_temp_file(self, content: SecretStr): with tempfile.NamedTemporaryFile(delete=False) as temp_file: @@ -202,8 +212,15 @@ class SSLManager: return connection @setup_ssl.register(KafkaConnection) - def _(self, connection): + def _(self, connection) -> KafkaConnection: connection = cast(KafkaConnection, connection) + if connection.consumerConfigSSL: + connection.consumerConfig = { + **connection.consumerConfig, + "ssl.ca.location": getattr(self, "ca_consumer_config", None), + "ssl.key.location": getattr(self, "key_consumer_config", None), + "ssl.certificate.location": getattr(self, "cert_consumer_config", None), + } connection.schemaRegistryConfig["ssl.ca.location"] = self.ca_file_path connection.schemaRegistryConfig["ssl.key.location"] = self.key_file_path connection.schemaRegistryConfig[ @@ -232,7 +249,9 @@ class SSLManager: @singledispatch -def check_ssl_and_init(_) -> Optional[SSLManager]: +def check_ssl_and_init( + _, *args, **kwargs # pylint: disable=unused-argument +) -> Optional[Union[SSLManager, List[SSLManager]]]: return None @@ -298,6 +317,38 @@ def _(connection): return None +@check_ssl_and_init.register(KafkaConnection) +def _(connection, *args, **kwargs): + + service_connection: KafkaConnection = cast(KafkaConnection, connection) + ssl_consumer_config: Optional[ + verifySSLConfig.SslConfig + ] = service_connection.consumerConfigSSL + ssl_schema_registry: Optional[ + verifySSLConfig.SslConfig + ] = service_connection.schemaRegistrySSL + + ssl_consumer_config_dict = {} + + if ssl_consumer_config: + ssl_consumer_config_dict = { + "ca_consumer_config": ssl_consumer_config.root.caCertificate, + "cert_consumer_config": ssl_consumer_config.root.sslCertificate, + "key_consumer_config": ssl_consumer_config.root.sslKey, + } + ssl_schema_registry_dict = {} + + if ssl_schema_registry: + ssl_schema_registry_dict = { + "ca_schema_registry": ssl_schema_registry.root.caCertificate, + "cert_schema_registry": ssl_schema_registry.root.sslCertificate, + "key_schema_registry": ssl_schema_registry.root.sslKey, + } + if ssl_consumer_config_dict or ssl_schema_registry_dict: + return SSLManager(**ssl_consumer_config_dict, **ssl_schema_registry_dict) + return None + + @check_ssl_and_init.register(PostgresConnection) @check_ssl_and_init.register(RedshiftConnection) @check_ssl_and_init.register(GreenplumConnection) diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json index e5590befd00..649b5404c31 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json @@ -88,6 +88,11 @@ "type": "string", "default": "-value" }, + "consumerConfigSSL": { + "title": "Consumer Config SSL", + "description": "Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config connection.", + "$ref": "../../../../security/ssl/verifySSLConfig.json#/definitions/sslConfig" + }, "schemaRegistrySSL": { "title": "Schema Registry SSL", "description": "Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry connection.", diff --git a/openmetadata-ui/src/main/resources/ui/src/constants/Services.constant.ts b/openmetadata-ui/src/main/resources/ui/src/constants/Services.constant.ts index 61d0576397e..0fe418f0035 100644 --- a/openmetadata-ui/src/main/resources/ui/src/constants/Services.constant.ts +++ b/openmetadata-ui/src/main/resources/ui/src/constants/Services.constant.ts @@ -445,6 +445,7 @@ export const ADVANCED_PROPERTIES = [ 'sslConfig', 'sslMode', 'schemaRegistrySSL', + 'consumerConfigSSL', 'verify', ]; diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/messaging/kafkaConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/messaging/kafkaConnection.ts index e5290501e77..76178892974 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/messaging/kafkaConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/messaging/kafkaConnection.ts @@ -1,5 +1,5 @@ /* - * Copyright 2024 Collate. + * Copyright 2025 Collate. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -10,9 +10,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - - - /** +/** * Kafka Connection Config */ export interface KafkaConnection { @@ -30,6 +28,11 @@ export interface KafkaConnection { * https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md */ consumerConfig?: { [key: string]: any }; + /** + * Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config + * connection. + */ + consumerConfigSSL?: Config; /** * sasl.mechanism Consumer Config property */ @@ -73,24 +76,14 @@ export interface KafkaConnection { } /** - * sasl.mechanism Consumer Config property - * - * SASL Mechanism consumer config property - */ -export enum SaslMechanismType { - Gssapi = "GSSAPI", - Oauthbearer = "OAUTHBEARER", - Plain = "PLAIN", - ScramSHA256 = "SCRAM-SHA-256", - ScramSHA512 = "SCRAM-SHA-512", -} - -/** - * Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry + * Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config * connection. * * Client SSL configuration * + * Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry + * connection. + * * OpenMetadata Client configured to validate SSL certificates. */ export interface Config { @@ -108,6 +101,19 @@ export interface Config { sslKey?: string; } +/** + * sasl.mechanism Consumer Config property + * + * SASL Mechanism consumer config property + */ +export enum SaslMechanismType { + Gssapi = "GSSAPI", + Oauthbearer = "OAUTHBEARER", + Plain = "PLAIN", + ScramSHA256 = "SCRAM-SHA-256", + ScramSHA512 = "SCRAM-SHA-512", +} + /** * security.protocol consumer config property */