datahub/smoke-test/test_e2e.py

1384 lines
42 KiB
Python
Raw Permalink Normal View History

import logging
2021-04-29 23:27:03 -07:00
import time
import urllib
from http import HTTPStatus
from typing import Any, Optional
import concurrent.futures
2021-04-29 23:27:03 -07:00
import pytest
import requests
import tenacity
2021-04-29 23:27:03 -07:00
from datahub.ingestion.run.pipeline import Pipeline
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.no_cypress_suite1
from tests.utils import (
get_kafka_broker_url,
get_kafka_schema_registry,
get_sleep_info,
ingest_file_via_rest,
get_frontend_session,
get_admin_credentials,
get_root_urn,
wait_for_writes_to_sync,
)
2021-04-29 23:27:03 -07:00
bootstrap_sample_data = "../metadata-ingestion/examples/mce_files/bootstrap_mce.json"
2024-10-22 06:59:40 -05:00
usage_sample_data = "./test_resources/bigquery_usages_golden.json"
2021-04-29 23:27:03 -07:00
bq_sample_data = "./sample_bq_data.json"
restli_default_headers = {
"X-RestLi-Protocol-Version": "2.0.0",
}
kafka_post_ingestion_wait_sec = 30
2021-04-29 23:27:03 -07:00
sleep_sec, sleep_times = get_sleep_info()
2021-04-29 23:27:03 -07:00
@tenacity.retry(
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_user_present(auth_session, urn: str):
response = auth_session.get(
f"{auth_session.gms_url()}/entities/{urllib.parse.quote(urn)}",
headers={
**restli_default_headers,
},
)
response.raise_for_status()
data = response.json()
user_key = "com.linkedin.metadata.snapshot.CorpUserSnapshot"
assert data["value"]
assert data["value"][user_key]
assert data["value"][user_key]["urn"] == urn
return data
@tenacity.retry(
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_user_relationship_present(auth_session, urn, relationships):
json = {
"query": """query corpUser($urn: String!) {\n
corpUser(urn: $urn) {\n
urn\n
relationships(input: { types: ["IsMemberOfNativeGroup"], direction: OUTGOING, start: 0, count: 1 }) {\n
total\n
}\n
}\n
}""",
"variables": {"urn": urn},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["corpUser"]
assert res_data["data"]["corpUser"]["relationships"]
assert res_data["data"]["corpUser"]["relationships"]["total"] == relationships
@tenacity.retry(
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_dataset_present(
auth_session: Any,
urn: str,
aspects: Optional[str] = "datasetProperties",
) -> Any:
response = auth_session.get(
f"{auth_session.gms_url()}/entitiesV2?ids=List({urllib.parse.quote(urn)})&aspects=List({aspects})",
headers={
**restli_default_headers,
"X-RestLi-Method": "batch_get",
},
)
response.raise_for_status()
res_data = response.json()
assert res_data["results"]
assert res_data["results"][urn]
assert res_data["results"][urn]["aspects"]["datasetProperties"]
return res_data
@tenacity.retry(
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_group_not_present(auth_session, urn: str) -> Any:
json = {
"query": """query corpGroup($urn: String!) {\n
corpGroup(urn: $urn) {\n
urn\n
properties {\n
displayName\n
}\n
}\n
}""",
"variables": {"urn": urn},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["corpGroup"]
assert res_data["data"]["corpGroup"]["properties"] is None
def fixture_ingestion_via_rest(auth_session):
ingest_file_via_rest(auth_session, bootstrap_sample_data)
_ensure_user_present(auth_session, urn=get_root_urn())
def fixture_ingestion_usage_via_rest(auth_session):
ingest_file_via_rest(auth_session, usage_sample_data)
def fixture_ingestion_via_kafka(auth_session):
2021-04-29 23:27:03 -07:00
pipeline = Pipeline.create(
{
"source": {
"type": "file",
"config": {"filename": bq_sample_data},
},
"sink": {
"type": "datahub-kafka",
"config": {
"connection": {
"bootstrap": get_kafka_broker_url(),
"schema_registry_url": get_kafka_schema_registry(),
2021-04-29 23:27:03 -07:00
}
},
},
}
)
pipeline.run()
pipeline.raise_from_status()
_ensure_dataset_present(auth_session,
"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times,PROD)"
)
2021-04-29 23:27:03 -07:00
# Since Kafka emission is asynchronous, we must wait a little bit so that
# the changes are actually processed.
time.sleep(kafka_post_ingestion_wait_sec)
@pytest.fixture(scope='module', autouse=True)
def test_run_ingestion(auth_session):
2021-04-29 23:27:03 -07:00
# Dummy test so that future ones can just depend on this one.
# The rest sink fixtures cannot run at the same time, limitation of the Pipeline code
fixture_ingestion_usage_via_rest(auth_session)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
futures = []
for ingestion_fixture in ["fixture_ingestion_via_kafka", "fixture_ingestion_via_rest"]:
futures.append(
executor.submit(globals()[ingestion_fixture], auth_session)
)
for future in concurrent.futures.as_completed(futures):
logger.info(future.result())
wait_for_writes_to_sync()
2021-04-29 23:27:03 -07:00
def test_gms_get_user(auth_session):
2021-04-29 23:27:03 -07:00
username = "jdoe"
urn = f"urn:li:corpuser:{username}"
_ensure_user_present(auth_session, urn=urn)
2021-04-29 23:27:03 -07:00
@pytest.mark.parametrize(
"platform,dataset_name,env",
[
(
# This one tests the bootstrap sample data.
"urn:li:dataPlatform:kafka",
"SampleKafkaDataset",
"PROD",
),
(
# This one tests BigQuery ingestion.
"urn:li:dataPlatform:bigquery",
"bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times",
"PROD",
),
],
)
def test_gms_get_dataset(auth_session, platform, dataset_name, env):
2021-04-29 23:27:03 -07:00
platform = "urn:li:dataPlatform:bigquery"
dataset_name = (
"bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times"
)
env = "PROD"
urn = f"urn:li:dataset:({platform},{dataset_name},{env})"
response = auth_session.get(
f"{auth_session.gms_url()}/entities/{urllib.parse.quote(urn)}",
2021-04-29 23:27:03 -07:00
headers={
**restli_default_headers,
"X-RestLi-Method": "get",
},
)
response.raise_for_status()
res_data = response.json()
2021-04-29 23:27:03 -07:00
assert res_data["value"]
assert res_data["value"]["com.linkedin.metadata.snapshot.DatasetSnapshot"]
assert (
res_data["value"]["com.linkedin.metadata.snapshot.DatasetSnapshot"]["urn"]
== urn
)
2021-04-29 23:27:03 -07:00
def test_gms_batch_get_v2(auth_session):
platform = "urn:li:dataPlatform:bigquery"
env = "PROD"
name_1 = "bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times"
name_2 = "bigquery-public-data.covid19_geotab_mobility_impact.ca_border_wait_times"
urn1 = f"urn:li:dataset:({platform},{name_1},{env})"
urn2 = f"urn:li:dataset:({platform},{name_2},{env})"
resp1 = _ensure_dataset_present(auth_session, urn1, aspects="datasetProperties,ownership")
assert resp1["results"][urn1]["aspects"]["ownership"]
resp2 = _ensure_dataset_present(auth_session, urn2, aspects="datasetProperties,ownership")
assert (
"ownership" not in resp2["results"][urn2]["aspects"]
) # Aspect does not exist.
2021-04-29 23:27:03 -07:00
@pytest.mark.parametrize(
"query,min_expected_results",
[
("covid", 1),
("sample", 3),
],
)
def test_gms_search_dataset(auth_session, query, min_expected_results):
json = {"input": f"{query}", "entity": "dataset", "start": 0, "count": 10}
print(json)
response = auth_session.post(
f"{auth_session.gms_url()}/entities?action=search",
headers=restli_default_headers,
json=json,
2021-04-29 23:27:03 -07:00
)
response.raise_for_status()
res_data = response.json()
2021-04-29 23:27:03 -07:00
assert res_data["value"]
assert res_data["value"]["numEntities"] >= min_expected_results
assert len(res_data["value"]["entities"]) >= min_expected_results
2021-04-29 23:27:03 -07:00
@pytest.mark.parametrize(
"query,min_expected_results",
[
("covid", 1),
("sample", 3),
],
)
def test_gms_search_across_entities(auth_session, query, min_expected_results):
json = {"input": f"{query}", "entities": [], "start": 0, "count": 10}
print(json)
response = auth_session.post(
f"{auth_session.gms_url()}/entities?action=searchAcrossEntities",
headers=restli_default_headers,
json=json,
)
response.raise_for_status()
res_data = response.json()
assert res_data["value"]
assert res_data["value"]["numEntities"] >= min_expected_results
assert len(res_data["value"]["entities"]) >= min_expected_results
def test_gms_usage_fetch(auth_session):
response = auth_session.post(
f"{auth_session.gms_url()}/usageStats?action=queryRange",
headers=restli_default_headers,
json={
"resource": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)",
"duration": "DAY",
"rangeFromEnd": "ALL",
},
)
response.raise_for_status()
data = response.json()["value"]
assert len(data["buckets"]) == 6
assert data["buckets"][0]["metrics"]["topSqlQueries"]
fields = data["aggregations"].pop("fields")
assert len(fields) == 12
assert fields[0]["count"] == 7
users = data["aggregations"].pop("users")
assert len(users) == 1
assert users[0]["count"] == 7
assert data["aggregations"] == {
# "fields" and "users" already popped out
"totalSqlQueries": 7,
"uniqueUserCount": 1,
}
def test_frontend_auth(auth_session):
2021-04-29 23:27:03 -07:00
pass
def test_frontend_browse_datasets(auth_session):
json = {
"query": """query browse($input: BrowseInput!) {\n
browse(input: $input) {\n
start\n
count\n
total\n
groups {
name
}
entities {\n
... on Dataset {\n
urn\n
name\n
}\n
}\n
}\n
}""",
"variables": {"input": {"type": "DATASET", "path": ["prod"]}},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
2021-04-29 23:27:03 -07:00
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["browse"]
assert len(res_data["data"]["browse"]["entities"]) == 0
assert len(res_data["data"]["browse"]["groups"]) > 0
2021-04-29 23:27:03 -07:00
@pytest.mark.parametrize(
"query,min_expected_results",
[
("covid", 1),
("sample", 3),
("", 1),
2021-04-29 23:27:03 -07:00
],
)
def test_frontend_search_datasets(auth_session, query, min_expected_results):
json = {
"query": """query search($input: SearchInput!) {\n
search(input: $input) {\n
start\n
count\n
total\n
searchResults {\n
entity {\n
... on Dataset {\n
urn\n
name\n
}\n
}\n
}\n
}\n
}""",
"variables": {
"input": {"type": "DATASET", "query": f"{query}", "start": 0, "count": 10}
},
}
2021-04-29 23:27:03 -07:00
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
2021-04-29 23:27:03 -07:00
response.raise_for_status()
res_data = response.json()
2021-04-29 23:27:03 -07:00
assert res_data
assert res_data["data"]
assert res_data["data"]["search"]
assert res_data["data"]["search"]["total"] >= min_expected_results
assert len(res_data["data"]["search"]["searchResults"]) >= min_expected_results
2021-04-29 23:27:03 -07:00
@pytest.mark.parametrize(
"query,min_expected_results",
[
("covid", 1),
("sample", 3),
("", 1),
],
)
def test_frontend_search_across_entities(auth_session, query, min_expected_results):
json = {
"query": """query searchAcrossEntities($input: SearchAcrossEntitiesInput!) {\n
searchAcrossEntities(input: $input) {\n
start\n
count\n
total\n
searchResults {\n
entity {\n
... on Dataset {\n
urn\n
name\n
}\n
}\n
}\n
}\n
}""",
"variables": {
"input": {"types": [], "query": f"{query}", "start": 0, "count": 10}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["searchAcrossEntities"]
assert res_data["data"]["searchAcrossEntities"]["total"] >= min_expected_results
assert (
len(res_data["data"]["searchAcrossEntities"]["searchResults"])
>= min_expected_results
)
def test_frontend_user_info(auth_session):
urn = get_root_urn()
json = {
"query": """query corpUser($urn: String!) {\n
corpUser(urn: $urn) {\n
urn\n
username\n
editableInfo {\n
pictureLink\n
}\n
info {\n
firstName\n
fullName\n
title\n
email\n
}\n
}\n
}""",
"variables": {"urn": urn},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
2021-04-29 23:27:03 -07:00
response.raise_for_status()
res_data = response.json()
2021-04-29 23:27:03 -07:00
assert res_data
assert res_data["data"]
assert res_data["data"]["corpUser"]
assert res_data["data"]["corpUser"]["urn"] == urn
2021-04-29 23:27:03 -07:00
@pytest.mark.parametrize(
"platform,dataset_name,env",
[
(
# This one tests the bootstrap sample data.
"urn:li:dataPlatform:kafka",
"SampleKafkaDataset",
"PROD",
),
(
# This one tests BigQuery ingestion.
"urn:li:dataPlatform:bigquery",
"bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times",
"PROD",
),
],
)
def test_frontend_datasets(auth_session, platform, dataset_name, env):
2021-04-29 23:27:03 -07:00
urn = f"urn:li:dataset:({platform},{dataset_name},{env})"
json = {
"query": """query getDataset($urn: String!) {\n
dataset(urn: $urn) {\n
urn\n
name\n
description\n
platform {\n
urn\n
}\n
schemaMetadata {\n
name\n
version\n
createdAt\n
}\n
}\n
}""",
"variables": {"urn": urn},
}
2021-04-29 23:27:03 -07:00
# Basic dataset info.
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
2021-04-29 23:27:03 -07:00
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["dataset"]
assert res_data["data"]["dataset"]["urn"] == urn
assert res_data["data"]["dataset"]["name"] == dataset_name
assert res_data["data"]["dataset"]["platform"]["urn"] == platform
def test_ingest_with_system_metadata(auth_session, test_run_ingestion):
response = auth_session.post(
f"{auth_session.gms_url()}/entities?action=ingest",
headers=restli_default_headers,
json={
"entity": {
"value": {
"com.linkedin.metadata.snapshot.CorpUserSnapshot": {
"urn": get_root_urn(),
"aspects": [
{
"com.linkedin.identity.CorpUserInfo": {
"active": True,
"displayName": "DataHub",
"email": "datahub@linkedin.com",
"title": "CEO",
"fullName": "DataHub",
}
}
],
}
}
},
"systemMetadata": {
"lastObserved": 1628097379571,
"runId": "af0fe6e4-f547-11eb-81b2-acde48001122",
},
},
)
response.raise_for_status()
def test_ingest_with_blank_system_metadata(auth_session):
response = auth_session.post(
f"{auth_session.gms_url()}/entities?action=ingest",
headers=restli_default_headers,
json={
"entity": {
"value": {
"com.linkedin.metadata.snapshot.CorpUserSnapshot": {
"urn": get_root_urn(),
"aspects": [
{
"com.linkedin.identity.CorpUserInfo": {
"active": True,
"displayName": "DataHub",
"email": "datahub@linkedin.com",
"title": "CEO",
"fullName": "DataHub",
}
}
],
}
}
},
"systemMetadata": {},
},
)
response.raise_for_status()
def test_ingest_without_system_metadata(auth_session):
response = auth_session.post(
f"{auth_session.gms_url()}/entities?action=ingest",
headers=restli_default_headers,
json={
"entity": {
"value": {
"com.linkedin.metadata.snapshot.CorpUserSnapshot": {
"urn": get_root_urn(),
"aspects": [
{
"com.linkedin.identity.CorpUserInfo": {
"active": True,
"displayName": "DataHub",
"email": "datahub@linkedin.com",
"title": "CEO",
"fullName": "DataHub",
}
}
],
}
}
},
},
)
response.raise_for_status()
def test_frontend_app_config(auth_session):
json = {
"query": """query appConfig {\n
appConfig {\n
analyticsConfig {\n
enabled\n
}\n
policiesConfig {\n
enabled\n
platformPrivileges {\n
type\n
displayName\n
description\n
}\n
resourcePrivileges {\n
resourceType\n
resourceTypeDisplayName\n
entityType\n
privileges {\n
type\n
displayName\n
description\n
}\n
}\n
}\n
}\n
}"""
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["appConfig"]
assert res_data["data"]["appConfig"]["analyticsConfig"]["enabled"] is True
assert res_data["data"]["appConfig"]["policiesConfig"]["enabled"] is True
def test_frontend_me_query(auth_session):
json = {
"query": """query me {\n
me {\n
corpUser {\n
urn\n
username\n
editableInfo {\n
pictureLink\n
}\n
info {\n
firstName\n
fullName\n
title\n
email\n
}\n
}\n
platformPrivileges {\n
viewAnalytics
managePolicies
manageIdentities
manageUserCredentials
generatePersonalAccessTokens
}\n
}\n
}"""
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["me"]["corpUser"]["urn"] == get_root_urn()
assert res_data["data"]["me"]["platformPrivileges"]["viewAnalytics"] is True
assert res_data["data"]["me"]["platformPrivileges"]["managePolicies"] is True
assert res_data["data"]["me"]["platformPrivileges"]["manageUserCredentials"] is True
assert res_data["data"]["me"]["platformPrivileges"]["manageIdentities"] is True
assert (
res_data["data"]["me"]["platformPrivileges"]["generatePersonalAccessTokens"]
is True
)
def test_list_users(auth_session):
json = {
"query": """query listUsers($input: ListUsersInput!) {\n
listUsers(input: $input) {\n
start\n
count\n
total\n
users {\n
urn\n
type\n
username\n
properties {\n
firstName
}\n
}\n
}\n
}""",
"variables": {
"input": {
"start": "0",
"count": "2",
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["listUsers"]
assert res_data["data"]["listUsers"]["start"] == 0
assert res_data["data"]["listUsers"]["count"] == 2
assert (
len(res_data["data"]["listUsers"]["users"]) >= 2
) # Length of default user set.
@pytest.mark.dependency()
def test_list_groups(auth_session):
json = {
"query": """query listGroups($input: ListGroupsInput!) {\n
listGroups(input: $input) {\n
start\n
count\n
total\n
groups {\n
urn\n
type\n
name\n
properties {\n
displayName
}\n
}\n
}\n
}""",
"variables": {
"input": {
"start": "0",
"count": "2",
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["listGroups"]
assert res_data["data"]["listGroups"]["start"] == 0
assert res_data["data"]["listGroups"]["count"] == 2
assert (
len(res_data["data"]["listGroups"]["groups"]) >= 2
) # Length of default group set.
@pytest.mark.dependency(
depends=["test_list_groups"]
)
def test_add_remove_members_from_group(auth_session):
# Assert no group edges for user jdoe
json = {
"query": """query corpUser($urn: String!) {\n
corpUser(urn: $urn) {\n
urn\n
relationships(input: { types: ["IsMemberOfNativeGroup"], direction: OUTGOING, start: 0, count: 1 }) {\n
total\n
}\n
}\n
}""",
"variables": {"urn": "urn:li:corpuser:jdoe"},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["corpUser"]
assert res_data["data"]["corpUser"]["relationships"]["total"] == 0
# Add jdoe to group
json = {
"query": """mutation addGroupMembers($input: AddGroupMembersInput!) {\n
addGroupMembers(input: $input) }""",
"variables": {
"input": {
"groupUrn": "urn:li:corpGroup:bfoo",
"userUrns": ["urn:li:corpuser:jdoe"],
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
# Verify the member has been added
_ensure_user_relationship_present(auth_session, "urn:li:corpuser:jdoe", 1)
# Now remove jdoe from the group
json = {
"query": """mutation removeGroupMembers($input: RemoveGroupMembersInput!) {\n
removeGroupMembers(input: $input) }""",
"variables": {
"input": {
"groupUrn": "urn:li:corpGroup:bfoo",
"userUrns": ["urn:li:corpuser:jdoe"],
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
# Verify the member has been removed
_ensure_user_relationship_present(auth_session, "urn:li:corpuser:jdoe", 0)
@pytest.mark.dependency()
def test_update_corp_group_properties(auth_session):
group_urn = "urn:li:corpGroup:bfoo"
# Update Corp Group Description
json = {
"query": """mutation updateCorpGroupProperties($urn: String!, $input: CorpGroupUpdateInput!) {\n
updateCorpGroupProperties(urn: $urn, input: $input) { urn } }""",
"variables": {
"urn": group_urn,
"input": {
"description": "My test description",
"slack": "test_group_slack",
"email": "test_group_email@email.com",
},
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
print(res_data)
assert "errors" not in res_data
assert res_data["data"]["updateCorpGroupProperties"] is not None
# Verify the description has been updated
json = {
"query": """query corpGroup($urn: String!) {\n
corpGroup(urn: $urn) {\n
urn\n
editableProperties {\n
description\n
slack\n
email\n
}\n
}\n
}""",
"variables": {"urn": group_urn},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert "errors" not in res_data
assert res_data["data"]
assert res_data["data"]["corpGroup"]
assert res_data["data"]["corpGroup"]["editableProperties"]
assert res_data["data"]["corpGroup"]["editableProperties"] == {
"description": "My test description",
"slack": "test_group_slack",
"email": "test_group_email@email.com",
}
# Reset the editable properties
json = {
"query": """mutation updateCorpGroupProperties($urn: String!, $input: CorpGroupUpdateInput!) {\n
updateCorpGroupProperties(urn: $urn, input: $input) { urn } }""",
"variables": {
"urn": group_urn,
"input": {"description": "", "slack": "", "email": ""},
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
@pytest.mark.dependency(
depends=[
"test_update_corp_group_properties",
]
)
def test_update_corp_group_description(auth_session):
group_urn = "urn:li:corpGroup:bfoo"
# Update Corp Group Description
json = {
"query": """mutation updateDescription($input: DescriptionUpdateInput!) {\n
updateDescription(input: $input) }""",
"variables": {
"input": {"description": "My test description", "resourceUrn": group_urn},
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
print(res_data)
assert "errors" not in res_data
assert res_data["data"]["updateDescription"] is True
# Verify the description has been updated
json = {
"query": """query corpGroup($urn: String!) {\n
corpGroup(urn: $urn) {\n
urn\n
editableProperties {\n
description\n
}\n
}\n
}""",
"variables": {"urn": group_urn},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert "errors" not in res_data
assert res_data["data"]
assert res_data["data"]["corpGroup"]
assert res_data["data"]["corpGroup"]["editableProperties"]
assert (
res_data["data"]["corpGroup"]["editableProperties"]["description"]
== "My test description"
)
# Reset Corp Group Description
json = {
"query": """mutation updateDescription($input: DescriptionUpdateInput!) {\n
updateDescription(input: $input) }""",
"variables": {
"input": {"description": "", "resourceUrn": group_urn},
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
@pytest.mark.dependency(
depends=[
"test_list_groups",
"test_add_remove_members_from_group",
]
)
def test_remove_user(auth_session):
json = {
"query": """mutation removeUser($urn: String!) {\n
removeUser(urn: $urn) }""",
"variables": {"urn": "urn:li:corpuser:jdoe"},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
json = {
"query": """query corpUser($urn: String!) {\n
corpUser(urn: $urn) {\n
urn\n
properties {\n
firstName\n
}\n
}\n
}""",
"variables": {"urn": "urn:li:corpuser:jdoe"},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert "errors" not in res_data
assert res_data["data"]
assert res_data["data"]["corpUser"]
assert res_data["data"]["corpUser"]["properties"] is None
@pytest.mark.dependency(
depends=[
"test_list_groups",
"test_add_remove_members_from_group",
]
)
def test_remove_group(auth_session):
group_urn = "urn:li:corpGroup:bfoo"
json = {
"query": """mutation removeGroup($urn: String!) {\n
removeGroup(urn: $urn) }""",
"variables": {"urn": group_urn},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
_ensure_group_not_present(auth_session, group_urn)
@pytest.mark.dependency(
depends=[
"test_list_groups",
"test_remove_group",
]
)
def test_create_group(auth_session):
json = {
"query": """mutation createGroup($input: CreateGroupInput!) {\n
createGroup(input: $input) }""",
"variables": {
"input": {
"id": "test-id",
"name": "Test Group",
"description": "My test group",
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
json = {
"query": """query corpGroup($urn: String!) {\n
corpGroup(urn: $urn) {\n
urn\n
properties {\n
displayName\n
}\n
}\n
}""",
"variables": {"urn": "urn:li:corpGroup:test-id"},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["corpGroup"]
assert res_data["data"]["corpGroup"]["properties"]["displayName"] == "Test Group"
def test_home_page_recommendations(auth_session):
min_expected_recommendation_modules = 0
json = {
"query": """query listRecommendations($input: ListRecommendationsInput!) {\n
listRecommendations(input: $input) { modules { title } } }""",
"variables": {
"input": {
"userUrn": get_root_urn(),
"requestContext": {"scenario": "HOME"},
"limit": 5,
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
print(res_data)
assert res_data
assert res_data["data"]
assert res_data["data"]["listRecommendations"]
assert "errors" not in res_data
assert (
len(res_data["data"]["listRecommendations"]["modules"])
> min_expected_recommendation_modules
)
def test_search_results_recommendations(auth_session):
# This test simply ensures that the recommendations endpoint does not return an error.
json = {
"query": """query listRecommendations($input: ListRecommendationsInput!) {\n
listRecommendations(input: $input) { modules { title } } }""",
"variables": {
"input": {
"userUrn": get_root_urn(),
"requestContext": {
"scenario": "SEARCH_RESULTS",
"searchRequestContext": {"query": "asdsdsdds", "filters": []},
},
"limit": 5,
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert "errors" not in res_data
def test_generate_personal_access_token(auth_session):
# Test success case
json = {
"query": """query getAccessToken($input: GetAccessTokenInput!) {\n
getAccessToken(input: $input) {\n
accessToken\n
}\n
}""",
"variables": {
"input": {
"type": "PERSONAL",
"actorUrn": get_root_urn(),
"duration": "ONE_MONTH",
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["getAccessToken"]["accessToken"] is not None
assert "errors" not in res_data
# Test unauthenticated case
json = {
"query": """query getAccessToken($input: GetAccessTokenInput!) {\n
getAccessToken(input: $input) {\n
accessToken\n
}\n
}""",
"variables": {
"input": {
"type": "PERSONAL",
"actorUrn": "urn:li:corpuser:jsmith",
"duration": "ONE_DAY",
}
},
}
response = auth_session.post(f"{auth_session.frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert "errors" in res_data # Assert the request fails
def test_native_user_endpoints(auth_session):
# Sign up tests
frontend_session = get_frontend_session()
# Test getting the invite token
get_invite_token_json = {
"query": """query getInviteToken($input: GetInviteTokenInput!) {\n
getInviteToken(input: $input){\n
inviteToken\n
}\n
}""",
"variables": {"input": {}},
}
get_invite_token_response = frontend_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=get_invite_token_json
)
get_invite_token_response.raise_for_status()
get_invite_token_res_data = get_invite_token_response.json()
assert get_invite_token_res_data
assert get_invite_token_res_data["data"]
invite_token = get_invite_token_res_data["data"]["getInviteToken"]["inviteToken"]
assert invite_token is not None
assert "errors" not in get_invite_token_res_data
# Pass the invite token when creating the user
sign_up_json = {
"fullName": "Test User",
"email": "test@email.com",
"password": "password",
"title": "Date Engineer",
"inviteToken": invite_token,
}
sign_up_response = frontend_session.post(
f"{auth_session.frontend_url()}/signUp", json=sign_up_json
)
assert sign_up_response
assert "errors" not in sign_up_response
# Creating the same user again fails
same_user_sign_up_response = frontend_session.post(
f"{auth_session.frontend_url()}/signUp", json=sign_up_json
)
assert not same_user_sign_up_response
# Test that a bad invite token leads to failed sign up
bad_sign_up_json = {
"fullName": "Test2 User",
"email": "test2@email.com",
"password": "password",
"title": "Date Engineer",
"inviteToken": "invite_token",
}
bad_sign_up_response = frontend_session.post(
f"{auth_session.frontend_url()}/signUp", json=bad_sign_up_json
)
assert not bad_sign_up_response
frontend_session.cookies.clear()
# Reset credentials tests
# Log in as root again
headers = {
"Content-Type": "application/json",
}
username, password = get_admin_credentials()
root_login_data = '{"username":"' + username + '", "password":"' + password + '"}'
frontend_session.post(
f"{auth_session.frontend_url()}/logIn", headers=headers, data=root_login_data
)
# Test creating the password reset token
create_reset_token_json = {
"query": """mutation createNativeUserResetToken($input: CreateNativeUserResetTokenInput!) {\n
createNativeUserResetToken(input: $input) {\n
resetToken\n
}\n
}""",
"variables": {"input": {"userUrn": "urn:li:corpuser:test@email.com"}},
}
create_reset_token_response = frontend_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=create_reset_token_json
)
create_reset_token_response.raise_for_status()
create_reset_token_res_data = create_reset_token_response.json()
assert create_reset_token_res_data
assert create_reset_token_res_data["data"]
reset_token = create_reset_token_res_data["data"]["createNativeUserResetToken"][
"resetToken"
]
assert reset_token is not None
assert "errors" not in create_reset_token_res_data
# Pass the reset token when resetting credentials
reset_credentials_json = {
"email": "test@email.com",
"password": "newpassword",
"resetToken": reset_token,
}
reset_credentials_response = frontend_session.post(
f"{auth_session.frontend_url()}/resetNativeUserCredentials", json=reset_credentials_json
)
assert reset_credentials_response
assert "errors" not in reset_credentials_response
# Test that a bad reset token leads to failed response
bad_user_reset_credentials_json = {
"email": "test@email.com",
"password": "newerpassword",
"resetToken": "reset_token",
}
bad_reset_credentials_response = frontend_session.post(
f"{auth_session.frontend_url()}/resetNativeUserCredentials",
json=bad_user_reset_credentials_json,
)
assert not bad_reset_credentials_response
# Test that only a native user can reset their password
jaas_user_reset_credentials_json = {
"email": "datahub",
"password": "password",
"resetToken": reset_token,
}
jaas_user_reset_credentials_response = frontend_session.post(
f"{auth_session.frontend_url()}/resetNativeUserCredentials",
json=jaas_user_reset_credentials_json,
)
assert not jaas_user_reset_credentials_response
# Tests that unauthenticated users can't invite users or send reset password links
unauthenticated_session = requests.Session()
unauthenticated_get_invite_token_response = unauthenticated_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=get_invite_token_json
)
assert (
unauthenticated_get_invite_token_response.status_code == HTTPStatus.UNAUTHORIZED
)
unauthenticated_create_reset_token_json = {
"query": """mutation createNativeUserResetToken($input: CreateNativeUserResetTokenInput!) {\n
createNativeUserResetToken(input: $input) {\n
resetToken\n
}\n
}""",
"variables": {"input": {"userUrn": "urn:li:corpuser:test@email.com"}},
}
unauthenticated_create_reset_token_response = unauthenticated_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql",
json=unauthenticated_create_reset_token_json,
)
assert (
unauthenticated_create_reset_token_response.status_code
== HTTPStatus.UNAUTHORIZED
)
# cleanup steps
json = {
"query": """mutation removeUser($urn: String!) {\n
removeUser(urn: $urn) }""",
"variables": {"urn": "urn:li:corpuser:test@email.com"},
}
frontend_session.post(
f"{auth_session.frontend_url()}/logIn", headers=headers, data=root_login_data
)
remove_user_response = frontend_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=json
)
remove_user_response.raise_for_status()
assert "errors" not in remove_user_response