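"""Unit tests for the SDK v2 DataJob entity: construction, client retrieval,
browse paths, and inlet/outlet lineage."""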

import pathlib
import re
from datetime import datetime, timezone
from unittest import mock
import pytest
from datahub.emitter.mcp_builder import ContainerKey
from datahub.errors import ItemNotFoundError
from datahub.metadata.urns import (
CorpUserUrn,
DataJobUrn,
DataPlatformInstanceUrn,
DataPlatformUrn,
DatasetUrn,
DomainUrn,
TagUrn,
)
from datahub.sdk.container import Container
from datahub.sdk.dataflow import DataFlow
from datahub.sdk.datajob import DataJob
from datahub.testing.sdk_v2_helpers import assert_entity_golden
from datahub.utilities.urns.error import InvalidUrnError

GOLDEN_DIR = pathlib.Path(__file__).parent / "datajob_golden"
GOLDEN_DIR.mkdir(exist_ok=True)


def test_datajob_basic() -> None:
# Create a dataflow first
flow = DataFlow(
platform="airflow",
name="example_dag",
)
# Create a basic datajob
job = DataJob(
flow=flow,
name="example_task",
)
# Check URN setup
assert DataJob.get_urn_type() == DataJobUrn
assert isinstance(job.urn, DataJobUrn)
assert (
str(job.urn)
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)"
)
assert str(job.urn) in repr(job)
# Check basic attributes
assert job.platform == flow.platform
assert job.platform_instance is None
assert job.browse_path == [flow.urn]
assert job.tags is None
assert job.terms is None
assert job.created is None
assert job.last_modified is None
assert job.description is None
assert job.custom_properties == {}
assert job.domain is None
assert job.name == "example_task"
# Validate errors for non-existent attributes
with pytest.raises(AttributeError):
assert job.extra_attribute # type: ignore
with pytest.raises(AttributeError):
job.extra_attribute = "slots should reject extra fields" # type: ignore
# Validate golden file
assert_entity_golden(job, GOLDEN_DIR / "test_datajob_basic_golden.json")


def test_datajob_complex() -> None:
# Create a dataflow first
flow = DataFlow(
platform="airflow",
platform_instance="my_instance",
name="example_dag",
)
created = datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
updated = datetime(2025, 1, 9, 3, 4, 6, tzinfo=timezone.utc)
# Create a complex datajob with all attributes
job = DataJob(
flow=flow,
name="complex_task",
display_name="Complex Task",
description="A complex data processing task",
external_url="https://example.com/airflow/task",
created=created,
last_modified=updated,
custom_properties={
"schedule": "daily",
"owner_team": "data-engineering",
},
tags=[TagUrn("tag1"), TagUrn("tag2")],
owners=[
CorpUserUrn("admin@datahubproject.io"),
],
)
# Check attributes
assert job.name == "complex_task"
assert job.display_name == "Complex Task"
assert job.description == "A complex data processing task"
assert job.external_url == "https://example.com/airflow/task"
assert job.created == created
assert job.last_modified == updated
assert job.custom_properties == {
"schedule": "daily",
"owner_team": "data-engineering",
}
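    # Platform identity is inherited from the parent flow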
assert job.platform == flow.platform
assert job.platform == DataPlatformUrn("airflow")
assert job.platform_instance == flow.platform_instance
assert job.platform_instance == DataPlatformInstanceUrn("airflow", "my_instance")
assert job.browse_path == [flow.urn]
# Validate golden file
assert_entity_golden(job, GOLDEN_DIR / "test_datajob_complex_golden.json")


def test_client_get_datajob() -> None:
"""Test retrieving DataJobs using client.entities.get()."""
# Set up mock
mock_client = mock.MagicMock()
mock_entities = mock.MagicMock()
mock_client.entities = mock_entities
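    # mock_client stands in for a DataHubClient; the test exercises entities.get()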
# Create a test flow URN
flow = DataFlow(
platform="airflow",
name="test_dag",
)
# Basic retrieval
job_urn = DataJobUrn.create_from_ids(
job_id="test_task",
data_flow_urn=str(flow.urn),
)
expected_job = DataJob(
flow=flow,
name="test_task",
description="A test data job",
)
mock_entities.get.return_value = expected_job
result = mock_client.entities.get(job_urn)
assert result == expected_job
mock_entities.get.assert_called_once_with(job_urn)
mock_entities.get.reset_mock()
# String URN
    urn_str = "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),string_task)"
mock_entities.get.return_value = DataJob(
flow=flow,
name="string_task",
)
    result = mock_client.entities.get(urn_str)
    assert result.name == "string_task"
    mock_entities.get.assert_called_once_with(urn_str)
    mock_entities.get.reset_mock()
# Complex job with properties
test_date = datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
complex_job = DataJob(
flow=flow,
name="complex_task",
description="Complex test job",
display_name="My Complex Task",
external_url="https://example.com/task",
created=test_date,
last_modified=test_date,
custom_properties={"env": "production", "owner_team": "data-eng"},
)
# Set relationships and tags
complex_job.set_tags([TagUrn("important"), TagUrn("data-pipeline")])
complex_job.set_domain(DomainUrn("Data Engineering"))
complex_job.set_owners([CorpUserUrn("john@example.com")])
complex_job_urn = DataJobUrn.create_from_ids(
job_id="complex_task",
data_flow_urn=str(flow.urn),
)
mock_entities.get.return_value = complex_job
result = mock_client.entities.get(complex_job_urn)
assert result.name == "complex_task"
assert result.display_name == "My Complex Task"
assert result.created == test_date
assert result.description == "Complex test job"
assert result.tags is not None
assert result.domain is not None
assert result.owners is not None
mock_entities.get.assert_called_once_with(complex_job_urn)
mock_entities.get.reset_mock()
# Not found case
error_message = f"Entity {complex_job_urn} not found"
mock_entities.get.side_effect = ItemNotFoundError(error_message)
with pytest.raises(ItemNotFoundError, match=re.escape(error_message)):
mock_client.entities.get(complex_job_urn)


def test_datajob_init_with_flow_urn() -> None:
# Create a dataflow first
flow = DataFlow(
platform="airflow",
name="example_dag",
platform_instance="my_instance",
)
# Create a datajob with the flow URN
job = DataJob(
flow_urn=flow.urn,
platform_instance="my_instance",
name="example_task",
)
print("\n")
print("job.flow_urn:", job.flow_urn)
print("flow.urn:", flow.urn)
print("job.platform_instance:", job.platform_instance)
print("flow.platform_instance:", flow.platform_instance)
print("job.name:", job.name)
print("flow.name:", flow.name)
assert job.flow_urn == flow.urn
assert job.platform_instance == flow.platform_instance
assert job.name == "example_task"
assert_entity_golden(
job, GOLDEN_DIR / "test_datajob_init_with_flow_urn_golden.json"
)


def test_invalid_init() -> None:
flow = DataFlow(
platform="airflow",
name="example_dag",
)
with pytest.raises(
ValueError,
match=re.escape(
"You must provide either: 1. a DataFlow object, or 2. a DataFlowUrn (and a platform_instance config if required)"
),
):
DataJob(
name="example_task",
flow_urn=flow.urn,
)
with pytest.raises(
ValueError,
match=re.escape(
"You must provide either: 1. a DataFlow object, or 2. a DataFlowUrn (and a platform_instance config if required)"
),
):
DataJob(
name="example_task",
platform_instance="my_instance",
)


def test_datajob_browse_path_without_container() -> None:
# Create a dataflow without a container
flow = DataFlow(
platform="airflow",
name="example_dag",
)
# Create a datajob with the flow
job = DataJob(
flow=flow,
name="example_task",
)
# Check that parent and browse paths are set correctly
assert job.parent_container is None
assert job.browse_path == [flow.urn]
assert_entity_golden(
job, GOLDEN_DIR / "test_datajob_browse_path_without_container_golden.json"
)


def test_datajob_browse_path_with_container() -> None:
# Create a container
container = Container(
container_key=ContainerKey(
platform="airflow", name="my_container", instance="my_instance"
),
display_name="My Container",
)
# Create a dataflow with the container
flow = DataFlow(
platform="airflow",
name="example_dag",
parent_container=container,
)
# Create a datajob with the flow
job = DataJob(
flow=flow,
name="example_task",
)
# Check that parent and browse paths are set correctly
assert flow.parent_container == container.urn
assert flow.browse_path == [container.urn]
    # The job's browse path should extend the flow's browse path with the flow URN
expected_job_path = [container.urn, flow.urn]
assert job.browse_path == expected_job_path
# Use golden file for verification
assert_entity_golden(job, GOLDEN_DIR / "test_datajob_browse_path_golden.json")


def test_datajob_browse_path_with_containers() -> None:
    # Create two nested containers
container1 = Container(
container_key=ContainerKey(
platform="airflow", name="my_container1", instance="my_instance"
),
display_name="My Container",
)
container2 = Container(
container_key=ContainerKey(
platform="airflow", name="my_container2", instance="my_instance"
),
display_name="My Container",
parent_container=container1,
)
# Create a dataflow with the container
flow = DataFlow(
platform="airflow",
name="example_dag",
parent_container=container2,
)
# Create a datajob with the flow
job = DataJob(
flow=flow,
name="example_task",
)
# Check that parent and browse paths are set correctly
assert flow.parent_container == container2.urn
assert flow.browse_path == [container1.urn, container2.urn]
assert job.browse_path == [container1.urn, container2.urn, flow.urn]
assert_entity_golden(
job, GOLDEN_DIR / "test_datajob_browse_path_with_containers_golden.json"
)


def test_datajob_inlets_outlets() -> None:
# Create a dataflow first
flow = DataFlow(
platform="airflow",
name="example_dag",
)
# Create a datajob with the flow
job = DataJob(
flow=flow,
name="example_task",
inlets=[
"urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset2,PROD)",
],
outlets=["urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset3,PROD)"],
)
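    # String URNs passed as inlets/outlets are coerced to DatasetUrn objects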
assert job.inlets == [
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset1,PROD)"
),
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset2,PROD)"
),
]
assert job.outlets == [
DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset3,PROD)"
),
]
assert_entity_golden(job, GOLDEN_DIR / "test_datajob_inlets_outlets_golden.json")


def test_datajob_invalid_inlets_outlets() -> None:
# Create a dataflow first
flow = DataFlow(
platform="airflow",
name="example_dag",
)
# Create a datajob with the flow
job = DataJob(
flow=flow,
name="example_task",
)
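    # A well-formed dataJob URN is still rejected: inlets must be dataset URNs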
with pytest.raises(InvalidUrnError):
job.set_inlets(
["urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)"]
)
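    # The lowercase "datajob" variant also fails URN validation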
with pytest.raises(InvalidUrnError):
job.set_outlets(
["urn:li:datajob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)"]
)