mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-26 01:36:38 +00:00
253 lines
8.4 KiB
Python
253 lines
8.4 KiB
Python
import logging
|
|
import os
|
|
import tempfile
|
|
from random import randint
|
|
from typing import List
|
|
|
|
import pytest
|
|
import tenacity
|
|
|
|
from datahub.emitter.mce_builder import datahub_guid, make_dataset_urn
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
|
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
|
|
from datahub.ingestion.api.sink import NoopWriteCallback
|
|
from datahub.ingestion.graph.client import DataHubGraph
|
|
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
|
|
from datahub.metadata.schema_classes import (
|
|
DataProductPropertiesClass,
|
|
DatasetPropertiesClass,
|
|
DomainPropertiesClass,
|
|
DomainsClass,
|
|
)
|
|
from datahub.utilities.urns.urn import Urn
|
|
from tests.utils import (
|
|
delete_urns_from_file,
|
|
get_sleep_info,
|
|
ingest_file_via_rest,
|
|
wait_for_writes_to_sync,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# Random starting suffix for the table names below; presumably this keeps
# URNs from colliding with leftovers of earlier runs -- TODO confirm intent.
start_index = randint(10, 10000)

# Ten consecutive snowflake dataset URNs: table_foo_<start_index> .. +9.
dataset_urns = [
    make_dataset_urn(platform="snowflake", name=f"table_foo_{i}")
    for i in range(start_index, start_index + 10)
]
|
|
|
|
|
|
class FileEmitter:
    """Small helper that writes events to ``filename`` through a ``FileSink``."""

    def __init__(self, filename: str) -> None:
        """Build the underlying sink, configured to write to ``filename``."""
        pipeline_ctx = PipelineContext(run_id="create_test_data")
        sink_config = FileSinkConfig(filename=filename)
        self.sink: FileSink = FileSink(ctx=pipeline_ctx, config=sink_config)

    def emit(self, event):
        """Wrap ``event`` in an envelope with empty metadata and pass it to the sink."""
        envelope = RecordEnvelope(record=event, metadata={})
        # No per-record callback handling is needed, hence the no-op callback.
        self.sink.write_record_async(
            record_envelope=envelope,
            write_callback=NoopWriteCallback(),
        )

    def close(self):
        """Close the underlying sink."""
        self.sink.close()
|
|
|
|
|
|
def create_test_data(filename: str) -> None:
    """Write the test fixtures for this module to ``filename``.

    The file receives one domain-properties proposal for the "Marketing"
    domain followed by one dataset-properties proposal per entry in the
    module-level ``dataset_urns`` list.

    Args:
        filename: Path of the file the ``FileEmitter`` writes to.
    """
    domain_urn = Urn("domain", [datahub_guid({"name": "Marketing"})])

    domain_mcp = MetadataChangeProposalWrapper(
        entityUrn=str(domain_urn),
        aspect=DomainPropertiesClass(
            name="Marketing", description="The marketing domain"
        ),
    )

    dataset_mcps = [
        MetadataChangeProposalWrapper(
            entityUrn=dataset_urn,
            aspect=DatasetPropertiesClass(
                name=f"Test Dataset ({dataset_urn})", description=dataset_urn
            ),
        )
        for dataset_urn in dataset_urns
    ]

    file_emitter = FileEmitter(filename)
    try:
        for mcp in [domain_mcp] + dataset_mcps:
            file_emitter.emit(mcp)
    finally:
        # Always close the sink, even when an emit fails, so the underlying
        # file handle is not leaked.
        file_emitter.close()
|
|
|
|
|
|
# Retry pacing taken from get_sleep_info(); used further down by the
# tenacity.retry decorator (sleep_sec -> wait_fixed, sleep_times -> stop_after_attempt).
sleep_sec, sleep_times = get_sleep_info()
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=False)
def ingest_cleanup_data(auth_session, graph_client, request):
    """Module-scoped fixture that ingests the test data and removes it afterwards.

    Setup writes the fixtures to a temporary ``.json`` file via
    ``create_test_data`` and ingests it with ``ingest_file_via_rest``.
    Teardown deletes the URNs listed in that file, waits for writes to sync,
    and always removes the temporary file.
    """
    # mkstemp returns an already-open OS-level descriptor along with the path.
    # Only the path is used, so close the descriptor right away: leaving it
    # open leaks a handle and blocks os.remove() on Windows.
    fd, filename = tempfile.mkstemp(suffix=".json")
    os.close(fd)
    try:
        create_test_data(filename)
        print("ingesting data products test data")
        ingest_file_via_rest(auth_session, filename)
        yield
        print("removing data products test data")
        delete_urns_from_file(graph_client, filename)
        wait_for_writes_to_sync()
    finally:
        os.remove(filename)
