mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-12 11:17:01 +00:00
240 lines
8.4 KiB
Python
240 lines
8.4 KiB
Python
![]() |
import json
|
||
|
import logging
|
||
|
import os
|
||
|
import tempfile
|
||
|
from typing import Any, Dict, List, Optional
|
||
|
|
||
|
import pytest
|
||
|
import tenacity
|
||
|
|
||
|
from tests.utils import delete_urns_from_file, ingest_file_via_rest
|
||
|
|
||
|
# Module-level logger named after this test module.
logger = logging.getLogger(__name__)

# URNs of the three hive/PROD datasets this module works with:
#   * the dataset that acts as the lineage source,
#   * the dataset that declares the source as its upstream,
#   * a dataset that takes part in no lineage at all.
UPSTREAM_DATASET_URN = (
    "urn:li:dataset:(urn:li:dataPlatform:hive,upstream_table,PROD)"
)
DOWNSTREAM_DATASET_URN = (
    "urn:li:dataset:(urn:li:dataPlatform:hive,downstream_table,PROD)"
)
DATASET_WITHOUT_LINEAGE_URN = (
    "urn:li:dataset:(urn:li:dataPlatform:hive,no_lineage_table,PROD)"
)
|
||
|
|
||
|
|
||
|
def create_upstream_dataset_mcp_data() -> Dict[str, Any]:
    """Build the ``datasetProperties`` UPSERT proposal for the upstream dataset.

    Returns:
        A plain dict (JSON-serialisable) targeting ``UPSTREAM_DATASET_URN``.
    """
    properties_aspect = {
        "name": "upstream_table",
        "description": "Upstream dataset for lineage testing",
        "customProperties": {"platform": "hive", "env": "PROD"},
    }
    return {
        "entityType": "dataset",
        "entityUrn": UPSTREAM_DATASET_URN,
        "changeType": "UPSERT",
        "aspectName": "datasetProperties",
        "aspect": {"json": properties_aspect},
    }
|
||
|
|
||
|
|
||
|
def create_downstream_dataset_with_lineage_mcp_data() -> Dict[str, Any]:
    """Build the ``datasetProperties`` UPSERT proposal for the downstream dataset.

    Note that this proposal carries only the dataset's properties; the
    lineage edge for the same URN is produced separately by
    ``create_upstream_lineage_mcp_data``.

    Returns:
        A plain dict (JSON-serialisable) targeting ``DOWNSTREAM_DATASET_URN``.
    """
    properties_aspect = {
        "name": "downstream_table",
        "description": "Downstream dataset with lineage",
        "customProperties": {"platform": "hive", "env": "PROD"},
    }
    return {
        "entityType": "dataset",
        "entityUrn": DOWNSTREAM_DATASET_URN,
        "changeType": "UPSERT",
        "aspectName": "datasetProperties",
        "aspect": {"json": properties_aspect},
    }
|
||
|
|
||
|
|
||
|
def create_upstream_lineage_mcp_data() -> Dict[str, Any]:
    """Build the ``upstreamLineage`` UPSERT proposal for the downstream dataset.

    The aspect lists a single upstream, ``UPSTREAM_DATASET_URN``, with
    lineage type ``TRANSFORMED``.

    Returns:
        A plain dict (JSON-serialisable) targeting ``DOWNSTREAM_DATASET_URN``.
    """
    # Fixed audit stamp so the ingested payload is identical on every run.
    audit_stamp = {
        "time": 1640995200000,
        "actor": "urn:li:corpuser:datahub",
    }
    upstream_edge = {
        "dataset": UPSTREAM_DATASET_URN,
        "type": "TRANSFORMED",
        "auditStamp": audit_stamp,
    }
    return {
        "entityType": "dataset",
        "entityUrn": DOWNSTREAM_DATASET_URN,
        "changeType": "UPSERT",
        "aspectName": "upstreamLineage",
        "aspect": {"json": {"upstreams": [upstream_edge]}},
    }
