diff --git a/test/core/pipeline/features/conftest.py b/test/core/pipeline/features/conftest.py index 37a51be3f..c97a7dca9 100644 --- a/test/core/pipeline/features/conftest.py +++ b/test/core/pipeline/features/conftest.py @@ -1,11 +1,11 @@ -from typing import Tuple, List, Dict, Any +from typing import Tuple, List, Dict, Any, Set from pytest_bdd import when, then, parsers from haystack import Pipeline -PipelineData = Tuple[Pipeline, List[Dict[str, Any]], List[Dict[str, Any]], List[List[str]]] +PipelineData = Tuple[Pipeline, List[Tuple[Dict[str, Any], Set[str]]], List[Dict[str, Any]], List[List[str]]] PipelineResult = Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[List[str]], List[List[str]]] @@ -15,7 +15,7 @@ def run_pipeline(pipeline_data: PipelineData, spying_tracer): Attempts to run a pipeline with the given inputs. `pipeline_data` is a tuple that must contain: * A Pipeline instance - * The Pipeline inputs + * The Pipeline inputs, and optionally components * The expected outputs Optionally it can contain: @@ -39,7 +39,10 @@ def run_pipeline(pipeline_data: PipelineData, spying_tracer): for i in inputs: try: - res = pipeline.run(i) + if isinstance(i, tuple): + res = pipeline.run(data=i[0], include_outputs_from=i[1]) + else: + res = pipeline.run(i) run_order = [ span.tags["haystack.component.name"] for span in spying_tracer.spans diff --git a/test/core/pipeline/features/pipeline_run.feature b/test/core/pipeline/features/pipeline_run.feature index 8d0da40e7..f388e3168 100644 --- a/test/core/pipeline/features/pipeline_run.feature +++ b/test/core/pipeline/features/pipeline_run.feature @@ -31,6 +31,9 @@ Feature: Pipeline running | that has a component that sends one of its outputs to itself | | that has multiple branches that merge into a component with a single variadic input | | that has multiple branches of different lengths that merge into a component with a single variadic input | + | that is linear and returns intermediate outputs | + | that has a loop and returns intermediate outputs from it | + | that is linear and returns intermediate outputs from multiple sockets | Scenario Outline: Running a bad Pipeline Given a pipeline diff --git a/test/core/pipeline/features/test_run.py b/test/core/pipeline/features/test_run.py index c95871902..f75849791 100644 --- a/test/core/pipeline/features/test_run.py +++ b/test/core/pipeline/features/test_run.py @@ -878,3 +878,114 @@ def pipeline_that_has_multiple_branches_of_different_lengths_that_merge_into_a_c {"fourth_addition": {"result": 12}}, ["first_addition", "second_addition", "third_addition", "sum", "fourth_addition"], ) + + +@given("a pipeline that is linear and returns intermediate outputs", target_fixture="pipeline_data") +def pipeline_that_is_linear_and_returns_intermediate_outputs(): + pipeline = Pipeline() + pipeline.add_component("first_addition", AddFixedValue(add=2)) + pipeline.add_component("second_addition", AddFixedValue()) + pipeline.add_component("double", Double()) + pipeline.connect("first_addition", "double") + pipeline.connect("double", "second_addition") + + return ( + pipeline, + [ + ({"first_addition": {"value": 1}}, {"first_addition", "second_addition", "double"}), + ({"first_addition": {"value": 1}}, {"double"}), + ], + [ + {"second_addition": {"result": 7}, "first_addition": {"result": 3}, "double": {"value": 6}}, + {"second_addition": {"result": 7}, "double": {"value": 6}}, + ], + [["first_addition", "double", "second_addition"], ["first_addition", "double", "second_addition"]], + ) + + +@given("a pipeline that has a loop and returns intermediate outputs from it", target_fixture="pipeline_data") +def pipeline_that_has_a_loop_and_returns_intermediate_outputs_from_it(): + pipeline = Pipeline(max_loops_allowed=10) + pipeline.add_component("add_one", AddFixedValue(add=1)) + pipeline.add_component("multiplexer", Multiplexer(type_=int)) + pipeline.add_component("below_10", Threshold(threshold=10)) + pipeline.add_component("below_5", Threshold(threshold=5)) + pipeline.add_component("add_three", AddFixedValue(add=3)) + pipeline.add_component("accumulator", Accumulate()) + pipeline.add_component("add_two", AddFixedValue(add=2)) + + pipeline.connect("add_one.result", "multiplexer") + pipeline.connect("multiplexer.value", "below_10.value") + pipeline.connect("below_10.below", "accumulator.value") + pipeline.connect("accumulator.value", "below_5.value") + pipeline.connect("below_5.above", "add_three.value") + pipeline.connect("below_5.below", "multiplexer") + pipeline.connect("add_three.result", "multiplexer") + pipeline.connect("below_10.above", "add_two.value") + + return ( + pipeline, + ( + {"add_one": {"value": 3}}, + {"add_two", "add_one", "multiplexer", "below_10", "accumulator", "below_5", "add_three"}, + ), + { + "add_two": {"result": 13}, + "add_one": {"result": 4}, + "multiplexer": {"value": 11}, + "below_10": {"above": 11}, + "accumulator": {"value": 8}, + "below_5": {"above": 8}, + "add_three": {"result": 11}, + }, + [ + "add_one", + "multiplexer", + "below_10", + "accumulator", + "below_5", + "multiplexer", + "below_10", + "accumulator", + "below_5", + "add_three", + "multiplexer", + "below_10", + "add_two", + ], + ) + + +@given( + "a pipeline that is linear and returns intermediate outputs from multiple sockets", target_fixture="pipeline_data" +) +def pipeline_that_is_linear_and_returns_intermediate_outputs_from_multiple_sockets(): + @component + class DoubleWithOriginal: + """ + Doubles the input value and returns the original value as well. + """ + + @component.output_types(value=int, original=int) + def run(self, value: int): + return {"value": value * 2, "original": value} + + pipeline = Pipeline() + pipeline.add_component("first_addition", AddFixedValue(add=2)) + pipeline.add_component("second_addition", AddFixedValue()) + pipeline.add_component("double", DoubleWithOriginal()) + pipeline.connect("first_addition", "double") + pipeline.connect("double.value", "second_addition") + + return ( + pipeline, + [ + ({"first_addition": {"value": 1}}, {"first_addition", "second_addition", "double"}), + ({"first_addition": {"value": 1}}, {"double"}), + ], + [ + {"second_addition": {"result": 7}, "first_addition": {"result": 3}, "double": {"value": 6, "original": 3}}, + {"second_addition": {"result": 7}, "double": {"value": 6, "original": 3}}, + ], + [["first_addition", "double", "second_addition"], ["first_addition", "double", "second_addition"]], + ) diff --git a/test/core/pipeline/test_intermediate_outputs.py b/test/core/pipeline/test_intermediate_outputs.py deleted file mode 100644 index a241fad66..000000000 --- a/test/core/pipeline/test_intermediate_outputs.py +++ /dev/null @@ -1,97 +0,0 @@ -# SPDX-FileCopyrightText: 2022-present deepset GmbH -# -# SPDX-License-Identifier: Apache-2.0 -import logging - -from haystack.components.others import Multiplexer -from haystack.core.pipeline import Pipeline -from haystack.core.component import component -from haystack.testing.sample_components import Accumulate, AddFixedValue, Double, Threshold - -logging.basicConfig(level=logging.DEBUG) - - -@component -class DoubleWithOriginal: - """ - Doubles the input value and returns the original value as well. - """ - - @component.output_types(value=int, original=int) - def run(self, value: int): - return {"value": value * 2, "original": value} - - -def test_pipeline_intermediate_outputs(): - pipeline = Pipeline() - pipeline.add_component("first_addition", AddFixedValue(add=2)) - pipeline.add_component("second_addition", AddFixedValue()) - pipeline.add_component("double", Double()) - pipeline.connect("first_addition", "double") - pipeline.connect("double", "second_addition") - - results = pipeline.run( - {"first_addition": {"value": 1}}, include_outputs_from={"first_addition", "second_addition", "double"} - ) - assert results == {"second_addition": {"result": 7}, "first_addition": {"result": 3}, "double": {"value": 6}} - - results = pipeline.run({"first_addition": {"value": 1}}, include_outputs_from={"double"}) - assert results == {"second_addition": {"result": 7}, "double": {"value": 6}} - - -def test_pipeline_with_loops_intermediate_outputs(): - accumulator = Accumulate() - - pipeline = Pipeline(max_loops_allowed=10) - pipeline.add_component("add_one", AddFixedValue(add=1)) - pipeline.add_component("multiplexer", Multiplexer(type_=int)) - pipeline.add_component("below_10", Threshold(threshold=10)) - pipeline.add_component("below_5", Threshold(threshold=5)) - pipeline.add_component("add_three", AddFixedValue(add=3)) - pipeline.add_component("accumulator", accumulator) - pipeline.add_component("add_two", AddFixedValue(add=2)) - - pipeline.connect("add_one.result", "multiplexer") - pipeline.connect("multiplexer.value", "below_10.value") - pipeline.connect("below_10.below", "accumulator.value") - pipeline.connect("accumulator.value", "below_5.value") - pipeline.connect("below_5.above", "add_three.value") - pipeline.connect("below_5.below", "multiplexer") - pipeline.connect("add_three.result", "multiplexer") - pipeline.connect("below_10.above", "add_two.value") - - results = pipeline.run( - {"add_one": {"value": 3}}, - include_outputs_from={"add_two", "add_one", "multiplexer", "below_10", "accumulator", "below_5", "add_three"}, - ) - - assert results == { - "add_two": {"result": 13}, - "add_one": {"result": 4}, - "multiplexer": {"value": 11}, - "below_10": {"above": 11}, - "accumulator": {"value": 8}, - "below_5": {"above": 8}, - "add_three": {"result": 11}, - } - - -def test_pipeline_intermediate_outputs_multiple_output_sockets(): - pipeline = Pipeline() - pipeline.add_component("first_addition", AddFixedValue(add=2)) - pipeline.add_component("second_addition", AddFixedValue()) - pipeline.add_component("double", DoubleWithOriginal()) - pipeline.connect("first_addition", "double") - pipeline.connect("double.value", "second_addition") - - results = pipeline.run( - {"first_addition": {"value": 1}}, include_outputs_from={"first_addition", "second_addition", "double"} - ) - assert results == { - "second_addition": {"result": 7}, - "first_addition": {"result": 3}, - "double": {"value": 6, "original": 3}, - } - - results = pipeline.run({"first_addition": {"value": 1}}, include_outputs_from={"double"}) - assert results == {"second_addition": {"result": 7}, "double": {"value": 6, "original": 3}}