2024-01-22 11:46:04 -06:00
|
|
|
import logging
|
|
|
|
import os
|
2025-01-21 09:19:48 -08:00
|
|
|
import re
|
2024-01-22 11:46:04 -06:00
|
|
|
import tempfile
|
|
|
|
from random import randint
|
|
|
|
from typing import Iterable, List, Optional, Union
|
|
|
|
|
2025-01-21 09:19:48 -08:00
|
|
|
import pydantic
|
2024-01-22 11:46:04 -06:00
|
|
|
import pytest
|
2025-01-21 12:54:10 -08:00
|
|
|
|
2024-01-22 11:46:04 -06:00
|
|
|
from datahub.api.entities.dataset.dataset import Dataset
|
2024-01-31 14:42:40 +05:30
|
|
|
from datahub.api.entities.structuredproperties.structuredproperties import (
|
|
|
|
StructuredProperties,
|
|
|
|
)
|
2025-01-21 09:19:48 -08:00
|
|
|
from datahub.configuration.common import GraphError, OperationalError
|
2024-01-22 11:46:04 -06:00
|
|
|
from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
|
|
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
2024-09-27 11:31:25 -05:00
|
|
|
from datahub.ingestion.graph.client import DataHubGraph
|
2024-01-22 11:46:04 -06:00
|
|
|
from datahub.metadata.schema_classes import (
|
2024-01-31 14:42:40 +05:30
|
|
|
EntityTypeInfoClass,
|
|
|
|
PropertyValueClass,
|
|
|
|
StructuredPropertiesClass,
|
|
|
|
StructuredPropertyDefinitionClass,
|
|
|
|
StructuredPropertyValueAssignmentClass,
|
|
|
|
)
|
2024-01-22 11:46:04 -06:00
|
|
|
from datahub.specific.dataset import DatasetPatchBuilder
|
2024-01-31 14:42:40 +05:30
|
|
|
from datahub.utilities.urns.structured_properties_urn import StructuredPropertyUrn
|
2024-01-22 11:46:04 -06:00
|
|
|
from datahub.utilities.urns.urn import Urn
|
2024-01-31 14:42:40 +05:30
|
|
|
from tests.consistency_utils import wait_for_writes_to_sync
|
|
|
|
from tests.utilities.file_emitter import FileEmitter
|
|
|
|
from tests.utils import (
|
|
|
|
delete_urns,
|
|
|
|
delete_urns_from_file,
|
|
|
|
get_sleep_info,
|
|
|
|
ingest_file_via_rest,
|
|
|
|
)
|
2024-01-22 11:46:04 -06:00
|
|
|
|
|
|
|
# Module-level logger named after this test module.
logger = logging.getLogger(__name__)

# A random starting suffix is drawn once per import; ten consecutive
# snowflake table names are derived from it.
start_index = randint(10, 10000)
dataset_urns = [
    make_dataset_urn("snowflake", f"table_foo_{table_number}")
    for table_number in range(start_index, start_index + 10)
]

# One "column_1" schema-field URN per generated dataset URN.
schema_field_urns = [make_schema_field_urn(urn, "column_1") for urn in dataset_urns]

# Every URN generated above, datasets first; ingest_cleanup_data passes this
# list to delete_urns() during teardown.
generated_urns = [*dataset_urns, *schema_field_urns]

# Namespace used for structured properties unless a caller overrides it.
default_namespace = "io.acryl.privacy"
|
|
|
|
|
2024-01-31 14:42:40 +05:30
|
|
|
|
2024-01-22 11:46:04 -06:00
|
|
|
def create_logical_entity(
    entity_name: str,
) -> Iterable[MetadataChangeProposalWrapper]:
    """Build the proposal that registers ``entity_name`` as an entity type.

    Args:
        entity_name: Short entity name; it is appended to
            ``urn:li:entityType:`` for the URN and to ``io.datahubproject.``
            for the qualified name, and used unchanged as the display name.

    Returns:
        A one-element list holding the ``MetadataChangeProposalWrapper`` that
        carries the ``EntityTypeInfoClass`` aspect.
    """
    entity_type_info = EntityTypeInfoClass(
        qualifiedName="io.datahubproject." + entity_name,
        displayName=entity_name,
    )
    proposal = MetadataChangeProposalWrapper(
        entityUrn="urn:li:entityType:" + entity_name,
        aspect=entity_type_info,
    )
    return [proposal]
