mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-07 17:18:23 +00:00

* tests: kafka integration kafka integration tests with schema registry * added ignore kafka for python 3.8 * fixed tests
141 lines
4.5 KiB
Python
141 lines
4.5 KiB
Python
import os.path
|
|
from textwrap import dedent
|
|
|
|
import pytest
|
|
import testcontainers.core.network
|
|
from docker.types import EndpointConfig
|
|
from testcontainers.core.container import DockerContainer
|
|
from testcontainers.kafka import KafkaContainer
|
|
|
|
from _openmetadata_testutils.kafka import load_csv_data
|
|
from _openmetadata_testutils.kafka.schema_registry_container import (
|
|
SchemaRegistryContainer,
|
|
)
|
|
from metadata.generated.schema.api.services.createMessagingService import (
|
|
CreateMessagingServiceRequest,
|
|
)
|
|
from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
|
|
KafkaConnection,
|
|
)
|
|
from metadata.generated.schema.entity.services.messagingService import (
|
|
MessagingConnection,
|
|
MessagingServiceType,
|
|
)
|
|
from metadata.generated.schema.metadataIngestion.messagingServiceMetadataPipeline import (
|
|
MessagingMetadataConfigType,
|
|
)
|
|
|
|
|
|
def _connect_to_network(
|
|
ctr: DockerContainer, network: testcontainers.core.network, alias: str
|
|
):
|
|
# Needed until https://github.com/testcontainers/testcontainers-python/issues/645 is fixed
|
|
ctr.with_kwargs(
|
|
network=network.name,
|
|
networking_config={network.name: EndpointConfig("1.33", aliases=[alias])},
|
|
)
|
|
|
|
|
|
class CustomKafkaContainer(KafkaContainer):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.security_protocol_map += ",EXTERNAL:PLAINTEXT"
|
|
self.with_env(
|
|
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", self.security_protocol_map
|
|
)
|
|
|
|
self.listeners = f"PLAINTEXT://0.0.0.0:29092,BROKER://0.0.0.0:9092,EXTERNAL://0.0.0.0:{self.port}"
|
|
self.with_env("KAFKA_LISTENERS", self.listeners)
|
|
|
|
def tc_start(self):
|
|
listeners = ",".join(
|
|
[
|
|
f"EXTERNAL://{self.get_bootstrap_server()}",
|
|
f"PLAINTEXT://{self._get_network_alias()}:29092",
|
|
"BROKER://$(hostname -i | cut -d' ' -f1):9092",
|
|
]
|
|
)
|
|
data = (
|
|
dedent(
|
|
f"""
|
|
#!/bin/bash
|
|
{self.boot_command}
|
|
export KAFKA_ADVERTISED_LISTENERS={listeners}
|
|
. /etc/confluent/docker/bash-config
|
|
/etc/confluent/docker/configure
|
|
/etc/confluent/docker/launch
|
|
"""
|
|
)
|
|
.strip()
|
|
.encode("utf-8")
|
|
)
|
|
self.create_file(data, KafkaContainer.TC_START_SCRIPT)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def docker_network():
|
|
with testcontainers.core.network.Network() as network:
|
|
yield network
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def schema_registry_container(docker_network, kafka_container):
|
|
with SchemaRegistryContainer(
|
|
schema_registry_kafkastore_bootstrap_servers="PLAINTEXT://kafka:9092",
|
|
schema_registry_host_name="schema-registry",
|
|
).with_network(docker_network).with_network_aliases("schema-registry") as container:
|
|
load_csv_data.main(
|
|
kafka_broker=kafka_container.get_bootstrap_server(),
|
|
schema_registry_url=container.get_connection_url(),
|
|
csv_directory=os.path.dirname(__file__) + "/data",
|
|
)
|
|
yield container
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def kafka_container(docker_network):
|
|
container = CustomKafkaContainer()
|
|
_connect_to_network(container, docker_network, "kafka")
|
|
with container:
|
|
yield container
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def create_service_request(
|
|
kafka_container, schema_registry_container, tmp_path_factory
|
|
):
|
|
return CreateMessagingServiceRequest(
|
|
name="docker_test_" + tmp_path_factory.mktemp("kafka").name,
|
|
serviceType=MessagingServiceType.Kafka,
|
|
connection=MessagingConnection(
|
|
config=KafkaConnection(
|
|
bootstrapServers=kafka_container.get_bootstrap_server(),
|
|
schemaRegistryURL=schema_registry_container.get_connection_url(),
|
|
)
|
|
),
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def ingestion_config(db_service, metadata, workflow_config, sink_config):
|
|
return {
|
|
"source": {
|
|
"type": db_service.connection.config.type.value.lower(),
|
|
"serviceName": db_service.fullyQualifiedName.root,
|
|
"sourceConfig": {
|
|
"config": {"type": MessagingMetadataConfigType.MessagingMetadata.value}
|
|
},
|
|
"serviceConnection": db_service.connection.model_dump(),
|
|
},
|
|
"sink": sink_config,
|
|
"workflowConfig": workflow_config,
|
|
}
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def unmask_password():
|
|
def patch_password(service):
|
|
return service
|
|
|
|
return patch_password
|