# datahub/smoke-test/tests/tokens/revokable_access_token_test.py

# 497 lines
# 17 KiB
# Python
# Raw Permalink Normal View History

import os
# 2023-10-10 16:08:34 +05:30
import pytest
from tests.utils import (
get_admin_credentials,
get_frontend_url,
login_as,
wait_for_writes_to_sync,
)
from .token_utils import listUsers, removeUser
# Apply the `no_cypress_suite1` marker to every test collected from this module.
pytestmark = pytest.mark.no_cypress_suite1

# Switch telemetry off through the environment before any test logs in.
os.environ["DATAHUB_TELEMETRY_ENABLED"] = "false"

# Admin credentials reused by the fixtures and tests below.
admin_user, admin_pass = get_admin_credentials()
@pytest.fixture()
def auth_exclude_filter():
    """Return a negated EQUAL filter on the token `name` field.

    The single filtered value is "Test Session Token"; the tests pass this
    filter to `listAccessTokens`.
    NOTE(review): presumably this hides a token created by the test session
    itself -- confirm against the test harness.
    """
    excluded_names = ["Test Session Token"]
    return dict(
        field="name",
        condition="EQUAL",
        negated=True,
        values=excluded_names,
    )
@pytest.fixture(scope="class", autouse=True)
def custom_user_setup():
    """Create the `user` account before the tests and remove it afterwards.

    Setup: removes any existing `urn:li:corpuser:user`, fetches an invite
    token through GraphQL, signs the user up via `/signUp`, and verifies the
    user shows up in `listUsers`.

    Teardown (after the ``yield``): removes the user again and verifies it is
    no longer listed.
    """
    session = login_as(admin_user, admin_pass)

    # Start from a clean slate by removing the test user.
    removal = removeUser(session, "urn:li:corpuser:user")
    assert removal
    assert "error" not in removal

    # Fetch an invite token; it is needed to sign the new user up.
    invite_query = {
        "query": """query getInviteToken($input: GetInviteTokenInput!) {
getInviteToken(input: $input){
inviteToken
}
}""",
        "variables": {"input": {}},
    }
    graphql_endpoint = f"{get_frontend_url()}/api/v2/graphql"
    invite_response = session.post(graphql_endpoint, json=invite_query)
    invite_response.raise_for_status()
    invite_payload = invite_response.json()

    assert invite_payload
    assert invite_payload["data"]
    invite_token = invite_payload["data"]["getInviteToken"]["inviteToken"]
    assert invite_token is not None
    assert "error" not in invite_token

    # Sign the user up, handing over the invite token.
    sign_up_endpoint = f"{get_frontend_url()}/signUp"
    sign_up_response = session.post(
        sign_up_endpoint,
        json={
            "fullName": "Test User",
            "email": "user",
            "password": "user",
            "title": "Date Engineer",
            "inviteToken": invite_token,
        },
    )
    sign_up_response.raise_for_status()
    assert sign_up_response
    assert "error" not in sign_up_response

    # Give the writes time to become visible.
    wait_for_writes_to_sync()

    # signUp replaces the session cookie with the newly signed-up user's,
    # so drop the cookies and log in as the admin again.
    session.cookies.clear()
    session = login_as(admin_user, admin_pass)

    # Make sure the created user is there.
    listed = listUsers(session)
    assert listed["data"]
    assert listed["data"]["listUsers"]
    assert {"username": "user"} in listed["data"]["listUsers"]["users"]

    yield

    # Teardown: delete the created user.
    removal = removeUser(session, "urn:li:corpuser:user")
    assert removal
    assert removal["data"]
    assert removal["data"]["removeUser"] is True

    # Give the delete time to become visible.
    wait_for_writes_to_sync()

    # Make sure the created user is gone.
    listed = listUsers(session)
    assert listed["data"]
    assert listed["data"]["listUsers"]
    assert {"username": "user"} not in listed["data"]["listUsers"]["users"]