|
|
|
|
|
|
def get_gql_query(filename: str) -> str:
    """Return the full text of the GraphQL query stored in ``filename``.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the locale's default encoding.

    Args:
        filename: Path of the query file to read.

    Returns:
        The file contents as a string.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(filename, encoding="utf-8") as fp:
        return fp.read()
|
|
|
|
|
|
def validate_listing(
    graph_client: DataHubGraph, data_product_urn: str, dataset_urns: List[str]
) -> None:
    """Assert that listing the data product's assets yields exactly ``dataset_urns``.

    Runs the ``list_dataproduct_assets`` GraphQL query for ``data_product_urn``
    and compares the returned entity URNs, as a set, with ``dataset_urns``.
    """
    query = get_gql_query("tests/dataproduct/queries/list_dataproduct_assets.graphql")
    variables = {
        "urn": data_product_urn,
        "input": {"query": "*", "start": 0, "count": 20},
    }
    response = graph_client.execute_graphql(query, variables)

    assert "listDataProductAssets" in response
    listing = response["listDataProductAssets"]
    assert "searchResults" in listing

    listed_urns = {hit["entity"]["urn"] for hit in listing["searchResults"]}
    assert listed_urns == set(dataset_urns)
|
|
|
|
|
|
def validate_relationships(
    graph_client: DataHubGraph, data_product_urn: str, dataset_urns: List[str]
) -> None:
    """Assert the ``DataProductContains`` edges exist in both directions.

    First checks that every dataset reports ``data_product_urn`` among its
    INCOMING ``DataProductContains`` relationships. Then checks that the
    OUTGOING relationships of the data product are exactly ``dataset_urns``.
    """
    # Dataset -> data product: collect the datasets that lack the edge.
    urns_missing = []
    for dataset_urn in dataset_urns:
        incoming = [
            related.urn
            for related in graph_client.get_related_entities(
                dataset_urn,
                relationship_types=["DataProductContains"],
                direction=DataHubGraph.RelationshipDirection.INCOMING,
            )
        ]
        if data_product_urn not in incoming:
            urns_missing.append(dataset_urn)

    assert urns_missing == [], (
        "All dataset urns should have a DataProductContains relationship to the data product"
    )

    # Data product -> datasets: the outgoing edges must match exactly.
    dataset_urns_matched = {
        related.urn
        for related in graph_client.get_related_entities(
            data_product_urn,
            relationship_types=["DataProductContains"],
            direction=DataHubGraph.RelationshipDirection.OUTGOING,
        )
    }

    assert set(dataset_urns) == dataset_urns_matched, (
        "All dataset urns should be navigable from the data product"
    )
|
|
|
|
|
|
@tenacity.retry(
    stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def test_create_data_product(graph_client, ingest_cleanup_data):
    """Exercise the data product lifecycle: create, set assets, update, delete.

    After each mutation the stored aspects are read back and checked. The
    asset listing and graph relationships are validated after the assets are
    set and again after the rename. After deletion the test checks that the
    data product no longer exists and that no dataset still has a
    ``DataProductContains`` relationship to it.
    """
    domain_urn = Urn("domain", [datahub_guid({"name": "Marketing"})])

    # Create the data product inside the Marketing domain.
    result = graph_client.execute_graphql(
        get_gql_query("tests/dataproduct/queries/add_dataproduct.graphql"),
        {
            "domainUrn": str(domain_urn),
            "name": "Test Data Product",
            "description": "Test Description",
        },
    )
    assert "createDataProduct" in result
    data_product_urn = result["createDataProduct"]["urn"]

    # Data Product Properties
    data_product_props = graph_client.get_aspect(
        data_product_urn, DataProductPropertiesClass
    )
    assert data_product_props is not None
    assert data_product_props.description == "Test Description"
    assert data_product_props.name == "Test Data Product"

    # Domain assignment
    domains = graph_client.get_aspect(data_product_urn, DomainsClass)
    assert domains and domains.domains[0] == str(domain_urn)

    # Add assets
    result = graph_client.execute_graphql(
        get_gql_query("tests/dataproduct/queries/setassets_dataproduct.graphql"),
        {"dataProductUrn": data_product_urn, "resourceUrns": dataset_urns},
    )
    assert "batchSetDataProduct" in result
    assert result["batchSetDataProduct"] is True

    data_product_props = graph_client.get_aspect(
        data_product_urn, DataProductPropertiesClass
    )
    assert data_product_props is not None
    assert data_product_props.assets is not None
    assert data_product_props.description == "Test Description"
    assert data_product_props.name == "Test Data Product"
    assert len(data_product_props.assets) == len(dataset_urns)
    assert {asset.destinationUrn for asset in data_product_props.assets} == set(
        dataset_urns
    )

    wait_for_writes_to_sync()

    validate_listing(graph_client, data_product_urn, dataset_urns)
    validate_relationships(graph_client, data_product_urn, dataset_urns)

    # Update name and description. The response is not inspected; the effect
    # is verified by reading the properties aspect back below.
    graph_client.execute_graphql(
        get_gql_query("tests/dataproduct/queries/update_dataproduct.graphql"),
        {
            "urn": data_product_urn,
            "name": "New Test Data Product",
            "description": "New Description",
        },
    )
    wait_for_writes_to_sync()

    # Data Product Properties
    data_product_props = graph_client.get_aspect(
        data_product_urn, DataProductPropertiesClass
    )
    assert data_product_props is not None
    assert data_product_props.description == "New Description"
    assert data_product_props.name == "New Test Data Product"
    # Renaming must not drop the previously assigned assets.
    assert data_product_props.assets is not None
    assert len(data_product_props.assets) == len(dataset_urns)

    validate_listing(graph_client, data_product_urn, dataset_urns)
    validate_relationships(graph_client, data_product_urn, dataset_urns)

    # Delete the data product. The response is not inspected; the effect is
    # verified through the existence check below.
    graph_client.execute_graphql(
        get_gql_query("tests/dataproduct/queries/delete_dataproduct.graphql"),
        {"urn": data_product_urn},
    )
    wait_for_writes_to_sync()
    assert graph_client.exists(data_product_urn) is False

    # Validate relationships are removed: no dataset may still have the
    # deleted data product among its incoming DataProductContains edges.
    still_linked = [
        dataset_urn
        for dataset_urn in dataset_urns
        if any(
            e.urn == data_product_urn
            for e in graph_client.get_related_entities(
                dataset_urn,
                relationship_types=["DataProductContains"],
                direction=DataHubGraph.RelationshipDirection.INCOMING,
            )
        )
    ]
    assert not still_linked, (
        f"All dataset urns should no longer have a DataProductContains relationship to the data product {data_product_urn}"
    )
|