mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-18 23:02:11 +00:00
487 lines
16 KiB
Python
487 lines
16 KiB
Python
![]() |
import time
|
||
|
|
||
|
import pytest
|
||
|
from opensearchpy import OpenSearch
|
||
|
|
||
|
from tests.utils import delete_urns, wait_for_writes_to_sync
|
||
|
|
||
|
# Module-level OpenSearch client pointed at a local node on port 9200.
# The delete_elasticsearch_* helpers in this module use it to edit or remove
# documents in the system metadata index directly.
es = OpenSearch(["http://localhost:9200"])
|
||
|
|
||
|
|
||
|
# Maps a short scenario name to the dataset URN that scenario writes to.
# Every URN lives on the "test" data platform in the PROD environment; the
# scenario name doubles as the dataset name. Insertion order is kept stable.
generated_urns = {
    scenario: f"urn:li:dataset:(urn:li:dataPlatform:test,{scenario},PROD)"
    for scenario in (
        "apiTraceHappyPath",
        "apiTraceMCPFail",
        "apiTraceDroppedElasticsearch",
        "apiTraceOverwritten",
        "apiTraceTimeseries",
        "apiTraceNoop",
        "apiTraceNoopWithFMCP",
    )
}
|
||
|
|
||
|
|
||
|
@pytest.fixture(scope="module", autouse=True)
def test_setup(graph_client):
    """Remove every URN in ``generated_urns`` before and after the module's tests.

    Each clean-up pass prints a progress message, deletes the URNs through
    ``graph_client`` and then waits for writes to sync.
    """

    def _purge(message):
        # One clean-up pass: announce it, delete the URNs, wait for the sync.
        print(message)
        delete_urns(graph_client, list(generated_urns.values()))
        wait_for_writes_to_sync()

    _purge("removing previous test data")
    yield
    _purge("removing generated test data")
|
||
|
|
||
|
|
||
|
def test_successful_async_write(auth_session):
    """Async write of the ``status`` aspect, then trace it.

    Expects the trace endpoint to report success with ACTIVE_STATE for both
    primary and search storage.
    """
    urn = generated_urns["apiTraceHappyPath"]
    aspect_name = "status"

    # Submit the write asynchronously and ask for system metadata in the reply.
    write_payload = [{"urn": urn, aspect_name: {"value": {"removed": False}}}]
    write_resp = auth_session.post(
        f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
        params={"async": "true", "systemMetadata": "true"},
        json=write_payload,
    )

    aspect_system_metadata = write_resp.json()[0][aspect_name]["systemMetadata"]
    trace_id = compare_trace_header_system_metadata(
        write_resp, aspect_system_metadata
    )
    wait_for_writes_to_sync()

    # Look the write up through the trace endpoint.
    trace_resp = auth_session.post(
        f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
        params={"onlyIncludeErrors": "false", "detailed": "true"},
        json={urn: [aspect_name]},
    )

    expected_trace = {
        urn: {
            aspect_name: {
                "success": True,
                "primaryStorage": {"writeStatus": "ACTIVE_STATE"},
                "searchStorage": {"writeStatus": "ACTIVE_STATE"},
            }
        }
    }
    assert trace_resp.json() == expected_trace
|
||
|
|
||
|
|
||
|
def test_mcp_fail_aspect_async_write(auth_session):
    """Async write of ``glossaryTerms`` with an ``If-Version-Match`` of -10000.

    Expects the trace to report failure: primary storage in ERROR with a
    version-mismatch exception message, and search storage in ERROR because
    the primary storage write failed.
    """
    urn = generated_urns["apiTraceMCPFail"]
    aspect_name = "glossaryTerms"

    aspect_body = {
        "value": {
            "terms": [{"urn": "urn:li:glossaryTerm:someTerm"}],
            "auditStamp": {"time": 0, "actor": "urn:li:corpuser:datahub"},
        },
        # Conditional-write header carrying the version the test expects to mismatch.
        "headers": {"If-Version-Match": "-10000"},
    }
    write_resp = auth_session.post(
        f"{auth_session.gms_url()}/openapi/v3/entity/dataset/{urn}/{aspect_name}",
        params={"async": "true", "systemMetadata": "true"},
        json=aspect_body,
    )

    trace_id = compare_trace_header_system_metadata(
        write_resp, write_resp.json()[aspect_name]["systemMetadata"]
    )
    wait_for_writes_to_sync()

    trace_resp = auth_session.post(
        f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
        params={"onlyIncludeErrors": "false", "detailed": "true", "skipCache": "true"},
        json={urn: [aspect_name]},
    )

    aspect_trace = trace_resp.json()[urn][aspect_name]
    assert aspect_trace["success"] is False

    primary_storage = aspect_trace["primaryStorage"]
    assert primary_storage["writeStatus"] == "ERROR"
    assert (
        primary_storage["writeExceptions"][0]["message"]
        == "Expected version -10000, actual version -1"
    )

    assert aspect_trace["searchStorage"] == {
        "writeStatus": "ERROR",
        "writeMessage": "Primary storage write failed.",
    }
