datahub/smoke-test/tests/platform_resources/test_platform_resource.py
2025-01-17 23:50:13 +05:30

241 lines
7.5 KiB
Python

import logging
import random
import string
from typing import List
import pytest
from datahub.api.entities.platformresource.platform_resource import (
ElasticPlatformResourceQuery,
PlatformResource,
PlatformResourceKey,
PlatformResourceSearchFields,
)
from tests.utils import wait_for_writes_to_sync
# Module-level logger; the cleanup fixture uses it to report failed deletions.
logger = logging.getLogger(__name__)
def generate_random_id(length: int = 8) -> str:
    """Return a random identifier of lowercase ASCII letters and digits.

    Args:
        length: Number of characters to generate (defaults to 8).

    Returns:
        A string of ``length`` characters drawn, with replacement, from
        ``string.ascii_lowercase + string.digits``.
    """
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)
@pytest.fixture
def test_id():
    """Provide a per-test identifier of the form ``test_<random suffix>``."""
    suffix = generate_random_id()
    return f"test_{suffix}"
@pytest.fixture(scope="function", autouse=True)
def cleanup_resources(graph_client):
    """Yield a list for tests to register resources in, then delete them.

    Teardown runs in two passes, both best-effort (a failed deletion is
    logged as a warning and does not abort the remaining cleanup):

    1. Every resource a test appended to the yielded list is deleted.
    2. A wildcard search on the primary key (``test_*``) is run and every
       match is deleted, to catch resources that were never registered.
    """
    registered: List[PlatformResource] = []
    yield registered

    # Pass 1: delete everything the test explicitly registered.
    for registered_resource in registered:
        try:
            registered_resource.delete(graph_client)
        except Exception as e:
            logger.warning(f"Failed to delete resource: {e}")

    # Pass 2: sweep up anything left behind whose primary key matches "test_*".
    leftover_query = ElasticPlatformResourceQuery.create_from().add_wildcard(
        PlatformResourceSearchFields.PRIMARY_KEY, "test_*"
    )
    leftovers = PlatformResource.search_by_filters(graph_client, leftover_query)
    for leftover in leftovers:
        try:
            leftover.delete(graph_client)
        except Exception as e:
            logger.warning(f"Failed to delete resource during final cleanup: {e}")
def test_platform_resource_read_write(graph_client, test_id, cleanup_resources):
    """Write a platform resource and check that reading it back by key matches."""
    resource_key = PlatformResourceKey(
        platform=f"test_platform_{test_id}",
        resource_type=f"test_resource_type_{test_id}",
        primary_key=f"test_primary_key_{test_id}",
    )
    secondary_keys = [f"test_secondary_key_{test_id}"]
    payload = {"test_key": f"test_value_{test_id}"}
    written = PlatformResource.create(
        key=resource_key,
        secondary_keys=secondary_keys,
        value=payload,
    )

    written.to_datahub(graph_client)
    # Register for teardown only after the write call has returned.
    cleanup_resources.append(written)
    wait_for_writes_to_sync()

    fetched = PlatformResource.from_datahub(graph_client, resource_key)
    assert fetched == written
def test_platform_resource_search(graph_client, test_id, cleanup_resources):
    """Check that a written resource is found by primary and by secondary key."""
    secondary_key = f"test_secondary_key_{test_id}"
    resource_key = PlatformResourceKey(
        platform=f"test_platform_{test_id}",
        resource_type=f"test_resource_type_{test_id}",
        primary_key=f"test_primary_key_{test_id}",
    )
    written = PlatformResource.create(
        key=resource_key,
        secondary_keys=[secondary_key],
        value={"test_key": f"test_value_{test_id}"},
    )

    written.to_datahub(graph_client)
    cleanup_resources.append(written)
    wait_for_writes_to_sync()

    # Lookup by primary key (the default search mode).
    by_primary = list(
        PlatformResource.search_by_key(graph_client, resource_key.primary_key)
    )
    assert len(by_primary) == 1
    assert by_primary[0] == written

    # Lookup by secondary key.
    by_secondary = list(
        PlatformResource.search_by_key(graph_client, secondary_key, primary=False)
    )
    assert len(by_secondary) == 1
    assert by_secondary[0] == written
def test_platform_resource_non_existent(graph_client, test_id):
    """Check that reading a key that was never written returns ``None``."""
    missing_key = PlatformResourceKey(
        platform=f"test_platform_{test_id}",
        resource_type=f"test_resource_type_{test_id}",
        primary_key=f"test_primary_key_{test_id}",
    )

    lookup_result = PlatformResource.from_datahub(
        graph_client=graph_client,
        key=missing_key,
    )

    assert lookup_result is None
