autogen/python/packages/autogen-agentchat/tests/test_sequential_routed_agent.py
Eric Zhu 58a5583549
feat: Pause and Resume for AgentChat Teams and Agents (#5887)
1. Add `on_pause` and `on_resume` API to `ChatAgent` to support pausing
behavior when running `on_message` concurrently.
2. Add `GroupChatPause` and `GroupChatResume` RPC events and handle them
in `ChatAgentContainer`.
3. Add `pause` and `resume` API to `BaseGroupChat` to allow for this
behavior accessible from the public API.
4. Improve `SequentialRoutedAgent` class to customize which message
types are sequentially handled, making it possible to have concurrent
handling for some messages (e.g., `GroupChatPause`).
5. Added unit tests. 

See `test_group_chat_pause_resume.py` for how to use this feature. 

What is the difference between pause/resume vs. termination and restart?
- Pause and resume issue direct RPC calls to the participanting agents
of a team while they are running, allowing putting the on-going
generation or actions on hold. This is useful when an agent's turn takes
a long time and multiple steps to complete, and user/application wants
to re-evaluate whether it is worth continue the step or cancel. This
also allows user/application to pause individual agents and resuming
them independently from the team API.
- Termination and restart requires the whole team to comes to a
full-stop, and termination conditions are checked in between agents'
turns. So termination can only happen when no agent is working on its
turn. It is possible that a termination condition has reached well
before the team is terminated, if the agent is taking a long time to
generate a response.

Resolves: #5881
2025-03-11 17:12:34 -07:00

48 lines
1.5 KiB
Python

import asyncio
import random
from dataclasses import dataclass
from typing import List
import pytest
from autogen_agentchat.teams._group_chat._sequential_routed_agent import SequentialRoutedAgent
from autogen_core import (
AgentId,
DefaultTopicId,
MessageContext,
SingleThreadedAgentRuntime,
default_subscription,
message_handler,
)
@dataclass
class Message:
content: str
@default_subscription
class _TestAgent(SequentialRoutedAgent):
def __init__(self, description: str) -> None:
super().__init__(description=description, sequential_message_types=[Message])
self.messages: List[Message] = []
@message_handler
async def handle_content_publish(self, message: Message, ctx: MessageContext) -> None:
# Sleep a random amount of time to simulate processing time.
await asyncio.sleep(random.random() / 100)
self.messages.append(message)
@pytest.mark.asyncio
async def test_sequential_routed_agent() -> None:
runtime = SingleThreadedAgentRuntime()
runtime.start()
await _TestAgent.register(runtime, type="test_agent", factory=lambda: _TestAgent(description="Test Agent"))
test_agent_id = AgentId(type="test_agent", key="default")
for i in range(100):
await runtime.publish_message(Message(content=f"{i}"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
test_agent = await runtime.try_get_underlying_agent_instance(test_agent_id, _TestAgent)
for i in range(100):
assert test_agent.messages[i].content == f"{i}"