@pytest.fixture(autouse=True)
def access_token_setup(auth_session, auth_exclude_filter):
    """Check that no tokens are listed before a test and revoke leftovers after.

    `auth_session` is requested as a fixture dependency but is not used in
    the body; the fixture logs in as the admin itself.
    """
    session = login_as(admin_user, admin_pass)

    # Precondition: listing through the exclude filter yields no tokens.
    before = listAccessTokens(session, filters=[auth_exclude_filter])
    assert before
    assert before["data"]
    listing = before["data"]["listAccessTokens"]
    assert listing["total"] == 0
    assert not listing["tokens"]

    yield

    # Clean up: revoke every token still listed after the test.
    after = listAccessTokens(session, filters=[auth_exclude_filter])
    for token in after["data"]["listAccessTokens"]["tokens"]:
        revokeAccessToken(session, token["id"])
def test_admin_can_create_list_and_revoke_tokens(auth_exclude_filter):
    """Admin creates a token for itself, sees it listed, revokes it, sees it gone."""
    session = login_as(admin_user, admin_pass)
    own_urn = f"urn:li:corpuser:{admin_user}"

    def visible_tokens():
        """List tokens through the exclude filter and return the token entries."""
        payload = listAccessTokens(session, filters=[auth_exclude_filter])
        assert payload
        assert payload["data"]
        listing = payload["data"]["listAccessTokens"]
        assert listing["total"] is not None
        return listing["tokens"]

    # With the admin account, nothing should be listed yet.
    assert len(visible_tokens()) == 0

    # The admin generates a token for itself.
    created = generateAccessToken_v2(session, own_urn)
    assert created
    assert created["data"]
    new_token = created["data"]["createAccessToken"]
    assert new_token
    assert new_token["accessToken"]
    assert new_token["metadata"]["actorUrn"] == own_urn
    access_token = new_token["accessToken"]
    token_id = new_token["metadata"]["id"]

    # Give the write time to become visible.
    wait_for_writes_to_sync()

    # The metadata looked up from the raw token names the admin as owner and actor.
    looked_up = getAccessTokenMetadata(session, access_token)
    assert looked_up
    assert looked_up["data"]
    metadata = looked_up["data"]["getAccessTokenMetadata"]
    assert metadata
    assert metadata["ownerUrn"] == own_urn
    assert metadata["actorUrn"] == own_urn

    # The freshly created token is the only one listed.
    listed = visible_tokens()
    assert len(listed) == 1
    assert listed[0]["actorUrn"] == own_urn
    assert listed[0]["ownerUrn"] == own_urn

    # The admin can revoke the token it created.
    revoked = revokeAccessToken(session, token_id)
    assert revoked
    assert revoked["data"]
    assert revoked["data"]["revokeAccessToken"]
    assert revoked["data"]["revokeAccessToken"] is True

    # After revocation nothing is listed any more.
    assert len(visible_tokens()) == 0
def test_admin_can_create_and_revoke_tokens_for_other_user(auth_exclude_filter):
    """Admin creates a token for `user`, sees it listed, and revokes it."""
    session = login_as(admin_user, admin_pass)

    def visible_tokens():
        """List tokens through the exclude filter and return the token entries."""
        payload = listAccessTokens(session, filters=[auth_exclude_filter])
        assert payload
        assert payload["data"]
        listing = payload["data"]["listAccessTokens"]
        assert listing["total"] is not None
        return listing["tokens"]

    # With the admin account, nothing should be listed yet.
    assert len(visible_tokens()) == 0

    # The admin generates a token on behalf of another user.
    created = generateAccessToken_v2(session, "urn:li:corpuser:user")
    assert created
    assert created["data"]
    new_token = created["data"]["createAccessToken"]
    assert new_token
    assert new_token["accessToken"]
    assert new_token["metadata"]["actorUrn"] == "urn:li:corpuser:user"
    token_id = new_token["metadata"]["id"]

    # Give the write time to become visible.
    wait_for_writes_to_sync()

    # The listed token acts as `user` but is owned by the admin.
    listed = visible_tokens()
    assert len(listed) == 1
    assert listed[0]["actorUrn"] == "urn:li:corpuser:user"
    assert listed[0]["ownerUrn"] == f"urn:li:corpuser:{admin_user}"

    # The admin can revoke a token it created for another user.
    revoked = revokeAccessToken(session, token_id)
    assert revoked
    assert revoked["data"]
    assert revoked["data"]["revokeAccessToken"]
    assert revoked["data"]["revokeAccessToken"] is True

    # After revocation nothing is listed any more.
    assert len(visible_tokens()) == 0
