feat(sdk): add dataflow and datajob entity (#13551)

Hyejin Yoon 2025-05-29 22:53:56 +09:00 committed by GitHub
parent f335093607
commit a142a9e2d2
19 changed files with 1909 additions and 1 deletion

View File

@@ -0,0 +1,13 @@
from datahub.metadata.urns import TagUrn
from datahub.sdk import DataFlow, DataHubClient
client = DataHubClient.from_env()
dataflow = DataFlow(
name="example_dataflow",
platform="airflow",
description="airflow pipeline for production",
tags=[TagUrn(name="production"), TagUrn(name="data_engineering")],
)
client.entities.upsert(dataflow)

View File

@@ -0,0 +1,22 @@
from datahub.metadata.urns import TagUrn
from datahub.sdk import DataFlow, DataHubClient, DataJob
client = DataHubClient.from_env()
# datajob will inherit the platform and platform instance from the flow
dataflow = DataFlow(
platform="airflow",
name="example_dag",
platform_instance="PROD",
description="example dataflow",
tags=[TagUrn(name="tag1"), TagUrn(name="tag2")],
)
datajob = DataJob(
name="example_datajob",
flow=dataflow,
)
client.entities.upsert(dataflow)
client.entities.upsert(datajob)
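
For reference, a minimal sketch (not one of the changed files) of the alternative construction path added by this commit: referencing an existing flow by URN plus platform_instance, with dataset lineage attached via inlets/outlets. The dataset URNs below are illustrative.

from datahub.sdk import DataHubClient, DataJob

client = DataHubClient.from_env()

# Reference the flow by URN instead of passing a DataFlow object.
# When the flow has a platform instance, its flow_id is prefixed with it.
datajob = DataJob(
    name="example_datajob",
    flow_urn="urn:li:dataFlow:(airflow,PROD.example_dag,PROD)",
    platform_instance="PROD",
    # Hypothetical dataset URNs used purely for illustration.
    inlets=["urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.events,PROD)"],
    outlets=["urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.events,PROD)"],
)
client.entities.upsert(datajob)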

View File

@@ -19,8 +19,12 @@ from datahub.metadata.urns import (
TagUrn,
)
from datahub.sdk.container import Container
from datahub.sdk.dataflow import DataFlow
from datahub.sdk.datajob import DataJob
from datahub.sdk.dataset import Dataset
from datahub.sdk.main_client import DataHubClient
from datahub.sdk.mlmodel import MLModel
from datahub.sdk.mlmodelgroup import MLModelGroup
from datahub.sdk.search_filters import Filter, FilterDsl
# We want to print out the warning if people do `from datahub.sdk import X`.

View File

@@ -1,6 +1,8 @@
from typing import Dict, List, Type
from datahub.sdk.container import Container
from datahub.sdk.dataflow import DataFlow
from datahub.sdk.datajob import DataJob
from datahub.sdk.dataset import Dataset
from datahub.sdk.entity import Entity
from datahub.sdk.mlmodel import MLModel
@@ -12,6 +14,8 @@ ENTITY_CLASSES_LIST: List[Type[Entity]] = [
Dataset,
MLModel,
MLModelGroup,
DataFlow,
DataJob,
]
ENTITY_CLASSES: Dict[str, Type[Entity]] = {

View File

@@ -29,6 +29,7 @@ from datahub.metadata.urns import (
ContainerUrn,
CorpGroupUrn,
CorpUserUrn,
DataFlowUrn,
DataJobUrn,
DataPlatformInstanceUrn,
DataPlatformUrn,
@@ -47,10 +48,10 @@ from datahub.utilities.urns.error import InvalidUrnError
if TYPE_CHECKING:
from datahub.sdk.container import Container
UrnOrStr: TypeAlias = Union[Urn, str]
DatasetUrnOrStr: TypeAlias = Union[str, DatasetUrn]
DatajobUrnOrStr: TypeAlias = Union[str, DataJobUrn]
DataflowUrnOrStr: TypeAlias = Union[str, DataFlowUrn]
ActorUrn: TypeAlias = Union[CorpUserUrn, CorpGroupUrn]

View File

@@ -0,0 +1,302 @@
from __future__ import annotations
import warnings
from datetime import datetime
from typing import Dict, Optional, Type, Union
from typing_extensions import Self
import datahub.metadata.schema_classes as models
from datahub.cli.cli_utils import first_non_null
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.errors import (
IngestionAttributionWarning,
)
from datahub.metadata.urns import DataFlowUrn, Urn
from datahub.sdk._attribution import is_ingestion_attribution
from datahub.sdk._shared import (
DomainInputType,
HasContainer,
HasDomain,
HasInstitutionalMemory,
HasOwnership,
HasPlatformInstance,
HasSubtype,
HasTags,
HasTerms,
LinksInputType,
OwnersInputType,
ParentContainerInputType,
TagsInputType,
TermsInputType,
make_time_stamp,
parse_time_stamp,
)
from datahub.sdk.entity import Entity, ExtraAspectsType
from datahub.utilities.sentinels import Unset, unset
class DataFlow(
HasPlatformInstance,
HasSubtype,
HasOwnership,
HasContainer,
HasInstitutionalMemory,
HasTags,
HasTerms,
HasDomain,
Entity,
):
"""Represents a dataflow in DataHub.
A dataflow represents a data pipeline or workflow, such as an Airflow DAG,
that groups one or more data jobs. This class provides methods for managing
dataflow metadata and aspects such as ownership, tags, terms, and domains.
"""
__slots__ = ()
@classmethod
def get_urn_type(cls) -> Type[DataFlowUrn]:
"""Get the URN type for dataflows.
Returns:
The DataFlowUrn class.
"""
return DataFlowUrn
def __init__(
self,
*,
# Identity.
name: str,
platform: str,
display_name: Optional[str] = None,
platform_instance: Optional[str] = None,
env: str = DEFAULT_ENV,
# Dataflow properties.
description: Optional[str] = None,
external_url: Optional[str] = None,
custom_properties: Optional[Dict[str, str]] = None,
created: Optional[datetime] = None,
last_modified: Optional[datetime] = None,
# Standard aspects.
subtype: Optional[str] = None,
owners: Optional[OwnersInputType] = None,
links: Optional[LinksInputType] = None,
tags: Optional[TagsInputType] = None,
terms: Optional[TermsInputType] = None,
domain: Optional[DomainInputType] = None,
parent_container: ParentContainerInputType | Unset = unset,
extra_aspects: ExtraAspectsType = None,
):
"""Initialize a new Dataflow instance.
Args:
platform: The orchestration platform this dataflow belongs to (e.g. "airflow", "spark").
name: The name of the dataflow.
platform_instance: Optional platform instance identifier.
env: The environment this dataflow belongs to (default: DEFAULT_ENV).
description: Optional description of the dataflow.
display_name: Optional display name for the dataflow.
external_url: Optional URL to external documentation or source.
custom_properties: Optional dictionary of custom properties.
created: Optional creation timestamp.
last_modified: Optional last modification timestamp.
subtype: Optional subtype of the dataflow.
owners: Optional list of owners.
links: Optional list of links.
tags: Optional list of tags.
terms: Optional list of glossary terms.
domain: Optional domain this dataflow belongs to.
parent_container: Optional parent container for this dataflow.
extra_aspects: Optional list of additional aspects.
"""
urn = DataFlowUrn.create_from_ids(
orchestrator=platform,
flow_id=name,
env=env,
platform_instance=platform_instance,
)
super().__init__(urn)
self._set_extra_aspects(extra_aspects)
self._set_platform_instance(urn.orchestrator, platform_instance)
# Initialize DataFlowInfoClass directly with name
self._setdefault_aspect(models.DataFlowInfoClass(name=display_name or name))
self._ensure_dataflow_props().env = env
if description is not None:
self.set_description(description)
if display_name is not None:
self.set_display_name(display_name)
if external_url is not None:
self.set_external_url(external_url)
if custom_properties is not None:
self.set_custom_properties(custom_properties)
if created is not None:
self.set_created(created)
if last_modified is not None:
self.set_last_modified(last_modified)
if subtype is not None:
self.set_subtype(subtype)
if owners is not None:
self.set_owners(owners)
if links is not None:
self.set_links(links)
if tags is not None:
self.set_tags(tags)
if terms is not None:
self.set_terms(terms)
if domain is not None:
self.set_domain(domain)
if parent_container is not unset:
self._set_container(parent_container)
@classmethod
def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self:
assert isinstance(urn, DataFlowUrn)
entity = cls(
platform=urn.orchestrator,
name=urn.flow_id,
)
return entity._init_from_graph(current_aspects)
@property
def urn(self) -> DataFlowUrn:
return self._urn # type: ignore
def _ensure_dataflow_props(self) -> models.DataFlowInfoClass:
props = self._get_aspect(models.DataFlowInfoClass)
if props is None:
# Use name from URN as fallback
props = models.DataFlowInfoClass(name=self.urn.flow_id)
self._set_aspect(props)
return props
def _get_editable_props(self) -> Optional[models.EditableDataFlowPropertiesClass]:
return self._get_aspect(models.EditableDataFlowPropertiesClass)
def _ensure_editable_props(self) -> models.EditableDataFlowPropertiesClass:
# Note that most of the fields in this aspect are not used.
# The only one that's relevant for us is the description.
return self._setdefault_aspect(models.EditableDataFlowPropertiesClass())
@property
def description(self) -> Optional[str]:
"""Get the description of the dataflow.
Returns:
The description if set, None otherwise.
"""
editable_props = self._get_editable_props()
return first_non_null(
[
editable_props.description if editable_props is not None else None,
self._ensure_dataflow_props().description,
]
)
def set_description(self, description: str) -> None:
"""Set the description of the dataflow.
Args:
description: The description to set.
Note:
If called during ingestion, this will warn if overwriting
a non-ingestion description.
"""
if is_ingestion_attribution():
editable_props = self._get_editable_props()
if editable_props is not None and editable_props.description is not None:
warnings.warn(
"Overwriting non-ingestion description from ingestion is an anti-pattern.",
category=IngestionAttributionWarning,
stacklevel=2,
)
# Force the ingestion description to show up.
editable_props.description = None
self._ensure_dataflow_props().description = description
else:
self._ensure_editable_props().description = description
@property
def name(self) -> str:
"""Get the name of the dataflow.
Returns:
The name of the dataflow.
"""
return self.urn.flow_id
@property
def display_name(self) -> Optional[str]:
"""Get the display name of the dataflow.
Returns:
The display name if set, None otherwise.
"""
return self._ensure_dataflow_props().name
def set_display_name(self, display_name: str) -> None:
"""Set the display name of the dataflow.
Args:
display_name: The display name to set.
"""
self._ensure_dataflow_props().name = display_name
@property
def external_url(self) -> Optional[str]:
"""Get the external URL of the dataflow.
Returns:
The external URL if set, None otherwise.
"""
return self._ensure_dataflow_props().externalUrl
def set_external_url(self, external_url: str) -> None:
"""Set the external URL of the dataflow.
Args:
external_url: The external URL to set.
"""
self._ensure_dataflow_props().externalUrl = external_url
@property
def custom_properties(self) -> Dict[str, str]:
"""Get the custom properties of the dataflow.
Returns:
Dictionary of custom properties.
"""
return self._ensure_dataflow_props().customProperties
def set_custom_properties(self, custom_properties: Dict[str, str]) -> None:
"""Set the custom properties of the dataflow.
Args:
custom_properties: Dictionary of custom properties to set.
"""
self._ensure_dataflow_props().customProperties = custom_properties
@property
def created(self) -> Optional[datetime]:
"""Get the creation timestamp of the dataflow.
Returns:
The creation timestamp if set, None otherwise.
"""
return parse_time_stamp(self._ensure_dataflow_props().created)
def set_created(self, created: datetime) -> None:
"""Set the creation timestamp of the dataflow.
Args:
created: The creation timestamp to set.
"""
self._ensure_dataflow_props().created = make_time_stamp(created)
@property
def last_modified(self) -> Optional[datetime]:
"""Get the last modification timestamp of the dataflow.
Returns:
The last modification timestamp if set, None otherwise.
"""
return parse_time_stamp(self._ensure_dataflow_props().lastModified)
def set_last_modified(self, last_modified: datetime) -> None:
"""Set the last modification timestamp of the dataflow.
Args:
last_modified: The last modification timestamp to set.
"""
self._ensure_dataflow_props().lastModified = make_time_stamp(last_modified)
@property
def env(self) -> Optional[Union[str, models.FabricTypeClass]]:
"""Get the environment of the dataflow."""
return self._ensure_dataflow_props().env
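
For reference, a minimal usage sketch (not one of the changed files) exercising the DataFlow getters and setters defined above; all values are illustrative.

from datetime import datetime, timezone

from datahub.sdk import DataFlow

flow = DataFlow(platform="airflow", name="example_dag", env="PROD")
flow.set_description("Nightly ETL pipeline")  # outside ingestion, stored in the editable aspect
flow.set_external_url("https://airflow.example.com/dags/example_dag")
flow.set_custom_properties({"team": "data-eng"})
flow.set_created(datetime(2025, 1, 1, tzinfo=timezone.utc))

assert flow.name == "example_dag"
# The editable description takes precedence over the dataFlowInfo one.
assert flow.description == "Nightly ETL pipeline"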

View File

@@ -0,0 +1,335 @@
from __future__ import annotations
import warnings
from datetime import datetime
from typing import Dict, List, Optional, Type
from typing_extensions import Self
import datahub.metadata.schema_classes as models
from datahub.cli.cli_utils import first_non_null
from datahub.errors import IngestionAttributionWarning
from datahub.metadata.urns import (
DataFlowUrn,
DataJobUrn,
DatasetUrn,
Urn,
)
from datahub.sdk._attribution import is_ingestion_attribution
from datahub.sdk._shared import (
DataflowUrnOrStr,
DatasetUrnOrStr,
DomainInputType,
HasContainer,
HasDomain,
HasInstitutionalMemory,
HasOwnership,
HasPlatformInstance,
HasSubtype,
HasTags,
HasTerms,
LinksInputType,
OwnersInputType,
TagsInputType,
TermsInputType,
make_time_stamp,
parse_time_stamp,
)
from datahub.sdk.dataflow import DataFlow
from datahub.sdk.entity import Entity, ExtraAspectsType
class DataJob(
HasPlatformInstance,
HasSubtype,
HasContainer,
HasOwnership,
HasInstitutionalMemory,
HasTags,
HasTerms,
HasDomain,
Entity,
):
"""Represents a data job in DataHub.
A data job is an executable unit of a data pipeline, such as an Airflow task or a Spark job.
"""
__slots__ = ()
@classmethod
def get_urn_type(cls) -> Type[DataJobUrn]:
"""Get the URN type for data jobs."""
return DataJobUrn
def __init__(
self,
*,
name: str,
flow: Optional[DataFlow] = None,
flow_urn: Optional[DataflowUrnOrStr] = None,
platform_instance: Optional[str] = None,
display_name: Optional[str] = None,
description: Optional[str] = None,
external_url: Optional[str] = None,
custom_properties: Optional[Dict[str, str]] = None,
created: Optional[datetime] = None,
last_modified: Optional[datetime] = None,
# Standard aspects
subtype: Optional[str] = None,
owners: Optional[OwnersInputType] = None,
links: Optional[LinksInputType] = None,
tags: Optional[TagsInputType] = None,
terms: Optional[TermsInputType] = None,
domain: Optional[DomainInputType] = None,
extra_aspects: ExtraAspectsType = None,
inlets: Optional[List[DatasetUrnOrStr]] = None,
outlets: Optional[List[DatasetUrnOrStr]] = None,
):
"""
Initialize a DataJob with either a DataFlow object, or a DataFlowUrn together with a platform instance.
Args:
name: Name of the data job (required)
flow: A DataFlow object (optional)
flow_urn: A DataFlowUrn object (optional)
platform_instance: Platform instance name (required when flow_urn is used instead of flow)
... (other optional parameters)
Raises:
ValueError: If neither flow nor (flow_urn and platform_instance) are provided
"""
if flow is None:
if flow_urn is None or platform_instance is None:
raise ValueError(
"You must provide either: 1. a DataFlow object, or 2. a DataFlowUrn (and a platform_instance config if required)"
)
flow_urn = DataFlowUrn.from_string(flow_urn)
if flow_urn.flow_id.startswith(f"{platform_instance}."):
flow_name = flow_urn.flow_id[len(platform_instance) + 1 :]
else:
flow_name = flow_urn.flow_id
flow = DataFlow(
platform=flow_urn.orchestrator,
name=flow_name,
platform_instance=platform_instance,
)
urn = DataJobUrn.create_from_ids(
job_id=name,
data_flow_urn=str(flow.urn),
)
super().__init__(urn)
self._set_extra_aspects(extra_aspects)
self._set_platform_instance(flow.urn.orchestrator, flow.platform_instance)
self._set_browse_path_from_flow(flow)
# Initialize DataJobInfoClass with default type
job_info = models.DataJobInfoClass(
name=display_name or name,
type=models.AzkabanJobTypeClass.COMMAND, # Default type
)
self._setdefault_aspect(job_info)
self._ensure_datajob_props().flowUrn = str(flow.urn)
# Set properties if provided
if description is not None:
self.set_description(description)
if external_url is not None:
self.set_external_url(external_url)
if custom_properties is not None:
self.set_custom_properties(custom_properties)
if created is not None:
self.set_created(created)
if last_modified is not None:
self.set_last_modified(last_modified)
# Set standard aspects
if subtype is not None:
self.set_subtype(subtype)
if owners is not None:
self.set_owners(owners)
if links is not None:
self.set_links(links)
if tags is not None:
self.set_tags(tags)
if terms is not None:
self.set_terms(terms)
if domain is not None:
self.set_domain(domain)
if inlets is not None:
self.set_inlets(inlets)
if outlets is not None:
self.set_outlets(outlets)
@classmethod
def _new_from_graph(cls, urn: Urn, current_aspects: models.AspectBag) -> Self:
assert isinstance(urn, DataJobUrn)
# Extracting platform from the DataFlowUrn inside the DataJobUrn
data_flow_urn = urn.get_data_flow_urn()
entity = cls(
flow=DataFlow(
platform=data_flow_urn.orchestrator,
name=data_flow_urn.flow_id,
),
name=urn.job_id,
)
return entity._init_from_graph(current_aspects)
@property
def urn(self) -> DataJobUrn:
return self._urn # type: ignore
def _ensure_datajob_props(self) -> models.DataJobInfoClass:
props = self._get_aspect(models.DataJobInfoClass)
if props is None:
# Use name from URN as fallback with default type
props = models.DataJobInfoClass(
name=self.urn.job_id, type=models.AzkabanJobTypeClass.COMMAND
)
self._set_aspect(props)
return props
def _get_datajob_inputoutput_props(
self,
) -> Optional[models.DataJobInputOutputClass]:
return self._get_aspect(models.DataJobInputOutputClass)
def _ensure_datajob_inputoutput_props(
self,
) -> models.DataJobInputOutputClass:
return self._setdefault_aspect(
models.DataJobInputOutputClass(inputDatasets=[], outputDatasets=[])
)
def _get_editable_props(self) -> Optional[models.EditableDataJobPropertiesClass]:
return self._get_aspect(models.EditableDataJobPropertiesClass)
def _ensure_editable_props(self) -> models.EditableDataJobPropertiesClass:
return self._setdefault_aspect(models.EditableDataJobPropertiesClass())
@property
def description(self) -> Optional[str]:
"""Get the description of the data job."""
editable_props = self._get_editable_props()
return first_non_null(
[
editable_props.description if editable_props is not None else None,
self._ensure_datajob_props().description,
]
)
def set_description(self, description: str) -> None:
"""Set the description of the data job."""
if is_ingestion_attribution():
editable_props = self._get_editable_props()
if editable_props is not None and editable_props.description is not None:
warnings.warn(
"Overwriting non-ingestion description from ingestion is an anti-pattern.",
category=IngestionAttributionWarning,
stacklevel=2,
)
# Force the ingestion description to show up.
editable_props.description = None
self._ensure_datajob_props().description = description
else:
self._ensure_editable_props().description = description
@property
def name(self) -> str:
"""Get the name of the data job."""
return self.urn.job_id
@property
def display_name(self) -> Optional[str]:
"""Get the display name of the data job."""
return self._ensure_datajob_props().name
def set_display_name(self, display_name: str) -> None:
"""Set the display name of the data job."""
self._ensure_datajob_props().name = display_name
@property
def external_url(self) -> Optional[str]:
"""Get the external URL of the data job."""
return self._ensure_datajob_props().externalUrl
def set_external_url(self, external_url: str) -> None:
"""Set the external URL of the data job."""
self._ensure_datajob_props().externalUrl = external_url
@property
def custom_properties(self) -> Dict[str, str]:
"""Get the custom properties of the data job."""
return self._ensure_datajob_props().customProperties
def set_custom_properties(self, custom_properties: Dict[str, str]) -> None:
"""Set the custom properties of the data job."""
self._ensure_datajob_props().customProperties = custom_properties
@property
def created(self) -> Optional[datetime]:
"""Get the creation timestamp of the data job."""
return parse_time_stamp(self._ensure_datajob_props().created)
def set_created(self, created: datetime) -> None:
"""Set the creation timestamp of the data job."""
self._ensure_datajob_props().created = make_time_stamp(created)
@property
def last_modified(self) -> Optional[datetime]:
"""Get the last modification timestamp of the data job."""
return parse_time_stamp(self._ensure_datajob_props().lastModified)
def set_last_modified(self, last_modified: datetime) -> None:
"""Set the last modification timestamp of the data job."""
self._ensure_datajob_props().lastModified = make_time_stamp(last_modified)
@property
def flow_urn(self) -> DataFlowUrn:
"""Get the data flow associated with the data job."""
return self.urn.get_data_flow_urn()
def _set_browse_path_from_flow(self, flow: DataFlow) -> None:
flow_browse_path = flow._get_aspect(models.BrowsePathsV2Class)
# Build this job's browse path by extending the flow's browse path
browse_path = []
if flow_browse_path is not None:
for entry in flow_browse_path.path:
browse_path.append(
models.BrowsePathEntryClass(id=entry.id, urn=entry.urn)
)
# Add the parent flow itself to the path
browse_path.append(models.BrowsePathEntryClass(id=flow.name, urn=str(flow.urn)))
# Set the browse path aspect
self._set_aspect(models.BrowsePathsV2Class(path=browse_path))
@property
def inlets(self) -> List[DatasetUrn]:
"""Get the inlets of the data job."""
inlets = self._ensure_datajob_inputoutput_props().inputDatasets
return [DatasetUrn.from_string(inlet) for inlet in inlets]
def set_inlets(self, inlets: List[DatasetUrnOrStr]) -> None:
"""Set the inlets of the data job."""
for inlet in inlets:
inlet_urn = DatasetUrn.from_string(inlet) # type checking
self._ensure_datajob_inputoutput_props().inputDatasets.append(
str(inlet_urn)
)
@property
def outlets(self) -> List[DatasetUrn]:
"""Get the outlets of the data job."""
outlets = self._ensure_datajob_inputoutput_props().outputDatasets
return [DatasetUrn.from_string(outlet) for outlet in outlets]
def set_outlets(self, outlets: List[DatasetUrnOrStr]) -> None:
"""Set the outlets of the data job."""
for outlet in outlets:
outlet_urn = DatasetUrn.from_string(outlet) # type checking
self._ensure_datajob_inputoutput_props().outputDatasets.append(
str(outlet_urn)
)
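
For reference, a brief sketch (not one of the changed files) of the behavior implemented by _set_browse_path_from_flow above: a job's browse path is the flow's browse path extended with the flow itself, and the job inherits the flow's platform.

from datahub.sdk import DataFlow, DataJob

flow = DataFlow(platform="airflow", name="example_dag")
job = DataJob(flow=flow, name="example_task")

assert job.flow_urn == flow.urn
# The job sits directly under its flow in the browse hierarchy.
assert job.browse_path == [flow.urn]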

View File

@@ -10,6 +10,8 @@ from datahub.errors import IngestionAttributionWarning, ItemNotFoundError, SdkUs
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.urns import (
ContainerUrn,
DataFlowUrn,
DataJobUrn,
DatasetUrn,
MlModelGroupUrn,
MlModelUrn,
@@ -18,6 +20,8 @@ from datahub.metadata.urns import (
from datahub.sdk._all_entities import ENTITY_CLASSES
from datahub.sdk._shared import UrnOrStr
from datahub.sdk.container import Container
from datahub.sdk.dataflow import DataFlow
from datahub.sdk.datajob import DataJob
from datahub.sdk.dataset import Dataset
from datahub.sdk.entity import Entity
from datahub.sdk.mlmodel import MLModel
@@ -57,6 +61,10 @@ class EntityClient:
@overload
def get(self, urn: MlModelGroupUrn) -> MLModelGroup: ...
@overload
def get(self, urn: DataFlowUrn) -> DataFlow: ...
@overload
def get(self, urn: DataJobUrn) -> DataJob: ...
@overload
def get(self, urn: Union[Urn, str]) -> Entity: ...
def get(self, urn: UrnOrStr) -> Entity:
"""Retrieve an entity by its urn.

View File

@@ -0,0 +1,26 @@
[
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,example_dag,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow"
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,example_dag,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "example_dag",
"env": "PROD"
}
}
}
]

View File

@@ -0,0 +1,155 @@
[
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)"
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {
"key1": "value1",
"key2": "value2"
},
"externalUrl": "https://example.com",
"name": "Example DAG",
"created": {
"time": 1735787045000
},
"lastModified": {
"time": 1736391846000
},
"env": "PROD"
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)",
"changeType": "UPSERT",
"aspectName": "editableDataFlowProperties",
"aspect": {
"json": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"description": "Test dataflow"
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:admin@datahubproject.io",
"type": "TECHNICAL_OWNER"
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)",
"changeType": "UPSERT",
"aspectName": "institutionalMemory",
"aspect": {
"json": {
"elements": [
{
"url": "https://example.com/doc1",
"description": "https://example.com/doc1",
"createStamp": {
"time": 0,
"actor": "urn:li:corpuser:__ingestion"
}
},
{
"url": "https://example.com/doc2",
"description": "Documentation 2",
"createStamp": {
"time": 0,
"actor": "urn:li:corpuser:__ingestion"
}
}
]
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": [
{
"tag": "urn:li:tag:tag1"
},
{
"tag": "urn:li:tag:tag2"
}
]
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)",
"changeType": "UPSERT",
"aspectName": "glossaryTerms",
"aspect": {
"json": {
"terms": [
{
"urn": "urn:li:glossaryTerm:DataPipeline"
}
],
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:__ingestion"
}
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)",
"changeType": "UPSERT",
"aspectName": "domains",
"aspect": {
"json": {
"domains": [
"urn:li:domain:Data Engineering"
]
}
}
}
]

View File

@@ -0,0 +1,45 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "example_dag",
"urn": "urn:li:dataFlow:(airflow,example_dag,PROD)"
}
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "example_task",
"type": {
"string": "COMMAND"
},
"flowUrn": "urn:li:dataFlow:(airflow,example_dag,PROD)"
}
}
}
]

View File

@@ -0,0 +1,49 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:753b430567bda9e9948b93636f7bdd50",
"urn": "urn:li:container:753b430567bda9e9948b93636f7bdd50"
},
{
"id": "example_dag",
"urn": "urn:li:dataFlow:(airflow,example_dag,PROD)"
}
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "example_task",
"type": {
"string": "COMMAND"
},
"flowUrn": "urn:li:dataFlow:(airflow,example_dag,PROD)"
}
}
}
]

View File

@@ -0,0 +1,53 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:753b430567bda9e9948b93636f7bdd50",
"urn": "urn:li:container:753b430567bda9e9948b93636f7bdd50"
},
{
"id": "urn:li:container:753b430567bda9e9948b93636f7bdd50",
"urn": "urn:li:container:753b430567bda9e9948b93636f7bdd50"
},
{
"id": "example_dag",
"urn": "urn:li:dataFlow:(airflow,example_dag,PROD)"
}
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "example_task",
"type": {
"string": "COMMAND"
},
"flowUrn": "urn:li:dataFlow:(airflow,example_dag,PROD)"
}
}
}
]

View File

@@ -0,0 +1,45 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "example_dag",
"urn": "urn:li:dataFlow:(airflow,example_dag,PROD)"
}
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "example_task",
"type": {
"string": "COMMAND"
},
"flowUrn": "urn:li:dataFlow:(airflow,example_dag,PROD)"
}
}
}
]

View File

@@ -0,0 +1,114 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,my_instance.example_dag,PROD),complex_task)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,my_instance.example_dag,PROD),complex_task)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "my_instance.example_dag",
"urn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)"
}
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,my_instance.example_dag,PROD),complex_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"schedule": "daily",
"owner_team": "data-engineering"
},
"externalUrl": "https://example.com/airflow/task",
"name": "Complex Task",
"type": {
"string": "COMMAND"
},
"flowUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)",
"created": {
"time": 1735787045000
},
"lastModified": {
"time": 1736391846000
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,my_instance.example_dag,PROD),complex_task)",
"changeType": "UPSERT",
"aspectName": "editableDataJobProperties",
"aspect": {
"json": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"description": "A complex data processing task"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,my_instance.example_dag,PROD),complex_task)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:admin@datahubproject.io",
"type": "TECHNICAL_OWNER"
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,my_instance.example_dag,PROD),complex_task)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": [
{
"tag": "urn:li:tag:tag1"
},
{
"tag": "urn:li:tag:tag2"
}
]
}
}
}
]

View File

@@ -0,0 +1,46 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,my_instance.example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,my_instance.example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "my_instance.example_dag",
"urn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)"
}
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,my_instance.example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "example_task",
"type": {
"string": "COMMAND"
},
"flowUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)"
}
}
}
]

View File

@@ -0,0 +1,62 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "example_dag",
"urn": "urn:li:dataFlow:(airflow,example_dag,PROD)"
}
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "example_task",
"type": {
"string": "COMMAND"
},
"flowUrn": "urn:li:dataFlow:(airflow,example_dag,PROD)"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset2,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset3,PROD)"
]
}
}
}
]

View File

@@ -0,0 +1,206 @@
import pathlib
import re
from datetime import datetime, timezone
from unittest import mock
import pytest
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.emitter.mcp_builder import ContainerKey
from datahub.errors import ItemNotFoundError
from datahub.metadata.urns import (
CorpUserUrn,
DataFlowUrn,
DomainUrn,
GlossaryTermUrn,
TagUrn,
)
from datahub.sdk.container import Container
from datahub.sdk.dataflow import DataFlow
from datahub.testing.sdk_v2_helpers import assert_entity_golden
GOLDEN_DIR = pathlib.Path(__file__).parent / "dataflow_golden"
def test_dataflow_basic(pytestconfig: pytest.Config) -> None:
d = DataFlow(
platform="airflow",
name="example_dag",
)
# Check urn setup.
assert DataFlow.get_urn_type() == DataFlowUrn
assert isinstance(d.urn, DataFlowUrn)
assert str(d.urn) == f"urn:li:dataFlow:(airflow,example_dag,{DEFAULT_ENV})"
assert str(d.urn) in repr(d)
# Check most attributes.
assert d.platform is not None
assert d.platform.platform_name == "airflow"
assert d.platform_instance is None
assert d.tags is None
assert d.terms is None
assert d.created is None
assert d.last_modified is None
assert d.description is None
assert d.custom_properties == {}
assert d.domain is None
with pytest.raises(AttributeError):
assert d.extra_attribute # type: ignore
with pytest.raises(AttributeError):
d.extra_attribute = "slots should reject extra fields" # type: ignore
with pytest.raises(AttributeError):
# This should fail. Eventually we should make it suggest calling set_owners instead.
d.owners = [] # type: ignore
assert_entity_golden(d, GOLDEN_DIR / "test_dataflow_basic_golden.json")
def test_dataflow_complex() -> None:
created = datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
updated = datetime(2025, 1, 9, 3, 4, 6, tzinfo=timezone.utc)
d = DataFlow(
platform="airflow",
platform_instance="my_instance",
name="example_dag",
display_name="Example DAG",
created=created,
last_modified=updated,
custom_properties={
"key1": "value1",
"key2": "value2",
},
description="Test dataflow",
external_url="https://example.com",
owners=[
CorpUserUrn("admin@datahubproject.io"),
],
links=[
"https://example.com/doc1",
("https://example.com/doc2", "Documentation 2"),
],
tags=[
TagUrn("tag1"),
TagUrn("tag2"),
],
terms=[
GlossaryTermUrn("DataPipeline"),
],
domain=DomainUrn("Data Engineering"),
)
assert d.platform is not None
assert d.platform.platform_name == "airflow"
assert d.platform_instance is not None
assert (
str(d.platform_instance)
== "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)"
)
# Properties.
assert d.description == "Test dataflow"
assert d.display_name == "Example DAG"
assert d.external_url == "https://example.com"
assert d.created == created
assert d.last_modified == updated
assert d.custom_properties == {"key1": "value1", "key2": "value2"}
# Check standard aspects.
assert d.owners is not None and len(d.owners) == 1
assert d.links is not None and len(d.links) == 2
assert d.tags is not None and len(d.tags) == 2
assert d.terms is not None and len(d.terms) == 1
assert d.domain == DomainUrn("Data Engineering")
# Check the link URLs.
assert d.links[0].url == "https://example.com/doc1"
assert d.links[1].url == "https://example.com/doc2"
assert_entity_golden(d, GOLDEN_DIR / "test_dataflow_complex_golden.json")
def test_client_get_dataflow() -> None:
"""Test retrieving DataFlows using client.entities.get()."""
# Set up mock
mock_client = mock.MagicMock()
mock_entities = mock.MagicMock()
mock_client.entities = mock_entities
# Basic retrieval
flow_urn = DataFlowUrn("airflow", "test_dag", DEFAULT_ENV)
expected_flow = DataFlow(
platform="airflow",
name="test_dag",
description="A test dataflow",
)
mock_entities.get.return_value = expected_flow
result = mock_client.entities.get(flow_urn)
assert result == expected_flow
mock_entities.get.assert_called_once_with(flow_urn)
mock_entities.get.reset_mock()
# String URN
urn_str = f"urn:li:dataFlow:(airflow,string_dag,{DEFAULT_ENV})"
mock_entities.get.return_value = DataFlow(platform="airflow", name="string_dag")
result = mock_client.entities.get(urn_str)
mock_entities.get.assert_called_once_with(urn_str)
mock_entities.get.reset_mock()
# Complex dataflow with properties
test_date = datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
complex_flow = DataFlow(
platform="airflow",
name="complex_dag",
description="Complex test dataflow",
display_name="My Complex DAG",
external_url="https://example.com/dag",
created=test_date,
last_modified=test_date,
custom_properties={"env": "production", "owner_team": "data-eng"},
)
# Set relationships and tags
complex_flow.set_tags([TagUrn("important"), TagUrn("data-pipeline")])
complex_flow.set_domain(DomainUrn("Data Engineering"))
complex_flow.set_owners([CorpUserUrn("john@example.com")])
flow_urn = DataFlowUrn("airflow", "complex_dag", DEFAULT_ENV)
mock_entities.get.return_value = complex_flow
result = mock_client.entities.get(flow_urn)
assert result.name == "complex_dag"
assert result.display_name == "My Complex DAG"
assert result.created == test_date
assert result.description == "Complex test dataflow"
assert result.tags is not None
assert result.domain is not None
assert result.owners is not None
mock_entities.get.assert_called_once_with(flow_urn)
mock_entities.get.reset_mock()
# Not found case
error_message = f"Entity {flow_urn} not found"
mock_entities.get.side_effect = ItemNotFoundError(error_message)
with pytest.raises(ItemNotFoundError, match=re.escape(error_message)):
mock_client.entities.get(flow_urn)
def test_dataflow_with_container() -> None:
container = Container(
container_key=ContainerKey(
platform="airflow", name="my_container", instance="my_instance"
),
display_name="My Container",
)
flow = DataFlow(
platform="airflow",
name="example_dag",
parent_container=container,
)
assert flow.parent_container == container.urn
assert flow.browse_path == [container.urn]

View File

@@ -0,0 +1,418 @@
import pathlib
import re
from datetime import datetime, timezone
from unittest import mock
import pytest
from datahub.emitter.mcp_builder import ContainerKey
from datahub.errors import ItemNotFoundError
from datahub.metadata.urns import (
CorpUserUrn,
DataJobUrn,
DataPlatformInstanceUrn,
DataPlatformUrn,
DatasetUrn,
DomainUrn,
TagUrn,
)
from datahub.sdk.container import Container
from datahub.sdk.dataflow import DataFlow
from datahub.sdk.datajob import DataJob
from datahub.testing.sdk_v2_helpers import assert_entity_golden
from datahub.utilities.urns.error import InvalidUrnError
GOLDEN_DIR = pathlib.Path(__file__).parent / "datajob_golden"
GOLDEN_DIR.mkdir(exist_ok=True)
def test_datajob_basic(pytestconfig: pytest.Config) -> None:
# Create a dataflow first
flow = DataFlow(
platform="airflow",
name="example_dag",
)
# Create a basic datajob
job = DataJob(
flow=flow,
name="example_task",
)
# Check URN setup
assert DataJob.get_urn_type() == DataJobUrn
assert isinstance(job.urn, DataJobUrn)
assert (
str(job.urn)
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)"
)
assert str(job.urn) in repr(job)
# Check basic attributes
assert job.platform == flow.platform
assert job.platform_instance is None
assert job.browse_path == [flow.urn]
assert job.tags is None
assert job.terms is None
assert job.created is None
assert job.last_modified is None
assert job.description is None
assert job.custom_properties == {}
assert job.domain is None
assert job.name == "example_task"
# Validate errors for non-existent attributes
with pytest.raises(AttributeError):
assert job.extra_attribute # type: ignore
with pytest.raises(AttributeError):
job.extra_attribute = "slots should reject extra fields" # type: ignore
# Validate golden file
assert_entity_golden(job, GOLDEN_DIR / "test_datajob_basic_golden.json")
def test_datajob_complex() -> None:
# Create a dataflow first
flow = DataFlow(
platform="airflow",
platform_instance="my_instance",
name="example_dag",
)
created = datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
updated = datetime(2025, 1, 9, 3, 4, 6, tzinfo=timezone.utc)
# Create a complex datajob with all attributes
job = DataJob(
flow=flow,
name="complex_task",
display_name="Complex Task",
description="A complex data processing task",
external_url="https://example.com/airflow/task",
created=created,
last_modified=updated,
custom_properties={
"schedule": "daily",
"owner_team": "data-engineering",
},
tags=[TagUrn("tag1"), TagUrn("tag2")],
owners=[
CorpUserUrn("admin@datahubproject.io"),
],
)
# Check attributes
assert job.name == "complex_task"
assert job.display_name == "Complex Task"
assert job.description == "A complex data processing task"
assert job.external_url == "https://example.com/airflow/task"
assert job.created == created
assert job.last_modified == updated
assert job.custom_properties == {
"schedule": "daily",
"owner_team": "data-engineering",
}
assert job.platform == flow.platform
assert job.platform == DataPlatformUrn("airflow")
assert job.platform_instance == flow.platform_instance
assert job.platform_instance == DataPlatformInstanceUrn("airflow", "my_instance")
assert job.browse_path == [flow.urn]
# Validate golden file
assert_entity_golden(job, GOLDEN_DIR / "test_datajob_complex_golden.json")
def test_client_get_datajob() -> None:
"""Test retrieving DataJobs using client.entities.get()."""
# Set up mock
mock_client = mock.MagicMock()
mock_entities = mock.MagicMock()
mock_client.entities = mock_entities
# Create a test flow URN
flow = DataFlow(
platform="airflow",
name="test_dag",
)
# Basic retrieval
job_urn = DataJobUrn.create_from_ids(
job_id="test_task",
data_flow_urn=str(flow.urn),
)
expected_job = DataJob(
flow=flow,
name="test_task",
description="A test data job",
)
mock_entities.get.return_value = expected_job
result = mock_client.entities.get(job_urn)
assert result == expected_job
mock_entities.get.assert_called_once_with(job_urn)
mock_entities.get.reset_mock()
# String URN
urn_str = "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),string_task)"
mock_entities.get.return_value = DataJob(
flow=flow,
name="string_task",
)
result = mock_client.entities.get(urn_str)
mock_entities.get.assert_called_once_with(urn_str)
mock_entities.get.reset_mock()
# Complex job with properties
test_date = datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
complex_job = DataJob(
flow=flow,
name="complex_task",
description="Complex test job",
display_name="My Complex Task",
external_url="https://example.com/task",
created=test_date,
last_modified=test_date,
custom_properties={"env": "production", "owner_team": "data-eng"},
)
# Set relationships and tags
complex_job.set_tags([TagUrn("important"), TagUrn("data-pipeline")])
complex_job.set_domain(DomainUrn("Data Engineering"))
complex_job.set_owners([CorpUserUrn("john@example.com")])
complex_job_urn = DataJobUrn.create_from_ids(
job_id="complex_task",
data_flow_urn=str(flow.urn),
)
mock_entities.get.return_value = complex_job
result = mock_client.entities.get(complex_job_urn)
assert result.name == "complex_task"
assert result.display_name == "My Complex Task"
assert result.created == test_date
assert result.description == "Complex test job"
assert result.tags is not None
assert result.domain is not None
assert result.owners is not None
mock_entities.get.assert_called_once_with(complex_job_urn)
mock_entities.get.reset_mock()
# Not found case
error_message = f"Entity {complex_job_urn} not found"
mock_entities.get.side_effect = ItemNotFoundError(error_message)
with pytest.raises(ItemNotFoundError, match=re.escape(error_message)):
mock_client.entities.get(complex_job_urn)
def test_datajob_init_with_flow_urn() -> None:
# Create a dataflow first
flow = DataFlow(
platform="airflow",
name="example_dag",
platform_instance="my_instance",
)
# Create a datajob with the flow URN
job = DataJob(
flow_urn=flow.urn,
platform_instance="my_instance",
name="example_task",
)
print("\n")
print("job.flow_urn:", job.flow_urn)
print("flow.urn:", flow.urn)
print("job.platform_instance:", job.platform_instance)
print("flow.platform_instance:", flow.platform_instance)
print("job.name:", job.name)
print("flow.name:", flow.name)
assert job.flow_urn == flow.urn
assert job.platform_instance == flow.platform_instance
assert job.name == "example_task"
assert_entity_golden(
job, GOLDEN_DIR / "test_datajob_init_with_flow_urn_golden.json"
)
def test_invalid_init() -> None:
flow = DataFlow(
platform="airflow",
name="example_dag",
)
with pytest.raises(
ValueError,
match=re.escape(
"You must provide either: 1. a DataFlow object, or 2. a DataFlowUrn (and a platform_instance config if required)"
),
):
DataJob(
name="example_task",
flow_urn=flow.urn,
)
with pytest.raises(
ValueError,
match=re.escape(
"You must provide either: 1. a DataFlow object, or 2. a DataFlowUrn (and a platform_instance config if required)"
),
):
DataJob(
name="example_task",
platform_instance="my_instance",
)
def test_datajob_browse_path_without_container() -> None:
# Create a dataflow without a container
flow = DataFlow(
platform="airflow",
name="example_dag",
)
# Create a datajob with the flow
job = DataJob(
flow=flow,
name="example_task",
)
# Check that parent and browse paths are set correctly
assert job.parent_container is None
assert job.browse_path == [flow.urn]
assert_entity_golden(
job, GOLDEN_DIR / "test_datajob_browse_path_without_container_golden.json"
)
def test_datajob_browse_path_with_container() -> None:
# Create a container
container = Container(
container_key=ContainerKey(
platform="airflow", name="my_container", instance="my_instance"
),
display_name="My Container",
)
# Create a dataflow with the container
flow = DataFlow(
platform="airflow",
name="example_dag",
parent_container=container,
)
# Create a datajob with the flow
job = DataJob(
flow=flow,
name="example_task",
)
# Check that parent and browse paths are set correctly
assert flow.parent_container == container.urn
assert flow.browse_path == [container.urn]
# The job's browse path should extend the flow's browse path with the job name
expected_job_path = [container.urn, flow.urn]
assert job.browse_path == expected_job_path
# Use golden file for verification
assert_entity_golden(job, GOLDEN_DIR / "test_datajob_browse_path_golden.json")
def test_datajob_browse_path_with_containers() -> None:
# Create a container
container1 = Container(
container_key=ContainerKey(
platform="airflow", name="my_container1", instance="my_instance"
),
display_name="My Container",
)
container2 = Container(
container_key=ContainerKey(
platform="airflow", name="my_container2", instance="my_instance"
),
display_name="My Container",
parent_container=container1,
)
# Create a dataflow with the container
flow = DataFlow(
platform="airflow",
name="example_dag",
parent_container=container2,
)
# Create a datajob with the flow
job = DataJob(
flow=flow,
name="example_task",
)
# Check that parent and browse paths are set correctly
assert flow.parent_container == container2.urn
assert flow.browse_path == [container1.urn, container2.urn]
assert job.browse_path == [container1.urn, container2.urn, flow.urn]
assert_entity_golden(
job, GOLDEN_DIR / "test_datajob_browse_path_with_containers_golden.json"
)
def test_datajob_inlets_outlets() -> None:
# Create a dataflow first
flow = DataFlow(
platform="airflow",
name="example_dag",
)
# Create a datajob with the flow
job = DataJob(
flow=flow,
name="example_task",
inlets=[
"urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset2,PROD)",
],
outlets=["urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset3,PROD)"],
)
assert job.inlets == [
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset1,PROD)"
),
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset2,PROD)"
),
]
assert job.outlets == [
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset3,PROD)"
),
]
assert_entity_golden(job, GOLDEN_DIR / "test_datajob_inlets_outlets_golden.json")
def test_datajob_invalid_inlets_outlets() -> None:
# Create a dataflow first
flow = DataFlow(
platform="airflow",
name="example_dag",
)
# Create a datajob with the flow
job = DataJob(
flow=flow,
name="example_task",
)
with pytest.raises(InvalidUrnError):
job.set_inlets(
["urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)"]
)
with pytest.raises(InvalidUrnError):
job.set_outlets(
["urn:li:datajob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)"]
)