|
|
|
|
|
|
|
|
|
|
|
|
def create_test_data(filename: str):
    """Emit the "dataset" entity-type proposals into ``filename``.

    Every proposal produced by ``create_logical_entity("dataset")`` is handed
    to a ``FileEmitter`` opened on ``filename``; afterwards the function waits
    for writes to sync.

    Args:
        filename: Path of the file the ``FileEmitter`` is created with.
    """
    file_emitter = FileEmitter(filename)
    try:
        for mcp in create_logical_entity("dataset"):
            file_emitter.emit(mcp)
    finally:
        # Close the emitter even when emit() raises, so it is never left open
        # behind a failing fixture setup.
        file_emitter.close()
    wait_for_writes_to_sync()
|
|
|
|
|
2024-01-31 14:42:40 +05:30
|
|
|
|
2024-01-22 11:46:04 -06:00
|
|
|
# Two values unpacked from tests.utils.get_sleep_info(); judging by the names
# they are presumably a sleep interval and a retry count — confirm against
# tests.utils. Neither name is referenced in the visible part of this module.
sleep_sec, sleep_times = get_sleep_info()
|
|
|
|
|
|
|
|
|
2024-09-27 11:31:25 -05:00
|
|
|
@pytest.fixture(scope="module")
def ingest_cleanup_data(auth_session, graph_client, request):
    """Module-scoped fixture that ingests the test data and removes it afterwards.

    Setup writes the test proposals to a temporary file via
    ``create_test_data`` and ingests that file with ``ingest_file_via_rest``.
    Teardown deletes the URNs found in the file plus every URN in
    ``generated_urns``, then waits for writes to sync. The temporary file is
    removed in all cases.

    Args:
        auth_session: Session passed to ``ingest_file_via_rest``.
        graph_client: Client passed to the URN deletion helpers.
        request: Accepted for the fixture signature; not used in the body.
    """
    fd, filename = tempfile.mkstemp()
    # mkstemp() hands back an already-open OS-level descriptor. Only the path
    # is used below, so close the descriptor right away instead of leaking it
    # (an open handle would also block os.remove() on Windows).
    os.close(fd)
    try:
        create_test_data(filename)
        print("ingesting structured properties test data")
        ingest_file_via_rest(auth_session, filename)
        yield
        print("removing structured properties test data")
        delete_urns_from_file(graph_client, filename)
        delete_urns(graph_client, generated_urns)
        wait_for_writes_to_sync()
    finally:
        os.remove(filename)
|
|
|
|
|
|
|
|
|
|
|
|
def create_property_definition(
    property_name: str,
    graph: DataHubGraph,
    namespace: str = default_namespace,
    value_type: str = "string",
    cardinality: str = "SINGLE",
    allowed_values: Optional[List[PropertyValueClass]] = None,
    entity_types: Optional[List[str]] = None,
):
    """Emit a structured property definition and wait for writes to sync.

    Args:
        property_name: Name of the property, without the namespace.
        graph: Client whose ``emit`` method receives the proposal.
        namespace: Prefix joined to ``property_name`` with a dot to form the
            qualified name and the URN suffix.
        value_type: Data type name passed to ``Urn.make_data_type_urn``.
        cardinality: Cardinality stored on the definition.
        allowed_values: Optional allowed values stored on the definition.
        entity_types: Entity type names the property applies to; when this is
            ``None`` or empty, ``["dataset"]`` is used.
    """
    value_type_urn = Urn.make_data_type_urn(value_type)
    # A missing or empty entity_types falls back to the dataset entity type.
    entity_type_urns = [
        Urn.make_entity_type_urn(entity_type)
        for entity_type in (entity_types or ["dataset"])
    ]

    definition = StructuredPropertyDefinitionClass(
        qualifiedName=f"{namespace}.{property_name}",
        valueType=value_type_urn,
        description="The retention policy for the dataset",
        entityTypes=entity_type_urns,
        cardinality=cardinality,
        allowedValues=allowed_values,
    )
    proposal = MetadataChangeProposalWrapper(
        entityUrn=f"urn:li:structuredProperty:{namespace}.{property_name}",
        aspect=definition,
    )

    graph.emit(proposal)
    wait_for_writes_to_sync()
