Update docstring and prevent name collisions (#9078)

* Update docstring and prevent name collisions

* Fix tracer in async pipeline not tracking the same inputs as the sync pipeline
This commit is contained in:
Sebastian Husch Lee 2025-03-20 13:59:31 +01:00 committed by GitHub
parent 833109900c
commit f3fc6894fd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -17,10 +17,10 @@ logger = logging.getLogger(__name__)
class AsyncPipeline(PipelineBase):
"""
Asynchronous version of the Pipeline orchestration engine.
Asynchronous version of the Pipeline orchestration engine.
Manages components in a pipeline allowing for concurrent processing when the pipeline's execution graph permits.
. This enables efficient processing of components by minimizing idle time and maximizing resource utilization.
Manages components in a pipeline allowing for concurrent processing when the pipeline's execution graph permits.
This enables efficient processing of components by minimizing idle time and maximizing resource utilization.
"""
async def run_async_generator( # noqa: PLR0915,C901
@ -153,7 +153,7 @@ class AsyncPipeline(PipelineBase):
with tracing.tracer.trace(
"haystack.async_pipeline.run",
tags={
"haystack.pipeline.input_data": data,
"haystack.pipeline.input_data": prepared_data,
"haystack.pipeline.output_data": pipeline_outputs,
"haystack.pipeline.metadata": self.metadata,
"haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
@ -341,32 +341,32 @@ class AsyncPipeline(PipelineBase):
candidate = self._get_next_runnable_component(priority_queue, component_visits)
if candidate is None and running_tasks:
# We need to wait for one task to finish to make progress and potentially unblock the priority_queue
async for partial_result in _wait_for_one_task_to_complete():
yield partial_result
async for partial_res in _wait_for_one_task_to_complete():
yield partial_res
continue
if candidate is None and not running_tasks:
# done
break
priority, component_name, _ = candidate # type: ignore
priority, comp_name, _ = candidate # type: ignore
if component_name in scheduled_components:
if comp_name in scheduled_components:
# We need to wait for one task to finish to make progress
async for partial_result in _wait_for_one_task_to_complete():
yield partial_result
async for partial_res in _wait_for_one_task_to_complete():
yield partial_res
continue
if priority == ComponentPriority.HIGHEST:
# 1) run alone
async for partial_result in _run_highest_in_isolation(component_name):
yield partial_result
async for partial_res in _run_highest_in_isolation(comp_name):
yield partial_res
# then continue the loop
continue
if priority == ComponentPriority.READY:
# 1) schedule this one
await _schedule_task(component_name)
await _schedule_task(comp_name)
# 2) Possibly schedule more READY tasks if concurrency not fully used
while len(priority_queue) > 0 and not ready_sem.locked():
@ -387,25 +387,25 @@ class AsyncPipeline(PipelineBase):
# We only schedule components with priority DEFER or DEFER_LAST when no other tasks are running
elif priority in (ComponentPriority.DEFER, ComponentPriority.DEFER_LAST) and not running_tasks:
if len(priority_queue) > 0:
component_name, topological_sort = self._tiebreak_waiting_components(
component_name=component_name,
comp_name, topological_sort = self._tiebreak_waiting_components(
component_name=comp_name,
priority=priority,
priority_queue=priority_queue,
topological_sort=cached_topological_sort,
)
cached_topological_sort = topological_sort
await _schedule_task(component_name)
await _schedule_task(comp_name)
# To make progress, we wait for one task to complete before re-starting the loop
async for partial_result in _wait_for_one_task_to_complete():
yield partial_result
async for partial_res in _wait_for_one_task_to_complete():
yield partial_res
# End main loop
# 3) Drain leftover tasks
async for partial_result in _wait_for_all_tasks_to_complete():
yield partial_result
async for partial_res in _wait_for_all_tasks_to_complete():
yield partial_res
# 4) Yield final pipeline outputs
yield deepcopy(pipeline_outputs)