datahub/smoke-test/tests/cli/datahub_graph_test.py

161 lines
5.8 KiB
Python
Raw Permalink Normal View History

from typing import Optional
import pytest
import tenacity
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
DatasetPropertiesClass,
KafkaSchemaClass,
OwnershipClass,
SchemaMetadataClass,
SystemMetadataClass,
)
from tests.utils import delete_urns_from_file, get_sleep_info, ingest_file_via_rest
# Retry pacing obtained from the shared test utilities; used further down as
# the fixed wait (sleep_sec) and the attempt limit (sleep_times) of the
# tenacity retry around _ensure_dataset_present_correctly.
sleep_sec, sleep_times = get_sleep_info()
# Data files that test_graph_relationships deletes and then re-ingests.
graph = "test_resources/graph_data.json"
graph_2 = "test_resources/graph_dataDiff.json"
@pytest.fixture(scope="module", autouse=False)
def ingest_cleanup_data(graph_client, auth_session, request):
    """Load the graph test data for the module and remove it afterwards.

    Setup deletes the URNs listed in the data file and then ingests the file
    over REST. After the ``yield`` the same URNs are deleted again as teardown.
    """
    data_file = "tests/cli/graph_data.json"

    # Setup: delete first, then ingest, so the ingest happens after a removal.
    print("removing graph test data")
    delete_urns_from_file(graph_client, data_file)

    print("ingesting graph test data")
    ingest_file_via_rest(auth_session, data_file)

    yield

    # Teardown: remove what was ingested above.
    print("removing graph test data")
    delete_urns_from_file(graph_client, data_file)
def test_get_aspect_v2(graph_client, ingest_cleanup_data):
    """Fetch the ``schemaMetadata`` aspect of the test dataset and verify it.

    Checks that the aspect exists, that its platform is Kafka, and that the
    platform schema is a ``KafkaSchemaClass`` carrying the expected document
    schema string.
    """
    dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-rollback,PROD)"
    expected_document_schema = '{"type":"record","name":"SampleKafkaSchema","namespace":"com.linkedin.dataset","doc":"Sample Kafka dataset","fields":[{"name":"field_foo","type":["string"]},{"name":"field_bar","type":["boolean"]}]}'

    schema_metadata: Optional[SchemaMetadataClass] = graph_client.get_aspect_v2(
        dataset_urn, aspect="schemaMetadata", aspect_type=SchemaMetadataClass
    )

    assert schema_metadata is not None
    assert schema_metadata.platform == "urn:li:dataPlatform:kafka"

    platform_schema = schema_metadata.platformSchema
    assert isinstance(platform_schema, KafkaSchemaClass)
    assert platform_schema.documentSchema == expected_document_schema
def test_get_entities_v3(graph_client, ingest_cleanup_data):
    """Exercise ``graph_client.get_entities`` with and without system metadata.

    The first call requests the ownership and datasetProperties aspects and
    expects each result entry to hold the typed aspect at index 0 and ``None``
    at index 1. The second call passes ``with_system_metadata=True`` and
    expects index 1 to hold a ``SystemMetadataClass`` instance.
    """
    ownership_key = "ownership"
    properties_key = "datasetProperties"
    urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-rollback,PROD)"

    entities = graph_client.get_entities(
        entity_name="dataset",
        urns=[urn],
        aspects=[ownership_key, properties_key],
    )
    assert entities
    assert len(entities) == 1
    assert urn in entities

    aspects = entities[urn]
    assert len(aspects) == 2
    assert ownership_key in aspects
    assert properties_key in aspects

    ownership_entry = aspects[ownership_key]
    properties_entry = aspects[properties_key]
    assert isinstance(ownership_entry[0], OwnershipClass)
    assert isinstance(properties_entry[0], DatasetPropertiesClass)
    # Without with_system_metadata the second slot of each entry is empty.
    assert ownership_entry[1] is None
    assert properties_entry[1] is None

    owner_urns = {entry.owner for entry in ownership_entry[0].owners}
    assert owner_urns == {
        "urn:li:corpuser:datahub",
        "urn:li:corpuser:jdoe",
    }

    dataset_properties = properties_entry[0]
    assert not dataset_properties.description
    assert dataset_properties.customProperties == {
        "prop1": "fakeprop",
        "prop2": "pikachu",
    }

    # Test with system metadata
    entities_with_metadata = graph_client.get_entities(
        entity_name="dataset",
        urns=[urn],
        aspects=[ownership_key],
        with_system_metadata=True,
    )
    assert entities_with_metadata
    assert len(entities_with_metadata) == 1
    assert urn in entities_with_metadata

    aspects_with_metadata = entities_with_metadata[urn]
    assert ownership_key in aspects_with_metadata

    ownership_with_metadata = aspects_with_metadata[ownership_key]
    assert ownership_with_metadata[0]
    assert isinstance(ownership_with_metadata[0], OwnershipClass)
    assert ownership_with_metadata[1]
    assert isinstance(ownership_with_metadata[1], SystemMetadataClass)
# GraphQL document sent by _ensure_dataset_present_correctly. It selects basic
# dataset fields plus the outgoing "SchemaFieldTaggedWith" relationships.
_GET_DATASET_QUERY = """query getDataset($urn: String!) {\n
dataset(urn: $urn) {\n
urn\n
name\n
description\n
platform {\n
urn\n
}\n
schemaMetadata {\n
name\n
version\n
createdAt\n
}\n
outgoing: relationships(\n
input: { types: ["SchemaFieldTaggedWith"], direction: OUTGOING, start: 0, count: 10000 }\n
) {\n
start\n
count\n
total\n
relationships {\n
type\n
direction\n
entity {\n
urn\n
type\n
}\n
}\n
}\n
}\n
}"""


@tenacity.retry(
    stop=tenacity.stop_after_attempt(sleep_times),
    wait=tenacity.wait_fixed(sleep_sec),
)
def _ensure_dataset_present_correctly(auth_session, graph_client: DataHubGraph):
    """Assert via GraphQL that the graph-test dataset has three outgoing relationships.

    Posts the dataset query to ``<gms_url>/api/graphql`` and checks that the
    dataset is returned with the requested URN and exactly three
    relationships. The tenacity decorator re-runs the function on failure,
    waiting ``sleep_sec`` between attempts and stopping after ``sleep_times``
    attempts.
    """
    urn = "urn:li:dataset:(urn:li:dataPlatform:graph,graph-test,PROD)"
    payload = {
        "query": _GET_DATASET_QUERY,
        "variables": {"urn": urn},
    }
    graphql_url = f"{auth_session.gms_url()}/api/graphql"

    res_data = graph_client._post_generic(graphql_url, payload)

    assert res_data
    assert res_data["data"]

    dataset = res_data["data"]["dataset"]
    assert dataset
    assert dataset["urn"] == urn
    assert len(dataset["outgoing"]["relationships"]) == 3
def test_graph_relationships(graph_client, auth_session):
    """Re-ingest both graph data files and verify the resulting relationships.

    URNs from both files are deleted first, then both files are ingested over
    REST, and finally the retried GraphQL check confirms the dataset is present
    with the expected relationships.
    """
    data_files = (graph, graph_2)

    # Delete everything before ingesting anything, in file order.
    for data_file in data_files:
        delete_urns_from_file(graph_client, data_file)

    for data_file in data_files:
        ingest_file_via_rest(auth_session, data_file)

    _ensure_dataset_present_correctly(auth_session, graph_client)