def test_non_admin_can_create_list_revoke_tokens(auth_exclude_filter):
    """A regular user creates, lists, and revokes a token of their own."""
    session = login_as("user", "user")

    def own_tokens():
        """List the tokens owned by `user` and return the token entries."""
        payload = listAccessTokens(
            session,
            [
                {"field": "ownerUrn", "values": ["urn:li:corpuser:user"]},
                auth_exclude_filter,
            ],
        )
        assert payload
        assert payload["data"]
        listing = payload["data"]["listAccessTokens"]
        assert listing["total"] is not None
        return listing["tokens"]

    # A regular user can generate a token for themselves.
    created = generateAccessToken_v2(session, "urn:li:corpuser:user")
    assert created
    assert created["data"]
    new_token = created["data"]["createAccessToken"]
    assert new_token
    assert new_token["accessToken"]
    assert new_token["metadata"]["actorUrn"] == "urn:li:corpuser:user"
    token_id = new_token["metadata"]["id"]

    # Give the write time to become visible.
    wait_for_writes_to_sync()

    # The user can list their own token.
    listed = own_tokens()
    assert len(listed) == 1
    assert listed[0]["actorUrn"] == "urn:li:corpuser:user"
    assert listed[0]["ownerUrn"] == "urn:li:corpuser:user"
    assert listed[0]["id"] == token_id

    # The user can revoke their own token.
    revoked = revokeAccessToken(session, token_id)
    assert revoked
    assert revoked["data"]
    assert revoked["data"]["revokeAccessToken"]
    assert revoked["data"]["revokeAccessToken"] is True

    # All of the user's tokens were removed.
    assert len(own_tokens()) == 0
def test_admin_can_manage_tokens_generated_by_other_user(auth_exclude_filter):
    """Admin lists and revokes a token that `user` created for themselves."""

    def tokens_listed(session, filters):
        """Run `listAccessTokens` with `filters` and return the token entries."""
        payload = listAccessTokens(session, filters)
        assert payload
        assert payload["data"]
        listing = payload["data"]["listAccessTokens"]
        assert listing["total"] is not None
        return listing["tokens"]

    def user_owned_filters():
        """Build the filter list selecting tokens owned by `user`."""
        return [
            {"field": "ownerUrn", "values": ["urn:li:corpuser:user"]},
            auth_exclude_filter,
        ]

    admin_session = login_as(admin_user, admin_pass)

    # With the admin account, nothing should be listed yet.
    assert len(tokens_listed(admin_session, [auth_exclude_filter])) == 0

    # Switch to the regular user, who creates a token for themselves.
    admin_session.cookies.clear()
    user_session = login_as("user", "user")
    created = generateAccessToken_v2(user_session, "urn:li:corpuser:user")
    assert created
    assert created["data"]
    new_token = created["data"]["createAccessToken"]
    assert new_token
    assert new_token["accessToken"]
    assert new_token["metadata"]["actorUrn"] == "urn:li:corpuser:user"
    assert new_token["metadata"]["ownerUrn"] == "urn:li:corpuser:user"
    token_id = new_token["metadata"]["id"]

    # Give the write time to become visible.
    wait_for_writes_to_sync()

    # The admin can list tokens owned by someone else.
    user_session.cookies.clear()
    admin_session = login_as(admin_user, admin_pass)
    listed = tokens_listed(admin_session, user_owned_filters())
    assert len(listed) == 1
    assert listed[0]["actorUrn"] == "urn:li:corpuser:user"
    assert listed[0]["ownerUrn"] == "urn:li:corpuser:user"
    assert listed[0]["id"] == token_id

    # The admin can revoke a token created by someone else.
    admin_session.cookies.clear()
    admin_session = login_as(admin_user, admin_pass)
    revoked = revokeAccessToken(admin_session, token_id)
    assert revoked
    assert revoked["data"]
    assert revoked["data"]["revokeAccessToken"]
    assert revoked["data"]["revokeAccessToken"] is True

    # Seen from the regular account, all of its tokens were removed.
    user_session.cookies.clear()
    user_session = login_as("user", "user")
    assert len(tokens_listed(user_session, user_owned_filters())) == 0

    # Seen from the admin account, all of the user's tokens were removed too.
    admin_session = login_as(admin_user, admin_pass)
    assert len(tokens_listed(admin_session, user_owned_filters())) == 0
