mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-09 02:02:12 +00:00
142 lines
4.2 KiB
Python
142 lines
4.2 KiB
Python
![]() |
import pathlib
|
||
|
from unittest.mock import Mock
|
||
|
|
||
|
import pytest
|
||
|
|
||
|
import datahub.metadata.schema_classes as models
|
||
|
from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey
|
||
|
from datahub.errors import ItemNotFoundError, SdkUsageError
|
||
|
from datahub.ingestion.graph.client import DataHubGraph
|
||
|
from datahub.metadata.urns import DatasetUrn, TagUrn
|
||
|
from datahub.sdk.container import Container
|
||
|
from datahub.sdk.dataset import Dataset
|
||
|
from datahub.sdk.main_client import DataHubClient
|
||
|
from tests.test_helpers import mce_helpers
|
||
|
|
||
|
_GOLDEN_DIR = pathlib.Path(__file__).parent / "entity_client_goldens"
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def mock_graph() -> Mock:
|
||
|
graph = Mock(spec=DataHubGraph)
|
||
|
graph.exists.return_value = False
|
||
|
return graph
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def client(mock_graph: Mock) -> DataHubClient:
|
||
|
return DataHubClient(graph=mock_graph)
|
||
|
|
||
|
|
||
|
def assert_client_golden(
|
||
|
pytestconfig: pytest.Config,
|
||
|
client: DataHubClient,
|
||
|
golden_path: pathlib.Path,
|
||
|
) -> None:
|
||
|
mcps = client._graph.emit_mcps.call_args[0][0] # type: ignore
|
||
|
mce_helpers.check_goldens_stream(
|
||
|
outputs=mcps,
|
||
|
golden_path=golden_path,
|
||
|
ignore_order=False,
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_container_creation_flow(
|
||
|
pytestconfig: pytest.Config, client: DataHubClient, mock_graph: Mock
|
||
|
) -> None:
|
||
|
# Create database and schema containers
|
||
|
db = DatabaseKey(platform="snowflake", database="test_db")
|
||
|
schema = SchemaKey(**db.dict(), schema="test_schema")
|
||
|
|
||
|
db_container = Container(db, display_name="test_db", subtype="Database")
|
||
|
schema_container = Container(schema, display_name="test_schema", subtype="Schema")
|
||
|
|
||
|
# Test database container creation
|
||
|
client.entities.upsert(db_container)
|
||
|
assert_client_golden(
|
||
|
pytestconfig, client, _GOLDEN_DIR / "test_container_db_golden.json"
|
||
|
)
|
||
|
|
||
|
# Test schema container creation
|
||
|
client.entities.upsert(schema_container)
|
||
|
assert_client_golden(
|
||
|
pytestconfig, client, _GOLDEN_DIR / "test_container_schema_golden.json"
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_dataset_creation(
|
||
|
pytestconfig: pytest.Config, client: DataHubClient, mock_graph: Mock
|
||
|
) -> None:
|
||
|
schema = SchemaKey(platform="snowflake", database="test_db", schema="test_schema")
|
||
|
|
||
|
dataset = Dataset(
|
||
|
platform="snowflake",
|
||
|
name="test_db.test_schema.table_1",
|
||
|
env="prod",
|
||
|
container=schema,
|
||
|
schema=[
|
||
|
("col1", "string"),
|
||
|
("col2", "int"),
|
||
|
],
|
||
|
description="test description",
|
||
|
tags=[TagUrn("tag1")],
|
||
|
)
|
||
|
|
||
|
client.entities.create(dataset)
|
||
|
assert_client_golden(
|
||
|
pytestconfig, client, _GOLDEN_DIR / "test_dataset_creation_golden.json"
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_dataset_read_modify_write(
|
||
|
pytestconfig: pytest.Config, client: DataHubClient, mock_graph: Mock
|
||
|
) -> None:
|
||
|
# Setup mock for existing dataset
|
||
|
mock_graph.exists.return_value = True
|
||
|
dataset_urn = DatasetUrn(
|
||
|
platform="snowflake", name="test_db.test_schema.table_1", env="prod"
|
||
|
)
|
||
|
|
||
|
# Mock the get_entity_semityped response with initial state
|
||
|
mock_graph.get_entity_semityped.return_value = {
|
||
|
"datasetProperties": models.DatasetPropertiesClass(
|
||
|
description="original description",
|
||
|
customProperties={},
|
||
|
tags=[],
|
||
|
)
|
||
|
}
|
||
|
|
||
|
# Get and update dataset
|
||
|
dataset = client.entities.get(dataset_urn)
|
||
|
dataset.set_description("updated description")
|
||
|
|
||
|
client.entities.update(dataset)
|
||
|
assert_client_golden(
|
||
|
pytestconfig, client, _GOLDEN_DIR / "test_dataset_update_golden.json"
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_create_existing_dataset_fails(client: DataHubClient, mock_graph: Mock) -> None:
|
||
|
mock_graph.exists.return_value = True
|
||
|
|
||
|
dataset = Dataset(
|
||
|
platform="snowflake",
|
||
|
name="test_db.test_schema.table_1",
|
||
|
env="prod",
|
||
|
schema=[("col1", "string")],
|
||
|
)
|
||
|
|
||
|
with pytest.raises(SdkUsageError, match="Entity .* already exists"):
|
||
|
client.entities.create(dataset)
|
||
|
|
||
|
|
||
|
def test_get_nonexistent_dataset_fails(client: DataHubClient, mock_graph: Mock) -> None:
|
||
|
mock_graph.exists.return_value = False
|
||
|
|
||
|
dataset_urn = DatasetUrn(
|
||
|
platform="snowflake", name="test_db.test_schema.missing_table", env="prod"
|
||
|
)
|
||
|
|
||
|
with pytest.raises(ItemNotFoundError, match="Entity .* not found"):
|
||
|
client.entities.get(dataset_urn)
|