datahub/smoke-test/tests/incidents/incidents_test.py
John Joyce 0480df887c
chore(): Bring latest incidents data model + GraphQL to OSS (#12671)
Co-authored-by: John Joyce <john@ip-192-168-1-200.us-west-2.compute.internal>
Co-authored-by: John Joyce <john@Johns-MBP.lan>
Co-authored-by: John Joyce <john@Johns-MBP-306.lan>
Co-authored-by: John Joyce <john@Johns-MBP-314.lan>
Co-authored-by: John Joyce <john@Johns-MBP-325.lan>
Co-authored-by: jayacryl <159848059+jayacryl@users.noreply.github.com>
Co-authored-by: John Joyce <john@Johns-MacBook-Pro.local>
Co-authored-by: John Joyce <john@Mac-4380.lan>
Co-authored-by: John Joyce <john@Mac-4387.lan>
Co-authored-by: John Joyce <john@Mac-4610.lan>
Co-authored-by: John Joyce <john@Mac-4614.lan>
2025-02-20 17:28:47 -08:00

239 lines
7.0 KiB
Python

import time
import pytest
from tests.utils import delete_urns_from_file, ingest_file_via_rest
@pytest.fixture(scope="module", autouse=True)
def ingest_cleanup_data(auth_session, graph_client, request):
print("ingesting incidents test data")
ingest_file_via_rest(auth_session, "tests/incidents/data.json")
yield
print("removing incidents test data")
delete_urns_from_file(graph_client, "tests/incidents/data.json")
TEST_DATASET_URN = (
"urn:li:dataset:(urn:li:dataPlatform:kafka,incidents-sample-dataset,PROD)"
)
TEST_INCIDENT_URN = "urn:li:incident:test"
@pytest.mark.dependency()
def test_list_dataset_incidents(auth_session):
# Sleep for eventual consistency (not ideal)
time.sleep(2)
list_dataset_incidents_json = {
"query": """query dataset($urn: String!) {\n
dataset(urn: $urn) {\n
incidents(state: ACTIVE, start: 0, count: 10) {\n
start\n
count\n
total\n
incidents {\n
urn\n
type\n
incidentType\n
title\n
description\n
status {\n
state\n
message\n
lastUpdated {\n
time\n
actor\n
}\n
}\n
source {\n
type\n
source {\n
... on Assertion {\n
urn\n
info {\n
type
}\n
}\n
}\n
}\n
entity {\n
urn\n
}\n
created {\n
time\n
actor\n
}\n
}\n
}\n
}\n
}""",
"variables": {"urn": TEST_DATASET_URN},
}
response = auth_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql",
json=list_dataset_incidents_json,
)
response.raise_for_status()
res_data = response.json()
assert res_data
assert "errors" not in res_data
assert res_data["data"]
assert res_data["data"]["dataset"]["incidents"] == {
"start": 0,
"count": 10,
"total": 1,
"incidents": [
{
"urn": TEST_INCIDENT_URN,
"type": "INCIDENT",
"incidentType": "OPERATIONAL",
"title": "test title",
"description": "test description",
"status": {
"state": "ACTIVE",
"message": None,
"lastUpdated": {"time": 0, "actor": "urn:li:corpuser:admin"},
},
"source": {
"type": "MANUAL",
"source": None,
},
"entity": {"urn": TEST_DATASET_URN},
"created": {"time": 0, "actor": "urn:li:corpuser:admin"},
}
],
}
@pytest.mark.dependency(depends=["test_list_dataset_incidents"])
def test_raise_resolve_incident(auth_session):
# Raise new incident
raise_incident_json = {
"query": """mutation raiseIncident($input: RaiseIncidentInput!) {\n
raiseIncident(input: $input)
}""",
"variables": {
"input": {
"type": "OPERATIONAL",
"title": "test title 2",
"description": "test description 2",
"resourceUrn": TEST_DATASET_URN,
"priority": "CRITICAL",
}
},
}
response = auth_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=raise_incident_json
)
response.raise_for_status()
res_data = response.json()
assert res_data
assert "errors" not in res_data
assert res_data["data"]
assert res_data["data"]["raiseIncident"] is not None
new_incident_urn = res_data["data"]["raiseIncident"]
# Resolve the incident.
update_incident_status = {
"query": """mutation updateIncidentStatus($urn: String!, $input: IncidentStatusInput!) {\n
updateIncidentStatus(urn: $urn, input: $input)
}""",
"variables": {
"urn": new_incident_urn,
"input": {
"state": "RESOLVED",
"message": "test message 2",
},
},
}
response = auth_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql", json=update_incident_status
)
response.raise_for_status()
res_data = response.json()
assert res_data
assert "errors" not in res_data
assert res_data["data"]
assert res_data["data"]["updateIncidentStatus"] is True
# Sleep for eventual consistency (not ideal)
time.sleep(2)
# Fetch the dataset's incidents to confirm there's a resolved incident.new_incident_urn
list_dataset_incidents_json = {
"query": """query dataset($urn: String!) {\n
dataset(urn: $urn) {\n
incidents(state: RESOLVED, start: 0, count: 10) {\n
start\n
count\n
total\n
incidents {\n
urn\n
type\n
incidentType\n
title\n
description\n
priority\n
status {\n
state\n
message\n
lastUpdated {\n
time\n
actor\n
}\n
}\n
entity {\n
urn\n
}\n
created {\n
time\n
actor\n
}\n
}\n
}\n
}\n
}""",
"variables": {"urn": TEST_DATASET_URN},
}
response = auth_session.post(
f"{auth_session.frontend_url()}/api/v2/graphql",
json=list_dataset_incidents_json,
)
response.raise_for_status()
res_data = response.json()
assert res_data
assert res_data["data"]
assert res_data["data"]["dataset"]["incidents"]["total"] is not None
assert "errors" not in res_data
# Find the new incident and do the comparison.
active_incidents = res_data["data"]["dataset"]["incidents"]["incidents"]
filtered_incidents = list(
filter(lambda incident: incident["urn"] == new_incident_urn, active_incidents)
)
assert len(filtered_incidents) == 1
new_incident = filtered_incidents[0]
assert new_incident["title"] == "test title 2"
assert new_incident["description"] == "test description 2"
assert new_incident["status"]["state"] == "RESOLVED"
assert new_incident["priority"] == "CRITICAL"
delete_json = {"urn": new_incident_urn}
# Cleanup: Delete the incident
response = auth_session.post(
f"{auth_session.gms_url()}/entities?action=delete", json=delete_json
)
response.raise_for_status()