mirror of
https://github.com/deepset-ai/haystack.git
synced 2026-01-06 03:57:19 +00:00
feat: allow content tracing (#7066)
* feat: implement content tracing capability * feat: trace component input/output * chore: extend type serialization to classes with `to_dict` * docs: add option for content tracing * style: remove unused import * chore: add more tags * style: ignore typing * fix: fix tagging of types
This commit is contained in:
parent
b4011af8e9
commit
41b93108c0
@ -782,9 +782,27 @@ class Pipeline:
|
||||
tags={
|
||||
"haystack.component.name": name,
|
||||
"haystack.component.type": comp.__class__.__name__,
|
||||
"haystack.component.inputs": {k: type(v).__name__ for k, v in last_inputs[name].items()},
|
||||
"haystack.component.input_types": {
|
||||
k: type(v).__name__ for k, v in last_inputs[name].items()
|
||||
},
|
||||
"haystack.component.input_spec": {
|
||||
key: {
|
||||
"type": value.type.__name__ if isinstance(value.type, type) else str(value.type),
|
||||
"senders": value.senders,
|
||||
}
|
||||
for key, value in comp.__haystack_input__._sockets_dict.items() # type: ignore
|
||||
},
|
||||
"haystack.component.output_spec": {
|
||||
key: {
|
||||
"type": value.type.__name__ if isinstance(value.type, type) else str(value.type),
|
||||
"senders": value.receivers,
|
||||
}
|
||||
for key, value in comp.__haystack_output__._sockets_dict.items() # type: ignore
|
||||
},
|
||||
},
|
||||
) as span:
|
||||
span.set_content_tag("haystack.component.input", last_inputs[name])
|
||||
|
||||
res = comp.run(**last_inputs[name])
|
||||
self.graph.nodes[name]["visits"] += 1
|
||||
|
||||
@ -794,12 +812,8 @@ class Pipeline:
|
||||
"Components must always return dictionaries: check the the documentation."
|
||||
)
|
||||
|
||||
span.set_tags(
|
||||
tags={
|
||||
"haystack.component.outputs": {k: type(v).__name__ for k, v in res.items()},
|
||||
"haystack.component.visits": self.graph.nodes[name]["visits"],
|
||||
}
|
||||
)
|
||||
span.set_tags(tags={"haystack.component.visits": self.graph.nodes[name]["visits"]})
|
||||
span.set_content_tag("haystack.component.output", res)
|
||||
|
||||
# Reset the waiting for input previous states, we managed to run a component
|
||||
before_last_waiting_for_input = None
|
||||
|
||||
@ -5,6 +5,7 @@ import os
|
||||
from typing import Dict, Any, Optional, Iterator
|
||||
|
||||
HAYSTACK_AUTO_TRACE_ENABLED_ENV_VAR = "HAYSTACK_AUTO_TRACE_ENABLED"
|
||||
HAYSTACK_CONTENT_TRACING_ENABLED_ENV_VAR = "HAYSTACK_CONTENT_TRACING_ENABLED"
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -41,6 +42,24 @@ class Span(abc.ABC):
|
||||
"""
|
||||
return self
|
||||
|
||||
def set_content_tag(self, key: str, value: Any) -> None:
|
||||
"""Set a single tag containing content information.
|
||||
|
||||
Content is sensitive information such as
|
||||
- the content of a query
|
||||
- the content of a document
|
||||
- the content of an answer
|
||||
|
||||
By default, this behavior is disabled. To enable it
|
||||
- set the environment variable `HAYSTACK_CONTENT_TRACING_ENABLED` to `true` or
|
||||
- override the `set_content_tag` method in a custom tracer implementation.
|
||||
|
||||
:param key: the name of the tag.
|
||||
:param value: the value of the tag.
|
||||
"""
|
||||
if tracer.is_content_tracing_enabled:
|
||||
self.set_tag(key, value)
|
||||
|
||||
|
||||
class Tracer(abc.ABC):
|
||||
"""Interface for instrumenting code by creating and submitting spans."""
|
||||
@ -75,6 +94,7 @@ class ProxyTracer(Tracer):
|
||||
|
||||
def __init__(self, provided_tracer: Tracer) -> None:
|
||||
self.actual_tracer: Tracer = provided_tracer
|
||||
self.is_content_tracing_enabled = os.getenv(HAYSTACK_CONTENT_TRACING_ENABLED_ENV_VAR, "false").lower() == "true"
|
||||
|
||||
@contextlib.contextmanager
|
||||
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
|
||||
|
||||
@ -5,6 +5,8 @@ from typing import Any, Union
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PRIMITIVE_TYPES = (bool, str, int, float)
|
||||
|
||||
|
||||
def coerce_tag_value(value: Any) -> Union[bool, str, int, float]:
|
||||
"""Coerces span tag values to compatible types for the tracing backend.
|
||||
@ -15,16 +17,31 @@ def coerce_tag_value(value: Any) -> Union[bool, str, int, float]:
|
||||
:param value: an arbitrary value which should be coerced to a compatible type
|
||||
:return: the value coerced to a compatible type
|
||||
"""
|
||||
if isinstance(value, (bool, str, int, float)):
|
||||
if isinstance(value, PRIMITIVE_TYPES):
|
||||
return value
|
||||
|
||||
if value is None:
|
||||
return ""
|
||||
|
||||
try:
|
||||
return json.dumps(value)
|
||||
# do that with-in try-except because who knows what kind of objects are being passed
|
||||
serializable = _serializable_value(value)
|
||||
return json.dumps(serializable)
|
||||
except Exception as error:
|
||||
logger.debug("Failed to coerce tag value to string: %s", error, exc_info=True)
|
||||
|
||||
# Our last resort is to convert the value to a string
|
||||
return str(value)
|
||||
|
||||
|
||||
def _serializable_value(value: Any) -> Any:
|
||||
if isinstance(value, list):
|
||||
return [_serializable_value(v) for v in value]
|
||||
|
||||
if isinstance(value, dict):
|
||||
return {k: _serializable_value(v) for k, v in value.items()}
|
||||
|
||||
if getattr(value, "to_dict", None):
|
||||
return _serializable_value(value.to_dict())
|
||||
|
||||
return value
|
||||
|
||||
12
releasenotes/notes/content-tracing-25df3909e11924b1.yaml
Normal file
12
releasenotes/notes/content-tracing-25df3909e11924b1.yaml
Normal file
@ -0,0 +1,12 @@
|
||||
---
|
||||
enhancements:
|
||||
- |
|
||||
Allow code instrumentation to also trace the input and output of components.
|
||||
This is useful for debugging and understanding the behavior of components.
|
||||
This behavior is disabled by default and can be enabled with one of the following methods:
|
||||
- Set the environment variable `HAYSTACK_CONTENT_TRACING_ENABLED_ENV_VAR` to `true` before importing Haystack.
|
||||
- Enable content tracing in the code:
|
||||
```python
|
||||
from haystack import tracing
|
||||
tracing.tracer.is_content_tracing_enabled = True
|
||||
```
|
||||
@ -1,14 +1,17 @@
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from _pytest.monkeypatch import MonkeyPatch
|
||||
|
||||
from haystack import component, Pipeline
|
||||
from haystack.tracing.tracer import enable_tracing
|
||||
from haystack.tracing.tracer import enable_tracing, tracer
|
||||
from test.tracing.utils import SpyingSpan, SpyingTracer
|
||||
|
||||
|
||||
@component
|
||||
class Hello:
|
||||
@component.output_types(output=str)
|
||||
def run(self, word: str):
|
||||
def run(self, word: Optional[str]): # use optional to spice up the typing tags
|
||||
"""
|
||||
Takes a string in input and returns "Hello, <string>!"
|
||||
in output.
|
||||
@ -45,8 +48,9 @@ class TestTracing:
|
||||
tags={
|
||||
"haystack.component.name": "hello",
|
||||
"haystack.component.type": "Hello",
|
||||
"haystack.component.inputs": {"word": "str"},
|
||||
"haystack.component.outputs": {"output": "str"},
|
||||
"haystack.component.input_types": {"word": "str"},
|
||||
"haystack.component.input_spec": {"word": {"type": "typing.Union[str, NoneType]", "senders": []}},
|
||||
"haystack.component.output_spec": {"output": {"type": "str", "senders": ["hello2"]}},
|
||||
"haystack.component.visits": 1,
|
||||
},
|
||||
),
|
||||
@ -55,9 +59,60 @@ class TestTracing:
|
||||
tags={
|
||||
"haystack.component.name": "hello2",
|
||||
"haystack.component.type": "Hello",
|
||||
"haystack.component.inputs": {"word": "str"},
|
||||
"haystack.component.outputs": {"output": "str"},
|
||||
"haystack.component.input_types": {"word": "str"},
|
||||
"haystack.component.input_spec": {
|
||||
"word": {"type": "typing.Union[str, NoneType]", "senders": ["hello"]}
|
||||
},
|
||||
"haystack.component.output_spec": {"output": {"type": "str", "senders": []}},
|
||||
"haystack.component.visits": 1,
|
||||
},
|
||||
),
|
||||
]
|
||||
|
||||
def test_with_enabled_content_tracing(
|
||||
self, spying_tracer: SpyingTracer, monkeypatch: MonkeyPatch, pipeline: Pipeline
|
||||
) -> None:
|
||||
# Monkeypatch to avoid impact on other tests
|
||||
monkeypatch.setattr(tracer, "is_content_tracing_enabled", True)
|
||||
|
||||
pipeline.run(data={"word": "world"})
|
||||
|
||||
assert len(spying_tracer.spans) == 3
|
||||
assert spying_tracer.spans == [
|
||||
SpyingSpan(
|
||||
operation_name="haystack.pipeline.run",
|
||||
tags={
|
||||
"haystack.pipeline.debug": False,
|
||||
"haystack.pipeline.metadata": {},
|
||||
"haystack.pipeline.max_loops_allowed": 100,
|
||||
},
|
||||
),
|
||||
SpyingSpan(
|
||||
operation_name="haystack.component.run",
|
||||
tags={
|
||||
"haystack.component.name": "hello",
|
||||
"haystack.component.type": "Hello",
|
||||
"haystack.component.input_types": {"word": "str"},
|
||||
"haystack.component.input_spec": {"word": {"type": "typing.Union[str, NoneType]", "senders": []}},
|
||||
"haystack.component.output_spec": {"output": {"type": "str", "senders": ["hello2"]}},
|
||||
"haystack.component.input": {"word": "world"},
|
||||
"haystack.component.visits": 1,
|
||||
"haystack.component.output": {"output": "Hello, world!"},
|
||||
},
|
||||
),
|
||||
SpyingSpan(
|
||||
operation_name="haystack.component.run",
|
||||
tags={
|
||||
"haystack.component.name": "hello2",
|
||||
"haystack.component.type": "Hello",
|
||||
"haystack.component.input_types": {"word": "str"},
|
||||
"haystack.component.input_spec": {
|
||||
"word": {"type": "typing.Union[str, NoneType]", "senders": ["hello"]}
|
||||
},
|
||||
"haystack.component.output_spec": {"output": {"type": "str", "senders": []}},
|
||||
"haystack.component.input": {"word": "Hello, world!"},
|
||||
"haystack.component.visits": 1,
|
||||
"haystack.component.output": {"output": "Hello, Hello, world!!"},
|
||||
},
|
||||
),
|
||||
]
|
||||
|
||||
@ -23,6 +23,7 @@ from haystack.tracing.tracer import (
|
||||
ProxyTracer,
|
||||
auto_enable_tracing,
|
||||
tracer,
|
||||
HAYSTACK_CONTENT_TRACING_ENABLED_ENV_VAR,
|
||||
)
|
||||
from test.tracing.utils import SpyingTracer
|
||||
|
||||
@ -45,8 +46,6 @@ class TestProxyTracer:
|
||||
spying_tracer = SpyingTracer()
|
||||
my_tracer = ProxyTracer(provided_tracer=spying_tracer)
|
||||
|
||||
enable_tracing(spying_tracer)
|
||||
|
||||
with my_tracer.trace("operation", {"key": "value"}) as span:
|
||||
span.set_tag("key", "value")
|
||||
span.set_tags({"key2": "value2"})
|
||||
@ -86,7 +85,7 @@ class TestAutoEnableTracer:
|
||||
processor = SimpleSpanProcessor(InMemorySpanExporter())
|
||||
traceProvider.add_span_processor(processor)
|
||||
|
||||
# We can't uset `set_tracer_provider` here, because opentelemetry has a lock to only set it once
|
||||
# We can't unset `set_tracer_provider` here, because opentelemetry has a lock to only set it once
|
||||
opentelemetry.trace._TRACER_PROVIDER = traceProvider
|
||||
|
||||
yield
|
||||
@ -149,3 +148,34 @@ class TestAutoEnableTracer:
|
||||
activated_tracer = tracer.actual_tracer
|
||||
assert isinstance(activated_tracer, DatadogTracer)
|
||||
assert is_tracing_enabled()
|
||||
|
||||
|
||||
class TestTracingContent:
|
||||
def test_set_content_tag_with_default_settings(self, spying_tracer: SpyingTracer) -> None:
|
||||
with tracer.trace("test") as span:
|
||||
span.set_content_tag("my_content", "my_content")
|
||||
|
||||
assert len(spying_tracer.spans) == 1
|
||||
span = spying_tracer.spans[0]
|
||||
assert span.tags == {}
|
||||
|
||||
def test_set_content_tag_with_enabled_content_tracing(
|
||||
self, monkeypatch: MonkeyPatch, spying_tracer: SpyingTracer
|
||||
) -> None:
|
||||
enable_tracing(spying_tracer)
|
||||
# monkeypatch to avoid impact on other tests
|
||||
monkeypatch.setattr(tracer, "is_content_tracing_enabled", True)
|
||||
|
||||
with tracer.trace("test") as span:
|
||||
span.set_content_tag("my_content", "my_content")
|
||||
|
||||
assert len(spying_tracer.spans) == 1
|
||||
span = spying_tracer.spans[0]
|
||||
assert span.tags == {"my_content": "my_content"}
|
||||
|
||||
def test_set_content_tag_when_enabled_via_env_variable(self, monkeypatch: MonkeyPatch) -> None:
|
||||
monkeypatch.setenv(HAYSTACK_CONTENT_TRACING_ENABLED_ENV_VAR, "true")
|
||||
|
||||
proxy_tracer = ProxyTracer(provided_tracer=SpyingTracer())
|
||||
|
||||
assert proxy_tracer.is_content_tracing_enabled is True
|
||||
|
||||
@ -2,6 +2,7 @@ from typing import Any, Union
|
||||
|
||||
import pytest
|
||||
|
||||
from haystack import Document
|
||||
from haystack.tracing import utils
|
||||
|
||||
|
||||
@ -22,6 +23,18 @@ class TestTypeCoercion:
|
||||
([1, 2, 3], "[1, 2, 3]"),
|
||||
({"key": "value"}, '{"key": "value"}'),
|
||||
(NonSerializableClass(), "NonSerializableClass"),
|
||||
(
|
||||
Document(id="1", content="text"),
|
||||
'{"id": "1", "content": "text", "dataframe": null, "blob": null, "score": null, "embedding": null}',
|
||||
),
|
||||
(
|
||||
[Document(id="1", content="text")],
|
||||
'[{"id": "1", "content": "text", "dataframe": null, "blob": null, "score": null, "embedding": null}]',
|
||||
),
|
||||
(
|
||||
{"key": Document(id="1", content="text")},
|
||||
'{"key": {"id": "1", "content": "text", "dataframe": null, "blob": null, "score": null, "embedding": null}}',
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_type_coercion(self, raw_value: Any, expected_tag_value: Union[bool, str, int, float]) -> None:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user