feat: Deprecate max_loops_allowed in favour of new argument max_runs_per_component (#8354)

* Deprecate max_loops_allowed in favour of new argument max_runs_per_component

* Add missing test file

* Some enhancements

* Add version that will remove deprecated stuff
Silvano Cerza 2024-09-12 11:00:12 +02:00 committed by GitHub
parent 3016c5ca93
commit 5514676b5e
10 changed files with 232 additions and 35 deletions

View File

@ -2,6 +2,11 @@
#
# SPDX-License-Identifier: Apache-2.0
import warnings
# TODO: Remove this when PipelineMaxLoops is removed
warnings.filterwarnings("default", category=DeprecationWarning, module=__name__)
class PipelineError(Exception):
pass
@ -24,6 +29,15 @@ class PipelineDrawingError(PipelineError):
class PipelineMaxLoops(PipelineError):
# NOTE: This is also shown when importing PipelineMaxComponentRuns; I can't find an easy
# way to fix this, so I will ignore that case.
warnings.warn(
"PipelineMaxLoops is deprecated and will be remove in version '2.7.0'; use PipelineMaxComponentRuns instead.",
DeprecationWarning,
)
class PipelineMaxComponentRuns(PipelineMaxLoops):
pass
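Because `PipelineMaxComponentRuns` subclasses the deprecated `PipelineMaxLoops`, existing exception handlers keep working during the transition. A minimal sketch, assuming a Haystack build that includes this change:

from haystack.core.errors import PipelineMaxComponentRuns, PipelineMaxLoops

try:
    # Simulate a pipeline hitting the per-component run limit.
    raise PipelineMaxComponentRuns("Maximum run count 100 reached for component 'looper'")
except PipelineMaxLoops as exc:
    # Old-style handlers still catch the new exception via the subclass relationship.
    print(type(exc).__name__)  # PipelineMaxComponentRuns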

View File

@ -4,6 +4,7 @@
import importlib
import itertools
import warnings
from collections import defaultdict
from copy import copy, deepcopy
from datetime import datetime
@ -41,6 +42,11 @@ T = TypeVar("T", bound="PipelineBase")
logger = logging.getLogger(__name__)
_MAX_LOOPS_ALLOWED_DEPRECATION_MESSAGE = (
"'max_loops_allowed' argument is deprecated and will be removed in version '2.7.0'. "
"Use 'max_runs_per_component' instead."
)
class PipelineBase:
"""
@ -52,29 +58,63 @@ class PipelineBase:
def __init__(
self,
metadata: Optional[Dict[str, Any]] = None,
max_loops_allowed: int = 100,
max_loops_allowed: Optional[int] = None,
debug_path: Union[Path, str] = Path(".haystack_debug/"),
max_runs_per_component: int = 100,
):
"""
Creates the Pipeline.
:param metadata:
Arbitrary dictionary to store metadata about this pipeline. Make sure all the values contained in
this dictionary can be serialized and deserialized if you wish to save this pipeline to file with
`save_pipelines()/load_pipelines()`.
Arbitrary dictionary to store metadata about this `Pipeline`. Make sure all the values contained in
this dictionary can be serialized and deserialized if you wish to save this `Pipeline` to file.
:param max_loops_allowed:
How many times the pipeline can run the same node before throwing an exception.
How many times the `Pipeline` can run the same node before throwing an exception.
This is deprecated and will be removed in version 2.7.0; use `max_runs_per_component` instead.
:param debug_path:
When debug is enabled in `run()`, where to save the debug data.
:param max_runs_per_component:
How many times the `Pipeline` can run the same Component.
If this limit is reached, a `PipelineMaxComponentRuns` exception is raised.
If not set, it defaults to 100 runs per Component.
"""
self._telemetry_runs = 0
self._last_telemetry_sent: Optional[datetime] = None
self.metadata = metadata or {}
self.max_loops_allowed = max_loops_allowed
self.graph = networkx.MultiDiGraph()
self._debug: Dict[int, Dict[str, Any]] = {}
self._debug_path = Path(debug_path)
if max_loops_allowed is not None:
warnings.warn(_MAX_LOOPS_ALLOWED_DEPRECATION_MESSAGE, DeprecationWarning)
self._max_runs_per_component = max_loops_allowed
else:
self._max_runs_per_component = max_runs_per_component
@property
def max_loops_allowed(self) -> int:
"""
Returns the maximum number of runs per Component allowed in this Pipeline.
This is a deprecated field; use `max_runs_per_component` instead.
:return: Maximum number of runs per Component
"""
warnings.warn(_MAX_LOOPS_ALLOWED_DEPRECATION_MESSAGE, DeprecationWarning)
return self._max_runs_per_component
@max_loops_allowed.setter
def max_loops_allowed(self, value: int):
"""
Sets the maximum number of runs per Component allowed in this Pipeline.
This is a deprecated property; use `max_runs_per_component` instead.
:param value: Maximum number of runs per Component
"""
warnings.warn(_MAX_LOOPS_ALLOWED_DEPRECATION_MESSAGE, DeprecationWarning)
self._max_runs_per_component = value
def __eq__(self, other) -> bool:
"""
Pipeline equality is defined by their type and the equality of their serialized form.
@ -128,7 +168,7 @@ class PipelineBase:
connections.append({"sender": f"{sender}.{sender_socket}", "receiver": f"{receiver}.{receiver_socket}"})
return {
"metadata": self.metadata,
"max_loops_allowed": self.max_loops_allowed,
"max_runs_per_component": self._max_runs_per_component,
"components": components,
"connections": connections,
}
@ -152,9 +192,15 @@ class PipelineBase:
"""
data_copy = deepcopy(data) # to prevent modification of original data
metadata = data_copy.get("metadata", {})
max_loops_allowed = data_copy.get("max_loops_allowed", 100)
max_runs_per_component = data_copy.get("max_runs_per_component", 100)
max_loops_allowed = data_copy.get("max_loops_allowed", None)
debug_path = Path(data_copy.get("debug_path", ".haystack_debug/"))
pipe = cls(metadata=metadata, max_loops_allowed=max_loops_allowed, debug_path=debug_path)
pipe = cls(
metadata=metadata,
max_loops_allowed=max_loops_allowed,
max_runs_per_component=max_runs_per_component,
debug_path=debug_path,
)
components_to_reuse = kwargs.get("components", {})
for name, component_data in data_copy.get("components", {}).items():
if name in components_to_reuse:
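With this change, passing the deprecated `max_loops_allowed` still works: it emits a `DeprecationWarning` and its value is forwarded to the new per-component run limit. A minimal sketch of that behaviour, assuming a Haystack build with this change; the serialized `max_runs_per_component` key is used to inspect the value rather than any private attribute:

import warnings

from haystack import Pipeline

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    pipe = Pipeline(max_loops_allowed=5)  # deprecated spelling, accepted until 2.7.0

assert any(issubclass(w.category, DeprecationWarning) for w in caught)
assert pipe.to_dict()["max_runs_per_component"] == 5  # value forwarded to the new limit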

View File

@ -8,7 +8,7 @@ from warnings import warn
from haystack import logging, tracing
from haystack.core.component import Component
from haystack.core.errors import PipelineMaxLoops, PipelineRuntimeError
from haystack.core.errors import PipelineMaxComponentRuns, PipelineRuntimeError
from haystack.core.pipeline.base import (
_dequeue_component,
_dequeue_waiting_component,
@ -205,7 +205,8 @@ class Pipeline(PipelineBase):
"haystack.pipeline.input_data": data,
"haystack.pipeline.output_data": final_outputs,
"haystack.pipeline.metadata": self.metadata,
"haystack.pipeline.max_loops_allowed": self.max_loops_allowed,
"haystack.pipeline.max_loops_allowed": self._max_runs_per_component,
"haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
},
):
# Cache for extra outputs, if enabled.
@ -221,9 +222,9 @@ class Pipeline(PipelineBase):
continue
if self._component_has_enough_inputs_to_run(name, components_inputs):
if self.graph.nodes[name]["visits"] > self.max_loops_allowed:
msg = f"Maximum loops count ({self.max_loops_allowed}) exceeded for component '{name}'"
raise PipelineMaxLoops(msg)
if self.graph.nodes[name]["visits"] > self._max_runs_per_component:
msg = f"Maximum run count {self._max_runs_per_component} reached for component '{name}'"
raise PipelineMaxComponentRuns(msg)
res: Dict[str, Any] = self._run_component(name, components_inputs[name])
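Apart from the renamed exception and attribute, the runtime check is unchanged: a component's visit count is compared against the limit before it runs again. A simplified, self-contained sketch of that guard (the names below are illustrative, not Haystack internals):

from collections import defaultdict

class MaxRunsExceeded(Exception):
    """Stand-in for PipelineMaxComponentRuns in this illustrative sketch."""

def check_run_limit(visits: dict, name: str, max_runs_per_component: int = 100) -> None:
    # Mirrors the guard in Pipeline.run(): abort once a component's visit
    # count exceeds the configured limit.
    if visits[name] > max_runs_per_component:
        raise MaxRunsExceeded(f"Maximum run count {max_runs_per_component} reached for component '{name}'")

visits = defaultdict(int)
try:
    for _ in range(4):
        visits["branch_joiner"] += 1
        check_run_limit(visits, "branch_joiner", max_runs_per_component=2)
except MaxRunsExceeded as exc:
    print(exc)  # Maximum run count 2 reached for component 'branch_joiner'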

View File

@ -0,0 +1,12 @@
---
enhancements:
- |
Add new `Pipeline` init argument `max_runs_per_component`. It has the same behaviour as the
existing `max_loops_allowed` argument but is more descriptive of its actual effect.
- |
Add new `PipelineMaxComponentRuns` exception to reflect the new `max_runs_per_component` init argument.
deprecations:
- |
`Pipeline` init argument `max_loops_allowed` is deprecated and will be removed in version `2.7.0`. Use `max_runs_per_component` instead.
- |
`PipelineMaxLoops` exception is deprecated and will be removed in version `2.7.0`. Use `PipelineMaxComponentRuns` instead.
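For users, the migration is a one-argument rename; a hedged before/after sketch (the limit value is illustrative):

from haystack import Pipeline

# Before (deprecated; emits a DeprecationWarning and will be removed in 2.7.0):
pipe = Pipeline(max_loops_allowed=10)

# After:
pipe = Pipeline(max_runs_per_component=10)

# Code that catches PipelineMaxLoops should likewise move to PipelineMaxComponentRuns.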

View File

@ -48,5 +48,5 @@ Feature: Pipeline running
Examples:
| kind | exception |
| that has an infinite loop | PipelineMaxLoops |
| that has an infinite loop | PipelineMaxComponentRuns |
| that has a component that doesn't return a dictionary | PipelineRuntimeError |

View File

@ -73,7 +73,7 @@ def pipeline_that_has_an_infinite_loop():
component.set_output_types(self, a=int, b=int)
FakeComponent = component_class("FakeComponent", output={"a": 1, "b": 1}, extra_fields={"__init__": custom_init})
pipe = Pipeline(max_loops_allowed=1)
pipe = Pipeline(max_runs_per_component=1)
pipe.add_component("first", FakeComponent())
pipe.add_component("second", FakeComponent())
pipe.connect("first.a", "second.x")
@ -83,7 +83,7 @@ def pipeline_that_has_an_infinite_loop():
@given("a pipeline that is really complex with lots of components, forks, and loops", target_fixture="pipeline_data")
def pipeline_complex():
pipeline = Pipeline(max_loops_allowed=2)
pipeline = Pipeline(max_runs_per_component=2)
pipeline.add_component("greet_first", Greet(message="Hello, the value is {value}."))
pipeline.add_component("accumulate_1", Accumulate())
pipeline.add_component("add_two", AddFixedValue(add=2))
@ -205,7 +205,7 @@ def pipeline_that_has_a_single_component_with_a_default_input():
@given("a pipeline that has two loops of identical lengths", target_fixture="pipeline_data")
def pipeline_that_has_two_loops_of_identical_lengths():
pipeline = Pipeline(max_loops_allowed=10)
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
pipeline.add_component("remainder", Remainder(divisor=3))
pipeline.add_component("add_one", AddFixedValue(add=1))
@ -250,7 +250,7 @@ def pipeline_that_has_two_loops_of_identical_lengths():
@given("a pipeline that has two loops of different lengths", target_fixture="pipeline_data")
def pipeline_that_has_two_loops_of_different_lengths():
pipeline = Pipeline(max_loops_allowed=10)
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
pipeline.add_component("remainder", Remainder(divisor=3))
pipeline.add_component("add_one", AddFixedValue(add=1))
@ -306,7 +306,7 @@ def pipeline_that_has_two_loops_of_different_lengths():
@given("a pipeline that has a single loop with two conditional branches", target_fixture="pipeline_data")
def pipeline_that_has_a_single_loop_with_two_conditional_branches():
accumulator = Accumulate()
pipeline = Pipeline(max_loops_allowed=10)
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("add_one", AddFixedValue(add=1))
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
@ -503,7 +503,7 @@ def pipeline_that_has_different_combinations_of_branches_that_merge_and_do_not_m
@given("a pipeline that has two branches, one of which loops back", target_fixture="pipeline_data")
def pipeline_that_has_two_branches_one_of_which_loops_back():
pipeline = Pipeline(max_loops_allowed=10)
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("add_zero", AddFixedValue(add=0))
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
pipeline.add_component("sum", Sum())
@ -704,7 +704,7 @@ def pipeline_that_has_a_component_that_doesnt_return_a_dictionary():
output=1, # type:ignore
)
pipe = Pipeline(max_loops_allowed=10)
pipe = Pipeline(max_runs_per_component=10)
pipe.add_component("comp", BrokenComponent())
return pipe, [PipelineRunData({"comp": {"a": 1}})]
@ -916,7 +916,7 @@ def pipeline_that_has_a_component_with_only_default_inputs_as_first_to_run():
target_fixture="pipeline_data",
)
def pipeline_that_has_a_single_component_that_send_one_of_outputs_to_itself():
pipeline = Pipeline(max_loops_allowed=10)
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("self_loop", SelfLoop())
pipeline.connect("self_loop.current_value", "self_loop.values")
@ -934,7 +934,7 @@ def pipeline_that_has_a_single_component_that_send_one_of_outputs_to_itself():
@given("a pipeline that has a component that sends one of its outputs to itself", target_fixture="pipeline_data")
def pipeline_that_has_a_component_that_sends_one_of_its_outputs_to_itself():
pipeline = Pipeline(max_loops_allowed=10)
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("add_1", AddFixedValue())
pipeline.add_component("self_loop", SelfLoop())
pipeline.add_component("add_2", AddFixedValue())
@ -1068,7 +1068,7 @@ def pipeline_that_is_linear_and_returns_intermediate_outputs():
@given("a pipeline that has a loop and returns intermediate outputs from it", target_fixture="pipeline_data")
def pipeline_that_has_a_loop_and_returns_intermediate_outputs_from_it():
pipeline = Pipeline(max_loops_allowed=10)
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("add_one", AddFixedValue(add=1))
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
pipeline.add_component("below_10", Threshold(threshold=10))

View File

@ -55,20 +55,29 @@ class TestPipeline:
It doesn't test Pipeline.run(), that is done separately in a different way.
"""
def test_pipeline_dumps(self, test_files_path):
pipeline = Pipeline()
def test_pipeline_dumps_with_deprecated_max_loops_allowed(self, test_files_path):
pipeline = Pipeline(max_loops_allowed=99)
pipeline.add_component("Comp1", FakeComponent("Foo"))
pipeline.add_component("Comp2", FakeComponent())
pipeline.connect("Comp1.value", "Comp2.input_")
pipeline.max_loops_allowed = 99
result = pipeline.dumps()
with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f:
assert f.read() == result
def test_pipeline_loads(self, test_files_path):
def test_pipeline_dumps(self, test_files_path):
pipeline = Pipeline(max_runs_per_component=99)
pipeline.add_component("Comp1", FakeComponent("Foo"))
pipeline.add_component("Comp2", FakeComponent())
pipeline.connect("Comp1.value", "Comp2.input_")
result = pipeline.dumps()
with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f:
assert f.read() == result
def test_pipeline_loads_with_deprecated_max_loops_allowed(self, test_files_path):
with open(f"{test_files_path}/yaml/test_pipeline_deprecated.yaml", "r") as f:
pipeline = Pipeline.loads(f.read())
assert pipeline.max_loops_allowed == 99
assert pipeline._max_runs_per_component == 99
assert isinstance(pipeline.get_component("Comp1"), FakeComponent)
assert isinstance(pipeline.get_component("Comp2"), FakeComponent)
@ -111,22 +120,41 @@ class TestPipeline:
with pytest.raises(DeserializationError, match=".*Comp1.*unknown.*"):
pipeline = Pipeline.loads(invalid_init_parameter_yaml)
def test_pipeline_dump(self, test_files_path, tmp_path):
pipeline = Pipeline()
def test_pipeline_dump_with_deprecated_max_loops_allowed(self, test_files_path, tmp_path):
pipeline = Pipeline(max_loops_allowed=99)
pipeline.add_component("Comp1", FakeComponent("Foo"))
pipeline.add_component("Comp2", FakeComponent())
pipeline.connect("Comp1.value", "Comp2.input_")
pipeline.max_loops_allowed = 99
with open(tmp_path / "out.yaml", "w") as f:
pipeline.dump(f)
# re-open and ensure it's the same data as the test file
with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as test_f, open(tmp_path / "out.yaml", "r") as f:
assert f.read() == test_f.read()
def test_pipeline_dump(self, test_files_path, tmp_path):
pipeline = Pipeline(max_runs_per_component=99)
pipeline.add_component("Comp1", FakeComponent("Foo"))
pipeline.add_component("Comp2", FakeComponent())
pipeline.connect("Comp1.value", "Comp2.input_")
with open(tmp_path / "out.yaml", "w") as f:
pipeline.dump(f)
# re-open and ensure it's the same data as the test file
with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as test_f, open(tmp_path / "out.yaml", "r") as f:
assert f.read() == test_f.read()
def test_pipeline_load_with_deprecated_max_loops_allowed(self, test_files_path):
with open(f"{test_files_path}/yaml/test_pipeline_deprecated.yaml", "r") as f:
pipeline = Pipeline.load(f)
assert pipeline.max_loops_allowed == 99
assert pipeline._max_runs_per_component == 99
assert isinstance(pipeline.get_component("Comp1"), FakeComponent)
assert isinstance(pipeline.get_component("Comp2"), FakeComponent)
def test_pipeline_load(self, test_files_path):
with open(f"{test_files_path}/yaml/test_pipeline.yaml", "r") as f:
pipeline = Pipeline.load(f)
assert pipeline.max_loops_allowed == 99
assert pipeline._max_runs_per_component == 99
assert isinstance(pipeline.get_component("Comp1"), FakeComponent)
assert isinstance(pipeline.get_component("Comp2"), FakeComponent)
@ -296,7 +324,7 @@ class TestPipeline:
res = pipe.to_dict()
expected = {
"metadata": {"test": "test"},
"max_loops_allowed": 42,
"max_runs_per_component": 42,
"components": {
"add_two": {
"type": "haystack.testing.sample_components.add_value.AddFixedValue",
@ -316,7 +344,7 @@ class TestPipeline:
assert res == expected
# UNIT
def test_from_dict(self):
def test_from_dict_with_deprecated_max_loops_allowed(self):
data = {
"metadata": {"test": "test"},
"max_loops_allowed": 101,
@ -340,6 +368,86 @@ class TestPipeline:
assert pipe.metadata == {"test": "test"}
assert pipe.max_loops_allowed == 101
assert pipe._max_runs_per_component == 101
# Components
assert len(pipe.graph.nodes) == 3
## add_two
add_two = pipe.graph.nodes["add_two"]
assert add_two["instance"].add == 2
assert add_two["input_sockets"] == {
"value": InputSocket(name="value", type=int),
"add": InputSocket(name="add", type=Optional[int], default_value=None),
}
assert add_two["output_sockets"] == {"result": OutputSocket(name="result", type=int, receivers=["double"])}
assert add_two["visits"] == 0
## add_default
add_default = pipe.graph.nodes["add_default"]
assert add_default["instance"].add == 1
assert add_default["input_sockets"] == {
"value": InputSocket(name="value", type=int, senders=["double"]),
"add": InputSocket(name="add", type=Optional[int], default_value=None),
}
assert add_default["output_sockets"] == {"result": OutputSocket(name="result", type=int)}
assert add_default["visits"] == 0
## double
double = pipe.graph.nodes["double"]
assert double["instance"]
assert double["input_sockets"] == {"value": InputSocket(name="value", type=int, senders=["add_two"])}
assert double["output_sockets"] == {"value": OutputSocket(name="value", type=int, receivers=["add_default"])}
assert double["visits"] == 0
# Connections
connections = list(pipe.graph.edges(data=True))
assert len(connections) == 2
assert connections[0] == (
"add_two",
"double",
{
"conn_type": "int",
"from_socket": OutputSocket(name="result", type=int, receivers=["double"]),
"to_socket": InputSocket(name="value", type=int, senders=["add_two"]),
"mandatory": True,
},
)
assert connections[1] == (
"double",
"add_default",
{
"conn_type": "int",
"from_socket": OutputSocket(name="value", type=int, receivers=["add_default"]),
"to_socket": InputSocket(name="value", type=int, senders=["double"]),
"mandatory": True,
},
)
def test_from_dict(self):
data = {
"metadata": {"test": "test"},
"max_runs_per_component": 101,
"components": {
"add_two": {
"type": "haystack.testing.sample_components.add_value.AddFixedValue",
"init_parameters": {"add": 2},
},
"add_default": {
"type": "haystack.testing.sample_components.add_value.AddFixedValue",
"init_parameters": {"add": 1},
},
"double": {"type": "haystack.testing.sample_components.double.Double", "init_parameters": {}},
},
"connections": [
{"sender": "add_two.result", "receiver": "double.value"},
{"sender": "double.value", "receiver": "add_default.value"},
],
}
pipe = Pipeline.from_dict(data)
assert pipe.metadata == {"test": "test"}
assert pipe.max_loops_allowed == 101
assert pipe._max_runs_per_component == 101
# Components
assert len(pipe.graph.nodes) == 3

View File

@ -46,6 +46,7 @@ class TestTracing:
"haystack.pipeline.output_data": {"hello2": {"output": "Hello, Hello, world!!"}},
"haystack.pipeline.metadata": {},
"haystack.pipeline.max_loops_allowed": 100,
"haystack.pipeline.max_runs_per_component": 100,
},
trace_id=ANY,
span_id=ANY,
@ -100,6 +101,7 @@ class TestTracing:
tags={
"haystack.pipeline.metadata": {},
"haystack.pipeline.max_loops_allowed": 100,
"haystack.pipeline.max_runs_per_component": 100,
"haystack.pipeline.input_data": {"hello": {"word": "world"}},
"haystack.pipeline.output_data": {"hello2": {"output": "Hello, Hello, world!!"}},
},

View File

@ -10,5 +10,5 @@ components:
connections:
- receiver: Comp2.input_
sender: Comp1.value
max_loops_allowed: 99
max_runs_per_component: 99
metadata: {}

View File

@ -0,0 +1,14 @@
components:
Comp1:
init_parameters:
an_init_param: null
type: test.core.pipeline.test_pipeline.FakeComponent
Comp2:
init_parameters:
an_init_param: null
type: test.core.pipeline.test_pipeline.FakeComponent
connections:
- receiver: Comp2.input_
sender: Comp1.value
max_loops_allowed: 99
metadata: {}
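Loading this legacy fixture exercises the deprecation path end to end: `from_dict` reads the `max_loops_allowed` key, the constructor emits the `DeprecationWarning`, and the value becomes the per-component run limit. A hedged sketch of how that could be asserted inside this repository's test suite (it relies on the `test_files_path` fixture used by the tests above):

import pytest

from haystack import Pipeline

def test_loading_deprecated_yaml_warns(test_files_path):
    with open(f"{test_files_path}/yaml/test_pipeline_deprecated.yaml") as f:
        with pytest.warns(DeprecationWarning):
            pipeline = Pipeline.loads(f.read())

    assert pipeline._max_runs_per_component == 99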