|
|
|
|
|
|
|
|
|
|
|
|
def attach_property_to_entity(
    urn: str,
    property_name: str,
    property_value: Union[str, float, List[Union[str, float]]],
    graph: DataHubGraph,
    namespace: str = default_namespace,
):
    """Emit a structured-properties aspect assigning one property to ``urn``.

    Args:
        urn: URN of the entity that receives the aspect.
        property_name: Name of the property, without the namespace.
        property_value: A single value or a list of values; a single value is
            wrapped in a one-element list before it is sent.
        graph: Client whose ``emit_mcp`` method receives the proposal.
        namespace: Prefix joined to ``property_name`` to build the property URN.
    """
    property_values: List[Union[str, float]] = (
        property_value if isinstance(property_value, list) else [property_value]
    )

    assignment = StructuredPropertyValueAssignmentClass(
        propertyUrn=f"urn:li:structuredProperty:{namespace}.{property_name}",
        values=property_values,
    )
    proposal = MetadataChangeProposalWrapper(
        entityUrn=urn,
        aspect=StructuredPropertiesClass(properties=[assignment]),
    )

    graph.emit_mcp(proposal)
    wait_for_writes_to_sync()
|
|
|
|
|
|
|
|
|
|
|
|
def get_property_from_entity(
    urn: str,
    property_name: str,
    graph: DataHubGraph,
):
    """Return the values assigned to one structured property on an entity.

    Args:
        urn: URN of the entity whose ``StructuredPropertiesClass`` aspect is read.
        property_name: Qualified property name; it is appended directly to
            ``urn:li:structuredProperty:``, so any namespace must already be
            part of it.
        graph: Client whose ``get_aspect`` method is used for the lookup.

    Returns:
        The ``values`` of the first assignment whose ``propertyUrn`` matches,
        or ``None`` when no assignment matches.

    Raises:
        AssertionError: If the entity has no structured-properties aspect.
    """
    aspect: Optional[StructuredPropertiesClass] = graph.get_aspect(
        urn, StructuredPropertiesClass
    )
    assert aspect is not None

    wanted_urn = f"urn:li:structuredProperty:{property_name}"
    return next(
        (
            assignment.values
            for assignment in aspect.properties
            if assignment.propertyUrn == wanted_urn
        ),
        None,
    )
|
|
|
|
|
|
|
|
|
2024-10-30 13:41:09 -05:00
|
|
|
def to_es_filter_name(
    property_name=None, namespace=default_namespace, qualified_name=None
):
    """Build the ``structuredProperties.<name>`` field used in search filters.

    Args:
        property_name: Unqualified property name; when truthy it is combined
            with ``namespace`` and ``qualified_name`` is ignored.
        namespace: Prefix joined to ``property_name`` with a dot.
        qualified_name: Already-qualified name, used only when
            ``property_name`` is falsy.

    Returns:
        The field name string prefixed with ``structuredProperties.``.
    """
    suffix = f"{namespace}.{property_name}" if property_name else qualified_name
    return f"structuredProperties.{suffix}"
|
2024-06-17 19:53:54 -05:00
|
|
|
|
|
|
|
|
2024-09-27 11:31:25 -05:00
|
|
|
def test_structured_property_string(ingest_cleanup_data, graph_client):
    """A string property accepts a string value and refuses a number."""
    suffix = randint(10, 10000)
    property_name = f"retention{suffix}Policy"
    target_urn = dataset_urns[0]

    # The definition uses the helper's defaults (value type "string").
    create_property_definition(property_name, graph_client)

    attach_property_to_entity(target_urn, property_name, ["30d"], graph=graph_client)

    # A numeric value is expected to be rejected for the string property.
    with pytest.raises(OperationalError):
        attach_property_to_entity(target_urn, property_name, 200030, graph=graph_client)
