diff --git a/haystack/core/pipeline/async_pipeline.py b/haystack/core/pipeline/async_pipeline.py index 72774ae1a..1cd9ee726 100644 --- a/haystack/core/pipeline/async_pipeline.py +++ b/haystack/core/pipeline/async_pipeline.py @@ -17,10 +17,10 @@ logger = logging.getLogger(__name__) class AsyncPipeline(PipelineBase): """ - Asynchronous version of the Pipeline orchestration engine. + Asynchronous version of the Pipeline orchestration engine. - Manages components in a pipeline allowing for concurrent processing when the pipeline's execution graph permits. - . This enables efficient processing of components by minimizing idle time and maximizing resource utilization. + Manages components in a pipeline allowing for concurrent processing when the pipeline's execution graph permits. + This enables efficient processing of components by minimizing idle time and maximizing resource utilization. """ async def run_async_generator( # noqa: PLR0915,C901 @@ -153,7 +153,7 @@ class AsyncPipeline(PipelineBase): with tracing.tracer.trace( "haystack.async_pipeline.run", tags={ - "haystack.pipeline.input_data": data, + "haystack.pipeline.input_data": prepared_data, "haystack.pipeline.output_data": pipeline_outputs, "haystack.pipeline.metadata": self.metadata, "haystack.pipeline.max_runs_per_component": self._max_runs_per_component, @@ -341,32 +341,32 @@ class AsyncPipeline(PipelineBase): candidate = self._get_next_runnable_component(priority_queue, component_visits) if candidate is None and running_tasks: # We need to wait for one task to finish to make progress and potentially unblock the priority_queue - async for partial_result in _wait_for_one_task_to_complete(): - yield partial_result + async for partial_res in _wait_for_one_task_to_complete(): + yield partial_res continue if candidate is None and not running_tasks: # done break - priority, component_name, _ = candidate # type: ignore + priority, comp_name, _ = candidate # type: ignore - if component_name in scheduled_components: + if comp_name in scheduled_components: # We need to wait for one task to finish to make progress - async for partial_result in _wait_for_one_task_to_complete(): - yield partial_result + async for partial_res in _wait_for_one_task_to_complete(): + yield partial_res continue if priority == ComponentPriority.HIGHEST: # 1) run alone - async for partial_result in _run_highest_in_isolation(component_name): - yield partial_result + async for partial_res in _run_highest_in_isolation(comp_name): + yield partial_res # then continue the loop continue if priority == ComponentPriority.READY: # 1) schedule this one - await _schedule_task(component_name) + await _schedule_task(comp_name) # 2) Possibly schedule more READY tasks if concurrency not fully used while len(priority_queue) > 0 and not ready_sem.locked(): @@ -387,25 +387,25 @@ class AsyncPipeline(PipelineBase): # We only schedule components with priority DEFER or DEFER_LAST when no other tasks are running elif priority in (ComponentPriority.DEFER, ComponentPriority.DEFER_LAST) and not running_tasks: if len(priority_queue) > 0: - component_name, topological_sort = self._tiebreak_waiting_components( - component_name=component_name, + comp_name, topological_sort = self._tiebreak_waiting_components( + component_name=comp_name, priority=priority, priority_queue=priority_queue, topological_sort=cached_topological_sort, ) cached_topological_sort = topological_sort - await _schedule_task(component_name) + await _schedule_task(comp_name) # To make progress, we wait for one task to complete before re-starting the loop - async for partial_result in _wait_for_one_task_to_complete(): - yield partial_result + async for partial_res in _wait_for_one_task_to_complete(): + yield partial_res # End main loop # 3) Drain leftover tasks - async for partial_result in _wait_for_all_tasks_to_complete(): - yield partial_result + async for partial_res in _wait_for_all_tasks_to_complete(): + yield partial_res # 4) Yield final pipeline outputs yield deepcopy(pipeline_outputs)