|
||
|
|
||
|
|
||
|
def test_overwritten_async_write(auth_session):
    """Trace two successive async writes of ``datasetProperties`` to one URN.

    The first write's trace is expected to be ACTIVE_STATE once it has synced
    and HISTORIC_STATE after the second write has synced; the second write's
    trace is expected to be ACTIVE_STATE.
    """
    urn = generated_urns["apiTraceOverwritten"]
    aspect_name = "datasetProperties"

    def _write(dataset_name):
        # Submit an async write, verify header/system-metadata trace ids agree,
        # wait for the sync and hand back the trace id.
        write_resp = auth_session.post(
            f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
            params={"async": "true", "systemMetadata": "true"},
            json=[
                {
                    "urn": urn,
                    aspect_name: {
                        "value": {
                            "name": dataset_name,
                            "customProperties": {},
                            "tags": [],
                        }
                    },
                }
            ],
        )
        written_trace_id = compare_trace_header_system_metadata(
            write_resp, write_resp.json()[0][aspect_name]["systemMetadata"]
        )
        wait_for_writes_to_sync()
        return written_trace_id

    def _trace(trace_id, skip_cache=False):
        # Query the trace endpoint for this urn/aspect and return the parsed body.
        query = {"onlyIncludeErrors": "false", "detailed": "true"}
        if skip_cache:
            query["skipCache"] = "true"
        trace_resp = auth_session.post(
            f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
            params=query,
            json={urn: [aspect_name]},
        )
        return trace_resp.json()

    def _expected(write_status):
        # Successful trace where both storages share the same write status.
        return {
            urn: {
                aspect_name: {
                    "success": True,
                    "primaryStorage": {"writeStatus": write_status},
                    "searchStorage": {"writeStatus": write_status},
                }
            }
        }

    original_trace_id = _write("original")
    assert _trace(original_trace_id) == _expected("ACTIVE_STATE")

    # Perform 2nd write
    second_trace_id = _write("updated")
    assert _trace(second_trace_id, skip_cache=True) == _expected("ACTIVE_STATE")

    # The first write has now been superseded by the second one.
    assert _trace(original_trace_id) == _expected("HISTORIC_STATE")
|
||
|
|
||
|
|
||
|
def test_missing_elasticsearch_async_write(auth_session):
    """Trace a ``status`` write while tampering with the search index.

    Three checks, in order:
    1. after the write syncs, both storages report ACTIVE_STATE;
    2. after the trace id is stripped from the search index documents, search
       storage reports HISTORIC_STATE;
    3. after the URN's system metadata documents are deleted from the search
       index, search storage reports ERROR and the trace is unsuccessful.
    """
    urn = generated_urns["apiTraceDroppedElasticsearch"]
    aspect_name = "status"

    def _trace(skip_cache=False):
        # Query the trace endpoint for this urn/aspect and return the parsed body.
        query = {"onlyIncludeErrors": "false", "detailed": "true"}
        if skip_cache:
            query["skipCache"] = "true"
        trace_resp = auth_session.post(
            f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
            params=query,
            json={urn: [aspect_name]},
        )
        return trace_resp.json()

    write_resp = auth_session.post(
        f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
        params={"async": "true", "systemMetadata": "true"},
        json=[{"urn": urn, aspect_name: {"value": {"removed": False}}}],
    )
    trace_id = compare_trace_header_system_metadata(
        write_resp, write_resp.json()[0][aspect_name]["systemMetadata"]
    )
    wait_for_writes_to_sync()

    assert _trace() == {
        urn: {
            aspect_name: {
                "success": True,
                "primaryStorage": {"writeStatus": "ACTIVE_STATE"},
                "searchStorage": {"writeStatus": "ACTIVE_STATE"},
            }
        }
    }

    # Simulate overwrite
    delete_elasticsearch_trace(trace_id)

    assert _trace(skip_cache=True) == {
        urn: {
            aspect_name: {
                "success": True,
                "primaryStorage": {"writeStatus": "ACTIVE_STATE"},
                "searchStorage": {"writeStatus": "HISTORIC_STATE"},
            }
        }
    }

    # Simulate dropped write
    delete_elasticsearch_system_metadata(urn)

    assert _trace(skip_cache=True) == {
        urn: {
            aspect_name: {
                "success": False,
                "primaryStorage": {"writeStatus": "ACTIVE_STATE"},
                "searchStorage": {
                    "writeStatus": "ERROR",
                    "writeMessage": "Consumer has processed past the offset.",
                },
            }
        }
    }