|
2024-01-22 11:46:04 -06:00
|
|
|
|
|
|
|
|
2024-09-27 11:31:25 -05:00
|
|
|
def test_structured_property_double(ingest_cleanup_data, graph_client):
    """A number property accepts one number and refuses a string or a list."""
    property_name = f"expiryTime{randint(10, 10000)}"
    target_urn = dataset_urns[0]

    create_property_definition(property_name, graph_client, value_type="number")

    attach_property_to_entity(target_urn, property_name, 2000034, graph=graph_client)

    # A string value is expected to be rejected for the number property.
    with pytest.raises(OperationalError):
        attach_property_to_entity(
            target_urn, property_name, "30 days", graph=graph_client
        )

    # The definition used the default cardinality ("SINGLE"), so a list of two
    # numbers is expected to be rejected as well.
    with pytest.raises(OperationalError):
        attach_property_to_entity(
            target_urn, property_name, [2000034, 2000035], graph=graph_client
        )
|
2025-01-21 09:19:48 -08:00
|
|
|
|
|
|
|
|
2024-09-27 11:31:25 -05:00
|
|
|
def test_structured_property_double_multiple(ingest_cleanup_data, graph_client):
    """A MULTIPLE-cardinality number property accepts a list of numbers."""
    suffix = randint(10, 10000)
    property_name = f"versions{suffix}"

    create_property_definition(
        property_name,
        graph_client,
        value_type="number",
        cardinality="MULTIPLE",
    )

    # No assertion follows: the test passes when the attach call does not raise.
    values = [1.0, 2.0]
    attach_property_to_entity(dataset_urns[0], property_name, values, graph=graph_client)
|
2024-01-22 11:46:04 -06:00
|
|
|
|
|
|
|
|
2024-09-27 11:31:25 -05:00
|
|
|
def test_structured_property_string_allowed_values(ingest_cleanup_data, graph_client):
    """Values outside the allowed-values list are refused with a clear message."""
    property_name = f"enumProperty{randint(10, 10000)}"
    target_urn = dataset_urns[0]

    permitted = [PropertyValueClass(value=text) for text in ("foo", "bar")]
    create_property_definition(
        property_name,
        graph_client,
        value_type="string",
        cardinality="MULTIPLE",
        allowed_values=permitted,
    )

    # Both values are in the allowed list, so this call must succeed.
    attach_property_to_entity(
        target_urn, property_name, ["foo", "bar"], graph=graph_client
    )

    # "baz" is not in the allowed list; the message is escaped because it
    # contains regex metacharacters.
    expected_message = re.escape("value: {string=baz} should be one of [")
    with pytest.raises(OperationalError, match=expected_message):
        attach_property_to_entity(
            target_urn, property_name, ["foo", "baz"], graph=graph_client
        )
|
|
|
|
|
|
|
|
|
2024-09-27 11:31:25 -05:00
|
|
|
def test_structured_property_definition_evolution(ingest_cleanup_data, graph_client):
    """Redefining a MULTIPLE property as SINGLE is expected to be refused."""
    property_name = f"enumProperty{randint(10, 10000)}"

    def define(cardinality):
        # Emit the definition with the given cardinality; the allowed-values
        # list is rebuilt on every call.
        create_property_definition(
            property_name,
            graph_client,
            value_type="string",
            cardinality=cardinality,
            allowed_values=[
                PropertyValueClass(value="foo"),
                PropertyValueClass(value="bar"),
            ],
        )

    define("MULTIPLE")

    # Same property, same allowed values, only the cardinality differs.
    with pytest.raises(OperationalError):
        define("SINGLE")
|
|
|
|
|
|
|
|
|
2024-09-27 11:31:25 -05:00
|
|
|
def test_structured_property_schema_field(ingest_cleanup_data, graph_client):
    """A date property can be set on a schema field and refuses a number."""
    namespace = "io.datahubproject.test"
    field_urn = schema_field_urns[0]
    property_name = f"deprecationDate{randint(10, 10000)}"

    create_property_definition(
        property_name,
        graph_client,
        namespace=namespace,
        value_type="date",
        entity_types=["schemaField"],
    )

    attach_property_to_entity(
        field_urn,
        property_name,
        "2020-10-01",
        graph=graph_client,
        namespace=namespace,
    )

    # The lookup helper expects the namespace-qualified property name.
    stored_values = get_property_from_entity(
        field_urn,
        f"io.datahubproject.test.{property_name}",
        graph=graph_client,
    )
    assert stored_values == ["2020-10-01"]

    # A numeric value is expected to be rejected for the date property.
    with pytest.raises(OperationalError):
        attach_property_to_entity(
            field_urn,
            property_name,
            200030,
            graph=graph_client,
            namespace=namespace,
        )
