2025-04-03 12:44:37 +02:00

147 lines
4.2 KiB
Python

import pytest
import requests
from freezegun import freeze_time
from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers
from tests.test_helpers.docker_helpers import wait_for_port
FROZEN_TIME = "2025-03-25 12:00:00"
pytestmark = pytest.mark.integration_batch_2
@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
return pytestconfig.rootpath / "tests/integration/hex"
def is_mock_api_up(port: int) -> bool:
"""Check if the mock API server is up and running"""
try:
response = requests.get(f"http://localhost:{port}/health")
response.raise_for_status()
return True
except (requests.RequestException, ConnectionError):
return False
@pytest.fixture(scope="module")
def hex_mock_api_runner(docker_compose_runner, test_resources_dir):
docker_dir = test_resources_dir / "docker"
# Start Docker Compose
with docker_compose_runner(
docker_dir / "docker-compose.yml", "hex-mock"
) as docker_services:
wait_for_port(
docker_services,
"hex-mock-api",
8000,
timeout=30,
checker=lambda: is_mock_api_up(8000),
)
wait_for_port(
docker_services,
"datahub-mock-api",
8010,
timeout=30,
checker=lambda: is_mock_api_up(8010),
)
yield docker_services
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_hex_ingestion(pytestconfig, hex_mock_api_runner, test_resources_dir, tmp_path):
# Path for the golden file
golden_dir = test_resources_dir / "golden"
golden_path = golden_dir / "hex_mce_golden.json"
# Create the pipeline
pipeline = Pipeline.create(
{
"pipeline_name": "test-hex",
"source": {
"type": "hex",
"config": {
"workspace_name": "test-workspace",
"token": "test-token",
"base_url": "http://localhost:8000/api/v1", # Mock Hex API URL
"platform_instance": "hex_test",
"include_lineage": False,
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/hex_mces.json",
},
},
}
)
# Run the pipeline
pipeline.run()
pipeline.raise_from_status()
# Check against golden file
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/hex_mces.json",
golden_path=golden_path,
ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
)
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_hex_ingestion_with_lineage(
pytestconfig, hex_mock_api_runner, test_resources_dir, tmp_path
):
# Path for the golden file
golden_dir = test_resources_dir / "golden"
golden_path = golden_dir / "hex_mce_golden_with_lineage.json"
# Create the pipeline
pipeline = Pipeline.create(
{
"pipeline_name": "test-hex-with-lineage",
"datahub_api": {
"server": "http://localhost:8010", # Mock DataHub API URL
},
"source": {
"type": "hex",
"config": {
"workspace_name": "some-hex-workspace",
"token": "test-token",
"base_url": "http://localhost:8000/api/v1", # Mock Hex API URL
"platform_instance": "hex_test",
"include_lineage": True,
"datahub_page_size": 1, # Force pagination
"stateful_ingestion": {
"enabled": False,
},
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/hex_mces.json",
},
},
}
)
# Run the pipeline
pipeline.run()
pipeline.raise_from_status()
# Check against golden file
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/hex_mces.json",
golden_path=golden_path,
ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
)