from pytest_bdd import scenarios, given import pytest from haystack import Pipeline, component from haystack.core.errors import PipelineMaxLoops from haystack.testing.sample_components import AddFixedValue, Double from haystack.testing.factory import component_class pytestmark = pytest.mark.integration scenarios("pipeline_run.feature") @given("a pipeline that has no components", target_fixture="pipeline_data") def pipeline_that_has_no_components(): pipeline = Pipeline() inputs = {} expected_outputs = {} return pipeline, inputs, expected_outputs, [] @given("a pipeline that is linear", target_fixture="pipeline_data") def pipeline_that_is_linear(): pipeline = Pipeline() pipeline.add_component("first_addition", AddFixedValue(add=2)) pipeline.add_component("second_addition", AddFixedValue()) pipeline.add_component("double", Double()) pipeline.connect("first_addition", "double") pipeline.connect("double", "second_addition") return ( pipeline, {"first_addition": {"value": 1}}, {"second_addition": {"result": 7}}, ["first_addition", "double", "second_addition"], ) @given("a pipeline that has an infinite loop", target_fixture="pipeline_data") def pipeline_that_has_an_infinite_loop(): def custom_init(self): component.set_input_type(self, "x", int) component.set_input_type(self, "y", int, 1) component.set_output_types(self, a=int, b=int) FakeComponent = component_class("FakeComponent", output={"a": 1, "b": 1}, extra_fields={"__init__": custom_init}) pipe = Pipeline(max_loops_allowed=1) pipe.add_component("first", FakeComponent()) pipe.add_component("second", FakeComponent()) pipe.connect("first.a", "second.x") pipe.connect("second.b", "first.y") return pipe, {"first": {"x": 1}}, PipelineMaxLoops