datahub/metadata-ingestion/tests/unit/test_unity_catalog_config.py

158 lines
4.8 KiB
Python

from datetime import datetime, timedelta
import pytest
from freezegun import freeze_time
from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig
from datahub.ingestion.source.unity.source import UnityCatalogSource
FROZEN_TIME = datetime.fromisoformat("2023-01-01 00:00:00+00:00")
@freeze_time(FROZEN_TIME)
def test_within_thirty_days():
config = UnityCatalogSourceConfig.parse_obj(
{
"token": "token",
"workspace_url": "https://workspace_url",
"include_usage_statistics": True,
"include_hive_metastore": False,
"start_time": FROZEN_TIME - timedelta(days=30),
}
)
assert config.start_time == FROZEN_TIME - timedelta(days=30)
with pytest.raises(
ValueError, match="Query history is only maintained for 30 days."
):
UnityCatalogSourceConfig.parse_obj(
{
"token": "token",
"workspace_url": "https://workspace_url",
"include_usage_statistics": True,
"start_time": FROZEN_TIME - timedelta(days=31),
}
)
def test_profiling_requires_warehouses_id():
config = UnityCatalogSourceConfig.parse_obj(
{
"token": "token",
"workspace_url": "https://workspace_url",
"include_hive_metastore": False,
"profiling": {
"enabled": True,
"method": "ge",
"warehouse_id": "my_warehouse_id",
},
}
)
assert config.profiling.enabled is True
config = UnityCatalogSourceConfig.parse_obj(
{
"token": "token",
"workspace_url": "https://workspace_url",
"include_hive_metastore": False,
"profiling": {"enabled": False, "method": "ge"},
}
)
assert config.profiling.enabled is False
with pytest.raises(ValueError):
UnityCatalogSourceConfig.parse_obj(
{
"token": "token",
"include_hive_metastore": False,
"workspace_url": "workspace_url",
}
)
@freeze_time(FROZEN_TIME)
def test_workspace_url_should_start_with_https():
with pytest.raises(ValueError, match="Workspace URL must start with http scheme"):
UnityCatalogSourceConfig.parse_obj(
{
"token": "token",
"workspace_url": "workspace_url",
"profiling": {"enabled": True},
}
)
def test_global_warehouse_id_is_set_from_profiling():
config = UnityCatalogSourceConfig.parse_obj(
{
"token": "token",
"workspace_url": "https://XXXXXXXXXXXXXXXXXXXXX",
"profiling": {
"method": "ge",
"enabled": True,
"warehouse_id": "my_warehouse_id",
},
}
)
assert config.profiling.warehouse_id == "my_warehouse_id"
assert config.warehouse_id == "my_warehouse_id"
def test_set_different_warehouse_id_from_profiling():
with pytest.raises(
ValueError,
match="When `warehouse_id` is set, it must match the `warehouse_id` in `profiling`.",
):
UnityCatalogSourceConfig.parse_obj(
{
"token": "token",
"workspace_url": "https://XXXXXXXXXXXXXXXXXXXXX",
"warehouse_id": "my_global_warehouse_id",
"profiling": {
"method": "ge",
"enabled": True,
"warehouse_id": "my_warehouse_id",
},
}
)
def test_warehouse_id_must_be_set_if_include_hive_metastore_is_true():
with pytest.raises(
ValueError,
match="When `include_hive_metastore` is set, `warehouse_id` must be set.",
):
UnityCatalogSourceConfig.parse_obj(
{
"token": "token",
"workspace_url": "https://XXXXXXXXXXXXXXXXXXXXX",
"include_hive_metastore": True,
}
)
def test_warehouse_id_must_be_present_test_connection():
config_dict = {
"token": "token",
"workspace_url": "https://XXXXXXXXXXXXXXXXXXXXX",
"include_hive_metastore": True,
}
report = UnityCatalogSource.test_connection(config_dict)
assert report.internal_failure
print(report.internal_failure_reason)
def test_set_profiling_warehouse_id_from_global():
config = UnityCatalogSourceConfig.parse_obj(
{
"token": "token",
"workspace_url": "https://XXXXXXXXXXXXXXXXXXXXX",
"warehouse_id": "my_global_warehouse_id",
"profiling": {
"method": "ge",
"enabled": True,
},
}
)
assert config.profiling.warehouse_id == "my_global_warehouse_id"