From f96eb3847ffd5f6f1ebf20f9765c99e12b1f09b4 Mon Sep 17 00:00:00 2001
From: Silvano Cerza <3314350+silvanocerza@users.noreply.github.com>
Date: Mon, 12 Feb 2024 18:25:28 +0100
Subject: [PATCH] refactor: Merge `Pipeline`s definition in `core` package
 (#6973)

* Move marshalling functions in core Pipeline

* Move telemetry gathering in core Pipeline

* Move run logic in core Pipeline

* Update root Pipeline import

* Add release notes

* Update Pipeline docs path

* Update releasenotes/notes/merge-pipeline-definitions-1da80e9803e2a8bb.yaml

Co-authored-by: Massimiliano Pippi

---------

Co-authored-by: Massimiliano Pippi
---
 docs/pydoc/config/pipeline_api.yml            |   2 +-
 haystack/__init__.py                          |   7 +-
 haystack/core/pipeline/pipeline.py            | 161 +++++++++++++-
 haystack/pipeline.py                          | 200 ------------------
 haystack/telemetry/_telemetry.py              |  13 +-
 ...pipeline-definitions-1da80e9803e2a8bb.yaml |   5 +
 test/core/pipeline/test_pipeline.py           | 158 ++++++++++++++
 test/test_files/yaml/test_pipeline.yaml       |   4 +-
 test/test_pipeline.py                         | 166 ---------------
 9 files changed, 333 insertions(+), 383 deletions(-)
 delete mode 100644 haystack/pipeline.py
 create mode 100644 releasenotes/notes/merge-pipeline-definitions-1da80e9803e2a8bb.yaml
 delete mode 100644 test/test_pipeline.py

diff --git a/docs/pydoc/config/pipeline_api.yml b/docs/pydoc/config/pipeline_api.yml
index 417dbc91e..475496d30 100644
--- a/docs/pydoc/config/pipeline_api.yml
+++ b/docs/pydoc/config/pipeline_api.yml
@@ -1,6 +1,6 @@
 loaders:
   - type: haystack_pydoc_tools.loaders.CustomPythonLoader
-    search_path: [../../../haystack]
+    search_path: [../../../haystack/core/pipeline]
     modules: ["pipeline"]
     ignore_when_discovered: ["__init__"]
 processors:
diff --git a/haystack/__init__.py b/haystack/__init__.py
index 30c0026fb..b20a367e3 100644
--- a/haystack/__init__.py
+++ b/haystack/__init__.py
@@ -1,9 +1,8 @@
 from haystack.core.component import component
+from haystack.core.errors import ComponentError, DeserializationError
+from haystack.core.pipeline import Pipeline
 from haystack.core.serialization import default_from_dict, default_to_dict
-from haystack.core.errors import DeserializationError, ComponentError
-from haystack.pipeline import Pipeline
-from haystack.dataclasses import Document, Answer, GeneratedAnswer, ExtractedAnswer
-
+from haystack.dataclasses import Answer, Document, ExtractedAnswer, GeneratedAnswer

 __all__ = [
     "component",
diff --git a/haystack/core/pipeline/pipeline.py b/haystack/core/pipeline/pipeline.py
index 98ba8df87..f81a8e43d 100644
--- a/haystack/core/pipeline/pipeline.py
+++ b/haystack/core/pipeline/pipeline.py
@@ -4,9 +4,11 @@
 import importlib
 import itertools
 import logging
+from collections import defaultdict
 from copy import copy
+from datetime import datetime
 from pathlib import Path
-from typing import Any, Dict, List, Mapping, Optional, Set, Tuple, Type, TypeVar, Union
+from typing import Any, Dict, List, Mapping, Optional, Set, TextIO, Tuple, Type, TypeVar, Union

 import networkx  # type:ignore

@@ -20,11 +22,14 @@ from haystack.core.errors import (
 )
 from haystack.core.serialization import component_from_dict, component_to_dict
 from haystack.core.type_utils import _type_name, _types_are_compatible
+from haystack.marshal import Marshaller, YamlMarshaller
+from haystack.telemetry import pipeline_running
 from haystack.utils import is_in_jupyter

 from .descriptions import find_pipeline_inputs, find_pipeline_outputs
 from .draw import _to_mermaid_image

+DEFAULT_MARSHALLER = YamlMarshaller()
 logger = logging.getLogger(__name__)

 # We use a generic type to annotate the return value of classmethods,
@@ -56,6 +61,8 @@ class Pipeline:
             max_loops_allowed: how many times the pipeline can run the same node before throwing an exception.
             debug_path: when debug is enabled in `run()`, where to save the debug data.
         """
+        self._telemetry_runs = 0
+        self._last_telemetry_sent: Optional[datetime] = None
         self.metadata = metadata or {}
         self.max_loops_allowed = max_loops_allowed
         self.graph = networkx.MultiDiGraph()
@@ -194,6 +201,57 @@ class Pipeline:

         return pipe

+    def dumps(self, marshaller: Marshaller = DEFAULT_MARSHALLER) -> str:
+        """
+        Returns the string representation of this pipeline according to the
+        format dictated by the `Marshaller` in use.
+
+        :param marshaller: The Marshaller used to create the string representation. Defaults to
+            `YamlMarshaller`.
+
+        :returns: A string representing the pipeline.
+        """
+        return marshaller.marshal(self.to_dict())
+
+    def dump(self, fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER):
+        """
+        Writes the string representation of this pipeline to the file-like object
+        passed in the `fp` argument.
+
+        :param fp: A file-like object ready to be written to.
+        :param marshaller: The Marshaller used to create the string representation. Defaults to
+            `YamlMarshaller`.
+        """
+        fp.write(marshaller.marshal(self.to_dict()))
+
+    @classmethod
+    def loads(cls, data: Union[str, bytes, bytearray], marshaller: Marshaller = DEFAULT_MARSHALLER) -> "Pipeline":
+        """
+        Creates a `Pipeline` object from the string representation passed in the `data` argument.
+
+        :param data: The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
+        :param marshaller: The Marshaller used to parse the string representation. Defaults to
+            `YamlMarshaller`.
+
+        :returns: A `Pipeline` object.
+        """
+        return cls.from_dict(marshaller.unmarshal(data))
+
+    @classmethod
+    def load(cls, fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER) -> "Pipeline":
+        """
+        Creates a `Pipeline` object from the string representation read from the file-like
+        object passed in the `fp` argument.
+
+        :param fp: A file-like object ready to be read from.
+        :param marshaller: The Marshaller used to parse the string representation. Defaults to
+            `YamlMarshaller`.
+
+        :returns: A `Pipeline` object.
+        """
+        return cls.from_dict(marshaller.unmarshal(fp.read()))
+
     def add_component(self, name: str, instance: Component) -> None:
         """
         Add the given component to the pipeline. Components are not connected to anything by default:
@@ -545,10 +603,54 @@ class Pipeline:
                     f"Input {socket_name} for component {component_name} is already sent by {socket.senders}."
                 )

-    # TODO: We're ignoring this linting rules for the time being, after we properly optimize this function we'll remove the noqa
-    def run(  # noqa: C901, PLR0912 pylint: disable=too-many-branches
+    # TODO: We're ignoring these linting rules for the time being; once we properly optimize this function we'll remove the noqa
+    def run(  # noqa: C901, PLR0912, PLR0915 pylint: disable=too-many-branches
         self, data: Dict[str, Any], debug: bool = False
     ) -> Dict[str, Any]:
+        """
+        Runs the pipeline with given input data.
+
+        :param data: A dictionary of inputs for the pipeline's components. Each key is a component name
+            and its value is a dictionary of that component's input parameters.
+        :param debug: Set to True to collect and return debug information.
+
+        :return: A dictionary containing the pipeline's output.
+        :raises PipelineRuntimeError: If a component fails or returns unexpected output.
+
+        Example a - Using named components:
+        Consider a 'Hello' component that takes a 'word' input and outputs a greeting.
+
+        ```python
+        @component
+        class Hello:
+            @component.output_types(output=str)
+            def run(self, word: str):
+                return {"output": f"Hello, {word}!"}
+        ```
+
+        Create a pipeline with two 'Hello' components connected together:
+
+        ```python
+        pipeline = Pipeline()
+        pipeline.add_component("hello", Hello())
+        pipeline.add_component("hello2", Hello())
+        pipeline.connect("hello.output", "hello2.word")
+        result = pipeline.run(data={"hello": {"word": "world"}})
+        ```
+
+        This runs the pipeline with the specified input for 'hello', yielding
+        {'hello2': {'output': 'Hello, Hello, world!!'}}.
+
+        Example b - Using flat inputs:
+        You can also pass inputs directly without specifying component names:
+
+        ```python
+        result = pipeline.run(data={"word": "world"})
+        ```
+
+        The pipeline resolves inputs to the correct components, returning
+        {'hello2': {'output': 'Hello, Hello, world!!'}}.
+        """
+        pipeline_running(self)
         # NOTE: We're assuming data is formatted like so as of now
         # data = {
         #     "comp1": {"input1": 1, "input2": 2},
         #     "comp2": {"input1": 3},
         # }
@@ -563,9 +665,22 @@ class Pipeline:
         # As of now it's here to make sure we don't have failing tests that assume warm_up() is called in run()
         self.warm_up()

+        # check whether the data is a nested dictionary of component inputs where each key is a component name
+        # and each value is a dictionary of input parameters for that component
+        is_nested_component_input = all(isinstance(value, dict) for value in data.values())
+        if not is_nested_component_input:
+            # flat input, a dict where keys are input names and values are the corresponding values
+            # we need to convert it to a nested dictionary of component inputs and then run the pipeline
+            # just like in the previous case
+            data, unresolved_inputs = self._prepare_component_input_data(data)
+            if unresolved_inputs:
+                logger.warning(
+                    "Inputs %s were not matched to any component inputs, please check your run parameters.",
+                    list(unresolved_inputs.keys()),
+                )
+
         # Raise if input is malformed in some way
         self._validate_input(data)

-        # NOTE: The above NOTE and TODO are technically not true.
         # This implementation of run supports only the first format, but the second format is actually
         # never received by this method. It's handled by the `run()` method of the `Pipeline` class
@@ -774,6 +889,44 @@ class Pipeline:

         return final_outputs

+    def _prepare_component_input_data(self, data: Dict[str, Any]) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]:
+        """
+        Organizes input data for pipeline components and identifies any inputs that are not matched to any
+        component's input slots.
+
+        This method processes a flat dictionary of input data, where each key-value pair represents an input name
+        and its corresponding value. It distributes these inputs to the appropriate pipeline components based on
+        their input requirements. Inputs that don't match any component's input slots are classified as unresolved.
+
+        :param data: A dictionary with input names as keys and input values as values.
+        :type data: Dict[str, Any]
+        :return: A tuple containing two elements:
+            1. A dictionary mapping component names to their respective matched inputs.
+            2. A dictionary of inputs that were not matched to any component, termed as unresolved keyword arguments.
+        :rtype: Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]
+        """
+        pipeline_input_data: Dict[str, Dict[str, Any]] = defaultdict(dict)
+        unresolved_kwargs = {}
+
+        # Retrieve the input slots for each component in the pipeline
+        available_inputs: Dict[str, Dict[str, Any]] = self.inputs()
+
+        # Go through all provided inputs to distribute them to the appropriate component inputs
+        for input_name, input_value in data.items():
+            resolved_at_least_once = False
+
+            # Check each component to see if it has a slot for the current kwarg
+            for component_name, component_inputs in available_inputs.items():
+                if input_name in component_inputs:
+                    # If a match is found, add the kwarg to the component's input data
+                    pipeline_input_data[component_name][input_name] = input_value
+                    resolved_at_least_once = True
+
+            if not resolved_at_least_once:
+                unresolved_kwargs[input_name] = input_value
+
+        return pipeline_input_data, unresolved_kwargs
+

 def _connections_status(
     sender_node: str, receiver_node: str, sender_sockets: List[OutputSocket], receiver_sockets: List[InputSocket]
diff --git a/haystack/pipeline.py b/haystack/pipeline.py
deleted file mode 100644
index be8d79583..000000000
--- a/haystack/pipeline.py
+++ /dev/null
@@ -1,200 +0,0 @@
-from collections import defaultdict
-from typing import Any, Dict, Optional, Union, TextIO, Tuple
-from pathlib import Path
-import datetime
-import logging
-
-from haystack.core.pipeline import Pipeline as _pipeline
-from haystack.telemetry import pipeline_running
-from haystack.marshal import Marshaller, YamlMarshaller
-
-
-DEFAULT_MARSHALLER = YamlMarshaller()
-logger = logging.getLogger(__name__)
-
-
-class Pipeline(_pipeline):
-    def __init__(
-        self,
-        metadata: Optional[Dict[str, Any]] = None,
-        max_loops_allowed: int = 100,
-        debug_path: Union[Path, str] = Path(".haystack_debug/"),
-    ):
-        """
-        Creates the Pipeline.
-
-        Args:
-            metadata: arbitrary dictionary to store metadata about this pipeline. Make sure all the values contained in
-                this dictionary can be serialized and deserialized if you wish to save this pipeline to file with
-                `save_pipelines()/load_pipelines()`.
-            max_loops_allowed: how many times the pipeline can run the same node before throwing an exception.
-            debug_path: when debug is enabled in `run()`, where to save the debug data.
-        """
-        self._telemetry_runs = 0
-        self._last_telemetry_sent: Optional[datetime.datetime] = None
-        super().__init__(metadata=metadata, max_loops_allowed=max_loops_allowed, debug_path=debug_path)
-
-    def run(self, data: Dict[str, Any], debug: bool = False) -> Dict[str, Any]:
-        """
-        Runs the pipeline with given input data.
-
-        :param data: A dictionary of inputs for the pipeline's components. Each key is a component name
-            and its value is a dictionary of that component's input parameters.
-        :param debug: Set to True to collect and return debug information.
-        :return: A dictionary containing the pipeline's output.
-        :raises PipelineRuntimeError: If a component fails or returns unexpected output.
-
-        Example a - Using named components:
-        Consider a 'Hello' component that takes a 'word' input and outputs a greeting.
-
-        ```python
-        @component
-        class Hello:
-            @component.output_types(output=str)
-            def run(self, word: str):
-                return {"output": f"Hello, {word}!"}
-        ```
-
-        Create a pipeline with two 'Hello' components connected together:
-
-        ```python
-        pipeline = Pipeline()
-        pipeline.add_component("hello", Hello())
-        pipeline.add_component("hello2", Hello())
-        pipeline.connect("hello.output", "hello2.word")
-        result = pipeline.run(data={"hello": {"word": "world"}})
-        ```
-
-        This runs the pipeline with the specified input for 'hello', yielding
-        {'hello2': {'output': 'Hello, Hello, world!!'}}.
-
-        Example b - Using flat inputs:
-        You can also pass inputs directly without specifying component names:
-
-        ```python
-        result = pipeline.run(data={"word": "world"})
-        ```
-
-        The pipeline resolves inputs to the correct components, returning
-        {'hello2': {'output': 'Hello, Hello, world!!'}}.
-        """
-        # check whether the data is a nested dictionary of component inputs where each key is a component name
-        # and each value is a dictionary of input parameters for that component
-        is_nested_component_input = all(isinstance(value, dict) for value in data.values())
-        if is_nested_component_input:
-            return self._run_internal(data=data, debug=debug)
-        else:
-            # flat input, a dict where keys are input names and values are the corresponding values
-            # we need to convert it to a nested dictionary of component inputs and then run the pipeline
-            # just like in the previous case
-            pipeline_inputs, unresolved_inputs = self._prepare_component_input_data(data)
-            if unresolved_inputs:
-                logger.warning(
-                    "Inputs %s were not matched to any component inputs, please check your run parameters.",
-                    list(unresolved_inputs.keys()),
-                )
-
-            return self._run_internal(data=pipeline_inputs, debug=debug)
-
-    def _run_internal(self, data: Dict[str, Any], debug: bool = False) -> Dict[str, Any]:
-        """
-        Runs the pipeline by invoking the underlying run to initiate the pipeline execution.
-
-        :params data: the inputs to give to the input components of the Pipeline.
-        :params debug: whether to collect and return debug information.
-
-        :returns: A dictionary with the outputs of the output components of the Pipeline.
-
-        :raises PipelineRuntimeError: if any of the components fail or return unexpected output.
-        """
-        pipeline_running(self)
-        return super().run(data=data, debug=debug)
-
-    def dumps(self, marshaller: Marshaller = DEFAULT_MARSHALLER) -> str:
-        """
-        Returns the string representation of this pipeline according to the
-        format dictated by the `Marshaller` in use.
-
-        :params marshaller: The Marshaller used to create the string representation. Defaults to
-            `YamlMarshaller`
-
-        :returns: A string representing the pipeline.
-        """
-        return marshaller.marshal(self.to_dict())
-
-    def dump(self, fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER):
-        """
-        Writes the string representation of this pipeline to the file-like object
-        passed in the `fp` argument.
-
-        :params fp: A file-like object ready to be written to.
-        :params marshaller: The Marshaller used to create the string representation. Defaults to
-            `YamlMarshaller`.
-        """
-        fp.write(marshaller.marshal(self.to_dict()))
-
-    @classmethod
-    def loads(cls, data: Union[str, bytes, bytearray], marshaller: Marshaller = DEFAULT_MARSHALLER) -> "Pipeline":
-        """
-        Creates a `Pipeline` object from the string representation passed in the `data` argument.
-
-        :params data: The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
-        :params marshaller: the Marshaller used to create the string representation. Defaults to
-            `YamlMarshaller`
-
-        :returns: A `Pipeline` object.
-        """
-        return cls.from_dict(marshaller.unmarshal(data))
-
-    @classmethod
-    def load(cls, fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER) -> "Pipeline":
-        """
-        Creates a `Pipeline` object from the string representation read from the file-like
-        object passed in the `fp` argument.
-
-        :params data: The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
-        :params fp: A file-like object ready to be read from.
-        :params marshaller: the Marshaller used to create the string representation. Defaults to
-            `YamlMarshaller`
-
-        :returns: A `Pipeline` object.
-        """
-        return cls.from_dict(marshaller.unmarshal(fp.read()))
-
-    def _prepare_component_input_data(self, data: Dict[str, Any]) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]:
-        """
-        Organizes input data for pipeline components and identifies any inputs that are not matched to any
-        component's input slots.
-
-        This method processes a flat dictionary of input data, where each key-value pair represents an input name
-        and its corresponding value. It distributes these inputs to the appropriate pipeline components based on
-        their input requirements. Inputs that don't match any component's input slots are classified as unresolved.
-
-        :param data: A dictionary with input names as keys and input values as values.
-        :type data: Dict[str, Any]
-        :return: A tuple containing two elements:
-            1. A dictionary mapping component names to their respective matched inputs.
-            2. A dictionary of inputs that were not matched to any component, termed as unresolved keyword arguments.
-        :rtype: Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]
-        """
-        pipeline_input_data: Dict[str, Dict[str, Any]] = defaultdict(dict)
-        unresolved_kwargs = {}
-
-        # Retrieve the input slots for each component in the pipeline
-        available_inputs: Dict[str, Dict[str, Any]] = self.inputs()
-
-        # Go through all provided to distribute them to the appropriate component inputs
-        for input_name, input_value in data.items():
-            resolved_at_least_once = False
-
-            # Check each component to see if it has a slot for the current kwarg
-            for component_name, component_inputs in available_inputs.items():
-                if input_name in component_inputs:
-                    # If a match is found, add the kwarg to the component's input data
-                    pipeline_input_data[component_name][input_name] = input_value
-                    resolved_at_least_once = True
-
-            if not resolved_at_least_once:
-                unresolved_kwargs[input_name] = input_value
-
-        return pipeline_input_data, unresolved_kwargs
diff --git a/haystack/telemetry/_telemetry.py b/haystack/telemetry/_telemetry.py
index 7a8f1af32..a0b9be8ff 100644
--- a/haystack/telemetry/_telemetry.py
+++ b/haystack/telemetry/_telemetry.py
@@ -1,17 +1,18 @@
-from typing import Any, Dict, Optional, TYPE_CHECKING, List, Tuple
-import os
-from pathlib import Path
-from collections import defaultdict
 import datetime
 import logging
+import os
 import uuid
-import yaml
+from collections import defaultdict
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
+
 import posthog
+import yaml

 from haystack.telemetry._environment import collect_system_specs

 if TYPE_CHECKING:
-    from haystack.pipeline import Pipeline
+    from haystack.core.pipeline import Pipeline


 HAYSTACK_TELEMETRY_ENABLED = "HAYSTACK_TELEMETRY_ENABLED"

diff --git a/releasenotes/notes/merge-pipeline-definitions-1da80e9803e2a8bb.yaml b/releasenotes/notes/merge-pipeline-definitions-1da80e9803e2a8bb.yaml
new file mode 100644
index 000000000..094562e8f
--- /dev/null
+++ b/releasenotes/notes/merge-pipeline-definitions-1da80e9803e2a8bb.yaml
@@ -0,0 +1,5 @@
+---
+enhancements:
+  - |
+    Merge the `Pipeline` definitions into a single `Pipeline` class.
+    The `haystack.pipeline` module has been deleted and only `haystack.core.pipeline` exists now.
diff --git a/test/core/pipeline/test_pipeline.py b/test/core/pipeline/test_pipeline.py
index 4e66f38f1..1876ee28b 100644
--- a/test/core/pipeline/test_pipeline.py
+++ b/test/core/pipeline/test_pipeline.py
@@ -7,6 +7,7 @@ from unittest.mock import patch

 import pytest

+from haystack.core.component import component
 from haystack.core.component.types import InputSocket, OutputSocket
 from haystack.core.errors import PipelineDrawingError, PipelineError, PipelineRuntimeError
 from haystack.core.pipeline import Pipeline
@@ -16,6 +17,163 @@ from haystack.testing.sample_components import AddFixedValue, Double

 logging.basicConfig(level=logging.DEBUG)


+@component
+class FakeComponent:
+    def __init__(self, an_init_param: Optional[str] = None):
+        pass
+
+    @component.output_types(value=str)
+    def run(self, input_: str):
+        return {"value": input_}
+
+
+def test_pipeline_resolution_simple_input():
+    @component
+    class Hello:
+        @component.output_types(output=str)
+        def run(self, word: str):
+            """
+            Takes a string in input and returns "Hello, <word>!"
+            in output.
+            """
+            return {"output": f"Hello, {word}!"}
+
+    pipeline = Pipeline()
+    pipeline.add_component("hello", Hello())
+    pipeline.add_component("hello2", Hello())
+
+    pipeline.connect("hello.output", "hello2.word")
+    result = pipeline.run(data={"hello": {"word": "world"}})
+    assert result == {"hello2": {"output": "Hello, Hello, world!!"}}
+
+    result = pipeline.run(data={"word": "world"})
+    assert result == {"hello2": {"output": "Hello, Hello, world!!"}}
+
+
+def test_pipeline_resolution_wrong_input_name(caplog):
+    @component
+    class Hello:
+        @component.output_types(output=str)
+        def run(self, who: str):
+            """
+            Takes a string in input and returns "Hello, <who>!"
+            in output.
+            """
+            return {"output": f"Hello, {who}!"}
+
+    pipeline = Pipeline()
+    pipeline.add_component("hello", Hello())
+    pipeline.add_component("hello2", Hello())
+
+    pipeline.connect("hello.output", "hello2.who")
+
+    # test case with nested component inputs
+    with pytest.raises(ValueError):
+        pipeline.run(data={"hello": {"non_existing_input": "world"}})
+
+    # test case with flat component inputs
+    with pytest.raises(ValueError):
+        pipeline.run(data={"non_existing_input": "world"})
+
+    # important to check that the warning is logged for UX purposes, leave it here
+    assert "were not matched to any component" in caplog.text
+
+
+def test_pipeline_resolution_with_mixed_correct_and_incorrect_input_names(caplog):
+    @component
+    class Hello:
+        @component.output_types(output=str)
+        def run(self, who: str):
+            """
+            Takes a string in input and returns "Hello, <who>!"
+            in output.
+            """
+            return {"output": f"Hello, {who}!"}
+
+    pipeline = Pipeline()
+    pipeline.add_component("hello", Hello())
+    pipeline.add_component("hello2", Hello())
+
+    pipeline.connect("hello.output", "hello2.who")
+
+    # test case with nested component inputs
+    # this will raise ValueError because the hello component does not have an input named "non_existing_input",
+    # even though it has an input named "who"
+    with pytest.raises(ValueError):
+        pipeline.run(data={"hello": {"non_existing_input": "world", "who": "world"}})
+
+    # test case with flat component inputs
+    # this will not raise ValueError because the input "who" will be resolved to the correct component,
+    # and we'll log a warning for the input "non_existing_input", which was not resolved
+    result = pipeline.run(data={"non_existing_input": "world", "who": "world"})
+    assert result == {"hello2": {"output": "Hello, Hello, world!!"}}
+
+    # important to check that the warning is logged for UX purposes, leave it here
+    assert "were not matched to any component" in caplog.text
+
+
+def test_pipeline_resolution_duplicate_input_names_across_components():
+    @component
+    class Hello:
+        @component.output_types(output=str)
+        def run(self, who: str, what: str):
+            return {"output": f"Hello {who} {what}!"}
+
+    pipe = Pipeline()
+    pipe.add_component("hello", Hello())
+    pipe.add_component("hello2", Hello())
+
+    pipe.connect("hello.output", "hello2.who")
+
+    result = pipe.run(data={"what": "Haystack", "who": "world"})
+    assert result == {"hello2": {"output": "Hello Hello world Haystack! Haystack!"}}
+
+    resolved, _ = pipe._prepare_component_input_data(data={"what": "Haystack", "who": "world"})
+
+    # why does hello2 have only one input? Because its "who" input is fed by hello.output
+    assert resolved == {"hello": {"what": "Haystack", "who": "world"}, "hello2": {"what": "Haystack"}}
+
+
+def test_pipeline_dumps(test_files_path):
+    pipeline = Pipeline()
+    pipeline.add_component("Comp1", FakeComponent("Foo"))
+    pipeline.add_component("Comp2", FakeComponent())
+    pipeline.connect("Comp1.value", "Comp2.input_")
+    pipeline.max_loops_allowed = 99
+    result = pipeline.dumps()
+    with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f:
+        assert f.read() == result
+
+
+def test_pipeline_loads(test_files_path):
+    with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f:
+        pipeline = Pipeline.loads(f.read())
+        assert pipeline.max_loops_allowed == 99
+        assert isinstance(pipeline.get_component("Comp1"), FakeComponent)
+        assert isinstance(pipeline.get_component("Comp2"), FakeComponent)
+
+
+def test_pipeline_dump(test_files_path, tmp_path):
+    pipeline = Pipeline()
+    pipeline.add_component("Comp1", FakeComponent("Foo"))
+    pipeline.add_component("Comp2", FakeComponent())
+    pipeline.connect("Comp1.value", "Comp2.input_")
+    pipeline.max_loops_allowed = 99
+    with open(tmp_path / "out.yaml", "w") as f:
+        pipeline.dump(f)
+    # re-open and ensure it's the same data as the test file
+    with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as test_f, open(tmp_path / "out.yaml", "r") as f:
+        assert f.read() == test_f.read()
+
+
+def test_pipeline_load(test_files_path):
+    with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f:
+        pipeline = Pipeline.load(f)
+        assert pipeline.max_loops_allowed == 99
+        assert isinstance(pipeline.get_component("Comp1"), FakeComponent)
+        assert isinstance(pipeline.get_component("Comp2"), FakeComponent)
+
+
 @patch("haystack.core.pipeline.pipeline._to_mermaid_image")
 @patch("haystack.core.pipeline.pipeline.is_in_jupyter")
@patch("IPython.display.Image") diff --git a/test/test_files/yaml/test_pipeline.yaml b/test/test_files/yaml/test_pipeline.yaml index 21499d5ee..ee8b3f689 100644 --- a/test/test_files/yaml/test_pipeline.yaml +++ b/test/test_files/yaml/test_pipeline.yaml @@ -2,11 +2,11 @@ components: Comp1: init_parameters: an_init_param: null - type: test.test_pipeline.TestComponent + type: test.core.pipeline.test_pipeline.FakeComponent Comp2: init_parameters: an_init_param: null - type: test.test_pipeline.TestComponent + type: test.core.pipeline.test_pipeline.FakeComponent connections: - receiver: Comp2.input_ sender: Comp1.value diff --git a/test/test_pipeline.py b/test/test_pipeline.py deleted file mode 100644 index 64d537422..000000000 --- a/test/test_pipeline.py +++ /dev/null @@ -1,166 +0,0 @@ -from typing import Optional - -import pytest - -from haystack import Pipeline, component - - -@component -class TestComponent: - def __init__(self, an_init_param: Optional[str] = None): - pass - - @component.output_types(value=str) - def run(self, input_: str): - return {"value": input_} - - -@pytest.fixture -def pipeline(): - return Pipeline() - - -def test_pipeline_dumps(pipeline, test_files_path): - pipeline.add_component("Comp1", TestComponent("Foo")) - pipeline.add_component("Comp2", TestComponent()) - pipeline.connect("Comp1.value", "Comp2.input_") - pipeline.max_loops_allowed = 99 - result = pipeline.dumps() - with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f: - assert f.read() == result - - -def test_pipeline_loads(test_files_path): - with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f: - pipeline = Pipeline.loads(f.read()) - assert pipeline.max_loops_allowed == 99 - assert isinstance(pipeline.get_component("Comp1"), TestComponent) - assert isinstance(pipeline.get_component("Comp2"), TestComponent) - - -def test_pipeline_dump(pipeline, test_files_path, tmp_path): - pipeline.add_component("Comp1", TestComponent("Foo")) - pipeline.add_component("Comp2", TestComponent()) - pipeline.connect("Comp1.value", "Comp2.input_") - pipeline.max_loops_allowed = 99 - with open(tmp_path / "out.yaml", "w") as f: - pipeline.dump(f) - # re-open and ensure it's the same data as the test file - with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as test_f, open(tmp_path / "out.yaml", "r") as f: - assert f.read() == test_f.read() - - -def test_pipeline_load(test_files_path): - with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f: - pipeline = Pipeline.load(f) - assert pipeline.max_loops_allowed == 99 - assert isinstance(pipeline.get_component("Comp1"), TestComponent) - assert isinstance(pipeline.get_component("Comp2"), TestComponent) - - -@pytest.mark.unit -def test_pipeline_resolution_simple_input(): - @component - class Hello: - @component.output_types(output=str) - def run(self, word: str): - """ - Takes a string in input and returns "Hello, !" - in output. 
- """ - return {"output": f"Hello, {word}!"} - - pipeline = Pipeline() - pipeline.add_component("hello", Hello()) - pipeline.add_component("hello2", Hello()) - - pipeline.connect("hello.output", "hello2.word") - result = pipeline.run(data={"hello": {"word": "world"}}) - assert result == {"hello2": {"output": "Hello, Hello, world!!"}} - - result = pipeline.run(data={"word": "world"}) - assert result == {"hello2": {"output": "Hello, Hello, world!!"}} - - -def test_pipeline_resolution_wrong_input_name(caplog): - @component - class Hello: - @component.output_types(output=str) - def run(self, who: str): - """ - Takes a string in input and returns "Hello, !" - in output. - """ - return {"output": f"Hello, {who}!"} - - pipeline = Pipeline() - pipeline.add_component("hello", Hello()) - pipeline.add_component("hello2", Hello()) - - pipeline.connect("hello.output", "hello2.who") - - # test case with nested component inputs - with pytest.raises(ValueError): - pipeline.run(data={"hello": {"non_existing_input": "world"}}) - - # test case with flat component inputs - with pytest.raises(ValueError): - pipeline.run(data={"non_existing_input": "world"}) - - # important to check that the warning is logged for UX purposes, leave it here - assert "were not matched to any component" in caplog.text - - -def test_pipeline_resolution_with_mixed_correct_and_incorrect_input_names(caplog): - @component - class Hello: - @component.output_types(output=str) - def run(self, who: str): - """ - Takes a string in input and returns "Hello, !" - in output. - """ - return {"output": f"Hello, {who}!"} - - pipeline = Pipeline() - pipeline.add_component("hello", Hello()) - pipeline.add_component("hello2", Hello()) - - pipeline.connect("hello.output", "hello2.who") - - # test case with nested component inputs - # this will raise ValueError because hello component does not have an input named "non_existing_input" - # even though it has an input named "who" - with pytest.raises(ValueError): - pipeline.run(data={"hello": {"non_existing_input": "world", "who": "world"}}) - - # test case with flat component inputs - # this will not raise ValueError because the input "who" will be resolved to the correct component - # and we'll log a warning for the input "non_existing_input" which was not resolved - result = pipeline.run(data={"non_existing_input": "world", "who": "world"}) - assert result == {"hello2": {"output": "Hello, Hello, world!!"}} - - # important to check that the warning is logged for UX purposes, leave it here - assert "were not matched to any component" in caplog.text - - -def test_pipeline_resolution_duplicate_input_names_across_components(): - @component - class Hello: - @component.output_types(output=str) - def run(self, who: str, what: str): - return {"output": f"Hello {who} {what}!"} - - pipe = Pipeline() - pipe.add_component("hello", Hello()) - pipe.add_component("hello2", Hello()) - - pipe.connect("hello.output", "hello2.who") - - result = pipe.run(data={"what": "Haystack", "who": "world"}) - assert result == {"hello2": {"output": "Hello Hello world Haystack! Haystack!"}} - - resolved, _ = pipe._prepare_component_input_data(data={"what": "Haystack", "who": "world"}) - - # why does hello2 have only one input? Because who of hello2 is inserted from hello.output - assert resolved == {"hello": {"what": "Haystack", "who": "world"}, "hello2": {"what": "Haystack"}}