Remove Multiplexer and related tests (#8020)

This commit is contained in:
Amna Mubashar 2024-07-16 15:39:40 +02:00 committed by GitHub
parent 0411cd938a
commit 499fbcc59f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 84 additions and 267 deletions

View File

@ -1,7 +0,0 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
from haystack.components.others.multiplexer import Multiplexer
__all__ = ["Multiplexer"]

View File

@ -1,151 +0,0 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
import sys
import warnings
from typing import Any, Dict
from haystack import component, default_from_dict, default_to_dict, logging
from haystack.core.component.types import Variadic
from haystack.utils import deserialize_type, serialize_type
if sys.version_info < (3, 10):
from typing_extensions import TypeAlias
else:
from typing import TypeAlias
logger = logging.getLogger(__name__)
@component(is_greedy=True)
class Multiplexer:
    """
    A component that accepts connections from several components and forwards the data to several components.

    `Multiplexer` can be the target of data connections coming from multiple upstream components and, at the
    same time, the source of connections going to multiple downstream components, which makes complex data
    processing pipelines easier to build.

    It is useful for spreading the output of a single source, like a Large Language Model (LLM), across
    different branches of a pipeline. It is especially valuable in error correction loops, where data is rerouted
    for reevaluation when errors are detected. In the example pipeline below, `Multiplexer` helps create
    a schema valid JSON object (given a person's data) with the help of an `OpenAIChatGenerator` and a
    `JsonSchemaValidator`.

    If the generated JSON object fails schema validation, `JsonSchemaValidator` starts a correction loop, sending
    the data back through the `Multiplexer` to the `OpenAIChatGenerator` until it passes schema validation. Without
    `Multiplexer`, the data could not be looped back to `OpenAIChatGenerator` for re-generation, as
    components accept only one input connection for the declared run method parameters.

    Usage example:

    ```python
    import json
    from typing import List

    from haystack import Pipeline
    from haystack.components.converters import OutputAdapter
    from haystack.components.generators.chat import OpenAIChatGenerator
    from haystack.components.others import Multiplexer
    from haystack.components.validators import JsonSchemaValidator
    from haystack.dataclasses import ChatMessage

    person_schema = {
        "type": "object",
        "properties": {
            "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
            "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
            "nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
        },
        "required": ["first_name", "last_name", "nationality"]
    }

    # Initialize a pipeline
    pipe = Pipeline()

    # Add components to the pipeline
    pipe.add_component('mx', Multiplexer(List[ChatMessage]))
    pipe.add_component('fc_llm', OpenAIChatGenerator(model="gpt-3.5-turbo-0125"))
    pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
    pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage]))

    # And connect them
    pipe.connect("adapter", "mx")
    pipe.connect("mx", "fc_llm")
    pipe.connect("fc_llm.replies", "validator.messages")
    pipe.connect("validator.validation_error", "mx")

    result = pipe.run(data={"fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
                            "adapter": {"chat_message": [ChatMessage.from_user("Create json from Peter Parker")]}})

    print(json.loads(result["validator"]["validated"][0].content))

    >> {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
    >> 'Superhero', 'age': 23, 'location': 'New York City'}
    ```

    Note that `Multiplexer` is created with a single type parameter. This determines the
    type of data that `Multiplexer` will receive from the upstream connected components and also the
    type of data that `Multiplexer` will distribute to the downstream connected components. In the example
    above, the `Multiplexer` is created with the type `List[ChatMessage]`. This means `Multiplexer` will receive
    a list of `ChatMessage` objects from the upstream connected components and also distribute a list of `ChatMessage`
    objects to the downstream connected components.

    In the code example, `Multiplexer` receives a looped back `List[ChatMessage]` from the `JsonSchemaValidator` and
    sends it down to the `OpenAIChatGenerator` for re-generation. We can have multiple loop back connections in the
    pipeline. In this instance, there is only one downstream component, the `OpenAIChatGenerator`, but the pipeline
    can have more than one downstream component.
    """

    def __init__(self, type_: TypeAlias):
        """
        Create a `Multiplexer` component.

        Emits a `DeprecationWarning` on every instantiation.

        :param type_: The type of data that the `Multiplexer` will receive from the upstream connected components and
            distribute to the downstream connected components.
        """
        deprecation_notice = (
            "`Multiplexer` is deprecated and will be removed in Haystack 2.4.0. Use `joiners.BranchJoiner` instead."
        )
        warnings.warn(deprecation_notice, DeprecationWarning)

        self.type_ = type_
        # The single `value` input is variadic over `type_`; the single `value` output carries a plain `type_`.
        component.set_input_types(self, value=Variadic[type_])
        component.set_output_types(self, value=type_)

    def to_dict(self):
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        serialized_type = serialize_type(self.type_)
        return default_to_dict(self, type_=serialized_type)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Multiplexer":
        """
        Deserializes the component from a dictionary.

        The `type_` entry of `data["init_parameters"]` is replaced in place with its deserialized form.

        :param data:
            Dictionary to deserialize from.
        :returns:
            Deserialized component.
        """
        init_parameters = data["init_parameters"]
        init_parameters["type_"] = deserialize_type(init_parameters["type_"])
        return default_from_dict(cls, data)

    def run(self, **kwargs):
        """
        The run method of the `Multiplexer` component.

        Multiplexes the input data from the upstream connected components and distributes it to the downstream
        connected components.

        :param **kwargs: The input data. Must be of the type declared in `__init__`.
        :returns: A dictionary with the following keys:
            - `value`: The input data.
        :raises ValueError: If the number of values received under `value` is not exactly one.
        """
        received = kwargs["value"]
        inputs_count = len(received)
        if inputs_count != 1:
            raise ValueError(f"Multiplexer expects only one input, but {inputs_count} were received.")
        return {"value": received[0]}

View File

@ -44,7 +44,7 @@ class JsonSchemaValidator:
from haystack import Pipeline
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.others import Multiplexer
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack import component
from haystack.dataclasses import ChatMessage
@ -62,13 +62,13 @@ class JsonSchemaValidator:
p.add_component("llm", OpenAIChatGenerator(model="gpt-4-1106-preview",
generation_kwargs={"response_format": {"type": "json_object"}}))
p.add_component("schema_validator", JsonSchemaValidator())
p.add_component("mx_for_llm", Multiplexer(List[ChatMessage]))
p.add_component("joiner_for_llm", BranchJoiner(List[ChatMessage]))
p.add_component("message_producer", MessageProducer())
p.connect("message_producer.messages", "mx_for_llm")
p.connect("mx_for_llm", "llm")
p.connect("message_producer.messages", "joiner_for_llm")
p.connect("joiner_for_llm", "llm")
p.connect("llm.replies", "schema_validator.messages")
p.connect("schema_validator.validation_error", "mx_for_llm")
p.connect("schema_validator.validation_error", "joiner_for_llm")
result = p.run(data={
"message_producer": {

View File

@ -0,0 +1,4 @@
---
upgrade:
- |
`Multiplexer` has been removed. Use `BranchJoiner` (`from haystack.components.joiners import BranchJoiner`) instead.

View File

@ -1,35 +0,0 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
import pytest
from haystack.components.others import Multiplexer
class TestMultiplexer:
    """Unit tests that call `Multiplexer.run` directly with a list passed as `value`."""

    def test_one_value(self):
        # A single received value is taken out of the list and returned under "value".
        multiplexer = Multiplexer(int)
        output = multiplexer.run(value=[2])
        assert output == {"value": 2}

    def test_one_value_of_wrong_type(self):
        # Multiplexer does not type check the input
        multiplexer = Multiplexer(int)
        output = multiplexer.run(value=["hello"])
        assert output == {"value": "hello"}

    def test_one_value_of_none_type(self):
        # Multiplexer does not type check the input
        multiplexer = Multiplexer(int)
        output = multiplexer.run(value=[None])
        assert output == {"value": None}

    def test_more_values_of_expected_type(self):
        multiplexer = Multiplexer(int)
        # `match` is treated as a regular expression, so the final period is escaped to match it literally
        # instead of matching any character.
        with pytest.raises(ValueError, match=r"Multiplexer expects only one input, but 3 were received\."):
            multiplexer.run(value=[2, 3, 4])

    def test_no_values(self):
        multiplexer = Multiplexer(int)
        # Same literal-period escaping as above; an empty list must be rejected as well.
        with pytest.raises(ValueError, match=r"Multiplexer expects only one input, but 0 were received\."):
            multiplexer.run(value=[])

View File

@ -9,7 +9,7 @@ from haystack.components.routers import ConditionalRouter
from haystack.components.builders import PromptBuilder, AnswerBuilder
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.others import Multiplexer
from haystack.components.joiners import BranchJoiner
from haystack.testing.sample_components import (
Accumulate,
AddFixedValue,
@ -91,7 +91,7 @@ def pipeline_complex():
pipeline.add_component("add_one", AddFixedValue(add=1))
pipeline.add_component("accumulate_2", Accumulate())
pipeline.add_component("multiplexer", Multiplexer(type_=int))
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
pipeline.add_component("below_10", Threshold(threshold=10))
pipeline.add_component("double", Double())
@ -123,11 +123,11 @@ def pipeline_complex():
pipeline.connect("add_four", "accumulate_3")
pipeline.connect("parity.odd", "add_one.value")
pipeline.connect("add_one", "multiplexer.value")
pipeline.connect("multiplexer", "below_10")
pipeline.connect("add_one", "branch_joiner.value")
pipeline.connect("branch_joiner", "below_10")
pipeline.connect("below_10.below", "double")
pipeline.connect("double", "multiplexer.value")
pipeline.connect("double", "branch_joiner.value")
pipeline.connect("below_10.above", "accumulate_2")
pipeline.connect("accumulate_2", "diff.second_value")
@ -150,13 +150,13 @@ def pipeline_complex():
"add_two",
"parity",
"add_one",
"multiplexer",
"branch_joiner",
"below_10",
"double",
"multiplexer",
"branch_joiner",
"below_10",
"double",
"multiplexer",
"branch_joiner",
"below_10",
"accumulate_2",
"greet_enumerator",
@ -206,43 +206,43 @@ def pipeline_that_has_a_single_component_with_a_default_input():
@given("a pipeline that has two loops of identical lengths", target_fixture="pipeline_data")
def pipeline_that_has_two_loops_of_identical_lengths():
pipeline = Pipeline(max_loops_allowed=10)
pipeline.add_component("multiplexer", Multiplexer(type_=int))
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
pipeline.add_component("remainder", Remainder(divisor=3))
pipeline.add_component("add_one", AddFixedValue(add=1))
pipeline.add_component("add_two", AddFixedValue(add=2))
pipeline.connect("multiplexer.value", "remainder.value")
pipeline.connect("branch_joiner.value", "remainder.value")
pipeline.connect("remainder.remainder_is_1", "add_two.value")
pipeline.connect("remainder.remainder_is_2", "add_one.value")
pipeline.connect("add_two", "multiplexer.value")
pipeline.connect("add_one", "multiplexer.value")
pipeline.connect("add_two", "branch_joiner.value")
pipeline.connect("add_one", "branch_joiner.value")
return (
pipeline,
[
PipelineRunData(
inputs={"multiplexer": {"value": 0}},
inputs={"branch_joiner": {"value": 0}},
expected_outputs={"remainder": {"remainder_is_0": 0}},
expected_run_order=["multiplexer", "remainder"],
expected_run_order=["branch_joiner", "remainder"],
),
PipelineRunData(
inputs={"multiplexer": {"value": 3}},
inputs={"branch_joiner": {"value": 3}},
expected_outputs={"remainder": {"remainder_is_0": 3}},
expected_run_order=["multiplexer", "remainder"],
expected_run_order=["branch_joiner", "remainder"],
),
PipelineRunData(
inputs={"multiplexer": {"value": 4}},
inputs={"branch_joiner": {"value": 4}},
expected_outputs={"remainder": {"remainder_is_0": 6}},
expected_run_order=["multiplexer", "remainder", "add_two", "multiplexer", "remainder"],
expected_run_order=["branch_joiner", "remainder", "add_two", "branch_joiner", "remainder"],
),
PipelineRunData(
inputs={"multiplexer": {"value": 5}},
inputs={"branch_joiner": {"value": 5}},
expected_outputs={"remainder": {"remainder_is_0": 6}},
expected_run_order=["multiplexer", "remainder", "add_one", "multiplexer", "remainder"],
expected_run_order=["branch_joiner", "remainder", "add_one", "branch_joiner", "remainder"],
),
PipelineRunData(
inputs={"multiplexer": {"value": 6}},
inputs={"branch_joiner": {"value": 6}},
expected_outputs={"remainder": {"remainder_is_0": 6}},
expected_run_order=["multiplexer", "remainder"],
expected_run_order=["branch_joiner", "remainder"],
),
],
)
@ -251,46 +251,53 @@ def pipeline_that_has_two_loops_of_identical_lengths():
@given("a pipeline that has two loops of different lengths", target_fixture="pipeline_data")
def pipeline_that_has_two_loops_of_different_lengths():
pipeline = Pipeline(max_loops_allowed=10)
pipeline.add_component("multiplexer", Multiplexer(type_=int))
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
pipeline.add_component("remainder", Remainder(divisor=3))
pipeline.add_component("add_one", AddFixedValue(add=1))
pipeline.add_component("add_two_1", AddFixedValue(add=1))
pipeline.add_component("add_two_2", AddFixedValue(add=1))
pipeline.connect("multiplexer.value", "remainder.value")
pipeline.connect("branch_joiner.value", "remainder.value")
pipeline.connect("remainder.remainder_is_1", "add_two_1.value")
pipeline.connect("add_two_1", "add_two_2.value")
pipeline.connect("add_two_2", "multiplexer")
pipeline.connect("add_two_2", "branch_joiner")
pipeline.connect("remainder.remainder_is_2", "add_one.value")
pipeline.connect("add_one", "multiplexer")
pipeline.connect("add_one", "branch_joiner")
return (
pipeline,
[
PipelineRunData(
inputs={"multiplexer": {"value": 0}},
inputs={"branch_joiner": {"value": 0}},
expected_outputs={"remainder": {"remainder_is_0": 0}},
expected_run_order=["multiplexer", "remainder"],
expected_run_order=["branch_joiner", "remainder"],
),
PipelineRunData(
inputs={"multiplexer": {"value": 3}},
inputs={"branch_joiner": {"value": 3}},
expected_outputs={"remainder": {"remainder_is_0": 3}},
expected_run_order=["multiplexer", "remainder"],
expected_run_order=["branch_joiner", "remainder"],
),
PipelineRunData(
inputs={"multiplexer": {"value": 4}},
inputs={"branch_joiner": {"value": 4}},
expected_outputs={"remainder": {"remainder_is_0": 6}},
expected_run_order=["multiplexer", "remainder", "add_two_1", "add_two_2", "multiplexer", "remainder"],
expected_run_order=[
"branch_joiner",
"remainder",
"add_two_1",
"add_two_2",
"branch_joiner",
"remainder",
],
),
PipelineRunData(
inputs={"multiplexer": {"value": 5}},
inputs={"branch_joiner": {"value": 5}},
expected_outputs={"remainder": {"remainder_is_0": 6}},
expected_run_order=["multiplexer", "remainder", "add_one", "multiplexer", "remainder"],
expected_run_order=["branch_joiner", "remainder", "add_one", "branch_joiner", "remainder"],
),
PipelineRunData(
inputs={"multiplexer": {"value": 6}},
inputs={"branch_joiner": {"value": 6}},
expected_outputs={"remainder": {"remainder_is_0": 6}},
expected_run_order=["multiplexer", "remainder"],
expected_run_order=["branch_joiner", "remainder"],
),
],
)
@ -302,20 +309,20 @@ def pipeline_that_has_a_single_loop_with_two_conditional_branches():
pipeline = Pipeline(max_loops_allowed=10)
pipeline.add_component("add_one", AddFixedValue(add=1))
pipeline.add_component("multiplexer", Multiplexer(type_=int))
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
pipeline.add_component("below_10", Threshold(threshold=10))
pipeline.add_component("below_5", Threshold(threshold=5))
pipeline.add_component("add_three", AddFixedValue(add=3))
pipeline.add_component("accumulator", accumulator)
pipeline.add_component("add_two", AddFixedValue(add=2))
pipeline.connect("add_one.result", "multiplexer")
pipeline.connect("multiplexer.value", "below_10.value")
pipeline.connect("add_one.result", "branch_joiner")
pipeline.connect("branch_joiner.value", "below_10.value")
pipeline.connect("below_10.below", "accumulator.value")
pipeline.connect("accumulator.value", "below_5.value")
pipeline.connect("below_5.above", "add_three.value")
pipeline.connect("below_5.below", "multiplexer")
pipeline.connect("add_three.result", "multiplexer")
pipeline.connect("below_5.below", "branch_joiner")
pipeline.connect("add_three.result", "branch_joiner")
pipeline.connect("below_10.above", "add_two.value")
return (
@ -326,16 +333,16 @@ def pipeline_that_has_a_single_loop_with_two_conditional_branches():
expected_outputs={"add_two": {"result": 13}},
expected_run_order=[
"add_one",
"multiplexer",
"branch_joiner",
"below_10",
"accumulator",
"below_5",
"multiplexer",
"branch_joiner",
"below_10",
"accumulator",
"below_5",
"add_three",
"multiplexer",
"branch_joiner",
"below_10",
"add_two",
],
@ -498,18 +505,18 @@ def pipeline_that_has_different_combinations_of_branches_that_merge_and_do_not_m
def pipeline_that_has_two_branches_one_of_which_loops_back():
pipeline = Pipeline(max_loops_allowed=10)
pipeline.add_component("add_zero", AddFixedValue(add=0))
pipeline.add_component("multiplexer", Multiplexer(type_=int))
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
pipeline.add_component("sum", Sum())
pipeline.add_component("below_10", Threshold(threshold=10))
pipeline.add_component("add_one", AddFixedValue(add=1))
pipeline.add_component("counter", Accumulate())
pipeline.add_component("add_two", AddFixedValue(add=2))
pipeline.connect("add_zero", "multiplexer.value")
pipeline.connect("multiplexer", "below_10.value")
pipeline.connect("add_zero", "branch_joiner.value")
pipeline.connect("branch_joiner", "below_10.value")
pipeline.connect("below_10.below", "add_one.value")
pipeline.connect("add_one.result", "counter.value")
pipeline.connect("counter.value", "multiplexer.value")
pipeline.connect("counter.value", "branch_joiner.value")
pipeline.connect("below_10.above", "add_two.value")
pipeline.connect("add_two.result", "sum.values")
@ -521,15 +528,15 @@ def pipeline_that_has_two_branches_one_of_which_loops_back():
expected_outputs={"sum": {"total": 23}},
expected_run_order=[
"add_zero",
"multiplexer",
"branch_joiner",
"below_10",
"add_one",
"counter",
"multiplexer",
"branch_joiner",
"below_10",
"add_one",
"counter",
"multiplexer",
"branch_joiner",
"below_10",
"add_two",
"sum",
@ -660,10 +667,10 @@ def pipeline_that_has_a_greedy_and_variadic_component_after_a_component_with_def
template = "Given this documents: {{ documents|join(', ', attribute='content') }} Answer this question: {{ query }}"
pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
pipeline.add_component("prompt_builder", PromptBuilder(template=template))
pipeline.add_component("multiplexer", Multiplexer(List[Document]))
pipeline.add_component("branch_joiner", BranchJoiner(List[Document]))
pipeline.connect("retriever", "multiplexer")
pipeline.connect("multiplexer", "prompt_builder.documents")
pipeline.connect("retriever", "branch_joiner")
pipeline.connect("branch_joiner", "prompt_builder.documents")
return (
pipeline,
[
@ -682,7 +689,7 @@ def pipeline_that_has_a_greedy_and_variadic_component_after_a_component_with_def
"question"
}
},
expected_run_order=["retriever", "multiplexer", "prompt_builder"],
expected_run_order=["retriever", "branch_joiner", "prompt_builder"],
)
],
)
@ -1063,20 +1070,20 @@ def pipeline_that_is_linear_and_returns_intermediate_outputs():
def pipeline_that_has_a_loop_and_returns_intermediate_outputs_from_it():
pipeline = Pipeline(max_loops_allowed=10)
pipeline.add_component("add_one", AddFixedValue(add=1))
pipeline.add_component("multiplexer", Multiplexer(type_=int))
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
pipeline.add_component("below_10", Threshold(threshold=10))
pipeline.add_component("below_5", Threshold(threshold=5))
pipeline.add_component("add_three", AddFixedValue(add=3))
pipeline.add_component("accumulator", Accumulate())
pipeline.add_component("add_two", AddFixedValue(add=2))
pipeline.connect("add_one.result", "multiplexer")
pipeline.connect("multiplexer.value", "below_10.value")
pipeline.connect("add_one.result", "branch_joiner")
pipeline.connect("branch_joiner.value", "below_10.value")
pipeline.connect("below_10.below", "accumulator.value")
pipeline.connect("accumulator.value", "below_5.value")
pipeline.connect("below_5.above", "add_three.value")
pipeline.connect("below_5.below", "multiplexer")
pipeline.connect("add_three.result", "multiplexer")
pipeline.connect("below_5.below", "branch_joiner")
pipeline.connect("add_three.result", "branch_joiner")
pipeline.connect("below_10.above", "add_two.value")
return (
@ -1087,7 +1094,7 @@ def pipeline_that_has_a_loop_and_returns_intermediate_outputs_from_it():
include_outputs_from={
"add_two",
"add_one",
"multiplexer",
"branch_joiner",
"below_10",
"accumulator",
"below_5",
@ -1096,7 +1103,7 @@ def pipeline_that_has_a_loop_and_returns_intermediate_outputs_from_it():
expected_outputs={
"add_two": {"result": 13},
"add_one": {"result": 4},
"multiplexer": {"value": 11},
"branch_joiner": {"value": 11},
"below_10": {"above": 11},
"accumulator": {"value": 8},
"below_5": {"above": 8},
@ -1104,16 +1111,16 @@ def pipeline_that_has_a_loop_and_returns_intermediate_outputs_from_it():
},
expected_run_order=[
"add_one",
"multiplexer",
"branch_joiner",
"below_10",
"accumulator",
"below_5",
"multiplexer",
"branch_joiner",
"below_10",
"accumulator",
"below_5",
"add_three",
"multiplexer",
"branch_joiner",
"below_10",
"add_two",
],

View File

@ -10,7 +10,6 @@ import pytest
from haystack import Document
from haystack.components.builders import PromptBuilder, AnswerBuilder
from haystack.components.joiners import BranchJoiner
from haystack.components.others import Multiplexer
from haystack.core.component import component
from haystack.core.component.types import InputSocket, OutputSocket, Variadic
from haystack.core.errors import DeserializationError, PipelineConnectError, PipelineDrawingError, PipelineError
@ -855,17 +854,17 @@ class TestPipeline:
{{ questions | join("\n") }}
"""
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component("multiplexer", Multiplexer(type_=int))
pipe.add_component("branch_joiner", BranchJoiner(type_=int))
questions = ["What is the capital of Italy?", "What is the capital of France?"]
data = {
"prompt_builder": {"questions": questions},
"multiplexer": {"value": 1},
"branch_joiner": {"value": 1},
"not_a_component": "some input data",
}
res = pipe._init_inputs_state(data)
assert res == {
"prompt_builder": {"questions": ["What is the capital of Italy?", "What is the capital of France?"]},
"multiplexer": {"value": [1]},
"branch_joiner": {"value": [1]},
"not_a_component": "some input data",
}
assert id(questions) != id(res["prompt_builder"]["questions"])