|
||
|
|
||
|
|
||
|
def test_timeseries_async_write(auth_session):
    """Async write of the ``datasetProfile`` aspect, then trace it.

    Expects a successful trace whose primary storage status is NO_OP and whose
    search storage status is TRACE_NOT_IMPLEMENTED.
    """
    urn = generated_urns["apiTraceTimeseries"]
    aspect_name = "datasetProfile"

    # Current wall-clock time in milliseconds, used as the profile timestamp.
    now_millis = time.time_ns() // 1_000_000
    profile = {
        "timestampMillis": now_millis,
        "messageId": "test timeseries",
        "rowCount": 1,
    }

    write_resp = auth_session.post(
        f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
        params={"async": "true", "systemMetadata": "true"},
        json=[{"urn": urn, aspect_name: {"value": profile}}],
    )

    trace_id = compare_trace_header_system_metadata(
        write_resp, write_resp.json()[0][aspect_name]["systemMetadata"]
    )
    wait_for_writes_to_sync()

    trace_resp = auth_session.post(
        f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
        params={"onlyIncludeErrors": "false", "detailed": "true"},
        json={urn: [aspect_name]},
    )

    expected_trace = {
        urn: {
            aspect_name: {
                "success": True,
                "primaryStorage": {"writeStatus": "NO_OP"},
                "searchStorage": {"writeStatus": "TRACE_NOT_IMPLEMENTED"},
            }
        }
    }
    assert trace_resp.json() == expected_trace
|
||
|
|
||
|
|
||
|
def test_noop_async_write(auth_session):
    """Write the same ``status`` value twice and trace each write.

    The first write's trace is expected to report ACTIVE_STATE for both
    storages; the identical second write's trace is expected to report NO_OP
    for both.
    """
    urn = generated_urns["apiTraceNoop"]
    aspect_name = "status"

    def _write():
        # Submit the (always identical) async write and return its trace id
        # once writes have synced.
        write_resp = auth_session.post(
            f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
            params={"async": "true", "systemMetadata": "true"},
            json=[{"urn": urn, aspect_name: {"value": {"removed": False}}}],
        )
        written_trace_id = compare_trace_header_system_metadata(
            write_resp, write_resp.json()[0][aspect_name]["systemMetadata"]
        )
        wait_for_writes_to_sync()
        return written_trace_id

    def _trace(trace_id, skip_cache=False):
        # Query the trace endpoint for this urn/aspect and return the parsed body.
        query = {"onlyIncludeErrors": "false", "detailed": "true"}
        if skip_cache:
            query["skipCache"] = "true"
        trace_resp = auth_session.post(
            f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
            params=query,
            json={urn: [aspect_name]},
        )
        return trace_resp.json()

    def _expected(write_status):
        # Successful trace where both storages share the same write status.
        return {
            urn: {
                aspect_name: {
                    "success": True,
                    "primaryStorage": {"writeStatus": write_status},
                    "searchStorage": {"writeStatus": write_status},
                }
            }
        }

    trace_id = _write()
    assert _trace(trace_id) == _expected("ACTIVE_STATE")

    # Repeat the exact same write; nothing changes, so it should be a no-op.
    trace_id = _write()
    assert _trace(trace_id, skip_cache=True) == _expected("NO_OP")
|
||
|
|
||
|
|
||
|
def test_noop_with_fmcp_async_write(auth_session):
    """Repeat an unchanged ``status`` write, this time with a failing version header.

    The first write's trace is expected to report ACTIVE_STATE for both
    storages. The second write carries ``If-Version-Match: -10000`` and its
    trace is expected to report a primary storage ERROR with a version-mismatch
    message, plus a search storage ERROR.
    """
    urn = generated_urns["apiTraceNoopWithFMCP"]
    aspect_name = "status"

    def _write(aspect_body):
        # Submit an async write of the given aspect body and return its trace
        # id once writes have synced.
        write_resp = auth_session.post(
            f"{auth_session.gms_url()}/openapi/v3/entity/dataset",
            params={"async": "true", "systemMetadata": "true"},
            json=[{"urn": urn, aspect_name: aspect_body}],
        )
        written_trace_id = compare_trace_header_system_metadata(
            write_resp, write_resp.json()[0][aspect_name]["systemMetadata"]
        )
        wait_for_writes_to_sync()
        return written_trace_id

    trace_id = _write({"value": {"removed": False}})

    first_trace_resp = auth_session.post(
        f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
        params={"onlyIncludeErrors": "false", "detailed": "true"},
        json={urn: [aspect_name]},
    )
    assert first_trace_resp.json() == {
        urn: {
            aspect_name: {
                "success": True,
                "primaryStorage": {"writeStatus": "ACTIVE_STATE"},
                "searchStorage": {"writeStatus": "ACTIVE_STATE"},
            }
        }
    }

    # Same value again, but with a conditional-write header that cannot match.
    trace_id = _write(
        {
            "value": {"removed": False},
            "headers": {"If-Version-Match": "-10000"},
        }
    )

    second_trace_resp = auth_session.post(
        f"{auth_session.gms_url()}/openapi/v1/trace/write/{trace_id}",
        params={"onlyIncludeErrors": "false", "detailed": "true", "skipCache": "true"},
        json={urn: [aspect_name]},
    )

    aspect_trace = second_trace_resp.json()[urn][aspect_name]
    assert aspect_trace["success"] is False

    primary_storage = aspect_trace["primaryStorage"]
    assert primary_storage["writeStatus"] == "ERROR"
    assert (
        primary_storage["writeExceptions"][0]["message"]
        == "Expected version -10000, actual version 1"
    )

    assert aspect_trace["searchStorage"] == {
        "writeStatus": "ERROR",
        "writeMessage": "Primary storage write failed.",
    }
