docs(sdk): add docstrings for some sdk classes (#12940)

This commit is contained in:
Harshal Sheth 2025-03-21 11:39:45 -07:00 committed by GitHub
parent 9d245fb2d6
commit fbd4c1e012
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 269 additions and 5 deletions

View File

@ -430,10 +430,22 @@ class Dataset(
HasDomain, HasDomain,
Entity, Entity,
): ):
"""Represents a dataset in DataHub.
A dataset represents a collection of data, such as a table, view, or file.
This class provides methods for managing dataset metadata including schema,
lineage, and various aspects like ownership, tags, and terms.
"""
__slots__ = () __slots__ = ()
@classmethod @classmethod
def get_urn_type(cls) -> Type[DatasetUrn]: def get_urn_type(cls) -> Type[DatasetUrn]:
"""Get the URN type for datasets.
Returns:
The DatasetUrn class.
"""
return DatasetUrn return DatasetUrn
def __init__( def __init__(
@ -466,6 +478,31 @@ class Dataset(
schema: Optional[SchemaFieldsInputType] = None, schema: Optional[SchemaFieldsInputType] = None,
upstreams: Optional[models.UpstreamLineageClass] = None, upstreams: Optional[models.UpstreamLineageClass] = None,
): ):
"""Initialize a new Dataset instance.
Args:
platform: The platform this dataset belongs to (e.g. "mysql", "snowflake").
name: The name of the dataset.
platform_instance: Optional platform instance identifier.
env: The environment this dataset belongs to (default: DEFAULT_ENV).
description: Optional description of the dataset.
display_name: Optional display name for the dataset.
qualified_name: Optional qualified name for the dataset.
external_url: Optional URL to external documentation or source.
custom_properties: Optional dictionary of custom properties.
created: Optional creation timestamp.
last_modified: Optional last modification timestamp.
parent_container: Optional parent container for this dataset.
subtype: Optional subtype of the dataset.
owners: Optional list of owners.
links: Optional list of links.
tags: Optional list of tags.
terms: Optional list of glossary terms.
domain: Optional domain this dataset belongs to.
extra_aspects: Optional list of additional aspects.
schema: Optional schema definition for the dataset.
upstreams: Optional upstream lineage information.
"""
urn = DatasetUrn.create_from_ids( urn = DatasetUrn.create_from_ids(
platform_id=platform, platform_id=platform,
table_name=name, table_name=name,
@ -539,6 +576,11 @@ class Dataset(
@property @property
def description(self) -> Optional[str]: def description(self) -> Optional[str]:
"""Get the description of the dataset.
Returns:
The description if set, None otherwise.
"""
editable_props = self._get_editable_props() editable_props = self._get_editable_props()
return first_non_null( return first_non_null(
[ [
@ -548,6 +590,15 @@ class Dataset(
) )
def set_description(self, description: str) -> None: def set_description(self, description: str) -> None:
"""Set the description of the dataset.
Args:
description: The description to set.
Note:
If called during ingestion, this will warn if overwriting
a non-ingestion description.
"""
if is_ingestion_attribution(): if is_ingestion_attribution():
editable_props = self._get_editable_props() editable_props = self._get_editable_props()
if editable_props is not None and editable_props.description is not None: if editable_props is not None and editable_props.description is not None:
@ -565,41 +616,96 @@ class Dataset(
@property @property
def display_name(self) -> Optional[str]: def display_name(self) -> Optional[str]:
"""Get the display name of the dataset.
Returns:
The display name if set, None otherwise.
"""
return self._ensure_dataset_props().name return self._ensure_dataset_props().name
def set_display_name(self, display_name: str) -> None: def set_display_name(self, display_name: str) -> None:
"""Set the display name of the dataset.
Args:
display_name: The display name to set.
"""
self._ensure_dataset_props().name = display_name self._ensure_dataset_props().name = display_name
@property @property
def qualified_name(self) -> Optional[str]: def qualified_name(self) -> Optional[str]:
"""Get the qualified name of the dataset.
Returns:
The qualified name if set, None otherwise.
"""
return self._ensure_dataset_props().qualifiedName return self._ensure_dataset_props().qualifiedName
def set_qualified_name(self, qualified_name: str) -> None: def set_qualified_name(self, qualified_name: str) -> None:
"""Set the qualified name of the dataset.
Args:
qualified_name: The qualified name to set.
"""
self._ensure_dataset_props().qualifiedName = qualified_name self._ensure_dataset_props().qualifiedName = qualified_name
@property @property
def external_url(self) -> Optional[str]: def external_url(self) -> Optional[str]:
"""Get the external URL of the dataset.
Returns:
The external URL if set, None otherwise.
"""
return self._ensure_dataset_props().externalUrl return self._ensure_dataset_props().externalUrl
def set_external_url(self, external_url: str) -> None: def set_external_url(self, external_url: str) -> None:
"""Set the external URL of the dataset.
Args:
external_url: The external URL to set.
"""
self._ensure_dataset_props().externalUrl = external_url self._ensure_dataset_props().externalUrl = external_url
@property @property
def custom_properties(self) -> Dict[str, str]: def custom_properties(self) -> Dict[str, str]:
"""Get the custom properties of the dataset.
Returns:
Dictionary of custom properties.
"""
return self._ensure_dataset_props().customProperties return self._ensure_dataset_props().customProperties
def set_custom_properties(self, custom_properties: Dict[str, str]) -> None: def set_custom_properties(self, custom_properties: Dict[str, str]) -> None:
"""Set the custom properties of the dataset.
Args:
custom_properties: Dictionary of custom properties to set.
"""
self._ensure_dataset_props().customProperties = custom_properties self._ensure_dataset_props().customProperties = custom_properties
@property @property
def created(self) -> Optional[datetime]: def created(self) -> Optional[datetime]:
"""Get the creation timestamp of the dataset.
Returns:
The creation timestamp if set, None otherwise.
"""
return parse_time_stamp(self._ensure_dataset_props().created) return parse_time_stamp(self._ensure_dataset_props().created)
def set_created(self, created: datetime) -> None: def set_created(self, created: datetime) -> None:
"""Set the creation timestamp of the dataset.
Args:
created: The creation timestamp to set.
"""
self._ensure_dataset_props().created = make_time_stamp(created) self._ensure_dataset_props().created = make_time_stamp(created)
@property @property
def last_modified(self) -> Optional[datetime]: def last_modified(self) -> Optional[datetime]:
"""Get the last modification timestamp of the dataset.
Returns:
The last modification timestamp if set, None otherwise.
"""
return parse_time_stamp(self._ensure_dataset_props().lastModified) return parse_time_stamp(self._ensure_dataset_props().lastModified)
def set_last_modified(self, last_modified: datetime) -> None: def set_last_modified(self, last_modified: datetime) -> None:
@ -614,6 +720,11 @@ class Dataset(
@property @property
def schema(self) -> List[SchemaField]: def schema(self) -> List[SchemaField]:
# TODO: Add some caching here to avoid iterating over the schema every time. # TODO: Add some caching here to avoid iterating over the schema every time.
"""Get the schema fields of the dataset.
Returns:
List of SchemaField objects representing the dataset's schema.
"""
schema_dict = self._schema_dict() schema_dict = self._schema_dict()
return [SchemaField(self, field_path) for field_path in schema_dict] return [SchemaField(self, field_path) for field_path in schema_dict]
@ -669,6 +780,17 @@ class Dataset(
def __getitem__(self, field_path: str) -> SchemaField: def __getitem__(self, field_path: str) -> SchemaField:
# TODO: Automatically deal with field path v2? # TODO: Automatically deal with field path v2?
"""Get a schema field by its path.
Args:
field_path: The path of the field to retrieve.
Returns:
A SchemaField instance.
Raises:
SchemaFieldKeyError: If the field is not found.
"""
schema_dict = self._schema_dict() schema_dict = self._schema_dict()
if field_path not in schema_dict: if field_path not in schema_dict:
raise SchemaFieldKeyError(f"Field {field_path} not found in schema") raise SchemaFieldKeyError(f"Field {field_path} not found in schema")

View File

@ -20,9 +20,24 @@ ExtraAspectsType = Union[None, List[AspectTypeVar]]
class Entity: class Entity:
"""Base class for all DataHub entities.
This class provides the core functionality for working with DataHub entities,
including aspect management and URN handling. It should not be instantiated directly;
instead, use one of its subclasses like Dataset or Container.
"""
__slots__ = ("_urn", "_prev_aspects", "_aspects") __slots__ = ("_urn", "_prev_aspects", "_aspects")
def __init__(self, /, urn: Urn): def __init__(self, /, urn: Urn):
"""Initialize a new Entity instance.
Args:
urn: The URN that uniquely identifies this entity.
Raises:
SdkUsageError: If this base class is instantiated directly.
"""
# This method is not meant for direct usage. # This method is not meant for direct usage.
if type(self) is Entity: if type(self) is Entity:
raise SdkUsageError(f"{Entity.__name__} cannot be instantiated directly.") raise SdkUsageError(f"{Entity.__name__} cannot be instantiated directly.")
@ -36,6 +51,15 @@ class Entity:
@classmethod @classmethod
def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self: def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self:
"""Create a new entity instance from graph data.
Args:
urn: The URN of the entity.
current_aspects: The current aspects of the entity from the graph.
Returns:
A new entity instance initialized with the graph data.
"""
# If an init method from a subclass adds required fields, it also needs to override this method. # If an init method from a subclass adds required fields, it also needs to override this method.
# An alternative approach would call cls.__new__() to bypass the init method, but it's a bit # An alternative approach would call cls.__new__() to bypass the init method, but it's a bit
# too hacky for my taste. # too hacky for my taste.
@ -43,6 +67,14 @@ class Entity:
return entity._init_from_graph(current_aspects) return entity._init_from_graph(current_aspects)
def _init_from_graph(self, current_aspects: models.AspectBag) -> Self: def _init_from_graph(self, current_aspects: models.AspectBag) -> Self:
"""Initialize the entity with aspects from the graph.
Args:
current_aspects: The current aspects of the entity from the graph.
Returns:
The entity instance with initialized aspects.
"""
self._prev_aspects = current_aspects self._prev_aspects = current_aspects
self._aspects = {} self._aspects = {}
@ -54,14 +86,30 @@ class Entity:
@classmethod @classmethod
@abc.abstractmethod @abc.abstractmethod
def get_urn_type(cls) -> Type[_SpecificUrn]: ... def get_urn_type(cls) -> Type[_SpecificUrn]:
"""Get the URN type for this entity class.
Returns:
The URN type class that corresponds to this entity type.
"""
...
@classmethod @classmethod
def entity_type_name(cls) -> str: def entity_type_name(cls) -> str:
"""Get the entity type name.
Returns:
The string name of this entity type.
"""
return cls.get_urn_type().ENTITY_TYPE return cls.get_urn_type().ENTITY_TYPE
@property @property
def urn(self) -> _SpecificUrn: def urn(self) -> _SpecificUrn:
"""Get the entity's URN.
Returns:
The URN that uniquely identifies this entity.
"""
return self._urn return self._urn
def _get_aspect( def _get_aspect(
@ -69,12 +117,33 @@ class Entity:
aspect_type: Type[AspectTypeVar], aspect_type: Type[AspectTypeVar],
/, /,
) -> Optional[AspectTypeVar]: ) -> Optional[AspectTypeVar]:
"""Get an aspect of the entity by its type.
Args:
aspect_type: The type of aspect to retrieve.
Returns:
The aspect if it exists, None otherwise.
"""
return self._aspects.get(aspect_type.ASPECT_NAME) # type: ignore return self._aspects.get(aspect_type.ASPECT_NAME) # type: ignore
def _set_aspect(self, value: AspectTypeVar, /) -> None: def _set_aspect(self, value: AspectTypeVar, /) -> None:
"""Set an aspect of the entity.
Args:
value: The aspect to set.
"""
self._aspects[value.ASPECT_NAME] = value # type: ignore self._aspects[value.ASPECT_NAME] = value # type: ignore
def _setdefault_aspect(self, default_aspect: AspectTypeVar, /) -> AspectTypeVar: def _setdefault_aspect(self, default_aspect: AspectTypeVar, /) -> AspectTypeVar:
"""Set the given aspect only if no aspect of the same type is already set.
Args:
default_aspect: The default aspect to set if none exists.
Returns:
The existing aspect if one exists, otherwise the default aspect.
"""
# Similar semantics to dict.setdefault. # Similar semantics to dict.setdefault.
if existing_aspect := self._get_aspect(type(default_aspect)): if existing_aspect := self._get_aspect(type(default_aspect)):
return existing_aspect return existing_aspect
@ -85,6 +154,14 @@ class Entity:
self, self,
change_type: Union[str, models.ChangeTypeClass] = models.ChangeTypeClass.UPSERT, change_type: Union[str, models.ChangeTypeClass] = models.ChangeTypeClass.UPSERT,
) -> List[MetadataChangeProposalWrapper]: ) -> List[MetadataChangeProposalWrapper]:
"""Convert the entity's aspects to MetadataChangeProposals.
Args:
change_type: The type of change to apply (default: UPSERT).
Returns:
A list of MetadataChangeProposalWrapper objects.
"""
urn_str = str(self.urn) urn_str = str(self.urn)
mcps = [] mcps = []
@ -100,13 +177,32 @@ class Entity:
return mcps return mcps
def as_workunits(self) -> List[MetadataWorkUnit]: def as_workunits(self) -> List[MetadataWorkUnit]:
"""Convert the entity's aspects to MetadataWorkUnits.
Returns:
A list of MetadataWorkUnit objects.
"""
return [mcp.as_workunit() for mcp in self._as_mcps()] return [mcp.as_workunit() for mcp in self._as_mcps()]
def _set_extra_aspects(self, extra_aspects: ExtraAspectsType) -> None: def _set_extra_aspects(self, extra_aspects: ExtraAspectsType) -> None:
"""Set additional aspects on the entity.
Args:
extra_aspects: Optional list of additional aspects to set; None is treated as an empty list.
Note:
This method does not validate for conflicts between extra aspects
and standard aspects.
"""
# TODO: Add validation to ensure that an "extra aspect" does not conflict # TODO: Add validation to ensure that an "extra aspect" does not conflict
# with / get overridden by a standard aspect. # with / get overridden by a standard aspect.
for aspect in extra_aspects or []: for aspect in extra_aspects or []:
self._set_aspect(aspect) self._set_aspect(aspect)
def __repr__(self) -> str: def __repr__(self) -> str:
"""Get a string representation of the entity.
Returns:
A string in the format "EntityClass('urn')".
"""
return f"{self.__class__.__name__}('{self.urn}')" return f"{self.__class__.__name__}('{self.urn}')"

View File

@ -24,7 +24,18 @@ if TYPE_CHECKING:
class EntityClient: class EntityClient:
"""Client for managing DataHub entities.
This class provides methods for retrieving and managing DataHub entities
such as datasets, containers, and other metadata objects.
"""
def __init__(self, client: DataHubClient): def __init__(self, client: DataHubClient):
"""Private constructor - use :py:attr:`DataHubClient.entities` instead.
Args:
client: The parent DataHubClient instance.
"""
self._client = client self._client = client
# TODO: Make all of these methods sync by default. # TODO: Make all of these methods sync by default.
@ -40,6 +51,19 @@ class EntityClient:
@overload @overload
def get(self, urn: Union[Urn, str]) -> Entity: ... def get(self, urn: Union[Urn, str]) -> Entity: ...
def get(self, urn: UrnOrStr) -> Entity: def get(self, urn: UrnOrStr) -> Entity:
"""Retrieve an entity by its urn.
Args:
urn: The urn of the entity to retrieve. Can be a string or :py:class:`Urn` object.
Returns:
The retrieved entity instance.
Raises:
ItemNotFoundError: If the entity does not exist.
SdkUsageError: If the entity type is not yet supported.
InvalidUrnError: If the URN is invalid.
"""
if not isinstance(urn, Urn): if not isinstance(urn, Urn):
urn = Urn.from_string(urn) urn = Urn.from_string(urn)

View File

@ -11,6 +11,17 @@ from datahub.sdk.search_client import SearchClient
class DataHubClient: class DataHubClient:
"""Main client for interacting with DataHub.
This class provides the primary interface for interacting with DataHub,
including entity management, search, and resolution capabilities.
The client can be initialized in three ways:
1. With a server URL and optional token
2. With a DatahubClientConfig object
3. With an existing (legacy) :py:class:`DataHubGraph` instance
"""
@overload @overload
def __init__(self, *, server: str, token: Optional[str] = None): ... def __init__(self, *, server: str, token: Optional[str] = None): ...
@overload @overload
@ -25,6 +36,17 @@ class DataHubClient:
graph: Optional[DataHubGraph] = None, graph: Optional[DataHubGraph] = None,
config: Optional[DatahubClientConfig] = None, config: Optional[DatahubClientConfig] = None,
): ):
"""Initialize a new DataHubClient instance.
Args:
server: The URL of the DataHub server (e.g. "http://localhost:8080").
token: Optional authentication token.
graph: An existing DataHubGraph instance to use.
config: A DatahubClientConfig object with connection details.
Raises:
SdkUsageError: If invalid combinations of arguments are provided.
"""
if server is not None: if server is not None:
if config is not None: if config is not None:
raise SdkUsageError("Cannot specify both server and config") raise SdkUsageError("Cannot specify both server and config")

View File

@ -157,7 +157,7 @@ class _EnvFilter(_BaseFilter):
class _CustomCondition(_BaseFilter): class _CustomCondition(_BaseFilter):
"""Represents a single field condition""" """Represents a single field condition."""
field: str field: str
condition: str condition: str
@ -173,7 +173,7 @@ class _CustomCondition(_BaseFilter):
class _And(_BaseFilter): class _And(_BaseFilter):
"""Represents an AND conjunction of filters""" """Represents an AND conjunction of filters."""
and_: Sequence["Filter"] = pydantic.Field(alias="and") and_: Sequence["Filter"] = pydantic.Field(alias="and")
# TODO: Add validator to ensure that the "and" field is not empty # TODO: Add validator to ensure that the "and" field is not empty
@ -221,7 +221,7 @@ class _And(_BaseFilter):
class _Or(_BaseFilter): class _Or(_BaseFilter):
"""Represents an OR conjunction of filters""" """Represents an OR disjunction of filters."""
or_: Sequence["Filter"] = pydantic.Field(alias="or") or_: Sequence["Filter"] = pydantic.Field(alias="or")
# TODO: Add validator to ensure that the "or" field is not empty # TODO: Add validator to ensure that the "or" field is not empty
@ -234,7 +234,7 @@ class _Or(_BaseFilter):
class _Not(_BaseFilter): class _Not(_BaseFilter):
"""Represents a NOT filter""" """Represents a NOT filter."""
not_: "Filter" = pydantic.Field(alias="not") not_: "Filter" = pydantic.Field(alias="not")