diff --git a/ingestion/src/metadata/examples/workflows/kafka.yaml b/ingestion/src/metadata/examples/workflows/kafka.yaml index 6830058374b..2c1d6c04521 100644 --- a/ingestion/src/metadata/examples/workflows/kafka.yaml +++ b/ingestion/src/metadata/examples/workflows/kafka.yaml @@ -14,6 +14,19 @@ source: # example of scema registry config, for more info about accepted values visit: # https://docs.confluent.io/5.5.1/clients/confluent-kafka-python/index.html#confluent_kafka.schema_registry.SchemaRegistryClient #basic.auth.user.info: username:password + # schemaRegistrySSL: + # caCertificate: | + # -----BEGIN CERTIFICATE----- + # sample caCertificateData + # -----END CERTIFICATE----- + # sslCertificate: | + # -----BEGIN CERTIFICATE----- + # sample sslCertificateData + # -----END CERTIFICATE----- + # sslKey: | + # -----BEGIN RSA PRIVATE KEY + # sample sslKeyData + # -----END RSA PRIVATE KEY sourceConfig: config: type: MessagingMetadata diff --git a/ingestion/src/metadata/examples/workflows/kafka_ssl_config.yaml b/ingestion/src/metadata/examples/workflows/kafka_ssl_config.yaml new file mode 100644 index 00000000000..481ac910d02 --- /dev/null +++ b/ingestion/src/metadata/examples/workflows/kafka_ssl_config.yaml @@ -0,0 +1,46 @@ +source: + type: kafka + serviceName: local_kafka + serviceConnection: + config: + type: Kafka + bootstrapServers: localhost:9092 + schemaRegistryURL: http://localhost:8081 + #consumerConfig: + # example of consume config, for more info about accepted values visit: + # https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md + #ssl.truststore.password: password + #schemaRegistryConfig: + # example of scema registry config, for more info about accepted values visit: + # https://docs.confluent.io/5.5.1/clients/confluent-kafka-python/index.html#confluent_kafka.schema_registry.SchemaRegistryClient + #basic.auth.user.info: username:password + schemaRegistrySSL: + caCertificate: | + -----BEGIN CERTIFICATE----- + sample caCertificateData + -----END CERTIFICATE----- + sslCertificate: | + -----BEGIN CERTIFICATE----- + sample sslCertificateData + -----END CERTIFICATE----- + sslKey: | + -----BEGIN RSA PRIVATE KEY + sample sslKeyData + -----END RSA PRIVATE KEY + sourceConfig: + config: + type: MessagingMetadata + topicFilterPattern: + excludes: + - _confluent.* + generateSampleData: true +sink: + type: metadata-rest + config: {} +workflowConfig: +# loggerLevel: INFO # DEBUG, INFO, WARN or ERROR + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" diff --git a/ingestion/src/metadata/ingestion/source/messaging/kafka/metadata.py b/ingestion/src/metadata/ingestion/source/messaging/kafka/metadata.py index 5b52595bb5e..4d03894c558 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/kafka/metadata.py +++ b/ingestion/src/metadata/ingestion/source/messaging/kafka/metadata.py @@ -11,7 +11,7 @@ """ Kafka source ingestion """ -from typing import Optional +from typing import Optional, cast from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import ( KafkaConnection, @@ -28,7 +28,9 @@ from metadata.utils.ssl_manager import SSLManager class KafkaSource(CommonBrokerSource): def __init__(self, config: WorkflowSource, metadata: OpenMetadata): self.ssl_manager = None - service_connection = config.serviceConnection.__root__.config + service_connection = cast( + KafkaConnection, config.serviceConnection.__root__.config + ) if service_connection.schemaRegistrySSL: self.ssl_manager = SSLManager( @@ -37,7 +39,7 @@ class KafkaSource(CommonBrokerSource): cert=service_connection.schemaRegistrySSL.__root__.sslCertificate, ) service_connection = self.ssl_manager.setup_ssl( - config.serviceConnection.__root__.config.sslConfig + config.serviceConnection.__root__.config ) super().__init__(config, metadata) diff --git a/ingestion/tests/unit/test_ssl_manager.py b/ingestion/tests/unit/test_ssl_manager.py index 643e94a939b..d7965afd78c 100644 --- a/ingestion/tests/unit/test_ssl_manager.py +++ b/ingestion/tests/unit/test_ssl_manager.py @@ -1,15 +1,28 @@ """ Manage SSL test cases """ + import os -import unittest +from unittest import TestCase +from unittest.mock import patch from pydantic import SecretStr +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( + OpenMetadataJWTClientConfig, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.messaging.kafka.metadata import KafkaSource from metadata.utils.ssl_manager import SSLManager -class SSLManagerTest(unittest.TestCase): +class SSLManagerTest(TestCase): """ Tests to verify the functionality of SSLManager """ @@ -44,3 +57,70 @@ class SSLManagerTest(unittest.TestCase): temp_file = self.ssl_manager.create_temp_file(SecretStr("Test content")) self.ssl_manager.cleanup_temp_files() self.assertFalse(os.path.exists(temp_file)) + + +class KafkaSourceSSLTest(TestCase): + @patch( + "metadata.ingestion.source.messaging.messaging_service.MessagingServiceSource.test_connection" + ) + @patch("metadata.ingestion.source.messaging.kafka.metadata.SSLManager") + def test_init(self, mock_ssl_manager, test_connection): + test_connection.return_value = True + config = WorkflowSource( + **{ + "type": "kafka", + "serviceName": "local_kafka", + "serviceConnection": { + "config": { + "type": "Kafka", + "bootstrapServers": "localhost:9092", + } + }, + "sourceConfig": {"config": {"type": "MessagingMetadata"}}, + } + ) + metadata = OpenMetadata( + OpenMetadataConnection( + hostPort="http://localhost:8585/api", + authProvider="openmetadata", + securityConfig=OpenMetadataJWTClientConfig(jwtToken="token"), + ) + ) + kafka_source = KafkaSource(config, metadata) + + self.assertIsNone(kafka_source.ssl_manager) + mock_ssl_manager.assert_not_called() + + config_with_ssl = WorkflowSource( + **{ + "type": "kafka", + "serviceName": "local_kafka", + "serviceConnection": { + "config": { + "type": "Kafka", + "bootstrapServers": "localhost:9092", + "schemaRegistrySSL": { + "caCertificate": "caCertificateData", + "sslKey": "sslKeyData", + "sslCertificate": "sslCertificateData", + }, + }, + }, + "sourceConfig": {"config": {"type": "MessagingMetadata"}}, + } + ) + kafka_source_with_ssl = KafkaSource(config_with_ssl, metadata) + + self.assertIsNotNone(kafka_source_with_ssl.ssl_manager) + self.assertEqual( + kafka_source_with_ssl.service_connection.schemaRegistrySSL.__root__.caCertificate.get_secret_value(), + "caCertificateData", + ) + self.assertEqual( + kafka_source_with_ssl.service_connection.schemaRegistrySSL.__root__.sslKey.get_secret_value(), + "sslKeyData", + ) + self.assertEqual( + kafka_source_with_ssl.service_connection.schemaRegistrySSL.__root__.sslCertificate.get_secret_value(), + "sslCertificateData", + )