mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-06 00:31:18 +00:00
144 lines
4.7 KiB
Python
144 lines
4.7 KiB
Python
import pathlib
|
|
from unittest.mock import Mock
|
|
|
|
import pytest
|
|
|
|
import datahub.metadata.schema_classes as models
|
|
from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey
|
|
from datahub.errors import ItemNotFoundError, SdkUsageError
|
|
from datahub.ingestion.graph.client import DataHubGraph
|
|
from datahub.metadata.urns import DatasetUrn, TagUrn
|
|
from datahub.sdk.container import Container
|
|
from datahub.sdk.dataset import Dataset
|
|
from datahub.sdk.main_client import DataHubClient
|
|
from tests.test_helpers import mce_helpers
|
|
|
|
_GOLDEN_DIR = pathlib.Path(__file__).parent / "entity_client_goldens"
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_graph() -> Mock:
|
|
graph = Mock(spec=DataHubGraph)
|
|
graph.exists.return_value = False
|
|
return graph
|
|
|
|
|
|
@pytest.fixture
|
|
def client(mock_graph: Mock) -> DataHubClient:
|
|
return DataHubClient(graph=mock_graph)
|
|
|
|
|
|
def assert_client_golden(client: DataHubClient, golden_path: pathlib.Path) -> None:
|
|
mcps = client._graph.emit_mcps.call_args[0][0] # type: ignore
|
|
mce_helpers.check_goldens_stream(
|
|
outputs=mcps,
|
|
golden_path=golden_path,
|
|
ignore_order=False,
|
|
)
|
|
|
|
|
|
def test_container_creation_flow(client: DataHubClient, mock_graph: Mock) -> None:
|
|
# Create database and schema containers
|
|
db = DatabaseKey(platform="snowflake", database="test_db")
|
|
schema = SchemaKey(**db.dict(), schema="test_schema")
|
|
|
|
db_container = Container(db, display_name="test_db", subtype="Database")
|
|
schema_container = Container(schema, display_name="test_schema", subtype="Schema")
|
|
|
|
# Test database container creation
|
|
client.entities.upsert(db_container)
|
|
assert_client_golden(client, _GOLDEN_DIR / "test_container_db_golden.json")
|
|
|
|
# Test schema container creation
|
|
client.entities.upsert(schema_container)
|
|
assert_client_golden(client, _GOLDEN_DIR / "test_container_schema_golden.json")
|
|
|
|
|
|
def test_dataset_creation(client: DataHubClient, mock_graph: Mock) -> None:
|
|
schema = SchemaKey(platform="snowflake", database="test_db", schema="test_schema")
|
|
|
|
dataset = Dataset(
|
|
platform="snowflake",
|
|
name="test_db.test_schema.table_1",
|
|
env="prod",
|
|
parent_container=schema,
|
|
schema=[
|
|
("col1", "string"),
|
|
("col2", "int"),
|
|
],
|
|
description="test description",
|
|
tags=[TagUrn("tag1")],
|
|
)
|
|
|
|
client.entities.create(dataset)
|
|
assert_client_golden(client, _GOLDEN_DIR / "test_dataset_creation_golden.json")
|
|
|
|
|
|
def test_dataset_read_modify_write(client: DataHubClient, mock_graph: Mock) -> None:
|
|
# Setup mock for existing dataset
|
|
mock_graph.exists.return_value = True
|
|
dataset_urn = DatasetUrn(
|
|
platform="snowflake", name="test_db.test_schema.table_1", env="prod"
|
|
)
|
|
|
|
# Mock the get_entity_semityped response with initial state
|
|
mock_graph.get_entity_semityped.return_value = {
|
|
"datasetProperties": models.DatasetPropertiesClass(
|
|
description="original description",
|
|
customProperties={},
|
|
tags=[],
|
|
)
|
|
}
|
|
|
|
# Get and update dataset
|
|
dataset = client.entities.get(dataset_urn)
|
|
dataset.set_description("updated description")
|
|
|
|
client.entities.update(dataset)
|
|
assert_client_golden(client, _GOLDEN_DIR / "test_dataset_update_golden.json")
|
|
|
|
|
|
def test_container_read_modify_write(client: DataHubClient, mock_graph: Mock) -> None:
|
|
database_key = DatabaseKey(platform="snowflake", database="test_db")
|
|
container_urn = database_key.as_urn_typed()
|
|
|
|
# Setup mocks for the container.
|
|
mock_graph.exists.return_value = True
|
|
mock_graph.get_entity_semityped.return_value = {
|
|
"containerProperties": models.ContainerPropertiesClass(
|
|
name="test_db",
|
|
)
|
|
}
|
|
|
|
# Get and update the container
|
|
container = client.entities.get(container_urn)
|
|
container.set_description("updated description")
|
|
|
|
client.entities.update(container)
|
|
assert_client_golden(client, _GOLDEN_DIR / "test_container_update_golden.json")
|
|
|
|
|
|
def test_create_existing_dataset_fails(client: DataHubClient, mock_graph: Mock) -> None:
|
|
mock_graph.exists.return_value = True
|
|
|
|
dataset = Dataset(
|
|
platform="snowflake",
|
|
name="test_db.test_schema.table_1",
|
|
env="prod",
|
|
schema=[("col1", "string")],
|
|
)
|
|
|
|
with pytest.raises(SdkUsageError, match="Entity .* already exists"):
|
|
client.entities.create(dataset)
|
|
|
|
|
|
def test_get_nonexistent_dataset_fails(client: DataHubClient, mock_graph: Mock) -> None:
|
|
mock_graph.exists.return_value = False
|
|
|
|
dataset_urn = DatasetUrn(
|
|
platform="snowflake", name="test_db.test_schema.missing_table", env="prod"
|
|
)
|
|
|
|
with pytest.raises(ItemNotFoundError, match="Entity .* not found"):
|
|
client.entities.get(dataset_urn)
|