def test_platform_resource_urn_secondary_key(graph_client, test_id, cleanup_resources):
    """Check that a resource can be found when its secondary key is a dataset URN."""
    dataset_urn = (
        f"urn:li:dataset:(urn:li:dataPlatform:test,test_secondary_key_{test_id},PROD)"
    )
    resource_key = PlatformResourceKey(
        platform=f"test_platform_{test_id}",
        resource_type=f"test_resource_type_{test_id}",
        primary_key=f"test_primary_key_{test_id}",
    )
    written = PlatformResource.create(
        key=resource_key,
        value={"test_key": f"test_value_{test_id}"},
        secondary_keys=[dataset_urn],
    )

    written.to_datahub(graph_client)
    cleanup_resources.append(written)
    wait_for_writes_to_sync()

    # Search on the secondary key, passing the URN string as the search term.
    matches = list(
        PlatformResource.search_by_key(graph_client, dataset_urn, primary=False)
    )
    assert len(matches) == 1
    assert matches[0] == written
def test_platform_resource_listing_by_resource_type(
    graph_client, test_id, cleanup_resources
):
    """Check that filtering by resource type returns every resource of that type.

    Two resources are written with the same platform and resource type but
    different primary keys; a search on the resource type must return exactly
    those two, each equal to what was written.
    """
    # Generate two resources with the same resource type
    key1 = PlatformResourceKey(
        platform=f"test_platform_{test_id}",
        resource_type=f"test_resource_type_{test_id}",
        primary_key=f"test_primary_key_1_{test_id}",
    )
    platform_resource1 = PlatformResource.create(
        key=key1,
        value={"test_key": f"test_value_1_{test_id}"},
    )
    platform_resource1.to_datahub(graph_client)
    # Register for explicit teardown, as the other tests in this module do,
    # instead of relying only on the fixture's wildcard sweep.
    cleanup_resources.append(platform_resource1)

    key2 = PlatformResourceKey(
        platform=f"test_platform_{test_id}",
        resource_type=f"test_resource_type_{test_id}",
        primary_key=f"test_primary_key_2_{test_id}",
    )
    platform_resource2 = PlatformResource.create(
        key=key2,
        value={"test_key": f"test_value_2_{test_id}"},
    )
    platform_resource2.to_datahub(graph_client)
    cleanup_resources.append(platform_resource2)

    wait_for_writes_to_sync()

    search_results = list(
        PlatformResource.search_by_filters(
            graph_client,
            query=ElasticPlatformResourceQuery.create_from(
                (PlatformResourceSearchFields.RESOURCE_TYPE, key1.resource_type)
            ),
        )
    )
    assert len(search_results) == 2

    # Use a None default so a missing resource surfaces as an assertion
    # failure rather than a bare StopIteration from next().
    read_platform_resource_1 = next(
        (r for r in search_results if r.id == key1.id), None
    )
    read_platform_resource_2 = next(
        (r for r in search_results if r.id == key2.id), None
    )
    assert read_platform_resource_1 == platform_resource1
    assert read_platform_resource_2 == platform_resource2
def test_platform_resource_listing_complex_queries(
    graph_client, test_id, cleanup_resources
):
    """Check a grouped AND query combining a field match with a field not-match.

    Two resources share a resource type but live on different platforms. The
    query matches the shared resource type and excludes the first resource's
    platform, so only the second resource must be returned.
    """
    # LogicalOperator is not imported at module level, so import it here.
    from datahub.api.entities.platformresource.platform_resource import (
        LogicalOperator,
    )

    # Generate two resources with the same resource type on different platforms
    key1 = PlatformResourceKey(
        platform=f"test_platform1_{test_id}",
        resource_type=f"test_resource_type_{test_id}",
        primary_key=f"test_primary_key_1_{test_id}",
    )
    platform_resource1 = PlatformResource.create(
        key=key1,
        value={"test_key": f"test_value_1_{test_id}"},
    )
    platform_resource1.to_datahub(graph_client)
    # Register for explicit teardown, as the other tests in this module do,
    # instead of relying only on the fixture's wildcard sweep.
    cleanup_resources.append(platform_resource1)

    key2 = PlatformResourceKey(
        platform=f"test_platform2_{test_id}",
        resource_type=f"test_resource_type_{test_id}",
        primary_key=f"test_primary_key_2_{test_id}",
    )
    platform_resource2 = PlatformResource.create(
        key=key2,
        value={"test_key": f"test_value_2_{test_id}"},
    )
    platform_resource2.to_datahub(graph_client)
    cleanup_resources.append(platform_resource2)

    wait_for_writes_to_sync()

    # resource_type == key1.resource_type AND platform != key1.platform
    query = (
        ElasticPlatformResourceQuery.create_from()
        .group(LogicalOperator.AND)
        .add_field_match(PlatformResourceSearchFields.RESOURCE_TYPE, key1.resource_type)
        .add_field_not_match(PlatformResourceSearchFields.PLATFORM, key1.platform)
        .end()
    )

    search_results = list(
        PlatformResource.search_by_filters(
            graph_client,
            query=query,
        )
    )
    assert len(search_results) == 1
    read_platform_resource = search_results[0]
    assert read_platform_resource == platform_resource2