datahub/metadata-ingestion/tests/unit/sdk_v2/test_entity_client.py

144 lines
4.7 KiB
Python

import pathlib
from unittest.mock import Mock
import pytest
import datahub.metadata.schema_classes as models
from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey
from datahub.errors import ItemNotFoundError, SdkUsageError
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.urns import DatasetUrn, TagUrn
from datahub.sdk.container import Container
from datahub.sdk.dataset import Dataset
from datahub.sdk.main_client import DataHubClient
from tests.test_helpers import mce_helpers
_GOLDEN_DIR = pathlib.Path(__file__).parent / "entity_client_goldens"
@pytest.fixture
def mock_graph() -> Mock:
graph = Mock(spec=DataHubGraph)
graph.exists.return_value = False
return graph
@pytest.fixture
def client(mock_graph: Mock) -> DataHubClient:
return DataHubClient(graph=mock_graph)
def assert_client_golden(client: DataHubClient, golden_path: pathlib.Path) -> None:
mcps = client._graph.emit_mcps.call_args[0][0] # type: ignore
mce_helpers.check_goldens_stream(
outputs=mcps,
golden_path=golden_path,
ignore_order=False,
)
def test_container_creation_flow(client: DataHubClient, mock_graph: Mock) -> None:
# Create database and schema containers
db = DatabaseKey(platform="snowflake", database="test_db")
schema = SchemaKey(**db.dict(), schema="test_schema")
db_container = Container(db, display_name="test_db", subtype="Database")
schema_container = Container(schema, display_name="test_schema", subtype="Schema")
# Test database container creation
client.entities.upsert(db_container)
assert_client_golden(client, _GOLDEN_DIR / "test_container_db_golden.json")
# Test schema container creation
client.entities.upsert(schema_container)
assert_client_golden(client, _GOLDEN_DIR / "test_container_schema_golden.json")
def test_dataset_creation(client: DataHubClient, mock_graph: Mock) -> None:
schema = SchemaKey(platform="snowflake", database="test_db", schema="test_schema")
dataset = Dataset(
platform="snowflake",
name="test_db.test_schema.table_1",
env="prod",
parent_container=schema,
schema=[
("col1", "string"),
("col2", "int"),
],
description="test description",
tags=[TagUrn("tag1")],
)
client.entities.create(dataset)
assert_client_golden(client, _GOLDEN_DIR / "test_dataset_creation_golden.json")
def test_dataset_read_modify_write(client: DataHubClient, mock_graph: Mock) -> None:
# Setup mock for existing dataset
mock_graph.exists.return_value = True
dataset_urn = DatasetUrn(
platform="snowflake", name="test_db.test_schema.table_1", env="prod"
)
# Mock the get_entity_semityped response with initial state
mock_graph.get_entity_semityped.return_value = {
"datasetProperties": models.DatasetPropertiesClass(
description="original description",
customProperties={},
tags=[],
)
}
# Get and update dataset
dataset = client.entities.get(dataset_urn)
dataset.set_description("updated description")
client.entities.update(dataset)
assert_client_golden(client, _GOLDEN_DIR / "test_dataset_update_golden.json")
def test_container_read_modify_write(client: DataHubClient, mock_graph: Mock) -> None:
database_key = DatabaseKey(platform="snowflake", database="test_db")
container_urn = database_key.as_urn_typed()
# Setup mocks for the container.
mock_graph.exists.return_value = True
mock_graph.get_entity_semityped.return_value = {
"containerProperties": models.ContainerPropertiesClass(
name="test_db",
)
}
# Get and update the container
container = client.entities.get(container_urn)
container.set_description("updated description")
client.entities.update(container)
assert_client_golden(client, _GOLDEN_DIR / "test_container_update_golden.json")
def test_create_existing_dataset_fails(client: DataHubClient, mock_graph: Mock) -> None:
mock_graph.exists.return_value = True
dataset = Dataset(
platform="snowflake",
name="test_db.test_schema.table_1",
env="prod",
schema=[("col1", "string")],
)
with pytest.raises(SdkUsageError, match="Entity .* already exists"):
client.entities.create(dataset)
def test_get_nonexistent_dataset_fails(client: DataHubClient, mock_graph: Mock) -> None:
mock_graph.exists.return_value = False
dataset_urn = DatasetUrn(
platform="snowflake", name="test_db.test_schema.missing_table", env="prod"
)
with pytest.raises(ItemNotFoundError, match="Entity .* not found"):
client.entities.get(dataset_urn)