fix(ingest): warn when API tracing is unexpectedly inactive (#13007)

Harshal Sheth 2025-03-27 22:21:33 -07:00 committed by GitHub
parent c9999d4cf0
commit 29e09185eb
7 changed files with 55 additions and 43 deletions

View File

@@ -74,6 +74,8 @@ filterwarnings =
     ignore::datahub.configuration.pydantic_migration_helpers.PydanticDeprecatedSince20
     ignore::datahub.configuration.common.ConfigurationWarning
     ignore:The new datahub SDK:datahub.errors.ExperimentalWarning
+    # We should not be unexpectedly seeing API tracing warnings.
+    error::datahub.errors.APITracingWarning
 [coverage:run]
 # Because of some quirks in the way setup.cfg, coverage.py, pytest-cov,
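The new `error::datahub.errors.APITracingWarning` entry makes the whole test suite fail if the warning fires unexpectedly. A minimal sketch of the same escalation scoped to a single test via pytest's marker form (the test name is hypothetical):

```python
import pytest


# Same filter string as the ini entry, applied to one test instead of the suite.
@pytest.mark.filterwarnings("error::datahub.errors.APITracingWarning")
def test_no_unexpected_tracing_warning():
    ...  # emitter code exercised here would now raise on APITracingWarning
```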

View File

@@ -125,9 +125,7 @@ def parse_ts_millis(ts: Optional[float]) -> Optional[datetime]:
 def make_data_platform_urn(platform: str) -> str:
     if platform.startswith("urn:li:dataPlatform:"):
         return platform
-    return DataPlatformUrn.create_from_id(platform).urn()
+    return DataPlatformUrn(platform).urn()
 def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
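The only functional change here is how the URN object is constructed; a quick sanity check of the resulting string (the `datahub.metadata.urns` import path is an assumption, `mce_builder` may import the class from elsewhere):

```python
from datahub.metadata.urns import DataPlatformUrn

# The plain constructor yields the same string the old
# DataPlatformUrn.create_from_id(...) factory produced.
print(DataPlatformUrn("snowflake").urn())  # -> urn:li:dataPlatform:snowflake
```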

View File

@@ -1,17 +1,21 @@
 import json
 import logging
+import warnings
 from dataclasses import dataclass
 from typing import Dict, List, Optional, Sequence, Union
 from requests import Response
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.errors import APITracingWarning
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
     MetadataChangeProposal,
 )
 logger = logging.getLogger(__name__)
+_TRACE_HEADER_NAME = "traceparent"
 @dataclass
 class TraceData:
@@ -25,14 +29,11 @@ class TraceData:
             raise TypeError("data must be a dictionary")
-def _extract_trace_id(
-    response: Response, trace_header: str = "traceparent"
-) -> Optional[str]:
+def _extract_trace_id(response: Response) -> Optional[str]:
     """
     Extract trace ID from response headers.
     Args:
         response: HTTP response object
-        trace_header: Name of the trace header to use
     Returns:
         Trace ID if found and response is valid, None otherwise
     """
@@ -40,9 +41,17 @@ def _extract_trace_id(
         logger.debug(f"Invalid status code: {response.status_code}")
         return None
-    trace_id = response.headers.get(trace_header)
+    trace_id = response.headers.get(_TRACE_HEADER_NAME)
     if not trace_id:
-        logger.debug(f"Missing trace header: {trace_header}")
+        # This will only be printed if
+        # 1. we're in async mode (checked by the caller)
+        # 2. the server did not return a trace ID
+        logger.debug(f"Missing trace header: {_TRACE_HEADER_NAME}")
+        warnings.warn(
+            "No trace ID found in response headers. API tracing is not active - likely due to an outdated server version.",
+            APITracingWarning,
+            stacklevel=3,
+        )
         return None
     return trace_id
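The `stacklevel=3` argument makes Python attribute the warning to the emitter's caller rather than to this helper. A self-contained illustration of how `stacklevel` shifts the reported location (function names here are made up, not from the codebase):

```python
import warnings


def _helper():
    # With the default stacklevel=1 the warning points at this line;
    # stacklevel=2 attributes it to whoever called _helper().
    warnings.warn("tracing looks inactive", UserWarning, stacklevel=2)


def caller():
    _helper()  # with stacklevel=2, the warning is reported against this line


caller()
```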
@@ -51,20 +60,19 @@ def _extract_trace_id(
 def extract_trace_data(
     response: Response,
     aspects_to_trace: Optional[List[str]] = None,
-    trace_header: str = "traceparent",
 ) -> Optional[TraceData]:
-    """
-    Extract trace data from a response object.
+    """Extract trace data from a response object.
+    If we run into a JSONDecodeError, we'll log an error and return None.
     Args:
         response: HTTP response object
         aspects_to_trace: Optional list of aspect names to extract. If None, extracts all aspects.
-        trace_header: Name of the trace header to use (default: "traceparent")
     Returns:
         TraceData object if successful, None otherwise
-    Raises:
-        JSONDecodeError: If response body cannot be decoded as JSON
     """
-    trace_id = _extract_trace_id(response, trace_header)
+    trace_id = _extract_trace_id(response)
     if not trace_id:
         return None
@@ -104,19 +112,18 @@ def extract_trace_data_from_mcps(
     response: Response,
     mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
     aspects_to_trace: Optional[List[str]] = None,
-    trace_header: str = "traceparent",
 ) -> Optional[TraceData]:
-    """
-    Extract trace data from a response object and populate data from provided MCPs.
+    """Extract trace data from a response object and populate data from provided MCPs.
     Args:
         response: HTTP response object used only for trace_id extraction
         mcps: List of MCP URN and aspect data
         aspects_to_trace: Optional list of aspect names to extract. If None, extracts all aspects.
-        trace_header: Name of the trace header to use (default: "traceparent")
     Returns:
         TraceData object if successful, None otherwise
     """
-    trace_id = _extract_trace_id(response, trace_header)
+    trace_id = _extract_trace_id(response)
     if not trace_id:
         return None
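Taken together, callers no longer pass a header name; they get `None` plus an `APITracingWarning` when the server did not return a `traceparent` header. A rough sketch of how calling code might react (the wrapper function here is hypothetical):

```python
import warnings

from requests import Response

from datahub.emitter.response_helper import extract_trace_data
from datahub.errors import APITracingWarning


def trace_or_report(response: Response) -> None:
    # `response` is assumed to be the reply to an async ingestion request.
    with warnings.catch_warnings(record=True) as captured:
        warnings.simplefilter("always")
        trace = extract_trace_data(response)  # header name is now fixed to "traceparent"

    if trace is None and any(issubclass(w.category, APITracingWarning) for w in captured):
        print("Server did not return a traceparent header; API tracing is inactive.")
    elif trace is not None:
        print(f"Tracing is active, trace_id={trace.trace_id}")
```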

View File

@@ -5,6 +5,7 @@ import json
 import logging
 import os
 import time
+import warnings
 from collections import defaultdict
 from dataclasses import dataclass
 from datetime import datetime, timedelta
@@ -50,6 +51,7 @@ from datahub.emitter.response_helper import (
     extract_trace_data_from_mcps,
 )
 from datahub.emitter.serialization_helper import pre_json_transform
+from datahub.errors import APITracingWarning
 from datahub.ingestion.api.closeable import Closeable
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
     MetadataChangeEvent,
@@ -749,6 +751,12 @@ class DataHubRestEmitter(Closeable, Emitter):
             trace_flag if trace_flag is not None else self._default_trace_mode
         )
         resolved_async_flag = async_flag if async_flag is not None else async_default
+        if resolved_trace_flag and not resolved_async_flag:
+            warnings.warn(
+                "API tracing is only available with async ingestion. For sync mode, API errors will be surfaced as exceptions.",
+                APITracingWarning,
+                stacklevel=3,
+            )
         return resolved_trace_flag and resolved_async_flag
     def __repr__(self) -> str:
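The new check warns as soon as tracing is requested without async ingestion, rather than silently disabling tracing. A rough usage sketch (the server URL is a placeholder and assumes a reachable GMS; the `emit_mcp` flag names are taken from the tests further down in this diff):

```python
import warnings

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.errors import APITracingWarning
from datahub.metadata.com.linkedin.pegasus2avro.common import Status

emitter = DatahubRestEmitter("http://localhost:8080")  # placeholder GMS endpoint

mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example.table,PROD)",
    aspect=Status(removed=False),
)

with warnings.catch_warnings(record=True) as captured:
    warnings.simplefilter("always")
    # Tracing requested but sync mode forced: the emitter now warns and falls
    # back to surfacing API errors as exceptions instead of tracing.
    emitter.emit_mcp(mcp, async_flag=False, trace_flag=True)

print([str(w.message) for w in captured if issubclass(w.category, APITracingWarning)])
```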

View File

@@ -33,3 +33,7 @@ class MultipleSubtypesWarning(Warning):
 class ExperimentalWarning(Warning):
     pass
+
+
+class APITracingWarning(Warning):
+    pass
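Because `APITracingWarning` is a plain `Warning` subclass, consumers that knowingly run against servers without tracing support can silence it with the standard warnings machinery; a small sketch:

```python
import warnings

from datahub.errors import APITracingWarning

# Opt out of the new warning, e.g. when intentionally targeting an older
# DataHub server that does not return trace headers.
warnings.filterwarnings("ignore", category=APITracingWarning)
```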

View File

@@ -2,6 +2,7 @@ import json
 from typing import Any
 from unittest.mock import Mock
+import pytest
 from requests import Response
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -9,6 +10,7 @@ from datahub.emitter.response_helper import (
     extract_trace_data,
     extract_trace_data_from_mcps,
 )
+from datahub.errors import APITracingWarning
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeProposal
@@ -41,7 +43,8 @@ def test_invalid_status_code():
 def test_missing_trace_header():
     """Test that missing trace header returns None"""
     response = create_response(status_code=200, headers={}, json_data=[])
-    result = extract_trace_data(response)
+    with pytest.warns(APITracingWarning):
+        result = extract_trace_data(response)
     assert result is None
@@ -138,22 +141,6 @@ def test_missing_urn():
     assert "test:2" in result.data
-def test_custom_trace_header():
-    """Test using a custom trace header"""
-    response = create_response(
-        status_code=200,
-        headers={"custom-trace": "test-trace-id"},
-        json_data=[{"urn": "test:1", "status": {"removed": False}}],
-    )
-    result = extract_trace_data(response, trace_header="custom-trace")
-    assert result is not None
-    assert result.trace_id == "test-trace-id"
-    assert "test:1" in result.data
-    assert len(result.data["test:1"]) == 1
 def test_mcps_invalid_status_code():
     """Test that non-200 status codes return None for MCPs"""
     response = create_response(
@@ -168,7 +155,8 @@ def test_mcps_missing_trace_header():
     """Test that missing trace header returns None for MCPs"""
     response = create_response(status_code=200, headers={}, json_data=[])
     mcps = [create_mcp("urn:test:1", "testAspect")]
-    result = extract_trace_data_from_mcps(response, mcps)
+    with pytest.warns(APITracingWarning):
+        result = extract_trace_data_from_mcps(response, mcps)
     assert result is None

View File

@@ -16,6 +16,7 @@ from datahub.emitter.rest_emitter import (
     DatahubRestEmitter,
     logger,
 )
+from datahub.errors import APITracingWarning
 from datahub.metadata.com.linkedin.pegasus2avro.common import (
     Status,
 )
@@ -514,10 +515,14 @@ def test_openapi_emitter_missing_trace_header(openapi_emitter):
         aspect=Status(removed=False),
     )
-    # Should not raise exception but log warning
-    openapi_emitter.emit_mcp(
-        item, async_flag=True, trace_flag=True, trace_timeout=timedelta(seconds=10)
-    )
+    # Should not raise exception but log a warning.
+    with pytest.warns(APITracingWarning):
+        openapi_emitter.emit_mcp(
+            item,
+            async_flag=True,
+            trace_flag=True,
+            trace_timeout=timedelta(seconds=10),
+        )
 def test_openapi_emitter_invalid_status_code(openapi_emitter):