|
|
|
|
|
|
|
|
|
2024-11-16 13:45:39 -08:00
|
|
|
def test_structured_properties_yaml_load_with_bad_entity_type(
    ingest_cleanup_data, graph_client
):
    """Loading the bad-entity-type YAML must fail pydantic validation."""
    yaml_path = "tests/structured_properties/bad_entity_type.yaml"
    expected_message = "urn:li:entityType:dataset is not a valid entity type urn"

    with pytest.raises(pydantic.ValidationError, match=expected_message):
        StructuredProperties.create(yaml_path, graph=graph_client)
|
|
|
|
|
|
|
|
|
2024-09-27 11:31:25 -05:00
|
|
|
def test_dataset_yaml_loader(ingest_cleanup_data, graph_client):
    """Properties and datasets loaded from YAML are readable back from the graph."""
    StructuredProperties.create(
        "tests/structured_properties/test_structured_properties.yaml",
        graph=graph_client,
    )

    # Emit every proposal generated for every dataset described in the YAML.
    datasets_yaml = "tests/structured_properties/test_dataset.yaml"
    for yaml_dataset in Dataset.from_yaml(datasets_yaml):
        for proposal in yaml_dataset.generate_mcp():
            graph_client.emit(proposal)
    wait_for_writes_to_sync()

    property_name = "io.acryl.dataManagement.deprecationDate"
    field_name = "[version=2.0].[type=ClickEvent].[type=string].ip"

    # First check: read the property straight off the schema-field entity.
    clicks_field_urn = make_schema_field_urn(
        make_dataset_urn("hive", "user.clicks"), field_name
    )
    field_values = get_property_from_entity(
        clicks_field_urn,
        property_name,
        graph=graph_client,
    )
    assert field_values == ["2023-01-01"]

    # Second check: read the dataset back and find the same field by its id.
    loaded = Dataset.from_datahub(
        graph=graph_client,
        urn="urn:li:dataset:(urn:li:dataPlatform:hive,user.clicks,PROD)",
    )
    assert loaded.schema_metadata is not None
    assert loaded.schema_metadata.fields is not None

    matching_fields = [
        candidate
        for candidate in loaded.schema_metadata.fields
        if candidate.id is not None
        and candidate.id == Dataset._simplify_field_path(field_name)
    ]
    assert len(matching_fields) == 1

    matched = matching_fields[0]
    assert matched.structured_properties is not None
    assert matched.structured_properties[property_name] == "2023-01-01"
