2021-08-11 15:47:18 -07:00
|
|
|
import subprocess
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
from click.testing import CliRunner
|
|
|
|
from freezegun import freeze_time
|
|
|
|
|
|
|
|
from datahub.entrypoints import datahub
|
|
|
|
from tests.test_helpers import fs_helpers, mce_helpers
|
|
|
|
from tests.test_helpers.docker_helpers import wait_for_port
|
|
|
|
|
|
|
|
FROZEN_TIME = "2020-04-14 07:00:00"
|
|
|
|
|
|
|
|
|
|
|
|
@freeze_time(FROZEN_TIME)
|
|
|
|
@pytest.mark.integration
|
|
|
|
def test_kafka_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
|
|
|
|
test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka"
|
|
|
|
|
|
|
|
with docker_compose_runner(
|
|
|
|
test_resources_dir / "docker-compose.yml", "kafka"
|
|
|
|
) as docker_services:
|
|
|
|
|
2021-10-12 18:50:39 -07:00
|
|
|
wait_for_port(docker_services, "test_broker", 59092, timeout=120)
|
|
|
|
wait_for_port(docker_services, "test_schema_registry", 8081, timeout=120)
|
2021-08-11 15:47:18 -07:00
|
|
|
|
|
|
|
# Set up topics and produce some data
|
|
|
|
command = f"{test_resources_dir}/send_records.sh {test_resources_dir}"
|
|
|
|
subprocess.run(command, shell=True, check=True)
|
|
|
|
|
|
|
|
# Run the metadata ingestion pipeline.
|
|
|
|
runner = CliRunner()
|
|
|
|
with fs_helpers.isolated_filesystem(tmp_path):
|
|
|
|
config_file = (test_resources_dir / "kafka_to_file.yml").resolve()
|
|
|
|
result = runner.invoke(datahub, ["ingest", "-c", f"{config_file}"])
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
# Verify the output.
|
|
|
|
mce_helpers.check_golden_file(
|
|
|
|
pytestconfig,
|
|
|
|
output_path=tmp_path / "kafka_mces.json",
|
|
|
|
golden_path=test_resources_dir / "kafka_mces_golden.json",
|
|
|
|
ignore_paths=[],
|
|
|
|
)
|