datahub/smoke-test/test_e2e.py
2024-10-22 06:59:40 -05:00

1384 lines
42 KiB
Python

import logging
import time
import urllib
from http import HTTPStatus
from typing import Any, Optional
import concurrent.futures
import pytest
import requests
import tenacity
from datahub.ingestion.run.pipeline import Pipeline
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.no_cypress_suite1
from tests.utils import (
get_kafka_broker_url,
get_kafka_schema_registry,
get_sleep_info,
ingest_file_via_rest,
get_frontend_session,
get_admin_credentials,
get_root_urn,
wait_for_writes_to_sync,
)
bootstrap_sample_data = "../metadata-ingestion/examples/mce_files/bootstrap_mce.json"
usage_sample_data = "./test_resources/bigquery_usages_golden.json"
bq_sample_data = "./sample_bq_data.json"
restli_default_headers = {
"X-RestLi-Protocol-Version": "2.0.0",
}
kafka_post_ingestion_wait_sec = 30
sleep_sec, sleep_times = get_sleep_info()
@tenacity.retry(
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_user_present(auth_session, urn: str):
response = auth_session.get(
f"{auth_session.gms_url()}/entities/{urllib.parse.quote(urn)}",
headers={
**restli_default_headers,
},
)
response.raise_for_status()
data = response.json()
user_key = "com.linkedin.metadata.snapshot.CorpUserSnapshot"
assert data["value"]
assert data["value"][user_key]
assert data["value"][user_key]["urn"] == urn
return data
@tenacity.retry(
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_user_relationship_present(auth_session, urn, relationships):
json = {
"query": """query corpUser($urn: String!) {\n
corpUser(urn: $urn) {\n
urn\n
relationships(input: { types: ["IsMemberOfNativeGroup"], direction: OUTGOING, start: 0, count: 1 }) {\n
total\n
}\n
}\n
}""",
"variables": {"urn": urn},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["corpUser"]
assert res_data["data"]["corpUser"]["relationships"]
assert res_data["data"]["corpUser"]["relationships"]["total"] == relationships
@tenacity.retry(
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_dataset_present(
auth_session: Any,
urn: str,
aspects: Optional[str] = "datasetProperties",
) -> Any:
response = auth_session.get(
f"{auth_session.gms_url()}/entitiesV2?ids=List({urllib.parse.quote(urn)})&aspects=List({aspects})",
headers={
**restli_default_headers,
"X-RestLi-Method": "batch_get",
},
)
response.raise_for_status()
res_data = response.json()
assert res_data["results"]
assert res_data["results"][urn]
assert res_data["results"][urn]["aspects"]["datasetProperties"]
return res_data
@tenacity.retry(
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_group_not_present(auth_session, urn: str) -> Any:
json = {
"query": """query corpGroup($urn: String!) {\n
corpGroup(urn: $urn) {\n
urn\n
properties {\n
displayName\n
}\n
}\n
}""",
"variables": {"urn": urn},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["corpGroup"]
assert res_data["data"]["corpGroup"]["properties"] is None
def fixture_ingestion_via_rest(auth_session):
ingest_file_via_rest(auth_session, bootstrap_sample_data)
_ensure_user_present(auth_session, urn=get_root_urn())
def fixture_ingestion_usage_via_rest(auth_session):
ingest_file_via_rest(auth_session, usage_sample_data)
def fixture_ingestion_via_kafka(auth_session):
pipeline = Pipeline.create(
{
"source": {
"type": "file",
"config": {"filename": bq_sample_data},
},
"sink": {
"type": "datahub-kafka",
"config": {
"connection": {
"bootstrap": get_kafka_broker_url(),
"schema_registry_url": get_kafka_schema_registry(),
}
},
},
}
)
pipeline.run()
pipeline.raise_from_status()
_ensure_dataset_present(auth_session,
"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times,PROD)"
)
# Since Kafka emission is asynchronous, we must wait a little bit so that
# the changes are actually processed.
time.sleep(kafka_post_ingestion_wait_sec)
@pytest.fixture(scope='module', autouse=True)
def test_run_ingestion(auth_session):
# Dummy test so that future ones can just depend on this one.
# The rest sink fixtures cannot run at the same time, limitation of the Pipeline code
fixture_ingestion_usage_via_rest(auth_session)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
futures = []
for ingestion_fixture in ["fixture_ingestion_via_kafka", "fixture_ingestion_via_rest"]:
futures.append(
executor.submit(globals()[ingestion_fixture], auth_session)
)
for future in concurrent.futures.as_completed(futures):
logger.info(future.result())
wait_for_writes_to_sync()
def test_gms_get_user(auth_session):
username = "jdoe"
urn = f"urn:li:corpuser:{username}"
_ensure_user_present(auth_session, urn=urn)
@pytest.mark.parametrize(
"platform,dataset_name,env",
[
(
# This one tests the bootstrap sample data.
"urn:li:dataPlatform:kafka",
"SampleKafkaDataset",
"PROD",
),
(
# This one tests BigQuery ingestion.
"urn:li:dataPlatform:bigquery",
"bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times",
"PROD",
),
],
)
def test_gms_get_dataset(auth_session, platform, dataset_name, env):
platform = "urn:li:dataPlatform:bigquery"
dataset_name = (
"bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times"
)
env = "PROD"
urn = f"urn:li:dataset:({platform},{dataset_name},{env})"
response = auth_session.get(
f"{auth_session.gms_url()}/entities/{urllib.parse.quote(urn)}",
headers={
**restli_default_headers,
"X-RestLi-Method": "get",
},
)
response.raise_for_status()
res_data = response.json()
assert res_data["value"]
assert res_data["value"]["com.linkedin.metadata.snapshot.DatasetSnapshot"]
assert (
res_data["value"]["com.linkedin.metadata.snapshot.DatasetSnapshot"]["urn"]
== urn
)
def test_gms_batch_get_v2(auth_session):
platform = "urn:li:dataPlatform:bigquery"
env = "PROD"
name_1 = "bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times"
name_2 = "bigquery-public-data.covid19_geotab_mobility_impact.ca_border_wait_times"
urn1 = f"urn:li:dataset:({platform},{name_1},{env})"
urn2 = f"urn:li:dataset:({platform},{name_2},{env})"
resp1 = _ensure_dataset_present(auth_session, urn1, aspects="datasetProperties,ownership")
assert resp1["results"][urn1]["aspects"]["ownership"]
resp2 = _ensure_dataset_present(auth_session, urn2, aspects="datasetProperties,ownership")
assert (
"ownership" not in resp2["results"][urn2]["aspects"]
) # Aspect does not exist.
@pytest.mark.parametrize(
"query,min_expected_results",
[
("covid", 1),
("sample", 3),
],
)
def test_gms_search_dataset(auth_session, query, min_expected_results):
json = {"input": f"{query}", "entity": "dataset", "start": 0, "count": 10}
print(json)
response = auth_session.post(
f"{auth_session.gms_url()}/entities?action=search",
headers=restli_default_headers,
json=json,
)
response.raise_for_status()
res_data = response.json()
assert res_data["value"]
assert res_data["value"]["numEntities"] >= min_expected_results
assert len(res_data["value"]["entities"]) >= min_expected_results
@pytest.mark.parametrize(
"query,min_expected_results",
[
("covid", 1),
("sample", 3),
],
)
def test_gms_search_across_entities(auth_session, query, min_expected_results):
json = {"input": f"{query}", "entities": [], "start": 0, "count": 10}
print(json)
response = auth_session.post(
f"{auth_session.gms_url()}/entities?action=searchAcrossEntities",
headers=restli_default_headers,
json=json,
)
response.raise_for_status()
res_data = response.json()
assert res_data["value"]
assert res_data["value"]["numEntities"] >= min_expected_results
assert len(res_data["value"]["entities"]) >= min_expected_results
def test_gms_usage_fetch(auth_session):
response = auth_session.post(
f"{auth_session.gms_url()}/usageStats?action=queryRange",
headers=restli_default_headers,
json={
"resource": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)",
"duration": "DAY",
"rangeFromEnd": "ALL",
},
)
response.raise_for_status()
data = response.json()["value"]
assert len(data["buckets"]) == 6
assert data["buckets"][0]["metrics"]["topSqlQueries"]
fields = data["aggregations"].pop("fields")
assert len(fields) == 12
assert fields[0]["count"] == 7
users = data["aggregations"].pop("users")
assert len(users) == 1
assert users[0]["count"] == 7
assert data["aggregations"] == {
# "fields" and "users" already popped out
"totalSqlQueries": 7,
"uniqueUserCount": 1,
}
def test_frontend_auth(auth_session):
pass
def test_frontend_browse_datasets(auth_session):
json = {
"query": """query browse($input: BrowseInput!) {\n
browse(input: $input) {\n
start\n
count\n
total\n
groups {
name
}
entities {\n
... on Dataset {\n
urn\n
name\n
}\n
}\n
}\n
}""",
"variables": {"input": {"type": "DATASET", "path": ["prod"]}},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["browse"]
assert len(res_data["data"]["browse"]["entities"]) == 0
assert len(res_data["data"]["browse"]["groups"]) > 0
@pytest.mark.parametrize(
"query,min_expected_results",
[
("covid", 1),
("sample", 3),
("", 1),
],
)
def test_frontend_search_datasets(auth_session, query, min_expected_results):
json = {
"query": """query search($input: SearchInput!) {\n
search(input: $input) {\n
start\n
count\n
total\n
searchResults {\n
entity {\n
... on Dataset {\n
urn\n
name\n
}\n
}\n
}\n
}\n
}""",
"variables": {
"input": {"type": "DATASET", "query": f"{query}", "start": 0, "count": 10}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["search"]
assert res_data["data"]["search"]["total"] >= min_expected_results
assert len(res_data["data"]["search"]["searchResults"]) >= min_expected_results
@pytest.mark.parametrize(
"query,min_expected_results",
[
("covid", 1),
("sample", 3),
("", 1),
],
)
def test_frontend_search_across_entities(auth_session, query, min_expected_results):
json = {
"query": """query searchAcrossEntities($input: SearchAcrossEntitiesInput!) {\n
searchAcrossEntities(input: $input) {\n
start\n
count\n
total\n
searchResults {\n
entity {\n
... on Dataset {\n
urn\n
name\n
}\n
}\n
}\n
}\n
}""",
"variables": {
"input": {"types": [], "query": f"{query}", "start": 0, "count": 10}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["searchAcrossEntities"]
assert res_data["data"]["searchAcrossEntities"]["total"] >= min_expected_results
assert (
len(res_data["data"]["searchAcrossEntities"]["searchResults"])
>= min_expected_results
)
def test_frontend_user_info(auth_session):
urn = get_root_urn()
json = {
"query": """query corpUser($urn: String!) {\n
corpUser(urn: $urn) {\n
urn\n
username\n
editableInfo {\n
pictureLink\n
}\n
info {\n
firstName\n
fullName\n
title\n
email\n
}\n
}\n
}""",
"variables": {"urn": urn},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["corpUser"]
assert res_data["data"]["corpUser"]["urn"] == urn
@pytest.mark.parametrize(
"platform,dataset_name,env",
[
(
# This one tests the bootstrap sample data.
"urn:li:dataPlatform:kafka",
"SampleKafkaDataset",
"PROD",
),
(
# This one tests BigQuery ingestion.
"urn:li:dataPlatform:bigquery",
"bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times",
"PROD",
),
],
)
def test_frontend_datasets(auth_session, platform, dataset_name, env):
urn = f"urn:li:dataset:({platform},{dataset_name},{env})"
json = {
"query": """query getDataset($urn: String!) {\n
dataset(urn: $urn) {\n
urn\n
name\n
description\n
platform {\n
urn\n
}\n
schemaMetadata {\n
name\n
version\n
createdAt\n
}\n
}\n
}""",
"variables": {"urn": urn},
}
# Basic dataset info.
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["dataset"]
assert res_data["data"]["dataset"]["urn"] == urn
assert res_data["data"]["dataset"]["name"] == dataset_name
assert res_data["data"]["dataset"]["platform"]["urn"] == platform
def test_ingest_with_system_metadata(auth_session, test_run_ingestion):
response = auth_session.post(
f"{auth_session.gms_url()}/entities?action=ingest",
headers=restli_default_headers,
json={
"entity": {
"value": {
"com.linkedin.metadata.snapshot.CorpUserSnapshot": {
"urn": get_root_urn(),
"aspects": [
{
"com.linkedin.identity.CorpUserInfo": {
"active": True,
"displayName": "DataHub",
"email": "datahub@linkedin.com",
"title": "CEO",
"fullName": "DataHub",
}
}
],
}
}
},
"systemMetadata": {
"lastObserved": 1628097379571,
"runId": "af0fe6e4-f547-11eb-81b2-acde48001122",
},
},
)
response.raise_for_status()
def test_ingest_with_blank_system_metadata(auth_session):
response = auth_session.post(
f"{auth_session.gms_url()}/entities?action=ingest",
headers=restli_default_headers,
json={
"entity": {
"value": {
"com.linkedin.metadata.snapshot.CorpUserSnapshot": {
"urn": get_root_urn(),
"aspects": [
{
"com.linkedin.identity.CorpUserInfo": {
"active": True,
"displayName": "DataHub",
"email": "datahub@linkedin.com",
"title": "CEO",
"fullName": "DataHub",
}
}
],
}
}
},
"systemMetadata": {},
},
)
response.raise_for_status()
def test_ingest_without_system_metadata(auth_session):
response = auth_session.post(
f"{auth_session.gms_url()}/entities?action=ingest",
headers=restli_default_headers,
json={
"entity": {
"value": {
"com.linkedin.metadata.snapshot.CorpUserSnapshot": {
"urn": get_root_urn(),
"aspects": [
{
"com.linkedin.identity.CorpUserInfo": {
"active": True,
"displayName": "DataHub",
"email": "datahub@linkedin.com",
"title": "CEO",
"fullName": "DataHub",
}
}
],
}
}
},
},
)
response.raise_for_status()
def test_frontend_app_config(auth_session):
json = {
"query": """query appConfig {\n
appConfig {\n
analyticsConfig {\n
enabled\n
}\n
policiesConfig {\n
enabled\n
platformPrivileges {\n
type\n
displayName\n
description\n
}\n
resourcePrivileges {\n
resourceType\n
resourceTypeDisplayName\n
entityType\n
privileges {\n
type\n
displayName\n
description\n
}\n
}\n
}\n
}\n
}"""
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["appConfig"]
assert res_data["data"]["appConfig"]["analyticsConfig"]["enabled"] is True
assert res_data["data"]["appConfig"]["policiesConfig"]["enabled"] is True
def test_frontend_me_query(auth_session):
json = {
"query": """query me {\n
me {\n
corpUser {\n
urn\n
username\n
editableInfo {\n
pictureLink\n
}\n
info {\n
firstName\n
fullName\n
title\n
email\n
}\n
}\n
platformPrivileges {\n
viewAnalytics
managePolicies
manageIdentities
manageUserCredentials
generatePersonalAccessTokens
}\n
}\n
}"""
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["me"]["corpUser"]["urn"] == get_root_urn()
assert res_data["data"]["me"]["platformPrivileges"]["viewAnalytics"] is True
assert res_data["data"]["me"]["platformPrivileges"]["managePolicies"] is True
assert res_data["data"]["me"]["platformPrivileges"]["manageUserCredentials"] is True
assert res_data["data"]["me"]["platformPrivileges"]["manageIdentities"] is True
assert (
res_data["data"]["me"]["platformPrivileges"]["generatePersonalAccessTokens"]
is True
)
def test_list_users(auth_session):
json = {
"query": """query listUsers($input: ListUsersInput!) {\n
listUsers(input: $input) {\n
start\n
count\n
total\n
users {\n
urn\n
type\n
username\n
properties {\n
firstName
}\n
}\n
}\n
}""",
"variables": {
"input": {
"start": "0",
"count": "2",
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["listUsers"]
assert res_data["data"]["listUsers"]["start"] == 0
assert res_data["data"]["listUsers"]["count"] == 2
assert (
len(res_data["data"]["listUsers"]["users"]) >= 2
) # Length of default user set.
@pytest.mark.dependency()
def test_list_groups(auth_session):
json = {
"query": """query listGroups($input: ListGroupsInput!) {\n
listGroups(input: $input) {\n
start\n
count\n
total\n
groups {\n
urn\n
type\n
name\n
properties {\n
displayName
}\n
}\n
}\n
}""",
"variables": {
"input": {
"start": "0",
"count": "2",
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["listGroups"]
assert res_data["data"]["listGroups"]["start"] == 0
assert res_data["data"]["listGroups"]["count"] == 2
assert (
len(res_data["data"]["listGroups"]["groups"]) >= 2
) # Length of default group set.
@pytest.mark.dependency(
depends=["test_list_groups"]
)
def test_add_remove_members_from_group(auth_session):
# Assert no group edges for user jdoe
json = {
"query": """query corpUser($urn: String!) {\n
corpUser(urn: $urn) {\n
urn\n
relationships(input: { types: ["IsMemberOfNativeGroup"], direction: OUTGOING, start: 0, count: 1 }) {\n
total\n
}\n
}\n
}""",
"variables": {"urn": "urn:li:corpuser:jdoe"},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["corpUser"]
assert res_data["data"]["corpUser"]["relationships"]["total"] == 0
# Add jdoe to group
json = {
"query": """mutation addGroupMembers($input: AddGroupMembersInput!) {\n
addGroupMembers(input: $input) }""",
"variables": {
"input": {
"groupUrn": "urn:li:corpGroup:bfoo",
"userUrns": ["urn:li:corpuser:jdoe"],
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
# Verify the member has been added
_ensure_user_relationship_present(auth_session, "urn:li:corpuser:jdoe", 1)
# Now remove jdoe from the group
json = {
"query": """mutation removeGroupMembers($input: RemoveGroupMembersInput!) {\n
removeGroupMembers(input: $input) }""",
"variables": {
"input": {
"groupUrn": "urn:li:corpGroup:bfoo",
"userUrns": ["urn:li:corpuser:jdoe"],
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
# Verify the member has been removed
_ensure_user_relationship_present(auth_session, "urn:li:corpuser:jdoe", 0)
@pytest.mark.dependency()
def test_update_corp_group_properties(auth_session):
group_urn = "urn:li:corpGroup:bfoo"
# Update Corp Group Description
json = {
"query": """mutation updateCorpGroupProperties($urn: String!, $input: CorpGroupUpdateInput!) {\n
updateCorpGroupProperties(urn: $urn, input: $input) { urn } }""",
"variables": {
"urn": group_urn,
"input": {
"description": "My test description",
"slack": "test_group_slack",
"email": "test_group_email@email.com",
},
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
print(res_data)
assert "errors" not in res_data
assert res_data["data"]["updateCorpGroupProperties"] is not None
# Verify the description has been updated
json = {
"query": """query corpGroup($urn: String!) {\n
corpGroup(urn: $urn) {\n
urn\n
editableProperties {\n
description\n
slack\n
email\n
}\n
}\n
}""",
"variables": {"urn": group_urn},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert "errors" not in res_data
assert res_data["data"]
assert res_data["data"]["corpGroup"]
assert res_data["data"]["corpGroup"]["editableProperties"]
assert res_data["data"]["corpGroup"]["editableProperties"] == {
"description": "My test description",
"slack": "test_group_slack",
"email": "test_group_email@email.com",
}
# Reset the editable properties
json = {
"query": """mutation updateCorpGroupProperties($urn: String!, $input: CorpGroupUpdateInput!) {\n
updateCorpGroupProperties(urn: $urn, input: $input) { urn } }""",
"variables": {
"urn": group_urn,
"input": {"description": "", "slack": "", "email": ""},
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
@pytest.mark.dependency(
depends=[
"test_update_corp_group_properties",
]
)
def test_update_corp_group_description(auth_session):
group_urn = "urn:li:corpGroup:bfoo"
# Update Corp Group Description
json = {
"query": """mutation updateDescription($input: DescriptionUpdateInput!) {\n
updateDescription(input: $input) }""",
"variables": {
"input": {"description": "My test description", "resourceUrn": group_urn},
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
print(res_data)
assert "errors" not in res_data
assert res_data["data"]["updateDescription"] is True
# Verify the description has been updated
json = {
"query": """query corpGroup($urn: String!) {\n
corpGroup(urn: $urn) {\n
urn\n
editableProperties {\n
description\n
}\n
}\n
}""",
"variables": {"urn": group_urn},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert "errors" not in res_data
assert res_data["data"]
assert res_data["data"]["corpGroup"]
assert res_data["data"]["corpGroup"]["editableProperties"]
assert (
res_data["data"]["corpGroup"]["editableProperties"]["description"]
== "My test description"
)
# Reset Corp Group Description
json = {
"query": """mutation updateDescription($input: DescriptionUpdateInput!) {\n
updateDescription(input: $input) }""",
"variables": {
"input": {"description": "", "resourceUrn": group_urn},
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
@pytest.mark.dependency(
depends=[
"test_list_groups",
"test_add_remove_members_from_group",
]
)
def test_remove_user(auth_session):
json = {
"query": """mutation removeUser($urn: String!) {\n
removeUser(urn: $urn) }""",
"variables": {"urn": "urn:li:corpuser:jdoe"},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
json = {
"query": """query corpUser($urn: String!) {\n
corpUser(urn: $urn) {\n
urn\n
properties {\n
firstName\n
}\n
}\n
}""",
"variables": {"urn": "urn:li:corpuser:jdoe"},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert "errors" not in res_data
assert res_data["data"]
assert res_data["data"]["corpUser"]
assert res_data["data"]["corpUser"]["properties"] is None
@pytest.mark.dependency(
depends=[
"test_list_groups",
"test_add_remove_members_from_group",
]
)
def test_remove_group(auth_session):
group_urn = "urn:li:corpGroup:bfoo"
json = {
"query": """mutation removeGroup($urn: String!) {\n
removeGroup(urn: $urn) }""",
"variables": {"urn": group_urn},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
_ensure_group_not_present(auth_session, group_urn)
@pytest.mark.dependency(
depends=[
"test_list_groups",
"test_remove_group",
]
)
def test_create_group(auth_session):
json = {
"query": """mutation createGroup($input: CreateGroupInput!) {\n
createGroup(input: $input) }""",
"variables": {
"input": {
"id": "test-id",
"name": "Test Group",
"description": "My test group",
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
json = {
"query": """query corpGroup($urn: String!) {\n
corpGroup(urn: $urn) {\n
urn\n
properties {\n
displayName\n
}\n
}\n
}""",
"variables": {"urn": "urn:li:corpGroup:test-id"},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["corpGroup"]
assert res_data["data"]["corpGroup"]["properties"]["displayName"] == "Test Group"
def test_home_page_recommendations(auth_session):
min_expected_recommendation_modules = 0
json = {
"query": """query listRecommendations($input: ListRecommendationsInput!) {\n
listRecommendations(input: $input) { modules { title } } }""",
"variables": {
"input": {
"userUrn": get_root_urn(),
"requestContext": {"scenario": "HOME"},
"limit": 5,
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
print(res_data)
assert res_data
assert res_data["data"]
assert res_data["data"]["listRecommendations"]
assert "errors" not in res_data
assert (
len(res_data["data"]["listRecommendations"]["modules"])
> min_expected_recommendation_modules
)
def test_search_results_recommendations(auth_session):
# This test simply ensures that the recommendations endpoint does not return an error.
json = {
"query": """query listRecommendations($input: ListRecommendationsInput!) {\n
listRecommendations(input: $input) { modules { title } } }""",
"variables": {
"input": {
"userUrn": get_root_urn(),
"requestContext": {
"scenario": "SEARCH_RESULTS",
"searchRequestContext": {"query": "asdsdsdds", "filters": []},
},
"limit": 5,
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert "errors" not in res_data
def test_generate_personal_access_token(auth_session):
# Test success case
json = {
"query": """query getAccessToken($input: GetAccessTokenInput!) {\n
getAccessToken(input: $input) {\n
accessToken\n
}\n
}""",
"variables": {
"input": {
"type": "PERSONAL",
"actorUrn": get_root_urn(),
"duration": "ONE_MONTH",
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["getAccessToken"]["accessToken"] is not None
assert "errors" not in res_data
# Test unauthenticated case
json = {
"query": """query getAccessToken($input: GetAccessTokenInput!) {\n
getAccessToken(input: $input) {\n
accessToken\n
}\n
}""",
"variables": {
"input": {
"type": "PERSONAL",
"actorUrn": "urn:li:corpuser:jsmith",
"duration": "ONE_DAY",
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert "errors" in res_data # Assert the request fails
def test_native_user_endpoints(auth_session):
# Sign up tests
frontend_session = get_frontend_session()
# Test getting the invite token
get_invite_token_json = {
"query": """query getInviteToken($input: GetInviteTokenInput!) {\n
getInviteToken(input: $input){\n
inviteToken\n
}\n
}""",
"variables": {"input": {}},
}
get_invite_token_response = frontend_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=get_invite_token_json
)
get_invite_token_response.raise_for_status()
get_invite_token_res_data = get_invite_token_response.json()
assert get_invite_token_res_data
assert get_invite_token_res_data["data"]
invite_token = get_invite_token_res_data["data"]["getInviteToken"]["inviteToken"]
assert invite_token is not None
assert "errors" not in get_invite_token_res_data
# Pass the invite token when creating the user
sign_up_json = {
"fullName": "Test User",
"email": "test@email.com",
"password": "password",
"title": "Date Engineer",
"inviteToken": invite_token,
}
sign_up_response = frontend_session.post(
f"{auth_session.frontend_url()}/signUp", json=sign_up_json
)
assert sign_up_response
assert "errors" not in sign_up_response
# Creating the same user again fails
same_user_sign_up_response = frontend_session.post(
f"{auth_session.frontend_url()}/signUp", json=sign_up_json
)
assert not same_user_sign_up_response
# Test that a bad invite token leads to failed sign up
bad_sign_up_json = {
"fullName": "Test2 User",
"email": "test2@email.com",
"password": "password",
"title": "Date Engineer",
"inviteToken": "invite_token",
}
bad_sign_up_response = frontend_session.post(
f"{auth_session.frontend_url()}/signUp", json=bad_sign_up_json
)
assert not bad_sign_up_response
frontend_session.cookies.clear()
# Reset credentials tests
# Log in as root again
headers = {
"Content-Type": "application/json",
}
username, password = get_admin_credentials()
root_login_data = '{"username":"' + username + '", "password":"' + password + '"}'
frontend_session.post(
f"{auth_session.frontend_url()}/logIn", headers=headers, data=root_login_data
)
# Test creating the password reset token
create_reset_token_json = {
"query": """mutation createNativeUserResetToken($input: CreateNativeUserResetTokenInput!) {\n
createNativeUserResetToken(input: $input) {\n
resetToken\n
}\n
}""",
"variables": {"input": {"userUrn": "urn:li:corpuser:test@email.com"}},
}
create_reset_token_response = frontend_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=create_reset_token_json
)
create_reset_token_response.raise_for_status()
create_reset_token_res_data = create_reset_token_response.json()
assert create_reset_token_res_data
assert create_reset_token_res_data["data"]
reset_token = create_reset_token_res_data["data"]["createNativeUserResetToken"][
"resetToken"
]
assert reset_token is not None
assert "errors" not in create_reset_token_res_data
# Pass the reset token when resetting credentials
reset_credentials_json = {
"email": "test@email.com",
"password": "newpassword",
"resetToken": reset_token,
}
reset_credentials_response = frontend_session.post(
f"{auth_session.frontend_url()}/resetNativeUserCredentials", json=reset_credentials_json
)
assert reset_credentials_response
assert "errors" not in reset_credentials_response
# Test that a bad reset token leads to failed response
bad_user_reset_credentials_json = {
"email": "test@email.com",
"password": "newerpassword",
"resetToken": "reset_token",
}
bad_reset_credentials_response = frontend_session.post(
f"{auth_session.frontend_url()}/resetNativeUserCredentials",
json=bad_user_reset_credentials_json,
)
assert not bad_reset_credentials_response
# Test that only a native user can reset their password
jaas_user_reset_credentials_json = {
"email": "datahub",
"password": "password",
"resetToken": reset_token,
}
jaas_user_reset_credentials_response = frontend_session.post(
f"{auth_session.frontend_url()}/resetNativeUserCredentials",
json=jaas_user_reset_credentials_json,
)
assert not jaas_user_reset_credentials_response
# Tests that unauthenticated users can't invite users or send reset password links
unauthenticated_session = requests.Session()
unauthenticated_get_invite_token_response = unauthenticated_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=get_invite_token_json
)
assert (
unauthenticated_get_invite_token_response.status_code == HTTPStatus.UNAUTHORIZED
)
unauthenticated_create_reset_token_json = {
"query": """mutation createNativeUserResetToken($input: CreateNativeUserResetTokenInput!) {\n
createNativeUserResetToken(input: $input) {\n
resetToken\n
}\n
}""",
"variables": {"input": {"userUrn": "urn:li:corpuser:test@email.com"}},
}
unauthenticated_create_reset_token_response = unauthenticated_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql",
json=unauthenticated_create_reset_token_json,
)
assert (
unauthenticated_create_reset_token_response.status_code
== HTTPStatus.UNAUTHORIZED
)
# cleanup steps
json = {
"query": """mutation removeUser($urn: String!) {\n
removeUser(urn: $urn) }""",
"variables": {"urn": "urn:li:corpuser:test@email.com"},
}
frontend_session.post(
f"{auth_session.frontend_url()}/logIn", headers=headers, data=root_login_data
)
remove_user_response = frontend_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=json
)
remove_user_response.raise_for_status()
assert "errors" not in remove_user_response