From b9bf83bbef93d9b1c98f888799696776a7bf485e Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Thu, 30 Nov 2023 14:37:55 +0100 Subject: [PATCH] feat: Allow flat dictionary `Pipeline.run()` inputs (#6413) * Initial implementation, release note, update API and unit test --------- Co-authored-by: Massimiliano Pippi Co-authored-by: Silvano Cerza --- haystack/pipeline.py | 107 ++++++++++++++++- ...d-pipeline-run-input-e3dd98ff38f0bc01.yaml | 6 + test/test_pipeline.py | 108 ++++++++++++++++++ 3 files changed, 218 insertions(+), 3 deletions(-) create mode 100644 releasenotes/notes/allow-simplified-pipeline-run-input-e3dd98ff38f0bc01.yaml diff --git a/haystack/pipeline.py b/haystack/pipeline.py index 0478e3698..be8d79583 100644 --- a/haystack/pipeline.py +++ b/haystack/pipeline.py @@ -1,4 +1,5 @@ -from typing import Any, Dict, Optional, Union, TextIO +from collections import defaultdict +from typing import Any, Dict, Optional, Union, TextIO, Tuple from pathlib import Path import datetime import logging @@ -35,14 +36,76 @@ class Pipeline(_pipeline): def run(self, data: Dict[str, Any], debug: bool = False) -> Dict[str, Any]: """ - Runs the pipeline. + Runs the pipeline with given input data. + + :param data: A dictionary of inputs for the pipeline's components. Each key is a component name + and its value is a dictionary of that component's input parameters. + :param debug: Set to True to collect and return debug information. + :return: A dictionary containing the pipeline's output. + :raises PipelineRuntimeError: If a component fails or returns unexpected output. + + Example a - Using named components: + Consider a 'Hello' component that takes a 'word' input and outputs a greeting. 
+ + ```python + @component + class Hello: + @component.output_types(output=str) + def run(self, word: str): + return {"output": f"Hello, {word}!"} + ``` + + Create a pipeline with two 'Hello' components connected together: + + ```python + pipeline = Pipeline() + pipeline.add_component("hello", Hello()) + pipeline.add_component("hello2", Hello()) + pipeline.connect("hello.output", "hello2.word") + result = pipeline.run(data={"hello": {"word": "world"}}) + ``` + + This runs the pipeline with the specified input for 'hello', yielding + {'hello2': {'output': 'Hello, Hello, world!!'}}. + + Example b - Using flat inputs: + You can also pass inputs directly without specifying component names: + + ```python + result = pipeline.run(data={"word": "world"}) + ``` + + The pipeline resolves inputs to the correct components, returning + {'hello2': {'output': 'Hello, Hello, world!!'}}. + """ + # check whether the data is a nested dictionary of component inputs where each key is a component name + # and each value is a dictionary of input parameters for that component + is_nested_component_input = all(isinstance(value, dict) for value in data.values()) + if is_nested_component_input: + return self._run_internal(data=data, debug=debug) + else: + # flat input, a dict where keys are input names and values are the corresponding values + # we need to convert it to a nested dictionary of component inputs and then run the pipeline + # just like in the previous case + pipeline_inputs, unresolved_inputs = self._prepare_component_input_data(data) + if unresolved_inputs: + logger.warning( + "Inputs %s were not matched to any component inputs, please check your run parameters.", + list(unresolved_inputs.keys()), + ) + + return self._run_internal(data=pipeline_inputs, debug=debug) + + def _run_internal(self, data: Dict[str, Any], debug: bool = False) -> Dict[str, Any]: + """ + Runs the pipeline by invoking the underlying run to initiate the pipeline execution. 
:params data: the inputs to give to the input components of the Pipeline. :params debug: whether to collect and return debug information. :returns: A dictionary with the outputs of the output components of the Pipeline. - :raises PipelineRuntimeError: if the any of the components fail or return unexpected output. + :raises PipelineRuntimeError: if any of the components fail or return unexpected output. """ pipeline_running(self) return super().run(data=data, debug=debug) @@ -97,3 +160,41 @@ class Pipeline(_pipeline): :returns: A `Pipeline` object. """ return cls.from_dict(marshaller.unmarshal(fp.read())) + + def _prepare_component_input_data(self, data: Dict[str, Any]) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]: + """ + Organizes input data for pipeline components and identifies any inputs that are not matched to any + component's input slots. + + This method processes a flat dictionary of input data, where each key-value pair represents an input name + and its corresponding value. It distributes these inputs to the appropriate pipeline components based on + their input requirements. Inputs that don't match any component's input slots are classified as unresolved. + + :param data: A dictionary with input names as keys and input values as values. + :type data: Dict[str, Any] + :return: A tuple containing two elements: + 1. A dictionary mapping component names to their respective matched inputs. + 2. A dictionary of inputs that were not matched to any component, termed as unresolved keyword arguments. 
+ :rtype: Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]] + """ + pipeline_input_data: Dict[str, Dict[str, Any]] = defaultdict(dict) + unresolved_kwargs = {} + + # Retrieve the input slots for each component in the pipeline + available_inputs: Dict[str, Dict[str, Any]] = self.inputs() + + # Go through all provided inputs to distribute them to the appropriate component inputs + for input_name, input_value in data.items(): + resolved_at_least_once = False + + # Check each component to see if it has a slot for the current kwarg + for component_name, component_inputs in available_inputs.items(): + if input_name in component_inputs: + # If a match is found, add the kwarg to the component's input data + pipeline_input_data[component_name][input_name] = input_value + resolved_at_least_once = True + + if not resolved_at_least_once: + unresolved_kwargs[input_name] = input_value + + return pipeline_input_data, unresolved_kwargs diff --git a/releasenotes/notes/allow-simplified-pipeline-run-input-e3dd98ff38f0bc01.yaml b/releasenotes/notes/allow-simplified-pipeline-run-input-e3dd98ff38f0bc01.yaml new file mode 100644 index 000000000..6badfe24e --- /dev/null +++ b/releasenotes/notes/allow-simplified-pipeline-run-input-e3dd98ff38f0bc01.yaml @@ -0,0 +1,6 @@ +--- +preview: + - | + Adds option to run pipelines without nesting the inputs under the names of the components they belong to. Instead, + provide the input keys/values directly, and the pipeline's internal mechanisms will automatically determine the + appropriate components. 
diff --git a/test/test_pipeline.py b/test/test_pipeline.py index b5e4aee97..64d537422 100644 --- a/test/test_pipeline.py +++ b/test/test_pipeline.py @@ -56,3 +56,111 @@ def test_pipeline_load(test_files_path): assert pipeline.max_loops_allowed == 99 assert isinstance(pipeline.get_component("Comp1"), TestComponent) assert isinstance(pipeline.get_component("Comp2"), TestComponent) + + +@pytest.mark.unit +def test_pipeline_resolution_simple_input(): + @component + class Hello: + @component.output_types(output=str) + def run(self, word: str): + """ + Takes a string in input and returns "Hello, <string>!" + in output. + """ + return {"output": f"Hello, {word}!"} + + pipeline = Pipeline() + pipeline.add_component("hello", Hello()) + pipeline.add_component("hello2", Hello()) + + pipeline.connect("hello.output", "hello2.word") + result = pipeline.run(data={"hello": {"word": "world"}}) + assert result == {"hello2": {"output": "Hello, Hello, world!!"}} + + result = pipeline.run(data={"word": "world"}) + assert result == {"hello2": {"output": "Hello, Hello, world!!"}} + + +def test_pipeline_resolution_wrong_input_name(caplog): + @component + class Hello: + @component.output_types(output=str) + def run(self, who: str): + """ + Takes a string in input and returns "Hello, <string>!" + in output. 
+ """ + return {"output": f"Hello, {who}!"} + + pipeline = Pipeline() + pipeline.add_component("hello", Hello()) + pipeline.add_component("hello2", Hello()) + + pipeline.connect("hello.output", "hello2.who") + + # test case with nested component inputs + with pytest.raises(ValueError): + pipeline.run(data={"hello": {"non_existing_input": "world"}}) + + # test case with flat component inputs + with pytest.raises(ValueError): + pipeline.run(data={"non_existing_input": "world"}) + + # important to check that the warning is logged for UX purposes, leave it here + assert "were not matched to any component" in caplog.text + + +def test_pipeline_resolution_with_mixed_correct_and_incorrect_input_names(caplog): + @component + class Hello: + @component.output_types(output=str) + def run(self, who: str): + """ + Takes a string in input and returns "Hello, <string>!" + in output. + """ + return {"output": f"Hello, {who}!"} + + pipeline = Pipeline() + pipeline.add_component("hello", Hello()) + pipeline.add_component("hello2", Hello()) + + pipeline.connect("hello.output", "hello2.who") + + # test case with nested component inputs + # this will raise ValueError because hello component does not have an input named "non_existing_input" + # even though it has an input named "who" + with pytest.raises(ValueError): + pipeline.run(data={"hello": {"non_existing_input": "world", "who": "world"}}) + + # test case with flat component inputs + # this will not raise ValueError because the input "who" will be resolved to the correct component + # and we'll log a warning for the input "non_existing_input" which was not resolved + result = pipeline.run(data={"non_existing_input": "world", "who": "world"}) + assert result == {"hello2": {"output": "Hello, Hello, world!!"}} + + # important to check that the warning is logged for UX purposes, leave it here + assert "were not matched to any component" in caplog.text + + +def test_pipeline_resolution_duplicate_input_names_across_components(): + @component + 
class Hello: + @component.output_types(output=str) + def run(self, who: str, what: str): + return {"output": f"Hello {who} {what}!"} + + pipe = Pipeline() + pipe.add_component("hello", Hello()) + pipe.add_component("hello2", Hello()) + + pipe.connect("hello.output", "hello2.who") + + result = pipe.run(data={"what": "Haystack", "who": "world"}) + assert result == {"hello2": {"output": "Hello Hello world Haystack! Haystack!"}} + + resolved, _ = pipe._prepare_component_input_data(data={"what": "Haystack", "who": "world"}) + + # hello2 gets only "what" because its "who" input is supplied by hello.output through the connection above + assert resolved == {"hello": {"what": "Haystack", "who": "world"}, "hello2": {"what": "Haystack"}}