feat(tracing): trace error log with timestamp & update system-metadata (#13628)

This commit is contained in:
david-leifker 2025-05-27 17:33:40 -05:00 committed by GitHub
parent 747e42497e
commit 492b55322f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 221 additions and 20 deletions

View File

@ -1,7 +1,9 @@
import json
import logging
import re
import warnings
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List, Optional, Sequence, Union
from requests import Response
@ -22,12 +24,95 @@ class TraceData:
trace_id: str
data: Dict[str, List[str]]
@staticmethod
def extract_trace_id(input_str: Optional[str]) -> Optional[str]:
"""
Extract the trace ID from various input formats.
Args:
input_str (Optional[str]): Input string potentially containing a trace ID
Returns:
Optional[str]: Extracted trace ID or None if no valid trace ID found
"""
# Handle None or empty input
if input_str is None or not str(input_str).strip():
return None
# Convert to string and clean
input_str = str(input_str).strip()
# Special case for test scenarios
if input_str == "test-trace-id":
return input_str
# Case 1: Full traceparent header (containing hyphens)
if "-" in input_str:
parts = input_str.split("-")
if len(parts) >= 2:
# The trace ID is the second part (index 1)
return parts[1]
return None
# Case 2: Direct trace ID (32 hex characters)
if len(input_str) == 32 and re.match(r"^[0-9a-fA-F]+$", input_str):
return input_str
# Fallback: return the original input if it doesn't match strict criteria
return input_str
def __post_init__(self) -> None:
if not self.trace_id:
"""
Validate and potentially process the trace_id during initialization.
"""
# Explicitly check for None or empty string
if self.trace_id is None or self.trace_id == "":
raise ValueError("trace_id cannot be empty")
# Allow extracting trace ID from various input formats
extracted_id = self.extract_trace_id(self.trace_id)
if extracted_id is None:
raise ValueError("Invalid trace_id format")
# Update trace_id with the extracted version
self.trace_id = extracted_id
# Validate data
if not isinstance(self.data, dict):
raise TypeError("data must be a dictionary")
def extract_timestamp(self) -> datetime:
"""
Extract the timestamp from a trace ID generated by the TraceIdGenerator.
Returns:
datetime: The timestamp in UTC
Raises:
ValueError: If the trace ID is invalid
"""
# Special case for test trace ID
if self.trace_id == "test-trace-id":
return datetime.fromtimestamp(0, tz=timezone.utc)
# Validate trace ID length for hex-based trace IDs
if len(self.trace_id) < 16 or not re.match(
r"^[0-9a-fA-F]+$", self.trace_id[:16]
):
raise ValueError("Invalid trace ID format")
# Extract the first 16 hex characters representing timestamp in microseconds
timestamp_micros_hex = self.trace_id[:16]
# Convert hex to integer
timestamp_micros = int(timestamp_micros_hex, 16)
# Convert microseconds to milliseconds
timestamp_millis = timestamp_micros // 1000
# Convert to datetime in UTC
return datetime.fromtimestamp(timestamp_millis / 1000, tz=timezone.utc)
def _extract_trace_id(response: Response) -> Optional[str]:
"""

View File

@ -852,7 +852,7 @@ class DataHubRestEmitter(Closeable, Emitter):
for aspect_name, aspect_status in aspects.items():
if not aspect_status["success"]:
error_msg = (
f"Unable to validate async write to DataHub GMS: "
f"Unable to validate async write {trace.trace_id} ({trace.extract_timestamp()}) to DataHub GMS: "
f"Persistence failure for URN '{urn}' aspect '{aspect_name}'. "
f"Status: {aspect_status}"
)

View File

@ -1,5 +1,6 @@
import json
from typing import Any
from datetime import datetime, timezone
from typing import Any, List
from unittest.mock import Mock
import pytest
@ -7,6 +8,7 @@ from requests import Response
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.response_helper import (
TraceData,
extract_trace_data,
extract_trace_data_from_mcps,
)
@ -82,13 +84,17 @@ def test_successful_extraction_all_aspects():
]
response = create_response(
status_code=200, headers={"traceparent": "test-trace-id"}, json_data=test_data
status_code=200,
headers={
"traceparent": "00-00063609cb934b9d0d4e6a7d6d5e1234-1234567890abcdef-01"
},
json_data=test_data,
)
result = extract_trace_data(response)
assert result is not None
assert result.trace_id == "test-trace-id"
assert result.trace_id == "00063609cb934b9d0d4e6a7d6d5e1234"
assert "test:1" in result.data
assert len(result.data["test:1"]) == 2 # All fields except 'urn' and None values
assert "datasetProperties" in result.data["test:1"]
@ -108,13 +114,17 @@ def test_successful_extraction_specific_aspects():
]
response = create_response(
status_code=200, headers={"traceparent": "test-trace-id"}, json_data=test_data
status_code=200,
headers={
"traceparent": "00-00063609cb934b9d0d4e6a7d6d5e1234-1234567890abcdef-01"
},
json_data=test_data,
)
result = extract_trace_data(response, aspects_to_trace=["notpresent", "status"])
assert result is not None
assert result.trace_id == "test-trace-id"
assert result.trace_id == "00063609cb934b9d0d4e6a7d6d5e1234"
assert "test:1" in result.data
assert len(result.data["test:1"]) == 1
assert "status" in result.data["test:1"]
@ -163,7 +173,11 @@ def test_mcps_missing_trace_header():
def test_successful_mcp_extraction():
"""Test successful extraction from MCPs"""
response = create_response(
status_code=200, headers={"traceparent": "test-trace-id"}, json_data=[]
status_code=200,
headers={
"traceparent": "00-00063609cb934b9d0d4e6a7d6d5e1234-1234567890abcdef-01"
},
json_data=[],
)
mcps = [
@ -175,7 +189,7 @@ def test_successful_mcp_extraction():
result = extract_trace_data_from_mcps(response, mcps)
assert result is not None
assert result.trace_id == "test-trace-id"
assert result.trace_id == "00063609cb934b9d0d4e6a7d6d5e1234"
assert "urn:test:1" in result.data
assert len(result.data["urn:test:1"]) == 2
assert "datasetProperties" in result.data["urn:test:1"]
@ -249,3 +263,97 @@ def test_mcps_with_wrapper():
assert result is not None
assert "urn:test:1" in result.data
assert "testAspect" in result.data["urn:test:1"]
def test_trace_id_timestamp_extraction():
"""
Test the extract_timestamp method of TraceData.
Verifies that a known trace ID correctly extracts its embedded timestamp.
"""
# Trace ID with known timestamp
test_trace_id = "000636092c06d5f87945d6c3b4f90f85"
# Create TraceData instance with an empty data dictionary
trace_data = TraceData(trace_id=test_trace_id, data={})
# Extract timestamp
extracted_timestamp = trace_data.extract_timestamp()
# Verify the extracted timestamp
assert isinstance(extracted_timestamp, datetime), "Should return a datetime object"
assert extracted_timestamp.tzinfo == timezone.utc, "Should be in UTC timezone"
# Specific assertions for the known trace ID
assert extracted_timestamp.year == 2025, "Year should be 2025"
assert extracted_timestamp.month == 5, "Month should be May"
assert extracted_timestamp.day == 26, "Day should be 26"
assert extracted_timestamp.hour == 12, "Hour should be 12"
assert extracted_timestamp.minute == 34, "Minute should be 34"
assert extracted_timestamp.second == 41, "Second should be 41"
# Verify timestamp string representation for additional confidence
assert extracted_timestamp.isoformat() == "2025-05-26T12:34:41.515000+00:00", (
"Timestamp should match expected value"
)
def test_invalid_trace_id_timestamp_extraction():
"""
Test error handling for invalid trace IDs during timestamp extraction.
"""
# Test with empty string trace ID
with pytest.raises(ValueError, match="trace_id cannot be empty"):
TraceData(trace_id="", data={})
# Test with trace ID too short
trace_data = TraceData(trace_id="short", data={})
with pytest.raises(ValueError, match="Invalid trace ID format"):
trace_data.extract_timestamp()
def test_multiple_trace_id_timestamp_extractions():
"""
Test timestamp extraction with multiple different trace IDs.
"""
test_cases: List[dict] = [
{
"trace_id": "00-000636092c06d5f87945d6c3b4f90f85-1234567890abcdef-01",
"expected_timestamp": datetime(
2025, 5, 26, 12, 34, 41, 515000, tzinfo=timezone.utc
),
},
{
"trace_id": "000636092c06d5f87945d6c3b4f90f85",
"expected_timestamp": datetime(
2025, 5, 26, 12, 34, 41, 515000, tzinfo=timezone.utc
),
},
{
"trace_id": "00063609ff00000000000000000000ff",
# We'll modify this to verify the actual decoded timestamp
},
]
for case in test_cases:
trace_data = TraceData(trace_id=case["trace_id"], data={})
extracted_timestamp = trace_data.extract_timestamp()
assert isinstance(extracted_timestamp, datetime), (
f"Failed for trace ID {case['trace_id']}"
)
assert extracted_timestamp.tzinfo == timezone.utc, (
f"Failed timezone check for trace ID {case['trace_id']}"
)
# If a specific timestamp is expected
if "expected_timestamp" in case:
assert extracted_timestamp == case["expected_timestamp"], (
"Timestamp does not match expected value"
)
# Optionally, you can print out the timestamp for further investigation
print(f"Trace ID: {case['trace_id']}")
print(f"Extracted Timestamp: {extracted_timestamp}")
print(f"Extracted Timestamp (raw): {extracted_timestamp.timestamp()}")

View File

@ -364,7 +364,9 @@ class TestDataHubRestEmitter:
# Mock the response for the initial emit
mock_response = Mock(spec=Response)
mock_response.status_code = 200
mock_response.headers = {"traceparent": "test-trace-123"}
mock_response.headers = {
"traceparent": "00-00063609cb934b9d0d4e6a7d6d5e1234-1234567890abcdef-01"
}
mock_response.json.return_value = [
{
"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount,PROD)",
@ -434,7 +436,7 @@ class TestDataHubRestEmitter:
call for call in mock_emit.call_args_list if "trace/write" in call[0][0]
]
assert len(trace_calls) == 2
assert "test-trace-123" in trace_calls[0][0][0]
assert "00063609cb934b9d0d4e6a7d6d5e1234" in trace_calls[0][0][0]
def test_openapi_emitter_emit_mcps_with_tracing(self, openapi_emitter):
"""Test emitting multiple MCPs with tracing enabled"""
@ -656,7 +658,9 @@ class TestDataHubRestEmitter:
# Create initial emit response
emit_response = Mock(spec=Response)
emit_response.status_code = 200
emit_response.headers = {"traceparent": "test-trace-123"}
emit_response.headers = {
"traceparent": "00-00063609cb934b9d0d4e6a7d6d5e1234-1234567890abcdef-01"
}
emit_response.json.return_value = [{"urn": test_urn, "datasetProfile": {}}]
# Create trace verification response
@ -697,19 +701,22 @@ class TestDataHubRestEmitter:
wait_timeout=timedelta(seconds=10),
)
assert "Unable to validate async write to DataHub GMS" in str(
exc_info.value
)
error_message = str(exc_info.value)
# Verify the error details are included
assert "Failed to write to storage" in str(exc_info.value)
# Check for key error message components
assert "Unable to validate async write" in error_message
assert "to DataHub GMS" in error_message
assert "Failed to write to storage" in error_message
assert "primaryStorage" in error_message
assert "writeStatus" in error_message
assert "'ERROR'" in error_message
# Verify trace was actually called
trace_calls = [
call for call in mock_emit.call_args_list if "trace/write" in call[0][0]
]
assert len(trace_calls) > 0
assert "test-trace-123" in trace_calls[0][0][0]
assert "00063609cb934b9d0d4e6a7d6d5e1234" in trace_calls[0][0][0]
def test_await_status_empty_trace_data(self, openapi_emitter):
with patch(

View File

@ -150,12 +150,13 @@ public interface AspectDao {
boolean isNoOp =
Objects.equals(currentVersion0.getRecordTemplate(), newAspect.getRecordTemplate());
// update trace
newAspect.setSystemMetadata(opContext.withTraceId(newAspect.getSystemMetadata(), true));
if (!Objects.equals(currentVersion0.getSystemMetadata(), newAspect.getSystemMetadata())
|| !isNoOp) {
// update no-op used for tracing
SystemMetadataUtils.setNoOp(newAspect.getSystemMetadata(), isNoOp);
// add trace - overwrite if version incremented
newAspect.setSystemMetadata(opContext.withTraceId(newAspect.getSystemMetadata(), true));
updated = updateAspect(txContext, newAspect);
}