diff --git a/haystack/core/pipeline/pipeline.py b/haystack/core/pipeline/pipeline.py index d996a1181..eb38e4dc1 100644 --- a/haystack/core/pipeline/pipeline.py +++ b/haystack/core/pipeline/pipeline.py @@ -154,7 +154,7 @@ class Pipeline: raise PipelineError(f"Missing sender in connection: {connection}") if "receiver" not in connection: raise PipelineError(f"Missing receiver in connection: {connection}") - pipe.connect(connect_from=connection["sender"], connect_to=connection["receiver"]) + pipe.connect(sender=connection["sender"], receiver=connection["receiver"]) return pipe @@ -210,16 +210,16 @@ class Pipeline: visits=0, ) - def connect(self, connect_from: str, connect_to: str) -> "Pipeline": + def connect(self, sender: str, receiver: str) -> "Pipeline": """ Connects two components together. All components to connect must exist in the pipeline. If connecting to an component that has several output connections, specify the inputs and output names as 'component_name.connections_name'. Args: - connect_from: the component that delivers the value. This can be either just a component name or can be + sender: the component that delivers the value. This can be either just a component name or can be in the format `component_name.connection_name` if the component has multiple outputs. - connect_to: the component that receives the value. This can be either just a component name or can be + receiver: the component that receives the value. This can be either just a component name or can be in the format `component_name.connection_name` if the component has multiple inputs. Returns: @@ -230,18 +230,18 @@ class Pipeline: not present in the pipeline, or the connections don't match by type, and so on). """ # Edges may be named explicitly by passing 'node_name.edge_name' to connect(). - sender, sender_socket_name = parse_connect_string(connect_from) - receiver, receiver_socket_name = parse_connect_string(connect_to) + sender_component_name, sender_socket_name = parse_connect_string(sender) + receiver_component_name, receiver_socket_name = parse_connect_string(receiver) # Get the nodes data. try: - from_sockets = self.graph.nodes[sender]["output_sockets"] + from_sockets = self.graph.nodes[sender_component_name]["output_sockets"] except KeyError as exc: - raise ValueError(f"Component named {sender} not found in the pipeline.") from exc + raise ValueError(f"Component named {sender_component_name} not found in the pipeline.") from exc try: - to_sockets = self.graph.nodes[receiver]["input_sockets"] + to_sockets = self.graph.nodes[receiver_component_name]["input_sockets"] except KeyError as exc: - raise ValueError(f"Component named {receiver} not found in the pipeline.") from exc + raise ValueError(f"Component named {receiver_component_name} not found in the pipeline.") from exc # If the name of either socket is given, get the socket sender_socket: Optional[OutputSocket] = None @@ -249,8 +249,8 @@ class Pipeline: sender_socket = from_sockets.get(sender_socket_name) if not sender_socket: raise PipelineConnectError( - f"'{connect_from} does not exist. " - f"Output connections of {sender} are: " + f"'{sender} does not exist. " + f"Output connections of {sender_component_name} are: " + ", ".join([f"{name} (type {_type_name(socket.type)})" for name, socket in from_sockets.items()]) ) @@ -259,8 +259,8 @@ class Pipeline: receiver_socket = to_sockets.get(receiver_socket_name) if not receiver_socket: raise PipelineConnectError( - f"'{connect_to} does not exist. " - f"Input connections of {receiver} are: " + f"'{receiver} does not exist. " + f"Input connections of {receiver_component_name} are: " + ", ".join([f"{name} (type {_type_name(socket.type)})" for name, socket in to_sockets.items()]) ) @@ -280,9 +280,9 @@ class Pipeline: # We need this status for error messages, since we might need it in multiple places we calculate it here status = _connections_status( - sender_node=sender, + sender_node=sender_component_name, sender_sockets=sender_socket_candidates, - receiver_node=receiver, + receiver_node=receiver_component_name, receiver_sockets=receiver_socket_candidates, ) @@ -290,11 +290,14 @@ class Pipeline: # There's no possible connection between these two components if len(sender_socket_candidates) == len(receiver_socket_candidates) == 1: msg = ( - f"Cannot connect '{sender}.{sender_socket_candidates[0].name}' with '{receiver}.{receiver_socket_candidates[0].name}': " + f"Cannot connect '{sender_component_name}.{sender_socket_candidates[0].name}' with '{receiver_component_name}.{receiver_socket_candidates[0].name}': " f"their declared input and output types do not match.\n{status}" ) else: - msg = f"Cannot connect '{sender}' with '{receiver}': " f"no matching connections available.\n{status}" + msg = ( + f"Cannot connect '{sender_component_name}' with '{receiver_component_name}': " + f"no matching connections available.\n{status}" + ) raise PipelineConnectError(msg) if len(possible_connections) == 1: @@ -310,10 +313,10 @@ class Pipeline: if len(name_matches) != 1: # There's are either no matches or more than one, we can't pick one reliably msg = ( - f"Cannot connect '{sender}' with '{receiver}': more than one connection is possible " + f"Cannot connect '{sender_component_name}' with '{receiver_component_name}': more than one connection is possible " "between these components. Please specify the connection name, like: " - f"pipeline.connect('{sender}.{possible_connections[0][0].name}', " - f"'{receiver}.{possible_connections[0][1].name}').\n{status}" + f"pipeline.connect('{sender_component_name}.{possible_connections[0][0].name}', " + f"'{receiver_component_name}.{possible_connections[0][1].name}').\n{status}" ) raise PipelineConnectError(msg) @@ -322,41 +325,47 @@ class Pipeline: receiver_socket = name_matches[0][1] # Connection must be valid on both sender/receiver sides - if not sender_socket or not receiver_socket or not sender or not receiver: - if sender and sender_socket: - sender_repr = f"{sender}.{sender_socket.name} ({_type_name(sender_socket.type)})" + if not sender_socket or not receiver_socket or not sender_component_name or not receiver_component_name: + if sender_component_name and sender_socket: + sender_repr = f"{sender_component_name}.{sender_socket.name} ({_type_name(sender_socket.type)})" else: sender_repr = "input needed" - if receiver and receiver_socket: - receiver_repr = f"({_type_name(receiver_socket.type)}) {receiver}.{receiver_socket.name}" + if receiver_component_name and receiver_socket: + receiver_repr = f"({_type_name(receiver_socket.type)}) {receiver_component_name}.{receiver_socket.name}" else: receiver_repr = "output" msg = f"Connection must have both sender and receiver: {sender_repr} -> {receiver_repr}" raise PipelineConnectError(msg) - logger.debug("Connecting '%s.%s' to '%s.%s'", sender, sender_socket.name, receiver, receiver_socket.name) + logger.debug( + "Connecting '%s.%s' to '%s.%s'", + sender_component_name, + sender_socket.name, + receiver_component_name, + receiver_socket.name, + ) - if receiver in sender_socket.receivers and sender in receiver_socket.senders: + if receiver_component_name in sender_socket.receivers and sender_component_name in receiver_socket.senders: # This is already connected, nothing to do return self if receiver_socket.senders and not receiver_socket.is_variadic: # Only variadic input sockets can receive from multiple senders msg = ( - f"Cannot connect '{sender}.{sender_socket.name}' with '{receiver}.{receiver_socket.name}': " - f"{receiver}.{receiver_socket.name} is already connected to {receiver_socket.senders}.\n" + f"Cannot connect '{sender_component_name}.{sender_socket.name}' with '{receiver_component_name}.{receiver_socket.name}': " + f"{receiver_component_name}.{receiver_socket.name} is already connected to {receiver_socket.senders}.\n" ) raise PipelineConnectError(msg) # Update the sockets with the new connection - sender_socket.receivers.append(receiver) - receiver_socket.senders.append(sender) + sender_socket.receivers.append(receiver_component_name) + receiver_socket.senders.append(sender_component_name) # Create the new connection self.graph.add_edge( - sender, - receiver, + sender_component_name, + receiver_component_name, key=f"{sender_socket.name}/{receiver_socket.name}", conn_type=_type_name(sender_socket.type), from_socket=sender_socket, diff --git a/releasenotes/notes/rename-connect-arguments-2d99d9d5cbe9ab4c.yaml b/releasenotes/notes/rename-connect-arguments-2d99d9d5cbe9ab4c.yaml new file mode 100644 index 000000000..8d242c5b6 --- /dev/null +++ b/releasenotes/notes/rename-connect-arguments-2d99d9d5cbe9ab4c.yaml @@ -0,0 +1,10 @@ +--- +upgrade: + - | + `Pipeline.connect()` arguments have renamed for clarity. This is a breaking change if `connect` was called + with keyword arguments only. + `connect_from` and `connect_to` arguments have been renamed respectively to `sender` and `receiver`. + The behaviour of `Pipeline.connect()` is not changed. +features: + - | + Rename `Pipeline.connect()` arguments for clarity