From 0411cd938a8b1d4a7153b0c269c6cd11d7da2efd Mon Sep 17 00:00:00 2001 From: Silvano Cerza <3314350+silvanocerza@users.noreply.github.com> Date: Fri, 12 Jul 2024 17:30:10 +0200 Subject: [PATCH] Fix bug in Pipeline.run() executing Components in a wrong and unexpected order (#8021) * Fix bug in Pipeline.run() executing Components in a wrong and unexpected order * Update haystack/core/pipeline/base.py Co-authored-by: Madeesh Kannan --------- Co-authored-by: Madeesh Kannan --- haystack/core/pipeline/base.py | 12 ++ .../fix-execution-order-1121cedd9c68c560.yaml | 4 + .../pipeline/features/pipeline_run.feature | 1 + test/core/pipeline/features/test_run.py | 114 +++++++++++++++++- test/core/pipeline/test_pipeline.py | 11 +- 5 files changed, 133 insertions(+), 9 deletions(-) create mode 100644 releasenotes/notes/fix-execution-order-1121cedd9c68c560.yaml diff --git a/haystack/core/pipeline/base.py b/haystack/core/pipeline/base.py index 3552d37bd..cb7102b28 100644 --- a/haystack/core/pipeline/base.py +++ b/haystack/core/pipeline/base.py @@ -791,6 +791,18 @@ class PipelineBase: def _init_run_queue(self, pipeline_inputs: Dict[str, Any]) -> List[Tuple[str, Component]]: run_queue: List[Tuple[str, Component]] = [] + + # HACK: Quick workaround for the issue of execution order not being + # well-defined (NB - https://github.com/deepset-ai/haystack/issues/7985). + # We should fix the original execution logic instead. + if networkx.is_directed_acyclic_graph(self.graph): + # If the Pipeline has no cycles (i.e. its graph is a DAG) we can easily determine the order of execution with + # a topological sort. + # So use that to get the run order. 
+ for node in networkx.topological_sort(self.graph): + run_queue.append((node, self.graph.nodes[node]["instance"])) + return run_queue + for node_name in self.graph.nodes: component = self.graph.nodes[node_name]["instance"] diff --git a/releasenotes/notes/fix-execution-order-1121cedd9c68c560.yaml b/releasenotes/notes/fix-execution-order-1121cedd9c68c560.yaml new file mode 100644 index 000000000..77dd77967 --- /dev/null +++ b/releasenotes/notes/fix-execution-order-1121cedd9c68c560.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + Fix bug in Pipeline.run() executing Components in a wrong and unexpected order diff --git a/test/core/pipeline/features/pipeline_run.feature b/test/core/pipeline/features/pipeline_run.feature index c045f63ee..0ea749c64 100644 --- a/test/core/pipeline/features/pipeline_run.feature +++ b/test/core/pipeline/features/pipeline_run.feature @@ -37,6 +37,7 @@ Feature: Pipeline running | that has a component with default inputs that doesn't receive anything from its sender | | that has a component with default inputs that doesn't receive anything from its sender but receives input from user | | that has a loop and a component with default inputs that doesn't receive anything from its sender but receives input from user | + | that has multiple components with only default inputs and are added in a different order from the order of execution | Scenario Outline: Running a bad Pipeline Given a pipeline diff --git a/test/core/pipeline/features/test_run.py b/test/core/pipeline/features/test_run.py index c6c3362b9..baf157baa 100644 --- a/test/core/pipeline/features/test_run.py +++ b/test/core/pipeline/features/test_run.py @@ -423,7 +423,7 @@ def pipeline_that_has_three_branches_that_dont_merge(): PipelineRunData( inputs={"add_one": {"value": 1}}, expected_outputs={"add_one_again": {"result": 6}, "add_ten": {"result": 12}, "double": {"value": 4}}, - expected_run_order=["add_one", "repeat", "double", "add_ten", "add_three", "add_one_again"], + 
expected_run_order=["add_one", "repeat", "add_ten", "double", "add_three", "add_one_again"], ) ], ) @@ -448,7 +448,7 @@ def pipeline_that_has_two_branches_that_merge(): PipelineRunData( inputs={"first_addition": {"value": 1}, "third_addition": {"value": 1}}, expected_outputs={"fourth_addition": {"result": 3}}, - expected_run_order=["first_addition", "second_addition", "third_addition", "diff", "fourth_addition"], + expected_run_order=["first_addition", "third_addition", "second_addition", "diff", "fourth_addition"], ) ], ) @@ -633,7 +633,7 @@ def pipeline_that_has_a_component_with_mutable_output_sent_to_multiple_inputs(): }, "mm2": {"merged_message": "Fake message"}, }, - expected_run_order=["prompt_builder", "mm1", "llm", "mm2"], + expected_run_order=["prompt_builder", "llm", "mm1", "mm2"], ) ], ) @@ -1021,7 +1021,7 @@ def pipeline_that_has_multiple_branches_of_different_lengths_that_merge_into_a_c PipelineRunData( inputs={"first_addition": {"value": 1}, "third_addition": {"value": 1}}, expected_outputs={"fourth_addition": {"result": 12}}, - expected_run_order=["first_addition", "second_addition", "third_addition", "sum", "fourth_addition"], + expected_run_order=["first_addition", "third_addition", "second_addition", "sum", "fourth_addition"], ) ], ) @@ -1376,3 +1376,109 @@ def pipeline_that_has_a_loop_and_a_component_with_default_inputs_that_doesnt_rec ) ], ) + + +@given( + "a pipeline that has multiple components with only default inputs and are added in a different order from the order of execution", + target_fixture="pipeline_data", +) +def pipeline_that_has_multiple_components_with_only_default_inputs_and_are_added_in_a_different_order_from_the_order_of_execution(): + prompt_builder1 = PromptBuilder( + template=""" + You are a spellchecking system. Check the given query and fill in the corrected query. 
+ + Question: {{question}} + Corrected question: + """ + ) + prompt_builder2 = PromptBuilder( + template=""" + According to these documents: + + {% for doc in documents %} + {{ doc.content }} + {% endfor %} + + Answer the given question: {{question}} + Answer: + """ + ) + prompt_builder3 = PromptBuilder( + template=""" + {% for ans in replies %} + {{ ans }} + {% endfor %} + """ + ) + + @component + class FakeRetriever: + @component.output_types(documents=List[Document]) + def run( + self, + query: str, + filters: Optional[Dict[str, Any]] = None, + top_k: Optional[int] = None, + scale_score: Optional[bool] = None, + ): + return {"documents": [Document(content="This is a document")]} + + @component + class FakeRanker: + @component.output_types(documents=List[Document]) + def run( + self, + query: str, + documents: List[Document], + top_k: Optional[int] = None, + scale_score: Optional[bool] = None, + calibration_factor: Optional[float] = None, + score_threshold: Optional[float] = None, + ): + return {"documents": documents} + + @component + class FakeGenerator: + @component.output_types(replies=List[str], meta=Dict[str, Any]) + def run(self, prompt: str, generation_kwargs: Optional[Dict[str, Any]] = None): + return {"replies": ["This is a reply"], "meta": {"meta_key": "meta_value"}} + + pipeline = Pipeline() + pipeline.add_component(name="retriever", instance=FakeRetriever()) + pipeline.add_component(name="ranker", instance=FakeRanker()) + pipeline.add_component(name="prompt_builder2", instance=prompt_builder2) + pipeline.add_component(name="prompt_builder1", instance=prompt_builder1) + pipeline.add_component(name="prompt_builder3", instance=prompt_builder3) + pipeline.add_component(name="llm", instance=FakeGenerator()) + pipeline.add_component(name="spellchecker", instance=FakeGenerator()) + + pipeline.connect("prompt_builder1", "spellchecker") + pipeline.connect("spellchecker.replies", "prompt_builder3") + pipeline.connect("prompt_builder3", "retriever.query") + 
pipeline.connect("prompt_builder3", "ranker.query") + pipeline.connect("retriever.documents", "ranker.documents") + pipeline.connect("ranker.documents", "prompt_builder2.documents") + pipeline.connect("prompt_builder3", "prompt_builder2.question") + pipeline.connect("prompt_builder2", "llm") + + return ( + pipeline, + [ + PipelineRunData( + inputs={"prompt_builder1": {"question": "Wha i Acromegaly?"}}, + expected_outputs={ + "llm": {"replies": ["This is a reply"], "meta": {"meta_key": "meta_value"}}, + "spellchecker": {"meta": {"meta_key": "meta_value"}}, + }, + expected_run_order=[ + "prompt_builder1", + "spellchecker", + "prompt_builder3", + "retriever", + "ranker", + "prompt_builder2", + "llm", + ], + ) + ], + ) diff --git a/test/core/pipeline/test_pipeline.py b/test/core/pipeline/test_pipeline.py index ae8fd34e5..f87dfa6e8 100644 --- a/test/core/pipeline/test_pipeline.py +++ b/test/core/pipeline/test_pipeline.py @@ -840,12 +840,13 @@ class TestPipeline: data = {"yet_another_with_single_input": {"in": 1}} run_queue = pipe._init_run_queue(data) - assert len(run_queue) == 5 - assert run_queue[0][0] == "with_variadic" - assert run_queue[1][0] == "with_no_inputs" - assert run_queue[2][0] == "with_single_input" - assert run_queue[3][0] == "yet_another_with_single_input" + assert len(run_queue) == 6 + assert run_queue[0][0] == "with_no_inputs" + assert run_queue[1][0] == "with_single_input" + assert run_queue[2][0] == "yet_another_with_single_input" + assert run_queue[3][0] == "another_with_single_input" assert run_queue[4][0] == "with_multiple_inputs" + assert run_queue[5][0] == "with_variadic" def test__init_inputs_state(self): pipe = Pipeline()