|
2024-01-22 11:46:04 -06:00
|
|
|
|
|
|
|
|
2024-09-27 11:31:25 -05:00
|
|
|
def test_structured_property_search(
    ingest_cleanup_data, graph_client: DataHubGraph, caplog
):
    """Search by structured property finds the dataset but not the schema field."""
    field_namespace = "io.datahubproject.test"
    field_urn = schema_field_urns[0]
    dataset_urn = dataset_urns[0]

    def exists_filter(field):
        # Filter clause matching entities on which ``field`` has a value.
        return {"field": field, "negated": "false", "condition": "EXISTS"}

    # A date property attached to a schema field ...
    field_property_name = f"deprecationDate{randint(10, 10000)}"
    create_property_definition(
        namespace=field_namespace,
        property_name=field_property_name,
        graph=graph_client,
        value_type="date",
        entity_types=["schemaField"],
    )
    attach_property_to_entity(
        field_urn,
        field_property_name,
        "2020-10-01",
        graph=graph_client,
        namespace=field_namespace,
    )

    # ... and a number property attached to a dataset.
    dataset_property_name = f"replicationSLA{randint(10, 10000)}"
    create_property_definition(
        property_name=dataset_property_name, graph=graph_client, value_type="number"
    )
    attach_property_to_entity(
        dataset_urn, dataset_property_name, [30], graph=graph_client
    )

    # No entity_types argument: per the original author's note the default
    # entity set includes datasets and excludes schema fields.
    default_hits = list(
        graph_client.get_urns_by_filter(
            extraFilters=[exists_filter(to_es_filter_name(dataset_property_name))]
        )
    )
    assert len(default_hits) == 1
    assert default_hits[0] == dataset_urn

    # The schema field is checked by reading its aspect directly, not by search.
    field_aspect = graph_client.get_aspect(
        entity_urn=field_urn, aspect_type=StructuredPropertiesClass
    )
    expected_assignment = StructuredPropertyValueAssignmentClass(
        propertyUrn=f"urn:li:structuredProperty:io.datahubproject.test.{field_property_name}",
        values=["2020-10-01"],
    )
    assert field_aspect == StructuredPropertiesClass(properties=[expected_assignment])

    # Searching tags for the field property must return nothing.
    tag_hits = list(
        graph_client.get_urns_by_filter(
            entity_types=["tag"],
            extraFilters=[
                exists_filter(
                    to_es_filter_name(field_property_name, namespace=field_namespace)
                )
            ],
        )
    )
    assert len(tag_hits) == 0

    # Searching datasets and tags for the dataset property returns only the
    # dataset that carries it.
    dataset_or_tag_hits = list(
        graph_client.get_urns_by_filter(
            entity_types=["dataset", "tag"],
            extraFilters=[exists_filter(to_es_filter_name(dataset_property_name))],
        )
    )
    assert len(dataset_or_tag_hits) == 1
    assert dataset_urn in dataset_or_tag_hits
|
|
|
|
|
|
|
|
|
2024-09-27 11:31:25 -05:00
|
|
|
def test_dataset_structured_property_patch(ingest_cleanup_data, graph_client, caplog):
    """Patching one structured property leaves the other property untouched."""
    target_urn = dataset_urns[0]
    value_type = "number"
    cardinality = "MULTIPLE"

    # First property and the two values it will eventually hold.
    property_name = f"replicationSLA{randint(10, 10000)}"
    property_value1 = 30.0
    property_value2 = 100.0
    create_property_definition(
        property_name=property_name,
        graph=graph_client,
        value_type=value_type,
        cardinality=cardinality,
    )

    # Second property, used to detect unintended side effects of a patch.
    property_name_other = f"replicationSLAOther{randint(10, 10000)}"
    property_value_other = 200.0
    create_property_definition(
        property_name=property_name_other,
        graph=graph_client,
        value_type=value_type,
        cardinality=cardinality,
    )

    def patch_one(prop_name, prop_value):
        # Build and emit a dataset patch setting one property, then wait for
        # writes to sync.
        property_urn = StructuredPropertyUrn.make_structured_property_urn(
            f"{default_namespace}.{prop_name}"
        )
        patcher: DatasetPatchBuilder = DatasetPatchBuilder(urn=target_urn)
        patcher.set_structured_property(property_urn, prop_value)
        for proposal in patcher.build():
            graph_client.emit(proposal)
        wait_for_writes_to_sync()

    def read_values(prop_name):
        # Current values of the namespaced property on the target dataset.
        return get_property_from_entity(
            target_urn, f"{default_namespace}.{prop_name}", graph=graph_client
        )

    # One value on property 1.
    patch_one(property_name, property_value1)
    assert read_values(property_name) == [property_value1]

    # One value on property 2.
    patch_one(property_name_other, property_value_other)
    assert read_values(property_name_other) == [property_value_other]

    # Two values on property 1; they are compared as a set, so order is ignored.
    patch_one(property_name, [property_value1, property_value2])
    assert set(read_values(property_name)) == {property_value1, property_value2}

    # Property 2 must still hold its single value.
    assert read_values(property_name_other) == [property_value_other]
