add activation group for workflow with multiple cycles (#6711)

## Why are these changes needed?
1. problem
When the GraphFlowManager encounters cycles, it tracks remaining
indegree counts for the node's activation. However, this tracking
mechanism has a flaw when dealing with cycles. When a node first enters
a cycle, the GraphFlowManager evaluates all remaining incoming edges,
including those that loop back to the origin node. If the activation
prerequisites are not satisfied at that moment, the workflow will
eventually finish because the _remaining counter never reaches zero,
preventing the select_speaker() method from selecting any agents for
execution.
2. solution
change activation map to 2 layer for ditinguish remaining inside
different cycle and outside the cycle.
add a activation group and policy property for edge, compute the
remaining map when GraphFlowManager is init and check the remaining map
with activation group to avoid checking the loop back edges
<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

#6710

## Checks

- [x] I've included any doc changes needed for
<https://microsoft.github.io/autogen/>. See
<https://github.com/microsoft/autogen/blob/main/CONTRIBUTING.md> to
build and test documentation locally.
- [x] I've added tests (if relevant) corresponding to the changes
introduced in this PR.
- [x] I've made sure all auto checks have passed.
This commit is contained in:
Zen 2025-06-25 12:20:04 +08:00 committed by GitHub
parent c5b893d3f8
commit 9b8dc8d707
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 559 additions and 31 deletions

View File

@ -50,6 +50,25 @@ class DiGraphEdge(BaseModel):
# Using Field to exclude the condition in serialization if it's a callable
condition_function: Callable[[BaseChatMessage], bool] | None = Field(default=None, exclude=True)
activation_group: str = Field(default="")
"""Group identifier for forward dependencies.
When multiple edges point to the same target node, they are grouped by this field.
This allows distinguishing between different cycles or dependency patterns.
Example: In a graph containing a cycle like A->B->C->B, the two edges pointing to B (A->B and C->B)
can be in different activation groups to control how B is activated.
Defaults to the target node name if not specified.
"""
activation_condition: Literal["all", "any"] = "all"
"""Determines how forward dependencies within the same activation_group are evaluated.
- "all": All edges in this activation group must be satisfied before the target node can execute
- "any": Any single edge in this activation group being satisfied allows the target node to execute
This is used to handle complex dependency patterns in cyclic graphs where multiple
paths can lead to the same target node.
"""
@model_validator(mode="after")
def _validate_condition(self) -> "DiGraphEdge":
@ -59,6 +78,11 @@ class DiGraphEdge(BaseModel):
# For serialization purposes, we'll set the condition to None
# when storing as a pydantic model/dict
object.__setattr__(self, "condition", None)
# Set activation_group to target if not already set
if not self.activation_group:
self.activation_group = self.target
return self
def check_condition(self, message: BaseChatMessage) -> bool:
@ -112,7 +136,6 @@ class DiGraph(BaseModel):
parents: Dict[str, List[str]] = {node: [] for node in self.nodes}
for node in self.nodes.values():
for edge in node.edges:
if edge.target != node.name:
parents[edge.target].append(node.name)
return parents
@ -206,8 +229,79 @@ class DiGraph(BaseModel):
if has_condition and has_unconditioned:
raise ValueError(f"Node '{node.name}' has a mix of conditional and unconditional edges.")
# Validate activation conditions across all edges in the graph
self._validate_activation_conditions()
self._has_cycles = self.has_cycles_with_exit()
def _validate_activation_conditions(self) -> None:
"""Validate that all edges pointing to the same target node have consistent activation_condition values.
Raises:
ValueError: If edges pointing to the same target have different activation_condition values
"""
target_activation_conditions: Dict[str, Dict[str, str]] = {} # target_node -> {activation_group -> condition}
for node in self.nodes.values():
for edge in node.edges:
target = edge.target # The target node this edge points to
activation_group = edge.activation_group
if target not in target_activation_conditions:
target_activation_conditions[target] = {}
if activation_group in target_activation_conditions[target]:
if target_activation_conditions[target][activation_group] != edge.activation_condition:
# Find the source node that has the conflicting condition
conflicting_source = self._find_edge_source_by_target_and_group(
target, activation_group, target_activation_conditions[target][activation_group]
)
raise ValueError(
f"Conflicting activation conditions for target '{target}' group '{activation_group}': "
f"'{target_activation_conditions[target][activation_group]}' (from node '{conflicting_source}') "
f"and '{edge.activation_condition}' (from node '{node.name}')"
)
else:
target_activation_conditions[target][activation_group] = edge.activation_condition
def _find_edge_source_by_target_and_group(
self, target: str, activation_group: str, activation_condition: str
) -> str:
"""Find the source node that has an edge pointing to the given target with the given activation_group and activation_condition."""
for node_name, node in self.nodes.items():
for edge in node.edges:
if (
edge.target == target
and edge.activation_group == activation_group
and edge.activation_condition == activation_condition
):
return node_name
return "unknown"
def get_remaining_map(self) -> Dict[str, Dict[str, int]]:
"""Get the remaining map that tracks how many edges point to each target node with each activation group.
Returns:
Dictionary mapping target nodes to their activation groups and remaining counts
"""
remaining_map: Dict[str, Dict[str, int]] = {}
for node in self.nodes.values():
for edge in node.edges:
target = edge.target
activation_group = edge.activation_group
if target not in remaining_map:
remaining_map[target] = {}
if activation_group not in remaining_map[target]:
remaining_map[target][activation_group] = 0
remaining_map[target][activation_group] += 1
return remaining_map
class GraphFlowManagerState(BaseGroupChatManagerState):
"""Tracks active execution state for DAG-based execution."""
@ -254,18 +348,51 @@ class GraphFlowManager(BaseGroupChatManager):
self._parents = graph.get_parents()
# Lookup table for outgoing edges for each node.
self._edges: Dict[str, List[DiGraphEdge]] = {n: node.edges for n, node in graph.nodes.items()}
# Activation lookup table for each node.
self._activation: Dict[str, Literal["any", "all"]] = {n: node.activation for n, node in graph.nodes.items()}
# Build activation and enqueued_any lookup tables by collecting all edges and grouping by target node
self._build_lookup_tables(graph)
# Track which activation groups were triggered for each node
self._triggered_activation_groups: Dict[str, Set[str]] = {}
# === Mutable states for the graph execution ===
# Count the number of remaining parents to activate each node.
self._remaining: Counter[str] = Counter({n: len(p) for n, p in self._parents.items()})
# Lookup table for nodes that have been enqueued through an any activation.
# This is used to prevent re-adding the same node multiple times.
self._enqueued_any: Dict[str, bool] = {n: False for n in graph.nodes}
self._remaining: Dict[str, Counter[str]] = {
target: Counter(groups) for target, groups in graph.get_remaining_map().items()
}
# cache for remaining
self._origin_remaining: Dict[str, Dict[str, int]] = {
target: Counter(groups) for target, groups in self._remaining.items()
}
# Ready queue for nodes that are ready to execute, starting with the start nodes.
self._ready: Deque[str] = deque([n for n in graph.get_start_nodes()])
def _build_lookup_tables(self, graph: DiGraph) -> None:
"""Build activation and enqueued_any lookup tables by collecting all edges and grouping by target node.
Args:
graph: The directed graph
"""
self._activation: Dict[str, Dict[str, Literal["any", "all"]]] = {}
self._enqueued_any: Dict[str, Dict[str, bool]] = {}
for node in graph.nodes.values():
for edge in node.edges:
target = edge.target
activation_group = edge.activation_group
# Build activation lookup
if target not in self._activation:
self._activation[target] = {}
if activation_group not in self._activation[target]:
self._activation[target][activation_group] = edge.activation_condition
# Build enqueued_any lookup
if target not in self._enqueued_any:
self._enqueued_any[target] = {}
if activation_group not in self._enqueued_any[target]:
self._enqueued_any[target][activation_group] = False
async def update_message_thread(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> None:
await super().update_message_thread(messages)
@ -282,16 +409,55 @@ class GraphFlowManager(BaseGroupChatManager):
# Use the new check_condition method that handles both string and callable conditions
if not edge.check_condition(message):
continue
if self._activation[edge.target] == "all":
self._remaining[edge.target] -= 1
if self._remaining[edge.target] == 0:
target = edge.target
activation_group = edge.activation_group
if self._activation[target][activation_group] == "all":
self._remaining[target][activation_group] -= 1
if self._remaining[target][activation_group] == 0:
# If all parents are done, add to the ready queue.
self._ready.append(edge.target)
self._ready.append(target)
# Track which activation group was triggered
self._save_triggered_activation_group(target, activation_group)
else:
# If activation is any, add to the ready queue if not already enqueued.
if not self._enqueued_any[edge.target]:
self._ready.append(edge.target)
self._enqueued_any[edge.target] = True
if not self._enqueued_any[target][activation_group]:
self._ready.append(target)
self._enqueued_any[target][activation_group] = True
# Track which activation group was triggered
self._save_triggered_activation_group(target, activation_group)
def _save_triggered_activation_group(self, target: str, activation_group: str) -> None:
"""Save which activation group was triggered for a target node.
Args:
target: The target node that was triggered
activation_group: The activation group that caused the trigger
"""
if target not in self._triggered_activation_groups:
self._triggered_activation_groups[target] = set()
self._triggered_activation_groups[target].add(activation_group)
def _reset_triggered_activation_groups(self, speaker: str) -> None:
"""Reset the bookkeeping for the specific activation groups that were triggered for a speaker.
Args:
speaker: The speaker node to reset activation groups for
"""
if speaker not in self._triggered_activation_groups:
return
for activation_group in self._triggered_activation_groups[speaker]:
if self._activation[speaker][activation_group] == "any":
self._enqueued_any[speaker][activation_group] = False
else:
# Reset the remaining count for this activation group using the graph's original count
if speaker in self._remaining and activation_group in self._remaining[speaker]:
self._remaining[speaker][activation_group] = self._origin_remaining[speaker][activation_group]
# Clear the triggered activation groups for this speaker
self._triggered_activation_groups[speaker].clear()
async def select_speaker(self, thread: Sequence[BaseAgentEvent | BaseChatMessage]) -> List[str]:
# Drain the ready queue for the next set of speakers.
@ -299,11 +465,9 @@ class GraphFlowManager(BaseGroupChatManager):
while self._ready:
speaker = self._ready.popleft()
speakers.append(speaker)
# Reset the bookkeeping for the node that were selected.
if self._activation[speaker] == "any":
self._enqueued_any[speaker] = False
else:
self._remaining[speaker] = len(self._parents[speaker])
# Reset the bookkeeping for the specific activation groups that were triggered
self._reset_triggered_activation_groups(speaker)
# If there are no speakers, trigger the stop agent.
if not speakers:
@ -319,7 +483,7 @@ class GraphFlowManager(BaseGroupChatManager):
state = {
"message_thread": [message.dump() for message in self._message_thread],
"current_turn": self._current_turn,
"remaining": dict(self._remaining),
"remaining": {target: dict(counter) for target, counter in self._remaining.items()},
"enqueued_any": dict(self._enqueued_any),
"ready": list(self._ready),
}
@ -329,7 +493,7 @@ class GraphFlowManager(BaseGroupChatManager):
"""Restore execution state from saved data."""
self._message_thread = [self._message_factory.create(msg) for msg in state["message_thread"]]
self._current_turn = state["current_turn"]
self._remaining = Counter(state["remaining"])
self._remaining = {target: Counter(groups) for target, groups in state["remaining"].items()}
self._enqueued_any = state["enqueued_any"]
self._ready = deque(state["ready"])
@ -339,8 +503,8 @@ class GraphFlowManager(BaseGroupChatManager):
self._message_thread.clear()
if self._termination_condition:
await self._termination_condition.reset()
self._remaining = Counter({n: len(p) for n, p in self._parents.items()})
self._enqueued_any = {n: False for n in self._graph.nodes}
self._remaining = {target: Counter(groups) for target, groups in self._graph.get_remaining_map().items()}
self._enqueued_any = {n: {g: False for g in self._enqueued_any[n]} for n in self._enqueued_any}
self._ready = deque([n for n in self._graph.get_start_nodes()])

View File

@ -74,6 +74,23 @@ class DiGraphBuilder:
>>> builder.add_edge(agent_b, agent_a, condition=lambda msg: "loop" in msg.to_model_text())
>>> # Add exit condition to break the loop
>>> builder.add_edge(agent_b, agent_c, condition=lambda msg: "loop" not in msg.to_model_text())
Example Loop with multiple paths to the same node: A B C B:
>>> builder = GraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> builder.add_edge(agent_a, agent_b)
>>> builder.add_edge(agent_b, agent_c)
>>> builder.add_edge(agent_c, agent_b, activation_group="loop_back")
Example Loop with multiple paths to the same node with any activation condition: A B (C1, C2) B E(exit):
>>> builder = GraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c1).add_node(agent_c2).add_node(agent_e)
>>> builder.add_edge(agent_a, agent_b)
>>> builder.add_edge(agent_b, agent_c1)
>>> builder.add_edge(agent_b, agent_c2)
>>> builder.add_edge(agent_b, agent_e, condition="exit")
>>> builder.add_edge(agent_c1, agent_b, activation_group="loop_back_group", activation_condition="any")
>>> builder.add_edge(agent_c2, agent_b, activation_group="loop_back_group", activation_condition="any")
"""
def __init__(self) -> None:
@ -97,6 +114,8 @@ class DiGraphBuilder:
source: Union[str, ChatAgent],
target: Union[str, ChatAgent],
condition: Optional[Union[str, Callable[[BaseChatMessage], bool]]] = None,
activation_group: Optional[str] = None,
activation_condition: Optional[Literal["all", "any"]] = None,
) -> "DiGraphBuilder":
"""Add a directed edge from source to target, optionally with a condition.
@ -120,8 +139,18 @@ class DiGraphBuilder:
raise ValueError(f"Source node '{source_name}' must be added before adding an edge.")
if target_name not in self.nodes:
raise ValueError(f"Target node '{target_name}' must be added before adding an edge.")
self.nodes[source_name].edges.append(DiGraphEdge(target=target_name, condition=condition))
if activation_group is None:
activation_group = target_name
if activation_condition is None:
activation_condition = "all"
self.nodes[source_name].edges.append(
DiGraphEdge(
target=target_name,
condition=condition,
activation_group=activation_group,
activation_condition=activation_condition,
)
)
return self
def add_conditional_edges(

View File

@ -1,4 +1,5 @@
import asyncio
import re
from typing import AsyncGenerator, List, Sequence
from unittest.mock import patch
@ -247,6 +248,32 @@ def test_cycle_detection_without_exit_condition() -> None:
graph.has_cycles_with_exit()
def test_different_activation_groups_detection() -> None:
"""Test different activation groups."""
graph = DiGraph(
nodes={
"A": DiGraphNode(
name="A",
edges=[
DiGraphEdge(target="B"),
DiGraphEdge(target="C"),
],
),
"B": DiGraphNode(name="B", edges=[DiGraphEdge(target="D", activation_condition="all")]),
"C": DiGraphNode(name="C", edges=[DiGraphEdge(target="D", activation_condition="any")]),
"D": DiGraphNode(name="D", edges=[]),
}
)
with pytest.raises(
ValueError,
match=re.escape(
"Conflicting activation conditions for target 'D' group 'D': "
"'all' (from node 'B') and 'any' (from node 'C')"
),
):
graph.graph_validate()
def test_validate_graph_success() -> None:
"""Test successful validation of a valid graph."""
graph = DiGraph(
@ -719,7 +746,7 @@ async def test_digraph_group_chat_loop_with_exit_condition(runtime: AgentRuntime
@pytest.mark.asyncio
async def test_digraph_group_chat_loop_with_exit_condition_2(runtime: AgentRuntime | None) -> None:
async def test_digraph_group_chat_loop_with_self_cycle(runtime: AgentRuntime | None) -> None:
# Agents A and C: Echo Agents
agent_a = _EchoAgent("A", description="Echo agent A")
agent_c = _EchoAgent("C", description="Echo agent C")
@ -740,7 +767,11 @@ async def test_digraph_group_chat_loop_with_exit_condition_2(runtime: AgentRunti
nodes={
"A": DiGraphNode(name="A", edges=[DiGraphEdge(target="B")]),
"B": DiGraphNode(
name="B", edges=[DiGraphEdge(target="C", condition="exit"), DiGraphEdge(target="B", condition="loop")]
name="B",
edges=[
DiGraphEdge(target="C", condition="exit"),
DiGraphEdge(target="B", condition="loop", activation_group="B_loop"),
],
),
"C": DiGraphNode(name="C", edges=[]),
},
@ -777,6 +808,96 @@ async def test_digraph_group_chat_loop_with_exit_condition_2(runtime: AgentRunti
assert result.messages[-1].source == _DIGRAPH_STOP_AGENT_NAME
@pytest.mark.asyncio
async def test_digraph_group_chat_loop_with_two_cycles(runtime: AgentRuntime | None) -> None:
# Agents A and C: Echo Agents
agent_a = _EchoAgent("A", description="Echo agent A")
agent_b = _EchoAgent("B", description="Echo agent B")
agent_c = _EchoAgent("C", description="Echo agent C")
agent_e = _EchoAgent("E", description="Echo agent E")
# Replay model client for agent B
model_client = ReplayChatCompletionClient(
chat_completions=[
"to_x", # First time O will branch to B
"to_o", # X will go back to O
"to_y", # Second time O will branch to C
"to_o", # Y will go back to O
"exit", # Third time O will say exit
]
)
# Agent o, b, c: Assistant Agent using Replay Client
agent_o = AssistantAgent("O", description="Decision agent o", model_client=model_client)
agent_x = AssistantAgent("X", description="Decision agent x", model_client=model_client)
agent_y = AssistantAgent("Y", description="Decision agent y", model_client=model_client)
# DiGraph:
#
# A
# / \
# B C
# \ |
# X = O = Y (bidirectional)
# |
# E(exit)
graph = DiGraph(
nodes={
"A": DiGraphNode(name="A", edges=[DiGraphEdge(target="B"), DiGraphEdge(target="C")]),
"B": DiGraphNode(
name="B", edges=[DiGraphEdge(target="O")]
), # default activation group name is same as target node name "O"
"C": DiGraphNode(
name="C", edges=[DiGraphEdge(target="O")]
), # default activation group name is same as target node name "O"
"O": DiGraphNode(
name="O",
edges=[
DiGraphEdge(target="X", condition="to_x"),
DiGraphEdge(target="Y", condition="to_y"),
DiGraphEdge(target="E", condition="exit"),
],
),
"X": DiGraphNode(name="X", edges=[DiGraphEdge(target="O", condition="to_o", activation_group="x_o_loop")]),
"Y": DiGraphNode(name="Y", edges=[DiGraphEdge(target="O", condition="to_o", activation_group="y_o_loop")]),
"E": DiGraphNode(name="E", edges=[]),
},
default_start_node="A",
)
team = GraphFlow(
participants=[agent_a, agent_o, agent_b, agent_c, agent_x, agent_y, agent_e],
graph=graph,
runtime=runtime,
termination_condition=MaxMessageTermination(20),
)
# Run
result = await team.run(task="Start")
# Assert message order
expected_sources = [
"user",
"A",
"B",
"C",
"O",
"X", # O -> X
"O", # X -> O
"Y", # O -> Y
"O", # Y -> O
"E", # O -> E
_DIGRAPH_STOP_AGENT_NAME,
]
actual_sources = [m.source for m in result.messages]
assert actual_sources == expected_sources
assert result.stop_reason is not None
assert result.messages[-2].source == "E"
assert any(m.content == "exit" for m in result.messages[:-1]) # type: ignore[attr-defined,union-attr]
assert result.messages[-1].source == _DIGRAPH_STOP_AGENT_NAME
@pytest.mark.asyncio
async def test_digraph_group_chat_parallel_join_any_1(runtime: AgentRuntime | None) -> None:
agent_a = _EchoAgent("A", description="Echo agent A")
@ -787,9 +908,9 @@ async def test_digraph_group_chat_parallel_join_any_1(runtime: AgentRuntime | No
graph = DiGraph(
nodes={
"A": DiGraphNode(name="A", edges=[DiGraphEdge(target="B"), DiGraphEdge(target="C")]),
"B": DiGraphNode(name="B", edges=[DiGraphEdge(target="D")]),
"C": DiGraphNode(name="C", edges=[DiGraphEdge(target="D")]),
"D": DiGraphNode(name="D", edges=[], activation="any"),
"B": DiGraphNode(name="B", edges=[DiGraphEdge(target="D", activation_group="any")]),
"C": DiGraphNode(name="C", edges=[DiGraphEdge(target="D", activation_group="any")]),
"D": DiGraphNode(name="D", edges=[]),
}
)

View File

@ -577,6 +577,220 @@
"# Run the flow and pretty print the output in the console\n",
"await Console(flow.run_stream(task=\"Brainstorm ways to reduce plastic waste.\"))"
]
},
{
"cell_type": "markdown",
"id": "4b39f9d6",
"metadata": {},
"source": [
"## 🔁 Advanced Example: Cycles With Activation Group Examples\n",
"\n",
"The following examples demonstrate how to use `activation_group` and `activation_condition` to handle complex dependency patterns in cyclic graphs, especially when multiple paths lead to the same target node."
]
},
{
"cell_type": "markdown",
"id": "791a4c47",
"metadata": {},
"source": [
"### Example 1: Loop with Multiple Paths - \"All\" Activation (A→B→C→B)\n",
"\n",
"In this scenario, we have A → B → C → B, where B has two incoming edges (from A and from C). By default, B requires **all** its dependencies to be satisfied before executing.\n",
"\n",
"This example shows a review loop where both the initial input (A) and the feedback (C) must be processed before B can execute again."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "384f5831",
"metadata": {},
"outputs": [],
"source": [
"from autogen_agentchat.agents import AssistantAgent\n",
"from autogen_agentchat.teams import DiGraphBuilder, GraphFlow\n",
"from autogen_agentchat.ui import Console\n",
"from autogen_ext.models.openai import OpenAIChatCompletionClient\n",
"\n",
"# Model client\n",
"client = OpenAIChatCompletionClient(model=\"gpt-4o-mini\")\n",
"\n",
"# Create agents for A→B→C→B→E scenario\n",
"agent_a = AssistantAgent(\"A\", model_client=client, system_message=\"Start the process and provide initial input.\")\n",
"agent_b = AssistantAgent(\n",
" \"B\",\n",
" model_client=client,\n",
" system_message=\"Process input from A or feedback from C. Say 'CONTINUE' if it's from A or 'STOP' if it's from C.\",\n",
")\n",
"agent_c = AssistantAgent(\"C\", model_client=client, system_message=\"Review B's output and provide feedback.\")\n",
"agent_e = AssistantAgent(\"E\", model_client=client, system_message=\"Finalize the process.\")\n",
"\n",
"# Build the graph with activation groups\n",
"builder = DiGraphBuilder()\n",
"builder.add_node(agent_a).add_node(agent_b).add_node(agent_c).add_node(agent_e)\n",
"\n",
"# A → B (initial path)\n",
"builder.add_edge(agent_a, agent_b, activation_group=\"initial\")\n",
"\n",
"# B → C\n",
"builder.add_edge(agent_b, agent_c, condition=\"CONTINUE\")\n",
"\n",
"# C → B (loop back - different activation group)\n",
"builder.add_edge(agent_c, agent_b, activation_group=\"feedback\")\n",
"\n",
"# B → E (exit condition)\n",
"builder.add_edge(agent_b, agent_e, condition=\"STOP\")\n",
"\n",
"# Build and create flow\n",
"graph = builder.build()\n",
"flow = GraphFlow(participants=[agent_a, agent_b, agent_c], graph=graph)\n",
"\n",
"print(\"=== Example 1: A→B→C→B with 'All' Activation ===\")\n",
"print(\"B will exit when it receives a message from C\")\n",
"# await Console(flow.run_stream(task=\"Start a review process for a document.\"))"
]
},
{
"cell_type": "markdown",
"id": "5dc08c64",
"metadata": {},
"source": [
"### Example 2: Loop with Multiple Paths - \"Any\" Activation (A→B→(C1,C2)→B)\n",
"\n",
"In this more complex scenario, we have A → B → (C1, C2) → B, where:\n",
"- B fans out to both C1 and C2 in parallel\n",
"- Both C1 and C2 feed back to B \n",
"- B uses \"any\" activation, meaning it executes as soon as **either** C1 or C2 completes\n",
"\n",
"This is useful for scenarios where you want the fastest response to trigger the next step.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "00f40293",
"metadata": {},
"outputs": [],
"source": [
"# Create agents for A→B→(C1,C2)→B scenario\n",
"agent_a2 = AssistantAgent(\"A\", model_client=client, system_message=\"Initiate a task that needs parallel processing.\")\n",
"agent_b2 = AssistantAgent(\n",
" \"B\",\n",
" model_client=client,\n",
" system_message=\"Coordinate parallel tasks. Say 'PROCESS' to start parallel work or 'DONE' to finish.\",\n",
")\n",
"agent_c1 = AssistantAgent(\"C1\", model_client=client, system_message=\"Handle task type 1. Say 'C1_COMPLETE' when done.\")\n",
"agent_c2 = AssistantAgent(\"C2\", model_client=client, system_message=\"Handle task type 2. Say 'C2_COMPLETE' when done.\")\n",
"agent_e = AssistantAgent(\"E\", model_client=client, system_message=\"Finalize the process.\")\n",
"\n",
"# Build the graph with \"any\" activation\n",
"builder2 = DiGraphBuilder()\n",
"builder2.add_node(agent_a2).add_node(agent_b2).add_node(agent_c1).add_node(agent_c2).add_node(agent_e)\n",
"\n",
"# A → B (initial)\n",
"builder2.add_edge(agent_a2, agent_b2)\n",
"\n",
"# B → C1 and B → C2 (parallel fan-out)\n",
"builder2.add_edge(agent_b2, agent_c1, condition=\"PROCESS\")\n",
"builder2.add_edge(agent_b2, agent_c2, condition=\"PROCESS\")\n",
"\n",
"# B → E (exit condition)\n",
"builder2.add_edge(agent_b2, agent_e, condition=lambda msg: \"DONE\" in msg.to_model_text())\n",
"\n",
"# C1 → B and C2 → B (both in same activation group with \"any\" condition)\n",
"builder2.add_edge(\n",
" agent_c1, agent_b2, activation_group=\"loop_back_group\", activation_condition=\"any\", condition=\"C1_COMPLETE\"\n",
")\n",
"\n",
"builder2.add_edge(\n",
" agent_c2, agent_b2, activation_group=\"loop_back_group\", activation_condition=\"any\", condition=\"C2_COMPLETE\"\n",
")\n",
"\n",
"# Build and create flow\n",
"graph2 = builder2.build()\n",
"flow2 = GraphFlow(participants=[agent_a2, agent_b2, agent_c1, agent_c2, agent_e], graph=graph2)\n",
"\n",
"print(\"=== Example 2: A→B→(C1,C2)→B with 'Any' Activation ===\")\n",
"print(\"B will execute as soon as EITHER C1 OR C2 completes (whichever finishes first)\")\n",
"# await Console(flow2.run_stream(task=\"Start a parallel processing task.\"))"
]
},
{
"cell_type": "markdown",
"id": "7c56cd2e",
"metadata": {},
"source": [
"### Example 3: Mixed Activation Groups\n",
"\n",
"This example shows how different activation groups can coexist in the same graph. We have a scenario where:\n",
"- Node D receives inputs from multiple sources with different activation requirements\n",
"- Some dependencies use \"all\" activation (must wait for all inputs)\n",
"- Other dependencies use \"any\" activation (proceed on first input)\n",
"\n",
"This pattern is useful for complex workflows where different types of dependencies have different urgency levels.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "97f75ba1",
"metadata": {},
"outputs": [],
"source": [
"# Create agents for mixed activation scenario\n",
"agent_a3 = AssistantAgent(\"A\", model_client=client, system_message=\"Provide critical input that must be processed.\")\n",
"agent_b3 = AssistantAgent(\"B\", model_client=client, system_message=\"Provide secondary critical input.\")\n",
"agent_c3 = AssistantAgent(\"C\", model_client=client, system_message=\"Provide optional quick input.\")\n",
"agent_d3 = AssistantAgent(\"D\", model_client=client, system_message=\"Process inputs based on different priority levels.\")\n",
"\n",
"# Build graph with mixed activation groups\n",
"builder3 = DiGraphBuilder()\n",
"builder3.add_node(agent_a3).add_node(agent_b3).add_node(agent_c3).add_node(agent_d3)\n",
"\n",
"# Critical inputs that must ALL be present (activation_group=\"critical\", activation_condition=\"all\")\n",
"builder3.add_edge(agent_a3, agent_d3, activation_group=\"critical\", activation_condition=\"all\")\n",
"builder3.add_edge(agent_b3, agent_d3, activation_group=\"critical\", activation_condition=\"all\")\n",
"\n",
"# Optional input that can trigger execution on its own (activation_group=\"optional\", activation_condition=\"any\")\n",
"builder3.add_edge(agent_c3, agent_d3, activation_group=\"optional\", activation_condition=\"any\")\n",
"\n",
"# Build and create flow\n",
"graph3 = builder3.build()\n",
"flow3 = GraphFlow(participants=[agent_a3, agent_b3, agent_c3, agent_d3], graph=graph3)\n",
"\n",
"print(\"=== Example 3: Mixed Activation Groups ===\")\n",
"print(\"D will execute when:\")\n",
"print(\"- BOTH A AND B complete (critical group with 'all' activation), OR\")\n",
"print(\"- C completes (optional group with 'any' activation)\")\n",
"print(\"This allows for both required dependencies and fast-path triggers.\")\n",
"# await Console(flow3.run_stream(task=\"Process inputs with mixed priority levels.\"))"
]
},
{
"cell_type": "markdown",
"id": "e329fe57",
"metadata": {},
"source": [
"### Key Takeaways for Activation Groups\n",
"\n",
"1. **`activation_group`**: Groups edges that point to the same target node, allowing you to define different dependency patterns.\n",
"\n",
"2. **`activation_condition`**: \n",
" - `\"all\"` (default): Target node waits for ALL edges in the group to be satisfied\n",
" - `\"any\"`: Target node executes as soon as ANY edge in the group is satisfied\n",
"\n",
"3. **Use Cases**:\n",
" - **Cycles with multiple entry points**: Different activation groups prevent conflicts\n",
" - **Priority-based execution**: Mix \"all\" and \"any\" conditions for different urgency levels \n",
" - **Parallel processing with early termination**: Use \"any\" to proceed with the fastest result\n",
"\n",
"4. **Best Practices**:\n",
" - Use descriptive group names (`\"critical\"`, `\"optional\"`, `\"feedback\"`, etc.)\n",
" - Keep activation conditions consistent within the same group\n",
" - Test your graph logic with different execution paths\n",
"\n",
"These patterns enable sophisticated workflow control while maintaining clear, understandable execution semantics."
]
}
],
"metadata": {