# datahub/smoke-test/tests/platform_resources/test_platform_resource.py

import logging
import random
import string
from typing import List
import pytest
from datahub.api.entities.platformresource.platform_resource import (
PlatformResource,
PlatformResourceKey,
)
from tests.utils import wait_for_healthcheck_util, wait_for_writes_to_sync
logger = logging.getLogger(__name__)
@pytest.fixture(scope="module")
def wait_for_healthchecks(auth_session):
    """Block once per module until the DataHub deployment reports healthy."""
    wait_for_healthcheck_util(auth_session)
    yield
def generate_random_id(length=8):
    """Return a random identifier of *length* lowercase letters and digits."""
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)
@pytest.fixture
def test_id():
    """Provide a unique per-test identifier, e.g. ``test_ab12cd34``."""
    return "test_" + generate_random_id()
@pytest.fixture(scope="function", autouse=True)
def cleanup_resources(graph_client):
    """Collect PlatformResources created by a test and delete them afterwards.

    Tests append the resources they create to the yielded list; teardown
    deletes each registered resource, then sweeps any stragglers whose key
    matches the shared "test_" prefix as a safety net.
    """
    created_resources: List[PlatformResource] = []
    yield created_resources
    # Delete everything the test explicitly registered with us.
    for resource in created_resources:
        try:
            resource.delete(graph_client)
        except Exception as e:
            # Best-effort: a failed delete must not fail the test itself.
            # Lazy %-style args avoid formatting when the level is disabled.
            logger.warning("Failed to delete resource: %s", e)
    # Safety net: sweep resources a test created but forgot to register.
    for resource in PlatformResource.search_by_key(graph_client, "test_"):
        try:
            resource.delete(graph_client)
        except Exception as e:
            logger.warning("Failed to delete resource during final cleanup: %s", e)
def test_platform_resource_read_write(graph_client, test_id, cleanup_resources):
    """Round-trip: write a platform resource, then read it back by its key."""
    resource_key = PlatformResourceKey(
        platform=f"test_platform_{test_id}",
        resource_type=f"test_resource_type_{test_id}",
        primary_key=f"test_primary_key_{test_id}",
    )
    created = PlatformResource.create(
        key=resource_key,
        secondary_keys=[f"test_secondary_key_{test_id}"],
        value={"test_key": f"test_value_{test_id}"},
    )
    created.to_datahub(graph_client)
    cleanup_resources.append(created)
    wait_for_writes_to_sync()

    fetched = PlatformResource.from_datahub(graph_client, resource_key)
    assert fetched == created
def test_platform_resource_search(graph_client, test_id, cleanup_resources):
    """A written resource is findable by both its primary and secondary key."""
    key = PlatformResourceKey(
        platform=f"test_platform_{test_id}",
        resource_type=f"test_resource_type_{test_id}",
        primary_key=f"test_primary_key_{test_id}",
    )
    platform_resource = PlatformResource.create(
        key=key,
        secondary_keys=[f"test_secondary_key_{test_id}"],
        value={"test_key": f"test_value_{test_id}"},
    )
    platform_resource.to_datahub(graph_client)
    cleanup_resources.append(platform_resource)
    wait_for_writes_to_sync()

    # Primary-key search should return exactly the resource we wrote.
    search_results = list(
        PlatformResource.search_by_key(graph_client, key.primary_key)
    )
    assert search_results == [platform_resource]

    # Secondary-key search (primary=False) should find it as well.
    search_results = list(
        PlatformResource.search_by_key(
            graph_client, f"test_secondary_key_{test_id}", primary=False
        )
    )
    assert search_results == [platform_resource]
def test_platform_resource_non_existent(graph_client, test_id):
    """Reading a key that was never written returns None."""
    missing_key = PlatformResourceKey(
        platform=f"test_platform_{test_id}",
        resource_type=f"test_resource_type_{test_id}",
        primary_key=f"test_primary_key_{test_id}",
    )
    result = PlatformResource.from_datahub(
        key=missing_key,
        graph_client=graph_client,
    )
    assert result is None
def test_platform_resource_urn_secondary_key(
    graph_client, test_id, cleanup_resources
):
    """A dataset URN used as a secondary key is searchable with primary=False."""
    key = PlatformResourceKey(
        platform=f"test_platform_{test_id}",
        resource_type=f"test_resource_type_{test_id}",
        primary_key=f"test_primary_key_{test_id}",
    )
    dataset_urn = (
        f"urn:li:dataset:(urn:li:dataPlatform:test,test_secondary_key_{test_id},PROD)"
    )
    platform_resource = PlatformResource.create(
        key=key,
        value={"test_key": f"test_value_{test_id}"},
        secondary_keys=[dataset_urn],
    )
    platform_resource.to_datahub(graph_client)
    # Register for teardown like the other write tests, instead of relying
    # solely on the autouse fixture's "test_" prefix sweep.
    cleanup_resources.append(platform_resource)
    wait_for_writes_to_sync()

    read_platform_resources = list(
        PlatformResource.search_by_key(graph_client, dataset_urn, primary=False)
    )
    assert read_platform_resources == [platform_resource]
def test_platform_resource_listing_by_resource_type(
    graph_client, test_id, cleanup_resources
):
    """search_by_filters returns every resource sharing a resourceType."""
    # Generate two resources with the same resource type.
    key1 = PlatformResourceKey(
        platform=f"test_platform_{test_id}",
        resource_type=f"test_resource_type_{test_id}",
        primary_key=f"test_primary_key_1_{test_id}",
    )
    platform_resource1 = PlatformResource.create(
        key=key1,
        value={"test_key": f"test_value_1_{test_id}"},
    )
    platform_resource1.to_datahub(graph_client)
    # Register for teardown like the other write tests, instead of relying
    # solely on the autouse fixture's "test_" prefix sweep.
    cleanup_resources.append(platform_resource1)

    key2 = PlatformResourceKey(
        platform=f"test_platform_{test_id}",
        resource_type=f"test_resource_type_{test_id}",
        primary_key=f"test_primary_key_2_{test_id}",
    )
    platform_resource2 = PlatformResource.create(
        key=key2,
        value={"test_key": f"test_value_2_{test_id}"},
    )
    platform_resource2.to_datahub(graph_client)
    cleanup_resources.append(platform_resource2)
    wait_for_writes_to_sync()

    search_results = list(
        PlatformResource.search_by_filters(
            graph_client,
            and_filters=[
                {
                    "field": "resourceType",
                    "condition": "EQUAL",
                    "value": key1.resource_type,
                }
            ],
        )
    )
    assert len(search_results) == 2
    # Index by id so a missing resource raises KeyError with a clear culprit.
    results_by_id = {r.id: r for r in search_results}
    assert results_by_id[key1.id] == platform_resource1
    assert results_by_id[key2.id] == platform_resource2