MINOR: Kafka Setup SSL Arg Fix (#16469)

This commit is contained in:
Ayush Shah 2024-05-30 16:03:03 +05:30 committed by GitHub
parent 4ba05c2401
commit a98f6b8dd0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 146 additions and 5 deletions

View File

@ -14,6 +14,19 @@ source:
# example of scema registry config, for more info about accepted values visit:
# https://docs.confluent.io/5.5.1/clients/confluent-kafka-python/index.html#confluent_kafka.schema_registry.SchemaRegistryClient
#basic.auth.user.info: username:password
# schemaRegistrySSL:
# caCertificate: |
# -----BEGIN CERTIFICATE-----
# sample caCertificateData
# -----END CERTIFICATE-----
# sslCertificate: |
# -----BEGIN CERTIFICATE-----
# sample sslCertificateData
# -----END CERTIFICATE-----
# sslKey: |
# -----BEGIN RSA PRIVATE KEY
# sample sslKeyData
# -----END RSA PRIVATE KEY
sourceConfig:
config:
type: MessagingMetadata

View File

@ -0,0 +1,46 @@
source:
type: kafka
serviceName: local_kafka
serviceConnection:
config:
type: Kafka
bootstrapServers: localhost:9092
schemaRegistryURL: http://localhost:8081
#consumerConfig:
# example of consume config, for more info about accepted values visit:
# https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
#ssl.truststore.password: password
#schemaRegistryConfig:
# example of scema registry config, for more info about accepted values visit:
# https://docs.confluent.io/5.5.1/clients/confluent-kafka-python/index.html#confluent_kafka.schema_registry.SchemaRegistryClient
#basic.auth.user.info: username:password
schemaRegistrySSL:
caCertificate: |
-----BEGIN CERTIFICATE-----
sample caCertificateData
-----END CERTIFICATE-----
sslCertificate: |
-----BEGIN CERTIFICATE-----
sample sslCertificateData
-----END CERTIFICATE-----
sslKey: |
-----BEGIN RSA PRIVATE KEY
sample sslKeyData
-----END RSA PRIVATE KEY
sourceConfig:
config:
type: MessagingMetadata
topicFilterPattern:
excludes:
- _confluent.*
generateSampleData: true
sink:
type: metadata-rest
config: {}
workflowConfig:
# loggerLevel: INFO # DEBUG, INFO, WARN or ERROR
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"

View File

@ -11,7 +11,7 @@
"""
Kafka source ingestion
"""
from typing import Optional
from typing import Optional, cast
from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
KafkaConnection,
@ -28,7 +28,9 @@ from metadata.utils.ssl_manager import SSLManager
class KafkaSource(CommonBrokerSource):
def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
self.ssl_manager = None
service_connection = config.serviceConnection.__root__.config
service_connection = cast(
KafkaConnection, config.serviceConnection.__root__.config
)
if service_connection.schemaRegistrySSL:
self.ssl_manager = SSLManager(
@ -37,7 +39,7 @@ class KafkaSource(CommonBrokerSource):
cert=service_connection.schemaRegistrySSL.__root__.sslCertificate,
)
service_connection = self.ssl_manager.setup_ssl(
config.serviceConnection.__root__.config.sslConfig
config.serviceConnection.__root__.config
)
super().__init__(config, metadata)

View File

@ -1,15 +1,28 @@
"""
Manage SSL test cases
"""
import os
import unittest
from unittest import TestCase
from unittest.mock import patch
from pydantic import SecretStr
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.messaging.kafka.metadata import KafkaSource
from metadata.utils.ssl_manager import SSLManager
class SSLManagerTest(unittest.TestCase):
class SSLManagerTest(TestCase):
"""
Tests to verify the functionality of SSLManager
"""
@ -44,3 +57,70 @@ class SSLManagerTest(unittest.TestCase):
temp_file = self.ssl_manager.create_temp_file(SecretStr("Test content"))
self.ssl_manager.cleanup_temp_files()
self.assertFalse(os.path.exists(temp_file))
class KafkaSourceSSLTest(TestCase):
@patch(
"metadata.ingestion.source.messaging.messaging_service.MessagingServiceSource.test_connection"
)
@patch("metadata.ingestion.source.messaging.kafka.metadata.SSLManager")
def test_init(self, mock_ssl_manager, test_connection):
test_connection.return_value = True
config = WorkflowSource(
**{
"type": "kafka",
"serviceName": "local_kafka",
"serviceConnection": {
"config": {
"type": "Kafka",
"bootstrapServers": "localhost:9092",
}
},
"sourceConfig": {"config": {"type": "MessagingMetadata"}},
}
)
metadata = OpenMetadata(
OpenMetadataConnection(
hostPort="http://localhost:8585/api",
authProvider="openmetadata",
securityConfig=OpenMetadataJWTClientConfig(jwtToken="token"),
)
)
kafka_source = KafkaSource(config, metadata)
self.assertIsNone(kafka_source.ssl_manager)
mock_ssl_manager.assert_not_called()
config_with_ssl = WorkflowSource(
**{
"type": "kafka",
"serviceName": "local_kafka",
"serviceConnection": {
"config": {
"type": "Kafka",
"bootstrapServers": "localhost:9092",
"schemaRegistrySSL": {
"caCertificate": "caCertificateData",
"sslKey": "sslKeyData",
"sslCertificate": "sslCertificateData",
},
},
},
"sourceConfig": {"config": {"type": "MessagingMetadata"}},
}
)
kafka_source_with_ssl = KafkaSource(config_with_ssl, metadata)
self.assertIsNotNone(kafka_source_with_ssl.ssl_manager)
self.assertEqual(
kafka_source_with_ssl.service_connection.schemaRegistrySSL.__root__.caCertificate.get_secret_value(),
"caCertificateData",
)
self.assertEqual(
kafka_source_with_ssl.service_connection.schemaRegistrySSL.__root__.sslKey.get_secret_value(),
"sslKeyData",
)
self.assertEqual(
kafka_source_with_ssl.service_connection.schemaRegistrySSL.__root__.sslCertificate.get_secret_value(),
"sslCertificateData",
)