2024-07-25 20:06:14 +01:00
|
|
|
from typing import Optional
|
|
|
|
|
2022-05-22 13:28:39 -07:00
|
|
|
import pytest
|
2022-11-15 20:03:11 -06:00
|
|
|
import tenacity
|
2025-01-17 23:50:13 +05:30
|
|
|
|
2024-09-27 11:31:25 -05:00
|
|
|
from datahub.ingestion.graph.client import DataHubGraph
|
2025-04-02 10:22:14 +02:00
|
|
|
from datahub.metadata.schema_classes import (
|
|
|
|
DatasetPropertiesClass,
|
|
|
|
KafkaSchemaClass,
|
|
|
|
OwnershipClass,
|
|
|
|
SchemaMetadataClass,
|
|
|
|
SystemMetadataClass,
|
|
|
|
)
|
2024-09-27 11:31:25 -05:00
|
|
|
from tests.utils import delete_urns_from_file, get_sleep_info, ingest_file_via_rest
|
2022-07-14 22:04:06 +05:30
|
|
|
|
2022-11-15 20:03:11 -06:00
|
|
|
# Retry pacing obtained from the shared test helper. Both values feed the
# tenacity.retry decorator on _ensure_dataset_present_correctly: sleep_sec is
# the fixed wait between attempts, sleep_times the maximum number of attempts.
sleep_sec, sleep_times = get_sleep_info()
|
|
|
|
|
|
|
|
|
|
|
|
# First data file that test_graph_relationships deletes and then ingests.
# NOTE(review): relative path -- presumably resolved against the directory the
# tests are launched from; confirm.
graph = "test_resources/graph_data.json"
|
|
|
|
# Second data file that test_graph_relationships deletes and then ingests,
# always after `graph`.
graph_2 = "test_resources/graph_dataDiff.json"
|
2022-05-22 13:28:39 -07:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=False)
def ingest_cleanup_data(graph_client, auth_session, request):
    """Load the graph test data for the module and remove it afterwards.

    Setup deletes any URNs listed in the data file, then ingests the file
    over REST. Teardown (the code after ``yield``) deletes the same URNs
    again. ``request`` is accepted but not used in the body.
    """
    data_file = "tests/cli/graph_data.json"

    # Setup: clear whatever is already there, then ingest the file.
    print("removing graph test data")
    delete_urns_from_file(graph_client, data_file)
    print("ingesting graph test data")
    ingest_file_via_rest(auth_session, data_file)

    yield

    # Teardown: remove what was ingested during setup.
    print("removing graph test data")
    delete_urns_from_file(graph_client, data_file)
|
2022-05-22 13:28:39 -07:00
|
|
|
|
|
|
|
|
2024-09-27 11:31:25 -05:00
|
|
|
def test_get_aspect_v2(graph_client, ingest_cleanup_data):
    """Fetch ``schemaMetadata`` via ``get_aspect_v2`` and check its contents.

    The aspect of the ``test-rollback`` Kafka dataset must exist, report the
    Kafka platform URN, and carry a ``KafkaSchemaClass`` platform schema whose
    document schema equals the expected Avro record definition.
    """
    dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-rollback,PROD)"
    expected_document_schema = '{"type":"record","name":"SampleKafkaSchema","namespace":"com.linkedin.dataset","doc":"Sample Kafka dataset","fields":[{"name":"field_foo","type":["string"]},{"name":"field_bar","type":["boolean"]}]}'

    aspect: Optional[SchemaMetadataClass] = graph_client.get_aspect_v2(
        dataset_urn, aspect="schemaMetadata", aspect_type=SchemaMetadataClass
    )

    assert aspect is not None
    assert aspect.platform == "urn:li:dataPlatform:kafka"

    # The isinstance assertion also narrows the type for the attribute access
    # that follows.
    platform_schema = aspect.platformSchema
    assert isinstance(platform_schema, KafkaSchemaClass)
    assert platform_schema.documentSchema == expected_document_schema
