mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-10 10:44:39 +00:00
207 lines
6.7 KiB
Python
207 lines
6.7 KiB
Python
![]() |
import pathlib
|
||
|
import re
|
||
|
from datetime import datetime, timezone
|
||
|
from unittest import mock
|
||
|
|
||
|
import pytest
|
||
|
|
||
|
from datahub.emitter.mce_builder import DEFAULT_ENV
|
||
|
from datahub.emitter.mcp_builder import ContainerKey
|
||
|
from datahub.errors import ItemNotFoundError
|
||
|
from datahub.metadata.urns import (
|
||
|
CorpUserUrn,
|
||
|
DataFlowUrn,
|
||
|
DomainUrn,
|
||
|
GlossaryTermUrn,
|
||
|
TagUrn,
|
||
|
)
|
||
|
from datahub.sdk.container import Container
|
||
|
from datahub.sdk.dataflow import DataFlow
|
||
|
from datahub.testing.sdk_v2_helpers import assert_entity_golden
|
||
|
|
||
|
GOLDEN_DIR = pathlib.Path(__file__).parent / "dataflow_golden"
|
||
|
|
||
|
|
||
|
def test_dataflow_basic(pytestconfig: pytest.Config) -> None:
|
||
|
d = DataFlow(
|
||
|
platform="airflow",
|
||
|
name="example_dag",
|
||
|
)
|
||
|
|
||
|
# Check urn setup.
|
||
|
assert DataFlow.get_urn_type() == DataFlowUrn
|
||
|
assert isinstance(d.urn, DataFlowUrn)
|
||
|
assert str(d.urn) == f"urn:li:dataFlow:(airflow,example_dag,{DEFAULT_ENV})"
|
||
|
assert str(d.urn) in repr(d)
|
||
|
|
||
|
# Check most attributes.
|
||
|
assert d.platform is not None
|
||
|
assert d.platform.platform_name == "airflow"
|
||
|
assert d.platform_instance is None
|
||
|
assert d.tags is None
|
||
|
assert d.terms is None
|
||
|
assert d.created is None
|
||
|
assert d.last_modified is None
|
||
|
assert d.description is None
|
||
|
assert d.custom_properties == {}
|
||
|
assert d.domain is None
|
||
|
|
||
|
with pytest.raises(AttributeError):
|
||
|
assert d.extra_attribute # type: ignore
|
||
|
with pytest.raises(AttributeError):
|
||
|
d.extra_attribute = "slots should reject extra fields" # type: ignore
|
||
|
with pytest.raises(AttributeError):
|
||
|
# This should fail. Eventually we should make it suggest calling set_owners instead.
|
||
|
d.owners = [] # type: ignore
|
||
|
|
||
|
assert_entity_golden(d, GOLDEN_DIR / "test_dataflow_basic_golden.json")
|
||
|
|
||
|
|
||
|
def test_dataflow_complex() -> None:
|
||
|
created = datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
|
||
|
updated = datetime(2025, 1, 9, 3, 4, 6, tzinfo=timezone.utc)
|
||
|
|
||
|
d = DataFlow(
|
||
|
platform="airflow",
|
||
|
platform_instance="my_instance",
|
||
|
name="example_dag",
|
||
|
display_name="Example DAG",
|
||
|
created=created,
|
||
|
last_modified=updated,
|
||
|
custom_properties={
|
||
|
"key1": "value1",
|
||
|
"key2": "value2",
|
||
|
},
|
||
|
description="Test dataflow",
|
||
|
external_url="https://example.com",
|
||
|
owners=[
|
||
|
CorpUserUrn("admin@datahubproject.io"),
|
||
|
],
|
||
|
links=[
|
||
|
"https://example.com/doc1",
|
||
|
("https://example.com/doc2", "Documentation 2"),
|
||
|
],
|
||
|
tags=[
|
||
|
TagUrn("tag1"),
|
||
|
TagUrn("tag2"),
|
||
|
],
|
||
|
terms=[
|
||
|
GlossaryTermUrn("DataPipeline"),
|
||
|
],
|
||
|
domain=DomainUrn("Data Engineering"),
|
||
|
)
|
||
|
|
||
|
assert d.platform is not None
|
||
|
assert d.platform.platform_name == "airflow"
|
||
|
assert d.platform_instance is not None
|
||
|
assert (
|
||
|
str(d.platform_instance)
|
||
|
== "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)"
|
||
|
)
|
||
|
|
||
|
# Properties.
|
||
|
assert d.description == "Test dataflow"
|
||
|
assert d.display_name == "Example DAG"
|
||
|
assert d.external_url == "https://example.com"
|
||
|
assert d.created == created
|
||
|
assert d.last_modified == updated
|
||
|
assert d.custom_properties == {"key1": "value1", "key2": "value2"}
|
||
|
|
||
|
# Check standard aspects.
|
||
|
assert d.owners is not None and len(d.owners) == 1
|
||
|
assert d.links is not None and len(d.links) == 2
|
||
|
assert d.tags is not None and len(d.tags) == 2
|
||
|
assert d.terms is not None and len(d.terms) == 1
|
||
|
assert d.domain == DomainUrn("Data Engineering")
|
||
|
|
||
|
# Add assertions for links
|
||
|
assert d.links is not None
|
||
|
assert len(d.links) == 2
|
||
|
assert d.links[0].url == "https://example.com/doc1"
|
||
|
assert d.links[1].url == "https://example.com/doc2"
|
||
|
|
||
|
assert_entity_golden(d, GOLDEN_DIR / "test_dataflow_complex_golden.json")
|
||
|
|
||
|
|
||
|
def test_client_get_dataflow() -> None:
|
||
|
"""Test retrieving DataFlows using client.entities.get()."""
|
||
|
# Set up mock
|
||
|
mock_client = mock.MagicMock()
|
||
|
mock_entities = mock.MagicMock()
|
||
|
mock_client.entities = mock_entities
|
||
|
|
||
|
# Basic retrieval
|
||
|
flow_urn = DataFlowUrn("airflow", "test_dag", DEFAULT_ENV)
|
||
|
expected_flow = DataFlow(
|
||
|
platform="airflow",
|
||
|
name="test_dag",
|
||
|
description="A test dataflow",
|
||
|
)
|
||
|
mock_entities.get.return_value = expected_flow
|
||
|
|
||
|
result = mock_client.entities.get(flow_urn)
|
||
|
assert result == expected_flow
|
||
|
mock_entities.get.assert_called_once_with(flow_urn)
|
||
|
mock_entities.get.reset_mock()
|
||
|
|
||
|
# String URN
|
||
|
urn_str = f"urn:li:dataFlow:(airflow,string_dag,{DEFAULT_ENV})"
|
||
|
mock_entities.get.return_value = DataFlow(platform="airflow", name="string_dag")
|
||
|
result = mock_client.entities.get(urn_str)
|
||
|
mock_entities.get.assert_called_once_with(urn_str)
|
||
|
mock_entities.get.reset_mock()
|
||
|
|
||
|
# Complex dataflow with properties
|
||
|
test_date = datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
|
||
|
complex_flow = DataFlow(
|
||
|
platform="airflow",
|
||
|
name="complex_dag",
|
||
|
description="Complex test dataflow",
|
||
|
display_name="My Complex DAG",
|
||
|
external_url="https://example.com/dag",
|
||
|
created=test_date,
|
||
|
last_modified=test_date,
|
||
|
custom_properties={"env": "production", "owner_team": "data-eng"},
|
||
|
)
|
||
|
|
||
|
# Set relationships and tags
|
||
|
complex_flow.set_tags([TagUrn("important"), TagUrn("data-pipeline")])
|
||
|
complex_flow.set_domain(DomainUrn("Data Engineering"))
|
||
|
complex_flow.set_owners([CorpUserUrn("john@example.com")])
|
||
|
|
||
|
flow_urn = DataFlowUrn("airflow", "complex_dag", DEFAULT_ENV)
|
||
|
mock_entities.get.return_value = complex_flow
|
||
|
|
||
|
result = mock_client.entities.get(flow_urn)
|
||
|
assert result.name == "complex_dag"
|
||
|
assert result.display_name == "My Complex DAG"
|
||
|
assert result.created == test_date
|
||
|
assert result.description == "Complex test dataflow"
|
||
|
assert result.tags is not None
|
||
|
assert result.domain is not None
|
||
|
assert result.owners is not None
|
||
|
mock_entities.get.assert_called_once_with(flow_urn)
|
||
|
mock_entities.get.reset_mock()
|
||
|
|
||
|
# Not found case
|
||
|
error_message = f"Entity {flow_urn} not found"
|
||
|
mock_entities.get.side_effect = ItemNotFoundError(error_message)
|
||
|
with pytest.raises(ItemNotFoundError, match=re.escape(error_message)):
|
||
|
mock_client.entities.get(flow_urn)
|
||
|
|
||
|
|
||
|
def test_dataflow_with_container() -> None:
|
||
|
container = Container(
|
||
|
container_key=ContainerKey(
|
||
|
platform="airflow", name="my_container", instance="my_instance"
|
||
|
),
|
||
|
display_name="My Container",
|
||
|
)
|
||
|
flow = DataFlow(
|
||
|
platform="airflow",
|
||
|
name="example_dag",
|
||
|
parent_container=container,
|
||
|
)
|
||
|
assert flow.parent_container == container.urn
|
||
|
assert flow.browse_path == [container.urn]
|