feat(sdk): add scaffolding for sdk v2 (#12554)

Harshal Sheth 2025-02-06 11:17:21 -08:00 committed by GitHub
parent a98d4c2a1e
commit 4448fc24b7
22 changed files with 2902 additions and 1 deletion
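For orientation, a minimal sketch of the experimental surface this commit introduces (the server URL and dataset details are illustrative, not part of the diff):

    from datahub.sdk import DataHubClient, Dataset

    client = DataHubClient(server="http://localhost:8080")  # illustrative server
    dataset = Dataset(
        platform="bigquery",
        name="proj.dataset.table",
        subtype="Table",
        schema=[("field1", "string", "field1 description")],
    )
    client.entities.upsert(dataset)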

View File: metadata-ingestion/setup.cfg

@@ -15,6 +15,9 @@ warn_unused_configs = yes
disallow_untyped_defs = no
# try to be a bit more strict in certain areas of the codebase
[mypy-datahub]
# Only for datahub's __init__.py - allow implicit reexport
implicit_reexport = yes
[mypy-datahub.*]
ignore_missing_imports = no
implicit_reexport = no
@@ -54,7 +57,7 @@ addopts = --cov=src --cov-report= --cov-config setup.cfg --strict-markers -p no:
markers =
slow: marks tests that are slow to run, including all docker-based tests (deselect with '-m "not slow"')
integration: marks all integration tests, across all batches (deselect with '-m "not integration"')
integration_batch_0: mark tests to run in batch 0 of integration tests. This is done mainly for parallelization in CI. Batch 0 is the default batch.
integration_batch_1: mark tests to run in batch 1 of integration tests
integration_batch_2: mark tests to run in batch 2 of integration tests
testpaths =

View File: metadata-ingestion/src/datahub/errors.py

@@ -0,0 +1,35 @@
from datahub.configuration.common import MetaError
# TODO: Move all other error types to this file.
class SdkUsageError(MetaError):
pass
class AlreadyExistsError(SdkUsageError):
pass
class ItemNotFoundError(SdkUsageError):
pass
class MultipleItemsFoundError(SdkUsageError):
pass
class SchemaFieldKeyError(SdkUsageError, KeyError):
pass
class IngestionAttributionWarning(Warning):
pass
class MultipleSubtypesWarning(Warning):
pass
class ExperimentalWarning(Warning):
pass
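Since every SDK usage error derives from SdkUsageError, callers can catch specific failures or the whole family. A sketch (assuming client is a DataHubClient from the new SDK and the urn is illustrative):

    from datahub.errors import ItemNotFoundError, SdkUsageError

    try:
        entity = client.entities.get(
            "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.table,PROD)"
        )
    except ItemNotFoundError:
        entity = None  # the urn does not exist on the server
    except SdkUsageError:
        raise  # any other SDK misuse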

View File: metadata-ingestion/src/datahub/sdk/__init__.py

@@ -0,0 +1,33 @@
import warnings
import datahub.metadata.schema_classes as models
from datahub.errors import ExperimentalWarning, SdkUsageError
from datahub.ingestion.graph.config import DatahubClientConfig
from datahub.metadata.urns import (
ChartUrn,
ContainerUrn,
CorpGroupUrn,
CorpUserUrn,
DashboardUrn,
DataPlatformInstanceUrn,
DataPlatformUrn,
DatasetUrn,
DomainUrn,
GlossaryTermUrn,
SchemaFieldUrn,
TagUrn,
)
from datahub.sdk.container import Container
from datahub.sdk.dataset import Dataset
from datahub.sdk.main_client import DataHubClient
warnings.warn(
"The new datahub SDK (e.g. datahub.sdk.*) is experimental. "
"Our typical backwards-compatibility and stability guarantees do not apply to this code. "
"When it's promoted to stable, the import path will change "
"from `from datahub.sdk import ...` to `from datahub import ...`.",
ExperimentalWarning,
stacklevel=2,
)
del warnings
del ExperimentalWarning
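Importing anything from datahub.sdk emits this warning once; callers who have accepted the experimental status can silence it first using only stdlib warnings machinery (a sketch):

    import warnings

    from datahub.errors import ExperimentalWarning

    warnings.filterwarnings("ignore", category=ExperimentalWarning)

    from datahub.sdk import DataHubClient  # no warning emitted now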

View File: metadata-ingestion/src/datahub/sdk/_all_entities.py

@@ -0,0 +1,15 @@
from typing import Dict, List, Type
from datahub.sdk._entity import Entity
from datahub.sdk.container import Container
from datahub.sdk.dataset import Dataset
# TODO: Is there a better way to declare this?
ENTITY_CLASSES_LIST: List[Type[Entity]] = [
Container,
Dataset,
]
ENTITY_CLASSES: Dict[str, Type[Entity]] = {
cls.get_urn_type().ENTITY_TYPE: cls for cls in ENTITY_CLASSES_LIST
}
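This registry powers urn-based dispatch: given an arbitrary urn, the matching entity class can be looked up by entity type (a sketch; the urn is illustrative):

    from datahub.metadata.urns import Urn
    from datahub.sdk._all_entities import ENTITY_CLASSES

    urn = Urn.from_string(
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.table,PROD)"
    )
    entity_class = ENTITY_CLASSES[urn.entity_type]  # -> Dataset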

View File: metadata-ingestion/src/datahub/sdk/_attribution.py

@@ -0,0 +1,48 @@
from __future__ import annotations
import contextlib
from typing import Iterator
from datahub.utilities.str_enum import StrEnum
class KnownAttribution(StrEnum):
INGESTION = "INGESTION"
INGESTION_ALTERNATE = "INGESTION_ALTERNATE"
UI = "UI"
SDK = "SDK"
PROPAGATION = "PROPAGATION"
def is_ingestion(self) -> bool:
return self in (
KnownAttribution.INGESTION,
KnownAttribution.INGESTION_ALTERNATE,
)
_default_attribution = KnownAttribution.SDK
def get_default_attribution() -> KnownAttribution:
return _default_attribution
def set_default_attribution(attribution: KnownAttribution) -> None:
global _default_attribution
_default_attribution = attribution
@contextlib.contextmanager
def change_default_attribution(attribution: KnownAttribution) -> Iterator[None]:
old_attribution = get_default_attribution()
try:
set_default_attribution(attribution)
yield
finally:
set_default_attribution(old_attribution)
def is_ingestion_attribution() -> bool:
return get_default_attribution().is_ingestion()
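Ingestion code can temporarily flip the process-wide default and have it restored automatically (a sketch):

    from datahub.sdk._attribution import (
        KnownAttribution,
        change_default_attribution,
        is_ingestion_attribution,
    )

    with change_default_attribution(KnownAttribution.INGESTION):
        assert is_ingestion_attribution()
        # edits made here follow the ingestion attribution rules
    assert not is_ingestion_attribution()  # back to the SDK default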

View File: metadata-ingestion/src/datahub/sdk/_entity.py

@@ -0,0 +1,89 @@
import abc
from typing import List, Optional, Type, Union
from typing_extensions import Self
import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import Aspect as AspectTypeVar
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.errors import SdkUsageError
from datahub.metadata.urns import Urn
from datahub.utilities.urns._urn_base import _SpecificUrn
class Entity:
__slots__ = ("_urn", "_prev_aspects", "_aspects")
def __init__(self, /, urn: Urn):
# This method is not meant for direct usage.
if type(self) is Entity:
raise SdkUsageError(f"{Entity.__name__} cannot be instantiated directly.")
assert isinstance(urn, self.get_urn_type())
self._urn: _SpecificUrn = urn
# prev_aspects is None means this was created from scratch
self._prev_aspects: Optional[models.AspectBag] = None
self._aspects: models.AspectBag = {}
@classmethod
def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self:
# If an init method from a subclass adds required fields, it also needs to override this method.
# An alternative approach would call cls.__new__() to bypass the init method, but it's a bit
# too hacky for my taste.
entity = cls(urn=urn)
return entity._init_from_graph(current_aspects)
def _init_from_graph(self, current_aspects: models.AspectBag) -> Self:
self._prev_aspects = current_aspects
aspect: models._Aspect
for aspect_name, aspect in (current_aspects or {}).items(): # type: ignore
aspect_copy = type(aspect).from_obj(aspect.to_obj())
self._aspects[aspect_name] = aspect_copy # type: ignore
return self
@classmethod
@abc.abstractmethod
def get_urn_type(cls) -> Type[_SpecificUrn]: ...
@property
def urn(self) -> _SpecificUrn:
return self._urn
def _get_aspect(
self,
aspect_type: Type[AspectTypeVar],
/,
) -> Optional[AspectTypeVar]:
return self._aspects.get(aspect_type.ASPECT_NAME) # type: ignore
def _set_aspect(self, value: AspectTypeVar, /) -> None:
self._aspects[value.ASPECT_NAME] = value # type: ignore
def _setdefault_aspect(self, default_aspect: AspectTypeVar, /) -> AspectTypeVar:
# Similar semantics to dict.setdefault.
if existing_aspect := self._get_aspect(type(default_aspect)):
return existing_aspect
self._set_aspect(default_aspect)
return default_aspect
def _as_mcps(
self,
change_type: Union[str, models.ChangeTypeClass] = models.ChangeTypeClass.UPSERT,
) -> List[MetadataChangeProposalWrapper]:
urn_str = str(self.urn)
mcps = []
for aspect in self._aspects.values():
assert isinstance(aspect, models._Aspect)
mcps.append(
MetadataChangeProposalWrapper(
entityUrn=urn_str,
aspect=aspect,
changeType=change_type,
)
)
return mcps
def __repr__(self) -> str:
return f"{self.__class__.__name__}('{self.urn}')"

View File: metadata-ingestion/src/datahub/sdk/_shared.py

@@ -0,0 +1,338 @@
import warnings
from datetime import datetime
from typing import (
TYPE_CHECKING,
List,
Optional,
Tuple,
Union,
)
from typing_extensions import TypeAlias
import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import (
make_ts_millis,
make_user_urn,
parse_ts_millis,
validate_ownership_type,
)
from datahub.emitter.mcp_builder import ContainerKey
from datahub.errors import MultipleSubtypesWarning, SdkUsageError
from datahub.metadata.urns import (
CorpGroupUrn,
CorpUserUrn,
DataJobUrn,
DataPlatformInstanceUrn,
DataPlatformUrn,
DatasetUrn,
DomainUrn,
GlossaryTermUrn,
OwnershipTypeUrn,
TagUrn,
Urn,
)
from datahub.sdk._entity import Entity
from datahub.utilities.urns.error import InvalidUrnError
if TYPE_CHECKING:
from datahub.sdk.container import Container
UrnOrStr: TypeAlias = Union[Urn, str]
DatasetUrnOrStr: TypeAlias = Union[str, DatasetUrn]
DatajobUrnOrStr: TypeAlias = Union[str, DataJobUrn]
ActorUrn: TypeAlias = Union[CorpUserUrn, CorpGroupUrn]
def make_time_stamp(ts: Optional[datetime]) -> Optional[models.TimeStampClass]:
if ts is None:
return None
return models.TimeStampClass(time=make_ts_millis(ts))
def parse_time_stamp(ts: Optional[models.TimeStampClass]) -> Optional[datetime]:
if ts is None:
return None
return parse_ts_millis(ts.time)
class HasPlatformInstance(Entity):
__slots__ = ()
def _set_platform_instance(
self,
platform: Union[str, DataPlatformUrn],
instance: Union[None, str, DataPlatformInstanceUrn],
) -> None:
platform = DataPlatformUrn(platform)
if instance is not None:
try:
instance = DataPlatformInstanceUrn.from_string(instance)
except InvalidUrnError:
if not isinstance(
instance, DataPlatformInstanceUrn
): # redundant check to make mypy happy
instance = DataPlatformInstanceUrn(platform, instance)
# At this point, instance is either None or a DataPlatformInstanceUrn.
self._set_aspect(
models.DataPlatformInstanceClass(
platform=platform.urn(),
instance=instance.urn() if instance else None,
)
)
@property
def platform_instance(self) -> Optional[DataPlatformInstanceUrn]:
dataPlatformInstance = self._get_aspect(models.DataPlatformInstanceClass)
if dataPlatformInstance and dataPlatformInstance.instance:
return DataPlatformInstanceUrn.from_string(dataPlatformInstance.instance)
return None
class HasSubtype(Entity):
__slots__ = ()
@property
def subtype(self) -> Optional[str]:
subtypes = self._get_aspect(models.SubTypesClass)
if subtypes and subtypes.typeNames:
if len(subtypes.typeNames) > 1:
warnings.warn(
f"The entity {self.urn} has multiple subtypes: {subtypes.typeNames}. "
"Only the first subtype will be considered.",
MultipleSubtypesWarning,
stacklevel=2,
)
return subtypes.typeNames[0]
return None
def set_subtype(self, subtype: str) -> None:
self._set_aspect(models.SubTypesClass(typeNames=[subtype]))
OwnershipTypeType: TypeAlias = Union[str, OwnershipTypeUrn]
OwnerInputType: TypeAlias = Union[
str,
ActorUrn,
Tuple[Union[str, ActorUrn], OwnershipTypeType],
models.OwnerClass,
]
OwnersInputType: TypeAlias = List[OwnerInputType]
class HasOwnership(Entity):
__slots__ = ()
@staticmethod
def _parse_owner_class(owner: OwnerInputType) -> models.OwnerClass:
if isinstance(owner, models.OwnerClass):
return owner
owner_type = models.OwnershipTypeClass.TECHNICAL_OWNER
owner_type_urn = None
if isinstance(owner, tuple):
raw_owner, raw_owner_type = owner
if isinstance(raw_owner_type, OwnershipTypeUrn):
owner_type = models.OwnershipTypeClass.CUSTOM
owner_type_urn = str(raw_owner_type)
else:
owner_type, owner_type_urn = validate_ownership_type(raw_owner_type)
else:
raw_owner = owner
if isinstance(raw_owner, str):
# Tricky: this will gracefully handle a user passing in a group urn as a string.
# TODO: is this the right behavior? or should we require a valid urn here?
return models.OwnerClass(
owner=make_user_urn(raw_owner),
type=owner_type,
typeUrn=owner_type_urn,
)
elif isinstance(raw_owner, Urn):
return models.OwnerClass(
owner=str(raw_owner),
type=owner_type,
typeUrn=owner_type_urn,
)
else:
raise SdkUsageError(
f"Invalid owner {owner}: {type(owner)} is not a valid owner type"
)
# TODO: Return a custom type with deserialized urns, instead of the raw aspect.
# Ideally we'd also use first-class ownership type urns here, not strings.
@property
def owners(self) -> Optional[List[models.OwnerClass]]:
if owners_aspect := self._get_aspect(models.OwnershipClass):
return owners_aspect.owners
return None
def set_owners(self, owners: OwnersInputType) -> None:
# TODO: add docs on the default parsing + default ownership type
parsed_owners = [self._parse_owner_class(owner) for owner in owners]
self._set_aspect(models.OwnershipClass(owners=parsed_owners))
ContainerInputType: TypeAlias = Union["Container", ContainerKey]
class HasContainer(Entity):
__slots__ = ()
def _set_container(self, container: Optional[ContainerInputType]) -> None:
# We need to allow container to be None. That rarely comes up for datasets,
# but is required for root containers.
from datahub.sdk.container import Container
browse_path: List[Union[str, models.BrowsePathEntryClass]] = []
if isinstance(container, Container):
container_urn = container.urn.urn()
parent_browse_path = container._get_aspect(models.BrowsePathsV2Class)
if parent_browse_path is None:
raise SdkUsageError(
"Parent container does not have a browse path, so cannot generate one for its children."
)
browse_path = [
*parent_browse_path.path,
models.BrowsePathEntryClass(
id=container_urn,
urn=container_urn,
),
]
elif container is not None:
container_urn = container.as_urn()
browse_path_reversed = [container_urn]
parent_key = container.parent_key()
while parent_key is not None:
browse_path_reversed.append(parent_key.as_urn())
parent_key = parent_key.parent_key()
browse_path = list(reversed(browse_path_reversed))
else:
container_urn = None
browse_path = []
if container_urn:
self._set_aspect(models.ContainerClass(container=container_urn))
self._set_aspect(
models.BrowsePathsV2Class(
path=[
(
entry
if isinstance(entry, models.BrowsePathEntryClass)
else models.BrowsePathEntryClass(
id=entry,
urn=entry,
)
)
for entry in browse_path
]
)
)
TagInputType: TypeAlias = Union[str, TagUrn, models.TagAssociationClass]
TagsInputType: TypeAlias = List[TagInputType]
class HasTags(Entity):
__slots__ = ()
# TODO: Return a custom type with deserialized urns, instead of the raw aspect.
@property
def tags(self) -> Optional[List[models.TagAssociationClass]]:
if tags := self._get_aspect(models.GlobalTagsClass):
return tags.tags
return None
@classmethod
def _parse_tag_association_class(
cls, tag: TagInputType
) -> models.TagAssociationClass:
if isinstance(tag, models.TagAssociationClass):
return tag
elif isinstance(tag, TagUrn):
    return models.TagAssociationClass(tag=tag.urn())
elif isinstance(tag, str):
    assert TagUrn.from_string(tag)
    return models.TagAssociationClass(tag=str(tag))
else:
    raise SdkUsageError(f"Invalid tag {tag}: {type(tag)} is not a valid tag type")
def set_tags(self, tags: TagsInputType) -> None:
self._set_aspect(
models.GlobalTagsClass(
tags=[self._parse_tag_association_class(tag) for tag in tags]
)
)
TermInputType: TypeAlias = Union[
str, GlossaryTermUrn, models.GlossaryTermAssociationClass
]
TermsInputType: TypeAlias = List[TermInputType]
class HasTerms(Entity):
__slots__ = ()
# TODO: Return a custom type with deserialized urns, instead of the raw aspect.
@property
def terms(self) -> Optional[List[models.GlossaryTermAssociationClass]]:
if glossary_terms := self._get_aspect(models.GlossaryTermsClass):
return glossary_terms.terms
return None
@classmethod
def _parse_glossary_term_association_class(
cls, term: TermInputType
) -> models.GlossaryTermAssociationClass:
if isinstance(term, models.GlossaryTermAssociationClass):
return term
elif isinstance(term, GlossaryTermUrn):
    return models.GlossaryTermAssociationClass(urn=term.urn())
elif isinstance(term, str):
    assert GlossaryTermUrn.from_string(term)
    return models.GlossaryTermAssociationClass(urn=str(term))
else:
    raise SdkUsageError(f"Invalid term {term}: {type(term)} is not a valid term type")
@classmethod
def _terms_audit_stamp(cls) -> models.AuditStampClass:
return models.AuditStampClass(
time=0,
# TODO figure out what to put here
actor=CorpUserUrn("__ingestion").urn(),
)
def set_terms(self, terms: TermsInputType) -> None:
self._set_aspect(
models.GlossaryTermsClass(
terms=[
self._parse_glossary_term_association_class(term) for term in terms
],
auditStamp=self._terms_audit_stamp(),
)
)
DomainInputType: TypeAlias = Union[str, DomainUrn]
class HasDomain(Entity):
__slots__ = ()
@property
def domain(self) -> Optional[DomainUrn]:
if domains := self._get_aspect(models.DomainsClass):
if len(domains.domains) > 1:
raise SdkUsageError(
f"The entity has multiple domains set, but only one is supported: {domains.domains}"
)
elif domains.domains:
domain_str = domains.domains[0]
return DomainUrn.from_string(domain_str)
return None
def set_domain(self, domain: DomainInputType) -> None:
domain_urn = DomainUrn.from_string(domain) # basically a type assertion
self._set_aspect(models.DomainsClass(domains=[str(domain_urn)]))
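Together, these mixins accept owners, tags, terms, and domains in several input shapes. A sketch of the accepted forms (assuming dataset is a Dataset; names and urns are illustrative):

    from datahub.metadata.urns import CorpGroupUrn, GlossaryTermUrn, TagUrn

    dataset.set_owners(
        [
            "jdoe",  # bare user name -> urn:li:corpuser:jdoe, TECHNICAL_OWNER
            CorpGroupUrn("data-platform"),  # group urn, default ownership type
            ("alice", "BUSINESS_OWNER"),  # (owner, ownership type) tuple
        ]
    )
    dataset.set_tags(["urn:li:tag:pii", TagUrn("tier1")])
    dataset.set_terms([GlossaryTermUrn("AccountBalance")])
    dataset.set_domain("urn:li:domain:Marketing")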

View File: metadata-ingestion/src/datahub/sdk/container.py

@@ -0,0 +1,193 @@
from __future__ import annotations
from datetime import datetime
from typing import Dict, Optional, Type
from typing_extensions import Self
import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import ALL_ENV_TYPES
from datahub.emitter.mcp_builder import (
_INCLUDE_ENV_IN_CONTAINER_PROPERTIES,
ContainerKey,
)
from datahub.errors import SdkUsageError
from datahub.metadata.urns import (
ContainerUrn,
Urn,
)
from datahub.sdk._entity import Entity
from datahub.sdk._shared import (
DomainInputType,
HasContainer,
HasDomain,
HasOwnership,
HasPlatformInstance,
HasSubtype,
HasTags,
HasTerms,
OwnersInputType,
TagsInputType,
TermsInputType,
make_time_stamp,
parse_time_stamp,
)
class Container(
HasPlatformInstance,
HasSubtype,
HasContainer,
HasOwnership,
HasTags,
HasTerms,
HasDomain,
Entity,
):
__slots__ = ()
@classmethod
def get_urn_type(cls) -> Type[ContainerUrn]:
return ContainerUrn
def __init__(
self,
/,
# Identity.
container_key: ContainerKey | ContainerUrn,
*,
# Container attributes.
display_name: str,
qualified_name: Optional[str] = None,
description: Optional[str] = None,
external_url: Optional[str] = None,
# TODO: call this custom properties?
extra_properties: Optional[Dict[str, str]] = None,
created: Optional[datetime] = None,
last_modified: Optional[datetime] = None,
# Standard aspects.
subtype: Optional[str] = None,
owners: Optional[OwnersInputType] = None,
tags: Optional[TagsInputType] = None,
terms: Optional[TermsInputType] = None,
domain: Optional[DomainInputType] = None,
):
if isinstance(container_key, ContainerUrn):
urn = container_key
else:
urn = ContainerUrn.from_string(container_key.as_urn())
super().__init__(urn)
# This needs to come first to ensure that the display name is registered.
self._ensure_container_props(name=display_name)
# TODO: Normal usages should require container key. Only the graph init method can accept an urn.
if isinstance(container_key, ContainerKey):
self._set_platform_instance(container_key.platform, container_key.instance)
self._set_container(container_key.parent_key())
self.set_custom_properties(
{
**container_key.property_dict(),
**(extra_properties or {}),
}
)
# Extra validation on the env field.
# In certain cases (mainly for backwards compatibility), the env field will actually
# have a platform instance name.
env = container_key.env if container_key.env in ALL_ENV_TYPES else None
if _INCLUDE_ENV_IN_CONTAINER_PROPERTIES and env is not None:
self._ensure_container_props().env = env
if description is not None:
self.set_description(description)
if external_url is not None:
self.set_external_url(external_url)
if qualified_name is not None:
self.set_qualified_name(qualified_name)
if created is not None:
self.set_created(created)
if last_modified is not None:
self.set_last_modified(last_modified)
if subtype is not None:
self.set_subtype(subtype)
if owners is not None:
self.set_owners(owners)
if tags is not None:
self.set_tags(tags)
if terms is not None:
self.set_terms(terms)
if domain is not None:
self.set_domain(domain)
@classmethod
def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self:
assert isinstance(urn, ContainerUrn)
entity = cls(urn, display_name="__dummy_value__")
return entity._init_from_graph(current_aspects)
def _ensure_container_props(
self, *, name: Optional[str] = None
) -> models.ContainerPropertiesClass:
# TODO: Not super happy with this method's implementation, but it's
# internal-only and enforces the constraints that we need.
if name is not None:
return self._setdefault_aspect(models.ContainerPropertiesClass(name=name))
props = self._get_aspect(models.ContainerPropertiesClass)
if props is None:
raise SdkUsageError("Containers must have a name.")
return props
@property
def display_name(self) -> str:
return self._ensure_container_props().name
def set_display_name(self, value: str) -> None:
self._ensure_container_props().name = value
@property
def description(self) -> Optional[str]:
return self._ensure_container_props().description
def set_description(self, description: str) -> None:
self._ensure_container_props().description = description
@property
def custom_properties(self) -> Optional[Dict[str, str]]:
return self._ensure_container_props().customProperties
def set_custom_properties(self, custom_properties: Dict[str, str]) -> None:
# TODO: How do we ensure that the container key props are always retained?
self._ensure_container_props().customProperties = custom_properties
@property
def external_url(self) -> Optional[str]:
return self._ensure_container_props().externalUrl
def set_external_url(self, external_url: str) -> None:
self._ensure_container_props().externalUrl = external_url
@property
def qualified_name(self) -> Optional[str]:
return self._ensure_container_props().qualifiedName
def set_qualified_name(self, qualified_name: str) -> None:
self._ensure_container_props().qualifiedName = qualified_name
@property
def created(self) -> Optional[datetime]:
return parse_time_stamp(self._ensure_container_props().created)
def set_created(self, created: datetime) -> None:
self._ensure_container_props().created = make_time_stamp(created)
@property
def last_modified(self) -> Optional[datetime]:
return parse_time_stamp(self._ensure_container_props().lastModified)
def set_last_modified(self, last_modified: datetime) -> None:
self._ensure_container_props().lastModified = make_time_stamp(last_modified)
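A sketch of constructing a container from a key, mirroring the golden test data further below (DatabaseKey comes from the existing datahub.emitter.mcp_builder module):

    from datahub.emitter.mcp_builder import DatabaseKey
    from datahub.sdk.container import Container

    db_key = DatabaseKey(platform="snowflake", database="test_db")
    database = Container(
        db_key,
        display_name="test_db",
        subtype="Database",
    )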

View File: metadata-ingestion/src/datahub/sdk/dataset.py

@@ -0,0 +1,584 @@
from __future__ import annotations
import warnings
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Type, Union
from typing_extensions import Self, TypeAlias, assert_never
import datahub.metadata.schema_classes as models
from datahub.cli.cli_utils import first_non_null
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.errors import (
IngestionAttributionWarning,
ItemNotFoundError,
SchemaFieldKeyError,
)
from datahub.ingestion.source.sql.sql_types import resolve_sql_type
from datahub.metadata.urns import DatasetUrn, SchemaFieldUrn, Urn
from datahub.sdk._attribution import is_ingestion_attribution
from datahub.sdk._entity import Entity
from datahub.sdk._shared import (
ContainerInputType,
DatasetUrnOrStr,
DomainInputType,
HasContainer,
HasDomain,
HasOwnership,
HasPlatformInstance,
HasSubtype,
HasTags,
HasTerms,
OwnersInputType,
TagsInputType,
TermsInputType,
make_time_stamp,
parse_time_stamp,
)
SchemaFieldInputType: TypeAlias = Union[
str,
Tuple[str, str], # (name, type)
Tuple[str, str, str], # (name, type, description)
models.SchemaFieldClass,
]
SchemaFieldsInputType: TypeAlias = Union[
List[SchemaFieldInputType],
models.SchemaMetadataClass,
]
UpstreamInputType: TypeAlias = Union[
# Dataset upstream variants.
DatasetUrnOrStr,
models.UpstreamClass,
# Column upstream variants.
models.FineGrainedLineageClass,
]
# Mapping of { downstream_column -> [upstream_columns] }
ColumnLineageMapping: TypeAlias = Dict[str, List[str]]
UpstreamLineageInputType: TypeAlias = Union[
models.UpstreamLineageClass,
List[UpstreamInputType],
# Combined variant.
# Map of { upstream_dataset -> { downstream_column -> [upstream_column] } }
Dict[DatasetUrnOrStr, ColumnLineageMapping],
]
def _parse_upstream_input(
upstream_input: UpstreamInputType,
) -> Union[models.UpstreamClass, models.FineGrainedLineageClass]:
if isinstance(upstream_input, models.UpstreamClass):
return upstream_input
elif isinstance(upstream_input, models.FineGrainedLineageClass):
return upstream_input
elif isinstance(upstream_input, (str, DatasetUrn)):
return models.UpstreamClass(
dataset=str(upstream_input),
type=models.DatasetLineageTypeClass.TRANSFORMED,
)
else:
assert_never(upstream_input)
def _parse_cll_mapping(
*,
upstream: DatasetUrnOrStr,
downstream: DatasetUrnOrStr,
cll_mapping: ColumnLineageMapping,
) -> List[models.FineGrainedLineageClass]:
cll = []
for downstream_column, upstream_columns in cll_mapping.items():
cll.append(
models.FineGrainedLineageClass(
upstreamType=models.FineGrainedLineageUpstreamTypeClass.FIELD_SET,
downstreamType=models.FineGrainedLineageDownstreamTypeClass.FIELD,
upstreams=[
SchemaFieldUrn(upstream, upstream_column).urn()
for upstream_column in upstream_columns
],
downstreams=[SchemaFieldUrn(downstream, downstream_column).urn()],
)
)
return cll
def _parse_upstream_lineage_input(
upstream_input: UpstreamLineageInputType, downstream_urn: DatasetUrn
) -> models.UpstreamLineageClass:
if isinstance(upstream_input, models.UpstreamLineageClass):
return upstream_input
elif isinstance(upstream_input, list):
upstreams = [_parse_upstream_input(upstream) for upstream in upstream_input]
# Partition into table and column lineages.
tll = [
upstream
for upstream in upstreams
if isinstance(upstream, models.UpstreamClass)
]
cll = [
upstream
for upstream in upstreams
if not isinstance(upstream, models.UpstreamClass)
]
# TODO: check that all things in cll are also in tll
return models.UpstreamLineageClass(upstreams=tll, fineGrainedLineages=cll)
elif isinstance(upstream_input, dict):
tll = []
cll = []
for dataset_urn, column_lineage in upstream_input.items():
tll.append(
models.UpstreamClass(
dataset=str(dataset_urn),
type=models.DatasetLineageTypeClass.TRANSFORMED,
)
)
cll.extend(
_parse_cll_mapping(
upstream=dataset_urn,
downstream=downstream_urn,
cll_mapping=column_lineage,
)
)
return models.UpstreamLineageClass(upstreams=tll, fineGrainedLineages=cll)
else:
assert_never(upstream_input)
class SchemaField:
__slots__ = ("_parent", "_field_path")
def __init__(self, parent: Dataset, field_path: str):
self._parent = parent
self._field_path = field_path
def _base_schema_field(self) -> models.SchemaFieldClass:
# This must exist - if it doesn't, we've got a larger bug.
schema_dict = self._parent._schema_dict()
return schema_dict[self._field_path]
def _get_editable_schema_field(
self,
) -> Optional[models.EditableSchemaFieldInfoClass]:
# This method does not make any mutations.
editable_schema = self._parent._get_aspect(models.EditableSchemaMetadataClass)
if editable_schema is None:
return None
for field in editable_schema.editableSchemaFieldInfo:
if field.fieldPath == self._field_path:
return field
return None
def _ensure_editable_schema_field(self) -> models.EditableSchemaFieldInfoClass:
if is_ingestion_attribution():
warnings.warn(
"This method should not be used in ingestion mode.",
IngestionAttributionWarning,
stacklevel=2,
)
editable_schema = self._parent._setdefault_aspect(
models.EditableSchemaMetadataClass(editableSchemaFieldInfo=[])
)
for field in editable_schema.editableSchemaFieldInfo:
if field.fieldPath == self._field_path:
return field
# If we don't have an entry for this field yet, create one.
field = models.EditableSchemaFieldInfoClass(fieldPath=self._field_path)
editable_schema.editableSchemaFieldInfo.append(field)
return field
@property
def field_path(self) -> str:
return self._field_path
@property
def mapped_type(self) -> models.SchemaFieldDataTypeClass:
return self._base_schema_field().type
@property
def native_type(self) -> str:
return self._base_schema_field().nativeDataType
# TODO expose nullability and primary/foreign key details
@property
def description(self) -> Optional[str]:
editable_field = self._get_editable_schema_field()
return first_non_null(
[
editable_field.description if editable_field is not None else None,
self._base_schema_field().description,
]
)
def set_description(self, description: str) -> None:
if is_ingestion_attribution():
editable_field = self._get_editable_schema_field()
if editable_field and editable_field.description is not None:
warnings.warn(
"The field description will be hidden by UI-based edits. "
"Change the edit mode to OVERWRITE_UI to override this behavior.",
category=IngestionAttributionWarning,
stacklevel=2,
)
self._base_schema_field().description = description
else:
self._ensure_editable_schema_field().description = description
@property
def tags(self) -> Optional[List[models.TagAssociationClass]]:
# Tricky: if either has a non-null globalTags, this will not return None.
tags = None
if (base_tags := self._base_schema_field().globalTags) is not None:
tags = tags or []
tags.extend(base_tags.tags)
if editable_field := self._get_editable_schema_field():
if (editable_tags := editable_field.globalTags) is not None:
tags = tags or []
tags.extend(editable_tags.tags)
return tags
def set_tags(self, tags: TagsInputType) -> None:
parsed_tags = [self._parent._parse_tag_association_class(tag) for tag in tags]
if is_ingestion_attribution():
editable_field = self._get_editable_schema_field()
if editable_field and editable_field.globalTags:
warnings.warn(
"Overwriting non-ingestion tags from ingestion is an anti-pattern.",
category=IngestionAttributionWarning,
stacklevel=2,
)
editable_field.globalTags = None
self._base_schema_field().globalTags = models.GlobalTagsClass(
tags=parsed_tags
)
else:
base_field = self._base_schema_field()
if base_field.globalTags:
base_field.globalTags = None
self._ensure_editable_schema_field().globalTags = models.GlobalTagsClass(
tags=parsed_tags
)
@property
def terms(self) -> Optional[List[models.GlossaryTermAssociationClass]]:
# TODO: Basically the same implementation as tags - can we share code?
terms = None
if (base_terms := self._base_schema_field().glossaryTerms) is not None:
terms = terms or []
terms.extend(base_terms.terms)
if editable_field := self._get_editable_schema_field():
if (editable_terms := editable_field.glossaryTerms) is not None:
terms = terms or []
terms.extend(editable_terms.terms)
return terms
def set_terms(self, terms: List[models.GlossaryTermAssociationClass]) -> None:
parsed_terms = [
self._parent._parse_glossary_term_association_class(term) for term in terms
]
if is_ingestion_attribution():
editable_field = self._get_editable_schema_field()
if editable_field and editable_field.glossaryTerms:
warnings.warn(
"Overwriting non-ingestion terms from ingestion is an anti-pattern.",
category=IngestionAttributionWarning,
stacklevel=2,
)
editable_field.glossaryTerms = None
self._base_schema_field().glossaryTerms = models.GlossaryTermsClass(
terms=parsed_terms,
auditStamp=self._parent._terms_audit_stamp(),
)
else:
base_field = self._base_schema_field()
if base_field.glossaryTerms:
base_field.glossaryTerms = None
self._ensure_editable_schema_field().glossaryTerms = (
models.GlossaryTermsClass(
terms=parsed_terms,
auditStamp=self._parent._terms_audit_stamp(),
)
)
class Dataset(
HasPlatformInstance,
HasSubtype,
HasContainer,
HasOwnership,
HasTags,
HasTerms,
HasDomain,
Entity,
):
__slots__ = ()
@classmethod
def get_urn_type(cls) -> Type[DatasetUrn]:
return DatasetUrn
def __init__(
self,
*,
# Identity.
platform: str,
name: str,
platform_instance: Optional[str] = None,
env: str = DEFAULT_ENV,
# Dataset properties.
description: Optional[str] = None,
display_name: Optional[str] = None,
qualified_name: Optional[str] = None,
external_url: Optional[str] = None,
custom_properties: Optional[Dict[str, str]] = None,
created: Optional[datetime] = None,
last_modified: Optional[datetime] = None,
# Standard aspects.
subtype: Optional[str] = None,
container: Optional[ContainerInputType] = None,
owners: Optional[OwnersInputType] = None,
tags: Optional[TagsInputType] = None,
terms: Optional[TermsInputType] = None,
# TODO structured_properties
domain: Optional[DomainInputType] = None,
# Dataset-specific aspects.
schema: Optional[SchemaFieldsInputType] = None,
upstreams: Optional[models.UpstreamLineageClass] = None,
):
urn = DatasetUrn.create_from_ids(
platform_id=platform,
table_name=name,
platform_instance=platform_instance,
env=env,
)
super().__init__(urn)
self._set_platform_instance(urn.platform, platform_instance)
if schema is not None:
self._set_schema(schema)
if upstreams is not None:
self.set_upstreams(upstreams)
if description is not None:
self.set_description(description)
if display_name is not None:
self.set_display_name(display_name)
if qualified_name is not None:
self.set_qualified_name(qualified_name)
if external_url is not None:
self.set_external_url(external_url)
if custom_properties is not None:
self.set_custom_properties(custom_properties)
if created is not None:
self.set_created(created)
if last_modified is not None:
self.set_last_modified(last_modified)
if subtype is not None:
self.set_subtype(subtype)
if container is not None:
self._set_container(container)
if owners is not None:
self.set_owners(owners)
if tags is not None:
self.set_tags(tags)
if terms is not None:
self.set_terms(terms)
if domain is not None:
self.set_domain(domain)
@classmethod
def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self:
assert isinstance(urn, DatasetUrn)
entity = cls(
platform=urn.platform,
name=urn.name,
env=urn.env,
)
return entity._init_from_graph(current_aspects)
@property
def urn(self) -> DatasetUrn:
return self._urn # type: ignore
def _ensure_dataset_props(self) -> models.DatasetPropertiesClass:
return self._setdefault_aspect(models.DatasetPropertiesClass())
def _get_editable_props(self) -> Optional[models.EditableDatasetPropertiesClass]:
return self._get_aspect(models.EditableDatasetPropertiesClass)
def _ensure_editable_props(self) -> models.EditableDatasetPropertiesClass:
# Note that most of the fields in this aspect are not used.
# The only one that's relevant for us is the description.
return self._setdefault_aspect(models.EditableDatasetPropertiesClass())
@property
def description(self) -> Optional[str]:
editable_props = self._get_editable_props()
return first_non_null(
[
editable_props.description if editable_props is not None else None,
self._ensure_dataset_props().description,
]
)
def set_description(self, description: str) -> None:
if is_ingestion_attribution():
editable_props = self._get_editable_props()
if editable_props is not None and editable_props.description is not None:
warnings.warn(
"Overwriting non-ingestion description from ingestion is an anti-pattern.",
category=IngestionAttributionWarning,
stacklevel=2,
)
# Force the ingestion description to show up.
editable_props.description = None
self._ensure_dataset_props().description = description
else:
self._ensure_editable_props().description = description
@property
def display_name(self) -> Optional[str]:
return self._ensure_dataset_props().name
def set_display_name(self, display_name: str) -> None:
self._ensure_dataset_props().name = display_name
@property
def qualified_name(self) -> Optional[str]:
return self._ensure_dataset_props().qualifiedName
def set_qualified_name(self, qualified_name: str) -> None:
self._ensure_dataset_props().qualifiedName = qualified_name
@property
def external_url(self) -> Optional[str]:
return self._ensure_dataset_props().externalUrl
def set_external_url(self, external_url: str) -> None:
self._ensure_dataset_props().externalUrl = external_url
@property
def custom_properties(self) -> Dict[str, str]:
return self._ensure_dataset_props().customProperties
def set_custom_properties(self, custom_properties: Dict[str, str]) -> None:
self._ensure_dataset_props().customProperties = custom_properties
@property
def created(self) -> Optional[datetime]:
return parse_time_stamp(self._ensure_dataset_props().created)
def set_created(self, created: datetime) -> None:
self._ensure_dataset_props().created = make_time_stamp(created)
@property
def last_modified(self) -> Optional[datetime]:
return parse_time_stamp(self._ensure_dataset_props().lastModified)
def set_last_modified(self, last_modified: datetime) -> None:
self._ensure_dataset_props().lastModified = make_time_stamp(last_modified)
def _schema_dict(self) -> Dict[str, models.SchemaFieldClass]:
schema_metadata = self._get_aspect(models.SchemaMetadataClass)
if schema_metadata is None:
raise ItemNotFoundError(f"Schema is not set for dataset {self.urn}")
return {field.fieldPath: field for field in schema_metadata.fields}
@property
def schema(self) -> List[SchemaField]:
# TODO: Add some caching here to avoid iterating over the schema every time.
schema_dict = self._schema_dict()
return [SchemaField(self, field_path) for field_path in schema_dict]
def _parse_schema_field_input(
self, schema_field_input: SchemaFieldInputType
) -> models.SchemaFieldClass:
if isinstance(schema_field_input, models.SchemaFieldClass):
return schema_field_input
elif isinstance(schema_field_input, tuple):
# Support (name, type) and (name, type, description) forms
if len(schema_field_input) == 2:
name, field_type = schema_field_input
description = None
elif len(schema_field_input) == 3:
name, field_type, description = schema_field_input
else:
assert_never(schema_field_input)
return models.SchemaFieldClass(
fieldPath=name,
type=models.SchemaFieldDataTypeClass(
resolve_sql_type(
field_type,
platform=self.urn.get_data_platform_urn().platform_name,
)
or models.NullTypeClass()
),
nativeDataType=field_type,
description=description,
)
elif isinstance(schema_field_input, str):
# TODO: Not sure this branch makes sense - we should probably just require types?
return models.SchemaFieldClass(
fieldPath=schema_field_input,
type=models.SchemaFieldDataTypeClass(models.NullTypeClass()),
nativeDataType="unknown",
description=None,
)
else:
assert_never(schema_field_input)
def _set_schema(self, schema: SchemaFieldsInputType) -> None:
# This method is not public. Ingestion/restatement users should be setting
# the schema via the constructor. SDK users that got a dataset from the graph
# probably shouldn't be adding/removing fields ad-hoc. The field-level mutators
# can be used instead.
if isinstance(schema, models.SchemaMetadataClass):
self._set_aspect(schema)
else:
parsed_schema = [self._parse_schema_field_input(field) for field in schema]
self._set_aspect(
models.SchemaMetadataClass(
fields=parsed_schema,
# The rest of these fields are not used, and so we can set them to dummy/default values.
schemaName="",
platform=self.urn.platform,
version=0,
hash="",
platformSchema=models.SchemalessClass(),
)
)
def __getitem__(self, field_path: str) -> SchemaField:
# TODO: Automatically deal with field path v2?
schema_dict = self._schema_dict()
if field_path not in schema_dict:
raise SchemaFieldKeyError(f"Field {field_path} not found in schema")
return SchemaField(self, field_path)
@property
def upstreams(self) -> Optional[models.UpstreamLineageClass]:
return self._get_aspect(models.UpstreamLineageClass)
def set_upstreams(self, upstreams: UpstreamLineageInputType) -> None:
self._set_aspect(_parse_upstream_lineage_input(upstreams, self.urn))
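A sketch tying the schema and lineage input types together: field tuples are (name, type) or (name, type, description), and the dict form of set_upstreams maps an upstream dataset to { downstream_column -> [upstream_columns] } (urns illustrative):

    from datahub.sdk.dataset import Dataset

    ds = Dataset(
        platform="bigquery",
        name="proj.dataset.table",
        subtype="Table",
        schema=[
            ("field1", "string", "field1 description"),
            ("field2", "int64", "field2 description"),
        ],
    )
    ds["field1"].set_description("primary key")
    ds.set_upstreams(
        {
            "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.upstream,PROD)": {
                "field1": ["src_field1"],
            }
        }
    )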

View File: metadata-ingestion/src/datahub/sdk/entity_client.py

@@ -0,0 +1,115 @@
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING, Union, overload
import datahub.metadata.schema_classes as models
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.errors import IngestionAttributionWarning, ItemNotFoundError, SdkUsageError
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.urns import (
ContainerUrn,
DatasetUrn,
Urn,
)
from datahub.sdk._all_entities import ENTITY_CLASSES
from datahub.sdk._entity import Entity
from datahub.sdk._shared import UrnOrStr
from datahub.sdk.container import Container
from datahub.sdk.dataset import Dataset
if TYPE_CHECKING:
from datahub.sdk.main_client import DataHubClient
class EntityClient:
def __init__(self, client: DataHubClient):
self._client = client
# TODO: Make all of these methods sync by default.
@property
def _graph(self) -> DataHubGraph:
return self._client._graph
@overload
def get(self, urn: ContainerUrn) -> Container: ...
@overload
def get(self, urn: DatasetUrn) -> Dataset: ...
@overload
def get(self, urn: Union[Urn, str]) -> Entity: ...
def get(self, urn: UrnOrStr) -> Entity:
if not isinstance(urn, Urn):
urn = Urn.from_string(urn)
# TODO: add error handling around this with a suggested alternative if not yet supported
EntityClass = ENTITY_CLASSES[urn.entity_type]
if not self._graph.exists(str(urn)):
raise ItemNotFoundError(f"Entity {urn} not found")
aspects = self._graph.get_entity_semityped(str(urn))
# TODO: save the timestamp so we can use If-Unmodified-Since on the updates
return EntityClass._new_from_graph(urn, aspects)
def create(self, entity: Entity) -> None:
mcps = []
if self._graph.exists(str(entity.urn)):
raise SdkUsageError(
f"Entity {entity.urn} already exists. Use client.entities.upsert() to update it."
)
# Extra safety check: by putting this first, we can ensure that
# the request fails if the entity already exists.
mcps.append(
MetadataChangeProposalWrapper(
entityUrn=str(entity.urn),
aspect=entity.urn.to_key_aspect(),
changeType=models.ChangeTypeClass.CREATE_ENTITY,
)
)
mcps.extend(entity._as_mcps(models.ChangeTypeClass.CREATE))
self._graph.emit_mcps(mcps)
def upsert(self, entity: Entity) -> None:
if entity._prev_aspects is None and self._graph.exists(str(entity.urn)):
warnings.warn(
f"The entity {entity.urn} already exists. This operation will partially overwrite the existing entity.",
IngestionAttributionWarning,
stacklevel=2,
)
# TODO: If there are no previous aspects but the entity exists, should we delete aspects that are not present here?
mcps = entity._as_mcps(models.ChangeTypeClass.UPSERT)
self._graph.emit_mcps(mcps)
def update(self, entity: Union[Entity, MetadataPatchProposal]) -> None:
if isinstance(entity, MetadataPatchProposal):
return self._update_patch(entity)
if entity._prev_aspects is None:
raise SdkUsageError(
f"For entities created via {entity.__class__.__name__}(...), use client.entities.create() or client.entities.upsert() instead"
)
# TODO: respect If-Unmodified-Since?
# -> probably add a "mode" parameter that can be "update" (e.g. if not modified) or "update_force"
mcps = entity._as_mcps(models.ChangeTypeClass.UPSERT)
self._graph.emit_mcps(mcps)
def _update_patch(
self, updater: MetadataPatchProposal, check_exists: bool = True
) -> None:
if check_exists and not self._graph.exists(updater.urn):
raise SdkUsageError(
f"Entity {updater.urn} does not exist, and hence cannot be updated. "
"You can bypass this check by setting check_exists=False."
)
mcps = updater.build()
self._graph.emit_mcps(mcps)
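End-to-end, the read-modify-write flow looks roughly like this (a sketch; server and urn are illustrative):

    from datahub.metadata.urns import DatasetUrn
    from datahub.sdk.main_client import DataHubClient

    client = DataHubClient(server="http://localhost:8080")
    urn = DatasetUrn.from_string(
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.my_schema.my_table,PROD)"
    )
    dataset = client.entities.get(urn)  # typed overload -> Dataset
    dataset.set_description("Orders fact table")
    client.entities.update(dataset)  # allowed because it was fetched from the graph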

View File: metadata-ingestion/src/datahub/sdk/main_client.py

@@ -0,0 +1,56 @@
from __future__ import annotations
from typing import Optional, overload
from datahub.errors import SdkUsageError
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
from datahub.ingestion.graph.config import DatahubClientConfig
from datahub.sdk.entity_client import EntityClient
from datahub.sdk.resolver_client import ResolverClient
class DataHubClient:
@overload
def __init__(self, *, server: str, token: Optional[str] = None): ...
@overload
def __init__(self, *, config: DatahubClientConfig): ...
@overload
def __init__(self, *, graph: DataHubGraph): ...
def __init__(
self,
*,
server: Optional[str] = None,
token: Optional[str] = None,
graph: Optional[DataHubGraph] = None,
config: Optional[DatahubClientConfig] = None,
):
if server is not None:
if config is not None:
raise SdkUsageError("Cannot specify both server and config")
if graph is not None:
raise SdkUsageError("Cannot specify both server and graph")
graph = DataHubGraph(config=DatahubClientConfig(server=server, token=token))
elif config is not None:
if graph is not None:
raise SdkUsageError("Cannot specify both config and graph")
graph = DataHubGraph(config=config)
elif graph is None:
raise SdkUsageError("Must specify either server, config, or graph")
self._graph = graph
@classmethod
def from_env(cls) -> "DataHubClient":
# Inspired by the DockerClient.from_env() method.
# TODO: This one also reads from ~/.datahubenv, so the "from_env" name might be a bit confusing.
# That file is part of the "environment", but is not a traditional "env variable".
graph = get_default_graph()
return cls(graph=graph)
@property
def entities(self) -> EntityClient:
return EntityClient(self)
@property
def resolve(self) -> ResolverClient:
return ResolverClient(self)
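The three construction paths are mutually exclusive; a sketch (server value illustrative):

    from datahub.ingestion.graph.config import DatahubClientConfig
    from datahub.sdk.main_client import DataHubClient

    client = DataHubClient(server="http://localhost:8080")
    client = DataHubClient(config=DatahubClientConfig(server="http://localhost:8080"))
    client = DataHubClient.from_env()  # reads ~/.datahubenv via get_default_graph()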

View File: metadata-ingestion/src/datahub/sdk/resolver_client.py

@@ -0,0 +1,101 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Optional, overload
from datahub.errors import ItemNotFoundError, MultipleItemsFoundError, SdkUsageError
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.urns import (
CorpUserUrn,
DomainUrn,
GlossaryTermUrn,
)
if TYPE_CHECKING:
from datahub.sdk.main_client import DataHubClient
class ResolverClient:
def __init__(self, client: DataHubClient):
self._client = client
# TODO: add caching to this method
@property
def _graph(self) -> DataHubGraph:
return self._client._graph
def domain(self, *, name: str) -> DomainUrn:
urn_str = self._graph.get_domain_urn_by_name(name)
if urn_str is None:
raise ItemNotFoundError(f"Domain with name {name} not found")
return DomainUrn.from_string(urn_str)
@overload
def user(self, *, name: str) -> CorpUserUrn: ...
@overload
def user(self, *, email: str) -> CorpUserUrn: ...
def user(
self, *, name: Optional[str] = None, email: Optional[str] = None
) -> CorpUserUrn:
filter_explanation: str
filters = []
if name is not None:
if email is not None:
raise SdkUsageError("Cannot specify both name and email for auto_user")
# TODO: do we filter on displayName or fullName?
filter_explanation = f"with name {name}"
filters.append(
{
"field": "fullName",
"values": [name],
"condition": "EQUAL",
}
)
elif email is not None:
filter_explanation = f"with email {email}"
filters.append(
{
"field": "email",
"values": [email],
"condition": "EQUAL",
}
)
else:
raise SdkUsageError("Must specify either name or email for auto_user")
users = list(
self._graph.get_urns_by_filter(
entity_types=[CorpUserUrn.ENTITY_TYPE],
extraFilters=filters,
)
)
if len(users) == 0:
# TODO: In auto methods, should we just create the user/domain/etc if it doesn't exist?
raise ItemNotFoundError(f"User {filter_explanation} not found")
elif len(users) > 1:
raise MultipleItemsFoundError(
f"Multiple users found {filter_explanation}: {users}"
)
else:
return CorpUserUrn.from_string(users[0])
def term(self, *, name: str) -> GlossaryTermUrn:
# TODO: Add some limits on the graph fetch
terms = list(
self._graph.get_urns_by_filter(
entity_types=[GlossaryTermUrn.ENTITY_TYPE],
extraFilters=[
{
"field": "id",
"values": [name],
"condition": "EQUAL",
}
],
)
)
if len(terms) == 0:
raise ItemNotFoundError(f"Term with name {name} not found")
elif len(terms) > 1:
raise SdkUsageError(f"Multiple terms found with name {name}: {terms}")
else:
return GlossaryTermUrn.from_string(terms[0])
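The resolver turns human-readable names into urns, which can then feed the entity setters; a sketch (assuming client and dataset exist; names illustrative):

    owner = client.resolve.user(email="jdoe@example.com")
    domain = client.resolve.domain(name="Marketing")
    term = client.resolve.term(name="AccountBalance")

    dataset.set_owners([owner])
    dataset.set_domain(domain)
    dataset.set_terms([term])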

View File

@@ -0,0 +1,91 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:bigquery"
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)",
"changeType": "UPSERT",
"aspectName": "schemaMetadata",
"aspect": {
"json": {
"schemaName": "",
"platform": "urn:li:dataPlatform:bigquery",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.schema.Schemaless": {}
},
"fields": [
{
"fieldPath": "field1",
"nullable": false,
"description": "field1 description",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "field2",
"nullable": false,
"description": "field2 description",
"type": {
"type": {
"com.linkedin.schema.NullType": {}
}
},
"nativeDataType": "int64",
"recursive": false,
"isPartOfKey": false
}
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Table"
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"tags": []
}
}
}
]

View File

@@ -0,0 +1,275 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:snowflake",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)"
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "schemaMetadata",
"aspect": {
"json": {
"schemaName": "",
"platform": "urn:li:dataPlatform:snowflake",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.schema.Schemaless": {}
},
"fields": [
{
"fieldPath": "field1",
"nullable": false,
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "field2",
"nullable": false,
"description": "field2 description",
"type": {
"type": {
"com.linkedin.schema.NullType": {}
}
},
"nativeDataType": "int64",
"recursive": false,
"isPartOfKey": false
}
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "editableDatasetProperties",
"aspect": {
"json": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"description": "test"
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {
"key1": "value1",
"key2": "value2"
},
"externalUrl": "https://example.com",
"name": "MY_TABLE",
"qualifiedName": "MY_DB.MY_SCHEMA.MY_TABLE",
"created": {
"time": 1735787045000
},
"lastModified": {
"time": 1736391846000
},
"tags": []
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Table"
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056"
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c",
"urn": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c"
},
{
"id": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056",
"urn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056"
}
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:admin@datahubproject.io",
"type": "TECHNICAL_OWNER"
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": [
{
"tag": "urn:li:tag:tag1"
},
{
"tag": "urn:li:tag:tag2"
}
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "glossaryTerms",
"aspect": {
"json": {
"terms": [
{
"urn": "urn:li:glossaryTerm:AccountBalance"
}
],
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:__ingestion"
}
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "domains",
"aspect": {
"json": {
"domains": [
"urn:li:domain:Marketing"
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "editableSchemaMetadata",
"aspect": {
"json": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"editableSchemaFieldInfo": [
{
"fieldPath": "field1",
"description": "field1 description",
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:field1_tag1"
},
{
"tag": "urn:li:tag:field1_tag2"
}
]
}
},
{
"fieldPath": "field2",
"glossaryTerms": {
"terms": [
{
"urn": "urn:li:glossaryTerm:field2_term1"
},
{
"urn": "urn:li:glossaryTerm:field2_term2"
}
],
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:__ingestion"
}
}
}
]
}
}
}
]

View File

@@ -0,0 +1,231 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:snowflake",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)"
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "schemaMetadata",
"aspect": {
"json": {
"schemaName": "",
"platform": "urn:li:dataPlatform:snowflake",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.schema.Schemaless": {}
},
"fields": [
{
"fieldPath": "field1",
"nullable": false,
"description": "field1 description",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:field1_tag1"
},
{
"tag": "urn:li:tag:field1_tag2"
}
]
},
"isPartOfKey": false
},
{
"fieldPath": "field2",
"nullable": false,
"description": "field2 description",
"type": {
"type": {
"com.linkedin.schema.NullType": {}
}
},
"nativeDataType": "int64",
"recursive": false,
"glossaryTerms": {
"terms": [
{
"urn": "urn:li:glossaryTerm:field2_term1"
},
{
"urn": "urn:li:glossaryTerm:field2_term2"
}
],
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:__ingestion"
}
},
"isPartOfKey": false
}
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {
"key1": "value1",
"key2": "value2"
},
"externalUrl": "https://example.com",
"name": "MY_TABLE",
"qualifiedName": "MY_DB.MY_SCHEMA.MY_TABLE",
"description": "test",
"created": {
"time": 1735787045000
},
"lastModified": {
"time": 1736391846000
},
"tags": []
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Table"
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056"
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c",
"urn": "urn:li:container:37d6500021cda2a0aa7ae1900eab5a9c"
},
{
"id": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056",
"urn": "urn:li:container:66c5ac35f0bfc521dee6f7d9533a8056"
}
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:admin@datahubproject.io",
"type": "TECHNICAL_OWNER"
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": [
{
"tag": "urn:li:tag:tag1"
},
{
"tag": "urn:li:tag:tag2"
}
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "glossaryTerms",
"aspect": {
"json": {
"terms": [
{
"urn": "urn:li:glossaryTerm:AccountBalance"
}
],
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:__ingestion"
}
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_instance.my_db.my_schema.my_table,PROD)",
"changeType": "UPSERT",
"aspectName": "domains",
"aspect": {
"json": {
"domains": [
"urn:li:domain:Marketing"
]
}
}
}
]

View File

@ -0,0 +1,52 @@
[
{
    "entityType": "container",
    "entityUrn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de",
    "changeType": "UPSERT",
    "aspectName": "containerProperties",
    "aspect": {
        "json": {
            "customProperties": {
                "platform": "snowflake",
                "database": "test_db"
            },
            "name": "test_db"
        }
    }
},
{
    "entityType": "container",
    "entityUrn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de",
    "changeType": "UPSERT",
    "aspectName": "dataPlatformInstance",
    "aspect": {
        "json": {
            "platform": "urn:li:dataPlatform:snowflake"
        }
    }
},
{
    "entityType": "container",
    "entityUrn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de",
    "changeType": "UPSERT",
    "aspectName": "browsePathsV2",
    "aspect": {
        "json": {
            "path": []
        }
    }
},
{
    "entityType": "container",
    "entityUrn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de",
    "changeType": "UPSERT",
    "aspectName": "subTypes",
    "aspect": {
        "json": {
            "typeNames": [
                "Database"
            ]
        }
    }
}
]

View File

@ -0,0 +1,69 @@
[
{
    "entityType": "container",
    "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6",
    "changeType": "UPSERT",
    "aspectName": "containerProperties",
    "aspect": {
        "json": {
            "customProperties": {
                "platform": "snowflake",
                "database": "test_db",
                "schema": "test_schema"
            },
            "name": "test_schema"
        }
    }
},
{
    "entityType": "container",
    "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6",
    "changeType": "UPSERT",
    "aspectName": "dataPlatformInstance",
    "aspect": {
        "json": {
            "platform": "urn:li:dataPlatform:snowflake"
        }
    }
},
{
    "entityType": "container",
    "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6",
    "changeType": "UPSERT",
    "aspectName": "container",
    "aspect": {
        "json": {
            "container": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de"
        }
    }
},
{
    "entityType": "container",
    "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6",
    "changeType": "UPSERT",
    "aspectName": "browsePathsV2",
    "aspect": {
        "json": {
            "path": [
                {
                    "id": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de",
                    "urn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de"
                }
            ]
        }
    }
},
{
    "entityType": "container",
    "entityUrn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6",
    "changeType": "UPSERT",
    "aspectName": "subTypes",
    "aspect": {
        "json": {
            "typeNames": [
                "Schema"
            ]
        }
    }
}
]

View File

@ -0,0 +1,142 @@
[
{
    "entityType": "dataset",
    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
    "changeType": "CREATE_ENTITY",
    "aspectName": "datasetKey",
    "aspect": {
        "json": {
            "platform": "urn:li:dataPlatform:snowflake",
            "name": "test_db.test_schema.table_1",
            "origin": "PROD"
        }
    }
},
{
    "entityType": "dataset",
    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
    "changeType": "CREATE",
    "aspectName": "dataPlatformInstance",
    "aspect": {
        "json": {
            "platform": "urn:li:dataPlatform:snowflake"
        }
    }
},
{
    "entityType": "dataset",
    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
    "changeType": "CREATE",
    "aspectName": "schemaMetadata",
    "aspect": {
        "json": {
            "schemaName": "",
            "platform": "urn:li:dataPlatform:snowflake",
            "version": 0,
            "created": {
                "time": 0,
                "actor": "urn:li:corpuser:unknown"
            },
            "lastModified": {
                "time": 0,
                "actor": "urn:li:corpuser:unknown"
            },
            "hash": "",
            "platformSchema": {
                "com.linkedin.schema.Schemaless": {}
            },
            "fields": [
                {
                    "fieldPath": "col1",
                    "nullable": false,
                    "type": {
                        "type": {
                            "com.linkedin.schema.StringType": {}
                        }
                    },
                    "nativeDataType": "string",
                    "recursive": false,
                    "isPartOfKey": false
                },
                {
                    "fieldPath": "col2",
                    "nullable": false,
                    "type": {
                        "type": {
                            "com.linkedin.schema.NumberType": {}
                        }
                    },
                    "nativeDataType": "int",
                    "recursive": false,
                    "isPartOfKey": false
                }
            ]
        }
    }
},
{
    "entityType": "dataset",
    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
    "changeType": "CREATE",
    "aspectName": "editableDatasetProperties",
    "aspect": {
        "json": {
            "created": {
                "time": 0,
                "actor": "urn:li:corpuser:unknown"
            },
            "lastModified": {
                "time": 0,
                "actor": "urn:li:corpuser:unknown"
            },
            "description": "test description"
        }
    }
},
{
    "entityType": "dataset",
    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
    "changeType": "CREATE",
    "aspectName": "container",
    "aspect": {
        "json": {
            "container": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6"
        }
    }
},
{
    "entityType": "dataset",
    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
    "changeType": "CREATE",
    "aspectName": "browsePathsV2",
    "aspect": {
        "json": {
            "path": [
                {
                    "id": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de",
                    "urn": "urn:li:container:ff645a51ee3284db4355c6b1b7aec6de"
                },
                {
                    "id": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6",
                    "urn": "urn:li:container:07540cbc78ed6aff6882a43442d7e8c6"
                }
            ]
        }
    }
},
{
    "entityType": "dataset",
    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
    "changeType": "CREATE",
    "aspectName": "globalTags",
    "aspect": {
        "json": {
            "tags": [
                {
                    "tag": "urn:li:tag:tag1"
                }
            ]
        }
    }
}
]

View File

@ -0,0 +1,45 @@
[
{
    "entityType": "dataset",
    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
    "changeType": "UPSERT",
    "aspectName": "dataPlatformInstance",
    "aspect": {
        "json": {
            "platform": "urn:li:dataPlatform:snowflake"
        }
    }
},
{
    "entityType": "dataset",
    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
    "changeType": "UPSERT",
    "aspectName": "datasetProperties",
    "aspect": {
        "json": {
            "customProperties": {},
            "description": "original description",
            "tags": []
        }
    }
},
{
    "entityType": "dataset",
    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
    "changeType": "UPSERT",
    "aspectName": "editableDatasetProperties",
    "aspect": {
        "json": {
            "created": {
                "time": 0,
                "actor": "urn:li:corpuser:unknown"
            },
            "lastModified": {
                "time": 0,
                "actor": "urn:li:corpuser:unknown"
            },
            "description": "updated description"
        }
    }
}
]

View File

@ -0,0 +1,55 @@
from unittest.mock import Mock

import pytest

from datahub.errors import ItemNotFoundError, MultipleItemsFoundError, SdkUsageError
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.graph.config import DatahubClientConfig
from datahub.metadata.urns import CorpUserUrn
from datahub.sdk.main_client import DataHubClient


@pytest.fixture
def mock_graph() -> Mock:
    graph = Mock(spec=DataHubGraph)
    graph.exists.return_value = False
    return graph
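
# NOTE: spec=DataHubGraph above makes the mock reject attribute access that
# the real graph client doesn't define, so a misspelled method call in these
# tests fails loudly instead of silently returning a child Mock.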


def test_client_creation(mock_graph: Mock) -> None:
    assert DataHubClient(graph=mock_graph)
    assert DataHubClient(server="https://example.com", token="token")


def test_client_init_errors(mock_graph: Mock) -> None:
    config = DatahubClientConfig(server="https://example.com", token="token")

    with pytest.raises(SdkUsageError):
        DataHubClient(server="https://example.com", graph=mock_graph)  # type: ignore

    with pytest.raises(SdkUsageError):
        DataHubClient(server="https://example.com", config=config)  # type: ignore

    with pytest.raises(SdkUsageError):
        DataHubClient(config=config, graph=mock_graph)  # type: ignore

    with pytest.raises(SdkUsageError):
        DataHubClient()  # type: ignore


def test_resolve_user(mock_graph: Mock) -> None:
    client = DataHubClient(graph=mock_graph)

    # This test doesn't really validate the graphql query or vars.
    # It probably makes more sense to test via smoke-tests.

    mock_graph.get_urns_by_filter.return_value = []
    with pytest.raises(ItemNotFoundError):
        client.resolve.user(name="User")

    mock_graph.get_urns_by_filter.return_value = ["urn:li:corpuser:user"]
    assert client.resolve.user(name="User") == CorpUserUrn("urn:li:corpuser:user")

    mock_graph.get_urns_by_filter.return_value = [
        "urn:li:corpuser:user",
        "urn:li:corpuser:user2",
    ]
    with pytest.raises(MultipleItemsFoundError):
        client.resolve.user(name="User")

View File

@ -0,0 +1,189 @@
import pathlib
from datetime import datetime, timezone

import pytest

from datahub.emitter.mcp_builder import SchemaKey
from datahub.errors import SchemaFieldKeyError
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.metadata.urns import (
    CorpUserUrn,
    DatasetUrn,
    DomainUrn,
    GlossaryTermUrn,
    TagUrn,
)
from datahub.sdk._attribution import KnownAttribution, change_default_attribution
from datahub.sdk._entity import Entity
from datahub.sdk.dataset import Dataset
from tests.test_helpers import mce_helpers

_GOLDEN_DIR = pathlib.Path(__file__).parent / "dataset_golden"


def assert_entity_golden(
    pytestconfig: pytest.Config, entity: Entity, golden_path: pathlib.Path
) -> None:
    mce_helpers.check_goldens_stream(
        pytestconfig=pytestconfig,
        outputs=entity._as_mcps(),
        golden_path=golden_path,
        ignore_order=False,
    )


def test_dataset_basic(pytestconfig: pytest.Config) -> None:
    d = Dataset(
        platform="bigquery",
        name="proj.dataset.table",
        subtype=DatasetSubTypes.TABLE,
        schema=[
            ("field1", "string", "field1 description"),
            ("field2", "int64", "field2 description"),
        ],
    )

    # Check urn setup.
    assert Dataset.get_urn_type() == DatasetUrn
    assert isinstance(d.urn, DatasetUrn)
    assert (
        str(d.urn)
        == "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.dataset.table,PROD)"
    )
    assert str(d.urn) in repr(d)

    # Check most attributes.
    assert d.platform_instance is None
    assert d.tags is None
    assert d.terms is None
    assert d.created is None
    assert d.last_modified is None
    assert d.description is None
    assert d.custom_properties == {}
    assert d.domain is None

    # TODO: The column descriptions should go in the editable fields, since we're not in ingestion mode.
    assert len(d.schema) == 2
    assert d["field1"].description == "field1 description"

    with pytest.raises(SchemaFieldKeyError, match=r"Field .* not found"):
        d["should_be_missing"]

    with pytest.raises(AttributeError):
        assert d.extra_attribute  # type: ignore
    with pytest.raises(AttributeError):
        d.extra_attribute = "slots should reject extra fields"  # type: ignore
    with pytest.raises(AttributeError):
        # This should fail. Eventually we should make it suggest calling set_owners instead.
        d.owners = []  # type: ignore

    assert_entity_golden(
        pytestconfig, d, _GOLDEN_DIR / "test_dataset_basic_golden.json"
    )
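
# The schema= argument above uses a tuple shorthand: each
# ("name", "native_type"[, "description"]) entry is expanded into a full
# schema field, with the native type mapped onto a type class (e.g. "string"
# becomes com.linkedin.schema.StringType in the golden files).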


def _build_complex_dataset() -> Dataset:
    schema = SchemaKey(
        platform="snowflake",
        instance="my_instance",
        database="MY_DB",
        schema="MY_SCHEMA",
    )

    created = datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
    updated = datetime(2025, 1, 9, 3, 4, 6, tzinfo=timezone.utc)

    d = Dataset(
        platform="snowflake",
        platform_instance="my_instance",
        name="my_db.my_schema.my_table",
        container=schema,
        subtype=DatasetSubTypes.TABLE,
        schema=[
            ("field1", "string"),
            ("field2", "int64", "field2 description"),
        ],
        display_name="MY_TABLE",
        qualified_name="MY_DB.MY_SCHEMA.MY_TABLE",
        created=created,
        last_modified=updated,
        custom_properties={
            "key1": "value1",
            "key2": "value2",
        },
        description="test",
        external_url="https://example.com",
        owners=[
            CorpUserUrn("admin@datahubproject.io"),
        ],
        tags=[
            TagUrn("tag1"),
            TagUrn("tag2"),
        ],
        terms=[
            GlossaryTermUrn("AccountBalance"),
        ],
        domain=DomainUrn("Marketing"),
    )

    assert d.platform_instance is not None
    assert (
        str(d.platform_instance)
        == "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)"
    )
    assert d.subtype == "Table"
    assert d.description == "test"
    assert d.display_name == "MY_TABLE"
    assert d.qualified_name == "MY_DB.MY_SCHEMA.MY_TABLE"
    assert d.external_url == "https://example.com"
    assert d.created == created
    assert d.last_modified == updated
    assert d.custom_properties == {"key1": "value1", "key2": "value2"}

    # Check standard aspects.
    assert d.domain == DomainUrn("Marketing")
    assert d.tags is not None
    assert len(d.tags) == 2
    assert d.terms is not None
    assert len(d.terms) == 1
    assert d.owners is not None
    assert len(d.owners) == 1
    assert len(d.schema) == 2

    # Schema field description.
    assert d["field1"].description is None
    assert d["field2"].description == "field2 description"
    d["field1"].set_description("field1 description")
    assert d["field1"].description == "field1 description"

    # Schema field tags.
    assert d["field1"].tags is None
    d["field1"].set_tags([TagUrn("field1_tag1"), TagUrn("field1_tag2")])
    assert d["field1"].tags is not None
    assert len(d["field1"].tags) == 2

    # Schema field terms.
    assert d["field2"].terms is None
    d["field2"].set_terms(
        [GlossaryTermUrn("field2_term1"), GlossaryTermUrn("field2_term2")]
    )
    assert d["field2"].terms is not None
    assert len(d["field2"].terms) == 2

    return d


def test_dataset_complex(pytestconfig: pytest.Config) -> None:
    d = _build_complex_dataset()
    assert_entity_golden(
        pytestconfig, d, _GOLDEN_DIR / "test_dataset_complex_golden.json"
    )


def test_dataset_ingestion(pytestconfig: pytest.Config) -> None:
    with change_default_attribution(KnownAttribution.INGESTION):
        d = _build_complex_dataset()

        assert_entity_golden(
            pytestconfig, d, _GOLDEN_DIR / "test_dataset_ingestion_golden.json"
        )
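
# A minimal sketch of what change_default_attribution presumably does,
# assuming the usual contextmanager-around-a-module-default pattern (the
# variable name _default_attribution is hypothetical):
#
#     _default_attribution = KnownAttribution.SDK
#
#     @contextlib.contextmanager
#     def change_default_attribution(attribution: KnownAttribution) -> Iterator[None]:
#         global _default_attribution
#         old = _default_attribution
#         try:
#             _default_attribution = attribution
#             yield
#         finally:
#             _default_attribution = old
#
# The aspects a Dataset serializes into differ under INGESTION attribution,
# which is why test_dataset_ingestion gets its own golden file.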

View File

@ -0,0 +1,142 @@
import pathlib
from unittest.mock import Mock

import pytest

import datahub.metadata.schema_classes as models
from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey
from datahub.errors import ItemNotFoundError, SdkUsageError
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.urns import DatasetUrn, TagUrn
from datahub.sdk.container import Container
from datahub.sdk.dataset import Dataset
from datahub.sdk.main_client import DataHubClient
from tests.test_helpers import mce_helpers

_GOLDEN_DIR = pathlib.Path(__file__).parent / "entity_client_goldens"


@pytest.fixture
def mock_graph() -> Mock:
    graph = Mock(spec=DataHubGraph)
    graph.exists.return_value = False
    return graph


@pytest.fixture
def client(mock_graph: Mock) -> DataHubClient:
    return DataHubClient(graph=mock_graph)


def assert_client_golden(
    pytestconfig: pytest.Config,
    client: DataHubClient,
    golden_path: pathlib.Path,
) -> None:
    mcps = client._graph.emit_mcps.call_args[0][0]  # type: ignore
    mce_helpers.check_goldens_stream(
        pytestconfig=pytestconfig,
        outputs=mcps,
        golden_path=golden_path,
        ignore_order=False,
    )


def test_container_creation_flow(
    pytestconfig: pytest.Config, client: DataHubClient, mock_graph: Mock
) -> None:
    # Create database and schema containers
    db = DatabaseKey(platform="snowflake", database="test_db")
    schema = SchemaKey(**db.dict(), schema="test_schema")
    db_container = Container(db, display_name="test_db", subtype="Database")
    schema_container = Container(schema, display_name="test_schema", subtype="Schema")

    # Test database container creation
    client.entities.upsert(db_container)
    assert_client_golden(
        pytestconfig, client, _GOLDEN_DIR / "test_container_db_golden.json"
    )

    # Test schema container creation
    client.entities.upsert(schema_container)
    assert_client_golden(
        pytestconfig, client, _GOLDEN_DIR / "test_container_schema_golden.json"
    )
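
# SchemaKey(**db.dict(), schema="test_schema") works because SchemaKey extends
# DatabaseKey, so the database fields carry over and the two keys hash into a
# consistent parent/child pair -- see the "container" aspect in
# test_container_schema_golden.json above pointing at the database container.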


def test_dataset_creation(
    pytestconfig: pytest.Config, client: DataHubClient, mock_graph: Mock
) -> None:
    schema = SchemaKey(platform="snowflake", database="test_db", schema="test_schema")
    dataset = Dataset(
        platform="snowflake",
        name="test_db.test_schema.table_1",
        env="prod",
        container=schema,
        schema=[
            ("col1", "string"),
            ("col2", "int"),
        ],
        description="test description",
        tags=[TagUrn("tag1")],
    )

    client.entities.create(dataset)
    assert_client_golden(
        pytestconfig, client, _GOLDEN_DIR / "test_dataset_creation_golden.json"
    )


def test_dataset_read_modify_write(
    pytestconfig: pytest.Config, client: DataHubClient, mock_graph: Mock
) -> None:
    # Setup mock for existing dataset
    mock_graph.exists.return_value = True
    dataset_urn = DatasetUrn(
        platform="snowflake", name="test_db.test_schema.table_1", env="prod"
    )

    # Mock the get_entity_semityped response with initial state
    mock_graph.get_entity_semityped.return_value = {
        "datasetProperties": models.DatasetPropertiesClass(
            description="original description",
            customProperties={},
            tags=[],
        )
    }

    # Get and update dataset
    dataset = client.entities.get(dataset_urn)
    dataset.set_description("updated description")
    client.entities.update(dataset)
    assert_client_golden(
        pytestconfig, client, _GOLDEN_DIR / "test_dataset_update_golden.json"
    )
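
# Note the matching golden (test_dataset_update_golden.json above): the
# description set via the SDK lands in editableDatasetProperties, while the
# server-provided datasetProperties keeps "original description".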


def test_create_existing_dataset_fails(client: DataHubClient, mock_graph: Mock) -> None:
    mock_graph.exists.return_value = True

    dataset = Dataset(
        platform="snowflake",
        name="test_db.test_schema.table_1",
        env="prod",
        schema=[("col1", "string")],
    )

    with pytest.raises(SdkUsageError, match="Entity .* already exists"):
        client.entities.create(dataset)


def test_get_nonexistent_dataset_fails(client: DataHubClient, mock_graph: Mock) -> None:
    mock_graph.exists.return_value = False

    dataset_urn = DatasetUrn(
        platform="snowflake", name="test_db.test_schema.missing_table", env="prod"
    )

    with pytest.raises(ItemNotFoundError, match="Entity .* not found"):
        client.entities.get(dataset_urn)
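
# End-to-end sketch of the flow these tests cover, run against a real server
# (the server URL and token below are placeholders):
#
#     client = DataHubClient(server="http://localhost:8080", token="...")
#     dataset = Dataset(
#         platform="snowflake",
#         name="test_db.test_schema.table_1",
#         schema=[("col1", "string")],
#     )
#     client.entities.create(dataset)        # raises if the urn already exists
#     ds = client.entities.get(dataset.urn)  # raises if it does not
#     ds.set_description("updated description")
#     client.entities.update(ds)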