datahub/smoke-test/tests/domains/domains_test.py

234 lines
6.9 KiB
Python

import pytest
import tenacity
from tests.utils import delete_urns_from_file, get_sleep_info, ingest_file_via_rest
sleep_sec, sleep_times = get_sleep_info()
@pytest.fixture(scope="module", autouse=False)
def ingest_cleanup_data(auth_session, graph_client, request):
print("ingesting domains test data")
ingest_file_via_rest(auth_session, "tests/domains/data.json")
yield
print("removing domains test data")
delete_urns_from_file(graph_client, "tests/domains/data.json")
@tenacity.retry(
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_more_domains(auth_session, list_domains_json, before_count):
# Get new count of Domains
response = auth_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=list_domains_json
)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["listDomains"]["total"] is not None
assert "errors" not in res_data
# Assert that there are more domains now.
after_count = res_data["data"]["listDomains"]["total"]
print(f"after_count is {after_count}")
assert after_count == before_count + 1
@pytest.mark.dependency()
def test_create_list_get_domain(auth_session):
# Setup: Delete the domain (if exists)
response = auth_session.post(
f"{auth_session.gms_url()}/entities?action=delete",
json={"urn": "urn:li:domain:test id"},
)
# Get count of existing secrets
list_domains_json = {
"query": """query listDomains($input: ListDomainsInput!) {\n
listDomains(input: $input) {\n
start\n
count\n
total\n
domains {\n
urn\n
properties {\n
name\n
}\n
}\n
}\n
}""",
"variables": {"input": {"start": "0", "count": "20"}},
}
response = auth_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=list_domains_json
)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["listDomains"]["total"] is not None
assert "errors" not in res_data
print(f"domains resp is {res_data}")
before_count = res_data["data"]["listDomains"]["total"]
print(f"before_count is {before_count}")
domain_id = "test id"
domain_name = "test name"
domain_description = "test description"
# Create new Domain
create_domain_json = {
"query": """mutation createDomain($input: CreateDomainInput!) {\n
createDomain(input: $input)
}""",
"variables": {
"input": {
"id": domain_id,
"name": domain_name,
"description": domain_description,
}
},
}
response = auth_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=create_domain_json
)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["createDomain"] is not None
assert "errors" not in res_data
domain_urn = res_data["data"]["createDomain"]
_ensure_more_domains(
auth_session=auth_session,
list_domains_json=list_domains_json,
before_count=before_count,
)
# Get the domain value back
get_domain_json = {
"query": """query domain($urn: String!) {\n
domain(urn: $urn) {\n
urn\n
id\n
properties {\n
name\n
description\n
}\n
}\n
}""",
"variables": {"urn": domain_urn},
}
response = auth_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=get_domain_json
)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["domain"] is not None
assert "errors" not in res_data
domain = res_data["data"]["domain"]
assert domain["urn"] == f"urn:li:domain:{domain_id}"
assert domain["id"] == domain_id
assert domain["properties"]["name"] == domain_name
assert domain["properties"]["description"] == domain_description
delete_json = {"urn": domain_urn}
# Cleanup: Delete the domain
response = auth_session.post(
f"{auth_session.gms_url()}/entities?action=delete", json=delete_json
)
response.raise_for_status()
@pytest.mark.dependency(depends=["test_create_list_get_domain"])
def test_set_unset_domain(auth_session, ingest_cleanup_data):
# Set and Unset a Domain for a dataset. Note that this doesn't test for adding domains to charts, dashboards, charts, & jobs.
dataset_urn = (
"urn:li:dataset:(urn:li:dataPlatform:kafka,test-tags-terms-sample-kafka,PROD)"
)
domain_urn = "urn:li:domain:engineering"
# First unset to be sure.
unset_domain_json = {
"query": """mutation unsetDomain($entityUrn: String!) {\n
unsetDomain(entityUrn: $entityUrn)}""",
"variables": {"entityUrn": dataset_urn},
}
response = auth_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=unset_domain_json
)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["unsetDomain"] is True
assert "errors" not in res_data
# Set a new domain
set_domain_json = {
"query": """mutation setDomain($entityUrn: String!, $domainUrn: String!) {\n
setDomain(entityUrn: $entityUrn, domainUrn: $domainUrn)}""",
"variables": {"entityUrn": dataset_urn, "domainUrn": domain_urn},
}
response = auth_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=set_domain_json
)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["setDomain"] is True
assert "errors" not in res_data
# Now, fetch the dataset's domain and confirm it was set.
get_dataset_json = {
"query": """query dataset($urn: String!) {\n
dataset(urn: $urn) {\n
urn\n
domain {\n
domain {\n
urn\n
properties{\n
name\n
}\n
}\n
}\n
}\n
}""",
"variables": {"urn": dataset_urn},
}
response = auth_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=get_dataset_json
)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]["dataset"]["domain"]["domain"]["urn"] == domain_urn
assert (
res_data["data"]["dataset"]["domain"]["domain"]["properties"]["name"]
== "Engineering"
)