|
||
|
|
||
|
|
||
|
def create_dataset_without_lineage_mcp_data() -> Dict[str, Any]:
    """Build the ``datasetProperties`` UPSERT proposal for the lineage-free dataset.

    Returns:
        A plain dict (JSON-serialisable) targeting
        ``DATASET_WITHOUT_LINEAGE_URN``.
    """
    properties_aspect = {
        "name": "no_lineage_table",
        "description": "Dataset without lineage",
        "customProperties": {"platform": "hive", "env": "PROD"},
    }
    return {
        "entityType": "dataset",
        "entityUrn": DATASET_WITHOUT_LINEAGE_URN,
        "changeType": "UPSERT",
        "aspectName": "datasetProperties",
        "aspect": {"json": properties_aspect},
    }
|
||
|
|
||
|
|
||
|
@tenacity.retry(
    stop=tenacity.stop_after_attempt(5),
    wait=tenacity.wait_exponential(multiplier=1, min=2, max=10),
    retry=tenacity.retry_if_exception_type(AssertionError),
    # Re-raise the last AssertionError itself once the attempts are used up.
    # Without this, tenacity raises a RetryError wrapper and the
    # expected-vs-actual message is no longer the primary test failure.
    reraise=True,
)
def verify_search_index_fields_via_openapi(
    auth_session,
    dataset_urn: str,
    expected_has_upstreams: bool,
    expected_has_fine_grained_upstreams: bool,
    expected_fine_grained_upstreams: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Verify search index fields using the OpenAPI endpoint.

    POSTs ``[dataset_urn]`` to
    ``<gms_url>/openapi/operations/elasticSearch/entity/raw`` and checks the
    lineage-related fields of the returned document.

    Any ``AssertionError`` (document missing from the response, or a field
    not matching its expected value) triggers a retry: up to 5 attempts with
    exponential backoff bounded between 2 and 10. After the final attempt the
    ``AssertionError`` is re-raised as-is.

    Args:
        auth_session: Session object exposing ``gms_url()`` and ``post(...)``.
        dataset_urn: URN of the dataset whose search document is inspected.
        expected_has_upstreams: Expected value of ``hasUpstreams``; a missing
            field is read as ``False``.
        expected_has_fine_grained_upstreams: Expected value of
            ``hasFineGrainedUpstreams``; a missing field is read as ``False``.
        expected_fine_grained_upstreams: When not ``None``, the expected
            content of ``fineGrainedUpstreams``, compared as a set (order and
            duplicates are ignored); a missing field is read as ``[]``.

    Returns:
        A dict with the keys ``urn``, ``hasUpstreams``,
        ``hasFineGrainedUpstreams`` and ``fineGrainedUpstreams`` as observed
        in the document.

    Raises:
        AssertionError: If verification still fails on the last attempt.
        Exception: Anything else raised while calling the endpoint (for
            example by ``response.raise_for_status()``) is logged and
            propagated immediately, without retrying.
    """
    logger.info(
        f"Checking search index fields for {dataset_urn} via OpenAPI endpoint..."
    )

    try:
        # Use the OpenAPI endpoint to get raw Elasticsearch document
        openapi_url = (
            f"{auth_session.gms_url()}/openapi/operations/elasticSearch/entity/raw"
        )
        response = auth_session.post(
            openapi_url,
            json=[dataset_urn],
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()

        raw_documents = response.json()
        logger.info(f"Raw documents response: {raw_documents}")

        # Raised as AssertionError on purpose so that the retry policy treats
        # a not-yet-present document like any other failed expectation.
        if dataset_urn not in raw_documents:
            raise AssertionError(f"Dataset {dataset_urn} not found in raw documents")

        document = raw_documents[dataset_urn]
        logger.info(f"Raw document for {dataset_urn}: {document}")

        # Check hasUpstreams field
        has_upstreams = document.get("hasUpstreams", False)
        logger.info(f"hasUpstreams field: {has_upstreams}")
        assert has_upstreams == expected_has_upstreams, (
            f"Expected hasUpstreams={expected_has_upstreams}, got {has_upstreams}"
        )

        # Check hasFineGrainedUpstreams field
        has_fine_grained_upstreams = document.get("hasFineGrainedUpstreams", False)
        logger.info(f"hasFineGrainedUpstreams field: {has_fine_grained_upstreams}")
        assert has_fine_grained_upstreams == expected_has_fine_grained_upstreams, (
            f"Expected hasFineGrainedUpstreams={expected_has_fine_grained_upstreams}, got {has_fine_grained_upstreams}"
        )

        # Check fineGrainedUpstreams field if expected
        if expected_fine_grained_upstreams is not None:
            fine_grained_upstreams = document.get("fineGrainedUpstreams", [])
            logger.info(f"fineGrainedUpstreams field: {fine_grained_upstreams}")
            assert set(fine_grained_upstreams) == set(
                expected_fine_grained_upstreams
            ), (
                f"Expected fineGrainedUpstreams={expected_fine_grained_upstreams}, got {fine_grained_upstreams}"
            )

        logger.info(f"Search index field verification successful for {dataset_urn}")
        return {
            "urn": dataset_urn,
            "hasUpstreams": has_upstreams,
            "hasFineGrainedUpstreams": has_fine_grained_upstreams,
            "fineGrainedUpstreams": document.get("fineGrainedUpstreams", []),
        }

    except Exception as e:
        # Runs on every failed attempt, including those that will be retried;
        # the exception is always re-raised so the retry policy can see it.
        logger.error(f"Could not verify search index fields via OpenAPI: {e}")
        raise
|
||
|
|
||
|
|
||
|
@pytest.fixture(scope="module", autouse=True)
def ingest_cleanup_data(auth_session, graph_client, request):
    """Module-wide fixture: ingest the lineage test entities, then remove them.

    The four proposals produced by the builder functions in this module are
    written to a temporary JSON file, ingested through
    ``ingest_file_via_rest`` before the tests run, and deleted again with
    ``delete_urns_from_file`` afterwards. The temporary file itself is
    removed in all cases.
    """
    proposals = [
        create_upstream_dataset_mcp_data(),
        create_downstream_dataset_with_lineage_mcp_data(),
        create_upstream_lineage_mcp_data(),
        create_dataset_without_lineage_mcp_data(),
    ]

    # delete=False: the file has to outlive this ``with`` block because it is
    # read again by path during both ingestion and cleanup.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", delete=False
    ) as mcp_file:
        json.dump(proposals, mcp_file, indent=2)
        temp_file_path = mcp_file.name

    try:
        logger.info("Ingesting lineage test data")
        ingest_file_via_rest(auth_session, temp_file_path)
        yield
        logger.info("Removing lineage test data")
        delete_urns_from_file(graph_client, temp_file_path)
    finally:
        # Drop the temporary file whether or not ingestion/cleanup succeeded.
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)
|
||
|
|
||
|
|
||
|
def test_lineage_search_index_fields_with_lineage(auth_session):
    """
    The downstream dataset's search document must report upstreams, with no
    fine-grained upstreams.
    """
    # Only dataset-level lineage is ingested for this dataset, so the
    # fine-grained fields are expected to be False / empty.
    verify_search_index_fields_via_openapi(
        auth_session,
        DOWNSTREAM_DATASET_URN,
        expected_has_upstreams=True,
        expected_has_fine_grained_upstreams=False,
        expected_fine_grained_upstreams=[],  # No fine-grained lineage in this test
    )
|
||
|
|
||
|
|
||
|
def test_lineage_search_index_fields_without_lineage(auth_session):
    """
    The search document of a dataset with no lineage must report no
    upstreams of either kind.
    """
    # No lineage aspect is ingested for this dataset, so every lineage field
    # is expected to be False / empty.
    verify_search_index_fields_via_openapi(
        auth_session,
        DATASET_WITHOUT_LINEAGE_URN,
        expected_has_upstreams=False,
        expected_has_fine_grained_upstreams=False,
        expected_fine_grained_upstreams=[],
    )
|
||
|
|
||
|
|
||
|
def test_upstream_dataset_search_index_fields(auth_session):
    """
    The upstream dataset is only a lineage source, so its own search document
    must report no upstreams of either kind.
    """
    # Being referenced as someone else's upstream must not set these fields.
    verify_search_index_fields_via_openapi(
        auth_session,
        UPSTREAM_DATASET_URN,
        expected_has_upstreams=False,
        expected_has_fine_grained_upstreams=False,
        expected_fine_grained_upstreams=[],
    )
|