mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-03 23:28:11 +00:00
68 lines
2.3 KiB
Python
68 lines
2.3 KiB
Python
import logging
|
|
import pathlib
|
|
import shutil
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from datahub.ingestion.run.pipeline import Pipeline
|
|
from datahub.testing import mce_helpers
|
|
from tests.test_helpers.docker_helpers import wait_for_port
|
|
|
|
# Module-level logger, named after this module, used for the test's progress messages.
logger = logging.getLogger(__name__)
|
|
|
|
# Directory containing this test module. The Cassandra config template, the
# docker-compose file and the golden file are all resolved relative to it.
_resources_dir = pathlib.Path(__file__).parent
|
|
|
|
|
|
@pytest.mark.integration
def test_cassandra_ingest(docker_compose_runner, pytestconfig, tmp_path, monkeypatch):
    """Run the Cassandra ingestion source against a docker-compose Cassandra
    service and compare the MCPs written by the file sink with the golden file.

    Steps:
      1. Copy the Cassandra config into ``tmp_path`` and export its location
         through ``CASSANDRA_CONFIG_DIR``.
      2. Start the "cassandra" compose project and wait for port 9042 on
         ``test-cassandra``.
      3. Run an ingestion pipeline (profiling enabled) writing to a JSON file.
      4. Check that file against ``cassandra_mcps_golden.json``.
    """
    # Tricky: the cassandra container edits cassandra.yaml in place
    # (see https://github.com/docker-library/cassandra/issues/165).
    # Handing it a throwaway copy avoids spurious diffs in the checked-in
    # file; the docker-compose file reads CASSANDRA_CONFIG_DIR to find the copy.
    shutil.copy(_resources_dir / "setup/cassandra.yaml", tmp_path / "cassandra.yaml")
    monkeypatch.setenv("CASSANDRA_CONFIG_DIR", str(tmp_path))

    # Single definition of the sink output path, reused for verification.
    mcp_output = f"{tmp_path}/cassandra_mcps.json"
    compose_file = _resources_dir / "docker-compose.yml"

    with docker_compose_runner(compose_file, "cassandra") as docker_services:
        wait_for_port(docker_services, "test-cassandra", 9042)

        # Fixed extra wait after the port check -- presumably gives Cassandra
        # time to finish starting up; TODO confirm.
        time.sleep(5)

        # Run the metadata ingestion pipeline.
        logger.info("Starting the ingestion test...")
        source_recipe = {
            "type": "cassandra",
            "config": {
                "platform_instance": "dev_instance",
                "contact_point": "localhost",
                "port": 9042,
                "profiling": {"enabled": True},
            },
        }
        sink_recipe = {
            "type": "file",
            "config": {
                "filename": mcp_output,
            },
        }
        ingestion = Pipeline.create(
            {
                "run_id": "cassandra-test",
                "source": source_recipe,
                "sink": sink_recipe,
            }
        )
        ingestion.run()
        ingestion.raise_from_status()

        # Verify the output.
        logger.info("Verifying output.")
        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=mcp_output,
            golden_path=_resources_dir / "cassandra_mcps_golden.json",
        )
|