import pathlib
from datetime import datetime, timezone
from typing import List, Optional, Tuple
import pytest
import datahub.metadata.schema_classes as models
from datahub.emitter.mcp_builder import SchemaKey
from datahub.errors import SchemaFieldKeyError
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.metadata.urns import (
CorpGroupUrn,
CorpUserUrn,
DatasetUrn,
DomainUrn,
GlossaryTermUrn,
OwnershipTypeUrn,
TagUrn,
Urn,
)
from datahub.sdk._attribution import KnownAttribution, change_default_attribution
from datahub.sdk._shared import UrnOrStr
from datahub.sdk.dataset import Dataset
from tests.test_helpers.sdk_v2_helpers import assert_entity_golden

_GOLDEN_DIR = pathlib.Path(__file__).parent / "dataset_golden"


def test_dataset_basic(pytestconfig: pytest.Config) -> None:
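    """Basic construction: urn derivation, attribute defaults, schema field access, and __slots__ enforcement."""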
d = Dataset(
platform="bigquery",
name="proj.dataset.table",
subtype=DatasetSubTypes.TABLE,
schema=[
("field1", "string", "field1 description"),
("field2", "int64", "field2 description"),
],
)
# Check urn setup.
assert Dataset.get_urn_type() == DatasetUrn
assert isinstance(d.urn, DatasetUrn)
assert (
str(d.urn)
== "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)"
)
assert str(d.urn) in repr(d)
# Check most attributes.
assert d.platform is not None
assert d.platform.platform_name == "bigquery"
assert d.platform_instance is None
assert d.browse_path is None
assert d.tags is None
assert d.terms is None
assert d.created is None
assert d.last_modified is None
assert d.description is None
assert d.custom_properties == {}
assert d.domain is None
# TODO: The column descriptions should go in the editable fields, since we're not in ingestion mode.
assert len(d.schema) == 2
assert d["field1"].description == "field1 description"
with pytest.raises(SchemaFieldKeyError, match=r"Field .* not found"):
d["should_be_missing"]
with pytest.raises(AttributeError):
assert d.extra_attribute # type: ignore
with pytest.raises(AttributeError):
d.extra_attribute = "slots should reject extra fields" # type: ignore
with pytest.raises(AttributeError):
# This should fail. Eventually we should make it suggest calling set_owners instead.
d.owners = [] # type: ignore
assert_entity_golden(d, _GOLDEN_DIR / "test_dataset_basic_golden.json")


def _build_complex_dataset() -> Dataset:
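    """Build a dataset with most optional fields populated; shared by the complex and ingestion golden tests."""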
schema = SchemaKey(
platform="snowflake",
instance="my_instance",
database="MY_DB",
schema="MY_SCHEMA",
)
db = schema.parent_key()
assert db is not None
created = datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
updated = datetime(2025, 1, 9, 3, 4, 6, tzinfo=timezone.utc)
d = Dataset(
platform=schema.platform,
platform_instance=schema.instance,
name="my_db.my_schema.my_table",
parent_container=schema,
subtype=DatasetSubTypes.TABLE,
schema=[
("field1", "string"),
("field2", "int64", "field2 description"),
],
display_name="MY_TABLE",
qualified_name="MY_DB.MY_SCHEMA.MY_TABLE",
created=created,
last_modified=updated,
custom_properties={
"key1": "value1",
"key2": "value2",
},
description="test",
external_url="https://example.com",
owners=[
CorpUserUrn("admin@datahubproject.io"),
],
links=[
"https://example.com/doc1",
("https://example.com/doc2", "Documentation 2"),
],
tags=[
TagUrn("tag1"),
TagUrn("tag2"),
],
terms=[
GlossaryTermUrn("AccountBalance"),
],
domain=DomainUrn("Marketing"),
)
assert d.platform is not None
assert d.platform.platform_name == "snowflake"
assert d.platform_instance is not None
assert (
str(d.platform_instance)
== "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)"
)
assert schema.parent_key() is not None
assert d.browse_path == [
d.platform_instance,
db.as_urn_typed(),
schema.as_urn_typed(),
]
# Properties.
assert d.description == "test"
assert d.display_name == "MY_TABLE"
assert d.qualified_name == "MY_DB.MY_SCHEMA.MY_TABLE"
assert d.external_url == "https://example.com"
assert d.created == created
assert d.last_modified == updated
assert d.custom_properties == {"key1": "value1", "key2": "value2"}
# Check standard aspects.
assert d.subtype == "Table"
assert d.owners is not None and len(d.owners) == 1
assert d.links is not None and len(d.links) == 2
assert d.tags is not None and len(d.tags) == 2
assert d.terms is not None and len(d.terms) == 1
assert d.domain == DomainUrn("Marketing")
assert len(d.schema) == 2
# Schema field description.
assert d["field1"].description is None
assert d["field2"].description == "field2 description"
d["field1"].set_description("field1 description")
assert d["field1"].description == "field1 description"
# Schema field tags.
assert d["field1"].tags is None
d["field1"].set_tags([TagUrn("field1_tag1"), TagUrn("field1_tag2")])
assert d["field1"].tags is not None
assert len(d["field1"].tags) == 2
# Schema field terms.
assert d["field2"].terms is None
d["field2"].set_terms(
[GlossaryTermUrn("field2_term1"), GlossaryTermUrn("field2_term2")]
)
assert d["field2"].terms is not None
assert len(d["field2"].terms) == 2
    # Check the link entries.
assert d.links is not None
assert len(d.links) == 2
assert d.links[0].url == "https://example.com/doc1"
assert d.links[1].url == "https://example.com/doc2"
return d


def test_dataset_complex() -> None:
d = _build_complex_dataset()
assert_entity_golden(d, _GOLDEN_DIR / "test_dataset_complex_golden.json")


def test_dataset_ingestion() -> None:
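    """Build the same complex dataset under INGESTION attribution and compare against a separate golden file."""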
with change_default_attribution(KnownAttribution.INGESTION):
d = _build_complex_dataset()
assert_entity_golden(d, _GOLDEN_DIR / "test_dataset_ingestion_golden.json")


def _tag_names(tags: Optional[List[models.TagAssociationClass]]) -> List[str]:
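    """Return the tag names for a list of tag associations (empty if None)."""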
if tags is None:
return []
return [TagUrn(t.tag).name for t in tags]


def test_tags_add_remove() -> None:
d = Dataset(
platform="bigquery",
name="proj.dataset.table",
schema=[
("field1", "string"),
("field2", "int64", "field2 description"),
],
tags=[TagUrn("tag1"), TagUrn("tag2")],
)
d["field1"].set_tags([TagUrn("field1_tag1"), TagUrn("field1_tag2")])
    # For each of the loops below, the second iteration should be a no-op.
# Test tag add/remove flows.
assert _tag_names(d.tags) == ["tag1", "tag2"]
for _ in range(2):
d.add_tag(TagUrn("tag3"))
assert _tag_names(d.tags) == ["tag1", "tag2", "tag3"]
for _ in range(2):
d.remove_tag(TagUrn("tag1"))
assert _tag_names(d.tags) == ["tag2", "tag3"]
# Test field tag add/remove flows.
field = d["field1"]
assert _tag_names(field.tags) == ["field1_tag1", "field1_tag2"]
for _ in range(2):
field.add_tag(TagUrn("field1_tag3"))
assert _tag_names(field.tags) == ["field1_tag1", "field1_tag2", "field1_tag3"]
for _ in range(2):
field.remove_tag(TagUrn("field1_tag1"))
assert _tag_names(field.tags) == ["field1_tag2", "field1_tag3"]
assert_entity_golden(d, _GOLDEN_DIR / "test_tags_add_remove_golden.json")


# TODO: We should have add/remove/set tests where there's tags/terms
# in both base and editable entries for a schema field.


def _term_names(
terms: Optional[List[models.GlossaryTermAssociationClass]],
) -> List[str]:
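    """Return the glossary term names for a list of term associations (empty if None)."""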
if terms is None:
return []
return [GlossaryTermUrn(t.urn).name for t in terms]


def test_terms_add_remove() -> None:
d = Dataset(
platform="bigquery",
name="proj.dataset.table",
schema=[
("field1", "string"),
("field2", "int64", "field2 description"),
],
terms=[GlossaryTermUrn("AccountBalance")],
)
d["field2"].set_terms(
[GlossaryTermUrn("field2_term1"), GlossaryTermUrn("field2_term2")]
)
# Test term add/remove flows.
assert _term_names(d.terms) == ["AccountBalance"]
for _ in range(2):
d.add_term(GlossaryTermUrn("Sensitive"))
assert _term_names(d.terms) == ["AccountBalance", "Sensitive"]
for _ in range(2):
d.remove_term(GlossaryTermUrn("AccountBalance"))
assert _term_names(d.terms) == ["Sensitive"]
# Test field term add/remove flows.
field = d["field2"]
assert _term_names(field.terms) == ["field2_term1", "field2_term2"]
for _ in range(2):
field.add_term(GlossaryTermUrn("PII"))
assert _term_names(field.terms) == ["field2_term1", "field2_term2", "PII"]
for _ in range(2):
field.remove_term(GlossaryTermUrn("field2_term1"))
assert _term_names(field.terms) == ["field2_term2", "PII"]
assert_entity_golden(d, _GOLDEN_DIR / "test_terms_add_remove_golden.json")


def _owner_names(owners: Optional[List[models.OwnerClass]]) -> List[Tuple[Urn, str]]:
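    """Return (owner urn, ownership type) pairs for a list of owners (empty if None)."""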
if owners is None:
return []
return [(Urn.from_string(o.owner), o.typeUrn or str(o.type)) for o in owners]


def test_owners_add_remove() -> None:
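    """Owners are keyed by (owner, ownership type); removing by owner alone drops every type for that owner."""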
admin = CorpUserUrn("admin@datahubproject.io")
group = CorpGroupUrn("group@datahubproject.io")
business = models.OwnershipTypeClass.BUSINESS_OWNER
technical = models.OwnershipTypeClass.TECHNICAL_OWNER
custom = OwnershipTypeUrn("urn:li:ownershipType:custom_1")
d = Dataset(
platform="redshift",
name="db.schema.table",
owners=[
(admin, business),
(group, technical),
],
)
assert _owner_names(d.owners) == [
(admin, business),
(group, technical),
]
# Add the admin as a technical owner.
for _ in range(2):
d.add_owner(admin)
assert _owner_names(d.owners) == [
(admin, business),
(group, technical),
(admin, technical),
]
# Add the admin as a custom owner.
for _ in range(2):
d.add_owner((admin, custom))
assert _owner_names(d.owners) == [
(admin, business),
(group, technical),
(admin, technical),
(admin, str(custom)),
]
# Type-specific removal.
for _ in range(2):
d.remove_owner((admin, technical))
assert _owner_names(d.owners) == [
(admin, business),
(group, technical),
(admin, str(custom)),
]
# This should remove both admin owner types.
for _ in range(2):
d.remove_owner(admin)
assert _owner_names(d.owners) == [(group, technical)]
assert_entity_golden(d, _GOLDEN_DIR / "test_owners_add_remove_golden.json")


def test_browse_path() -> None:
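    """Browse paths can mix free-form folder names with container urns; here parent_container resolves to the trailing schema container."""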
schema = SchemaKey(
platform="snowflake",
database="MY_DB",
schema="MY_SCHEMA",
)
db = schema.parent_key()
assert db is not None
path: List[UrnOrStr] = [
"Folders",
db.as_urn_typed(),
"Subfolder",
schema.as_urn_typed(),
]
d = Dataset(
platform="snowflake",
name="MY_DB.MY_SCHEMA.MY_TABLE",
parent_container=path,
)
assert d.parent_container == schema.as_urn_typed()
assert d.browse_path == path
assert_entity_golden(d, _GOLDEN_DIR / "test_browse_path_golden.json")


def test_links_add_remove() -> None:
d = Dataset(
platform="bigquery",
name="proj.dataset.table",
schema=[
("field1", "string"),
("field2", "int64", "field2 description"),
],
links=[
"https://example.com/doc1",
("https://example.com/doc2", "Documentation 2"),
],
)
# Test initial state
assert d.links is not None
assert len(d.links) == 2
assert d.links[0].url == "https://example.com/doc1"
assert d.links[0].description == "https://example.com/doc1"
assert d.links[1].url == "https://example.com/doc2"
assert d.links[1].description == "Documentation 2"
# Test link add/remove flows
for _ in range(2): # Second iteration should be a no-op
d.add_link(("https://example.com/doc3", "Documentation 3"))
assert len(d.links) == 3
assert d.links[2].url == "https://example.com/doc3"
assert d.links[2].description == "Documentation 3"
for _ in range(2): # Second iteration should be a no-op
d.remove_link("https://example.com/doc1")
assert len(d.links) == 2
assert d.links[0].url == "https://example.com/doc2"
assert d.links[1].url == "https://example.com/doc3"
assert_entity_golden(d, _GOLDEN_DIR / "test_links_add_remove_golden.json")