Imri Paran 5133c31d31
MINOR: kafka integration tests (#17457)
* tests: kafka integration

kafka integration tests with schema registry

* added ignore kafka for python 3.8

* fixed tests
2024-08-21 16:05:09 +05:30

141 lines
4.5 KiB
Python

import os.path
from textwrap import dedent
import pytest
import testcontainers.core.network
from docker.types import EndpointConfig
from testcontainers.core.container import DockerContainer
from testcontainers.kafka import KafkaContainer
from _openmetadata_testutils.kafka import load_csv_data
from _openmetadata_testutils.kafka.schema_registry_container import (
SchemaRegistryContainer,
)
from metadata.generated.schema.api.services.createMessagingService import (
CreateMessagingServiceRequest,
)
from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
KafkaConnection,
)
from metadata.generated.schema.entity.services.messagingService import (
MessagingConnection,
MessagingServiceType,
)
from metadata.generated.schema.metadataIngestion.messagingServiceMetadataPipeline import (
MessagingMetadataConfigType,
)
def _connect_to_network(
    ctr: DockerContainer, network: testcontainers.core.network.Network, alias: str
) -> None:
    """Attach *ctr* to *network* under the DNS alias *alias*.

    Workaround needed until
    https://github.com/testcontainers/testcontainers-python/issues/645 is fixed,
    at which point ``with_network``/``with_network_aliases`` should be used instead.

    :param ctr: container to attach (mutated in place via ``with_kwargs``)
    :param network: testcontainers network the container should join
    :param alias: hostname other containers on the network can resolve
    """
    ctr.with_kwargs(
        network=network.name,
        # "1.33" is the Docker Engine API version required by EndpointConfig;
        # NOTE(review): assumes the daemon in CI supports it — confirm.
        networking_config={network.name: EndpointConfig("1.33", aliases=[alias])},
    )
class CustomKafkaContainer(KafkaContainer):
    """Kafka container with an additional EXTERNAL plaintext listener.

    On top of the stock ``KafkaContainer`` listeners, this adds
    ``EXTERNAL`` (plaintext) bound to the mapped container port and a
    ``PLAINTEXT`` listener on 29092 advertised under the container's
    network alias, so other containers on the same docker network can
    reach the broker by alias.
    """

    def __init__(self):
        super().__init__()
        # Register the extra listener's security protocol alongside the defaults.
        self.security_protocol_map = self.security_protocol_map + ",EXTERNAL:PLAINTEXT"
        self.with_env(
            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", self.security_protocol_map
        )
        self.listeners = (
            "PLAINTEXT://0.0.0.0:29092,"
            "BROKER://0.0.0.0:9092,"
            f"EXTERNAL://0.0.0.0:{self.port}"
        )
        self.with_env("KAFKA_LISTENERS", self.listeners)

    def tc_start(self):
        """Write the container start script with advertised listeners filled in."""
        advertised = ",".join(
            (
                f"EXTERNAL://{self.get_bootstrap_server()}",
                f"PLAINTEXT://{self._get_network_alias()}:29092",
                # BROKER advertises the container's own IP, resolved at runtime
                # inside the container by the shell substitution below.
                "BROKER://$(hostname -i | cut -d' ' -f1):9092",
            )
        )
        script = dedent(
            f"""
            #!/bin/bash
            {self.boot_command}
            export KAFKA_ADVERTISED_LISTENERS={advertised}
            . /etc/confluent/docker/bash-config
            /etc/confluent/docker/configure
            /etc/confluent/docker/launch
            """
        ).strip()
        self.create_file(script.encode("utf-8"), KafkaContainer.TC_START_SCRIPT)
@pytest.fixture(scope="module")
def docker_network():
    """Module-scoped docker network, torn down when the module finishes."""
    network = testcontainers.core.network.Network()
    with network:
        yield network
@pytest.fixture(scope="module")
def schema_registry_container(docker_network, kafka_container):
    """Schema registry attached to the shared network, pre-seeded with CSV data."""
    registry = SchemaRegistryContainer(
        # The broker is reachable by its network alias "kafka" on the internal port.
        schema_registry_kafkastore_bootstrap_servers="PLAINTEXT://kafka:9092",
        schema_registry_host_name="schema-registry",
    )
    registry.with_network(docker_network)
    registry.with_network_aliases("schema-registry")
    with registry as container:
        # Load topics/schemas from the CSV files shipped next to this module.
        load_csv_data.main(
            kafka_broker=kafka_container.get_bootstrap_server(),
            schema_registry_url=container.get_connection_url(),
            csv_directory=os.path.dirname(__file__) + "/data",
        )
        yield container
@pytest.fixture(scope="module")
def kafka_container(docker_network):
    """Module-scoped Kafka broker joined to *docker_network* under alias "kafka"."""
    broker = CustomKafkaContainer()
    _connect_to_network(broker, docker_network, "kafka")
    with broker:
        yield broker
@pytest.fixture(scope="module")
def create_service_request(
    kafka_container, schema_registry_container, tmp_path_factory
):
    """Build the request used to register the Kafka messaging service.

    The service name is made unique per run via a tmp-path factory name.
    """
    unique_name = "docker_test_" + tmp_path_factory.mktemp("kafka").name
    kafka_connection = KafkaConnection(
        bootstrapServers=kafka_container.get_bootstrap_server(),
        schemaRegistryURL=schema_registry_container.get_connection_url(),
    )
    return CreateMessagingServiceRequest(
        name=unique_name,
        serviceType=MessagingServiceType.Kafka,
        connection=MessagingConnection(config=kafka_connection),
    )
@pytest.fixture(scope="module")
def ingestion_config(db_service, metadata, workflow_config, sink_config):
    """Workflow configuration dict for the messaging-metadata ingestion run.

    ``metadata`` is requested only to guarantee fixture ordering; it is not
    read here.
    """
    source_section = {
        "type": db_service.connection.config.type.value.lower(),
        "serviceName": db_service.fullyQualifiedName.root,
        "sourceConfig": {
            "config": {"type": MessagingMetadataConfigType.MessagingMetadata.value}
        },
        "serviceConnection": db_service.connection.model_dump(),
    }
    return {
        "source": source_section,
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }
@pytest.fixture(scope="module")
def unmask_password():
    """Identity patcher: this suite needs no password unmasking, so the
    returned callable hands the service back unchanged."""
    return lambda service: service