autogen/python/samples/agentchat_chainlit/app_team_user_proxy.py
Eric Zhu a14aeab6e4
doc & sample: Update documentation for human-in-the-loop and UserProxyAgent; Add UserProxyAgent to ChainLit sample; (#5656)
Resolves #5610

Also addresses various questions regarding how to use the user proxy agent
and human-in-the-loop.
2025-02-25 01:41:15 +00:00

141 lines
5.5 KiB
Python

from typing import List, cast
import chainlit as cl
import yaml
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.base import TaskResult
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.messages import ModelClientStreamingChunkEvent, TextMessage
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_core.models import ChatCompletionClient
async def user_input_func(prompt: str, cancellation_token: CancellationToken | None = None) -> str:
    """Ask the user for free-text input through the Chainlit UI.

    Args:
        prompt: Text shown to the user as the content of the ask message.
        cancellation_token: Part of the input-function signature; not used here.

    Returns:
        The ``"output"`` entry of the user's reply, or a fixed fallback
        sentence when the request times out or comes back empty.
    """
    try:
        reply = await cl.AskUserMessage(content=prompt).send()
    except TimeoutError:
        return "User did not provide any input within the time limit."
    # A falsy reply means nothing was received from the UI.
    if not reply:
        return "User did not provide any input."
    return reply["output"]  # type: ignore
async def user_action_func(prompt: str, cancellation_token: CancellationToken | None = None) -> str:
    """Ask the user to approve or reject through action buttons in the UI.

    Args:
        prompt: Part of the input-function signature; the message content
            shown to the user is always "Pick an action".
        cancellation_token: Part of the input-function signature; not used here.

    Returns:
        ``"APPROVE."`` when the selected action's payload value is
        ``"approve"``, ``"REJECT."`` for any other payload value, or a fixed
        fallback sentence when the request times out or has no payload.
    """
    try:
        choices = [
            cl.Action(name="approve", label="Approve", payload={"value": "approve"}),
            cl.Action(name="reject", label="Reject", payload={"value": "reject"}),
        ]
        reply = await cl.AskActionMessage(content="Pick an action", actions=choices).send()
    except TimeoutError:
        return "User did not provide any input within the time limit."
    payload = reply.get("payload") if reply else None  # type: ignore
    if not payload:
        return "User did not provide any input."
    # "APPROVE." is the reply that satisfies the termination condition.
    return "APPROVE." if payload.get("value") == "approve" else "REJECT."  # type: ignore
@cl.on_chat_start  # type: ignore
async def start_chat() -> None:
    """Build the assistant/critic/user team and store it in the user session.

    Reads ``model_config.yaml`` from the current working directory, creates a
    model client from it, wires an assistant, a critic and a user proxy into a
    ``RoundRobinGroupChat``, and saves the team under the ``"team"`` session key.

    Raises:
        OSError: If ``model_config.yaml`` cannot be opened.
    """
    # Load model configuration and create the model client.
    # Decode explicitly as UTF-8 so parsing does not depend on the platform's
    # locale encoding.
    with open("model_config.yaml", "r", encoding="utf-8") as f:
        model_config = yaml.safe_load(f)
    model_client = ChatCompletionClient.load_component(model_config)

    # Create the assistant agent.
    assistant = AssistantAgent(
        name="assistant",
        model_client=model_client,
        system_message="You are a helpful assistant.",
        model_client_stream=True,  # Enable model client streaming.
    )

    # Create the critic agent.
    critic = AssistantAgent(
        name="critic",
        model_client=model_client,
        system_message="You are a critic. Provide constructive feedback. "
        "Respond with 'APPROVE' if your feedback has been addressed.",
        model_client_stream=True,  # Enable model client streaming.
    )

    # Create the user proxy agent.
    user = UserProxyAgent(
        name="user",
        # input_func=user_input_func,  # Alternative: collect the user's reply as free text.
        input_func=user_action_func,  # Active: collect the user's reply as an Approve/Reject action.
    )

    # Terminate only when a message from the "user" source mentions "APPROVE".
    termination = TextMentionTermination("APPROVE", sources=["user"])

    # Chain the assistant, critic and user agents using RoundRobinGroupChat.
    group_chat = RoundRobinGroupChat([assistant, critic, user], termination_condition=termination)

    # Store an empty prompt history and the team in the user session.
    cl.user_session.set("prompt_history", "")  # type: ignore
    cl.user_session.set("team", group_chat)  # type: ignore
@cl.set_starters  # type: ignore
async def set_starts() -> List[cl.Starter]:
    """Return the starter entries registered through ``cl.set_starters``.

    Returns:
        Three ``cl.Starter`` objects (poem, story and code prompts), in that order.
    """
    # (label, message) pairs, in display order.
    starter_specs = (
        ("Poem Writing", "Write a poem about the ocean."),
        ("Story Writing", "Write a story about a detective solving a mystery."),
        ("Write Code", "Write a function that merge two list of numbers into single sorted list."),
    )
    return [cl.Starter(label=label, message=message) for label, message in starter_specs]
@cl.on_message  # type: ignore
async def chat(message: cl.Message) -> None:
    """Run the session's team on an incoming message and relay its output.

    Streaming chunks are appended to a single UI message per streamed
    response. The first non-chunk event that follows a streamed response
    finalizes that message and is itself not displayed. A ``TaskResult``
    produces a closing "Task terminated." message. Every other event is ignored.

    Args:
        message: The incoming Chainlit message; its content becomes the task.
    """
    # Get the team from the user session.
    team = cast(RoundRobinGroupChat, cl.user_session.get("team"))  # type: ignore
    # UI message currently receiving streamed tokens, if any.
    pending: cl.Message | None = None
    task = [TextMessage(content=message.content, source="user")]

    async for event in team.run_stream(task=task, cancellation_token=CancellationToken()):
        if isinstance(event, ModelClientStreamingChunkEvent):
            # Open a new UI message on the first chunk, then append the token.
            if pending is None:
                pending = cl.Message(content="", author=event.source)
            await pending.stream_token(event.content)
            continue

        if pending is not None:
            # Streaming is done: finalize the UI message. The current event is
            # skipped as it is just the complete message of the streamed response.
            await pending.send()
            pending = None
            continue

        if isinstance(event, TaskResult):
            # Send the task termination message, with the stop reason if present.
            closing_text = "Task terminated. " + (event.stop_reason or "")
            await cl.Message(content=closing_text).send()
        # All other message types are skipped.