# datahub/smoke-test/tests/search/test_lineage_search_index_fields.py

import json
import logging
import os
import tempfile
from typing import Any, Dict, List, Optional
import pytest
import tenacity
from tests.utils import delete_urns_from_file, ingest_file_via_rest
logger = logging.getLogger(__name__)
# Test constants
# URNs for the three hive datasets exercised in this module: one upstream
# table, one downstream table (linked to the upstream via an upstreamLineage
# aspect), and one standalone table with no lineage at all.
UPSTREAM_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:hive,upstream_table,PROD)"
DOWNSTREAM_DATASET_URN = (
"urn:li:dataset:(urn:li:dataPlatform:hive,downstream_table,PROD)"
)
DATASET_WITHOUT_LINEAGE_URN = (
"urn:li:dataset:(urn:li:dataPlatform:hive,no_lineage_table,PROD)"
)
def create_upstream_dataset_mcp_data() -> Dict[str, Any]:
    """Build the datasetProperties MCP payload for the upstream dataset."""
    properties = {
        "name": "upstream_table",
        "description": "Upstream dataset for lineage testing",
        "customProperties": {"platform": "hive", "env": "PROD"},
    }
    return {
        "entityType": "dataset",
        "entityUrn": UPSTREAM_DATASET_URN,
        "changeType": "UPSERT",
        "aspectName": "datasetProperties",
        "aspect": {"json": properties},
    }
def create_downstream_dataset_with_lineage_mcp_data() -> Dict[str, Any]:
    """Build the datasetProperties MCP payload for the downstream dataset.

    The lineage edge itself is emitted as a separate upstreamLineage aspect
    by create_upstream_lineage_mcp_data().
    """
    properties = {
        "name": "downstream_table",
        "description": "Downstream dataset with lineage",
        "customProperties": {"platform": "hive", "env": "PROD"},
    }
    return {
        "entityType": "dataset",
        "entityUrn": DOWNSTREAM_DATASET_URN,
        "changeType": "UPSERT",
        "aspectName": "datasetProperties",
        "aspect": {"json": properties},
    }
def create_upstream_lineage_mcp_data() -> Dict[str, Any]:
    """Build the upstreamLineage MCP payload linking downstream -> upstream."""
    upstream_edge = {
        "dataset": UPSTREAM_DATASET_URN,
        "type": "TRANSFORMED",
        "auditStamp": {
            # Fixed timestamp (2022-01-01T00:00:00Z in epoch millis) keeps the
            # ingested aspect deterministic across test runs.
            "time": 1640995200000,
            "actor": "urn:li:corpuser:datahub",
        },
    }
    return {
        "entityType": "dataset",
        "entityUrn": DOWNSTREAM_DATASET_URN,
        "changeType": "UPSERT",
        "aspectName": "upstreamLineage",
        "aspect": {"json": {"upstreams": [upstream_edge]}},
    }
def create_dataset_without_lineage_mcp_data() -> Dict[str, Any]:
    """Build the datasetProperties MCP payload for the lineage-free dataset."""
    properties = {
        "name": "no_lineage_table",
        "description": "Dataset without lineage",
        "customProperties": {"platform": "hive", "env": "PROD"},
    }
    return {
        "entityType": "dataset",
        "entityUrn": DATASET_WITHOUT_LINEAGE_URN,
        "changeType": "UPSERT",
        "aspectName": "datasetProperties",
        "aspect": {"json": properties},
    }
