From 33115bcccb94ec21aa2b8f8da017a35d3226824b Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee <10526848+sjrl@users.noreply.github.com> Date: Fri, 23 May 2025 15:11:08 +0200 Subject: [PATCH] fix: Copy context when using `ThreadPoolExecutor` in `AsyncPipeline.run_async (#9433) * Copy context when using threadpoolexecutor in AsyncPipeline.run_async * Add reno * Add dev comment --- haystack/core/pipeline/async_pipeline.py | 6 +++++- .../preserve-context-async-pipeline-ce363ae95bbdb8bf.yaml | 5 +++++ 2 files changed, 10 insertions(+), 1 deletion(-) create mode 100644 releasenotes/notes/preserve-context-async-pipeline-ce363ae95bbdb8bf.yaml diff --git a/haystack/core/pipeline/async_pipeline.py b/haystack/core/pipeline/async_pipeline.py index 2f4dc4e21..17cc6d6d0 100644 --- a/haystack/core/pipeline/async_pipeline.py +++ b/haystack/core/pipeline/async_pipeline.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: Apache-2.0 import asyncio +import contextvars from typing import Any, AsyncIterator, Dict, List, Optional, Set from haystack import logging, tracing @@ -68,7 +69,10 @@ class AsyncPipeline(PipelineBase): raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error else: loop = asyncio.get_running_loop() - outputs = await loop.run_in_executor(None, lambda: instance.run(**component_inputs)) + # Important: contextvars (e.g. active tracing Span) don’t propagate to running loop's ThreadPoolExecutor + # We use ctx.run(...) to preserve context like the active tracing span + ctx = contextvars.copy_context() + outputs = await loop.run_in_executor(None, lambda: ctx.run(lambda: instance.run(**component_inputs))) component_visits[component_name] += 1 diff --git a/releasenotes/notes/preserve-context-async-pipeline-ce363ae95bbdb8bf.yaml b/releasenotes/notes/preserve-context-async-pipeline-ce363ae95bbdb8bf.yaml new file mode 100644 index 000000000..14d9be432 --- /dev/null +++ b/releasenotes/notes/preserve-context-async-pipeline-ce363ae95bbdb8bf.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + To properly preserve the context when AsyncPipeline with components that only have sync run methods we copy the context using contextvars.copy_context() and run the component using `ctx.run(...)` so we can preserve context like the active tracing span. + This now means if your component 1) only has a sync run method and 2) it logs something to the tracer then this trace will be properly nested within the parent context.