mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-11-04 04:39:10 +00:00 
			
		
		
		
	
		
			
	
	
		
			207 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			207 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
								 | 
							
								import pathlib
							 | 
						||
| 
								 | 
							
								import re
							 | 
						||
| 
								 | 
							
								from datetime import datetime, timezone
							 | 
						||
| 
								 | 
							
								from unittest import mock
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import pytest
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from datahub.emitter.mce_builder import DEFAULT_ENV
							 | 
						||
| 
								 | 
							
								from datahub.emitter.mcp_builder import ContainerKey
							 | 
						||
| 
								 | 
							
								from datahub.errors import ItemNotFoundError
							 | 
						||
| 
								 | 
							
								from datahub.metadata.urns import (
							 | 
						||
| 
								 | 
							
								    CorpUserUrn,
							 | 
						||
| 
								 | 
							
								    DataFlowUrn,
							 | 
						||
| 
								 | 
							
								    DomainUrn,
							 | 
						||
| 
								 | 
							
								    GlossaryTermUrn,
							 | 
						||
| 
								 | 
							
								    TagUrn,
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								from datahub.sdk.container import Container
							 | 
						||
| 
								 | 
							
								from datahub.sdk.dataflow import DataFlow
							 | 
						||
| 
								 | 
							
								from datahub.testing.sdk_v2_helpers import assert_entity_golden
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								GOLDEN_DIR = pathlib.Path(__file__).parent / "dataflow_golden"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def test_dataflow_basic(pytestconfig: pytest.Config) -> None:
							 | 
						||
| 
								 | 
							
								    d = DataFlow(
							 | 
						||
| 
								 | 
							
								        platform="airflow",
							 | 
						||
| 
								 | 
							
								        name="example_dag",
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # Check urn setup.
							 | 
						||
| 
								 | 
							
								    assert DataFlow.get_urn_type() == DataFlowUrn
							 | 
						||
| 
								 | 
							
								    assert isinstance(d.urn, DataFlowUrn)
							 | 
						||
| 
								 | 
							
								    assert str(d.urn) == f"urn:li:dataFlow:(airflow,example_dag,{DEFAULT_ENV})"
							 | 
						||
| 
								 | 
							
								    assert str(d.urn) in repr(d)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # Check most attributes.
							 | 
						||
| 
								 | 
							
								    assert d.platform is not None
							 | 
						||
| 
								 | 
							
								    assert d.platform.platform_name == "airflow"
							 | 
						||
| 
								 | 
							
								    assert d.platform_instance is None
							 | 
						||
| 
								 | 
							
								    assert d.tags is None
							 | 
						||
| 
								 | 
							
								    assert d.terms is None
							 | 
						||
| 
								 | 
							
								    assert d.created is None
							 | 
						||
| 
								 | 
							
								    assert d.last_modified is None
							 | 
						||
| 
								 | 
							
								    assert d.description is None
							 | 
						||
| 
								 | 
							
								    assert d.custom_properties == {}
							 | 
						||
| 
								 | 
							
								    assert d.domain is None
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    with pytest.raises(AttributeError):
							 | 
						||
| 
								 | 
							
								        assert d.extra_attribute  # type: ignore
							 | 
						||
| 
								 | 
							
								    with pytest.raises(AttributeError):
							 | 
						||
| 
								 | 
							
								        d.extra_attribute = "slots should reject extra fields"  # type: ignore
							 | 
						||
| 
								 | 
							
								    with pytest.raises(AttributeError):
							 | 
						||
| 
								 | 
							
								        # This should fail. Eventually we should make it suggest calling set_owners instead.
							 | 
						||
| 
								 | 
							
								        d.owners = []  # type: ignore
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    assert_entity_golden(d, GOLDEN_DIR / "test_dataflow_basic_golden.json")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def test_dataflow_complex() -> None:
							 | 
						||
| 
								 | 
							
								    created = datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
							 | 
						||
| 
								 | 
							
								    updated = datetime(2025, 1, 9, 3, 4, 6, tzinfo=timezone.utc)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    d = DataFlow(
							 | 
						||
| 
								 | 
							
								        platform="airflow",
							 | 
						||
| 
								 | 
							
								        platform_instance="my_instance",
							 | 
						||
| 
								 | 
							
								        name="example_dag",
							 | 
						||
| 
								 | 
							
								        display_name="Example DAG",
							 | 
						||
| 
								 | 
							
								        created=created,
							 | 
						||
| 
								 | 
							
								        last_modified=updated,
							 | 
						||
| 
								 | 
							
								        custom_properties={
							 | 
						||
| 
								 | 
							
								            "key1": "value1",
							 | 
						||
| 
								 | 
							
								            "key2": "value2",
							 | 
						||
| 
								 | 
							
								        },
							 | 
						||
| 
								 | 
							
								        description="Test dataflow",
							 | 
						||
| 
								 | 
							
								        external_url="https://example.com",
							 | 
						||
| 
								 | 
							
								        owners=[
							 | 
						||
| 
								 | 
							
								            CorpUserUrn("admin@datahubproject.io"),
							 | 
						||
| 
								 | 
							
								        ],
							 | 
						||
| 
								 | 
							
								        links=[
							 | 
						||
| 
								 | 
							
								            "https://example.com/doc1",
							 | 
						||
| 
								 | 
							
								            ("https://example.com/doc2", "Documentation 2"),
							 | 
						||
| 
								 | 
							
								        ],
							 | 
						||
| 
								 | 
							
								        tags=[
							 | 
						||
| 
								 | 
							
								            TagUrn("tag1"),
							 | 
						||
| 
								 | 
							
								            TagUrn("tag2"),
							 | 
						||
| 
								 | 
							
								        ],
							 | 
						||
| 
								 | 
							
								        terms=[
							 | 
						||
| 
								 | 
							
								            GlossaryTermUrn("DataPipeline"),
							 | 
						||
| 
								 | 
							
								        ],
							 | 
						||
| 
								 | 
							
								        domain=DomainUrn("Data Engineering"),
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    assert d.platform is not None
							 | 
						||
| 
								 | 
							
								    assert d.platform.platform_name == "airflow"
							 | 
						||
| 
								 | 
							
								    assert d.platform_instance is not None
							 | 
						||
| 
								 | 
							
								    assert (
							 | 
						||
| 
								 | 
							
								        str(d.platform_instance)
							 | 
						||
| 
								 | 
							
								        == "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)"
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # Properties.
							 | 
						||
| 
								 | 
							
								    assert d.description == "Test dataflow"
							 | 
						||
| 
								 | 
							
								    assert d.display_name == "Example DAG"
							 | 
						||
| 
								 | 
							
								    assert d.external_url == "https://example.com"
							 | 
						||
| 
								 | 
							
								    assert d.created == created
							 | 
						||
| 
								 | 
							
								    assert d.last_modified == updated
							 | 
						||
| 
								 | 
							
								    assert d.custom_properties == {"key1": "value1", "key2": "value2"}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # Check standard aspects.
							 | 
						||
| 
								 | 
							
								    assert d.owners is not None and len(d.owners) == 1
							 | 
						||
| 
								 | 
							
								    assert d.links is not None and len(d.links) == 2
							 | 
						||
| 
								 | 
							
								    assert d.tags is not None and len(d.tags) == 2
							 | 
						||
| 
								 | 
							
								    assert d.terms is not None and len(d.terms) == 1
							 | 
						||
| 
								 | 
							
								    assert d.domain == DomainUrn("Data Engineering")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # Add assertions for links
							 | 
						||
| 
								 | 
							
								    assert d.links is not None
							 | 
						||
| 
								 | 
							
								    assert len(d.links) == 2
							 | 
						||
| 
								 | 
							
								    assert d.links[0].url == "https://example.com/doc1"
							 | 
						||
| 
								 | 
							
								    assert d.links[1].url == "https://example.com/doc2"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    assert_entity_golden(d, GOLDEN_DIR / "test_dataflow_complex_golden.json")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def test_client_get_dataflow() -> None:
							 | 
						||
| 
								 | 
							
								    """Test retrieving DataFlows using client.entities.get()."""
							 | 
						||
| 
								 | 
							
								    # Set up mock
							 | 
						||
| 
								 | 
							
								    mock_client = mock.MagicMock()
							 | 
						||
| 
								 | 
							
								    mock_entities = mock.MagicMock()
							 | 
						||
| 
								 | 
							
								    mock_client.entities = mock_entities
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # Basic retrieval
							 | 
						||
| 
								 | 
							
								    flow_urn = DataFlowUrn("airflow", "test_dag", DEFAULT_ENV)
							 | 
						||
| 
								 | 
							
								    expected_flow = DataFlow(
							 | 
						||
| 
								 | 
							
								        platform="airflow",
							 | 
						||
| 
								 | 
							
								        name="test_dag",
							 | 
						||
| 
								 | 
							
								        description="A test dataflow",
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								    mock_entities.get.return_value = expected_flow
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    result = mock_client.entities.get(flow_urn)
							 | 
						||
| 
								 | 
							
								    assert result == expected_flow
							 | 
						||
| 
								 | 
							
								    mock_entities.get.assert_called_once_with(flow_urn)
							 | 
						||
| 
								 | 
							
								    mock_entities.get.reset_mock()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # String URN
							 | 
						||
| 
								 | 
							
								    urn_str = f"urn:li:dataFlow:(airflow,string_dag,{DEFAULT_ENV})"
							 | 
						||
| 
								 | 
							
								    mock_entities.get.return_value = DataFlow(platform="airflow", name="string_dag")
							 | 
						||
| 
								 | 
							
								    result = mock_client.entities.get(urn_str)
							 | 
						||
| 
								 | 
							
								    mock_entities.get.assert_called_once_with(urn_str)
							 | 
						||
| 
								 | 
							
								    mock_entities.get.reset_mock()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # Complex dataflow with properties
							 | 
						||
| 
								 | 
							
								    test_date = datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
							 | 
						||
| 
								 | 
							
								    complex_flow = DataFlow(
							 | 
						||
| 
								 | 
							
								        platform="airflow",
							 | 
						||
| 
								 | 
							
								        name="complex_dag",
							 | 
						||
| 
								 | 
							
								        description="Complex test dataflow",
							 | 
						||
| 
								 | 
							
								        display_name="My Complex DAG",
							 | 
						||
| 
								 | 
							
								        external_url="https://example.com/dag",
							 | 
						||
| 
								 | 
							
								        created=test_date,
							 | 
						||
| 
								 | 
							
								        last_modified=test_date,
							 | 
						||
| 
								 | 
							
								        custom_properties={"env": "production", "owner_team": "data-eng"},
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # Set relationships and tags
							 | 
						||
| 
								 | 
							
								    complex_flow.set_tags([TagUrn("important"), TagUrn("data-pipeline")])
							 | 
						||
| 
								 | 
							
								    complex_flow.set_domain(DomainUrn("Data Engineering"))
							 | 
						||
| 
								 | 
							
								    complex_flow.set_owners([CorpUserUrn("john@example.com")])
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    flow_urn = DataFlowUrn("airflow", "complex_dag", DEFAULT_ENV)
							 | 
						||
| 
								 | 
							
								    mock_entities.get.return_value = complex_flow
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    result = mock_client.entities.get(flow_urn)
							 | 
						||
| 
								 | 
							
								    assert result.name == "complex_dag"
							 | 
						||
| 
								 | 
							
								    assert result.display_name == "My Complex DAG"
							 | 
						||
| 
								 | 
							
								    assert result.created == test_date
							 | 
						||
| 
								 | 
							
								    assert result.description == "Complex test dataflow"
							 | 
						||
| 
								 | 
							
								    assert result.tags is not None
							 | 
						||
| 
								 | 
							
								    assert result.domain is not None
							 | 
						||
| 
								 | 
							
								    assert result.owners is not None
							 | 
						||
| 
								 | 
							
								    mock_entities.get.assert_called_once_with(flow_urn)
							 | 
						||
| 
								 | 
							
								    mock_entities.get.reset_mock()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # Not found case
							 | 
						||
| 
								 | 
							
								    error_message = f"Entity {flow_urn} not found"
							 | 
						||
| 
								 | 
							
								    mock_entities.get.side_effect = ItemNotFoundError(error_message)
							 | 
						||
| 
								 | 
							
								    with pytest.raises(ItemNotFoundError, match=re.escape(error_message)):
							 | 
						||
| 
								 | 
							
								        mock_client.entities.get(flow_urn)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def test_dataflow_with_container() -> None:
							 | 
						||
| 
								 | 
							
								    container = Container(
							 | 
						||
| 
								 | 
							
								        container_key=ContainerKey(
							 | 
						||
| 
								 | 
							
								            platform="airflow", name="my_container", instance="my_instance"
							 | 
						||
| 
								 | 
							
								        ),
							 | 
						||
| 
								 | 
							
								        display_name="My Container",
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								    flow = DataFlow(
							 | 
						||
| 
								 | 
							
								        platform="airflow",
							 | 
						||
| 
								 | 
							
								        name="example_dag",
							 | 
						||
| 
								 | 
							
								        parent_container=container,
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								    assert flow.parent_container == container.urn
							 | 
						||
| 
								 | 
							
								    assert flow.browse_path == [container.urn]
							 |