mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-25 18:38:55 +00:00
234 lines
6.9 KiB
Python
234 lines
6.9 KiB
Python
import pytest
|
|
import tenacity
|
|
|
|
from tests.utils import delete_urns_from_file, get_sleep_info, ingest_file_via_rest
|
|
|
|
sleep_sec, sleep_times = get_sleep_info()
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=False)
|
|
def ingest_cleanup_data(auth_session, graph_client, request):
|
|
print("ingesting domains test data")
|
|
ingest_file_via_rest(auth_session, "tests/domains/data.json")
|
|
yield
|
|
print("removing domains test data")
|
|
delete_urns_from_file(graph_client, "tests/domains/data.json")
|
|
|
|
|
|
@tenacity.retry(
|
|
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
|
|
)
|
|
def _ensure_more_domains(auth_session, list_domains_json, before_count):
|
|
# Get new count of Domains
|
|
response = auth_session.post(
|
|
f"{auth_session.frontend_url()}/api/v2/graphql", json=list_domains_json
|
|
)
|
|
response.raise_for_status()
|
|
res_data = response.json()
|
|
|
|
assert res_data
|
|
assert res_data["data"]
|
|
assert res_data["data"]["listDomains"]["total"] is not None
|
|
assert "errors" not in res_data
|
|
|
|
# Assert that there are more domains now.
|
|
after_count = res_data["data"]["listDomains"]["total"]
|
|
print(f"after_count is {after_count}")
|
|
assert after_count == before_count + 1
|
|
|
|
|
|
@pytest.mark.dependency()
|
|
def test_create_list_get_domain(auth_session):
|
|
# Setup: Delete the domain (if exists)
|
|
response = auth_session.post(
|
|
f"{auth_session.gms_url()}/entities?action=delete",
|
|
json={"urn": "urn:li:domain:test id"},
|
|
)
|
|
|
|
# Get count of existing secrets
|
|
list_domains_json = {
|
|
"query": """query listDomains($input: ListDomainsInput!) {\n
|
|
listDomains(input: $input) {\n
|
|
start\n
|
|
count\n
|
|
total\n
|
|
domains {\n
|
|
urn\n
|
|
properties {\n
|
|
name\n
|
|
}\n
|
|
}\n
|
|
}\n
|
|
}""",
|
|
"variables": {"input": {"start": "0", "count": "20"}},
|
|
}
|
|
|
|
response = auth_session.post(
|
|
f"{auth_session.frontend_url()}/api/v2/graphql", json=list_domains_json
|
|
)
|
|
response.raise_for_status()
|
|
res_data = response.json()
|
|
|
|
assert res_data
|
|
assert res_data["data"]
|
|
assert res_data["data"]["listDomains"]["total"] is not None
|
|
assert "errors" not in res_data
|
|
print(f"domains resp is {res_data}")
|
|
|
|
before_count = res_data["data"]["listDomains"]["total"]
|
|
print(f"before_count is {before_count}")
|
|
|
|
domain_id = "test id"
|
|
domain_name = "test name"
|
|
domain_description = "test description"
|
|
|
|
# Create new Domain
|
|
create_domain_json = {
|
|
"query": """mutation createDomain($input: CreateDomainInput!) {\n
|
|
createDomain(input: $input)
|
|
}""",
|
|
"variables": {
|
|
"input": {
|
|
"id": domain_id,
|
|
"name": domain_name,
|
|
"description": domain_description,
|
|
}
|
|
},
|
|
}
|
|
|
|
response = auth_session.post(
|
|
f"{auth_session.frontend_url()}/api/v2/graphql", json=create_domain_json
|
|
)
|
|
response.raise_for_status()
|
|
res_data = response.json()
|
|
|
|
assert res_data
|
|
assert res_data["data"]
|
|
assert res_data["data"]["createDomain"] is not None
|
|
assert "errors" not in res_data
|
|
|
|
domain_urn = res_data["data"]["createDomain"]
|
|
|
|
_ensure_more_domains(
|
|
auth_session=auth_session,
|
|
list_domains_json=list_domains_json,
|
|
before_count=before_count,
|
|
)
|
|
|
|
# Get the domain value back
|
|
get_domain_json = {
|
|
"query": """query domain($urn: String!) {\n
|
|
domain(urn: $urn) {\n
|
|
urn\n
|
|
id\n
|
|
properties {\n
|
|
name\n
|
|
description\n
|
|
}\n
|
|
}\n
|
|
}""",
|
|
"variables": {"urn": domain_urn},
|
|
}
|
|
|
|
response = auth_session.post(
|
|
f"{auth_session.frontend_url()}/api/v2/graphql", json=get_domain_json
|
|
)
|
|
response.raise_for_status()
|
|
res_data = response.json()
|
|
|
|
assert res_data
|
|
assert res_data["data"]
|
|
assert res_data["data"]["domain"] is not None
|
|
assert "errors" not in res_data
|
|
|
|
domain = res_data["data"]["domain"]
|
|
assert domain["urn"] == f"urn:li:domain:{domain_id}"
|
|
assert domain["id"] == domain_id
|
|
assert domain["properties"]["name"] == domain_name
|
|
assert domain["properties"]["description"] == domain_description
|
|
|
|
delete_json = {"urn": domain_urn}
|
|
|
|
# Cleanup: Delete the domain
|
|
response = auth_session.post(
|
|
f"{auth_session.gms_url()}/entities?action=delete", json=delete_json
|
|
)
|
|
|
|
response.raise_for_status()
|
|
|
|
|
|
@pytest.mark.dependency(depends=["test_create_list_get_domain"])
|
|
def test_set_unset_domain(auth_session, ingest_cleanup_data):
|
|
# Set and Unset a Domain for a dataset. Note that this doesn't test for adding domains to charts, dashboards, charts, & jobs.
|
|
dataset_urn = (
|
|
"urn:li:dataset:(urn:li:dataPlatform:kafka,test-tags-terms-sample-kafka,PROD)"
|
|
)
|
|
domain_urn = "urn:li:domain:engineering"
|
|
|
|
# First unset to be sure.
|
|
unset_domain_json = {
|
|
"query": """mutation unsetDomain($entityUrn: String!) {\n
|
|
unsetDomain(entityUrn: $entityUrn)}""",
|
|
"variables": {"entityUrn": dataset_urn},
|
|
}
|
|
|
|
response = auth_session.post(
|
|
f"{auth_session.frontend_url()}/api/v2/graphql", json=unset_domain_json
|
|
)
|
|
response.raise_for_status()
|
|
res_data = response.json()
|
|
|
|
assert res_data
|
|
assert res_data["data"]
|
|
assert res_data["data"]["unsetDomain"] is True
|
|
assert "errors" not in res_data
|
|
|
|
# Set a new domain
|
|
set_domain_json = {
|
|
"query": """mutation setDomain($entityUrn: String!, $domainUrn: String!) {\n
|
|
setDomain(entityUrn: $entityUrn, domainUrn: $domainUrn)}""",
|
|
"variables": {"entityUrn": dataset_urn, "domainUrn": domain_urn},
|
|
}
|
|
|
|
response = auth_session.post(
|
|
f"{auth_session.frontend_url()}/api/v2/graphql", json=set_domain_json
|
|
)
|
|
response.raise_for_status()
|
|
res_data = response.json()
|
|
|
|
assert res_data
|
|
assert res_data["data"]
|
|
assert res_data["data"]["setDomain"] is True
|
|
assert "errors" not in res_data
|
|
|
|
# Now, fetch the dataset's domain and confirm it was set.
|
|
get_dataset_json = {
|
|
"query": """query dataset($urn: String!) {\n
|
|
dataset(urn: $urn) {\n
|
|
urn\n
|
|
domain {\n
|
|
domain {\n
|
|
urn\n
|
|
properties{\n
|
|
name\n
|
|
}\n
|
|
}\n
|
|
}\n
|
|
}\n
|
|
}""",
|
|
"variables": {"urn": dataset_urn},
|
|
}
|
|
|
|
response = auth_session.post(
|
|
f"{auth_session.frontend_url()}/api/v2/graphql", json=get_dataset_json
|
|
)
|
|
response.raise_for_status()
|
|
res_data = response.json()
|
|
|
|
assert res_data
|
|
assert res_data["data"]["dataset"]["domain"]["domain"]["urn"] == domain_urn
|
|
assert (
|
|
res_data["data"]["dataset"]["domain"]["domain"]["properties"]["name"]
|
|
== "Engineering"
|
|
)
|