mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-23 08:14:43 +00:00
Co-authored-by: Cursor Agent <cursoragent@cursor.com> Co-authored-by: Claude <noreply@anthropic.com>
85 lines
2.4 KiB
Python
85 lines
2.4 KiB
Python
import pathlib
|
|
|
|
import pytest
|
|
|
|
from datahub.ingestion.run.pipeline import Pipeline
|
|
from datahub.testing import mce_helpers
|
|
from tests.test_helpers.docker_helpers import wait_for_port
|
|
|
|
# Directory containing this test module; the docker-compose file and the
# golden file used below are resolved relative to it.
_resources_dir = pathlib.Path(__file__).parent
|
|
|
|
# Module-level pytest marker: tags every test in this file with
# `integration_batch_2`.
pytestmark = pytest.mark.integration_batch_2
|
|
|
|
|
|
@pytest.fixture(scope="module")
def neo4j_runner(docker_compose_runner):
    """Bring up the ``neo4j`` compose service and yield it once it is usable.

    The fixture is module-scoped, so the compose stack is started once for
    all tests in this file.  Readiness is established in two steps: first the
    Bolt port (7687) must be reachable, then ``check_neo4j_setup_completed``
    must report that nodes exist in the database.

    Args:
        docker_compose_runner: Fixture that, given a compose file and a key,
            returns a context manager yielding the running docker services.

    Yields:
        The docker services object produced by ``docker_compose_runner``.
    """
    compose_file = _resources_dir / "docker-compose.yml"

    with docker_compose_runner(compose_file, "neo4j") as services:
        wait_for_port(services, "neo4j", 7687)

        # An open port does not mean the test data is there yet: keep polling
        # the probe until it reports that nodes can be queried.
        services.wait_until_responsive(
            check=check_neo4j_setup_completed,
            timeout=120,
            pause=2,
        )

        yield services
|
|
|
|
|
|
def check_neo4j_setup_completed() -> bool:
    """Check if test data has been loaded into Neo4j.

    Connects to ``neo4j://localhost:7687`` with the test credentials, counts
    all nodes and treats any non-zero count as "data is loaded".

    This function is polled as a readiness probe while the container is
    still starting, so it is deliberately best-effort: any failure (driver
    package missing, server not accepting connections yet, query error, ...)
    is reported as ``False`` instead of being raised.

    Returns:
        ``True`` if the count query succeeds and reports at least one node,
        ``False`` otherwise.
    """
    try:
        # Imported lazily so that a missing driver is reported as "not ready"
        # by the handler below rather than failing at module import time.
        from neo4j import GraphDatabase

        driver = GraphDatabase.driver(
            "neo4j://localhost:7687", auth=("neo4j", "testpassword")
        )
        try:
            with driver.session() as session:
                # Check if test data is loaded
                record = session.run("MATCH (n) RETURN count(n) as count").single()
                # Data is loaded if there are any nodes
                return record is not None and record["count"] > 0
        finally:
            # Close the driver on every path.  While Neo4j is still booting
            # the query above raises on each poll; without this, every failed
            # attempt would leak a driver and its connection pool.
            driver.close()
    except Exception:
        return False
|
|
|
|
|
|
@pytest.mark.integration
def test_neo4j_ingest(neo4j_runner, pytestconfig, tmp_path):
    """Ingest from the test Neo4j instance and compare against the golden file.

    Runs a pipeline with the ``neo4j`` source and a ``file`` sink writing
    into ``tmp_path``, fails if the pipeline status reports a problem, and
    then checks the written file against ``neo4j_mcps_golden.json``.
    """
    # The sink writes to this path and the golden-file check reads it back.
    output_file = f"{tmp_path}/neo4j_mcps.json"

    source = {
        "type": "neo4j",
        "config": {
            "uri": "neo4j://localhost:7687",
            "username": "neo4j",
            "password": "testpassword",
            "env": "TEST",
            "platform_instance": "test_instance",
        },
    }
    sink = {
        "type": "file",
        "config": {
            "filename": output_file,
        },
    }

    pipeline = Pipeline.create(
        {
            "run_id": "neo4j-test",
            "source": source,
            "sink": sink,
        }
    )
    pipeline.run()
    pipeline.raise_from_status()

    golden_file = _resources_dir / "neo4j_mcps_golden.json"
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=output_file,
        golden_path=golden_file,
    )
|