mirror of
https://github.com/deepset-ai/haystack.git
synced 2025-08-31 20:03:38 +00:00
feat: opentelemetry tracer (#7052)
* feat: add opentelemetry tracer * feat: implement auto enable for `OpenTelemetry` * docs: add release notes * style: fix linting * tests: more test coverage * refactor: fix old method call * docs: fix duplicate `the` Co-authored-by: Massimiliano Pippi <mpippi@gmail.com> --------- Co-authored-by: Massimiliano Pippi <mpippi@gmail.com>
This commit is contained in:
parent
cb6389d7a2
commit
6d0d373def
@ -1 +1,2 @@
|
|||||||
from .tracer import Tracer, Span, enable_tracing, disable_tracing, tracer, is_tracing_enabled
|
from .tracer import Tracer, Span, enable_tracing, disable_tracing, is_tracing_enabled, auto_enable_tracing, tracer
|
||||||
|
from .opentelemetry import OpenTelemetryTracer
|
||||||
|
45
haystack/tracing/opentelemetry.py
Normal file
45
haystack/tracing/opentelemetry.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
import contextlib
|
||||||
|
from typing import Optional, Dict, Any, Iterator
|
||||||
|
|
||||||
|
from haystack.lazy_imports import LazyImport
|
||||||
|
from haystack.tracing import Tracer, Span
|
||||||
|
from haystack.tracing import utils as tracing_utils
|
||||||
|
|
||||||
|
|
||||||
|
with LazyImport("Run 'pip install opentelemetry-sdk'") as opentelemetry_import:
|
||||||
|
import opentelemetry
|
||||||
|
import opentelemetry.trace
|
||||||
|
|
||||||
|
|
||||||
|
class OpenTelemetrySpan(Span):
|
||||||
|
def __init__(self, span: opentelemetry.trace.Span) -> None:
|
||||||
|
self._span = span
|
||||||
|
|
||||||
|
def set_tag(self, key: str, value: Any) -> None:
|
||||||
|
coerced_value = tracing_utils.coerce_tag_value(value)
|
||||||
|
self._span.set_attribute(key, coerced_value)
|
||||||
|
|
||||||
|
def raw_span(self) -> Any:
|
||||||
|
return self._span
|
||||||
|
|
||||||
|
|
||||||
|
class OpenTelemetryTracer(Tracer):
|
||||||
|
def __init__(self, tracer: opentelemetry.trace.Tracer) -> None:
|
||||||
|
opentelemetry_import.check()
|
||||||
|
self._tracer = tracer
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
|
||||||
|
with self._tracer.start_as_current_span(operation_name) as raw_span:
|
||||||
|
span = OpenTelemetrySpan(raw_span)
|
||||||
|
if tags:
|
||||||
|
span.set_tags(tags)
|
||||||
|
|
||||||
|
yield span
|
||||||
|
|
||||||
|
def current_span(self) -> Optional[Span]:
|
||||||
|
current_span = opentelemetry.trace.get_current_span()
|
||||||
|
if isinstance(current_span, opentelemetry.trace.NonRecordingSpan):
|
||||||
|
return None
|
||||||
|
|
||||||
|
return OpenTelemetrySpan(current_span)
|
@ -1,7 +1,13 @@
|
|||||||
import abc
|
import abc
|
||||||
import contextlib
|
import contextlib
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
from typing import Dict, Any, Optional, Iterator
|
from typing import Dict, Any, Optional, Iterator
|
||||||
|
|
||||||
|
HAYSTACK_AUTO_TRACE_ENABLED_ENV_VAR = "HAYSTACK_AUTO_TRACE_ENABLED"
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Span(abc.ABC):
|
class Span(abc.ABC):
|
||||||
"""Interface for an instrumented operation."""
|
"""Interface for an instrumented operation."""
|
||||||
@ -116,3 +122,47 @@ def disable_tracing() -> None:
|
|||||||
def is_tracing_enabled() -> bool:
|
def is_tracing_enabled() -> bool:
|
||||||
"""Return whether tracing is enabled."""
|
"""Return whether tracing is enabled."""
|
||||||
return not isinstance(tracer.actual_tracer, NullTracer)
|
return not isinstance(tracer.actual_tracer, NullTracer)
|
||||||
|
|
||||||
|
|
||||||
|
def auto_enable_tracing() -> None:
|
||||||
|
"""Auto-enable the right tracing backend.
|
||||||
|
|
||||||
|
This behavior can be disabled by setting the environment variable `HAYSTACK_AUTO_TRACE_ENABLED` to `false`.
|
||||||
|
Note that it will only work correctly if tracing was configured _before_ Haystack is imported.
|
||||||
|
"""
|
||||||
|
if os.getenv(HAYSTACK_AUTO_TRACE_ENABLED_ENV_VAR, "true").lower() == "false":
|
||||||
|
logger.info("Tracing disabled via '%s'", HAYSTACK_AUTO_TRACE_ENABLED_ENV_VAR)
|
||||||
|
return
|
||||||
|
|
||||||
|
if is_tracing_enabled():
|
||||||
|
return # tracing already enabled
|
||||||
|
|
||||||
|
tracer = _auto_configured_opentelemetry_tracer()
|
||||||
|
if tracer:
|
||||||
|
enable_tracing(tracer)
|
||||||
|
logger.info("Tracing enabled via '%s'", tracer.__class__.__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _auto_configured_opentelemetry_tracer() -> Optional[Tracer]:
|
||||||
|
# we implement this here and not in the `opentelemetry` module to avoid import warnings when OpenTelemetry is not
|
||||||
|
# installed
|
||||||
|
try:
|
||||||
|
import opentelemetry.trace
|
||||||
|
|
||||||
|
# the safest way to check if tracing is enabled is to try to start a span and see if it's a no-op span
|
||||||
|
# alternatively we could of course check `opentelemetry.trace._TRACER_PROVIDER`
|
||||||
|
# but that's not part of the public API and could change in the future
|
||||||
|
with opentelemetry.trace.get_tracer("haystack").start_as_current_span("haystack.tracing.auto_enable") as span:
|
||||||
|
if isinstance(span, opentelemetry.trace.NonRecordingSpan):
|
||||||
|
return None
|
||||||
|
|
||||||
|
from haystack.tracing.opentelemetry import OpenTelemetryTracer
|
||||||
|
|
||||||
|
return OpenTelemetryTracer(opentelemetry.trace.get_tracer("haystack"))
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
auto_enable_tracing()
|
||||||
|
@ -0,0 +1,39 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
Added out-of-the-box support for the OpenTelemetry Tracer. This allows you to instrument pipeline and component
|
||||||
|
runs using OpenTelemetry and send traces to your preferred backend.
|
||||||
|
|
||||||
|
To use the OpenTelemetry Tracer you need to have the `opentelemetry-sdk` package installed in your environment.
|
||||||
|
To instruct Haystack to use the OpenTelemetry Tracer, you have multiple options:
|
||||||
|
|
||||||
|
* Run your Haystack application using the `opentelemetry-instrument` command line tool as described in the
|
||||||
|
[OpenTelemetry documentation](https://opentelemetry.io/docs/languages/python/automatic/#configuring-the-agent).
|
||||||
|
This behavior can be disabled by setting the `HAYSTACK_AUTO_TRACE_ENABLED_ENV_VAR` environment variable to `false`.
|
||||||
|
* Configure the tracer manually in your code using the `opentelemetry` package:
|
||||||
|
```python
|
||||||
|
from opentelemetry import trace
|
||||||
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||||
|
from opentelemetry.sdk.trace import TracerProvider
|
||||||
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||||
|
|
||||||
|
# Service name is required for most backends
|
||||||
|
resource = Resource(attributes={
|
||||||
|
SERVICE_NAME: "haystack"
|
||||||
|
})
|
||||||
|
|
||||||
|
traceProvider = TracerProvider(resource=resource)
|
||||||
|
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces"))
|
||||||
|
traceProvider.add_span_processor(processor)
|
||||||
|
trace.set_tracer_provider(traceProvider)
|
||||||
|
|
||||||
|
# Auto-configuration
|
||||||
|
import haystack.tracing
|
||||||
|
haystack.tracing.auto_enable_tracing()
|
||||||
|
|
||||||
|
# Or explicitly
|
||||||
|
from haystack.tracing import OpenTelemetryTracer
|
||||||
|
|
||||||
|
tracer = traceProvider.get_tracer("my_application")
|
||||||
|
tracing.enable_tracing(OpenTelemetryTracer(tracer))
|
||||||
|
```
|
@ -24,3 +24,6 @@ openapi3
|
|||||||
|
|
||||||
# Validation
|
# Validation
|
||||||
jsonschema
|
jsonschema
|
||||||
|
|
||||||
|
# Tracing
|
||||||
|
opentelemetry-sdk
|
||||||
|
79
test/tracing/test_opentelemetry.py
Normal file
79
test/tracing/test_opentelemetry.py
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
import opentelemetry.trace
|
||||||
|
import pytest
|
||||||
|
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
|
||||||
|
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
|
||||||
|
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
|
||||||
|
|
||||||
|
from opentelemetry.sdk.trace import TracerProvider
|
||||||
|
|
||||||
|
from haystack.tracing.opentelemetry import OpenTelemetryTracer
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def span_exporter() -> InMemorySpanExporter:
|
||||||
|
return InMemorySpanExporter()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def opentelemetry_tracer(span_exporter: InMemorySpanExporter) -> opentelemetry.trace.Tracer:
|
||||||
|
# Service name is required for most backends
|
||||||
|
resource = Resource(attributes={SERVICE_NAME: "haystack-testing"})
|
||||||
|
|
||||||
|
traceProvider = TracerProvider(resource=resource)
|
||||||
|
processor = SimpleSpanProcessor(span_exporter)
|
||||||
|
traceProvider.add_span_processor(processor)
|
||||||
|
|
||||||
|
return traceProvider.get_tracer("my_test")
|
||||||
|
|
||||||
|
|
||||||
|
class TestOpenTelemetryTracer:
|
||||||
|
def test_opentelemetry_tracer(
|
||||||
|
self, opentelemetry_tracer: opentelemetry.trace.Tracer, span_exporter: InMemorySpanExporter
|
||||||
|
) -> None:
|
||||||
|
tracer = OpenTelemetryTracer(opentelemetry_tracer)
|
||||||
|
|
||||||
|
with tracer.trace("test") as span:
|
||||||
|
span.set_tag("key", "value")
|
||||||
|
|
||||||
|
def test_tagging(
|
||||||
|
self, opentelemetry_tracer: opentelemetry.trace.Tracer, span_exporter: InMemorySpanExporter
|
||||||
|
) -> None:
|
||||||
|
tracer = OpenTelemetryTracer(opentelemetry_tracer)
|
||||||
|
|
||||||
|
with tracer.trace("test", tags={"key1": "value1"}) as span:
|
||||||
|
span.set_tag("key2", "value2")
|
||||||
|
|
||||||
|
spans = list(span_exporter.get_finished_spans())
|
||||||
|
assert len(spans) == 1
|
||||||
|
assert spans[0].attributes == {"key1": "value1", "key2": "value2"}
|
||||||
|
|
||||||
|
def test_current_span(
|
||||||
|
self, opentelemetry_tracer: opentelemetry.trace.Tracer, span_exporter: InMemorySpanExporter
|
||||||
|
) -> None:
|
||||||
|
tracer = OpenTelemetryTracer(opentelemetry_tracer)
|
||||||
|
with tracer.trace("test"):
|
||||||
|
current_span = tracer.current_span()
|
||||||
|
assert tracer.current_span() is not None
|
||||||
|
|
||||||
|
current_span.set_tag("key1", "value1")
|
||||||
|
|
||||||
|
raw_span = current_span.raw_span()
|
||||||
|
assert raw_span is not None
|
||||||
|
assert isinstance(raw_span, opentelemetry.trace.Span)
|
||||||
|
|
||||||
|
raw_span.set_attribute("key2", "value2")
|
||||||
|
|
||||||
|
spans = list(span_exporter.get_finished_spans())
|
||||||
|
assert len(spans) == 1
|
||||||
|
assert spans[0].attributes == {"key1": "value1", "key2": "value2"}
|
||||||
|
|
||||||
|
def test_tracing_complex_values(
|
||||||
|
self, opentelemetry_tracer: opentelemetry.trace.Tracer, span_exporter: InMemorySpanExporter
|
||||||
|
) -> None:
|
||||||
|
tracer = OpenTelemetryTracer(opentelemetry_tracer)
|
||||||
|
with tracer.trace("test") as span:
|
||||||
|
span.set_tag("key", {"a": 1, "b": [2, 3, 4]})
|
||||||
|
|
||||||
|
spans = list(span_exporter.get_finished_spans())
|
||||||
|
assert len(spans) == 1
|
||||||
|
assert spans[0].attributes == {"key": '{"a": 1, "b": [2, 3, 4]}'}
|
@ -1,5 +1,16 @@
|
|||||||
|
import builtins
|
||||||
|
import sys
|
||||||
from unittest.mock import Mock
|
from unittest.mock import Mock
|
||||||
|
|
||||||
|
import opentelemetry.trace
|
||||||
|
import pytest
|
||||||
|
from _pytest.monkeypatch import MonkeyPatch
|
||||||
|
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
|
||||||
|
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
|
||||||
|
from opentelemetry.sdk.trace import TracerProvider
|
||||||
|
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
|
||||||
|
|
||||||
|
from haystack.tracing.opentelemetry import OpenTelemetryTracer
|
||||||
from haystack.tracing.tracer import (
|
from haystack.tracing.tracer import (
|
||||||
NullTracer,
|
NullTracer,
|
||||||
NullSpan,
|
NullSpan,
|
||||||
@ -8,6 +19,7 @@ from haystack.tracing.tracer import (
|
|||||||
disable_tracing,
|
disable_tracing,
|
||||||
is_tracing_enabled,
|
is_tracing_enabled,
|
||||||
ProxyTracer,
|
ProxyTracer,
|
||||||
|
auto_enable_tracing,
|
||||||
tracer,
|
tracer,
|
||||||
)
|
)
|
||||||
from test.tracing.utils import SpyingTracer
|
from test.tracing.utils import SpyingTracer
|
||||||
@ -61,3 +73,57 @@ class TestConfigureTracer:
|
|||||||
disable_tracing()
|
disable_tracing()
|
||||||
assert isinstance(tracer.actual_tracer, NullTracer)
|
assert isinstance(tracer.actual_tracer, NullTracer)
|
||||||
assert is_tracing_enabled() is False
|
assert is_tracing_enabled() is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestAutoEnableTracer:
|
||||||
|
@pytest.fixture()
|
||||||
|
def configured_opentelemetry_tracing(self) -> None:
|
||||||
|
resource = Resource(attributes={SERVICE_NAME: "haystack-testing"})
|
||||||
|
|
||||||
|
traceProvider = TracerProvider(resource=resource)
|
||||||
|
processor = SimpleSpanProcessor(InMemorySpanExporter())
|
||||||
|
traceProvider.add_span_processor(processor)
|
||||||
|
|
||||||
|
# We can't uset `set_tracer_provider` here, because opentelemetry has a lock to only set it once
|
||||||
|
opentelemetry.trace._TRACER_PROVIDER = traceProvider
|
||||||
|
|
||||||
|
yield
|
||||||
|
|
||||||
|
# unfortunately, there's no cleaner way to reset the global tracer provider
|
||||||
|
opentelemetry.trace._TRACER_PROVIDER = None
|
||||||
|
disable_tracing()
|
||||||
|
|
||||||
|
def test_skip_auto_enable_tracer_if_already_configured(self) -> None:
|
||||||
|
my_tracker = Mock(spec=Tracer) # anything else than `NullTracer` works for this test
|
||||||
|
enable_tracing(my_tracker)
|
||||||
|
|
||||||
|
auto_enable_tracing()
|
||||||
|
|
||||||
|
assert tracer.actual_tracer is my_tracker
|
||||||
|
|
||||||
|
def test_skip_auto_enable_if_tracing_disabled_via_env(
|
||||||
|
self, monkeypatch: MonkeyPatch, configured_opentelemetry_tracing: None
|
||||||
|
) -> None:
|
||||||
|
monkeypatch.setenv("HAYSTACK_AUTO_TRACE_ENABLED", "false")
|
||||||
|
|
||||||
|
old_tracer = tracer.actual_tracer
|
||||||
|
|
||||||
|
auto_enable_tracing()
|
||||||
|
|
||||||
|
assert tracer.actual_tracer is old_tracer
|
||||||
|
|
||||||
|
def test_enable_opentelemetry_tracer(self, configured_opentelemetry_tracing: None) -> None:
|
||||||
|
auto_enable_tracing()
|
||||||
|
|
||||||
|
activated_tracer = tracer.actual_tracer
|
||||||
|
assert isinstance(activated_tracer, OpenTelemetryTracer)
|
||||||
|
assert is_tracing_enabled()
|
||||||
|
|
||||||
|
def test_skip_enable_opentelemetry_tracer_if_import_error(self, monkeypatch: MonkeyPatch) -> None:
|
||||||
|
monkeypatch.delitem(sys.modules, "opentelemetry", raising=False)
|
||||||
|
monkeypatch.setattr(builtins, "__import__", Mock(side_effect=ImportError))
|
||||||
|
auto_enable_tracing()
|
||||||
|
|
||||||
|
activated_tracer = tracer.actual_tracer
|
||||||
|
assert isinstance(activated_tracer, NullTracer)
|
||||||
|
assert not is_tracing_enabled()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user