diff --git a/haystack/core/pipeline/pipeline.py b/haystack/core/pipeline/pipeline.py index 5737d3095..10d0389a3 100644 --- a/haystack/core/pipeline/pipeline.py +++ b/haystack/core/pipeline/pipeline.py @@ -948,6 +948,26 @@ class Pipeline: if not there_are_only_lazy_variadics: continue + # Components that have defaults for all their inputs must be treated the same identical way as we treat + # lazy variadic components. If there are only components with defaults we can run them. + # If we don't do this the order of execution of the Pipeline's Components will be affected cause we + # enqueue the Components in `to_run` at the start using the order they are added in the Pipeline. + # If a Component A with defaults is added before a Component B that has no defaults, but in the Pipeline + # logic A must be executed after B it could run instead before if we don't do this check. + has_only_defaults = all( + not socket.is_mandatory for socket in comp.__haystack_input__._sockets_dict.values() # type: ignore + ) + if has_only_defaults: + there_are_only_components_with_defaults = True + for other_name, other_comp in waiting_for_input: + if name == other_name: + continue + there_are_only_components_with_defaults &= all( + not s.is_mandatory for s in other_comp.__haystack_input__._sockets_dict.values() # type: ignore + ) + if not there_are_only_components_with_defaults: + continue + # Find the first component that has all the inputs it needs to run has_enough_inputs = True for input_socket in comp.__haystack_input__._sockets_dict.values(): # type: ignore diff --git a/releasenotes/notes/fix-pipeline-run-disorder-382da1e6bd6db510.yaml b/releasenotes/notes/fix-pipeline-run-disorder-382da1e6bd6db510.yaml new file mode 100644 index 000000000..e5255e603 --- /dev/null +++ b/releasenotes/notes/fix-pipeline-run-disorder-382da1e6bd6db510.yaml @@ -0,0 +1,7 @@ +--- +fixes: + - | + Fixes `Pipeline.run()` logic so Components that have all their inputs with a default are run in the correct 
order. + This happened because we gather a list of Components to run internally when running the Pipeline in the order they are + added during creation of the Pipeline. + This caused some Components to run before they received all their inputs. diff --git a/test/core/pipeline/test_pipeline.py b/test/core/pipeline/test_pipeline.py index ef15ccf1a..ab493b9c6 100644 --- a/test/core/pipeline/test_pipeline.py +++ b/test/core/pipeline/test_pipeline.py @@ -756,3 +756,54 @@ def test_walk_pipeline_with_cycles(): pipeline.add_component("hello", hello) pipeline.connect("hello.intermediate", "hello.intermediate") assert [("hello", hello)] == list(pipeline.walk()) + + +def test_correct_execution_order_of_components_with_only_defaults(spying_tracer): + """ + We enqueue the Components in internal `to_run` data structure at the start of `Pipeline.run()` using the order + they are added in the Pipeline with `Pipeline.add_component()`. + If a Component A with defaults is added before a Component B that has no defaults, but in the Pipeline + logic A must be executed after B it could run instead before. + + This test verifies that the order of execution is correct. + """ + docs = [Document(content="Rome is the capital of Italy"), Document(content="Paris is the capital of France")] + doc_store = InMemoryDocumentStore() + doc_store.write_documents(docs) + template = ( + "Given the following information, answer the question.\n" + "Context:\n" + "{% for document in documents %}" + " {{ document.content }}\n" + "{% endfor %}" + "Question: {{ query }}" + ) + + pipe = Pipeline() + + # The order of this addition is important for the test + # Do not edit them. + pipe.add_component("prompt_builder", PromptBuilder(template=template)) + pipe.add_component("retriever", InMemoryBM25Retriever(document_store=doc_store)) + pipe.connect("retriever", "prompt_builder.documents") + + query = "What is the capital of France?" 
+ res = pipe.run({"prompt_builder": {"query": query}, "retriever": {"query": query}}) + + assert len(spying_tracer.spans) == 3 + assert spying_tracer.spans[0].operation_name == "haystack.pipeline.run" + assert spying_tracer.spans[1].operation_name == "haystack.component.run" + assert spying_tracer.spans[1].tags["haystack.component.name"] == "retriever" + assert spying_tracer.spans[2].operation_name == "haystack.component.run" + assert spying_tracer.spans[2].tags["haystack.component.name"] == "prompt_builder" + + print(res["prompt_builder"]["prompt"]) + assert res == { + "prompt_builder": { + "prompt": "Given the following information, answer the question.\n" + "Context:\n" + " Paris is the capital of France\n" + " Rome is the capital of Italy\n" + "Question: What is the capital of France?" + } + }