docs: Multiplexer - review and normalize docstrings for component.others (#7276)

* Multiplexer - review and normalize docstrings

* Update PR - Stefano

* simplify language

* re-generation

---------

Co-authored-by: Daria Fokina <daria.fokina@deepset.ai>
This commit is contained in:
Vladimir Blagojevic 2024-03-01 16:44:39 +01:00 committed by GitHub
parent d17f180dd2
commit 9ab5c95c19
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -17,52 +17,121 @@ logger = logging.getLogger(__name__)
@component(is_greedy=True)
class Multiplexer:
"""
This component is used to distribute a single value to many components that may need it.
It can take such value from different sources (the user's input, or another component), so
it's only input is of Variadic type.
`Multiplexer` offers the ability to both receive data connections from multiple other
components and to distribute it to various other components, enhancing the functionality of complex data
processing pipelines.
The type of the expected input (and therefore of the output as well) must be given at init time.
`Multiplexer` is important for spreading outputs from a single source like a Large Language Model (LLM) across
different branches of a pipeline. It is especially valuable in error correction loops by rerouting data for
reevaluation if errors are detected. For instance, in an example pipeline below, `Multiplexer` helps create
a schema valid JSON object (given a person's data) with the help of an `OpenAIChatGenerator` and a `JsonSchemaValidator`.
In case the generated JSON object fails schema validation, `JsonSchemaValidator` starts a correction loop, sending
the data back through the `Multiplexer` to the `OpenAIChatGenerator` until it passes schema validation. If we didn't
have `Multiplexer`, we wouldn't be able to loop back the data to `OpenAIChatGenerator` for re-generation, as components
accept only one input connection for the declared run method parameters.
Example usage:
Usage example:
```python
>>> mp = Multiplexer(str)
>>> mp.run(value=["hello"])
{"value" : "hello"}
import json
from typing import List
>>> mp = Multiplexer(int)
>>> mp.run(value=[3])
{"value": 3}
from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.others import Multiplexer
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage
person_schema = {
"type": "object",
"properties": {
"first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
"last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
"nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
},
"required": ["first_name", "last_name", "nationality"]
}
# Initialize a pipeline
pipe = Pipeline()
# Add components to the pipeline
pipe.add_component('mx', Multiplexer(List[ChatMessage]))
pipe.add_component('fc_llm', OpenAIChatGenerator(model="gpt-3.5-turbo-0125"))
pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage])),
# And connect them
pipe.connect("adapter", "mx")
pipe.connect("mx", "fc_llm")
pipe.connect("fc_llm.replies", "validator.messages")
pipe.connect("validator.validation_error", "mx")
result = pipe.run(data={"fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
"adapter": {"chat_message": [ChatMessage.from_user("Create json object from Peter Parker")]}})
print(json.loads(result["validator"]["validated"][0].content))
>> {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
>> 'Superhero', 'age': 23, 'location': 'New York City'}
```
This component won't handle several inputs at the same time: it always only expects one.
If more than one input is received when run is invoked, it will raise a ValueError.
Note that `Multiplexer` is created with a single type parameter. This determines the
type of data that `Multiplexer` will receive from the upstream connected components and also the
type of data that `Multiplexer` will distribute to the downstream connected components. In the example
above, the `Multiplexer` is created with the type `List[ChatMessage]`. This means `Multiplexer` will receive
a list of `ChatMessage` objects from the upstream connected components and also distribute a list of `ChatMessage`
objects to the downstream connected components.
```python
>>> mp = Multiplexer(int)
>>> mp.run([2, 4])
ValueError: Multiplexer expects only one input, but 2 were received.
>>> mp = Multiplexer(int)
>>> mp.run([2, None])
ValueError: Multiplexer expects only one input, but 2 were received.
```
In the code example, `Multiplexer` receives a looped back `List[ChatMessage]` from the `JsonSchemaValidator` and
sends it down to the `OpenAIChatGenerator` for re-generation. We can have multiple loop back connections in the
pipeline. In this instance, the downstream component is only one the `OpenAIChatGenerator` but the pipeline can have more
than one downstream component.
"""
def __init__(self, type_: TypeAlias):
"""
Create a `Multiplexer` component.
:param type_: The type of data that the `Multiplexer` will receive from the upstream connected components and
distribute to the downstream connected components.
"""
self.type_ = type_
component.set_input_types(self, value=Variadic[type_])
component.set_output_types(self, value=type_)
def to_dict(self):
"""
Serializes the component to a dictionary.
:returns:
Dictionary with serialized data.
"""
return default_to_dict(self, type_=serialize_type(self.type_))
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Multiplexer":
"""
Deserializes the component from a dictionary.
:param data:
Dictionary to deserialize from.
:returns:
Deserialized component.
"""
data["init_parameters"]["type_"] = deserialize_type(data["init_parameters"]["type_"])
return default_from_dict(cls, data)
def run(self, **kwargs):
"""
Multiplexes the input data from the upstream connected components and distributes it to the downstream connected
components.
:param **kwargs: The input data. Must be of the type declared in `__init__`.
:return: A dictionary with the following keys:
- `value`: The input data.
"""
if (inputs_count := len(kwargs["value"])) != 1:
raise ValueError(f"Multiplexer expects only one input, but {inputs_count} were received.")
return {"value": kwargs["value"][0]}