# File: datahub/smoke-test/tests/search/test_lineage_search_index_fields.py
# (source listing metadata: 240 lines, 8.4 KiB, Python)

import json
import logging
import os
import tempfile
from typing import Any, Dict, List, Optional
import pytest
import tenacity
from tests.utils import delete_urns_from_file, ingest_file_via_rest
logger = logging.getLogger(__name__)


def _hive_prod_dataset_urn(table_name: str) -> str:
    """Return the URN of the PROD-environment Hive dataset named ``table_name``."""
    return f"urn:li:dataset:(urn:li:dataPlatform:hive,{table_name},PROD)"


# Test constants: the Hive datasets used by this module's tests.
# The lineage MCP makes UPSTREAM_DATASET_URN an upstream of DOWNSTREAM_DATASET_URN.
UPSTREAM_DATASET_URN = _hive_prod_dataset_urn("upstream_table")
DOWNSTREAM_DATASET_URN = _hive_prod_dataset_urn("downstream_table")
# Ingested with properties only; no lineage aspect references it.
DATASET_WITHOUT_LINEAGE_URN = _hive_prod_dataset_urn("no_lineage_table")
def create_upstream_dataset_mcp_data() -> Dict[str, Any]:
    """Build the ``datasetProperties`` UPSERT MCP for the upstream dataset.

    Returns:
        A JSON-serializable MCP dict targeting ``UPSTREAM_DATASET_URN``.
    """
    properties = dict(
        name="upstream_table",
        description="Upstream dataset for lineage testing",
        customProperties=dict(platform="hive", env="PROD"),
    )
    return dict(
        entityType="dataset",
        entityUrn=UPSTREAM_DATASET_URN,
        changeType="UPSERT",
        aspectName="datasetProperties",
        aspect={"json": properties},
    )
def create_downstream_dataset_with_lineage_mcp_data() -> Dict[str, Any]:
    """Build the ``datasetProperties`` UPSERT MCP for the downstream dataset.

    Note: despite the function name, this MCP carries only the dataset's
    properties. The upstream edge itself is emitted separately as an
    ``upstreamLineage`` aspect by ``create_upstream_lineage_mcp_data``.

    Returns:
        A JSON-serializable MCP dict targeting ``DOWNSTREAM_DATASET_URN``.
    """
    properties = dict(
        name="downstream_table",
        description="Downstream dataset with lineage",
        customProperties=dict(platform="hive", env="PROD"),
    )
    return dict(
        entityType="dataset",
        entityUrn=DOWNSTREAM_DATASET_URN,
        changeType="UPSERT",
        aspectName="datasetProperties",
        aspect={"json": properties},
    )
def create_upstream_lineage_mcp_data() -> Dict[str, Any]:
    """Build the ``upstreamLineage`` UPSERT MCP linking downstream to upstream.

    The aspect is written on ``DOWNSTREAM_DATASET_URN``. It declares a single
    table-level ``TRANSFORMED`` upstream edge from ``UPSTREAM_DATASET_URN`` and
    has no fine-grained lineage entries.

    Returns:
        A JSON-serializable MCP dict.
    """
    audit_stamp = {
        # 2022-01-01T00:00:00Z expressed in epoch milliseconds.
        "time": 1640995200000,
        "actor": "urn:li:corpuser:datahub",
    }
    upstream_edge = {
        "dataset": UPSTREAM_DATASET_URN,
        "type": "TRANSFORMED",
        "auditStamp": audit_stamp,
    }
    lineage_aspect = {"upstreams": [upstream_edge]}
    return {
        "entityType": "dataset",
        "entityUrn": DOWNSTREAM_DATASET_URN,
        "changeType": "UPSERT",
        "aspectName": "upstreamLineage",
        "aspect": {"json": lineage_aspect},
    }
