mirror of
https://github.com/microsoft/autogen.git
synced 2025-07-04 23:50:39 +00:00

Resolves #4075 1. Introduce custom runtime parameter for all AgentChat teams (RoundRobinGroupChat, SelectorGroupChat, etc.). This is done by making sure each team's topics are isolated from other teams, and decoupling state from agent identities. Also, I removed the closure agent from the BaseGroupChat and use the group chat manager agent to relay messages to the output message queue. 2. Added unit tests to test scenarios with custom runtimes by using pytest fixture 3. Refactored existing unit tests to use ReplayChatCompletionClient with a few improvements to the client. 4. Fix a one-liner bug in AssistantAgent that caused deserialized agent to have handoffs. How to use it? ```python import asyncio from autogen_core import SingleThreadedAgentRuntime from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.conditions import TextMentionTermination from autogen_ext.models.replay import ReplayChatCompletionClient async def main() -> None: # Create a runtime runtime = SingleThreadedAgentRuntime() runtime.start() # Create a model client. model_client = ReplayChatCompletionClient( ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"], ) # Create agents agent1 = AssistantAgent("assistant1", model_client=model_client, system_message="You are a helpful assistant.") agent2 = AssistantAgent("assistant2", model_client=model_client, system_message="You are a helpful assistant.") # Create a termination condition termination_condition = TextMentionTermination("10", sources=["assistant1", "assistant2"]) # Create a team team = RoundRobinGroupChat([agent1, agent2], runtime=runtime, termination_condition=termination_condition) # Run the team stream = team.run_stream(task="Count to 10.") async for message in stream: print(message) # Save the state. state = await team.save_state() # Load the state to an existing team. await team.load_state(state) # Run the team again model_client.reset() stream = team.run_stream(task="Count to 10.") async for message in stream: print(message) # Create a new team, with the same agent names. agent3 = AssistantAgent("assistant1", model_client=model_client, system_message="You are a helpful assistant.") agent4 = AssistantAgent("assistant2", model_client=model_client, system_message="You are a helpful assistant.") new_team = RoundRobinGroupChat([agent3, agent4], runtime=runtime, termination_condition=termination_condition) # Load the state to the new team. await new_team.load_state(state) # Run the new team model_client.reset() new_stream = new_team.run_stream(task="Count to 10.") async for message in new_stream: print(message) # Stop the runtime await runtime.stop() asyncio.run(main()) ``` TODOs as future PRs: 1. Documentation. 2. How to handle errors in custom runtime when the agent has exception? --------- Co-authored-by: Ryan Sweet <rysweet@microsoft.com>
60 lines
2.9 KiB
Python
60 lines
2.9 KiB
Python
from typing import AsyncGenerator
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from autogen_agentchat.agents import AssistantAgent, SocietyOfMindAgent
|
|
from autogen_agentchat.conditions import MaxMessageTermination
|
|
from autogen_agentchat.teams import RoundRobinGroupChat
|
|
from autogen_core import AgentRuntime, SingleThreadedAgentRuntime
|
|
from autogen_ext.models.replay import ReplayChatCompletionClient
|
|
|
|
|
|
@pytest_asyncio.fixture(params=["single_threaded", "embedded"]) # type: ignore
|
|
async def runtime(request: pytest.FixtureRequest) -> AsyncGenerator[AgentRuntime | None, None]:
|
|
if request.param == "single_threaded":
|
|
runtime = SingleThreadedAgentRuntime()
|
|
runtime.start()
|
|
yield runtime
|
|
await runtime.stop()
|
|
elif request.param == "embedded":
|
|
yield None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_society_of_mind_agent(runtime: AgentRuntime | None) -> None:
|
|
model_client = ReplayChatCompletionClient(
|
|
["1", "2", "3"],
|
|
)
|
|
agent1 = AssistantAgent("assistant1", model_client=model_client, system_message="You are a helpful assistant.")
|
|
agent2 = AssistantAgent("assistant2", model_client=model_client, system_message="You are a helpful assistant.")
|
|
inner_termination = MaxMessageTermination(3)
|
|
inner_team = RoundRobinGroupChat([agent1, agent2], termination_condition=inner_termination, runtime=runtime)
|
|
society_of_mind_agent = SocietyOfMindAgent("society_of_mind", team=inner_team, model_client=model_client)
|
|
response = await society_of_mind_agent.run(task="Count to 10.")
|
|
assert len(response.messages) == 4
|
|
assert response.messages[0].source == "user"
|
|
assert response.messages[1].source == "assistant1"
|
|
assert response.messages[2].source == "assistant2"
|
|
assert response.messages[3].source == "society_of_mind"
|
|
|
|
# Test save and load state.
|
|
state = await society_of_mind_agent.save_state()
|
|
assert state is not None
|
|
agent1 = AssistantAgent("assistant1", model_client=model_client, system_message="You are a helpful assistant.")
|
|
agent2 = AssistantAgent("assistant2", model_client=model_client, system_message="You are a helpful assistant.")
|
|
inner_termination = MaxMessageTermination(3)
|
|
inner_team = RoundRobinGroupChat([agent1, agent2], termination_condition=inner_termination, runtime=runtime)
|
|
society_of_mind_agent2 = SocietyOfMindAgent("society_of_mind", team=inner_team, model_client=model_client)
|
|
await society_of_mind_agent2.load_state(state)
|
|
state2 = await society_of_mind_agent2.save_state()
|
|
assert state == state2
|
|
|
|
# Test serialization.
|
|
soc_agent_config = society_of_mind_agent.dump_component()
|
|
assert soc_agent_config.provider == "autogen_agentchat.agents.SocietyOfMindAgent"
|
|
|
|
# Test deserialization.
|
|
loaded_soc_agent = SocietyOfMindAgent.load_component(soc_agent_config)
|
|
assert isinstance(loaded_soc_agent, SocietyOfMindAgent)
|
|
assert loaded_soc_agent.name == "society_of_mind"
|