Mirror of https://github.com/deepset-ai/haystack.git
feat: log trace correlation (#7206)

* feat: implement support for trace-log correlation
* docs: add release notes

Parent: 2a591280ab
Commit: 6fce2dbb11
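In short: when JSON logging is enabled, a new structlog processor pulls correlation data from the current tracing span and merges it into every log record. A minimal end-to-end sketch of how the feature is meant to be used, assuming the OpenTelemetry backend shown further down is importable as `haystack.tracing.opentelemetry` and that `haystack.tracing.enable_tracing` is the registration hook (module path and hook name are assumptions, not part of this diff):

```python
import logging

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

from haystack import logging as haystack_logging
from haystack import tracing
from haystack.tracing.opentelemetry import OpenTelemetryTracer  # assumed module path

trace.set_tracer_provider(TracerProvider())

# Register the tracer backend, then switch logging to JSON rendering,
# which activates the correlate_logs_with_traces processor added in this commit.
otel_tracer = OpenTelemetryTracer(trace.get_tracer("my-app"))
tracing.enable_tracing(otel_tracer)  # assumed registration hook
haystack_logging.configure_logging(use_json=True)

with otel_tracer.trace("ingest-documents"):
    # The emitted JSON record now also carries "trace_id" and "span_id".
    logging.getLogger(__name__).warning("processing batch")
```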
@@ -3,13 +3,30 @@ import os
import typing
from typing import List

import haystack.tracing.tracer

if typing.TYPE_CHECKING:
    from structlog.typing import Processor
    from structlog.typing import Processor, WrappedLogger, EventDict


HAYSTACK_LOGGING_USE_JSON_ENV_VAR = "HAYSTACK_LOGGING_USE_JSON"
HAYSTACK_LOGGING_IGNORE_STRUCTLOG_ENV_VAR = "HAYSTACK_LOGGING_IGNORE_STRUCTLOG"


def correlate_logs_with_traces(_: "WrappedLogger", __: str, event_dict: "EventDict") -> "EventDict":
    """Add correlation data for logs.

    This is useful if you want to correlate logs with traces.
    """
    if not haystack.tracing.is_tracing_enabled():
        return event_dict

    current_span = haystack.tracing.tracer.current_span()
    if current_span:
        event_dict.update(current_span.get_correlation_data_for_logs())

    return event_dict


def configure_logging(use_json: bool = False) -> None:
    """Configure logging for Haystack.

@@ -38,6 +55,8 @@ def configure_logging(use_json: bool = False) -> None:
    # https://www.structlog.org/en/stable/standard-library.html#rendering-using-structlog-based-formatters-within-logging
    # This means that we use structlog to format the log entries for entries emitted via `logging` and `structlog`.

    use_json = os.getenv(HAYSTACK_LOGGING_USE_JSON_ENV_VAR, "false").lower() == "true" or use_json

    shared_processors: List[Processor] = [
        # Add the log level to the event_dict for structlog to use
        structlog.stdlib.add_log_level,
@@ -45,6 +64,10 @@ def configure_logging(use_json: bool = False) -> None:
        structlog.processors.TimeStamper(fmt="iso"),
    ]

    if use_json:
        # We only need that in sophisticated production setups where we want to correlate logs with traces
        shared_processors.append(correlate_logs_with_traces)

    structlog.configure(
        processors=shared_processors + [structlog.stdlib.ProcessorFormatter.wrap_for_formatter],
        logger_factory=structlog.stdlib.LoggerFactory(),
@@ -54,7 +77,7 @@ def configure_logging(use_json: bool = False) -> None:
    )

    renderers: List[Processor]
    if os.getenv(HAYSTACK_LOGGING_USE_JSON_ENV_VAR, "false").lower() == "true" or use_json:
    if use_json:
        renderers = [
            ExceptionRenderer(
                # don't show locals in production logs - this can be quite sensitive information

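The new `correlate_logs_with_traces` function above is an ordinary structlog processor with the `(logger, method_name, event_dict) -> event_dict` signature, so it can also be wired into a hand-rolled structlog pipeline. A small illustrative sketch, assuming the module above is importable as `haystack.logging` (the tests later in this diff import it as `from haystack import logging as haystack_logging`):

```python
import structlog

from haystack.logging import correlate_logs_with_traces  # processor added in this commit

structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        correlate_logs_with_traces,           # enriches event_dict only when tracing is enabled and a span is active
        structlog.processors.JSONRenderer(),  # render the final event_dict as JSON
    ]
)

structlog.get_logger(__name__).warning("hello")  # includes trace/span IDs while a span is active
```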
@@ -20,6 +20,22 @@ class DatadogSpan(Span):
    def raw_span(self) -> Any:
        return self._span

    def get_correlation_data_for_logs(self) -> Dict[str, Any]:
        raw_span = self.raw_span()
        if not raw_span:
            return {}

        # https://docs.datadoghq.com/tracing/other_telemetry/connect_logs_and_traces/python/#no-standard-library-logging
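        # The bitmask keeps only the lower 64 bits of the trace id, the width the Datadog log-correlation format linked above uses.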
        trace_id, span_id = (str((1 << 64) - 1 & raw_span.trace_id), raw_span.span_id)

        return {
            "dd.trace_id": trace_id,
            "dd.span_id": span_id,
            "dd.service": ddtrace.config.service or "",
            "dd.env": ddtrace.config.env or "",
            "dd.version": ddtrace.config.version or "",
        }


class DatadogTracer(Tracer):
    def __init__(self, tracer: "ddtrace.Tracer") -> None:

@@ -22,6 +22,10 @@ class OpenTelemetrySpan(Span):
    def raw_span(self) -> Any:
        return self._span

    def get_correlation_data_for_logs(self) -> Dict[str, Any]:
        span_context = self._span.get_span_context()
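        # Note: OpenTelemetry exposes both ids as plain integers; backends expecting the W3C hex form can render them with format(trace_id, "032x") and format(span_id, "016x").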
        return {"trace_id": span_context.trace_id, "span_id": span_context.span_id}


class OpenTelemetryTracer(Tracer):
    def __init__(self, tracer: "opentelemetry.trace.Tracer") -> None:

@@ -60,6 +60,12 @@ class Span(abc.ABC):
        if tracer.is_content_tracing_enabled:
            self.set_tag(key, value)

    def get_correlation_data_for_logs(self) -> Dict[str, Any]:
        """Return a dictionary with correlation data for logs.

        This is useful if you want to correlate logs with traces."""
        return {}


class Tracer(abc.ABC):
    """Interface for instrumenting code by creating and submitting spans."""

@@ -0,0 +1,17 @@
---
enhancements:
  - |
    If using JSON logging in conjunction with tracing, Haystack will automatically add correlation IDs to the logs.
    This is done by getting the necessary information from the current span and adding it to the log record.
    You can customize this by overriding the `get_correlation_data_for_logs` method of your tracer's span:

    ```python
    from typing import Any, Dict

    from haystack.tracing import Span

    class OpenTelemetrySpan(Span):
        ...

        def get_correlation_data_for_logs(self) -> Dict[str, Any]:
            span_context = ...
            return {"trace_id": span_context.trace_id, "span_id": span_context.span_id}
    ```

@@ -1,4 +1,5 @@
from typing import Optional
from unittest.mock import ANY

import pytest
from _pytest.monkeypatch import MonkeyPatch
@@ -42,6 +43,8 @@ class TestTracing:
                    "haystack.pipeline.metadata": {},
                    "haystack.pipeline.max_loops_allowed": 100,
                },
                trace_id=ANY,
                span_id=ANY,
            ),
            SpyingSpan(
                operation_name="haystack.component.run",
@@ -53,6 +56,8 @@ class TestTracing:
                    "haystack.component.output_spec": {"output": {"type": "str", "senders": ["hello2"]}},
                    "haystack.component.visits": 1,
                },
                trace_id=ANY,
                span_id=ANY,
            ),
            SpyingSpan(
                operation_name="haystack.component.run",
@@ -66,6 +71,8 @@ class TestTracing:
                    "haystack.component.output_spec": {"output": {"type": "str", "senders": []}},
                    "haystack.component.visits": 1,
                },
                trace_id=ANY,
                span_id=ANY,
            ),
        ]

@@ -86,6 +93,8 @@ class TestTracing:
                    "haystack.pipeline.metadata": {},
                    "haystack.pipeline.max_loops_allowed": 100,
                },
                trace_id=ANY,
                span_id=ANY,
            ),
            SpyingSpan(
                operation_name="haystack.component.run",
@@ -99,6 +108,8 @@ class TestTracing:
                    "haystack.component.visits": 1,
                    "haystack.component.output": {"output": "Hello, world!"},
                },
                trace_id=ANY,
                span_id=ANY,
            ),
            SpyingSpan(
                operation_name="haystack.component.run",
@@ -114,5 +125,7 @@ class TestTracing:
                    "haystack.component.visits": 1,
                    "haystack.component.output": {"output": "Hello, Hello, world!!"},
                },
                trace_id=ANY,
                span_id=ANY,
            ),
        ]

@@ -13,6 +13,7 @@ from _pytest.logging import LogCaptureFixture
from _pytest.monkeypatch import MonkeyPatch

from haystack import logging as haystack_logging
from test.tracing.utils import SpyingTracer


@pytest.fixture(autouse=True)
@@ -204,3 +205,73 @@ class TestStructuredLoggingJSONRendering:
                }
            ],
        }


class TestLogTraceCorrelation:
    def test_trace_log_correlation_python_logs_with_console_rendering(
        self, spying_tracer: SpyingTracer, capfd: CaptureFixture
    ) -> None:
        haystack_logging.configure_logging(use_json=False)

        with spying_tracer.trace("test-operation"):
            logger = logging.getLogger(__name__)
            logger.warning("Hello, structured logging!", extra={"key1": "value1", "key2": "value2"})

        output = capfd.readouterr().err
        assert "trace_id" not in output

    def test_trace_log_correlation_python_logs(self, spying_tracer: SpyingTracer, capfd: CaptureFixture) -> None:
        haystack_logging.configure_logging(use_json=True)

        with spying_tracer.trace("test-operation") as span:
            logger = logging.getLogger(__name__)
            logger.warning("Hello, structured logging!", extra={"key1": "value1", "key2": "value2"})

        output = capfd.readouterr().err
        parsed_output = json.loads(output)

        assert parsed_output == {
            "event": "Hello, structured logging!",
            "key1": "value1",
            "key2": "value2",
            "level": "warning",
            "timestamp": ANY,
            "trace_id": span.trace_id,
            "span_id": span.span_id,
        }

    def test_trace_log_correlation_no_span(self, spying_tracer: SpyingTracer, capfd: CaptureFixture) -> None:
        haystack_logging.configure_logging(use_json=True)

        logger = logging.getLogger(__name__)

        logger.warning("Hello, structured logging!", extra={"key1": "value1", "key2": "value2"})

        output = capfd.readouterr().err
        parsed_output = json.loads(output)

        assert parsed_output == {
            "event": "Hello, structured logging!",
            "key1": "value1",
            "key2": "value2",
            "level": "warning",
            "timestamp": ANY,
        }

    def test_trace_log_correlation_no_tracer(self, capfd: CaptureFixture) -> None:
        haystack_logging.configure_logging(use_json=True)

        logger = logging.getLogger(__name__)

        logger.warning("Hello, structured logging!", extra={"key1": "value1", "key2": "value2"})

        output = capfd.readouterr().err
        parsed_output = json.loads(output)

        assert parsed_output == {
            "event": "Hello, structured logging!",
            "key1": "value1",
            "key2": "value2",
            "level": "warning",
            "timestamp": ANY,
        }

@@ -83,3 +83,15 @@ class TestDatadogTracer:
        spans = get_traces_from_console(capfd)
        assert len(spans) == 1
        assert spans[0]["meta"]["key"] == '{"a": 1, "b": [2, 3, 4]}'

    def test_get_log_correlation_info(self, datadog_tracer: ddtrace.Tracer) -> None:
        tracer = DatadogTracer(datadog_tracer)
        with tracer.trace("test") as span:
            span.set_tag("key", "value")
            assert span.get_correlation_data_for_logs() == {
                "dd.trace_id": str((1 << 64) - 1 & span.raw_span().trace_id),
                "dd.span_id": span.raw_span().span_id,
                "dd.service": "",
                "dd.env": "",
                "dd.version": "",
            }
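To get these `dd.*` fields into application logs, the Datadog tracer just needs to be registered before JSON logging is configured. A brief sketch, assuming the module above is importable as `haystack.tracing.datadog` and `haystack.tracing.enable_tracing` is the registration hook (both assumed, not shown in this diff):

```python
import ddtrace

from haystack import logging as haystack_logging
from haystack import tracing
from haystack.tracing.datadog import DatadogTracer  # assumed module path

tracing.enable_tracing(DatadogTracer(ddtrace.tracer))  # ddtrace's global tracer
haystack_logging.configure_logging(use_json=True)
# JSON records emitted inside Datadog spans now include dd.trace_id, dd.span_id, dd.service, dd.env, dd.version.
```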
@@ -77,3 +77,14 @@ class TestOpenTelemetryTracer:
        spans = list(span_exporter.get_finished_spans())
        assert len(spans) == 1
        assert spans[0].attributes == {"key": '{"a": 1, "b": [2, 3, 4]}'}

    def test_log_correlation_info(self, opentelemetry_tracer: opentelemetry.trace.Tracer) -> None:
        tracer = OpenTelemetryTracer(opentelemetry_tracer)
        with tracer.trace("test") as span:
            span.set_tag("key", "value")

            correlation_data = span.get_correlation_data_for_logs()
            assert correlation_data == {
                "trace_id": span.raw_span().get_span_context().trace_id,
                "span_id": span.raw_span().get_span_context().span_id,
            }
@@ -1,5 +1,6 @@
import contextlib
import dataclasses
import uuid
from typing import Dict, Any, Optional, List, Iterator

from haystack.tracing import Span, Tracer
@@ -10,9 +11,15 @@ class SpyingSpan(Span):
    operation_name: str
    tags: Dict[str, Any] = dataclasses.field(default_factory=dict)

    trace_id: Optional[str] = dataclasses.field(default_factory=lambda: str(uuid.uuid4()))
    span_id: Optional[str] = dataclasses.field(default_factory=lambda: str(uuid.uuid4()))

    def set_tag(self, key: str, value: Any) -> None:
        self.tags[key] = value

    def get_correlation_data_for_logs(self) -> Dict[str, Any]:
        return {"trace_id": self.trace_id, "span_id": self.span_id}


class SpyingTracer(Tracer):
    def current_span(self) -> Optional[Span]: