From 29e09185eb6104d2b674e56622402810ce48a919 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 27 Mar 2025 22:21:33 -0700 Subject: [PATCH] fix(ingest): warn when API tracing is unexpectedly inactive (#13007) --- metadata-ingestion/setup.cfg | 2 + .../src/datahub/emitter/mce_builder.py | 4 +- .../src/datahub/emitter/response_helper.py | 43 +++++++++++-------- .../src/datahub/emitter/rest_emitter.py | 8 ++++ metadata-ingestion/src/datahub/errors.py | 4 ++ .../datahub/emitter/test_response_helper.py | 24 +++-------- .../tests/unit/sdk/test_rest_emitter.py | 13 ++++-- 7 files changed, 55 insertions(+), 43 deletions(-) diff --git a/metadata-ingestion/setup.cfg b/metadata-ingestion/setup.cfg index f8dab7f824..4cc8b1c79e 100644 --- a/metadata-ingestion/setup.cfg +++ b/metadata-ingestion/setup.cfg @@ -74,6 +74,8 @@ filterwarnings = ignore::datahub.configuration.pydantic_migration_helpers.PydanticDeprecatedSince20 ignore::datahub.configuration.common.ConfigurationWarning ignore:The new datahub SDK:datahub.errors.ExperimentalWarning + # We should not be unexpectedly seeing API tracing warnings. + error::datahub.errors.APITracingWarning [coverage:run] # Because of some quirks in the way setup.cfg, coverage.py, pytest-cov, diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index 7d6f0bdcd8..42bb426f2f 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -125,9 +125,7 @@ def parse_ts_millis(ts: Optional[float]) -> Optional[datetime]: def make_data_platform_urn(platform: str) -> str: - if platform.startswith("urn:li:dataPlatform:"): - return platform - return DataPlatformUrn.create_from_id(platform).urn() + return DataPlatformUrn(platform).urn() def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str: diff --git a/metadata-ingestion/src/datahub/emitter/response_helper.py b/metadata-ingestion/src/datahub/emitter/response_helper.py index e1c78361c8..ae311860e2 100644 --- a/metadata-ingestion/src/datahub/emitter/response_helper.py +++ b/metadata-ingestion/src/datahub/emitter/response_helper.py @@ -1,17 +1,21 @@ import json import logging +import warnings from dataclasses import dataclass from typing import Dict, List, Optional, Sequence, Union from requests import Response from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.errors import APITracingWarning from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( MetadataChangeProposal, ) logger = logging.getLogger(__name__) +_TRACE_HEADER_NAME = "traceparent" + @dataclass class TraceData: @@ -25,14 +29,11 @@ class TraceData: raise TypeError("data must be a dictionary") -def _extract_trace_id( - response: Response, trace_header: str = "traceparent" -) -> Optional[str]: +def _extract_trace_id(response: Response) -> Optional[str]: """ Extract trace ID from response headers. Args: response: HTTP response object - trace_header: Name of the trace header to use Returns: Trace ID if found and response is valid, None otherwise """ @@ -40,9 +41,17 @@ def _extract_trace_id( logger.debug(f"Invalid status code: {response.status_code}") return None - trace_id = response.headers.get(trace_header) + trace_id = response.headers.get(_TRACE_HEADER_NAME) if not trace_id: - logger.debug(f"Missing trace header: {trace_header}") + # This will only be printed if + # 1. we're in async mode (checked by the caller) + # 2. the server did not return a trace ID + logger.debug(f"Missing trace header: {_TRACE_HEADER_NAME}") + warnings.warn( + "No trace ID found in response headers. API tracing is not active - likely due to an outdated server version.", + APITracingWarning, + stacklevel=3, + ) return None return trace_id @@ -51,20 +60,19 @@ def _extract_trace_id( def extract_trace_data( response: Response, aspects_to_trace: Optional[List[str]] = None, - trace_header: str = "traceparent", ) -> Optional[TraceData]: - """ - Extract trace data from a response object. + """Extract trace data from a response object. + + If we run into a JSONDecodeError, we'll log an error and return None. + Args: response: HTTP response object aspects_to_trace: Optional list of aspect names to extract. If None, extracts all aspects. - trace_header: Name of the trace header to use (default: "traceparent") + Returns: TraceData object if successful, None otherwise - Raises: - JSONDecodeError: If response body cannot be decoded as JSON """ - trace_id = _extract_trace_id(response, trace_header) + trace_id = _extract_trace_id(response) if not trace_id: return None @@ -104,19 +112,18 @@ def extract_trace_data_from_mcps( response: Response, mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]], aspects_to_trace: Optional[List[str]] = None, - trace_header: str = "traceparent", ) -> Optional[TraceData]: - """ - Extract trace data from a response object and populate data from provided MCPs. + """Extract trace data from a response object and populate data from provided MCPs. + Args: response: HTTP response object used only for trace_id extraction mcps: List of MCP URN and aspect data aspects_to_trace: Optional list of aspect names to extract. If None, extracts all aspects. - trace_header: Name of the trace header to use (default: "traceparent") + Returns: TraceData object if successful, None otherwise """ - trace_id = _extract_trace_id(response, trace_header) + trace_id = _extract_trace_id(response) if not trace_id: return None diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index 298329e620..baf9aebaf5 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -5,6 +5,7 @@ import json import logging import os import time +import warnings from collections import defaultdict from dataclasses import dataclass from datetime import datetime, timedelta @@ -50,6 +51,7 @@ from datahub.emitter.response_helper import ( extract_trace_data_from_mcps, ) from datahub.emitter.serialization_helper import pre_json_transform +from datahub.errors import APITracingWarning from datahub.ingestion.api.closeable import Closeable from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( MetadataChangeEvent, @@ -749,6 +751,12 @@ class DataHubRestEmitter(Closeable, Emitter): trace_flag if trace_flag is not None else self._default_trace_mode ) resolved_async_flag = async_flag if async_flag is not None else async_default + if resolved_trace_flag and not resolved_async_flag: + warnings.warn( + "API tracing is only available with async ingestion. For sync mode, API errors will be surfaced as exceptions.", + APITracingWarning, + stacklevel=3, + ) return resolved_trace_flag and resolved_async_flag def __repr__(self) -> str: diff --git a/metadata-ingestion/src/datahub/errors.py b/metadata-ingestion/src/datahub/errors.py index cb57251fcb..9c148c6c3e 100644 --- a/metadata-ingestion/src/datahub/errors.py +++ b/metadata-ingestion/src/datahub/errors.py @@ -33,3 +33,7 @@ class MultipleSubtypesWarning(Warning): class ExperimentalWarning(Warning): pass + + +class APITracingWarning(Warning): + pass diff --git a/metadata-ingestion/tests/unit/datahub/emitter/test_response_helper.py b/metadata-ingestion/tests/unit/datahub/emitter/test_response_helper.py index 0385bac47d..cb03037b62 100644 --- a/metadata-ingestion/tests/unit/datahub/emitter/test_response_helper.py +++ b/metadata-ingestion/tests/unit/datahub/emitter/test_response_helper.py @@ -2,6 +2,7 @@ import json from typing import Any from unittest.mock import Mock +import pytest from requests import Response from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -9,6 +10,7 @@ from datahub.emitter.response_helper import ( extract_trace_data, extract_trace_data_from_mcps, ) +from datahub.errors import APITracingWarning from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeProposal @@ -41,7 +43,8 @@ def test_invalid_status_code(): def test_missing_trace_header(): """Test that missing trace header returns None""" response = create_response(status_code=200, headers={}, json_data=[]) - result = extract_trace_data(response) + with pytest.warns(APITracingWarning): + result = extract_trace_data(response) assert result is None @@ -138,22 +141,6 @@ def test_missing_urn(): assert "test:2" in result.data -def test_custom_trace_header(): - """Test using a custom trace header""" - response = create_response( - status_code=200, - headers={"custom-trace": "test-trace-id"}, - json_data=[{"urn": "test:1", "status": {"removed": False}}], - ) - - result = extract_trace_data(response, trace_header="custom-trace") - - assert result is not None - assert result.trace_id == "test-trace-id" - assert "test:1" in result.data - assert len(result.data["test:1"]) == 1 - - def test_mcps_invalid_status_code(): """Test that non-200 status codes return None for MCPs""" response = create_response( @@ -168,7 +155,8 @@ def test_mcps_missing_trace_header(): """Test that missing trace header returns None for MCPs""" response = create_response(status_code=200, headers={}, json_data=[]) mcps = [create_mcp("urn:test:1", "testAspect")] - result = extract_trace_data_from_mcps(response, mcps) + with pytest.warns(APITracingWarning): + result = extract_trace_data_from_mcps(response, mcps) assert result is None diff --git a/metadata-ingestion/tests/unit/sdk/test_rest_emitter.py b/metadata-ingestion/tests/unit/sdk/test_rest_emitter.py index a0581573ae..7eed3b085e 100644 --- a/metadata-ingestion/tests/unit/sdk/test_rest_emitter.py +++ b/metadata-ingestion/tests/unit/sdk/test_rest_emitter.py @@ -16,6 +16,7 @@ from datahub.emitter.rest_emitter import ( DatahubRestEmitter, logger, ) +from datahub.errors import APITracingWarning from datahub.metadata.com.linkedin.pegasus2avro.common import ( Status, ) @@ -514,10 +515,14 @@ def test_openapi_emitter_missing_trace_header(openapi_emitter): aspect=Status(removed=False), ) - # Should not raise exception but log warning - openapi_emitter.emit_mcp( - item, async_flag=True, trace_flag=True, trace_timeout=timedelta(seconds=10) - ) + # Should not raise exception but log a warning. + with pytest.warns(APITracingWarning): + openapi_emitter.emit_mcp( + item, + async_flag=True, + trace_flag=True, + trace_timeout=timedelta(seconds=10), + ) def test_openapi_emitter_invalid_status_code(openapi_emitter):