def create_dataset_without_lineage_mcp_data() -> Dict[str, Any]:
    """Build the ``datasetProperties`` UPSERT MCP for the lineage-free dataset.

    Returns:
        A JSON-serializable MCP dict targeting ``DATASET_WITHOUT_LINEAGE_URN``.
    """
    properties = dict(
        name="no_lineage_table",
        description="Dataset without lineage",
        customProperties=dict(platform="hive", env="PROD"),
    )
    return dict(
        entityType="dataset",
        entityUrn=DATASET_WITHOUT_LINEAGE_URN,
        changeType="UPSERT",
        aspectName="datasetProperties",
        aspect={"json": properties},
    )
@tenacity.retry(
    stop=tenacity.stop_after_attempt(5),
    wait=tenacity.wait_exponential(multiplier=1, min=2, max=10),
    retry=tenacity.retry_if_exception_type(AssertionError),
    # Once attempts are exhausted, re-raise the last AssertionError itself
    # instead of wrapping it in tenacity.RetryError, so the test failure shows
    # the actual field mismatch message.
    reraise=True,
)
def verify_search_index_fields_via_openapi(
    auth_session,
    dataset_urn: str,
    expected_has_upstreams: bool,
    expected_has_fine_grained_upstreams: bool,
    expected_fine_grained_upstreams: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Verify the lineage-related search index fields of ``dataset_urn``.

    Fetches the raw search document through the GMS OpenAPI endpoint
    ``/openapi/operations/elasticSearch/entity/raw``. It then checks the
    ``hasUpstreams`` and ``hasFineGrainedUpstreams`` fields, and optionally
    ``fineGrainedUpstreams``. Any AssertionError is retried with exponential
    backoff, up to 5 attempts in total. That includes the case where the
    document is not present in the response yet; presumably indexing can lag
    behind ingestion.

    Args:
        auth_session: Session exposing ``gms_url()`` and ``post()``.
        dataset_urn: URN of the dataset whose search document is checked.
        expected_has_upstreams: Expected ``hasUpstreams`` value; a missing
            field is treated as ``False``.
        expected_has_fine_grained_upstreams: Expected
            ``hasFineGrainedUpstreams`` value; a missing field is treated as
            ``False``.
        expected_fine_grained_upstreams: If given, the expected collection of
            ``fineGrainedUpstreams`` values, compared as sets (order and
            duplicates ignored). ``None`` skips this check.

    Returns:
        A summary dict with the URN and the observed field values.

    Raises:
        AssertionError: If a field still mismatches after the final attempt.
        Exception: Anything else, such as an HTTP error from
            ``raise_for_status``, is not retried and propagates immediately.
    """
    logger.info(
        f"Checking search index fields for {dataset_urn} via OpenAPI endpoint..."
    )
    try:
        # Use the OpenAPI endpoint to get the raw Elasticsearch document.
        openapi_url = (
            f"{auth_session.gms_url()}/openapi/operations/elasticSearch/entity/raw"
        )
        response = auth_session.post(
            openapi_url,
            json=[dataset_urn],
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        raw_documents = response.json()
        logger.info(f"Raw documents response: {raw_documents}")

        # A missing document is raised as AssertionError so the retry covers it.
        if dataset_urn not in raw_documents:
            raise AssertionError(f"Dataset {dataset_urn} not found in raw documents")
        document = raw_documents[dataset_urn]
        logger.info(f"Raw document for {dataset_urn}: {document}")

        # Check hasUpstreams field.
        has_upstreams = document.get("hasUpstreams", False)
        logger.info(f"hasUpstreams field: {has_upstreams}")
        assert has_upstreams == expected_has_upstreams, (
            f"Expected hasUpstreams={expected_has_upstreams}, got {has_upstreams}"
        )

        # Check hasFineGrainedUpstreams field.
        has_fine_grained_upstreams = document.get("hasFineGrainedUpstreams", False)
        logger.info(f"hasFineGrainedUpstreams field: {has_fine_grained_upstreams}")
        assert has_fine_grained_upstreams == expected_has_fine_grained_upstreams, (
            f"Expected hasFineGrainedUpstreams={expected_has_fine_grained_upstreams}, got {has_fine_grained_upstreams}"
        )

        # Normalize fineGrainedUpstreams to a list before comparing.
        # - A null value would make set() raise TypeError, which is not retried.
        # - A bare string would otherwise be compared character by character.
        raw_fine_grained = document.get("fineGrainedUpstreams")
        fine_grained_upstreams: List[str]
        if raw_fine_grained is None:
            fine_grained_upstreams = []
        elif isinstance(raw_fine_grained, str):
            fine_grained_upstreams = [raw_fine_grained]
        else:
            fine_grained_upstreams = list(raw_fine_grained)

        # Check fineGrainedUpstreams field if expected.
        if expected_fine_grained_upstreams is not None:
            logger.info(f"fineGrainedUpstreams field: {fine_grained_upstreams}")
            assert set(fine_grained_upstreams) == set(
                expected_fine_grained_upstreams
            ), (
                f"Expected fineGrainedUpstreams={expected_fine_grained_upstreams}, got {fine_grained_upstreams}"
            )

        logger.info(f"Search index field verification successful for {dataset_urn}")
        return {
            "urn": dataset_urn,
            "hasUpstreams": has_upstreams,
            "hasFineGrainedUpstreams": has_fine_grained_upstreams,
            "fineGrainedUpstreams": fine_grained_upstreams,
        }
    except Exception as e:
        # Logged on every failed attempt; the exception is always re-raised so
        # tenacity can decide whether to retry.
        logger.error(f"Could not verify search index fields via OpenAPI: {e}")
        raise
@pytest.fixture(scope="module", autouse=True)
def ingest_cleanup_data(auth_session, graph_client, request):
    """Ingest this module's lineage test entities and delete them afterwards.

    The fixture is module-scoped and autouse. It works in three stages:

    1. Serializes the four MCPs to a temporary JSON file.
    2. Ingests that file over REST before the module's tests run.
    3. After the tests, deletes the URNs listed in the same file.

    The temporary file is removed in all cases. Entity deletion only happens
    if ingestion succeeded and the tests were reached.
    """
    payload = [
        create_upstream_dataset_mcp_data(),
        create_downstream_dataset_with_lineage_mcp_data(),
        create_upstream_lineage_mcp_data(),
        create_dataset_without_lineage_mcp_data(),
    ]
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", delete=False
    ) as handle:
        json.dump(payload, handle, indent=2)
    mcp_file = handle.name

    try:
        logger.info("Ingesting lineage test data")
        ingest_file_via_rest(auth_session, mcp_file)
        yield
        logger.info("Removing lineage test data")
        delete_urns_from_file(graph_client, mcp_file)
    finally:
        # The file was created with delete=False, so removing it is up to us.
        if os.path.exists(mcp_file):
            os.unlink(mcp_file)
def test_lineage_search_index_fields_with_lineage(auth_session):
    """The downstream dataset is indexed with ``hasUpstreams`` set.

    Only a table-level upstream edge is ingested for it, so the fine-grained
    lineage fields are expected to be false/empty.
    """
    expectations = dict(
        expected_has_upstreams=True,
        expected_has_fine_grained_upstreams=False,
        # The lineage MCP declares no fine-grained lineage.
        expected_fine_grained_upstreams=[],
    )
    verify_search_index_fields_via_openapi(
        auth_session, dataset_urn=DOWNSTREAM_DATASET_URN, **expectations
    )
def test_lineage_search_index_fields_without_lineage(auth_session):
    """A dataset with no lineage aspect has every lineage index field unset."""
    expectations = dict(
        expected_has_upstreams=False,
        expected_has_fine_grained_upstreams=False,
        expected_fine_grained_upstreams=[],
    )
    verify_search_index_fields_via_openapi(
        auth_session, dataset_urn=DATASET_WITHOUT_LINEAGE_URN, **expectations
    )
def test_upstream_dataset_search_index_fields(auth_session):
    """The upstream dataset is not itself marked as having upstreams.

    It appears only as the source of the downstream dataset's lineage edge, so
    none of its own lineage fields should be set.
    """
    expectations = dict(
        expected_has_upstreams=False,
        expected_has_fine_grained_upstreams=False,
        expected_fine_grained_upstreams=[],
    )
    verify_search_index_fields_via_openapi(
        auth_session, dataset_urn=UPSTREAM_DATASET_URN, **expectations
    )