diff --git a/haystack/core/component/sockets.py b/haystack/core/component/sockets.py index 3da03e4ea..3cc84deac 100644 --- a/haystack/core/component/sockets.py +++ b/haystack/core/component/sockets.py @@ -17,13 +17,18 @@ SocketsIOType = Union[Type[InputSocket], Type[OutputSocket]] class Sockets: """ - This class is used to represent the inputs or outputs of a `Component`. + Represents the inputs or outputs of a `Component`. + Depending on the type passed to the constructor, it will represent either the inputs or the outputs of the `Component`. Usage: ```python + from typing import Any from haystack.components.builders.prompt_builder import PromptBuilder + from haystack.core.component.sockets import Sockets + from haystack.core.component.types import InputSocket, OutputSocket + prompt_template = \""" Given these documents, answer the question.\nDocuments: @@ -37,9 +42,9 @@ class Sockets: prompt_builder = PromptBuilder(template=prompt_template) sockets = {"question": InputSocket("question", Any), "documents": InputSocket("documents", Any)} - inputs = Sockets(component=prompt_builder, sockets=sockets, sockets_type=InputSocket) + inputs = Sockets(component=prompt_builder, sockets_dict=sockets, sockets_io_type=InputSocket) inputs - >>> PromptBuilder inputs: + >>> Inputs: >>> - question: Any >>> - documents: Any @@ -57,10 +62,18 @@ class Sockets: ): """ Create a new Sockets object. + We don't do any enforcement on the types of the sockets here, the `sockets_type` is only used for the `__repr__` method. We could do without it and use the type of a random value in the `sockets` dict, but that wouldn't work for components that have no sockets at all. Either input or output. + + :param component: + The component that these sockets belong to. + :param sockets_dict: + A dictionary of sockets. + :param sockets_io_type: + The type of the sockets. """ self._sockets_io_type = sockets_io_type self._component = component diff --git a/haystack/core/component/types.py b/haystack/core/component/types.py index 663bccf4f..2c05e571c 100644 --- a/haystack/core/component/types.py +++ b/haystack/core/component/types.py @@ -22,6 +22,21 @@ class _empty: @dataclass class InputSocket: + """ + Represents an input of a `Component`. + + :param name: + The name of the input. + :param type: + The type of the input. + :param default_value: + The default value of the input. If not set, the input is mandatory. + :param is_variadic: + Whether the input is variadic or not. + :param senders: + The list of components that send data to this input. + """ + name: str type: Type default_value: Any = _empty @@ -58,6 +73,17 @@ class InputSocket: @dataclass class OutputSocket: + """ + Represents an output of a `Component`. + + :param name: + The name of the output. + :param type: + The type of the output. + :param receivers: + The list of components that receive the output of this component. + """ + name: str type: type receivers: List[str] = field(default_factory=list) diff --git a/haystack/core/pipeline/pipeline.py b/haystack/core/pipeline/pipeline.py index e5585b361..0d70b5be0 100644 --- a/haystack/core/pipeline/pipeline.py +++ b/haystack/core/pipeline/pipeline.py @@ -57,12 +57,14 @@ class Pipeline: """ Creates the Pipeline. - Args: - metadata: arbitrary dictionary to store metadata about this pipeline. Make sure all the values contained in - this dictionary can be serialized and deserialized if you wish to save this pipeline to file with - `save_pipelines()/load_pipelines()`. - max_loops_allowed: how many times the pipeline can run the same node before throwing an exception. - debug_path: when debug is enabled in `run()`, where to save the debug data. + :param metadata: + Arbitrary dictionary to store metadata about this pipeline. Make sure all the values contained in + this dictionary can be serialized and deserialized if you wish to save this pipeline to file with + `save_pipelines()/load_pipelines()`. + :param max_loops_allowed: + How many times the pipeline can run the same node before throwing an exception. + :param debug_path: + When debug is enabled in `run()`, where to save the debug data. """ self._telemetry_runs = 0 self._last_telemetry_sent: Optional[datetime] = None @@ -111,8 +113,11 @@ class Pipeline: def to_dict(self) -> Dict[str, Any]: """ - Returns this Pipeline instance as a dictionary. + Serializes the pipeline to a dictionary. This is meant to be an intermediate representation but it can be also used to save a pipeline to file. + + :returns: + Dictionary with serialized data. """ components = {} for name, instance in self.graph.nodes(data="instance"): # type:ignore @@ -133,34 +138,14 @@ class Pipeline: @classmethod def from_dict(cls: Type[T], data: Dict[str, Any], **kwargs) -> T: """ - Creates a Pipeline instance from a dictionary. - A sample `data` dictionary could be formatted like so: - ``` - { - "metadata": {"test": "test"}, - "max_loops_allowed": 100, - "components": { - "add_two": { - "type": "AddFixedValue", - "init_parameters": {"add": 2}, - }, - "add_default": { - "type": "AddFixedValue", - "init_parameters": {"add": 1}, - }, - "double": { - "type": "Double", - }, - }, - "connections": [ - {"sender": "add_two.result", "receiver": "double.value"}, - {"sender": "double.value", "receiver": "add_default.value"}, - ], - } - ``` + Deserializes the pipeline from a dictionary. - Supported kwargs: - `components`: a dictionary of {name: instance} to reuse instances of components instead of creating new ones. + :param data: + Dictionary to deserialize from. + :param kwargs: + `components`: a dictionary of {name: instance} to reuse instances of components instead of creating new ones. + :returns: + Deserialized component. """ metadata = data.get("metadata", {}) max_loops_allowed = data.get("max_loops_allowed", 100) @@ -209,10 +194,10 @@ class Pipeline: Returns the string representation of this pipeline according to the format dictated by the `Marshaller` in use. - :params marshaller: The Marshaller used to create the string representation. Defaults to - `YamlMarshaller` - - :returns: A string representing the pipeline. + :param marshaller: + The Marshaller used to create the string representation. Defaults to `YamlMarshaller`. + :returns: + A string representing the pipeline. """ return marshaller.marshal(self.to_dict()) @@ -221,9 +206,10 @@ class Pipeline: Writes the string representation of this pipeline to the file-like object passed in the `fp` argument. - :params fp: A file-like object ready to be written to. - :params marshaller: The Marshaller used to create the string representation. Defaults to - `YamlMarshaller`. + :param fp: + A file-like object ready to be written to. + :param marshaller: + The Marshaller used to create the string representation. Defaults to `YamlMarshaller`. """ fp.write(marshaller.marshal(self.to_dict())) @@ -232,11 +218,12 @@ class Pipeline: """ Creates a `Pipeline` object from the string representation passed in the `data` argument. - :params data: The string representation of the pipeline, can be `str`, `bytes` or `bytearray`. - :params marshaller: the Marshaller used to create the string representation. Defaults to - `YamlMarshaller` - - :returns: A `Pipeline` object. + :param data: + The string representation of the pipeline, can be `str`, `bytes` or `bytearray`. + :param marshaller: + The Marshaller used to create the string representation. Defaults to `YamlMarshaller`. + :returns: + A `Pipeline` object. """ return cls.from_dict(marshaller.unmarshal(data)) @@ -246,32 +233,33 @@ class Pipeline: Creates a `Pipeline` object from the string representation read from the file-like object passed in the `fp` argument. - :params data: The string representation of the pipeline, can be `str`, `bytes` or `bytearray`. - :params fp: A file-like object ready to be read from. - :params marshaller: the Marshaller used to create the string representation. Defaults to - `YamlMarshaller` - - :returns: A `Pipeline` object. + :param data: + The string representation of the pipeline, can be `str`, `bytes` or `bytearray`. + :param fp: + A file-like object ready to be read from. + :param marshaller: + The Marshaller used to create the string representation. Defaults to `YamlMarshaller`. + :returns: + A `Pipeline` object. """ return cls.from_dict(marshaller.unmarshal(fp.read())) def add_component(self, name: str, instance: Component) -> None: """ - Create a component for the given component. Components are not connected to anything by default: - use `Pipeline.connect()` to connect components together. + Add the given component to the pipeline. + Components are not connected to anything by default: use `Pipeline.connect()` to connect components together. Component names must be unique, but component instances can be reused if needed. - Args: - name: the name of the component. - instance: the component instance. + :param name: + The name of the component to add. + :param instance: + The component instance to add. - Returns: - None - - Raises: - ValueError: if a component with the same name already exists - PipelineValidationError: if the given instance is not a Canals component + :raises ValueError: + If a component with the same name already exists. + :raises PipelineValidationError: + If the given instance is not a Canals component. """ # Component names are unique if name in self.graph.nodes: @@ -309,22 +297,24 @@ class Pipeline: def connect(self, sender: str, receiver: str) -> "Pipeline": """ - Connects two components together. All components to connect must exist in the pipeline. + Connects two components together. + + All components to connect must exist in the pipeline. If connecting to an component that has several output connections, specify the inputs and output names as 'component_name.connections_name'. - Args: - sender: the component that delivers the value. This can be either just a component name or can be - in the format `component_name.connection_name` if the component has multiple outputs. - receiver: the component that receives the value. This can be either just a component name or can be - in the format `component_name.connection_name` if the component has multiple inputs. + :param sender: + The component that delivers the value. This can be either just a component name or can be + in the format `component_name.connection_name` if the component has multiple outputs. + :param receiver: + The component that receives the value. This can be either just a component name or can be + in the format `component_name.connection_name` if the component has multiple inputs. + :returns: + The Pipeline instance. - Returns: - The Pipeline instance - - Raises: - PipelineConnectError: if the two components cannot be connected (for example if one of the components is - not present in the pipeline, or the connections don't match by type, and so on). + :raises PipelineConnectError: + If the two components cannot be connected (for example if one of the components is + not present in the pipeline, or the connections don't match by type, and so on). """ # Edges may be named explicitly by passing 'node_name.edge_name' to connect(). sender_component_name, sender_socket_name = parse_connect_string(sender) @@ -473,16 +463,15 @@ class Pipeline: def get_component(self, name: str) -> Component: """ - Returns an instance of a component. + Get the component with the specified name from the pipeline. - Args: - name: the name of the component - - Returns: + :param name: + The name of the component. + :returns: The instance of that component. - Raises: - ValueError: if a component with that name is not present in the pipeline. + :raises ValueError: + If a component with that name is not present in the pipeline. """ try: return self.graph.nodes[name]["instance"] @@ -491,8 +480,12 @@ class Pipeline: def get_component_name(self, instance: Component) -> str: """ - Returns the name of a Component instance. If the Component has not been added to this Pipeline, - returns an empty string. + Returns the name of the Component instance if it has been added to this Pipeline or an empty string otherwise. + + :param instance: + The Component instance to look for. + :returns: + The name of the Component instance. """ for name, inst in self.graph.nodes(data="instance"): if inst == instance: @@ -505,7 +498,7 @@ class Pipeline: corresponds to a component name, and its value is another dictionary that describes the input sockets of that component, including their types and whether they are optional. - Returns: + :returns: A dictionary where each key is a pipeline component name and each value is a dictionary of inputs sockets of that component. """ @@ -527,7 +520,7 @@ class Pipeline: corresponds to a component name, and its value is another dictionary that describes the output sockets of that component. - Returns: + :returns: A dictionary where each key is a pipeline component name and each value is a dictionary of output sockets of that component. """ @@ -556,6 +549,9 @@ class Pipeline: def draw(self, path: Path) -> None: """ Save an image representing this `Pipeline` to `path`. + + :param path: + The path to save the image to. """ # Before drawing we edit a bit the graph, to avoid modifying the original that is # used for running the pipeline we copy it. @@ -582,7 +578,11 @@ class Pipeline: * Each Component has only one input per input socket, if not variadic * Each Component doesn't receive inputs that are already sent by another Component - Raises ValueError if any of the above is not true. + :param data: + A dictionary of inputs for the pipeline's components. Each key is a component name. + + :raises ValueError: + If inputs are invalid according to the above. """ for component_name, component_inputs in data.items(): if component_name not in self.graph.nodes: @@ -613,11 +613,16 @@ class Pipeline: """ Runs the pipeline with given input data. - :param data: A dictionary of inputs for the pipeline's components. Each key is a component name - and its value is a dictionary of that component's input parameters. - :param debug: Set to True to collect and return debug information. - :return: A dictionary containing the pipeline's output. - :raises PipelineRuntimeError: If a component fails or returns unexpected output. + :param data: + A dictionary of inputs for the pipeline's components. Each key is a component name + and its value is a dictionary of that component's input parameters. + :param debug: + Set to True to collect and return debug information. + :returns: + A dictionary containing the pipeline's output. + + :raises PipelineRuntimeError: + If a component fails or returns unexpected output. Example a - Using named components: Consider a 'Hello' component that takes a 'word' input and outputs a greeting. @@ -955,12 +960,12 @@ class Pipeline: and its corresponding value. It distributes these inputs to the appropriate pipeline components based on their input requirements. Inputs that don't match any component's input slots are classified as unresolved. - :param data: A dictionary with input names as keys and input values as values. - :type data: Dict[str, Any] - :return: A tuple containing two elements: + :param data: + A dictionary with input names as keys and input values as values. + :returns: + A tuple containing two elements: 1. A dictionary mapping component names to their respective matched inputs. 2. A dictionary of inputs that were not matched to any component, termed as unresolved keyword arguments. - :rtype: Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]] """ pipeline_input_data: Dict[str, Dict[str, Any]] = defaultdict(dict) unresolved_kwargs = {} @@ -991,9 +996,12 @@ class Pipeline: """ Create a Pipeline from a predefined template. See `PredefinedPipeline` for available options. - :param predefined_pipeline: The predefined pipeline to use. - :param template_params: An optional dictionary of parameters to use when rendering the pipeline template. - :returns: An instance of `Pipeline`. + :param predefined_pipeline: + The predefined pipeline to use. + :param template_params: + An optional dictionary of parameters to use when rendering the pipeline template. + :returns: + An instance of `Pipeline`. """ tpl = PipelineTemplate.from_predefined(predefined_pipeline) # If tpl.render() fails, we let bubble up the original error @@ -1036,7 +1044,12 @@ def _connections_status( def parse_connect_string(connection: str) -> Tuple[str, Optional[str]]: """ - Returns component-connection pairs from a connect_to/from string + Returns component-connection pairs from a connect_to/from string. + + :param connection: + The connection string. + :returns: + A tuple containing the component name and the connection name. """ if "." in connection: split_str = connection.split(".", maxsplit=1) diff --git a/haystack/core/pipeline/template.py b/haystack/core/pipeline/template.py index 8b1bc1b84..11c591a38 100644 --- a/haystack/core/pipeline/template.py +++ b/haystack/core/pipeline/template.py @@ -84,7 +84,7 @@ class PipelineTemplate: :param template_params: An optional dictionary of parameters to use when rendering the pipeline template. - :return: An instance of `Pipeline` constructed from the rendered template and custom component configurations. + :returns: An instance of `Pipeline` constructed from the rendered template and custom component configurations. """ template_params = template_params or {} return self._template.render(**template_params) @@ -94,7 +94,7 @@ class PipelineTemplate: """ Create a PipelineTemplate from a file. :param file_path: The path to the file containing the template. Must contain valid Jinja2 syntax. - :return: An instance of `PipelineTemplate `. + :returns: An instance of `PipelineTemplate`. """ with open(file_path, "r") as file: return cls(file.read()) @@ -104,7 +104,7 @@ class PipelineTemplate: """ Create a PipelineTemplate from a predefined template. See `PredefinedPipeline` for available options. :param predefined_pipeline: The predefined pipeline to use. - :return: An instance of `PipelineTemplate `. + :returns: An instance of `PipelineTemplate `. """ template_path = f"{TEMPLATE_HOME_DIR}/{predefined_pipeline.value}{TEMPLATE_FILE_EXTENSION}" return cls.from_file(template_path) diff --git a/haystack/core/serialization.py b/haystack/core/serialization.py index dbe5400bb..e2f4ba1f7 100644 --- a/haystack/core/serialization.py +++ b/haystack/core/serialization.py @@ -9,8 +9,16 @@ from haystack.core.errors import DeserializationError, SerializationError def component_to_dict(obj: Any) -> Dict[str, Any]: """ - The marshaller used by the Pipeline. If a `to_dict` method is present in the + Converts a component instance into a dictionary. If a `to_dict` method is present in the component instance, that will be used instead of the default method. + + :param obj: + The component to be serialized. + :returns: + A dictionary representation of the component. + + :raises SerializationError: + If the component doesn't have a `to_dict` method and the values of the init parameters can't be determined. """ if hasattr(obj, "to_dict"): return obj.to_dict() @@ -41,8 +49,15 @@ def component_to_dict(obj: Any) -> Dict[str, Any]: def component_from_dict(cls: Type[object], data: Dict[str, Any]) -> Any: """ - The unmarshaller used by the Pipeline. If a `from_dict` method is present in the - component instance, that will be used instead of the default method. + Creates a component instance from a dictionary. If a `from_dict` method is present in the + component class, that will be used instead of the default method. + + :param cls: + The class to be used for deserialization. + :param data: + The serialized data. + :returns: + The deserialized component. """ if hasattr(cls, "from_dict"): return cls.from_dict(data) @@ -80,6 +95,13 @@ def default_to_dict(obj: Any, **init_parameters) -> Dict[str, Any]: }, } ``` + + :param obj: + The object to be serialized. + :param init_parameters: + The parameters used to create a new instance of the class. + :returns: + A dictionary representation of the instance. """ return {"type": f"{obj.__class__.__module__}.{obj.__class__.__name__}", "init_parameters": init_parameters} @@ -94,6 +116,16 @@ def default_from_dict(cls: Type[object], data: Dict[str, Any]) -> Any: If `data` contains an `init_parameters` field it will be used as parameters to create a new instance of `cls`. + + :param cls: + The class to be used for deserialization. + :param data: + The serialized data. + :returns: + The deserialized object. + + :raises DeserializationError: + If the `type` field in `data` is missing or it doesn't match the type of `cls`. """ init_params = data.get("init_parameters", {}) if "type" not in data: