datahub/smoke-test/tests/tokens/revokable_access_token_test.py

339 lines
13 KiB
Python
Raw Normal View History

import pytest
import time
import requests
from tests.utils import FRONTEND_ENDPOINT
from time import sleep
from tests.utils import ingest_file_via_rest
from datahub.cli.ingest_cli import get_session_and_host
@pytest.fixture(autouse=True)
def test_setup():
"""Fixture to execute asserts before and after a test is run"""
admin_session = loginAs("datahub", "datahub")
res_data = listAccessTokens(admin_session)
assert res_data
assert res_data["data"]
assert res_data["data"]["listAccessTokens"]["total"] == 0
assert not res_data["data"]["listAccessTokens"]["tokens"]
ingest_file_via_rest("tests/tokens/revokable_test_data.json")
sleep(5)
yield
sleep(5)
# Clean up
res_data = listAccessTokens(admin_session)
for metadata in res_data["data"]["listAccessTokens"]["tokens"]:
revokeAccessToken(admin_session, metadata["id"])
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_admin_can_create_list_and_revoke_tokens():
admin_session = loginAs("datahub", "datahub")
# Using a super account, there should be no tokens
res_data = listAccessTokens(admin_session)
assert res_data
assert res_data["data"]
assert res_data["data"]["listAccessTokens"]["total"] is not None
assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 0
# Using a super account, generate a token for itself.
res_data = generateAccessToken_v2(admin_session, "urn:li:corpuser:datahub")
assert res_data
assert res_data["data"]
assert res_data["data"]["createAccessToken"]
assert res_data["data"]["createAccessToken"]["accessToken"]
assert res_data["data"]["createAccessToken"]["metadata"]["actorUrn"] == "urn:li:corpuser:datahub"
admin_tokenId = res_data["data"]["createAccessToken"]["metadata"]["id"]
# Using a super account, list the previously created token.
res_data = listAccessTokens(admin_session)
assert res_data
assert res_data["data"]
assert res_data["data"]["listAccessTokens"]["total"] is not None
assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 1
assert res_data["data"]["listAccessTokens"]["tokens"][1]["actorUrn"] == "urn:li:corpuser:datahub"
assert res_data["data"]["listAccessTokens"]["tokens"][1]["ownerUrn"] == "urn:li:corpuser:datahub"
# Check that the super account can revoke tokens that it created
res_data = revokeAccessToken(admin_session, admin_tokenId)
assert res_data
assert res_data["data"]
assert res_data["data"]["revokeAccessToken"]
assert res_data["data"]["revokeAccessToken"] == True
# Using a super account, there should be no tokens
res_data = listAccessTokens(admin_session)
assert res_data
assert res_data["data"]
assert res_data["data"]["listAccessTokens"]["total"] is not None
assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 0
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_admin_can_create_and_revoke_tokens_for_other_user():
admin_session = loginAs("datahub", "datahub")
# Using a super account, there should be no tokens
res_data = listAccessTokens(admin_session)
assert res_data
assert res_data["data"]
assert res_data["data"]["listAccessTokens"]["total"] is not None
assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 0
# Using a super account, generate a token for another user.
res_data = generateAccessToken_v2(admin_session, "urn:li:corpuser:user")
assert res_data
assert res_data["data"]
assert res_data["data"]["createAccessToken"]
assert res_data["data"]["createAccessToken"]["accessToken"]
assert res_data["data"]["createAccessToken"]["metadata"]["actorUrn"] == "urn:li:corpuser:user"
user_tokenId = res_data["data"]["createAccessToken"]["metadata"]["id"]
# Using a super account, list the previously created tokens.
res_data = listAccessTokens(admin_session)
assert res_data
assert res_data["data"]
assert res_data["data"]["listAccessTokens"]["total"] is not None
assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 1
assert res_data["data"]["listAccessTokens"]["tokens"][0]["actorUrn"] == "urn:li:corpuser:user"
assert res_data["data"]["listAccessTokens"]["tokens"][0]["ownerUrn"] == "urn:li:corpuser:datahub"
# Check that the super account can revoke tokens that it created for another user
res_data = revokeAccessToken(admin_session, user_tokenId)
assert res_data
assert res_data["data"]
assert res_data["data"]["revokeAccessToken"]
assert res_data["data"]["revokeAccessToken"] == True
# Using a super account, there should be no tokens
res_data = listAccessTokens(admin_session)
assert res_data
assert res_data["data"]
assert res_data["data"]["listAccessTokens"]["total"] is not None
assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 0
"""
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_non_admin_can_create_list_revoke_tokens():
user_session = loginAs("user", "user")
admin_session = loginAs("datahub", "datahub")
# Normal user should be able to generate token for himself.
res_data = generateAccessToken_v2(user_session, "urn:li:corpuser:user")
assert res_data
assert res_data["data"]
assert res_data["data"]["createAccessToken"]
assert res_data["data"]["createAccessToken"]["accessToken"]
assert res_data["data"]["createAccessToken"]["metadata"]["actorUrn"] == "urn:li:corpuser:user"
user_tokenId = res_data["data"]["createAccessToken"]["metadata"]["id"]
# User should be able to list his own token
res_data = listAccessTokens(user_session, [{"field": "actorUrn","value": "urn:li:corpuser:user"}])
assert res_data
assert res_data["data"]
assert res_data["data"]["listAccessTokens"]["total"] is not None
assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 1
assert res_data["data"]["listAccessTokens"]["tokens"][0]["actorUrn"] == "urn:li:corpuser:user"
assert res_data["data"]["listAccessTokens"]["tokens"][0]["ownerUrn"] == "urn:li:corpuser:user"
assert res_data["data"]["listAccessTokens"]["tokens"][0]["id"] == user_tokenId
# User should be able to revoke his own token
res_data = revokeAccessToken(user_session, user_tokenId)
assert res_data
assert res_data["data"]
assert res_data["data"]["revokeAccessToken"]
assert res_data["data"]["revokeAccessToken"] == True
# Using a normal account, check that all its tokens where removed.
res_data = listAccessTokens(user_session, [{"field": "actorUrn","value": "urn:li:corpuser:user"}])
assert res_data
assert res_data["data"]
assert res_data["data"]["listAccessTokens"]["total"] is not None
assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 0
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_admin_can_manage_tokens_generated_by_other_user():
user_session = loginAs("user", "user")
admin_session = loginAs("datahub", "datahub")
# Using a super account, there should be no tokens
res_data = listAccessTokens(admin_session)
assert res_data
assert res_data["data"]
assert res_data["data"]["listAccessTokens"]["total"] is not None
assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 0
# Normal user should be able to generate token for himself.
res_data = generateAccessToken_v2(user_session, "urn:li:corpuser:user")
assert res_data
assert res_data["data"]
assert res_data["data"]["createAccessToken"]
assert res_data["data"]["createAccessToken"]["accessToken"]
assert res_data["data"]["createAccessToken"]["metadata"]["actorUrn"] == "urn:li:corpuser:user"
user_tokenId = res_data["data"]["createAccessToken"]["metadata"]["id"]
# Admin should be able to list other tokens
res_data = listAccessTokens(admin_session, [{"field": "actorUrn","value": "urn:li:corpuser:user"}])
assert res_data
assert res_data["data"]
assert res_data["data"]["listAccessTokens"]["total"] is not None
assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 1
assert res_data["data"]["listAccessTokens"]["tokens"][0]["actorUrn"] == "urn:li:corpuser:user"
assert res_data["data"]["listAccessTokens"]["tokens"][0]["ownerUrn"] == "urn:li:corpuser:user"
assert res_data["data"]["listAccessTokens"]["tokens"][0]["id"] == user_tokenId
# Admin can delete token created by someone else.
res_data = revokeAccessToken(admin_session, user_tokenId)
assert res_data
assert res_data["data"]
assert res_data["data"]["revokeAccessToken"]
assert res_data["data"]["revokeAccessToken"] == True
# Using a normal account, check that all its tokens where removed.
res_data = listAccessTokens(user_session, [{"field": "actorUrn","value": "urn:li:corpuser:user"}])
assert res_data
assert res_data["data"]
assert res_data["data"]["listAccessTokens"]["total"] is not None
assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 0
# Using the super account, check that all tokens where removed.
res_data = listAccessTokens(admin_session, [{"field": "actorUrn","value": "urn:li:corpuser:user"}])
assert res_data
assert res_data["data"]
assert res_data["data"]["listAccessTokens"]["total"] is not None
assert len(res_data["data"]["listAccessTokens"]["tokens"]) == 0
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_non_admin_can_not_generate_tokens_for_others():
user_session = loginAs("user", "user")
# Normal user should not be able to generate token for another user
res_data = generateAccessToken_v2(user_session, "urn:li:corpuser:datahub")
assert res_data
assert res_data["errors"]
assert res_data["errors"][0]["message"] == "Unauthorized to perform this action. Please contact your DataHub administrator."
"""
def generateAccessToken_v1(session, actorUrn):
# Create new token
json = {
"query": """query getAccessToken($input: GetAccessTokenInput!) {\n
getAccessToken(input: $input) {\n
accessToken\n
}\n
}""",
"variables": {
"input": {
"type": "PERSONAL",
"actorUrn": actorUrn,
"duration": "ONE_HOUR"
}
}
}
response = session.post(
f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json
)
response.raise_for_status()
return response.json()
def generateAccessToken_v2(session, actorUrn):
# Create new token
json = {
"query": """mutation createAccessToken($input: CreateAccessTokenInput!) {\n
createAccessToken(input: $input) {\n
accessToken\n
metadata {\n
id\n
actorUrn\n
ownerUrn\n
name\n
description\n
}
}\n
}""",
"variables": {
"input": {
"type": "PERSONAL",
"actorUrn": actorUrn,
"duration": "ONE_HOUR",
"name": "my token"
}
}
}
response = session.post(
f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json
)
response.raise_for_status()
sleep(5)
return response.json()
def listAccessTokens(session, filters=[]):
# Get count of existing tokens
input = {
"start": "0",
"count": "20",
}
if filters:
input['filters'] = filters
json = {
"query": """query listAccessTokens($input: ListAccessTokenInput!) {\n
listAccessTokens(input: $input) {\n
start\n
count\n
total\n
tokens {\n
urn\n
id\n
actorUrn\n
ownerUrn\n
}\n
}\n
}""",
"variables": {
"input": input
}
}
response = session.post(
f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json
)
response.raise_for_status()
return response.json()
def revokeAccessToken(session, tokenId):
# Revoke token
json = {
"query": """mutation revokeAccessToken($tokenId: String!) {\n
revokeAccessToken(tokenId: $tokenId)
}""",
"variables": {
"tokenId": tokenId
}
}
response = session.post(
f"{FRONTEND_ENDPOINT}/api/v2/graphql", json=json
)
sleep(5)
response.raise_for_status()
return response.json()
def loginAs(username, password):
session = requests.Session()
headers = {
"Content-Type": "application/json",
}
data = '{"username":"' + username +'", "password":"' + password + '"}'
response = session.post(
f"{FRONTEND_ENDPOINT}/logIn", headers=headers, data=data
)
response.raise_for_status()
return session