fix: Copy context when using ThreadPoolExecutor in `AsyncPipeline.run_async` (#9433)

* Copy context when using threadpoolexecutor in AsyncPipeline.run_async

* Add reno

* Add dev comment
Sebastian Husch Lee 2025-05-23 15:11:08 +02:00 committed by GitHub
parent d8cc6f7339
commit 33115bcccb
2 changed files with 10 additions and 1 deletions


@@ -3,6 +3,7 @@
# SPDX-License-Identifier: Apache-2.0
import asyncio
+import contextvars
from typing import Any, AsyncIterator, Dict, List, Optional, Set
from haystack import logging, tracing
@@ -68,7 +69,10 @@ class AsyncPipeline(PipelineBase):
raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error
else:
loop = asyncio.get_running_loop()
-outputs = await loop.run_in_executor(None, lambda: instance.run(**component_inputs))
+# Important: contextvars (e.g. active tracing Span) don't propagate to running loop's ThreadPoolExecutor
+# We use ctx.run(...) to preserve context like the active tracing span
+ctx = contextvars.copy_context()
+outputs = await loop.run_in_executor(None, lambda: ctx.run(lambda: instance.run(**component_inputs)))
component_visits[component_name] += 1


@@ -0,0 +1,5 @@
---
fixes:
- |
To preserve context when `AsyncPipeline` runs components that only have a sync `run` method, we now copy the current context with `contextvars.copy_context()` and run the component via `ctx.run(...)`, so that context such as the active tracing span is carried into the worker thread.
As a result, if your component 1) only has a sync `run` method and 2) logs something to the tracer, that trace is now properly nested within the parent context.
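
The behaviour described in this note can be reproduced outside Haystack with a minimal sketch. It is not taken from the Haystack code base: `current_span` and `sync_run` are hypothetical stand-ins for a tracer's active-span variable and a component's sync `run` method. Only the `copy_context()` / `ctx.run(...)` pattern mirrors the change above.

```python
import asyncio
import contextvars

# Hypothetical stand-in for a tracer's "active span" context variable.
current_span = contextvars.ContextVar("current_span", default="no-span")


def sync_run():
    # Stand-in for a component's sync run(): report the span it can see.
    return current_span.get()


async def main():
    current_span.set("parent-span")
    loop = asyncio.get_running_loop()

    # Plain executor call: the worker thread does not run in the caller's
    # context, so it typically sees only the variable's default value.
    without_ctx = await loop.run_in_executor(None, sync_run)

    # Snapshot the caller's context and execute the function inside it.
    ctx = contextvars.copy_context()
    with_ctx = await loop.run_in_executor(None, lambda: ctx.run(sync_run))

    return without_ctx, with_ctx


result = asyncio.run(main())
print(result)
```

Here the second element of `result` is the value set by the caller, which is what lets a span opened in a worker thread attach to its parent. Note that `copy_context()` takes a snapshot: values the worker sets on context variables stay in the copy and are not written back to the caller's context.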