|
||
|
|
||
|
|
||
|
def compare_trace_header_system_metadata(resp, system_metadata):
    """Check that the response's trace header matches the system metadata trace id.

    Asserts that the ``traceparent`` header of ``resp`` starts with ``"00-"``
    followed by the ``telemetryTraceId`` found in ``system_metadata``, and
    returns that trace id.
    """
    traceparent = extract_trace_header(resp)
    metadata_trace_id = extract_trace_system_metadata(system_metadata)

    expected_prefix = "00-" + metadata_trace_id
    assert traceparent.startswith(expected_prefix)

    return metadata_trace_id
|
||
|
|
||
|
|
||
|
def extract_trace_header(resp):
    """Return the ``traceparent`` header of ``resp``, asserting that it is present."""
    response_headers = resp.headers
    assert "traceparent" in response_headers
    return response_headers["traceparent"]
|
||
|
|
||
|
|
||
|
def extract_trace_system_metadata(system_metadata):
    """Return ``system_metadata["properties"]["telemetryTraceId"]``.

    Asserts that both the ``properties`` entry and the ``telemetryTraceId``
    entry inside it are present.
    """
    assert "properties" in system_metadata
    properties = system_metadata["properties"]

    assert "telemetryTraceId" in properties
    return properties["telemetryTraceId"]
|
||
|
|
||
|
|
||
|
def delete_elasticsearch_trace(trace_id, timeout=10, refresh_interval=1):
    """Strip the ``telemetryTraceId`` field from matching search index documents.

    Runs an update-by-query against the ``system_metadata_service_v1`` index
    through the module-level ``es`` client: every document whose
    ``telemetryTraceId`` term equals ``trace_id`` has that field removed.

    Args:
        trace_id: Trace id to match on.
        timeout: Forwarded unchanged as the ``timeout`` argument of the query.
        refresh_interval: Seconds to sleep after the query completes.

    Raises:
        Exception: If the query response reports any failures.
    """
    trace_field = "telemetryTraceId"

    result = es.update_by_query(
        index="system_metadata_service_v1",
        body={
            "query": {"term": {trace_field: trace_id}},
            "script": {"source": f"ctx._source.remove('{trace_field}')"},
        },
        conflicts="proceed",
        timeout=timeout,
        wait_for_completion=True,
    )

    failures = result.get("failures")
    if failures:
        raise Exception(f"Update by query operation had failures: {failures}")

    # Pause before returning; presumably gives the index time to refresh.
    time.sleep(refresh_interval)
|
||
|
|
||
|
|
||
|
def delete_elasticsearch_system_metadata(urn, timeout=10, refresh_interval=1):
    """Delete the search index system metadata documents for ``urn``.

    Runs a delete-by-query against the ``system_metadata_service_v1`` index
    through the module-level ``es`` client, removing every document whose
    ``urn`` term equals ``urn``.

    Args:
        urn: URN whose system metadata documents are removed.
        timeout: Forwarded unchanged as the ``timeout`` argument of the query.
        refresh_interval: Seconds to sleep after the query completes.

    Raises:
        Exception: If the query response reports any failures.
    """
    index_name = "system_metadata_service_v1"

    delete_query = {"query": {"term": {"urn": urn}}}

    response = es.delete_by_query(
        index=index_name,
        body=delete_query,
        conflicts="proceed",
        timeout=timeout,
        wait_for_completion=True,
    )

    failures = response.get("failures")
    if failures:
        # The message names the operation actually performed (a delete, not an
        # update) so a failure here is not mistaken for one in
        # delete_elasticsearch_trace.
        raise Exception(f"Delete by query operation had failures: {failures}")

    # Pause before returning; presumably gives the index time to refresh.
    time.sleep(refresh_interval)
|