def test_non_admin_can_not_generate_tokens_for_others():
    """A regular user is refused when creating a token for the admin account."""
    session = login_as("user", "user")
    admin_urn = f"urn:li:corpuser:{admin_user}"

    # The request should come back with an authorization error.
    rejected = generateAccessToken_v2(session, admin_urn)
    assert rejected
    assert rejected["errors"]

    first_message = rejected["errors"][0]["message"]
    assert (
        first_message
        == "Unauthorized to perform this action. Please contact your DataHub administrator."
    )
def generateAccessToken_v2(session, actorUrn):
    """Send the `createAccessToken` mutation for `actorUrn`.

    Requests a PERSONAL token named "my token" with a ONE_HOUR duration by
    posting to the frontend GraphQL endpoint through `session`.

    Returns the decoded JSON body; `raise_for_status()` is called on the
    response before decoding.
    """
    mutation = """mutation createAccessToken($input: CreateAccessTokenInput!) {
createAccessToken(input: $input) {
accessToken
metadata {
id
actorUrn
ownerUrn
name
description
}
}
}"""
    token_input = {
        "type": "PERSONAL",
        "actorUrn": actorUrn,
        "duration": "ONE_HOUR",
        "name": "my token",
    }
    payload = {"query": mutation, "variables": {"input": token_input}}

    endpoint = f"{get_frontend_url()}/api/v2/graphql"
    reply = session.post(endpoint, json=payload)
    reply.raise_for_status()
    return reply.json()
def listAccessTokens(session, filters=None, *, start=0, count=20):
    """Run the `listAccessTokens` query and return the decoded JSON body.

    Args:
        session: Logged-in session object exposing ``post(url, json=...)``.
        filters: Optional list of filter dicts. ``None`` or an empty list
            means the ``filters`` key is left out of the query input.
        start: Offset of the first token to return (keyword-only).
        count: Maximum number of tokens to return (keyword-only).

    Returns:
        The parsed JSON response.

    ``raise_for_status()`` is called on the response before it is decoded.
    """
    # Named `list_input` rather than `input` so the builtin is not shadowed.
    list_input = {"start": start, "count": count}
    if filters:
        list_input["filters"] = filters

    payload = {
        "query": """query listAccessTokens($input: ListAccessTokenInput!) {
listAccessTokens(input: $input) {
start
count
total
tokens {
urn
id
actorUrn
ownerUrn
}
}
}""",
        "variables": {"input": list_input},
    }
    response = session.post(f"{get_frontend_url()}/api/v2/graphql", json=payload)
    response.raise_for_status()
    return response.json()
def revokeAccessToken(session, tokenId):
    """Send the `revokeAccessToken` mutation for `tokenId`.

    Posts to the frontend GraphQL endpoint through `session` and returns the
    decoded JSON body; `raise_for_status()` is called before decoding.
    """
    mutation = """mutation revokeAccessToken($tokenId: String!) {
revokeAccessToken(tokenId: $tokenId)
}"""
    payload = {"query": mutation, "variables": {"tokenId": tokenId}}

    endpoint = f"{get_frontend_url()}/api/v2/graphql"
    reply = session.post(endpoint, json=payload)
    reply.raise_for_status()
    return reply.json()
def getAccessTokenMetadata(session, token):
    """Run the `getAccessTokenMetadata` query for the raw `token` string.

    The query asks for the token's `id`, `ownerUrn` and `actorUrn`. Returns
    the decoded JSON body; `raise_for_status()` is called before decoding.
    """
    query = """
query getAccessTokenMetadata($token: String!) {
getAccessTokenMetadata(token: $token) {
id
ownerUrn
actorUrn
}
}"""
    payload = {"query": query, "variables": {"token": token}}

    endpoint = f"{get_frontend_url()}/api/v2/graphql"
    reply = session.post(endpoint, json=payload)
    reply.raise_for_status()
    return reply.json()