import pathlib
import re
from datetime import datetime, timezone
from unittest import mock

import pytest

from datahub.emitter.mcp_builder import ContainerKey
from datahub.errors import ItemNotFoundError
from datahub.metadata.urns import (
    CorpUserUrn,
    DataJobUrn,
    DataPlatformInstanceUrn,
    DataPlatformUrn,
    DatasetUrn,
    DomainUrn,
    TagUrn,
)
from datahub.sdk.container import Container
from datahub.sdk.dataflow import DataFlow
from datahub.sdk.datajob import DataJob
from datahub.testing.sdk_v2_helpers import assert_entity_golden
from datahub.utilities.urns.error import InvalidUrnError

GOLDEN_DIR = pathlib.Path(__file__).parent / "datajob_golden"
GOLDEN_DIR.mkdir(exist_ok=True)


def test_datajob_basic(pytestconfig: pytest.Config) -> None:
    # Create a dataflow first
    flow = DataFlow(
        platform="airflow",
        name="example_dag",
    )

    # Create a basic datajob
    job = DataJob(
        flow=flow,
        name="example_task",
    )

    # Check URN setup
    assert DataJob.get_urn_type() == DataJobUrn
    assert isinstance(job.urn, DataJobUrn)
    assert (
        str(job.urn)
        == "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)"
    )
    assert str(job.urn) in repr(job)

    # Check basic attributes
    assert job.platform == flow.platform
    assert job.platform_instance is None
    assert job.browse_path == [flow.urn]
    assert job.tags is None
    assert job.terms is None
    assert job.created is None
    assert job.last_modified is None
    assert job.description is None
    assert job.custom_properties == {}
    assert job.domain is None
    assert job.name == "example_task"

    # Validate errors for non-existent attributes
    with pytest.raises(AttributeError):
        assert job.extra_attribute  # type: ignore
    with pytest.raises(AttributeError):
        job.extra_attribute = "slots should reject extra fields"  # type: ignore

    # Validate golden file
    assert_entity_golden(job, GOLDEN_DIR / "test_datajob_basic_golden.json")


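# A minimal sketch: the URN that the DataJob constructor derives should match
# one assembled directly from its ids via DataJobUrn.create_from_ids, which is
# used the same way in test_client_get_datajob below.
def test_datajob_urn_roundtrip_sketch() -> None:
    flow = DataFlow(
        platform="airflow",
        name="example_dag",
    )
    job = DataJob(flow=flow, name="example_task")
    assert job.urn == DataJobUrn.create_from_ids(
        job_id="example_task",
        data_flow_urn=str(flow.urn),
    )

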
def test_datajob_complex() -> None:
    # Create a dataflow first
    flow = DataFlow(
        platform="airflow",
        platform_instance="my_instance",
        name="example_dag",
    )

    created = datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
    updated = datetime(2025, 1, 9, 3, 4, 6, tzinfo=timezone.utc)

    # Create a complex datajob with all attributes
    job = DataJob(
        flow=flow,
        name="complex_task",
        display_name="Complex Task",
        description="A complex data processing task",
        external_url="https://example.com/airflow/task",
        created=created,
        last_modified=updated,
        custom_properties={
            "schedule": "daily",
            "owner_team": "data-engineering",
        },
        tags=[TagUrn("tag1"), TagUrn("tag2")],
        owners=[
            CorpUserUrn("admin@datahubproject.io"),
        ],
    )

    # Check attributes
    assert job.name == "complex_task"
    assert job.display_name == "Complex Task"
    assert job.description == "A complex data processing task"
    assert job.external_url == "https://example.com/airflow/task"
    assert job.created == created
    assert job.last_modified == updated
    assert job.custom_properties == {
        "schedule": "daily",
        "owner_team": "data-engineering",
    }
    assert job.platform == flow.platform
    assert job.platform == DataPlatformUrn("airflow")
    assert job.platform_instance == flow.platform_instance
    assert job.platform_instance == DataPlatformInstanceUrn("airflow", "my_instance")
    assert job.browse_path == [flow.urn]

    # Validate golden file
    assert_entity_golden(job, GOLDEN_DIR / "test_datajob_complex_golden.json")


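# A minimal sketch: the mutators exercised in test_client_get_datajob below
# (set_tags, set_owners, set_domain) should also populate the accessors checked
# in test_datajob_basic above; only presence is asserted here, since the exact
# aspect shapes are covered by the golden files.
def test_datajob_mutators_sketch() -> None:
    flow = DataFlow(platform="airflow", name="example_dag")
    job = DataJob(flow=flow, name="example_task")
    assert job.tags is None
    assert job.domain is None

    job.set_tags([TagUrn("tag1")])
    job.set_owners([CorpUserUrn("admin@datahubproject.io")])
    job.set_domain(DomainUrn("Data Engineering"))

    assert job.tags is not None
    assert job.owners is not None
    assert job.domain is not None

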
def test_client_get_datajob() -> None:
    """Test retrieving DataJobs using client.entities.get()."""
    # Set up mock
    mock_client = mock.MagicMock()
    mock_entities = mock.MagicMock()
    mock_client.entities = mock_entities

    # Create a test flow URN
    flow = DataFlow(
        platform="airflow",
        name="test_dag",
    )

    # Basic retrieval
    job_urn = DataJobUrn.create_from_ids(
        job_id="test_task",
        data_flow_urn=str(flow.urn),
    )
    expected_job = DataJob(
        flow=flow,
        name="test_task",
        description="A test data job",
    )
    mock_entities.get.return_value = expected_job

    result = mock_client.entities.get(job_urn)
    assert result == expected_job
    mock_entities.get.assert_called_once_with(job_urn)
    mock_entities.get.reset_mock()

    # String URN
    urn_str = "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),string_task)"
    mock_entities.get.return_value = DataJob(
        flow=flow,
        name="string_task",
    )
    result = mock_client.entities.get(urn_str)
    assert result.name == "string_task"
    mock_entities.get.assert_called_once_with(urn_str)
    mock_entities.get.reset_mock()

    # Complex job with properties
    test_date = datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
    complex_job = DataJob(
        flow=flow,
        name="complex_task",
        description="Complex test job",
        display_name="My Complex Task",
        external_url="https://example.com/task",
        created=test_date,
        last_modified=test_date,
        custom_properties={"env": "production", "owner_team": "data-eng"},
    )

    # Set relationships and tags
    complex_job.set_tags([TagUrn("important"), TagUrn("data-pipeline")])
    complex_job.set_domain(DomainUrn("Data Engineering"))
    complex_job.set_owners([CorpUserUrn("john@example.com")])

    complex_job_urn = DataJobUrn.create_from_ids(
        job_id="complex_task",
        data_flow_urn=str(flow.urn),
    )
    mock_entities.get.return_value = complex_job

    result = mock_client.entities.get(complex_job_urn)
    assert result.name == "complex_task"
    assert result.display_name == "My Complex Task"
    assert result.created == test_date
    assert result.description == "Complex test job"
    assert result.tags is not None
    assert result.domain is not None
    assert result.owners is not None
    mock_entities.get.assert_called_once_with(complex_job_urn)
    mock_entities.get.reset_mock()

    # Not found case
    error_message = f"Entity {complex_job_urn} not found"
    mock_entities.get.side_effect = ItemNotFoundError(error_message)
    with pytest.raises(ItemNotFoundError, match=re.escape(error_message)):
        mock_client.entities.get(complex_job_urn)


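# For reference, a hedged sketch of the real (non-mocked) retrieval path that
# test_client_get_datajob simulates. `DataHubClient` and its constructor
# arguments are assumptions about the v2 SDK client and are not exercised here:
#
#     from datahub.sdk import DataHubClient
#
#     client = DataHubClient(server="http://localhost:8080", token="<token>")
#     job = client.entities.get(job_urn)  # expected to return a DataJob

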
def test_datajob_init_with_flow_urn() -> None:
    # Create a dataflow first
    flow = DataFlow(
        platform="airflow",
        name="example_dag",
        platform_instance="my_instance",
    )

    # Create a datajob with the flow URN
    job = DataJob(
        flow_urn=flow.urn,
        platform_instance="my_instance",
        name="example_task",
    )

    assert job.flow_urn == flow.urn
    assert job.platform_instance == flow.platform_instance
    assert job.name == "example_task"

    assert_entity_golden(
        job, GOLDEN_DIR / "test_datajob_init_with_flow_urn_golden.json"
    )


def test_invalid_init() -> None:
    flow = DataFlow(
        platform="airflow",
        name="example_dag",
    )

    with pytest.raises(
        ValueError,
        match=re.escape(
            "You must provide either: 1. a DataFlow object, or 2. a DataFlowUrn (and a platform_instance config if required)"
        ),
    ):
        DataJob(
            name="example_task",
            flow_urn=flow.urn,
        )
    with pytest.raises(
        ValueError,
        match=re.escape(
            "You must provide either: 1. a DataFlow object, or 2. a DataFlowUrn (and a platform_instance config if required)"
        ),
    ):
        DataJob(
            name="example_task",
            platform_instance="my_instance",
        )


def test_datajob_browse_path_without_container() -> None:
    # Create a dataflow without a container
    flow = DataFlow(
        platform="airflow",
        name="example_dag",
    )

    # Create a datajob with the flow
    job = DataJob(
        flow=flow,
        name="example_task",
    )

    # Check that parent and browse paths are set correctly
    assert job.parent_container is None
    assert job.browse_path == [flow.urn]

    assert_entity_golden(
        job, GOLDEN_DIR / "test_datajob_browse_path_without_container_golden.json"
    )


def test_datajob_browse_path_with_container() -> None:
    # Create a container
    container = Container(
        container_key=ContainerKey(
            platform="airflow", name="my_container", instance="my_instance"
        ),
        display_name="My Container",
    )
    # Create a dataflow with the container
    flow = DataFlow(
        platform="airflow",
        name="example_dag",
        parent_container=container,
    )

    # Create a datajob with the flow
    job = DataJob(
        flow=flow,
        name="example_task",
    )

    # Check that parent and browse paths are set correctly
    assert flow.parent_container == container.urn
    assert flow.browse_path == [container.urn]

    # The job's browse path should extend the flow's browse path with the flow itself
    expected_job_path = [container.urn, flow.urn]
    assert job.browse_path == expected_job_path

    # Use golden file for verification
    assert_entity_golden(job, GOLDEN_DIR / "test_datajob_browse_path_golden.json")


def test_datajob_browse_path_with_containers() -> None:
    # Create two nested containers
    container1 = Container(
        container_key=ContainerKey(
            platform="airflow", name="my_container1", instance="my_instance"
        ),
        display_name="My Container",
    )

    container2 = Container(
        container_key=ContainerKey(
            platform="airflow", name="my_container2", instance="my_instance"
        ),
        display_name="My Container",
        parent_container=container1,
    )

    # Create a dataflow with the nested container
    flow = DataFlow(
        platform="airflow",
        name="example_dag",
        parent_container=container2,
    )

    # Create a datajob with the flow
    job = DataJob(
        flow=flow,
        name="example_task",
    )

    # Check that parent and browse paths are set correctly
    assert flow.parent_container == container2.urn
    assert flow.browse_path == [container1.urn, container2.urn]
    assert job.browse_path == [container1.urn, container2.urn, flow.urn]

    assert_entity_golden(
        job, GOLDEN_DIR / "test_datajob_browse_path_with_containers_golden.json"
    )


def test_datajob_inlets_outlets() -> None:
    # Create a dataflow first
    flow = DataFlow(
        platform="airflow",
        name="example_dag",
    )

    # Create a datajob with the flow
    job = DataJob(
        flow=flow,
        name="example_task",
        inlets=[
            "urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset1,PROD)",
            "urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset2,PROD)",
        ],
        outlets=["urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset3,PROD)"],
    )

    assert job.inlets == [
        DatasetUrn.from_string(
            "urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset1,PROD)"
        ),
        DatasetUrn.from_string(
            "urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset2,PROD)"
        ),
    ]
    assert job.outlets == [
        DatasetUrn.from_string(
            "urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset3,PROD)"
        ),
    ]

    assert_entity_golden(job, GOLDEN_DIR / "test_datajob_inlets_outlets_golden.json")


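# A minimal sketch: set_inlets/set_outlets should replace the lineage lists
# with parsed DatasetUrn values, mirroring the constructor behavior asserted
# above. This assumes the setters accept valid dataset URN strings, which the
# error-path test below also relies on.
def test_datajob_set_inlets_outlets_sketch() -> None:
    flow = DataFlow(platform="airflow", name="example_dag")
    job = DataJob(flow=flow, name="example_task")
    job.set_inlets(
        ["urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset1,PROD)"]
    )
    job.set_outlets(
        ["urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset3,PROD)"]
    )
    assert job.inlets == [
        DatasetUrn.from_string(
            "urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset1,PROD)"
        )
    ]
    assert job.outlets == [
        DatasetUrn.from_string(
            "urn:li:dataset:(urn:li:dataPlatform:airflow,example_dataset3,PROD)"
        )
    ]

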
def test_datajob_invalid_inlets_outlets() -> None:
    # Create a dataflow first
    flow = DataFlow(
        platform="airflow",
        name="example_dag",
    )

    # Create a datajob with the flow
    job = DataJob(
        flow=flow,
        name="example_task",
    )

    # Non-dataset URNs are not valid lineage endpoints, so the setters should
    # reject them with InvalidUrnError.
    with pytest.raises(InvalidUrnError):
        job.set_inlets(
            ["urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)"]
        )

    with pytest.raises(InvalidUrnError):
        job.set_outlets(
            ["urn:li:datajob:(urn:li:dataFlow:(airflow,example_dag,PROD),example_task)"]
        )