datahub/smoke-test/tests/audit_events/audit_events_test.py
2025-05-09 11:34:27 -05:00

553 lines
19 KiB
Python

import os
import time
from typing import List
import pytest
from tests.tokens.token_utils import listUsers, removeUser
from tests.utils import (
get_admin_credentials,
get_frontend_url,
login_as,
wait_for_writes_to_sync,
)
pytestmark = pytest.mark.no_cypress_suite1
# Disable telemetry
os.environ["DATAHUB_TELEMETRY_ENABLED"] = "false"
(admin_user, admin_pass) = get_admin_credentials()
@pytest.fixture()
def auth_exclude_filter():
return {
"field": "name",
"condition": "EQUAL",
"negated": True,
"values": ["Test Session Token"],
}
@pytest.fixture(scope="class", autouse=True)
def custom_user_setup():
admin_session = login_as(admin_user, admin_pass)
try:
"""Fixture to execute setup before and tear down after all tests are run"""
res_data = removeUser(admin_session, "urn:li:corpuser:user")
assert res_data
assert "error" not in res_data
# Test getting the invite token
get_invite_token_json = {
"query": """mutation createInviteToken($input: CreateInviteTokenInput!) {
createInviteToken(input: $input){
inviteToken
}
}""",
"variables": {"input": {}},
}
get_invite_token_response = admin_session.post(
f"{get_frontend_url()}/api/v2/graphql", json=get_invite_token_json
)
get_invite_token_response.raise_for_status()
get_invite_token_res_data = get_invite_token_response.json()
assert get_invite_token_res_data
assert get_invite_token_res_data["data"]
invite_token = get_invite_token_res_data["data"]["createInviteToken"][
"inviteToken"
]
assert invite_token is not None
assert "error" not in invite_token
# Pass the invite token when creating the user
sign_up_json = {
"fullName": "Test User",
"email": "user",
"password": "user",
"title": "Data Engineer",
"inviteToken": invite_token,
}
sign_up_response = admin_session.post(
f"{get_frontend_url()}/signUp", json=sign_up_json
)
sign_up_response.raise_for_status()
assert sign_up_response
assert "error" not in sign_up_response
# Sleep for eventual consistency
wait_for_writes_to_sync(
consumer_group="datahub-usage-event-consumer-job-client"
)
# signUp will override the session cookie to the new user to be signed up.
admin_session.cookies.clear()
admin_session = login_as(admin_user, admin_pass)
# Make user created user is there.
res_data = listUsers(admin_session)
assert res_data["data"]
assert res_data["data"]["listUsers"]
assert {"username": "user"} in res_data["data"]["listUsers"]["users"]
admin_session.cookies.clear()
yield
finally:
# Delete created user
admin_session = login_as(admin_user, admin_pass)
res_data = removeUser(admin_session, "urn:li:corpuser:user")
assert res_data
assert res_data["data"]
assert res_data["data"]["removeUser"] is True
# Sleep for eventual consistency
wait_for_writes_to_sync(
consumer_group="datahub-usage-event-consumer-job-client"
)
# Make user created user is not there.
res_data = listUsers(admin_session)
assert res_data["data"]
assert res_data["data"]["listUsers"]
assert {"username": "user"} not in res_data["data"]["listUsers"]["users"]
@pytest.fixture(autouse=True)
def access_token_setup(auth_session, auth_exclude_filter):
"""Fixture to execute asserts before and after a test is run"""
admin_session = login_as(admin_user, admin_pass)
res_data = listAccessTokens(admin_session, filters=[auth_exclude_filter])
assert res_data
assert res_data["data"]
assert res_data["data"]["listAccessTokens"]["total"] == 0
assert not res_data["data"]["listAccessTokens"]["tokens"]
yield
# Clean up
res_data = listAccessTokens(admin_session, filters=[auth_exclude_filter])
for metadata in res_data["data"]["listAccessTokens"]["tokens"]:
revokeAccessToken(admin_session, metadata["id"])
def test_audit_token_events(auth_exclude_filter):
user_session = login_as("user", "user")
# Normal user should be able to generate token for himself.
res_data = generateAccessToken_v2(user_session, "urn:li:corpuser:user")
assert res_data
assert res_data["data"]
assert res_data["data"]["createAccessToken"]
assert res_data["data"]["createAccessToken"]["accessToken"]
assert (
res_data["data"]["createAccessToken"]["metadata"]["actorUrn"]
== "urn:li:corpuser:user"
)
user_token_id = res_data["data"]["createAccessToken"]["metadata"]["id"]
# Sleep for eventual consistency
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
# User should be able to revoke his own token
res_data = revokeAccessToken(user_session, user_token_id)
assert res_data
assert res_data["data"]
assert res_data["data"]["revokeAccessToken"]
assert res_data["data"]["revokeAccessToken"] is True
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
# Audit events for create & revoke should show
res_data = searchForAuditEvents(
user_session,
2,
[
"CreateAccessTokenEvent",
"RevokeAccessTokenEvent",
],
["urn:li:corpuser:user"],
[],
)
print(res_data)
assert res_data
assert res_data["usageEvents"]
assert len(res_data["usageEvents"]) == 2
assert res_data["usageEvents"][0]["eventType"] == "RevokeAccessTokenEvent"
assert (
res_data["usageEvents"][0]["entityUrn"]
== f"urn:li:dataHubAccessToken:{user_token_id}"
)
assert res_data["usageEvents"][1]["eventType"] == "CreateAccessTokenEvent"
assert (
res_data["usageEvents"][1]["entityUrn"]
== f"urn:li:dataHubAccessToken:{user_token_id}"
)
def test_login_events(auth_exclude_filter):
user_session = login_as("user", "user")
time.sleep(10)
# Audit events for create & revoke should show
res_data = searchForAuditEvents(
user_session,
1,
[
"LogInEvent",
],
["urn:li:corpuser:user"],
[],
)
print(res_data)
assert res_data
assert res_data["usageEvents"]
assert len(res_data["usageEvents"]) == 1
assert res_data["usageEvents"][0]["eventType"] == "LogInEvent"
assert res_data["usageEvents"][0]["loginSource"] == "PASSWORD_LOGIN"
user_session.cookies.clear()
def test_failed_login_events(auth_exclude_filter):
try:
user_session = login_as("user", "NOTMYPASSWORD")
except Exception:
pass
time.sleep(10)
user_session = login_as(admin_user, admin_pass)
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
# Audit events for failed login should show, search for both to rule out any previous failures from any other tests
res_data = searchForAuditEvents(
user_session,
1,
["FailedLogInEvent", "LogInEvent"],
["urn:li:corpuser:user"],
[],
)
print(res_data)
assert res_data
assert res_data["usageEvents"]
assert len(res_data["usageEvents"]) == 1
assert res_data["usageEvents"][0]["eventType"] == "FailedLogInEvent"
assert res_data["usageEvents"][0]["loginSource"] == "PASSWORD_LOGIN"
user_session.cookies.clear()
def test_policy_events(auth_exclude_filter):
user_session = login_as(admin_user, admin_pass)
json = {
"query": """mutation createPolicy($input: PolicyUpdateInput!) {\n
createPolicy(input: $input) }""",
"variables": {
"input": {
"type": "METADATA",
"name": "Test Metadata Policy",
"description": "My Metadaata Policy",
"state": "ACTIVE",
"resources": {"type": "dataset", "allResources": True},
"privileges": ["EDIT_ENTITY_TAGS"],
"actors": {
"users": ["urn:li:corpuser:datahub", "urn:li:corpuser:admin"],
"resourceOwners": False,
"allUsers": False,
"allGroups": False,
},
}
},
}
response = user_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["createPolicy"]
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
new_urn = res_data["data"]["createPolicy"]
update_json = {
"query": """mutation updatePolicy($urn: String!, $input: PolicyUpdateInput!) {\n
updatePolicy(urn: $urn, input: $input) }""",
"variables": {
"urn": new_urn,
"input": {
"type": "METADATA",
"state": "ACTIVE",
"name": "Test Metadata Policy",
"description": "Updated Metadaata Policy",
"privileges": ["EDIT_ENTITY_TAGS", "EDIT_ENTITY_GLOSSARY_TERMS"],
"actors": {
"resourceOwners": False,
"allUsers": True,
"allGroups": False,
},
},
},
}
response = user_session.post(
f"{get_frontend_url()}/api/v2/graphql", json=update_json
)
response.raise_for_status()
res_data = response.json()
# Check updated was submitted successfully
assert res_data
assert res_data["data"]
assert res_data["data"]["updatePolicy"]
assert res_data["data"]["updatePolicy"] == new_urn
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
res_data = searchForAuditEvents(
user_session,
3,
["CreatePolicyEvent", "UpdatePolicyEvent"],
["urn:li:corpuser:datahub", "urn:li:corpuser:admin"],
[],
)
print(res_data)
assert res_data
assert res_data["usageEvents"]
assert len(res_data["usageEvents"]) == 3 or len(res_data["usageEvents"]) == 2
assert (
res_data["usageEvents"][0]["eventType"] == "CreatePolicyEvent"
or res_data["usageEvents"][0]["eventType"] == "UpdatePolicyEvent"
)
assert res_data["usageEvents"][0]["entityUrn"] == new_urn
assert (
res_data["usageEvents"][1]["eventType"] == "CreatePolicyEvent"
or res_data["usageEvents"][1]["eventType"] == "UpdatePolicyEvent"
)
assert res_data["usageEvents"][1]["entityUrn"] == new_urn
if len(res_data["usageEvents"]) == 3:
assert (
res_data["usageEvents"][2]["eventType"] == "CreatePolicyEvent"
or res_data["usageEvents"][2]["eventType"] == "UpdatePolicyEvent"
)
assert res_data["usageEvents"][2]["entityUrn"] == new_urn
user_session.cookies.clear()
def test_ingestion_source_events(auth_exclude_filter):
user_session = login_as(admin_user, admin_pass)
json = {
"query": """mutation createIngestionSource($input: UpdateIngestionSourceInput!) {\n
createIngestionSource(input: $input)\n}""",
"variables": {
"input": {
"type": "snowflake",
"name": "test",
"config": {
"recipe": """{\"source\":{\"type\":\"snowflake\",\"config\":{
\"account_id\":null,
\"include_table_lineage\":true,
\"include_view_lineage\":true,
\"include_tables\":true,
\"include_views\":true,
\"profiling\":{\"enabled\":true,\"profile_table_level_only\":true},
\"stateful_ingestion\":{\"enabled\":true}}}}""",
"executorId": "default",
"debugMode": False,
"extraArgs": [],
},
}
},
}
response = user_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data["data"]["createIngestionSource"]
ingestion_source_urn = res_data["data"]["createIngestionSource"]
json = {
"query": """mutation updateIngestionSource($urn: String!, $input: UpdateIngestionSourceInput!) {\n
updateIngestionSource(urn: $urn, input: $input)\n}""",
"variables": {
"urn": ingestion_source_urn,
"input": {
"type": "snowflake",
"name": "test updated",
"config": {
"recipe": """{\"source\":{\"type\":\"snowflake\",\"config\":{
\"account_id\":null,
\"include_table_lineage\":true,
\"include_view_lineage\":true,
\"include_tables\":true,
\"include_views\":true,
\"profiling\":{\"enabled\":true,\"profile_table_level_only\":true},
\"stateful_ingestion\":{\"enabled\":true}}}}""",
"executorId": "default",
"debugMode": False,
"extraArgs": [],
},
},
},
}
response = user_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
res_data = response.json()
assert res_data["data"]["updateIngestionSource"]
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
res_data = searchForAuditEvents(
user_session,
2,
["CreateIngestionSourceEvent", "UpdateIngestionSourceEvent"],
["urn:li:corpuser:datahub", "urn:li:corpuser:admin"],
[],
)
print(res_data)
assert res_data
assert res_data["usageEvents"]
assert len(res_data["usageEvents"]) == 2
assert res_data["usageEvents"][0]["eventType"] == "UpdateIngestionSourceEvent"
assert res_data["usageEvents"][0]["entityUrn"] == ingestion_source_urn
assert res_data["usageEvents"][1]["eventType"] == "CreateIngestionSourceEvent"
assert res_data["usageEvents"][1]["entityUrn"] == ingestion_source_urn
user_session.cookies.clear()
def test_user_events(auth_exclude_filter):
user_session = login_as(admin_user, admin_pass)
wait_for_writes_to_sync(consumer_group="datahub-usage-event-consumer-job-client")
# TODO: This feels wrong, sign up link should have the user who created the sign up link as the actor, but this
# requires a fairly significant refactor that's not in scope right now
res_data = searchForAuditEvents(
user_session,
4,
["CreateUserEvent", "UpdateUserEvent"],
["urn:li:corpuser:__datahub_system"],
["corpUserKey", "corpUserInfo", "corpUserStatus", "corpUserCredentials"],
)
print(res_data)
assert len(res_data["usageEvents"]) == 4
assert res_data["usageEvents"][0]["eventType"] == "UpdateUserEvent"
assert res_data["usageEvents"][0]["entityUrn"] == "urn:li:corpuser:user"
# Credentials and settings are in random order due to async
assert res_data["usageEvents"][0]["aspectName"] == "corpUserCredentials"
assert res_data["usageEvents"][1]["eventType"] == "UpdateUserEvent"
assert res_data["usageEvents"][1]["entityUrn"] == "urn:li:corpuser:user"
assert res_data["usageEvents"][1]["aspectName"] == "corpUserStatus"
# These get created at the same time
assert (
res_data["usageEvents"][2]["eventType"] == "UpdateUserEvent"
or res_data["usageEvents"][2]["eventType"] == "CreateUserEvent"
)
assert res_data["usageEvents"][2]["entityUrn"] == "urn:li:corpuser:user"
assert (
res_data["usageEvents"][2]["aspectName"] == "corpUserInfo"
or res_data["usageEvents"][2]["aspectName"] == "corpUserKey"
)
assert (
res_data["usageEvents"][3]["eventType"] == "UpdateUserEvent"
or res_data["usageEvents"][3]["eventType"] == "CreateUserEvent"
)
assert res_data["usageEvents"][3]["entityUrn"] == "urn:li:corpuser:user"
assert (
res_data["usageEvents"][3]["aspectName"] == "corpUserInfo"
or res_data["usageEvents"][3]["aspectName"] == "corpUserKey"
)
user_session.cookies.clear()
def generateAccessToken_v2(session, actorUrn):
# Create new token
json = {
"query": """mutation createAccessToken($input: CreateAccessTokenInput!) {
createAccessToken(input: $input) {
accessToken
metadata {
id
actorUrn
ownerUrn
name
description
}
}
}""",
"variables": {
"input": {
"type": "PERSONAL",
"actorUrn": actorUrn,
"duration": "ONE_HOUR",
"name": "my token",
}
},
}
response = session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
return response.json()
def listAccessTokens(session, filters):
if filters is None:
filters = []
# Get count of existing tokens
input = {"start": 0, "count": 20}
if filters:
input["filters"] = filters
json = {
"query": """query listAccessTokens($input: ListAccessTokenInput!) {
listAccessTokens(input: $input) {
start
count
total
tokens {
urn
id
actorUrn
ownerUrn
}
}
}""",
"variables": {"input": input},
}
response = session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
return response.json()
def revokeAccessToken(session, tokenId):
# Revoke token
json = {
"query": """mutation revokeAccessToken($tokenId: String!) {
revokeAccessToken(tokenId: $tokenId)
}""",
"variables": {"tokenId": tokenId},
}
response = session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
return response.json()
def searchForAuditEvents(
session, size, event_types: List, user_urns: List, aspect_names: List
):
json = {
"eventTypes": event_types,
"actorUrns": user_urns,
"aspectTypes": aspect_names,
}
response = session.post(
f"{get_frontend_url()}/openapi/v1/events/audit/search?size={size}", json=json
)
response.raise_for_status()
return response.json()