datahub/smoke-test/tests/trace/test_api_trace.py

487 lines
16 KiB
Python
Raw Permalink Normal View History

import time
import pytest
from opensearchpy import OpenSearch
from tests.utils import delete_urns, wait_for_writes_to_sync
es = OpenSearch(["http://localhost:9200"])
generated_urns = {
"apiTraceHappyPath": "urn:li:dataset:(urn:li:dataPlatform:test,apiTraceHappyPath,PROD)",
"apiTraceMCPFail": "urn:li:dataset:(urn:li:dataPlatform:test,apiTraceMCPFail,PROD)",
"apiTraceDroppedElasticsearch": "urn:li:dataset:(urn:li:dataPlatform:test,apiTraceDroppedElasticsearch,PROD)",
"apiTraceOverwritten": "urn:li:dataset:(urn:li:dataPlatform:test,apiTraceOverwritten,PROD)",
"apiTraceTimeseries": "urn:li:dataset:(urn:li:dataPlatform:test,apiTraceTimeseries,PROD)",
"apiTraceNoop": "urn:li:dataset:(urn:li:dataPlatform:test,apiTraceNoop,PROD)",
"apiTraceNoopWithFMCP": "urn:li:dataset:(urn:li:dataPlatform:test,apiTraceNoopWithFMCP,PROD)",
}
@pytest.fixture(scope="module", autouse=True)
def test_setup(graph_client):
"""Fixture to clean-up urns before and after a test is run"""
print("removing previous test data")
delete_urns(graph_client, list(generated_urns.values()))
wait_for_writes_to_sync()
yield
print("removing generated test data")
delete_urns(graph_client, list(generated_urns.values()))
wait_for_writes_to_sync()
def test_successful_async_write(auth_session):
urn = generated_urns["apiTraceHappyPath"]
aspect_name = "status"
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
params={"async": "true", "systemMetadata": "true"},
json=[{"urn": urn, aspect_name: {"value": {"removed": False}}}],
)
trace_id = compare_trace_header_system_metadata(
resp, resp.json()[0][aspect_name]["systemMetadata"]
)
wait_for_writes_to_sync()
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
params={"onlyIncludeErrors": "false", "detailed": "true"},
json={urn: [aspect_name]},
)
assert resp.json() == {
urn: {
aspect_name: {
"success": True,
"primaryStorage": {"writeStatus": "ACTIVE_STATE"},
"searchStorage": {"writeStatus": "ACTIVE_STATE"},
}
}
}
def test_mcp_fail_aspect_async_write(auth_session):
urn = generated_urns["apiTraceMCPFail"]
aspect_name = "glossaryTerms"
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v3/entity/dataset/{urn}/{aspect_name}",
params={"async": "true", "systemMetadata": "true"},
json={
"value": {
"terms": [{"urn": "urn:li:glossaryTerm:someTerm"}],
"auditStamp": {"time": 0, "actor": "urn:li:corpuser:datahub"},
},
"headers": {"If-Version-Match": "-10000"},
},
)
trace_id = compare_trace_header_system_metadata(
resp, resp.json()[aspect_name]["systemMetadata"]
)
wait_for_writes_to_sync()
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
params={"onlyIncludeErrors": "false", "detailed": "true", "skipCache": "true"},
json={urn: [aspect_name]},
)
assert resp.json()[urn][aspect_name]["success"] is False
assert resp.json()[urn][aspect_name]["primaryStorage"]["writeStatus"] == "ERROR"
assert (
resp.json()[urn][aspect_name]["primaryStorage"]["writeExceptions"][0]["message"]
== "Expected version -10000, actual version -1"
)
assert resp.json()[urn][aspect_name]["searchStorage"] == {
"writeStatus": "ERROR",
"writeMessage": "Primary storage write failed.",
}
def test_overwritten_async_write(auth_session):
urn = generated_urns["apiTraceOverwritten"]
aspect_name = "datasetProperties"
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
params={"async": "true", "systemMetadata": "true"},
json=[
{
"urn": urn,
aspect_name: {
"value": {"name": "original", "customProperties": {}, "tags": []}
},
}
],
)
original_trace_id = compare_trace_header_system_metadata(
resp, resp.json()[0][aspect_name]["systemMetadata"]
)
wait_for_writes_to_sync()
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v1/trace/write/{original_trace_id}",
params={"onlyIncludeErrors": "false", "detailed": "true"},
json={urn: [aspect_name]},
)
assert resp.json() == {
urn: {
aspect_name: {
"success": True,
"primaryStorage": {"writeStatus": "ACTIVE_STATE"},
"searchStorage": {"writeStatus": "ACTIVE_STATE"},
}
}
}
# Perform 2nd write
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
params={"async": "true", "systemMetadata": "true"},
json=[
{
"urn": urn,
aspect_name: {
"value": {"name": "updated", "customProperties": {}, "tags": []}
},
}
],
)
second_trace_id = compare_trace_header_system_metadata(
resp, resp.json()[0][aspect_name]["systemMetadata"]
)
wait_for_writes_to_sync()
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v1/trace/write/{second_trace_id}",
params={"onlyIncludeErrors": "false", "detailed": "true", "skipCache": "true"},
json={urn: [aspect_name]},
)
assert resp.json() == {
urn: {
aspect_name: {
"success": True,
"primaryStorage": {"writeStatus": "ACTIVE_STATE"},
"searchStorage": {"writeStatus": "ACTIVE_STATE"},
}
}
}
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v1/trace/write/{original_trace_id}",
params={"onlyIncludeErrors": "false", "detailed": "true"},
json={urn: [aspect_name]},
)
assert resp.json() == {
urn: {
aspect_name: {
"success": True,
"primaryStorage": {"writeStatus": "HISTORIC_STATE"},
"searchStorage": {"writeStatus": "HISTORIC_STATE"},
}
}
}
def test_missing_elasticsearch_async_write(auth_session):
urn = generated_urns["apiTraceDroppedElasticsearch"]
aspect_name = "status"
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
params={"async": "true", "systemMetadata": "true"},
json=[{"urn": urn, aspect_name: {"value": {"removed": False}}}],
)
trace_id = compare_trace_header_system_metadata(
resp, resp.json()[0][aspect_name]["systemMetadata"]
)
wait_for_writes_to_sync()
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
params={"onlyIncludeErrors": "false", "detailed": "true"},
json={urn: [aspect_name]},
)
assert resp.json() == {
urn: {
aspect_name: {
"success": True,
"primaryStorage": {"writeStatus": "ACTIVE_STATE"},
"searchStorage": {"writeStatus": "ACTIVE_STATE"},
}
}
}
# Simulate overwrite
delete_elasticsearch_trace(trace_id)
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
params={"onlyIncludeErrors": "false", "detailed": "true", "skipCache": "true"},
json={urn: [aspect_name]},
)
assert resp.json() == {
urn: {
aspect_name: {
"success": True,
"primaryStorage": {"writeStatus": "ACTIVE_STATE"},
"searchStorage": {"writeStatus": "HISTORIC_STATE"},
}
}
}
# Simulate dropped write
delete_elasticsearch_system_metadata(urn)
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
params={"onlyIncludeErrors": "false", "detailed": "true", "skipCache": "true"},
json={urn: [aspect_name]},
)
assert resp.json() == {
urn: {
aspect_name: {
"success": False,
"primaryStorage": {"writeStatus": "ACTIVE_STATE"},
"searchStorage": {
"writeStatus": "ERROR",
"writeMessage": "Consumer has processed past the offset.",
},
}
}
}
def test_timeseries_async_write(auth_session):
urn = generated_urns["apiTraceTimeseries"]
aspect_name = "datasetProfile"
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
params={"async": "true", "systemMetadata": "true"},
json=[
{
"urn": urn,
aspect_name: {
"value": {
"timestampMillis": time.time_ns() // 1_000_000,
"messageId": "test timeseries",
"rowCount": 1,
}
},
}
],
)
trace_id = compare_trace_header_system_metadata(
resp, resp.json()[0][aspect_name]["systemMetadata"]
)
wait_for_writes_to_sync()
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
params={"onlyIncludeErrors": "false", "detailed": "true"},
json={urn: [aspect_name]},
)
assert resp.json() == {
urn: {
aspect_name: {
"success": True,
"primaryStorage": {"writeStatus": "NO_OP"},
"searchStorage": {"writeStatus": "TRACE_NOT_IMPLEMENTED"},
}
}
}
def test_noop_async_write(auth_session):
urn = generated_urns["apiTraceNoop"]
aspect_name = "status"
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
params={"async": "true", "systemMetadata": "true"},
json=[{"urn": urn, aspect_name: {"value": {"removed": False}}}],
)
trace_id = compare_trace_header_system_metadata(
resp, resp.json()[0][aspect_name]["systemMetadata"]
)
wait_for_writes_to_sync()
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
params={"onlyIncludeErrors": "false", "detailed": "true"},
json={urn: [aspect_name]},
)
assert resp.json() == {
urn: {
aspect_name: {
"success": True,
"primaryStorage": {"writeStatus": "ACTIVE_STATE"},
"searchStorage": {"writeStatus": "ACTIVE_STATE"},
}
}
}
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
params={"async": "true", "systemMetadata": "true"},
json=[{"urn": urn, aspect_name: {"value": {"removed": False}}}],
)
trace_id = compare_trace_header_system_metadata(
resp, resp.json()[0][aspect_name]["systemMetadata"]
)
wait_for_writes_to_sync()
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
params={"onlyIncludeErrors": "false", "detailed": "true", "skipCache": "true"},
json={urn: [aspect_name]},
)
assert resp.json() == {
urn: {
aspect_name: {
"success": True,
"primaryStorage": {"writeStatus": "NO_OP"},
"searchStorage": {"writeStatus": "NO_OP"},
}
}
}
def test_noop_with_fmcp_async_write(auth_session):
urn = generated_urns["apiTraceNoopWithFMCP"]
aspect_name = "status"
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
params={"async": "true", "systemMetadata": "true"},
json=[{"urn": urn, aspect_name: {"value": {"removed": False}}}],
)
trace_id = compare_trace_header_system_metadata(
resp, resp.json()[0][aspect_name]["systemMetadata"]
)
wait_for_writes_to_sync()
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
params={"onlyIncludeErrors": "false", "detailed": "true"},
json={urn: [aspect_name]},
)
assert resp.json() == {
urn: {
aspect_name: {
"success": True,
"primaryStorage": {"writeStatus": "ACTIVE_STATE"},
"searchStorage": {"writeStatus": "ACTIVE_STATE"},
}
}
}
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
params={"async": "true", "systemMetadata": "true"},
json=[
{
"urn": urn,
aspect_name: {
"value": {"removed": False},
"headers": {"If-Version-Match": "-10000"},
},
}
],
)
trace_id = compare_trace_header_system_metadata(
resp, resp.json()[0][aspect_name]["systemMetadata"]
)
wait_for_writes_to_sync()
resp = auth_session.post(
f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
params={"onlyIncludeErrors": "false", "detailed": "true", "skipCache": "true"},
json={urn: [aspect_name]},
)
assert resp.json()[urn][aspect_name]["success"] is False
assert resp.json()[urn][aspect_name]["primaryStorage"]["writeStatus"] == "ERROR"
assert (
resp.json()[urn][aspect_name]["primaryStorage"]["writeExceptions"][0]["message"]
== "Expected version -10000, actual version 1"
)
assert resp.json()[urn][aspect_name]["searchStorage"] == {
"writeStatus": "ERROR",
"writeMessage": "Primary storage write failed.",
}
def compare_trace_header_system_metadata(resp, system_metadata):
header_trace_id = extract_trace_header(resp)
system_metadata_trace_id = extract_trace_system_metadata(system_metadata)
assert header_trace_id.startswith("00-" + system_metadata_trace_id)
return system_metadata_trace_id
def extract_trace_header(resp):
assert "traceparent" in resp.headers
return resp.headers["traceparent"]
def extract_trace_system_metadata(system_metadata):
assert "properties" in system_metadata
assert "telemetryTraceId" in system_metadata["properties"]
return system_metadata["properties"]["telemetryTraceId"]
def delete_elasticsearch_trace(trace_id, timeout=10, refresh_interval=1):
field_name = "telemetryTraceId"
index_name = "system_metadata_service_v1"
update_body = {
"query": {"term": {field_name: trace_id}},
"script": {"source": f"ctx._source.remove('{field_name}')"},
}
response = es.update_by_query(
index=index_name,
body=update_body,
conflicts="proceed",
timeout=timeout,
wait_for_completion=True,
)
if response.get("failures"):
raise Exception(
f"Update by query operation had failures: {response['failures']}"
)
time.sleep(refresh_interval)
def delete_elasticsearch_system_metadata(urn, timeout=10, refresh_interval=1):
index_name = "system_metadata_service_v1"
update_body = {"query": {"term": {"urn": urn}}}
response = es.delete_by_query(
index=index_name,
body=update_body,
conflicts="proceed",
timeout=timeout,
wait_for_completion=True,
)
if response.get("failures"):
raise Exception(
f"Update by query operation had failures: {response['failures']}"
)
time.sleep(refresh_interval)