|
2022-11-15 20:03:11 -06:00
|
|
|
|
|
|
|
|
2025-04-02 10:22:14 +02:00
|
|
|
def test_get_entities_v3(graph_client, ingest_cleanup_data):
    """Check ``get_entities`` for the ``test-rollback`` Kafka dataset.

    Two calls are made:

    * Without system metadata, requesting ``ownership`` and
      ``datasetProperties``. Each returned entry is indexed as
      ``[0]`` (the aspect) and ``[1]`` (expected to be ``None`` here).
    * With ``with_system_metadata=True``, requesting ``ownership`` only.
      Here ``[1]`` must be a ``SystemMetadataClass``.

    Every condition is its own ``assert`` so that a failure identifies the
    exact expectation that broke, instead of a whole ``and`` chain.
    """
    ownership_aspect_name = "ownership"
    dataset_properties_aspect_name = "datasetProperties"
    urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-rollback,PROD)"

    entities = graph_client.get_entities(
        entity_name="dataset",
        urns=[urn],
        aspects=[ownership_aspect_name, dataset_properties_aspect_name],
    )

    # Exactly the requested entity is returned.
    assert entities
    assert len(entities) == 1
    assert urn in entities

    # Exactly the two requested aspects are present for it.
    aspects = entities[urn]
    assert len(aspects) == 2
    assert ownership_aspect_name in aspects
    assert dataset_properties_aspect_name in aspects

    ownership_entry = aspects[ownership_aspect_name]
    properties_entry = aspects[dataset_properties_aspect_name]
    assert isinstance(ownership_entry[0], OwnershipClass)
    assert isinstance(properties_entry[0], DatasetPropertiesClass)
    # System metadata was not requested in this call, so slot 1 stays empty.
    assert ownership_entry[1] is None
    assert properties_entry[1] is None

    assert {owner.owner for owner in ownership_entry[0].owners} == {
        "urn:li:corpuser:datahub",
        "urn:li:corpuser:jdoe",
    }
    assert not properties_entry[0].description
    assert properties_entry[0].customProperties == {
        "prop1": "fakeprop",
        "prop2": "pikachu",
    }

    # Test with system metadata
    entities_with_metadata = graph_client.get_entities(
        entity_name="dataset",
        urns=[urn],
        aspects=[ownership_aspect_name],
        with_system_metadata=True,
    )

    assert entities_with_metadata
    assert len(entities_with_metadata) == 1
    assert urn in entities_with_metadata

    aspects_with_metadata = entities_with_metadata[urn]
    assert ownership_aspect_name in aspects_with_metadata

    ownership_with_metadata = aspects_with_metadata[ownership_aspect_name]
    # Both slots must be populated (truthy) and of the expected types.
    assert ownership_with_metadata[0]
    assert isinstance(ownership_with_metadata[0], OwnershipClass)
    assert ownership_with_metadata[1]
    assert isinstance(ownership_with_metadata[1], SystemMetadataClass)
|
|
|
|
|
|
|
|
|
2022-11-15 20:03:11 -06:00
|
|
|
@tenacity.retry(
    stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_dataset_present_correctly(auth_session, graph_client: DataHubGraph):
    """Assert via GraphQL that the graph-test dataset has three outgoing edges.

    Posts a ``getDataset`` query to ``<gms_url>/api/graphql`` and checks that
    the dataset is returned with the expected URN and exactly three outgoing
    ``SchemaFieldTaggedWith`` relationships.

    The tenacity decorator re-runs the whole function on any failure, up to
    ``sleep_times`` attempts with ``sleep_sec`` seconds between them.
    NOTE(review): presumably this gives the backend time to reflect freshly
    ingested data -- confirm.
    """
    urn = "urn:li:dataset:(urn:li:dataPlatform:graph,graph-test,PROD)"
    query = """query getDataset($urn: String!) {\n
            dataset(urn: $urn) {\n
                urn\n
                name\n
                description\n
                platform {\n
                    urn\n
                }\n
                schemaMetadata {\n
                    name\n
                    version\n
                    createdAt\n
                }\n
                outgoing: relationships(\n
                            input: { types: ["SchemaFieldTaggedWith"], direction: OUTGOING, start: 0, count: 10000 }\n
                        ) {\n
                        start\n
                        count\n
                        total\n
                        relationships {\n
                            type\n
                            direction\n
                            entity {\n
                                urn\n
                                type\n
                            }\n
                        }\n
                }\n
            }\n
        }"""
    request_body = {
        "query": query,
        "variables": {"urn": urn},
    }

    endpoint = f"{auth_session.gms_url()}/api/graphql"
    response = graph_client._post_generic(endpoint, request_body)

    # Walk the response one level at a time; each level must be truthy before
    # the next one is read.
    assert response
    data = response["data"]
    assert data
    dataset = data["dataset"]
    assert dataset
    assert dataset["urn"] == urn
    assert len(dataset["outgoing"]["relationships"]) == 3
|
|
|
|
|
|
|
|
|
2024-09-27 11:31:25 -05:00
|
|
|
def test_graph_relationships(graph_client, auth_session):
    """Re-ingest both graph data files and verify the resulting relationships.

    URNs from both files are deleted first, then both files are ingested over
    REST (``graph`` before ``graph_2`` in each phase), and finally the dataset
    is checked through ``_ensure_dataset_present_correctly``.
    """
    data_files = (graph, graph_2)

    # Finish all deletions before any ingestion starts.
    for data_file in data_files:
        delete_urns_from_file(graph_client, data_file)

    for data_file in data_files:
        ingest_file_via_rest(auth_session, data_file)

    _ensure_dataset_present_correctly(auth_session, graph_client)
|