|
|
|
|
|
|
|
|
|
|
|
|
def test_dataset_structured_property_soft_delete_validation(
    ingest_cleanup_data, graph_client, caplog
):
    """A soft-deleted property can be neither redefined nor applied."""
    property_name = f"softDeleteTest{randint(10, 10000)}Property"
    property_urn = f"urn:li:structuredProperty:{default_namespace}.{property_name}"

    # The same arguments are used for the initial definition and for the
    # attempted redefinition after the soft delete.
    definition_args = {
        "property_name": property_name,
        "graph": graph_client,
        "value_type": "string",
        "cardinality": "SINGLE",
    }
    create_property_definition(**definition_args)

    test_property = StructuredProperties.from_datahub(
        graph=graph_client, urn=property_urn
    )
    assert test_property is not None

    graph_client.soft_delete_entity(urn=property_urn)

    # Redefining the soft-deleted property must be refused.
    mutate_message = "Cannot mutate a soft deleted Structured Property Definition"
    with pytest.raises(OperationalError, match=mutate_message):
        create_property_definition(**definition_args)

    # Assigning a value of the soft-deleted property must be refused too.
    apply_message = "Cannot apply a soft deleted Structured Property value"
    with pytest.raises(OperationalError, match=apply_message):
        attach_property_to_entity(
            dataset_urns[0], property_name, "test string", graph=graph_client
        )
|
|
|
|
|
|
|
|
|
|
|
|
def test_dataset_structured_property_soft_delete_read_mutation(
    ingest_cleanup_data, graph_client, caplog
):
    """Values of a soft-deleted property are no longer returned on reads."""
    property_name = f"softDeleteReadTest{randint(10, 10000)}Property"
    property_urn = f"urn:li:structuredProperty:{default_namespace}.{property_name}"
    property_value = "test string"
    target_urn = dataset_urns[0]

    def read_values():
        # Values currently returned for the property on the target dataset.
        return get_property_from_entity(
            target_urn, f"{default_namespace}.{property_name}", graph=graph_client
        )

    # Define the property and attach it to the dataset.
    create_property_definition(
        property_name=property_name,
        graph=graph_client,
        value_type="string",
        cardinality="SINGLE",
    )
    attach_property_to_entity(
        target_urn, property_name, property_value, graph=graph_client
    )

    # Before the delete the value is visible.
    assert read_values() == [property_value]

    graph_client.soft_delete_entity(urn=property_urn)
    wait_for_writes_to_sync()

    # After the delete the lookup finds no matching assignment.
    assert read_values() is None
|
|
|
|
|
|
|
|
|
|
|
|
def test_dataset_structured_property_soft_delete_search_filter_validation(
    ingest_cleanup_data, graph_client: DataHubGraph, caplog: pytest.LogCaptureFixture
):
    """Filtering on a soft-deleted property is rejected by the search call."""
    dataset_property_name = f"softDeleteSearchFilter{randint(10, 10000)}"
    property_urn = (
        f"urn:li:structuredProperty:{default_namespace}.{dataset_property_name}"
    )
    target_urn = dataset_urns[0]

    def search_by_property():
        # URNs of entities on which the property under test has a value.
        exists_clause = {
            "field": to_es_filter_name(property_name=dataset_property_name),
            "negated": "false",
            "condition": "EXISTS",
        }
        return list(graph_client.get_urns_by_filter(extraFilters=[exists_clause]))

    create_property_definition(
        property_name=dataset_property_name, graph=graph_client, value_type="number"
    )
    attach_property_to_entity(
        target_urn, dataset_property_name, [30], graph=graph_client
    )

    # While the property exists, the search returns exactly the target dataset.
    entity_urns = search_by_property()
    assert len(entity_urns) == 1
    assert entity_urns[0] == target_urn

    graph_client.soft_delete_entity(urn=property_urn)
    wait_for_writes_to_sync()

    # The identical search must now fail filter validation.
    with pytest.raises(
        GraphError, match="Cannot filter on deleted Structured Property"
    ):
        search_by_property()
