mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-21 08:32:05 +00:00
553 lines
19 KiB
Python
553 lines
19 KiB
Python
import os
|
|
import time
|
|
from typing import List
|
|
|
|
import pytest
|
|
|
|
from tests.tokens.token_utils import listUsers, removeUser
|
|
from tests.utils import (
|
|
get_admin_credentials,
|
|
get_frontend_url,
|
|
login_as,
|
|
wait_for_writes_to_sync,
|
|
)
|
|
|
|
pytestmark = pytest.mark.no_cypress_suite1
|
|
|
|
# Disable telemetry
|
|
os.environ["DATAHUB_TELEMETRY_ENABLED"] = "false"
|
|
|
|
(admin_user, admin_pass) = get_admin_credentials()
|
|
|
|
|
|
@pytest.fixture()
|
|
def auth_exclude_filter():
|
|
return {
|
|
"field": "name",
|
|
"condition": "EQUAL",
|
|
"negated": True,
|
|
"values": ["Test Session Token"],
|
|
}
|
|
|
|
|
|
@pytest.fixture(scope="class", autouse=True)
|
|
def custom_user_setup():
|
|
admin_session = login_as(admin_user, admin_pass)
|
|
try:
|
|
"""Fixture to execute setup before and tear down after all tests are run"""
|
|
res_data = removeUser(admin_session, "urn:li:corpuser:user")
|
|
assert res_data
|
|
assert "error" not in res_data
|
|
|
|
# Test getting the invite token
|
|
get_invite_token_json = {
|
|
"query": """mutation createInviteToken($input: CreateInviteTokenInput!) {
|
|
createInviteToken(input: $input){
|
|
inviteToken
|
|
}
|
|
}""",
|
|
"variables": {"input": {}},
|
|
}
|
|
|
|
get_invite_token_response = admin_session.post(
|
|
f"{get_frontend_url()}/api/v2/graphql", json=get_invite_token_json
|
|
)
|
|
get_invite_token_response.raise_for_status()
|
|
get_invite_token_res_data = get_invite_token_response.json()
|
|
|
|
assert get_invite_token_res_data
|
|
assert get_invite_token_res_data["data"]
|
|
invite_token = get_invite_token_res_data["data"]["createInviteToken"][
|
|
"inviteToken"
|
|
]
|
|
assert invite_token is not None
|
|
assert "error" not in invite_token
|
|
|
|
# Pass the invite token when creating the user
|
|
sign_up_json = {
|
|
"fullName": "Test User",
|
|
"email": "user",
|
|
"password": "user",
|
|
"title": "Data Engineer",
|
|
"inviteToken": invite_token,
|
|
}
|
|
|
|
sign_up_response = admin_session.post(
|
|
f"{get_frontend_url()}/signUp", json=sign_up_json
|
|
)
|
|
sign_up_response.raise_for_status()
|
|
assert sign_up_response
|
|
assert "error" not in sign_up_response
|
|
# Sleep for eventual consistency
|
|
wait_for_writes_to_sync(
|
|
consumer_group="datahub-usage-event-consumer-job-client"
|
|
)
|
|
|
|
# signUp will override the session cookie to the new user to be signed up.
|
|
admin_session.cookies.clear()
|
|
admin_session = login_as(admin_user, admin_pass)
|
|
|
|
# Make user created user is there.
|
|
res_data = listUsers(admin_session)
|
|
assert res_data["data"]
|
|
assert res_data["data"]["listUsers"]
|
|
assert {"username": "user"} in res_data["data"]["listUsers"]["users"]
|
|
admin_session.cookies.clear()
|
|
|
|
yield
|
|
|
|
finally:
|
|
# Delete created user
|
|
admin_session = login_as(admin_user, admin_pass)
|
|
res_data = removeUser(admin_session, "urn:li:corpuser:user")
|
|
assert res_data
|
|
assert res_data["data"]
|
|
assert res_data["data"]["removeUser"] is True
|
|
# Sleep for eventual consistency
|
|
wait_for_writes_to_sync(
|
|
consumer_group="datahub-usage-event-consumer-job-client"
|
|
)
|
|
|
|
# Make user created user is not there.
|
|
res_data = listUsers(admin_session)
|
|
assert res_data["data"]
|
|
assert res_data["data"]["listUsers"]
|
|
assert {"username": "user"} not in res_data["data"]["listUsers"]["users"]
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def access_token_setup(auth_session, auth_exclude_filter):
|
|
"""Fixture to execute asserts before and after a test is run"""
|
|
admin_session = login_as(admin_user, admin_pass)
|
|
|
|
res_data = listAccessTokens(admin_session, filters=[auth_exclude_filter])
|
|
assert res_data
|
|
assert res_data["data"]
|
|
assert res_data["data"]["listAccessTokens"]["total"] == 0
|
|
assert not res_data["data"]["listAccessTokens"]["tokens"]
|
|
|
|
yield
|
|
|
|
# Clean up
|
|
res_data = listAccessTokens(admin_session, filters=[auth_exclude_filter])
|
|
for metadata in res_data["data"]["listAccessTokens"]["tokens"]:
|
|
revokeAccessToken(admin_session, metadata["id"])
|
|
|
|
|
|
def test_audit_token_events(auth_exclude_filter):
|
|
user_session = login_as("user", "user")
|
|
|
|
# Normal user should be able to generate token for himself.
|
|
res_data = generateAccessToken_v2(user_session, "urn:li:corpuser:user")
|
|
assert res_data
|
|
assert res_data["data"]
|
|
assert res_data["data"]["createAccessToken"]
|
|
assert res_data["data"]["createAccessToken"]["accessToken"]
|
|
assert (
|
|
res_data["data"]["createAccessToken"]["metadata"]["actorUrn"]
|
|
== "urn:li:corpuser:user"
|
|
)
|
|
user_token_id = res_data["data"]["createAccessToken"]["metadata"]["id"]
|
|
# Sleep for eventual consistency
|
|
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
|
|
|
|
# User should be able to revoke his own token
|
|
res_data = revokeAccessToken(user_session, user_token_id)
|
|
assert res_data
|
|
assert res_data["data"]
|
|
assert res_data["data"]["revokeAccessToken"]
|
|
assert res_data["data"]["revokeAccessToken"] is True
|
|
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
|
|
|
|
# Audit events for create & revoke should show
|
|
res_data = searchForAuditEvents(
|
|
user_session,
|
|
2,
|
|
[
|
|
"CreateAccessTokenEvent",
|
|
"RevokeAccessTokenEvent",
|
|
],
|
|
["urn:li:corpuser:user"],
|
|
[],
|
|
)
|
|
print(res_data)
|
|
assert res_data
|
|
assert res_data["usageEvents"]
|
|
assert len(res_data["usageEvents"]) == 2
|
|
assert res_data["usageEvents"][0]["eventType"] == "RevokeAccessTokenEvent"
|
|
assert (
|
|
res_data["usageEvents"][0]["entityUrn"]
|
|
== f"urn:li:dataHubAccessToken:{user_token_id}"
|
|
)
|
|
assert res_data["usageEvents"][1]["eventType"] == "CreateAccessTokenEvent"
|
|
assert (
|
|
res_data["usageEvents"][1]["entityUrn"]
|
|
== f"urn:li:dataHubAccessToken:{user_token_id}"
|
|
)
|
|
|
|
|
|
def test_login_events(auth_exclude_filter):
|
|
user_session = login_as("user", "user")
|
|
time.sleep(10)
|
|
|
|
# Audit events for create & revoke should show
|
|
res_data = searchForAuditEvents(
|
|
user_session,
|
|
1,
|
|
[
|
|
"LogInEvent",
|
|
],
|
|
["urn:li:corpuser:user"],
|
|
[],
|
|
)
|
|
print(res_data)
|
|
assert res_data
|
|
assert res_data["usageEvents"]
|
|
assert len(res_data["usageEvents"]) == 1
|
|
assert res_data["usageEvents"][0]["eventType"] == "LogInEvent"
|
|
assert res_data["usageEvents"][0]["loginSource"] == "PASSWORD_LOGIN"
|
|
user_session.cookies.clear()
|
|
|
|
|
|
def test_failed_login_events(auth_exclude_filter):
|
|
try:
|
|
user_session = login_as("user", "NOTMYPASSWORD")
|
|
except Exception:
|
|
pass
|
|
|
|
time.sleep(10)
|
|
user_session = login_as(admin_user, admin_pass)
|
|
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
|
|
|
|
# Audit events for failed login should show, search for both to rule out any previous failures from any other tests
|
|
res_data = searchForAuditEvents(
|
|
user_session,
|
|
1,
|
|
["FailedLogInEvent", "LogInEvent"],
|
|
["urn:li:corpuser:user"],
|
|
[],
|
|
)
|
|
print(res_data)
|
|
assert res_data
|
|
assert res_data["usageEvents"]
|
|
assert len(res_data["usageEvents"]) == 1
|
|
assert res_data["usageEvents"][0]["eventType"] == "FailedLogInEvent"
|
|
assert res_data["usageEvents"][0]["loginSource"] == "PASSWORD_LOGIN"
|
|
user_session.cookies.clear()
|
|
|
|
|
|
def test_policy_events(auth_exclude_filter):
|
|
user_session = login_as(admin_user, admin_pass)
|
|
json = {
|
|
"query": """mutation createPolicy($input: PolicyUpdateInput!) {\n
|
|
createPolicy(input: $input) }""",
|
|
"variables": {
|
|
"input": {
|
|
"type": "METADATA",
|
|
"name": "Test Metadata Policy",
|
|
"description": "My Metadaata Policy",
|
|
"state": "ACTIVE",
|
|
"resources": {"type": "dataset", "allResources": True},
|
|
"privileges": ["EDIT_ENTITY_TAGS"],
|
|
"actors": {
|
|
"users": ["urn:li:corpuser:datahub", "urn:li:corpuser:admin"],
|
|
"resourceOwners": False,
|
|
"allUsers": False,
|
|
"allGroups": False,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
|
|
response = user_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
|
response.raise_for_status()
|
|
res_data = response.json()
|
|
|
|
assert res_data
|
|
assert res_data["data"]
|
|
assert res_data["data"]["createPolicy"]
|
|
|
|
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
|
|
|
|
new_urn = res_data["data"]["createPolicy"]
|
|
|
|
update_json = {
|
|
"query": """mutation updatePolicy($urn: String!, $input: PolicyUpdateInput!) {\n
|
|
updatePolicy(urn: $urn, input: $input) }""",
|
|
"variables": {
|
|
"urn": new_urn,
|
|
"input": {
|
|
"type": "METADATA",
|
|
"state": "ACTIVE",
|
|
"name": "Test Metadata Policy",
|
|
"description": "Updated Metadaata Policy",
|
|
"privileges": ["EDIT_ENTITY_TAGS", "EDIT_ENTITY_GLOSSARY_TERMS"],
|
|
"actors": {
|
|
"resourceOwners": False,
|
|
"allUsers": True,
|
|
"allGroups": False,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
response = user_session.post(
|
|
f"{get_frontend_url()}/api/v2/graphql", json=update_json
|
|
)
|
|
response.raise_for_status()
|
|
res_data = response.json()
|
|
|
|
# Check updated was submitted successfully
|
|
assert res_data
|
|
assert res_data["data"]
|
|
assert res_data["data"]["updatePolicy"]
|
|
assert res_data["data"]["updatePolicy"] == new_urn
|
|
|
|
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
|
|
res_data = searchForAuditEvents(
|
|
user_session,
|
|
3,
|
|
["CreatePolicyEvent", "UpdatePolicyEvent"],
|
|
["urn:li:corpuser:datahub", "urn:li:corpuser:admin"],
|
|
[],
|
|
)
|
|
print(res_data)
|
|
assert res_data
|
|
assert res_data["usageEvents"]
|
|
assert len(res_data["usageEvents"]) == 3 or len(res_data["usageEvents"]) == 2
|
|
assert (
|
|
res_data["usageEvents"][0]["eventType"] == "CreatePolicyEvent"
|
|
or res_data["usageEvents"][0]["eventType"] == "UpdatePolicyEvent"
|
|
)
|
|
assert res_data["usageEvents"][0]["entityUrn"] == new_urn
|
|
assert (
|
|
res_data["usageEvents"][1]["eventType"] == "CreatePolicyEvent"
|
|
or res_data["usageEvents"][1]["eventType"] == "UpdatePolicyEvent"
|
|
)
|
|
assert res_data["usageEvents"][1]["entityUrn"] == new_urn
|
|
if len(res_data["usageEvents"]) == 3:
|
|
assert (
|
|
res_data["usageEvents"][2]["eventType"] == "CreatePolicyEvent"
|
|
or res_data["usageEvents"][2]["eventType"] == "UpdatePolicyEvent"
|
|
)
|
|
assert res_data["usageEvents"][2]["entityUrn"] == new_urn
|
|
user_session.cookies.clear()
|
|
|
|
|
|
def test_ingestion_source_events(auth_exclude_filter):
|
|
user_session = login_as(admin_user, admin_pass)
|
|
json = {
|
|
"query": """mutation createIngestionSource($input: UpdateIngestionSourceInput!) {\n
|
|
createIngestionSource(input: $input)\n}""",
|
|
"variables": {
|
|
"input": {
|
|
"type": "snowflake",
|
|
"name": "test",
|
|
"config": {
|
|
"recipe": """{\"source\":{\"type\":\"snowflake\",\"config\":{
|
|
\"account_id\":null,
|
|
\"include_table_lineage\":true,
|
|
\"include_view_lineage\":true,
|
|
\"include_tables\":true,
|
|
\"include_views\":true,
|
|
\"profiling\":{\"enabled\":true,\"profile_table_level_only\":true},
|
|
\"stateful_ingestion\":{\"enabled\":true}}}}""",
|
|
"executorId": "default",
|
|
"debugMode": False,
|
|
"extraArgs": [],
|
|
},
|
|
}
|
|
},
|
|
}
|
|
response = user_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
|
response.raise_for_status()
|
|
res_data = response.json()
|
|
assert res_data["data"]["createIngestionSource"]
|
|
ingestion_source_urn = res_data["data"]["createIngestionSource"]
|
|
|
|
json = {
|
|
"query": """mutation updateIngestionSource($urn: String!, $input: UpdateIngestionSourceInput!) {\n
|
|
updateIngestionSource(urn: $urn, input: $input)\n}""",
|
|
"variables": {
|
|
"urn": ingestion_source_urn,
|
|
"input": {
|
|
"type": "snowflake",
|
|
"name": "test updated",
|
|
"config": {
|
|
"recipe": """{\"source\":{\"type\":\"snowflake\",\"config\":{
|
|
\"account_id\":null,
|
|
\"include_table_lineage\":true,
|
|
\"include_view_lineage\":true,
|
|
\"include_tables\":true,
|
|
\"include_views\":true,
|
|
\"profiling\":{\"enabled\":true,\"profile_table_level_only\":true},
|
|
\"stateful_ingestion\":{\"enabled\":true}}}}""",
|
|
"executorId": "default",
|
|
"debugMode": False,
|
|
"extraArgs": [],
|
|
},
|
|
},
|
|
},
|
|
}
|
|
response = user_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
|
response.raise_for_status()
|
|
res_data = response.json()
|
|
assert res_data["data"]["updateIngestionSource"]
|
|
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
|
|
|
|
res_data = searchForAuditEvents(
|
|
user_session,
|
|
2,
|
|
["CreateIngestionSourceEvent", "UpdateIngestionSourceEvent"],
|
|
["urn:li:corpuser:datahub", "urn:li:corpuser:admin"],
|
|
[],
|
|
)
|
|
print(res_data)
|
|
assert res_data
|
|
assert res_data["usageEvents"]
|
|
assert len(res_data["usageEvents"]) == 2
|
|
assert res_data["usageEvents"][0]["eventType"] == "UpdateIngestionSourceEvent"
|
|
assert res_data["usageEvents"][0]["entityUrn"] == ingestion_source_urn
|
|
assert res_data["usageEvents"][1]["eventType"] == "CreateIngestionSourceEvent"
|
|
assert res_data["usageEvents"][1]["entityUrn"] == ingestion_source_urn
|
|
user_session.cookies.clear()
|
|
|
|
|
|
def test_user_events(auth_exclude_filter):
|
|
user_session = login_as(admin_user, admin_pass)
|
|
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
|
|
|
|
# TODO: This feels wrong, sign up link should have the user who created the sign up link as the actor, but this
|
|
# requires a fairly significant refactor that's not in scope right now
|
|
res_data = searchForAuditEvents(
|
|
user_session,
|
|
4,
|
|
["CreateUserEvent", "UpdateUserEvent"],
|
|
["urn:li:corpuser:__datahub_system"],
|
|
["corpUserKey", "corpUserInfo", "corpUserStatus", "corpUserCredentials"],
|
|
)
|
|
print(res_data)
|
|
assert len(res_data["usageEvents"]) == 4
|
|
assert res_data["usageEvents"][0]["eventType"] == "UpdateUserEvent"
|
|
assert res_data["usageEvents"][0]["entityUrn"] == "urn:li:corpuser:user"
|
|
# Credentials and settings are in random order due to async
|
|
assert res_data["usageEvents"][0]["aspectName"] == "corpUserCredentials"
|
|
|
|
assert res_data["usageEvents"][1]["eventType"] == "UpdateUserEvent"
|
|
assert res_data["usageEvents"][1]["entityUrn"] == "urn:li:corpuser:user"
|
|
assert res_data["usageEvents"][1]["aspectName"] == "corpUserStatus"
|
|
|
|
# These get created at the same time
|
|
assert (
|
|
res_data["usageEvents"][2]["eventType"] == "UpdateUserEvent"
|
|
or res_data["usageEvents"][2]["eventType"] == "CreateUserEvent"
|
|
)
|
|
assert res_data["usageEvents"][2]["entityUrn"] == "urn:li:corpuser:user"
|
|
assert (
|
|
res_data["usageEvents"][2]["aspectName"] == "corpUserInfo"
|
|
or res_data["usageEvents"][2]["aspectName"] == "corpUserKey"
|
|
)
|
|
|
|
assert (
|
|
res_data["usageEvents"][3]["eventType"] == "UpdateUserEvent"
|
|
or res_data["usageEvents"][3]["eventType"] == "CreateUserEvent"
|
|
)
|
|
assert res_data["usageEvents"][3]["entityUrn"] == "urn:li:corpuser:user"
|
|
assert (
|
|
res_data["usageEvents"][3]["aspectName"] == "corpUserInfo"
|
|
or res_data["usageEvents"][3]["aspectName"] == "corpUserKey"
|
|
)
|
|
user_session.cookies.clear()
|
|
|
|
|
|
def generateAccessToken_v2(session, actorUrn):
|
|
# Create new token
|
|
json = {
|
|
"query": """mutation createAccessToken($input: CreateAccessTokenInput!) {
|
|
createAccessToken(input: $input) {
|
|
accessToken
|
|
metadata {
|
|
id
|
|
actorUrn
|
|
ownerUrn
|
|
name
|
|
description
|
|
}
|
|
}
|
|
}""",
|
|
"variables": {
|
|
"input": {
|
|
"type": "PERSONAL",
|
|
"actorUrn": actorUrn,
|
|
"duration": "ONE_HOUR",
|
|
"name": "my token",
|
|
}
|
|
},
|
|
}
|
|
|
|
response = session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
|
|
def listAccessTokens(session, filters):
|
|
if filters is None:
|
|
filters = []
|
|
# Get count of existing tokens
|
|
input = {"start": 0, "count": 20}
|
|
|
|
if filters:
|
|
input["filters"] = filters
|
|
|
|
json = {
|
|
"query": """query listAccessTokens($input: ListAccessTokenInput!) {
|
|
listAccessTokens(input: $input) {
|
|
start
|
|
count
|
|
total
|
|
tokens {
|
|
urn
|
|
id
|
|
actorUrn
|
|
ownerUrn
|
|
}
|
|
}
|
|
}""",
|
|
"variables": {"input": input},
|
|
}
|
|
|
|
response = session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
|
|
def revokeAccessToken(session, tokenId):
|
|
# Revoke token
|
|
json = {
|
|
"query": """mutation revokeAccessToken($tokenId: String!) {
|
|
revokeAccessToken(tokenId: $tokenId)
|
|
}""",
|
|
"variables": {"tokenId": tokenId},
|
|
}
|
|
|
|
response = session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
|
|
response.raise_for_status()
|
|
|
|
return response.json()
|
|
|
|
|
|
def searchForAuditEvents(
|
|
session, size, event_types: List, user_urns: List, aspect_names: List
|
|
):
|
|
json = {
|
|
"eventTypes": event_types,
|
|
"actorUrns": user_urns,
|
|
"aspectTypes": aspect_names,
|
|
}
|
|
response = session.post(
|
|
f"{get_frontend_url()}/openapi/v1/events/audit/search?size={size}", json=json
|
|
)
|
|
response.raise_for_status()
|
|
|
|
return response.json()
|