102 lines
3.4 KiB
Python
Raw Normal View History

import subprocess
import pytest
from freezegun import freeze_time
from datahub.ingestion.api.source import SourceCapability
from datahub.ingestion.source.kafka import KafkaSource
from tests.test_helpers import mce_helpers, test_connection_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import wait_for_port
FROZEN_TIME = "2020-04-14 07:00:00"
@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
return pytestconfig.rootpath / "tests/integration/kafka"
@pytest.fixture(scope="module")
def mock_kafka_service(docker_compose_runner, test_resources_dir):
with docker_compose_runner(
test_resources_dir / "docker-compose.yml", "kafka", cleanup=False
) as docker_services:
wait_for_port(docker_services, "test_zookeeper", 52181, timeout=120)
# Running docker compose twice, since the broker sometimes fails to come up on the first try.
with docker_compose_runner(
test_resources_dir / "docker-compose.yml", "kafka"
) as docker_services:
wait_for_port(docker_services, "test_broker", 29092, timeout=120)
wait_for_port(docker_services, "test_schema_registry", 8081, timeout=120)
# Set up topics and produce some data
command = f"{test_resources_dir}/send_records.sh {test_resources_dir}"
subprocess.run(command, shell=True, check=True)
yield docker_compose_runner
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_kafka_ingest(
mock_kafka_service, test_resources_dir, pytestconfig, tmp_path, mock_time
):
# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "kafka_to_file.yml").resolve()
run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "kafka_mces.json",
golden_path=test_resources_dir / "kafka_mces_golden.json",
ignore_paths=[],
)
@pytest.mark.parametrize(
"config_dict, is_success",
[
(
{
"connection": {
"bootstrap": "localhost:29092",
"schema_registry_url": "http://localhost:28081",
},
},
True,
),
(
{
"connection": {
"bootstrap": "localhost:2909",
"schema_registry_url": "http://localhost:2808",
},
},
False,
),
],
)
@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_kafka_test_connection(mock_kafka_service, config_dict, is_success):
report = test_connection_helpers.run_test_connection(KafkaSource, config_dict)
if is_success:
test_connection_helpers.assert_basic_connectivity_success(report)
test_connection_helpers.assert_capability_report(
capability_report=report.capability_report,
success_capabilities=[SourceCapability.SCHEMA_METADATA],
)
else:
test_connection_helpers.assert_basic_connectivity_failure(
report, "Failed to get metadata"
)
test_connection_helpers.assert_capability_report(
capability_report=report.capability_report,
failure_capabilities={
SourceCapability.SCHEMA_METADATA: "Failed to establish a new connection"
},
)