mirror of
https://github.com/deepset-ai/haystack.git
synced 2025-12-26 22:48:29 +00:00
feat: implement pipeline tracing (#7046)
* feat: implement pipeline tracing * tests: improve test setup for spying tracer * feat: implement util for type coercion * fix: trace a after checking pipeline output * docs: add release notes * docs: drop unused imports * refactor: simplify getting raw span * refactor: implement `ProxyTracer`
This commit is contained in:
parent
d6ef2b55be
commit
6e580e4430
@ -26,6 +26,7 @@ from haystack.core.type_utils import _type_name, _types_are_compatible
|
||||
from haystack.marshal import Marshaller, YamlMarshaller
|
||||
from haystack.telemetry import pipeline_running
|
||||
from haystack.utils import is_in_jupyter
|
||||
from haystack import tracing
|
||||
|
||||
from .descriptions import find_pipeline_inputs, find_pipeline_outputs
|
||||
from .draw import _to_mermaid_image
|
||||
@ -744,168 +745,191 @@ class Pipeline:
|
||||
# The waiting_for_input list is used to keep track of components that are waiting for input.
|
||||
waiting_for_input: List[Tuple[str, Component]] = []
|
||||
|
||||
# This is what we'll return at the end
|
||||
final_outputs = {}
|
||||
while len(to_run) > 0:
|
||||
name, comp = to_run.pop(0)
|
||||
with tracing.tracer.trace(
|
||||
"haystack.pipeline.run",
|
||||
tags={
|
||||
"haystack.pipeline.debug": debug,
|
||||
"haystack.pipeline.metadata": self.metadata,
|
||||
"haystack.pipeline.max_loops_allowed": self.max_loops_allowed,
|
||||
},
|
||||
):
|
||||
# This is what we'll return at the end
|
||||
final_outputs = {}
|
||||
while len(to_run) > 0:
|
||||
name, comp = to_run.pop(0)
|
||||
|
||||
if any(socket.is_variadic for socket in comp.__haystack_input__._sockets_dict.values()) and not getattr( # type: ignore
|
||||
comp, "is_greedy", False
|
||||
):
|
||||
there_are_non_variadics = False
|
||||
for _, other_comp in to_run:
|
||||
if not any(socket.is_variadic for socket in other_comp.__haystack_input__._sockets_dict.values()): # type: ignore
|
||||
there_are_non_variadics = True
|
||||
break
|
||||
|
||||
if there_are_non_variadics:
|
||||
if (name, comp) not in waiting_for_input:
|
||||
waiting_for_input.append((name, comp))
|
||||
continue
|
||||
|
||||
if name in last_inputs and len(comp.__haystack_input__._sockets_dict) == len(last_inputs[name]): # type: ignore
|
||||
if self.graph.nodes[name]["visits"] > self.max_loops_allowed:
|
||||
msg = f"Maximum loops count ({self.max_loops_allowed}) exceeded for component '{name}'"
|
||||
raise PipelineMaxLoops(msg)
|
||||
# This component has all the inputs it needs to run
|
||||
res = comp.run(**last_inputs[name])
|
||||
self.graph.nodes[name]["visits"] += 1
|
||||
|
||||
if not isinstance(res, Mapping):
|
||||
raise PipelineRuntimeError(
|
||||
f"Component '{name}' didn't return a dictionary. "
|
||||
"Components must always return dictionaries: check the the documentation."
|
||||
)
|
||||
|
||||
# Reset the waiting for input previous states, we managed to run a component
|
||||
before_last_waiting_for_input = None
|
||||
last_waiting_for_input = None
|
||||
|
||||
if (name, comp) in waiting_for_input:
|
||||
# We manage to run this component that was in the waiting list, we can remove it.
|
||||
# This happens when a component was put in the waiting list but we reached it from another edge.
|
||||
waiting_for_input.remove((name, comp))
|
||||
|
||||
# We keep track of which keys to remove from res at the end of the loop.
|
||||
# This is done after the output has been distributed to the next components, so that
|
||||
# we're sure all components that need this output have received it.
|
||||
to_remove_from_res = set()
|
||||
for sender_component_name, receiver_component_name, edge_data in self.graph.edges(data=True):
|
||||
if receiver_component_name == name and edge_data["to_socket"].is_variadic:
|
||||
# Delete variadic inputs that were already consumed
|
||||
last_inputs[name][edge_data["to_socket"].name] = []
|
||||
|
||||
if name != sender_component_name:
|
||||
continue
|
||||
|
||||
if edge_data["from_socket"].name not in res:
|
||||
# This output has not been produced by the component, skip it
|
||||
continue
|
||||
|
||||
if receiver_component_name not in last_inputs:
|
||||
last_inputs[receiver_component_name] = {}
|
||||
to_remove_from_res.add(edge_data["from_socket"].name)
|
||||
value = res[edge_data["from_socket"].name]
|
||||
|
||||
if edge_data["to_socket"].is_variadic:
|
||||
if edge_data["to_socket"].name not in last_inputs[receiver_component_name]:
|
||||
last_inputs[receiver_component_name][edge_data["to_socket"].name] = []
|
||||
# Add to the list of variadic inputs
|
||||
last_inputs[receiver_component_name][edge_data["to_socket"].name].append(value)
|
||||
else:
|
||||
last_inputs[receiver_component_name][edge_data["to_socket"].name] = value
|
||||
|
||||
pair = (receiver_component_name, self.graph.nodes[receiver_component_name]["instance"])
|
||||
if pair not in waiting_for_input and pair not in to_run:
|
||||
to_run.append(pair)
|
||||
|
||||
res = {k: v for k, v in res.items() if k not in to_remove_from_res}
|
||||
|
||||
if len(res) > 0:
|
||||
final_outputs[name] = res
|
||||
else:
|
||||
# This component doesn't have enough inputs so we can't run it yet
|
||||
if (name, comp) not in waiting_for_input:
|
||||
waiting_for_input.append((name, comp))
|
||||
|
||||
if len(to_run) == 0 and len(waiting_for_input) > 0:
|
||||
# Check if we're stuck in a loop.
|
||||
# It's important to check whether previous waitings are None as it could be that no
|
||||
# Component has actually been run yet.
|
||||
if (
|
||||
before_last_waiting_for_input is not None
|
||||
and last_waiting_for_input is not None
|
||||
and before_last_waiting_for_input == last_waiting_for_input
|
||||
if any(socket.is_variadic for socket in comp.__haystack_input__._sockets_dict.values()) and not getattr( # type: ignore
|
||||
comp, "is_greedy", False
|
||||
):
|
||||
# Are we actually stuck or there's a lazy variadic waiting for input?
|
||||
# This is our last resort, if there's no lazy variadic waiting for input
|
||||
# we're stuck for real and we can't make any progress.
|
||||
for name, comp in waiting_for_input:
|
||||
is_variadic = any(socket.is_variadic for socket in comp.__haystack_input__._sockets_dict.values()) # type: ignore
|
||||
if is_variadic and not comp.__haystack_is_greedy__: # type: ignore[attr-defined]
|
||||
there_are_non_variadics = False
|
||||
for _, other_comp in to_run:
|
||||
if not any(socket.is_variadic for socket in other_comp.__haystack_input__._sockets_dict.values()): # type: ignore
|
||||
there_are_non_variadics = True
|
||||
break
|
||||
else:
|
||||
# We're stuck in a loop for real, we can't make any progress.
|
||||
# BAIL!
|
||||
break
|
||||
|
||||
if len(waiting_for_input) == 1:
|
||||
# We have a single component with variadic input waiting for input.
|
||||
# If we're at this point it means it has been waiting for input for at least 2 iterations.
|
||||
# This will never run.
|
||||
# BAIL!
|
||||
break
|
||||
if there_are_non_variadics:
|
||||
if (name, comp) not in waiting_for_input:
|
||||
waiting_for_input.append((name, comp))
|
||||
continue
|
||||
|
||||
# There was a lazy variadic waiting for input, we can run it
|
||||
waiting_for_input.remove((name, comp))
|
||||
to_run.append((name, comp))
|
||||
continue
|
||||
if name in last_inputs and len(comp.__haystack_input__._sockets_dict) == len(last_inputs[name]): # type: ignore
|
||||
if self.graph.nodes[name]["visits"] > self.max_loops_allowed:
|
||||
msg = f"Maximum loops count ({self.max_loops_allowed}) exceeded for component '{name}'"
|
||||
raise PipelineMaxLoops(msg)
|
||||
# This component has all the inputs it needs to run
|
||||
with tracing.tracer.trace(
|
||||
"haystack.component.run",
|
||||
tags={
|
||||
"haystack.component.name": name,
|
||||
"haystack.component.type": comp.__class__.__name__,
|
||||
"haystack.component.inputs": {k: type(v).__name__ for k, v in last_inputs[name].items()},
|
||||
},
|
||||
) as span:
|
||||
res = comp.run(**last_inputs[name])
|
||||
self.graph.nodes[name]["visits"] += 1
|
||||
|
||||
before_last_waiting_for_input = (
|
||||
last_waiting_for_input.copy() if last_waiting_for_input is not None else None
|
||||
)
|
||||
last_waiting_for_input = {item[0] for item in waiting_for_input}
|
||||
|
||||
# Remove from waiting only if there is actually enough input to run
|
||||
for name, comp in waiting_for_input:
|
||||
if name not in last_inputs:
|
||||
last_inputs[name] = {}
|
||||
|
||||
# Lazy variadics must be removed only if there's nothing else to run at this stage
|
||||
is_variadic = any(socket.is_variadic for socket in comp.__haystack_input__._sockets_dict.values()) # type: ignore
|
||||
if is_variadic and not comp.__haystack_is_greedy__: # type: ignore[attr-defined]
|
||||
there_are_only_lazy_variadics = True
|
||||
for other_name, other_comp in waiting_for_input:
|
||||
if name == other_name:
|
||||
continue
|
||||
there_are_only_lazy_variadics &= (
|
||||
any(
|
||||
socket.is_variadic for socket in other_comp.__haystack_input__._sockets_dict.values() # type: ignore
|
||||
)
|
||||
and not other_comp.__haystack_is_greedy__ # type: ignore[attr-defined]
|
||||
if not isinstance(res, Mapping):
|
||||
raise PipelineRuntimeError(
|
||||
f"Component '{name}' didn't return a dictionary. "
|
||||
"Components must always return dictionaries: check the the documentation."
|
||||
)
|
||||
|
||||
if not there_are_only_lazy_variadics:
|
||||
span.set_tags(
|
||||
tags={
|
||||
"haystack.component.outputs": {k: type(v).__name__ for k, v in res.items()},
|
||||
"haystack.component.visits": self.graph.nodes[name]["visits"],
|
||||
}
|
||||
)
|
||||
|
||||
# Reset the waiting for input previous states, we managed to run a component
|
||||
before_last_waiting_for_input = None
|
||||
last_waiting_for_input = None
|
||||
|
||||
if (name, comp) in waiting_for_input:
|
||||
# We manage to run this component that was in the waiting list, we can remove it.
|
||||
# This happens when a component was put in the waiting list but we reached it from another edge.
|
||||
waiting_for_input.remove((name, comp))
|
||||
|
||||
# We keep track of which keys to remove from res at the end of the loop.
|
||||
# This is done after the output has been distributed to the next components, so that
|
||||
# we're sure all components that need this output have received it.
|
||||
to_remove_from_res = set()
|
||||
for sender_component_name, receiver_component_name, edge_data in self.graph.edges(data=True):
|
||||
if receiver_component_name == name and edge_data["to_socket"].is_variadic:
|
||||
# Delete variadic inputs that were already consumed
|
||||
last_inputs[name][edge_data["to_socket"].name] = []
|
||||
|
||||
if name != sender_component_name:
|
||||
continue
|
||||
|
||||
# Find the first component that has all the inputs it needs to run
|
||||
has_enough_inputs = True
|
||||
for input_socket in comp.__haystack_input__._sockets_dict.values(): # type: ignore
|
||||
if input_socket.is_mandatory and input_socket.name not in last_inputs[name]:
|
||||
has_enough_inputs = False
|
||||
if edge_data["from_socket"].name not in res:
|
||||
# This output has not been produced by the component, skip it
|
||||
continue
|
||||
|
||||
if receiver_component_name not in last_inputs:
|
||||
last_inputs[receiver_component_name] = {}
|
||||
to_remove_from_res.add(edge_data["from_socket"].name)
|
||||
value = res[edge_data["from_socket"].name]
|
||||
|
||||
if edge_data["to_socket"].is_variadic:
|
||||
if edge_data["to_socket"].name not in last_inputs[receiver_component_name]:
|
||||
last_inputs[receiver_component_name][edge_data["to_socket"].name] = []
|
||||
# Add to the list of variadic inputs
|
||||
last_inputs[receiver_component_name][edge_data["to_socket"].name].append(value)
|
||||
else:
|
||||
last_inputs[receiver_component_name][edge_data["to_socket"].name] = value
|
||||
|
||||
pair = (receiver_component_name, self.graph.nodes[receiver_component_name]["instance"])
|
||||
if pair not in waiting_for_input and pair not in to_run:
|
||||
to_run.append(pair)
|
||||
|
||||
res = {k: v for k, v in res.items() if k not in to_remove_from_res}
|
||||
|
||||
if len(res) > 0:
|
||||
final_outputs[name] = res
|
||||
else:
|
||||
# This component doesn't have enough inputs so we can't run it yet
|
||||
if (name, comp) not in waiting_for_input:
|
||||
waiting_for_input.append((name, comp))
|
||||
|
||||
if len(to_run) == 0 and len(waiting_for_input) > 0:
|
||||
# Check if we're stuck in a loop.
|
||||
# It's important to check whether previous waitings are None as it could be that no
|
||||
# Component has actually been run yet.
|
||||
if (
|
||||
before_last_waiting_for_input is not None
|
||||
and last_waiting_for_input is not None
|
||||
and before_last_waiting_for_input == last_waiting_for_input
|
||||
):
|
||||
# Are we actually stuck or there's a lazy variadic waiting for input?
|
||||
# This is our last resort, if there's no lazy variadic waiting for input
|
||||
# we're stuck for real and we can't make any progress.
|
||||
for name, comp in waiting_for_input:
|
||||
is_variadic = any(socket.is_variadic for socket in comp.__haystack_input__._sockets_dict.values()) # type: ignore
|
||||
if is_variadic and not comp.__haystack_is_greedy__: # type: ignore[attr-defined]
|
||||
break
|
||||
else:
|
||||
# We're stuck in a loop for real, we can't make any progress.
|
||||
# BAIL!
|
||||
break
|
||||
if input_socket.is_mandatory:
|
||||
continue
|
||||
|
||||
if input_socket.name not in last_inputs[name]:
|
||||
last_inputs[name][input_socket.name] = input_socket.default_value
|
||||
if has_enough_inputs:
|
||||
break
|
||||
if len(waiting_for_input) == 1:
|
||||
# We have a single component with variadic input waiting for input.
|
||||
# If we're at this point it means it has been waiting for input for at least 2 iterations.
|
||||
# This will never run.
|
||||
# BAIL!
|
||||
break
|
||||
|
||||
waiting_for_input.remove((name, comp))
|
||||
to_run.append((name, comp))
|
||||
# There was a lazy variadic waiting for input, we can run it
|
||||
waiting_for_input.remove((name, comp))
|
||||
to_run.append((name, comp))
|
||||
continue
|
||||
|
||||
return final_outputs
|
||||
before_last_waiting_for_input = (
|
||||
last_waiting_for_input.copy() if last_waiting_for_input is not None else None
|
||||
)
|
||||
last_waiting_for_input = {item[0] for item in waiting_for_input}
|
||||
|
||||
# Remove from waiting only if there is actually enough input to run
|
||||
for name, comp in waiting_for_input:
|
||||
if name not in last_inputs:
|
||||
last_inputs[name] = {}
|
||||
|
||||
# Lazy variadics must be removed only if there's nothing else to run at this stage
|
||||
is_variadic = any(socket.is_variadic for socket in comp.__haystack_input__._sockets_dict.values()) # type: ignore
|
||||
if is_variadic and not comp.__haystack_is_greedy__: # type: ignore[attr-defined]
|
||||
there_are_only_lazy_variadics = True
|
||||
for other_name, other_comp in waiting_for_input:
|
||||
if name == other_name:
|
||||
continue
|
||||
there_are_only_lazy_variadics &= (
|
||||
any(
|
||||
socket.is_variadic for socket in other_comp.__haystack_input__._sockets_dict.values() # type: ignore
|
||||
)
|
||||
and not other_comp.__haystack_is_greedy__ # type: ignore[attr-defined]
|
||||
)
|
||||
|
||||
if not there_are_only_lazy_variadics:
|
||||
continue
|
||||
|
||||
# Find the first component that has all the inputs it needs to run
|
||||
has_enough_inputs = True
|
||||
for input_socket in comp.__haystack_input__._sockets_dict.values(): # type: ignore
|
||||
if input_socket.is_mandatory and input_socket.name not in last_inputs[name]:
|
||||
has_enough_inputs = False
|
||||
break
|
||||
if input_socket.is_mandatory:
|
||||
continue
|
||||
|
||||
if input_socket.name not in last_inputs[name]:
|
||||
last_inputs[name][input_socket.name] = input_socket.default_value
|
||||
if has_enough_inputs:
|
||||
break
|
||||
|
||||
waiting_for_input.remove((name, comp))
|
||||
to_run.append((name, comp))
|
||||
|
||||
return final_outputs
|
||||
|
||||
def _prepare_component_input_data(self, data: Dict[str, Any]) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]:
|
||||
"""
|
||||
|
||||
1
haystack/tracing/__init__.py
Normal file
1
haystack/tracing/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .tracer import Tracer, Span, enable_tracing, disable_tracing, tracer, is_tracing_enabled
|
||||
118
haystack/tracing/tracer.py
Normal file
118
haystack/tracing/tracer.py
Normal file
@ -0,0 +1,118 @@
|
||||
import abc
|
||||
import contextlib
|
||||
from typing import Dict, Any, Optional, Iterator
|
||||
|
||||
|
||||
class Span(abc.ABC):
|
||||
"""Interface for an instrumented operation."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def set_tag(self, key: str, value: Any) -> None:
|
||||
"""Set a single tag on the span.
|
||||
|
||||
Note that the value will be serialized to a string, so it's best to use simple types like strings, numbers, or
|
||||
booleans.
|
||||
|
||||
:param key: the name of the tag.
|
||||
:param value: the value of the tag.
|
||||
"""
|
||||
pass
|
||||
|
||||
def set_tags(self, tags: Dict[str, Any]) -> None:
|
||||
"""Set multiple tags on the span.
|
||||
|
||||
:param tags: a mapping of tag names to tag values.
|
||||
"""
|
||||
for key, value in tags.items():
|
||||
self.set_tag(key, value)
|
||||
|
||||
def raw_span(self) -> Any:
|
||||
"""Provides access to the underlying span object of the tracer.
|
||||
|
||||
Use this if you need full access to the underlying span object.
|
||||
|
||||
:return: The underlying span object.
|
||||
"""
|
||||
return self
|
||||
|
||||
|
||||
class Tracer(abc.ABC):
|
||||
"""Interface for instrumenting code by creating and submitting spans."""
|
||||
|
||||
@abc.abstractmethod
|
||||
@contextlib.contextmanager
|
||||
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
|
||||
"""Trace the execution of a block of code.
|
||||
|
||||
:param operation_name: the name of the operation being traced.
|
||||
:param tags: tags to apply to the newly created span.
|
||||
:return: the newly created span.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def current_span(self) -> Optional[Span]:
|
||||
"""Returns the currently active span. If no span is active, returns `None`.
|
||||
|
||||
:return: Currently active span or `None` if no span is active.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class ProxyTracer(Tracer):
|
||||
"""Container for the actual tracer instance.
|
||||
|
||||
This eases
|
||||
- replacing the actual tracer instance without having to change the global tracer instance
|
||||
- implementing default behavior for the tracer
|
||||
"""
|
||||
|
||||
def __init__(self, provided_tracer: Tracer) -> None:
|
||||
self.actual_tracer: Tracer = provided_tracer
|
||||
|
||||
@contextlib.contextmanager
|
||||
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
|
||||
with self.actual_tracer.trace(operation_name, tags=tags) as span:
|
||||
yield span
|
||||
|
||||
def current_span(self) -> Optional[Span]:
|
||||
return self.actual_tracer.current_span()
|
||||
|
||||
|
||||
class NullSpan(Span):
|
||||
"""A no-op implementation of the `Span` interface. This is used when tracing is disabled."""
|
||||
|
||||
def set_tag(self, key: str, value: Any) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class NullTracer(Tracer):
|
||||
"""A no-op implementation of the `Tracer` interface. This is used when tracing is disabled."""
|
||||
|
||||
@contextlib.contextmanager
|
||||
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
|
||||
yield NullSpan()
|
||||
|
||||
def current_span(self) -> Optional[Span]:
|
||||
return NullSpan()
|
||||
|
||||
|
||||
# We use the proxy pattern to allow for easy enabling and disabling of tracing without having to change the global
|
||||
# tracer instance. That's especially convenient if users import the object directly
|
||||
# (in that case we'd have to monkey-patch it in all of these modules).
|
||||
tracer: ProxyTracer = ProxyTracer(provided_tracer=NullTracer())
|
||||
|
||||
|
||||
def enable_tracing(provided_tracer: Tracer) -> None:
|
||||
"""Enable tracing by setting the global tracer instance."""
|
||||
tracer.actual_tracer = provided_tracer
|
||||
|
||||
|
||||
def disable_tracing() -> None:
|
||||
"""Disable tracing by setting the global tracer instance to a no-op tracer."""
|
||||
tracer.actual_tracer = NullTracer()
|
||||
|
||||
|
||||
def is_tracing_enabled() -> bool:
|
||||
"""Return whether tracing is enabled."""
|
||||
return not isinstance(tracer.actual_tracer, NullTracer)
|
||||
30
haystack/tracing/utils.py
Normal file
30
haystack/tracing/utils.py
Normal file
@ -0,0 +1,30 @@
|
||||
import json
|
||||
import logging
|
||||
from typing import Any, Union
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def coerce_tag_value(value: Any) -> Union[bool, str, int, float]:
|
||||
"""Coerces span tag values to compatible types for the tracing backend.
|
||||
|
||||
Most tracing libraries don't support sending complex types to the backend. Hence, we need to convert them to
|
||||
compatible types.
|
||||
|
||||
:param value: an arbitrary value which should be coerced to a compatible type
|
||||
:return: the value coerced to a compatible type
|
||||
"""
|
||||
if isinstance(value, (bool, str, int, float)):
|
||||
return value
|
||||
|
||||
if value is None:
|
||||
return ""
|
||||
|
||||
try:
|
||||
return json.dumps(value)
|
||||
except Exception as error:
|
||||
logger.debug("Failed to coerce tag value to string: %s", error, exc_info=True)
|
||||
|
||||
# Our last resort is to convert the value to a string
|
||||
return str(value)
|
||||
@ -0,0 +1,89 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Added option to instrument pipeline and component runs.
|
||||
This allows users to observe their pipeline runs and component runs in real-time via their chosen observability
|
||||
tool. Out-of-the-box support for OpenTelemetry and Datadog will be added in separate contributions.
|
||||
|
||||
Example usage for [OpenTelemetry](https://opentelemetry.io/docs/languages/python/):
|
||||
|
||||
1. Install OpenTelemetry SDK and exporter:
|
||||
```bash
|
||||
pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http
|
||||
```
|
||||
|
||||
2. Configure OpenTelemetry SDK with your tracing provider and exporter:
|
||||
```python
|
||||
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
|
||||
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||
|
||||
# Service name is required for most backends
|
||||
resource = Resource(attributes={
|
||||
SERVICE_NAME: "haystack"
|
||||
})
|
||||
|
||||
traceProvider = TracerProvider(resource=resource)
|
||||
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces"))
|
||||
traceProvider.add_span_processor(processor)
|
||||
trace.set_tracer_provider(traceProvider)
|
||||
|
||||
tracer = traceProvider.get_tracer("my_application")
|
||||
|
||||
|
||||
3. Create tracer
|
||||
```python
|
||||
import contextlib
|
||||
from typing import Optional, Dict, Any, Iterator
|
||||
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.trace import NonRecordingSpan
|
||||
|
||||
from haystack.tracing import Tracer, Span
|
||||
from haystack.tracing import utils as tracing_utils
|
||||
import opentelemetry.trace
|
||||
|
||||
|
||||
class OpenTelemetrySpan(Span):
|
||||
def __init__(self, span: opentelemetry.trace.Span) -> None:
|
||||
self._span = span
|
||||
|
||||
def set_tag(self, key: str, value: Any) -> None:
|
||||
coerced_value = tracing_utils.coerce_tag_value(value)
|
||||
self._span.set_attribute(key, coerced_value)
|
||||
|
||||
|
||||
class OpenTelemetryTracer(Tracer):
|
||||
def __init__(self, tracer: opentelemetry.trace.Tracer) -> None:
|
||||
self._tracer = tracer
|
||||
|
||||
@contextlib.contextmanager
|
||||
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
|
||||
with self._tracer.start_as_current_span(operation_name) as span:
|
||||
span = OpenTelemetrySpan(span)
|
||||
if tags:
|
||||
span.set_tags(tags)
|
||||
|
||||
yield span
|
||||
|
||||
def current_span(self) -> Optional[Span]:
|
||||
current_span = trace.get_current_span()
|
||||
if isinstance(current_span, NonRecordingSpan):
|
||||
return None
|
||||
|
||||
return OpenTelemetrySpan(current_span)
|
||||
|
||||
```
|
||||
|
||||
4. Use the tracer with Haystack:
|
||||
```python
|
||||
from haystack import tracing
|
||||
|
||||
haystack_tracer = OpenTelemetryTracer(tracer)
|
||||
tracing.enable_tracing(haystack_tracer)
|
||||
```
|
||||
|
||||
5. Run your pipeline
|
||||
@ -1,12 +1,15 @@
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Generator
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
from openai.types.chat import ChatCompletion, ChatCompletionMessage
|
||||
from openai.types.chat.chat_completion import Choice
|
||||
|
||||
from haystack import tracing
|
||||
from haystack.testing.test_utils import set_all_seeds
|
||||
from test.tracing.utils import SpyingTracer
|
||||
|
||||
set_all_seeds(0)
|
||||
|
||||
@ -68,3 +71,14 @@ def request_blocker(request: pytest.FixtureRequest, monkeypatch):
|
||||
raise RuntimeError(f"The test was about to {method} {self.scheme}://{self.host}{url}")
|
||||
|
||||
monkeypatch.setattr("urllib3.connectionpool.HTTPConnectionPool.urlopen", urlopen_mock)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def spying_tracer() -> Generator[SpyingTracer, None, None]:
|
||||
tracer = SpyingTracer()
|
||||
tracing.enable_tracing(tracer)
|
||||
|
||||
yield tracer
|
||||
|
||||
# Make sure to disable tracing after the test to avoid affecting other tests
|
||||
tracing.disable_tracing()
|
||||
|
||||
63
test/core/pipeline/test_tracing.py
Normal file
63
test/core/pipeline/test_tracing.py
Normal file
@ -0,0 +1,63 @@
|
||||
import pytest
|
||||
|
||||
from haystack import component, Pipeline
|
||||
from haystack.tracing.tracer import enable_tracing
|
||||
from test.tracing.utils import SpyingSpan, SpyingTracer
|
||||
|
||||
|
||||
@component
|
||||
class Hello:
|
||||
@component.output_types(output=str)
|
||||
def run(self, word: str):
|
||||
"""
|
||||
Takes a string in input and returns "Hello, <string>!"
|
||||
in output.
|
||||
"""
|
||||
return {"output": f"Hello, {word}!"}
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def pipeline() -> Pipeline:
|
||||
pipeline = Pipeline()
|
||||
pipeline.add_component("hello", Hello())
|
||||
pipeline.add_component("hello2", Hello())
|
||||
pipeline.connect("hello.output", "hello2.word")
|
||||
return pipeline
|
||||
|
||||
|
||||
class TestTracing:
|
||||
def test_with_enabled_tracing(self, pipeline: Pipeline, spying_tracer: SpyingTracer) -> None:
|
||||
pipeline.run(data={"word": "world"})
|
||||
|
||||
assert len(spying_tracer.spans) == 3
|
||||
|
||||
assert spying_tracer.spans == [
|
||||
SpyingSpan(
|
||||
operation_name="haystack.pipeline.run",
|
||||
tags={
|
||||
"haystack.pipeline.debug": False,
|
||||
"haystack.pipeline.metadata": {},
|
||||
"haystack.pipeline.max_loops_allowed": 100,
|
||||
},
|
||||
),
|
||||
SpyingSpan(
|
||||
operation_name="haystack.component.run",
|
||||
tags={
|
||||
"haystack.component.name": "hello",
|
||||
"haystack.component.type": "Hello",
|
||||
"haystack.component.inputs": {"word": "str"},
|
||||
"haystack.component.outputs": {"output": "str"},
|
||||
"haystack.component.visits": 1,
|
||||
},
|
||||
),
|
||||
SpyingSpan(
|
||||
operation_name="haystack.component.run",
|
||||
tags={
|
||||
"haystack.component.name": "hello2",
|
||||
"haystack.component.type": "Hello",
|
||||
"haystack.component.inputs": {"word": "str"},
|
||||
"haystack.component.outputs": {"output": "str"},
|
||||
"haystack.component.visits": 1,
|
||||
},
|
||||
),
|
||||
]
|
||||
0
test/tracing/__init__.py
Normal file
0
test/tracing/__init__.py
Normal file
63
test/tracing/test_tracer.py
Normal file
63
test/tracing/test_tracer.py
Normal file
@ -0,0 +1,63 @@
|
||||
from unittest.mock import Mock
|
||||
|
||||
from haystack.tracing.tracer import (
|
||||
NullTracer,
|
||||
NullSpan,
|
||||
enable_tracing,
|
||||
Tracer,
|
||||
disable_tracing,
|
||||
is_tracing_enabled,
|
||||
ProxyTracer,
|
||||
tracer,
|
||||
)
|
||||
from test.tracing.utils import SpyingTracer
|
||||
|
||||
|
||||
class TestNullTracer:
|
||||
def test_tracing(self) -> None:
|
||||
assert isinstance(tracer.actual_tracer, NullTracer)
|
||||
|
||||
# None of this raises
|
||||
with tracer.trace("operation", {"key": "value"}) as span:
|
||||
span.set_tag("key", "value")
|
||||
span.set_tags({"key": "value"})
|
||||
|
||||
assert isinstance(tracer.current_span(), NullSpan)
|
||||
assert isinstance(tracer.current_span().raw_span(), NullSpan)
|
||||
|
||||
|
||||
class TestProxyTracer:
|
||||
def test_tracing(self) -> None:
|
||||
spying_tracer = SpyingTracer()
|
||||
my_tracer = ProxyTracer(provided_tracer=spying_tracer)
|
||||
|
||||
enable_tracing(spying_tracer)
|
||||
|
||||
with my_tracer.trace("operation", {"key": "value"}) as span:
|
||||
span.set_tag("key", "value")
|
||||
span.set_tags({"key2": "value2"})
|
||||
|
||||
assert len(spying_tracer.spans) == 1
|
||||
assert spying_tracer.spans[0].operation_name == "operation"
|
||||
assert spying_tracer.spans[0].tags == {"key": "value", "key2": "value2"}
|
||||
|
||||
|
||||
class TestConfigureTracer:
|
||||
def test_enable_tracer(self) -> None:
|
||||
my_tracer = Mock(spec=Tracer) # anything else than `NullTracer` works for this test
|
||||
|
||||
enable_tracing(my_tracer)
|
||||
|
||||
assert isinstance(tracer, ProxyTracer)
|
||||
assert tracer.actual_tracer is my_tracer
|
||||
assert is_tracing_enabled()
|
||||
|
||||
def test_disable_tracing(self) -> None:
|
||||
my_tracker = Mock(spec=Tracer) # anything else than `NullTracer` works for this test
|
||||
|
||||
enable_tracing(my_tracker)
|
||||
assert tracer.actual_tracer is my_tracker
|
||||
|
||||
disable_tracing()
|
||||
assert isinstance(tracer.actual_tracer, NullTracer)
|
||||
assert is_tracing_enabled() is False
|
||||
30
test/tracing/test_utils.py
Normal file
30
test/tracing/test_utils.py
Normal file
@ -0,0 +1,30 @@
|
||||
from typing import Any, Union
|
||||
|
||||
import pytest
|
||||
|
||||
from haystack.tracing import utils
|
||||
|
||||
|
||||
class NonSerializableClass:
|
||||
def __str__(self) -> str:
|
||||
return "NonSerializableClass"
|
||||
|
||||
|
||||
class TestTypeCoercion:
|
||||
@pytest.mark.parametrize(
|
||||
"raw_value,expected_tag_value",
|
||||
[
|
||||
(1, 1),
|
||||
(1.0, 1.0),
|
||||
(True, True),
|
||||
(None, ""),
|
||||
("string", "string"),
|
||||
([1, 2, 3], "[1, 2, 3]"),
|
||||
({"key": "value"}, '{"key": "value"}'),
|
||||
(NonSerializableClass(), "NonSerializableClass"),
|
||||
],
|
||||
)
|
||||
def test_type_coercion(self, raw_value: Any, expected_tag_value: Union[bool, str, int, float]) -> None:
|
||||
coerced_value = utils.coerce_tag_value(raw_value)
|
||||
|
||||
assert coerced_value == expected_tag_value
|
||||
33
test/tracing/utils.py
Normal file
33
test/tracing/utils.py
Normal file
@ -0,0 +1,33 @@
|
||||
import contextlib
|
||||
import dataclasses
|
||||
from typing import Dict, Any, Optional, List, Iterator
|
||||
|
||||
from haystack.tracing import Span, Tracer
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class SpyingSpan(Span):
|
||||
operation_name: str
|
||||
tags: Dict[str, Any] = dataclasses.field(default_factory=dict)
|
||||
|
||||
def set_tag(self, key: str, value: Any) -> None:
|
||||
self.tags[key] = value
|
||||
|
||||
|
||||
class SpyingTracer(Tracer):
|
||||
def current_span(self) -> Optional[Span]:
|
||||
return self.spans[-1] if self.spans else None
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.spans: List[SpyingSpan] = []
|
||||
|
||||
@contextlib.contextmanager
|
||||
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
|
||||
new_span = SpyingSpan(operation_name)
|
||||
|
||||
for key, value in (tags or {}).items():
|
||||
new_span.set_tag(key, value)
|
||||
|
||||
self.spans.append(new_span)
|
||||
|
||||
yield new_span
|
||||
Loading…
x
Reference in New Issue
Block a user