Sergio Gómez Villamor 492e28a938
feat(ingest/neo4j): migrate Neo4j source to DataHub Python SDK v2 (#14591)
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: Claude <noreply@anthropic.com>
2025-09-15 08:42:54 +00:00

85 lines
2.4 KiB
Python

import pathlib
import pytest
from datahub.ingestion.run.pipeline import Pipeline
from datahub.testing import mce_helpers
from tests.test_helpers.docker_helpers import wait_for_port
# Directory containing this test module; the docker-compose file and the
# golden file referenced in this module are resolved relative to it.
_resources_dir = pathlib.Path(__file__).parent
# Module-level pytest marker: tags every test in this file with
# `integration_batch_2`.
pytestmark = pytest.mark.integration_batch_2
@pytest.fixture(scope="module")
def neo4j_runner(docker_compose_runner):
    """Bring up the Neo4j compose service once per module and yield it.

    The fixture blocks until port 7687 of the ``neo4j`` service is reachable
    and ``check_neo4j_setup_completed`` reports that nodes exist, then yields
    the docker services handle obtained from ``docker_compose_runner``.
    """
    compose_file = _resources_dir / "docker-compose.yml"
    with docker_compose_runner(compose_file, "neo4j") as services:
        wait_for_port(services, "neo4j", 7687)
        # An open port is not enough: keep polling until the test data can
        # actually be queried.
        services.wait_until_responsive(
            check=check_neo4j_setup_completed,
            timeout=120,
            pause=2,
        )
        yield services
def check_neo4j_setup_completed() -> bool:
    """Report whether the Neo4j test instance already contains any nodes.

    Connects to ``neo4j://localhost:7687`` with the test credentials and
    counts all nodes in the database.

    Returns:
        ``True`` if the count query reports at least one node. ``False`` if
        the query yields no record, the graph is empty, or anything raises
        along the way (driver not importable, server not accepting
        connections yet, query failure, ...).
    """
    try:
        # Importing inside the ``try`` means a missing driver package is also
        # reported as "not ready" instead of propagating an ImportError.
        from neo4j import GraphDatabase

        # Both the driver and the session are context-managed so they are
        # released on every path, including when the query raises while the
        # server is still starting up.
        with GraphDatabase.driver(
            "neo4j://localhost:7687", auth=("neo4j", "testpassword")
        ) as driver:
            with driver.session() as session:
                record = session.run(
                    "MATCH (n) RETURN count(n) as count"
                ).single()
                # Data counts as loaded as soon as any node exists.
                return record is not None and record["count"] > 0
    except Exception:
        # Deliberately broad: this function is polled as a readiness
        # predicate, so any failure simply means "not ready yet".
        return False
@pytest.mark.integration
def test_neo4j_ingest(neo4j_runner, pytestconfig, tmp_path):
    """Ingest from the test Neo4j instance into a file and diff it against the golden file.

    ``neo4j_runner`` is not used directly; requesting it ensures the Neo4j
    service is up and populated before the pipeline runs.
    """
    # Single source of truth for the sink target and the golden comparison.
    output_file = f"{tmp_path}/neo4j_mcps.json"

    source = {
        "type": "neo4j",
        "config": {
            "uri": "neo4j://localhost:7687",
            "username": "neo4j",
            "password": "testpassword",
            "env": "TEST",
            "platform_instance": "test_instance",
        },
    }
    sink = {
        "type": "file",
        "config": {
            "filename": output_file,
        },
    }
    recipe = {
        "run_id": "neo4j-test",
        "source": source,
        "sink": sink,
    }

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    # Fail the test if the pipeline reported a failure status.
    pipeline.raise_from_status()

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=output_file,
        golden_path=_resources_dir / "neo4j_mcps_golden.json",
    )