refactor: Merge Pipelines definition in core package (#6973)

* Move marshalling functions into core Pipeline

* Move telemetry gathering into core Pipeline

* Move run logic into core Pipeline

* Update root Pipeline import

* Add release notes

* Update Pipeline docs path

* Update releasenotes/notes/merge-pipeline-definitions-1da80e9803e2a8bb.yaml

Co-authored-by: Massimiliano Pippi <mpippi@gmail.com>

---------

Co-authored-by: Massimiliano Pippi <mpippi@gmail.com>
Silvano Cerza 2024-02-12 18:25:28 +01:00 committed by GitHub
parent 549021d2fc
commit f96eb3847f
9 changed files with 333 additions and 383 deletions
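At a glance, a condensed sketch of the refactor (assembled from the diffs below; the file and class names are the ones this commit touches):

```python
# Before: two Pipeline definitions, the user-facing one subclassing the core one.
#
#   haystack/core/pipeline/pipeline.py
#       class Pipeline: ...            # graph building, connections, validation
#
#   haystack/pipeline.py               # deleted by this commit
#       class Pipeline(_pipeline):     # added dumps/dump/loads/load,
#           ...                        # telemetry, and flat-input run() on top
#
# After: a single definition in haystack/core/pipeline/pipeline.py that
# carries the graph logic, serialization, telemetry, and run() input resolution.
```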


@@ -1,6 +1,6 @@
loaders:
- type: haystack_pydoc_tools.loaders.CustomPythonLoader
search_path: [../../../haystack]
search_path: [../../../haystack/core/pipeline]
modules: ["pipeline"]
ignore_when_discovered: ["__init__"]
processors:


@@ -1,9 +1,8 @@
from haystack.core.component import component
from haystack.core.errors import ComponentError, DeserializationError
from haystack.core.pipeline import Pipeline
from haystack.core.serialization import default_from_dict, default_to_dict
from haystack.core.errors import DeserializationError, ComponentError
from haystack.pipeline import Pipeline
from haystack.dataclasses import Document, Answer, GeneratedAnswer, ExtractedAnswer
from haystack.dataclasses import Answer, Document, ExtractedAnswer, GeneratedAnswer
__all__ = [
"component",


@@ -4,9 +4,11 @@
import importlib
import itertools
import logging
from collections import defaultdict
from copy import copy
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Mapping, Optional, Set, Tuple, Type, TypeVar, Union
from typing import Any, Dict, List, Mapping, Optional, Set, TextIO, Tuple, Type, TypeVar, Union
import networkx # type:ignore
@@ -20,11 +22,14 @@ from haystack.core.errors import (
)
from haystack.core.serialization import component_from_dict, component_to_dict
from haystack.core.type_utils import _type_name, _types_are_compatible
from haystack.marshal import Marshaller, YamlMarshaller
from haystack.telemetry import pipeline_running
from haystack.utils import is_in_jupyter
from .descriptions import find_pipeline_inputs, find_pipeline_outputs
from .draw import _to_mermaid_image
DEFAULT_MARSHALLER = YamlMarshaller()
logger = logging.getLogger(__name__)
# We use a generic type to annotate the return value of classmethods,
@@ -56,6 +61,8 @@ class Pipeline:
max_loops_allowed: how many times the pipeline can run the same node before throwing an exception.
debug_path: when debug is enabled in `run()`, where to save the debug data.
"""
self._telemetry_runs = 0
self._last_telemetry_sent: Optional[datetime] = None
self.metadata = metadata or {}
self.max_loops_allowed = max_loops_allowed
self.graph = networkx.MultiDiGraph()
@@ -194,6 +201,57 @@
return pipe
def dumps(self, marshaller: Marshaller = DEFAULT_MARSHALLER) -> str:
"""
Returns the string representation of this pipeline according to the
format dictated by the `Marshaller` in use.
:params marshaller: The Marshaller used to create the string representation. Defaults to
`YamlMarshaller`
:returns: A string representing the pipeline.
"""
return marshaller.marshal(self.to_dict())
def dump(self, fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER):
"""
Writes the string representation of this pipeline to the file-like object
passed in the `fp` argument.
:params fp: A file-like object ready to be written to.
:params marshaller: The Marshaller used to create the string representation. Defaults to
`YamlMarshaller`.
"""
fp.write(marshaller.marshal(self.to_dict()))
@classmethod
def loads(cls, data: Union[str, bytes, bytearray], marshaller: Marshaller = DEFAULT_MARSHALLER) -> "Pipeline":
"""
Creates a `Pipeline` object from the string representation passed in the `data` argument.
:params data: The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
:params marshaller: the Marshaller used to create the string representation. Defaults to
`YamlMarshaller`
:returns: A `Pipeline` object.
"""
return cls.from_dict(marshaller.unmarshal(data))
@classmethod
def load(cls, fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER) -> "Pipeline":
"""
Creates a `Pipeline` object from the string representation read from the file-like
object passed in the `fp` argument.
:params fp: A file-like object ready to be read from.
:params marshaller: the Marshaller used to create the string representation. Defaults to
`YamlMarshaller`
:returns: A `Pipeline` object.
"""
return cls.from_dict(marshaller.unmarshal(fp.read()))
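A minimal usage sketch of the serialization API that now lives directly on the core `Pipeline` (the `Greet` component and the file name are hypothetical stand-ins; the four methods are the ones defined above):

```python
from haystack import Pipeline, component

@component
class Greet:
    # Hypothetical toy component, standing in for any @component-decorated class.
    @component.output_types(output=str)
    def run(self, word: str):
        return {"output": f"Hello, {word}!"}

pipeline = Pipeline()
pipeline.add_component("greet", Greet())

# String round-trip, YAML by default via DEFAULT_MARSHALLER.
yaml_str = pipeline.dumps()
restored = Pipeline.loads(yaml_str)

# File round-trip with the file-like variants.
with open("pipeline.yaml", "w") as f:
    pipeline.dump(f)
with open("pipeline.yaml") as f:
    restored = Pipeline.load(f)
```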
def add_component(self, name: str, instance: Component) -> None:
"""
Create a component for the given name. Components are not connected to anything by default:
@@ -545,10 +603,54 @@ class Pipeline:
f"Input {socket_name} for component {component_name} is already sent by {socket.senders}."
)
# TODO: We're ignoring this linting rules for the time being, after we properly optimize this function we'll remove the noqa
def run( # noqa: C901, PLR0912 pylint: disable=too-many-branches
# TODO: We're ignoring these linting rules for the time being, after we properly optimize this function we'll remove the noqa
def run( # noqa: C901, PLR0912, PLR0915 pylint: disable=too-many-branches
self, data: Dict[str, Any], debug: bool = False
) -> Dict[str, Any]:
"""
Runs the pipeline with given input data.
:param data: A dictionary of inputs for the pipeline's components. Each key is a component name
and its value is a dictionary of that component's input parameters.
:param debug: Set to True to collect and return debug information.
:return: A dictionary containing the pipeline's output.
:raises PipelineRuntimeError: If a component fails or returns unexpected output.
Example a - Using named components:
Consider a 'Hello' component that takes a 'word' input and outputs a greeting.
```python
@component
class Hello:
@component.output_types(output=str)
def run(self, word: str):
return {"output": f"Hello, {word}!"}
```
Create a pipeline with two 'Hello' components connected together:
```python
pipeline = Pipeline()
pipeline.add_component("hello", Hello())
pipeline.add_component("hello2", Hello())
pipeline.connect("hello.output", "hello2.word")
result = pipeline.run(data={"hello": {"word": "world"}})
```
This runs the pipeline with the specified input for 'hello', yielding
{'hello2': {'output': 'Hello, Hello, world!!'}}.
Example b - Using flat inputs:
You can also pass inputs directly without specifying component names:
```python
result = pipeline.run(data={"word": "world"})
```
The pipeline resolves inputs to the correct components, returning
{'hello2': {'output': 'Hello, Hello, world!!'}}.
"""
pipeline_running(self)
# NOTE: We're assuming data is formatted like so as of now
# data = {
# "comp1": {"input1": 1, "input2": 2},
@@ -563,9 +665,22 @@
# As of now it's here to make sure we don't have failing tests that assume warm_up() is called in run()
self.warm_up()
# check whether the data is a nested dictionary of component inputs where each key is a component name
# and each value is a dictionary of input parameters for that component
is_nested_component_input = all(isinstance(value, dict) for value in data.values())
if not is_nested_component_input:
# flat input, a dict where keys are input names and values are the corresponding values
# we need to convert it to a nested dictionary of component inputs and then run the pipeline
# just like in the previous case
data, unresolved_inputs = self._prepare_component_input_data(data)
if unresolved_inputs:
logger.warning(
"Inputs %s were not matched to any component inputs, please check your run parameters.",
list(unresolved_inputs.keys()),
)
# Raise if input is malformed in some way
self._validate_input(data)
# NOTE: The above NOTE and TODO are technically not true.
# This implementation of run supports only the first format, but the second format is actually
# never received by this method. It's handled by the `run()` method of the `Pipeline` class
@@ -774,6 +889,44 @@
return final_outputs
def _prepare_component_input_data(self, data: Dict[str, Any]) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]:
"""
Organizes input data for pipeline components and identifies any inputs that are not matched to any
component's input slots.
This method processes a flat dictionary of input data, where each key-value pair represents an input name
and its corresponding value. It distributes these inputs to the appropriate pipeline components based on
their input requirements. Inputs that don't match any component's input slots are classified as unresolved.
:param data: A dictionary with input names as keys and input values as values.
:type data: Dict[str, Any]
:return: A tuple containing two elements:
1. A dictionary mapping component names to their respective matched inputs.
2. A dictionary of inputs that were not matched to any component, termed as unresolved keyword arguments.
:rtype: Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]
"""
pipeline_input_data: Dict[str, Dict[str, Any]] = defaultdict(dict)
unresolved_kwargs = {}
# Retrieve the input slots for each component in the pipeline
available_inputs: Dict[str, Dict[str, Any]] = self.inputs()
# Go through all provided inputs to distribute them to the appropriate component inputs
for input_name, input_value in data.items():
resolved_at_least_once = False
# Check each component to see if it has a slot for the current kwarg
for component_name, component_inputs in available_inputs.items():
if input_name in component_inputs:
# If a match is found, add the kwarg to the component's input data
pipeline_input_data[component_name][input_name] = input_value
resolved_at_least_once = True
if not resolved_at_least_once:
unresolved_kwargs[input_name] = input_value
return pipeline_input_data, unresolved_kwargs
def _connections_status(
sender_node: str, receiver_node: str, sender_sockets: List[OutputSocket], receiver_sockets: List[InputSocket]
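To make the resolution performed by `_prepare_component_input_data` concrete, a small sketch (it mirrors the duplicate-input-names test later in this commit; the `bogus` key is an assumption added to show the unresolved branch):

```python
from haystack import Pipeline, component

@component
class Hello:
    @component.output_types(output=str)
    def run(self, who: str, what: str):
        return {"output": f"Hello {who} {what}!"}

pipe = Pipeline()
pipe.add_component("hello", Hello())
pipe.add_component("hello2", Hello())
pipe.connect("hello.output", "hello2.who")

resolved, unresolved = pipe._prepare_component_input_data(
    {"who": "world", "what": "Haystack", "bogus": 42}
)
# "who" only reaches "hello": hello2's "who" socket already has a sender
# (hello.output), so it is no longer an open pipeline input.
# resolved   == {"hello": {"who": "world", "what": "Haystack"},
#                "hello2": {"what": "Haystack"}}
# unresolved == {"bogus": 42}
```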


@@ -1,200 +0,0 @@
from collections import defaultdict
from typing import Any, Dict, Optional, Union, TextIO, Tuple
from pathlib import Path
import datetime
import logging
from haystack.core.pipeline import Pipeline as _pipeline
from haystack.telemetry import pipeline_running
from haystack.marshal import Marshaller, YamlMarshaller
DEFAULT_MARSHALLER = YamlMarshaller()
logger = logging.getLogger(__name__)
class Pipeline(_pipeline):
def __init__(
self,
metadata: Optional[Dict[str, Any]] = None,
max_loops_allowed: int = 100,
debug_path: Union[Path, str] = Path(".haystack_debug/"),
):
"""
Creates the Pipeline.
Args:
metadata: arbitrary dictionary to store metadata about this pipeline. Make sure all the values contained in
this dictionary can be serialized and deserialized if you wish to save this pipeline to file with
`save_pipelines()/load_pipelines()`.
max_loops_allowed: how many times the pipeline can run the same node before throwing an exception.
debug_path: when debug is enabled in `run()`, where to save the debug data.
"""
self._telemetry_runs = 0
self._last_telemetry_sent: Optional[datetime.datetime] = None
super().__init__(metadata=metadata, max_loops_allowed=max_loops_allowed, debug_path=debug_path)
def run(self, data: Dict[str, Any], debug: bool = False) -> Dict[str, Any]:
"""
Runs the pipeline with given input data.
:param data: A dictionary of inputs for the pipeline's components. Each key is a component name
and its value is a dictionary of that component's input parameters.
:param debug: Set to True to collect and return debug information.
:return: A dictionary containing the pipeline's output.
:raises PipelineRuntimeError: If a component fails or returns unexpected output.
Example a - Using named components:
Consider a 'Hello' component that takes a 'word' input and outputs a greeting.
```python
@component
class Hello:
@component.output_types(output=str)
def run(self, word: str):
return {"output": f"Hello, {word}!"}
```
Create a pipeline with two 'Hello' components connected together:
```python
pipeline = Pipeline()
pipeline.add_component("hello", Hello())
pipeline.add_component("hello2", Hello())
pipeline.connect("hello.output", "hello2.word")
result = pipeline.run(data={"hello": {"word": "world"}})
```
This runs the pipeline with the specified input for 'hello', yielding
{'hello2': {'output': 'Hello, Hello, world!!'}}.
Example b - Using flat inputs:
You can also pass inputs directly without specifying component names:
```python
result = pipeline.run(data={"word": "world"})
```
The pipeline resolves inputs to the correct components, returning
{'hello2': {'output': 'Hello, Hello, world!!'}}.
"""
# check whether the data is a nested dictionary of component inputs where each key is a component name
# and each value is a dictionary of input parameters for that component
is_nested_component_input = all(isinstance(value, dict) for value in data.values())
if is_nested_component_input:
return self._run_internal(data=data, debug=debug)
else:
# flat input, a dict where keys are input names and values are the corresponding values
# we need to convert it to a nested dictionary of component inputs and then run the pipeline
# just like in the previous case
pipeline_inputs, unresolved_inputs = self._prepare_component_input_data(data)
if unresolved_inputs:
logger.warning(
"Inputs %s were not matched to any component inputs, please check your run parameters.",
list(unresolved_inputs.keys()),
)
return self._run_internal(data=pipeline_inputs, debug=debug)
def _run_internal(self, data: Dict[str, Any], debug: bool = False) -> Dict[str, Any]:
"""
Runs the pipeline by invoking the underlying run to initiate the pipeline execution.
:params data: the inputs to give to the input components of the Pipeline.
:params debug: whether to collect and return debug information.
:returns: A dictionary with the outputs of the output components of the Pipeline.
:raises PipelineRuntimeError: if any of the components fail or return unexpected output.
"""
pipeline_running(self)
return super().run(data=data, debug=debug)
def dumps(self, marshaller: Marshaller = DEFAULT_MARSHALLER) -> str:
"""
Returns the string representation of this pipeline according to the
format dictated by the `Marshaller` in use.
:params marshaller: The Marshaller used to create the string representation. Defaults to
`YamlMarshaller`
:returns: A string representing the pipeline.
"""
return marshaller.marshal(self.to_dict())
def dump(self, fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER):
"""
Writes the string representation of this pipeline to the file-like object
passed in the `fp` argument.
:params fp: A file-like object ready to be written to.
:params marshaller: The Marshaller used to create the string representation. Defaults to
`YamlMarshaller`.
"""
fp.write(marshaller.marshal(self.to_dict()))
@classmethod
def loads(cls, data: Union[str, bytes, bytearray], marshaller: Marshaller = DEFAULT_MARSHALLER) -> "Pipeline":
"""
Creates a `Pipeline` object from the string representation passed in the `data` argument.
:params data: The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
:params marshaller: the Marshaller used to create the string representation. Defaults to
`YamlMarshaller`
:returns: A `Pipeline` object.
"""
return cls.from_dict(marshaller.unmarshal(data))
@classmethod
def load(cls, fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER) -> "Pipeline":
"""
Creates a `Pipeline` object from the string representation read from the file-like
object passed in the `fp` argument.
:params fp: A file-like object ready to be read from.
:params marshaller: the Marshaller used to create the string representation. Defaults to
`YamlMarshaller`
:returns: A `Pipeline` object.
"""
return cls.from_dict(marshaller.unmarshal(fp.read()))
def _prepare_component_input_data(self, data: Dict[str, Any]) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]:
"""
Organizes input data for pipeline components and identifies any inputs that are not matched to any
component's input slots.
This method processes a flat dictionary of input data, where each key-value pair represents an input name
and its corresponding value. It distributes these inputs to the appropriate pipeline components based on
their input requirements. Inputs that don't match any component's input slots are classified as unresolved.
:param data: A dictionary with input names as keys and input values as values.
:type data: Dict[str, Any]
:return: A tuple containing two elements:
1. A dictionary mapping component names to their respective matched inputs.
2. A dictionary of inputs that were not matched to any component, termed as unresolved keyword arguments.
:rtype: Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]
"""
pipeline_input_data: Dict[str, Dict[str, Any]] = defaultdict(dict)
unresolved_kwargs = {}
# Retrieve the input slots for each component in the pipeline
available_inputs: Dict[str, Dict[str, Any]] = self.inputs()
# Go through all provided inputs to distribute them to the appropriate component inputs
for input_name, input_value in data.items():
resolved_at_least_once = False
# Check each component to see if it has a slot for the current kwarg
for component_name, component_inputs in available_inputs.items():
if input_name in component_inputs:
# If a match is found, add the kwarg to the component's input data
pipeline_input_data[component_name][input_name] = input_value
resolved_at_least_once = True
if not resolved_at_least_once:
unresolved_kwargs[input_name] = input_value
return pipeline_input_data, unresolved_kwargs


@@ -1,17 +1,18 @@
from typing import Any, Dict, Optional, TYPE_CHECKING, List, Tuple
import os
from pathlib import Path
from collections import defaultdict
import datetime
import logging
import os
import uuid
import yaml
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import posthog
import yaml
from haystack.telemetry._environment import collect_system_specs
if TYPE_CHECKING:
from haystack.pipeline import Pipeline
from haystack.core.pipeline import Pipeline
HAYSTACK_TELEMETRY_ENABLED = "HAYSTACK_TELEMETRY_ENABLED"


@@ -0,0 +1,5 @@
---
enhancements:
- |
Merge the two `Pipeline` definitions into a single `Pipeline` class.
The class in the `haystack.pipeline` module has been deleted; only `haystack.core.pipeline` exists now.
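In user code the change boils down to an import path (a minimal sketch; both imports below appear in the diffs of this commit):

```python
# Before this commit (module deleted by this change):
#   from haystack.pipeline import Pipeline
# After the merge, both of these resolve to the same, single class:
from haystack import Pipeline                 # re-export, as used in the tests below
from haystack.core.pipeline import Pipeline   # canonical location
```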


@@ -7,6 +7,7 @@ from unittest.mock import patch
import pytest
from haystack.core.component import component
from haystack.core.component.types import InputSocket, OutputSocket
from haystack.core.errors import PipelineDrawingError, PipelineError, PipelineRuntimeError
from haystack.core.pipeline import Pipeline
@@ -16,6 +17,163 @@ from haystack.testing.sample_components import AddFixedValue, Double
logging.basicConfig(level=logging.DEBUG)
@component
class FakeComponent:
def __init__(self, an_init_param: Optional[str] = None):
pass
@component.output_types(value=str)
def run(self, input_: str):
return {"value": input_}
def test_pipeline_resolution_simple_input():
@component
class Hello:
@component.output_types(output=str)
def run(self, word: str):
"""
Takes a string as input and returns "Hello, <string>!"
as output.
"""
return {"output": f"Hello, {word}!"}
pipeline = Pipeline()
pipeline.add_component("hello", Hello())
pipeline.add_component("hello2", Hello())
pipeline.connect("hello.output", "hello2.word")
result = pipeline.run(data={"hello": {"word": "world"}})
assert result == {"hello2": {"output": "Hello, Hello, world!!"}}
result = pipeline.run(data={"word": "world"})
assert result == {"hello2": {"output": "Hello, Hello, world!!"}}
def test_pipeline_resolution_wrong_input_name(caplog):
@component
class Hello:
@component.output_types(output=str)
def run(self, who: str):
"""
Takes a string as input and returns "Hello, <string>!"
as output.
"""
return {"output": f"Hello, {who}!"}
pipeline = Pipeline()
pipeline.add_component("hello", Hello())
pipeline.add_component("hello2", Hello())
pipeline.connect("hello.output", "hello2.who")
# test case with nested component inputs
with pytest.raises(ValueError):
pipeline.run(data={"hello": {"non_existing_input": "world"}})
# test case with flat component inputs
with pytest.raises(ValueError):
pipeline.run(data={"non_existing_input": "world"})
# important to check that the warning is logged for UX purposes, leave it here
assert "were not matched to any component" in caplog.text
def test_pipeline_resolution_with_mixed_correct_and_incorrect_input_names(caplog):
@component
class Hello:
@component.output_types(output=str)
def run(self, who: str):
"""
Takes a string as input and returns "Hello, <string>!"
as output.
"""
return {"output": f"Hello, {who}!"}
pipeline = Pipeline()
pipeline.add_component("hello", Hello())
pipeline.add_component("hello2", Hello())
pipeline.connect("hello.output", "hello2.who")
# test case with nested component inputs
# this will raise ValueError because hello component does not have an input named "non_existing_input"
# even though it has an input named "who"
with pytest.raises(ValueError):
pipeline.run(data={"hello": {"non_existing_input": "world", "who": "world"}})
# test case with flat component inputs
# this will not raise ValueError because the input "who" will be resolved to the correct component
# and we'll log a warning for the input "non_existing_input" which was not resolved
result = pipeline.run(data={"non_existing_input": "world", "who": "world"})
assert result == {"hello2": {"output": "Hello, Hello, world!!"}}
# important to check that the warning is logged for UX purposes, leave it here
assert "were not matched to any component" in caplog.text
def test_pipeline_resolution_duplicate_input_names_across_components():
@component
class Hello:
@component.output_types(output=str)
def run(self, who: str, what: str):
return {"output": f"Hello {who} {what}!"}
pipe = Pipeline()
pipe.add_component("hello", Hello())
pipe.add_component("hello2", Hello())
pipe.connect("hello.output", "hello2.who")
result = pipe.run(data={"what": "Haystack", "who": "world"})
assert result == {"hello2": {"output": "Hello Hello world Haystack! Haystack!"}}
resolved, _ = pipe._prepare_component_input_data(data={"what": "Haystack", "who": "world"})
# hello2 resolves only "what": its "who" input is already fed by hello.output
assert resolved == {"hello": {"what": "Haystack", "who": "world"}, "hello2": {"what": "Haystack"}}
def test_pipeline_dumps(test_files_path):
pipeline = Pipeline()
pipeline.add_component("Comp1", FakeComponent("Foo"))
pipeline.add_component("Comp2", FakeComponent())
pipeline.connect("Comp1.value", "Comp2.input_")
pipeline.max_loops_allowed = 99
result = pipeline.dumps()
with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f:
assert f.read() == result
def test_pipeline_loads(test_files_path):
with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f:
pipeline = Pipeline.loads(f.read())
assert pipeline.max_loops_allowed == 99
assert isinstance(pipeline.get_component("Comp1"), FakeComponent)
assert isinstance(pipeline.get_component("Comp2"), FakeComponent)
def test_pipeline_dump(test_files_path, tmp_path):
pipeline = Pipeline()
pipeline.add_component("Comp1", FakeComponent("Foo"))
pipeline.add_component("Comp2", FakeComponent())
pipeline.connect("Comp1.value", "Comp2.input_")
pipeline.max_loops_allowed = 99
with open(tmp_path / "out.yaml", "w") as f:
pipeline.dump(f)
# re-open and ensure it's the same data as the test file
with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as test_f, open(tmp_path / "out.yaml", "r") as f:
assert f.read() == test_f.read()
def test_pipeline_load(test_files_path):
with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f:
pipeline = Pipeline.load(f)
assert pipeline.max_loops_allowed == 99
assert isinstance(pipeline.get_component("Comp1"), FakeComponent)
assert isinstance(pipeline.get_component("Comp2"), FakeComponent)
@patch("haystack.core.pipeline.pipeline._to_mermaid_image")
@patch("haystack.core.pipeline.pipeline.is_in_jupyter")
@patch("IPython.display.Image")


@@ -2,11 +2,11 @@ components:
Comp1:
init_parameters:
an_init_param: null
type: test.test_pipeline.TestComponent
type: test.core.pipeline.test_pipeline.FakeComponent
Comp2:
init_parameters:
an_init_param: null
type: test.test_pipeline.TestComponent
type: test.core.pipeline.test_pipeline.FakeComponent
connections:
- receiver: Comp2.input_
sender: Comp1.value


@@ -1,166 +0,0 @@
from typing import Optional
import pytest
from haystack import Pipeline, component
@component
class TestComponent:
def __init__(self, an_init_param: Optional[str] = None):
pass
@component.output_types(value=str)
def run(self, input_: str):
return {"value": input_}
@pytest.fixture
def pipeline():
return Pipeline()
def test_pipeline_dumps(pipeline, test_files_path):
pipeline.add_component("Comp1", TestComponent("Foo"))
pipeline.add_component("Comp2", TestComponent())
pipeline.connect("Comp1.value", "Comp2.input_")
pipeline.max_loops_allowed = 99
result = pipeline.dumps()
with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f:
assert f.read() == result
def test_pipeline_loads(test_files_path):
with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f:
pipeline = Pipeline.loads(f.read())
assert pipeline.max_loops_allowed == 99
assert isinstance(pipeline.get_component("Comp1"), TestComponent)
assert isinstance(pipeline.get_component("Comp2"), TestComponent)
def test_pipeline_dump(pipeline, test_files_path, tmp_path):
pipeline.add_component("Comp1", TestComponent("Foo"))
pipeline.add_component("Comp2", TestComponent())
pipeline.connect("Comp1.value", "Comp2.input_")
pipeline.max_loops_allowed = 99
with open(tmp_path / "out.yaml", "w") as f:
pipeline.dump(f)
# re-open and ensure it's the same data as the test file
with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as test_f, open(tmp_path / "out.yaml", "r") as f:
assert f.read() == test_f.read()
def test_pipeline_load(test_files_path):
with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f:
pipeline = Pipeline.load(f)
assert pipeline.max_loops_allowed == 99
assert isinstance(pipeline.get_component("Comp1"), TestComponent)
assert isinstance(pipeline.get_component("Comp2"), TestComponent)
@pytest.mark.unit
def test_pipeline_resolution_simple_input():
@component
class Hello:
@component.output_types(output=str)
def run(self, word: str):
"""
Takes a string as input and returns "Hello, <string>!"
as output.
"""
return {"output": f"Hello, {word}!"}
pipeline = Pipeline()
pipeline.add_component("hello", Hello())
pipeline.add_component("hello2", Hello())
pipeline.connect("hello.output", "hello2.word")
result = pipeline.run(data={"hello": {"word": "world"}})
assert result == {"hello2": {"output": "Hello, Hello, world!!"}}
result = pipeline.run(data={"word": "world"})
assert result == {"hello2": {"output": "Hello, Hello, world!!"}}
def test_pipeline_resolution_wrong_input_name(caplog):
@component
class Hello:
@component.output_types(output=str)
def run(self, who: str):
"""
Takes a string as input and returns "Hello, <string>!"
as output.
"""
return {"output": f"Hello, {who}!"}
pipeline = Pipeline()
pipeline.add_component("hello", Hello())
pipeline.add_component("hello2", Hello())
pipeline.connect("hello.output", "hello2.who")
# test case with nested component inputs
with pytest.raises(ValueError):
pipeline.run(data={"hello": {"non_existing_input": "world"}})
# test case with flat component inputs
with pytest.raises(ValueError):
pipeline.run(data={"non_existing_input": "world"})
# important to check that the warning is logged for UX purposes, leave it here
assert "were not matched to any component" in caplog.text
def test_pipeline_resolution_with_mixed_correct_and_incorrect_input_names(caplog):
@component
class Hello:
@component.output_types(output=str)
def run(self, who: str):
"""
Takes a string as input and returns "Hello, <string>!"
as output.
"""
return {"output": f"Hello, {who}!"}
pipeline = Pipeline()
pipeline.add_component("hello", Hello())
pipeline.add_component("hello2", Hello())
pipeline.connect("hello.output", "hello2.who")
# test case with nested component inputs
# this will raise ValueError because hello component does not have an input named "non_existing_input"
# even though it has an input named "who"
with pytest.raises(ValueError):
pipeline.run(data={"hello": {"non_existing_input": "world", "who": "world"}})
# test case with flat component inputs
# this will not raise ValueError because the input "who" will be resolved to the correct component
# and we'll log a warning for the input "non_existing_input" which was not resolved
result = pipeline.run(data={"non_existing_input": "world", "who": "world"})
assert result == {"hello2": {"output": "Hello, Hello, world!!"}}
# important to check that the warning is logged for UX purposes, leave it here
assert "were not matched to any component" in caplog.text
def test_pipeline_resolution_duplicate_input_names_across_components():
@component
class Hello:
@component.output_types(output=str)
def run(self, who: str, what: str):
return {"output": f"Hello {who} {what}!"}
pipe = Pipeline()
pipe.add_component("hello", Hello())
pipe.add_component("hello2", Hello())
pipe.connect("hello.output", "hello2.who")
result = pipe.run(data={"what": "Haystack", "who": "world"})
assert result == {"hello2": {"output": "Hello Hello world Haystack! Haystack!"}}
resolved, _ = pipe._prepare_component_input_data(data={"what": "Haystack", "who": "world"})
# hello2 resolves only "what": its "who" input is already fed by hello.output
assert resolved == {"hello": {"what": "Haystack", "who": "world"}, "hello2": {"what": "Haystack"}}