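"""Unit tests for the DataHub SDK v2 Dataset entity.

Covers construction, urn handling, schema field access, tag/term/owner/link
mutation, browse paths, and golden-file serialization checks.
"""
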
import pathlib
from datetime import datetime, timezone
from typing import List, Optional, Tuple

import pytest

import datahub.metadata.schema_classes as models
from datahub.emitter.mcp_builder import SchemaKey
from datahub.errors import SchemaFieldKeyError
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.metadata.urns import (
    CorpGroupUrn,
    CorpUserUrn,
    DatasetUrn,
    DomainUrn,
    GlossaryTermUrn,
    OwnershipTypeUrn,
    TagUrn,
    Urn,
)
from datahub.sdk._attribution import KnownAttribution, change_default_attribution
from datahub.sdk._shared import UrnOrStr
from datahub.sdk.dataset import Dataset
from tests.test_helpers.sdk_v2_helpers import assert_entity_golden

_GOLDEN_DIR = pathlib.Path(__file__).parent / "dataset_golden"
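# assert_entity_golden serializes the entity's aspects and compares them against the
# JSON files under dataset_golden/ (presumably regenerated with the repo's standard
# golden-file update flag when behavior changes intentionally).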


def test_dataset_basic(pytestconfig: pytest.Config) -> None:
    d = Dataset(
        platform="bigquery",
        name="proj.dataset.table",
        subtype=DatasetSubTypes.TABLE,
        schema=[
            ("field1", "string", "field1 description"),
            ("field2", "int64", "field2 description"),
        ],
    )

    # Check urn setup.
    assert Dataset.get_urn_type() == DatasetUrn
    assert isinstance(d.urn, DatasetUrn)
    assert (
        str(d.urn)
        == "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)"
    )
    assert str(d.urn) in repr(d)

    # Check most attributes.
    assert d.platform is not None
    assert d.platform.platform_name == "bigquery"
    assert d.platform_instance is None
    assert d.browse_path is None
    assert d.tags is None
    assert d.terms is None
    assert d.created is None
    assert d.last_modified is None
    assert d.description is None
    assert d.custom_properties == {}
    assert d.domain is None

    # TODO: The column descriptions should go in the editable fields, since we're not in ingestion mode.
    assert len(d.schema) == 2
    assert d["field1"].description == "field1 description"

    with pytest.raises(SchemaFieldKeyError, match=r"Field .* not found"):
        d["should_be_missing"]

    with pytest.raises(AttributeError):
        assert d.extra_attribute  # type: ignore
    with pytest.raises(AttributeError):
        d.extra_attribute = "slots should reject extra fields"  # type: ignore
    with pytest.raises(AttributeError):
        # This should fail. Eventually we should make it suggest calling set_owners instead.
        d.owners = []  # type: ignore

    assert_entity_golden(d, _GOLDEN_DIR / "test_dataset_basic_golden.json")
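
    # For reference, a Dataset built like the one above would typically be pushed to
    # a DataHub instance with the v2 SDK client. A minimal sketch, not executed in
    # this unit test and assuming the SDK's DataHubClient / entities.upsert surface:
    #
    #     from datahub.sdk import DataHubClient
    #
    #     client = DataHubClient.from_env()  # connection details from the environment
    #     client.entities.upsert(d)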


def _build_complex_dataset() -> Dataset:
    schema = SchemaKey(
        platform="snowflake",
        instance="my_instance",
        database="MY_DB",
        schema="MY_SCHEMA",
    )
    db = schema.parent_key()
    assert db is not None

    created = datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
    updated = datetime(2025, 1, 9, 3, 4, 6, tzinfo=timezone.utc)

    d = Dataset(
        platform=schema.platform,
        platform_instance=schema.instance,
        name="my_db.my_schema.my_table",
        parent_container=schema,
        subtype=DatasetSubTypes.TABLE,
        schema=[
            ("field1", "string"),
            ("field2", "int64", "field2 description"),
        ],
        display_name="MY_TABLE",
        qualified_name="MY_DB.MY_SCHEMA.MY_TABLE",
        created=created,
        last_modified=updated,
        custom_properties={
            "key1": "value1",
            "key2": "value2",
        },
        description="test",
        external_url="https://example.com",
        owners=[
            CorpUserUrn("admin@datahubproject.io"),
        ],
        links=[
            "https://example.com/doc1",
            ("https://example.com/doc2", "Documentation 2"),
        ],
        tags=[
            TagUrn("tag1"),
            TagUrn("tag2"),
        ],
        terms=[
            GlossaryTermUrn("AccountBalance"),
        ],
        domain=DomainUrn("Marketing"),
    )

    assert d.platform is not None
    assert d.platform.platform_name == "snowflake"
    assert d.platform_instance is not None
    assert (
        str(d.platform_instance)
        == "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)"
    )
    assert schema.parent_key() is not None
    assert d.browse_path == [
        d.platform_instance,
        db.as_urn_typed(),
        schema.as_urn_typed(),
    ]

    # Properties.
    assert d.description == "test"
    assert d.display_name == "MY_TABLE"
    assert d.qualified_name == "MY_DB.MY_SCHEMA.MY_TABLE"
    assert d.external_url == "https://example.com"
    assert d.created == created
    assert d.last_modified == updated
    assert d.custom_properties == {"key1": "value1", "key2": "value2"}

    # Check standard aspects.
    assert d.subtype == "Table"
    assert d.owners is not None and len(d.owners) == 1
    assert d.links is not None and len(d.links) == 2
    assert d.tags is not None and len(d.tags) == 2
    assert d.terms is not None and len(d.terms) == 1
    assert d.domain == DomainUrn("Marketing")

    assert len(d.schema) == 2

    # Schema field description.
    assert d["field1"].description is None
    assert d["field2"].description == "field2 description"
    d["field1"].set_description("field1 description")
    assert d["field1"].description == "field1 description"

    # Schema field tags.
    assert d["field1"].tags is None
    d["field1"].set_tags([TagUrn("field1_tag1"), TagUrn("field1_tag2")])
    assert d["field1"].tags is not None
    assert len(d["field1"].tags) == 2

    # Schema field terms.
    assert d["field2"].terms is None
    d["field2"].set_terms(
        [GlossaryTermUrn("field2_term1"), GlossaryTermUrn("field2_term2")]
    )
    assert d["field2"].terms is not None
    assert len(d["field2"].terms) == 2

    # Link assertions.
    assert d.links is not None
    assert len(d.links) == 2
    assert d.links[0].url == "https://example.com/doc1"
    assert d.links[1].url == "https://example.com/doc2"

    return d


def test_dataset_complex() -> None:
    d = _build_complex_dataset()
    assert_entity_golden(d, _GOLDEN_DIR / "test_dataset_complex_golden.json")


def test_dataset_ingestion() -> None:
    with change_default_attribution(KnownAttribution.INGESTION):
        d = _build_complex_dataset()
        assert_entity_golden(d, _GOLDEN_DIR / "test_dataset_ingestion_golden.json")
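
    # Note: change_default_attribution(KnownAttribution.INGESTION) makes the builder
    # behave like an ingestion source rather than an interactive/API edit. Judging by
    # the TODO in test_dataset_basic, attribution decides whether things like column
    # descriptions land in the ingestion-owned aspects or the "editable" aspects, so
    # the same _build_complex_dataset() call produces a different golden file here.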


def _tag_names(tags: Optional[List[models.TagAssociationClass]]) -> List[str]:
    if tags is None:
        return []
    return [TagUrn(t.tag).name for t in tags]


def test_tags_add_remove() -> None:
    d = Dataset(
        platform="bigquery",
        name="proj.dataset.table",
        schema=[
            ("field1", "string"),
            ("field2", "int64", "field2 description"),
        ],
        tags=[TagUrn("tag1"), TagUrn("tag2")],
    )
    d["field1"].set_tags([TagUrn("field1_tag1"), TagUrn("field1_tag2")])

    # For each loop below, the second iteration should be a no-op.

    # Test tag add/remove flows.
    assert _tag_names(d.tags) == ["tag1", "tag2"]
    for _ in range(2):
        d.add_tag(TagUrn("tag3"))
    assert _tag_names(d.tags) == ["tag1", "tag2", "tag3"]
    for _ in range(2):
        d.remove_tag(TagUrn("tag1"))
    assert _tag_names(d.tags) == ["tag2", "tag3"]

    # Test field tag add/remove flows.
    field = d["field1"]
    assert _tag_names(field.tags) == ["field1_tag1", "field1_tag2"]
    for _ in range(2):
        field.add_tag(TagUrn("field1_tag3"))
    assert _tag_names(field.tags) == ["field1_tag1", "field1_tag2", "field1_tag3"]
    for _ in range(2):
        field.remove_tag(TagUrn("field1_tag1"))
    assert _tag_names(field.tags) == ["field1_tag2", "field1_tag3"]

    assert_entity_golden(d, _GOLDEN_DIR / "test_tags_add_remove_golden.json")


# TODO: We should have add/remove/set tests where there are tags/terms
# in both base and editable entries for a schema field.


def _term_names(
    terms: Optional[List[models.GlossaryTermAssociationClass]],
) -> List[str]:
    if terms is None:
        return []
    return [GlossaryTermUrn(t.urn).name for t in terms]


def test_terms_add_remove() -> None:
    d = Dataset(
        platform="bigquery",
        name="proj.dataset.table",
        schema=[
            ("field1", "string"),
            ("field2", "int64", "field2 description"),
        ],
        terms=[GlossaryTermUrn("AccountBalance")],
    )
    d["field2"].set_terms(
        [GlossaryTermUrn("field2_term1"), GlossaryTermUrn("field2_term2")]
    )

    # Test term add/remove flows.
    assert _term_names(d.terms) == ["AccountBalance"]
    for _ in range(2):
        d.add_term(GlossaryTermUrn("Sensitive"))
    assert _term_names(d.terms) == ["AccountBalance", "Sensitive"]
    for _ in range(2):
        d.remove_term(GlossaryTermUrn("AccountBalance"))
    assert _term_names(d.terms) == ["Sensitive"]

    # Test field term add/remove flows.
    field = d["field2"]
    assert _term_names(field.terms) == ["field2_term1", "field2_term2"]
    for _ in range(2):
        field.add_term(GlossaryTermUrn("PII"))
    assert _term_names(field.terms) == ["field2_term1", "field2_term2", "PII"]
    for _ in range(2):
        field.remove_term(GlossaryTermUrn("field2_term1"))
    assert _term_names(field.terms) == ["field2_term2", "PII"]

    assert_entity_golden(d, _GOLDEN_DIR / "test_terms_add_remove_golden.json")


def _owner_names(owners: Optional[List[models.OwnerClass]]) -> List[Tuple[Urn, str]]:
    if owners is None:
        return []
    return [(Urn.from_string(o.owner), o.typeUrn or str(o.type)) for o in owners]


def test_owners_add_remove() -> None:
    admin = CorpUserUrn("admin@datahubproject.io")
    group = CorpGroupUrn("group@datahubproject.io")

    business = models.OwnershipTypeClass.BUSINESS_OWNER
    technical = models.OwnershipTypeClass.TECHNICAL_OWNER
    custom = OwnershipTypeUrn("urn:li:ownershipType:custom_1")

    d = Dataset(
        platform="redshift",
        name="db.schema.table",
        owners=[
            (admin, business),
            (group, technical),
        ],
    )
    assert _owner_names(d.owners) == [
        (admin, business),
        (group, technical),
    ]

    # Add the admin as a technical owner.
    for _ in range(2):
        d.add_owner(admin)
    assert _owner_names(d.owners) == [
        (admin, business),
        (group, technical),
        (admin, technical),
    ]

    # Add the admin as a custom owner.
    for _ in range(2):
        d.add_owner((admin, custom))
    assert _owner_names(d.owners) == [
        (admin, business),
        (group, technical),
        (admin, technical),
        (admin, str(custom)),
    ]

    # Type-specific removal.
    for _ in range(2):
        d.remove_owner((admin, technical))
    assert _owner_names(d.owners) == [
        (admin, business),
        (group, technical),
        (admin, str(custom)),
    ]

    # This should remove both admin owner types.
    for _ in range(2):
        d.remove_owner(admin)
    assert _owner_names(d.owners) == [(group, technical)]

    assert_entity_golden(d, _GOLDEN_DIR / "test_owners_add_remove_golden.json")
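
    # Owners are (owner urn, ownership type) pairs. As exercised above, add_owner with
    # a bare user/group urn defaults to TECHNICAL_OWNER, while remove_owner with a bare
    # urn drops that owner across all ownership types; pass an explicit (urn, type)
    # tuple for type-specific adds and removals.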


def test_browse_path() -> None:
    schema = SchemaKey(
        platform="snowflake",
        database="MY_DB",
        schema="MY_SCHEMA",
    )
    db = schema.parent_key()
    assert db is not None

    path: List[UrnOrStr] = [
        "Folders",
        db.as_urn_typed(),
        "Subfolder",
        schema.as_urn_typed(),
    ]
    d = Dataset(
        platform="snowflake",
        name="MY_DB.MY_SCHEMA.MY_TABLE",
        parent_container=path,
    )
    assert d.parent_container == schema.as_urn_typed()
    assert d.browse_path == path

    assert_entity_golden(d, _GOLDEN_DIR / "test_browse_path_golden.json")
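
    # Note: parent_container accepts either a container key (e.g. the SchemaKey used in
    # _build_complex_dataset, where the browse path is derived from the key hierarchy)
    # or an explicit browse path list mixing plain folder names and container urns, as
    # above; the last urn in the list becomes the direct parent container.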


def test_links_add_remove() -> None:
    d = Dataset(
        platform="bigquery",
        name="proj.dataset.table",
        schema=[
            ("field1", "string"),
            ("field2", "int64", "field2 description"),
        ],
        links=[
            "https://example.com/doc1",
            ("https://example.com/doc2", "Documentation 2"),
        ],
    )

    # Test initial state.
    assert d.links is not None
    assert len(d.links) == 2
    assert d.links[0].url == "https://example.com/doc1"
    assert d.links[0].description == "https://example.com/doc1"
    assert d.links[1].url == "https://example.com/doc2"
    assert d.links[1].description == "Documentation 2"

    # Test link add/remove flows.
    for _ in range(2):  # Second iteration should be a no-op.
        d.add_link(("https://example.com/doc3", "Documentation 3"))
    assert len(d.links) == 3
    assert d.links[2].url == "https://example.com/doc3"
    assert d.links[2].description == "Documentation 3"

    for _ in range(2):  # Second iteration should be a no-op.
        d.remove_link("https://example.com/doc1")
    assert len(d.links) == 2
    assert d.links[0].url == "https://example.com/doc2"
    assert d.links[1].url == "https://example.com/doc3"

    assert_entity_golden(d, _GOLDEN_DIR / "test_links_add_remove_golden.json")
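
    # Links can be supplied either as a bare URL string (the description then defaults
    # to the URL itself, as asserted for doc1 above) or as a (url, description) tuple.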