@tenacity.retry(
    stop=tenacity.stop_after_attempt(5),
    wait=tenacity.wait_exponential(multiplier=1, min=2, max=10),
    retry=tenacity.retry_if_exception_type(AssertionError),
)
def verify_search_index_fields_via_openapi(
    auth_session,
    dataset_urn: str,
    expected_has_upstreams: bool,
    expected_has_fine_grained_upstreams: bool,
    expected_fine_grained_upstreams: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Fetch the raw search document for ``dataset_urn`` and assert lineage fields.

    Retries on AssertionError (up to 5 attempts with exponential backoff) so
    that the eventually-consistent search index has time to catch up after
    ingestion.

    Args:
        auth_session: Authenticated session exposing ``gms_url()`` and ``post()``.
        dataset_urn: URN of the dataset whose raw index document is inspected.
        expected_has_upstreams: Expected value of the ``hasUpstreams`` field.
        expected_has_fine_grained_upstreams: Expected value of the
            ``hasFineGrainedUpstreams`` field.
        expected_fine_grained_upstreams: When not None, the exact collection of
            URNs (order-insensitive) expected in ``fineGrainedUpstreams``.

    Returns:
        A summary dict with the observed ``hasUpstreams``,
        ``hasFineGrainedUpstreams`` and ``fineGrainedUpstreams`` values.

    Raises:
        AssertionError: If the document is missing or any field mismatches
            (after retries are exhausted).
        Exception: Any HTTP/parsing error from the OpenAPI call is logged and
            re-raised unchanged.
    """
    # Lazy %-style args avoid formatting cost when the log level is disabled.
    logger.info(
        "Checking search index fields for %s via OpenAPI endpoint...", dataset_urn
    )
    try:
        # The raw-entity endpoint returns the unprocessed Elasticsearch
        # document, which is the only way to observe derived index-only fields.
        openapi_url = (
            f"{auth_session.gms_url()}/openapi/operations/elasticSearch/entity/raw"
        )
        response = auth_session.post(
            openapi_url,
            json=[dataset_urn],
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        raw_documents = response.json()
        logger.info("Raw documents response: %s", raw_documents)
        if dataset_urn not in raw_documents:
            raise AssertionError(f"Dataset {dataset_urn} not found in raw documents")
        document = raw_documents[dataset_urn]
        logger.info("Raw document for %s: %s", dataset_urn, document)

        # Check hasUpstreams field (absent field counts as False).
        has_upstreams = document.get("hasUpstreams", False)
        logger.info("hasUpstreams field: %s", has_upstreams)
        assert has_upstreams == expected_has_upstreams, (
            f"Expected hasUpstreams={expected_has_upstreams}, got {has_upstreams}"
        )

        # Check hasFineGrainedUpstreams field.
        has_fine_grained_upstreams = document.get("hasFineGrainedUpstreams", False)
        logger.info("hasFineGrainedUpstreams field: %s", has_fine_grained_upstreams)
        assert has_fine_grained_upstreams == expected_has_fine_grained_upstreams, (
            f"Expected hasFineGrainedUpstreams={expected_has_fine_grained_upstreams}, got {has_fine_grained_upstreams}"
        )

        # Check fineGrainedUpstreams contents only when the caller supplied an
        # expectation; compared as sets because index ordering is unspecified.
        if expected_fine_grained_upstreams is not None:
            fine_grained_upstreams = document.get("fineGrainedUpstreams", [])
            logger.info("fineGrainedUpstreams field: %s", fine_grained_upstreams)
            assert set(fine_grained_upstreams) == set(
                expected_fine_grained_upstreams
            ), (
                f"Expected fineGrainedUpstreams={expected_fine_grained_upstreams}, got {fine_grained_upstreams}"
            )

        logger.info("Search index field verification successful for %s", dataset_urn)
        return {
            "urn": dataset_urn,
            "hasUpstreams": has_upstreams,
            "hasFineGrainedUpstreams": has_fine_grained_upstreams,
            "fineGrainedUpstreams": document.get("fineGrainedUpstreams", []),
        }
    except Exception:
        # logger.exception preserves the traceback that logger.error(f"...{e}")
        # discarded; the exception is re-raised so tenacity can retry on
        # AssertionError and everything else propagates to the test.
        logger.exception("Could not verify search index fields via OpenAPI")
        raise
@pytest.fixture(scope="module", autouse=True)
def ingest_cleanup_data(auth_session, graph_client, request):
    """Ingest the lineage test datasets for this module and remove them afterwards."""
    # Assemble all four MCPs up front, then serialize them to a temp file that
    # the REST ingestion helper consumes.
    mcp_batch = [
        create_upstream_dataset_mcp_data(),
        create_downstream_dataset_with_lineage_mcp_data(),
        create_upstream_lineage_mcp_data(),
        create_dataset_without_lineage_mcp_data(),
    ]
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        json.dump(mcp_batch, handle, indent=2)
        mcp_file = handle.name
    try:
        logger.info("Ingesting lineage test data")
        ingest_file_via_rest(auth_session, mcp_file)
        yield
        logger.info("Removing lineage test data")
        delete_urns_from_file(graph_client, mcp_file)
    finally:
        # Always remove the temp file, even if ingestion or teardown failed.
        if os.path.exists(mcp_file):
            os.unlink(mcp_file)
def test_lineage_search_index_fields_with_lineage(auth_session):
    """The downstream dataset must index hasUpstreams=True.

    Only a plain upstreamLineage aspect is ingested, so the fine-grained
    (column-level) fields are expected to stay empty/False.
    """
    verify_search_index_fields_via_openapi(
        auth_session,
        DOWNSTREAM_DATASET_URN,
        expected_has_upstreams=True,
        expected_has_fine_grained_upstreams=False,
        expected_fine_grained_upstreams=[],
    )
def test_lineage_search_index_fields_without_lineage(auth_session):
    """A dataset with no lineage aspects must index all lineage fields as empty/False."""
    verify_search_index_fields_via_openapi(
        auth_session,
        DATASET_WITHOUT_LINEAGE_URN,
        expected_has_upstreams=False,
        expected_has_fine_grained_upstreams=False,
        expected_fine_grained_upstreams=[],
    )
def test_upstream_dataset_search_index_fields(auth_session):
    """The upstream dataset sits at the head of the lineage chain.

    It has downstream consumers but no upstreams of its own, so every
    upstream-related index field must be empty/False.
    """
    verify_search_index_fields_via_openapi(
        auth_session,
        UPSTREAM_DATASET_URN,
        expected_has_upstreams=False,
        expected_has_fine_grained_upstreams=False,
        expected_fine_grained_upstreams=[],
    )