diff --git a/haystack/core/pipeline/async_pipeline.py b/haystack/core/pipeline/async_pipeline.py index 2f4dc4e21..17cc6d6d0 100644 --- a/haystack/core/pipeline/async_pipeline.py +++ b/haystack/core/pipeline/async_pipeline.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: Apache-2.0 import asyncio +import contextvars from typing import Any, AsyncIterator, Dict, List, Optional, Set from haystack import logging, tracing @@ -68,7 +69,10 @@ class AsyncPipeline(PipelineBase): raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error else: loop = asyncio.get_running_loop() - outputs = await loop.run_in_executor(None, lambda: instance.run(**component_inputs)) + # Important: contextvars (e.g. active tracing Span) don’t propagate to running loop's ThreadPoolExecutor + # We use ctx.run(...) to preserve context like the active tracing span + ctx = contextvars.copy_context() + outputs = await loop.run_in_executor(None, lambda: ctx.run(lambda: instance.run(**component_inputs))) component_visits[component_name] += 1 diff --git a/releasenotes/notes/preserve-context-async-pipeline-ce363ae95bbdb8bf.yaml b/releasenotes/notes/preserve-context-async-pipeline-ce363ae95bbdb8bf.yaml new file mode 100644 index 000000000..14d9be432 --- /dev/null +++ b/releasenotes/notes/preserve-context-async-pipeline-ce363ae95bbdb8bf.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + To properly preserve the context when AsyncPipeline with components that only have sync run methods we copy the context using contextvars.copy_context() and run the component using `ctx.run(...)` so we can preserve context like the active tracing span. + This now means if your component 1) only has a sync run method and 2) it logs something to the tracer then this trace will be properly nested within the parent context.