|
2024-06-17 19:53:54 -05:00
|
|
|
|
|
|
|
|
2024-09-27 11:31:25 -05:00
|
|
|
def test_dataset_structured_property_delete(ingest_cleanup_data, graph_client, caplog):
    """Hard-deleting one property removes it everywhere and spares the other."""
    target_urn = dataset_urns[0]

    def create_property(target_dataset, prop_value):
        # Define a randomly named string property, read the definition back,
        # then patch ``prop_value`` onto ``target_dataset``.
        property_name = f"hardDeleteTest{randint(10, 10000)}Property"
        property_urn = f"urn:li:structuredProperty:{default_namespace}.{property_name}"

        create_property_definition(
            property_name=property_name,
            graph=graph_client,
            value_type="string",
            cardinality="SINGLE",
        )

        test_property = StructuredProperties.from_datahub(
            graph=graph_client, urn=property_urn
        )
        assert test_property is not None

        patcher: DatasetPatchBuilder = DatasetPatchBuilder(urn=target_dataset)
        patcher.set_structured_property(
            StructuredPropertyUrn.make_structured_property_urn(property_urn),
            prop_value,
        )
        for proposal in patcher.build():
            graph_client.emit(proposal)

        return test_property

    def read_values(prop):
        # Values of ``prop`` currently returned for the target dataset.
        return get_property_from_entity(
            target_urn,
            prop.qualified_name,
            graph=graph_client,
        )

    def validate_search(qualified_name, expected):
        # The EXISTS search on the property must return exactly ``expected``.
        exists_clause = {
            "field": to_es_filter_name(qualified_name=qualified_name),
            "negated": "false",
            "condition": "EXISTS",
        }
        found = list(graph_client.get_urns_by_filter(extraFilters=[exists_clause]))
        assert found == expected

    # Two properties, each with its own value on the same dataset.
    property1 = create_property(target_urn, "foo")
    property2 = create_property(target_urn, "bar")
    wait_for_writes_to_sync()

    # Both values are readable ...
    assert read_values(property1) == ["foo"]
    assert read_values(property2) == ["bar"]

    # ... and both properties are searchable.
    validate_search(property1.qualified_name, expected=[target_urn])
    validate_search(property2.qualified_name, expected=[target_urn])

    # Hard delete the first property only.
    graph_client.hard_delete_entity(urn=property1.urn)
    wait_for_writes_to_sync()

    # Its value is gone from the dataset; the second value is unchanged.
    assert read_values(property1) is None
    assert read_values(property2) == ["bar"]

    # Its definition aspect is gone as well.
    property1_definition = graph_client.get_aspect(
        property1.urn, StructuredPropertyDefinitionClass
    )
    assert property1_definition is None

    wait_for_writes_to_sync()
    # Search now finds nothing for the deleted property and still finds the
    # dataset for the surviving one.
    validate_search(property1.qualified_name, expected=[])
    validate_search(property2.qualified_name, expected=[target_urn])
|
2025-01-09 07:32:22 -08:00
|
|
|
|
|
|
|
|
|
|
|
def test_structured_properties_list(ingest_cleanup_data, graph_client, caplog):
    """Newly created properties appear in both the URN list and the full list."""

    def create_property():
        # Define a randomly named string property and return it as read back
        # from the graph.
        property_name = f"listTest{randint(10, 10000)}Property"
        property_urn = f"urn:li:structuredProperty:{default_namespace}.{property_name}"

        create_property_definition(
            property_name=property_name,
            graph=graph_client,
            value_type="string",
            cardinality="SINGLE",
        )

        test_property = StructuredProperties.from_datahub(
            graph=graph_client, urn=property_urn
        )
        assert test_property is not None

        return test_property

    property1 = create_property()
    property2 = create_property()
    wait_for_writes_to_sync()

    # Both URNs must be present in the URN listing.
    structured_properties_urns = list(StructuredProperties.list_urns(graph_client))
    assert property1.urn in structured_properties_urns
    assert property2.urn in structured_properties_urns

    # Both must be present in the full listing, with identical contents.
    wanted_urns = [property1.urn, property2.urn]
    matched_properties = [
        candidate
        for candidate in StructuredProperties.list(graph_client)
        if candidate.urn in wanted_urns
    ]
    assert len(matched_properties) == 2

    retrieved_property1 = next(
        candidate for candidate in matched_properties if candidate.urn == property1.urn
    )
    retrieved_property2 = next(
        candidate for candidate in matched_properties if candidate.urn == property2.urn
    )

    assert property1.dict() == retrieved_property1.dict()
    assert property2.dict() == retrieved_property2.dict()
|