mirror of
https://github.com/deepset-ai/haystack.git
synced 2025-11-02 02:39:51 +00:00
refactor: Rename Pipeline.connect() arguments (#6919)
* Rename Pipeline.connect() arguments * Add release notes
This commit is contained in:
parent
95dce5b0ec
commit
b875eda4af
@ -154,7 +154,7 @@ class Pipeline:
|
||||
raise PipelineError(f"Missing sender in connection: {connection}")
|
||||
if "receiver" not in connection:
|
||||
raise PipelineError(f"Missing receiver in connection: {connection}")
|
||||
pipe.connect(connect_from=connection["sender"], connect_to=connection["receiver"])
|
||||
pipe.connect(sender=connection["sender"], receiver=connection["receiver"])
|
||||
|
||||
return pipe
|
||||
|
||||
@ -210,16 +210,16 @@ class Pipeline:
|
||||
visits=0,
|
||||
)
|
||||
|
||||
def connect(self, connect_from: str, connect_to: str) -> "Pipeline":
|
||||
def connect(self, sender: str, receiver: str) -> "Pipeline":
|
||||
"""
|
||||
Connects two components together. All components to connect must exist in the pipeline.
|
||||
If connecting to an component that has several output connections, specify the inputs and output names as
|
||||
'component_name.connections_name'.
|
||||
|
||||
Args:
|
||||
connect_from: the component that delivers the value. This can be either just a component name or can be
|
||||
sender: the component that delivers the value. This can be either just a component name or can be
|
||||
in the format `component_name.connection_name` if the component has multiple outputs.
|
||||
connect_to: the component that receives the value. This can be either just a component name or can be
|
||||
receiver: the component that receives the value. This can be either just a component name or can be
|
||||
in the format `component_name.connection_name` if the component has multiple inputs.
|
||||
|
||||
Returns:
|
||||
@ -230,18 +230,18 @@ class Pipeline:
|
||||
not present in the pipeline, or the connections don't match by type, and so on).
|
||||
"""
|
||||
# Edges may be named explicitly by passing 'node_name.edge_name' to connect().
|
||||
sender, sender_socket_name = parse_connect_string(connect_from)
|
||||
receiver, receiver_socket_name = parse_connect_string(connect_to)
|
||||
sender_component_name, sender_socket_name = parse_connect_string(sender)
|
||||
receiver_component_name, receiver_socket_name = parse_connect_string(receiver)
|
||||
|
||||
# Get the nodes data.
|
||||
try:
|
||||
from_sockets = self.graph.nodes[sender]["output_sockets"]
|
||||
from_sockets = self.graph.nodes[sender_component_name]["output_sockets"]
|
||||
except KeyError as exc:
|
||||
raise ValueError(f"Component named {sender} not found in the pipeline.") from exc
|
||||
raise ValueError(f"Component named {sender_component_name} not found in the pipeline.") from exc
|
||||
try:
|
||||
to_sockets = self.graph.nodes[receiver]["input_sockets"]
|
||||
to_sockets = self.graph.nodes[receiver_component_name]["input_sockets"]
|
||||
except KeyError as exc:
|
||||
raise ValueError(f"Component named {receiver} not found in the pipeline.") from exc
|
||||
raise ValueError(f"Component named {receiver_component_name} not found in the pipeline.") from exc
|
||||
|
||||
# If the name of either socket is given, get the socket
|
||||
sender_socket: Optional[OutputSocket] = None
|
||||
@ -249,8 +249,8 @@ class Pipeline:
|
||||
sender_socket = from_sockets.get(sender_socket_name)
|
||||
if not sender_socket:
|
||||
raise PipelineConnectError(
|
||||
f"'{connect_from} does not exist. "
|
||||
f"Output connections of {sender} are: "
|
||||
f"'{sender} does not exist. "
|
||||
f"Output connections of {sender_component_name} are: "
|
||||
+ ", ".join([f"{name} (type {_type_name(socket.type)})" for name, socket in from_sockets.items()])
|
||||
)
|
||||
|
||||
@ -259,8 +259,8 @@ class Pipeline:
|
||||
receiver_socket = to_sockets.get(receiver_socket_name)
|
||||
if not receiver_socket:
|
||||
raise PipelineConnectError(
|
||||
f"'{connect_to} does not exist. "
|
||||
f"Input connections of {receiver} are: "
|
||||
f"'{receiver} does not exist. "
|
||||
f"Input connections of {receiver_component_name} are: "
|
||||
+ ", ".join([f"{name} (type {_type_name(socket.type)})" for name, socket in to_sockets.items()])
|
||||
)
|
||||
|
||||
@ -280,9 +280,9 @@ class Pipeline:
|
||||
|
||||
# We need this status for error messages, since we might need it in multiple places we calculate it here
|
||||
status = _connections_status(
|
||||
sender_node=sender,
|
||||
sender_node=sender_component_name,
|
||||
sender_sockets=sender_socket_candidates,
|
||||
receiver_node=receiver,
|
||||
receiver_node=receiver_component_name,
|
||||
receiver_sockets=receiver_socket_candidates,
|
||||
)
|
||||
|
||||
@ -290,11 +290,14 @@ class Pipeline:
|
||||
# There's no possible connection between these two components
|
||||
if len(sender_socket_candidates) == len(receiver_socket_candidates) == 1:
|
||||
msg = (
|
||||
f"Cannot connect '{sender}.{sender_socket_candidates[0].name}' with '{receiver}.{receiver_socket_candidates[0].name}': "
|
||||
f"Cannot connect '{sender_component_name}.{sender_socket_candidates[0].name}' with '{receiver_component_name}.{receiver_socket_candidates[0].name}': "
|
||||
f"their declared input and output types do not match.\n{status}"
|
||||
)
|
||||
else:
|
||||
msg = f"Cannot connect '{sender}' with '{receiver}': " f"no matching connections available.\n{status}"
|
||||
msg = (
|
||||
f"Cannot connect '{sender_component_name}' with '{receiver_component_name}': "
|
||||
f"no matching connections available.\n{status}"
|
||||
)
|
||||
raise PipelineConnectError(msg)
|
||||
|
||||
if len(possible_connections) == 1:
|
||||
@ -310,10 +313,10 @@ class Pipeline:
|
||||
if len(name_matches) != 1:
|
||||
# There's are either no matches or more than one, we can't pick one reliably
|
||||
msg = (
|
||||
f"Cannot connect '{sender}' with '{receiver}': more than one connection is possible "
|
||||
f"Cannot connect '{sender_component_name}' with '{receiver_component_name}': more than one connection is possible "
|
||||
"between these components. Please specify the connection name, like: "
|
||||
f"pipeline.connect('{sender}.{possible_connections[0][0].name}', "
|
||||
f"'{receiver}.{possible_connections[0][1].name}').\n{status}"
|
||||
f"pipeline.connect('{sender_component_name}.{possible_connections[0][0].name}', "
|
||||
f"'{receiver_component_name}.{possible_connections[0][1].name}').\n{status}"
|
||||
)
|
||||
raise PipelineConnectError(msg)
|
||||
|
||||
@ -322,41 +325,47 @@ class Pipeline:
|
||||
receiver_socket = name_matches[0][1]
|
||||
|
||||
# Connection must be valid on both sender/receiver sides
|
||||
if not sender_socket or not receiver_socket or not sender or not receiver:
|
||||
if sender and sender_socket:
|
||||
sender_repr = f"{sender}.{sender_socket.name} ({_type_name(sender_socket.type)})"
|
||||
if not sender_socket or not receiver_socket or not sender_component_name or not receiver_component_name:
|
||||
if sender_component_name and sender_socket:
|
||||
sender_repr = f"{sender_component_name}.{sender_socket.name} ({_type_name(sender_socket.type)})"
|
||||
else:
|
||||
sender_repr = "input needed"
|
||||
|
||||
if receiver and receiver_socket:
|
||||
receiver_repr = f"({_type_name(receiver_socket.type)}) {receiver}.{receiver_socket.name}"
|
||||
if receiver_component_name and receiver_socket:
|
||||
receiver_repr = f"({_type_name(receiver_socket.type)}) {receiver_component_name}.{receiver_socket.name}"
|
||||
else:
|
||||
receiver_repr = "output"
|
||||
msg = f"Connection must have both sender and receiver: {sender_repr} -> {receiver_repr}"
|
||||
raise PipelineConnectError(msg)
|
||||
|
||||
logger.debug("Connecting '%s.%s' to '%s.%s'", sender, sender_socket.name, receiver, receiver_socket.name)
|
||||
logger.debug(
|
||||
"Connecting '%s.%s' to '%s.%s'",
|
||||
sender_component_name,
|
||||
sender_socket.name,
|
||||
receiver_component_name,
|
||||
receiver_socket.name,
|
||||
)
|
||||
|
||||
if receiver in sender_socket.receivers and sender in receiver_socket.senders:
|
||||
if receiver_component_name in sender_socket.receivers and sender_component_name in receiver_socket.senders:
|
||||
# This is already connected, nothing to do
|
||||
return self
|
||||
|
||||
if receiver_socket.senders and not receiver_socket.is_variadic:
|
||||
# Only variadic input sockets can receive from multiple senders
|
||||
msg = (
|
||||
f"Cannot connect '{sender}.{sender_socket.name}' with '{receiver}.{receiver_socket.name}': "
|
||||
f"{receiver}.{receiver_socket.name} is already connected to {receiver_socket.senders}.\n"
|
||||
f"Cannot connect '{sender_component_name}.{sender_socket.name}' with '{receiver_component_name}.{receiver_socket.name}': "
|
||||
f"{receiver_component_name}.{receiver_socket.name} is already connected to {receiver_socket.senders}.\n"
|
||||
)
|
||||
raise PipelineConnectError(msg)
|
||||
|
||||
# Update the sockets with the new connection
|
||||
sender_socket.receivers.append(receiver)
|
||||
receiver_socket.senders.append(sender)
|
||||
sender_socket.receivers.append(receiver_component_name)
|
||||
receiver_socket.senders.append(sender_component_name)
|
||||
|
||||
# Create the new connection
|
||||
self.graph.add_edge(
|
||||
sender,
|
||||
receiver,
|
||||
sender_component_name,
|
||||
receiver_component_name,
|
||||
key=f"{sender_socket.name}/{receiver_socket.name}",
|
||||
conn_type=_type_name(sender_socket.type),
|
||||
from_socket=sender_socket,
|
||||
|
||||
@ -0,0 +1,10 @@
|
||||
---
|
||||
upgrade:
|
||||
- |
|
||||
`Pipeline.connect()` arguments have renamed for clarity. This is a breaking change if `connect` was called
|
||||
with keyword arguments only.
|
||||
`connect_from` and `connect_to` arguments have been renamed respectively to `sender` and `receiver`.
|
||||
The behaviour of `Pipeline.connect()` is not changed.
|
||||
features:
|
||||
- |
|
||||
Rename `Pipeline.connect()` arguments for clarity
|
||||
Loading…
x
Reference in New Issue
Block a user