mirror of
https://github.com/microsoft/autogen.git
synced 2025-07-05 16:10:50 +00:00

Resolves #5610 and addresses various questions regarding how to use the user proxy agent and human-in-the-loop.
141 lines
5.5 KiB
Python
141 lines
5.5 KiB
Python
from typing import List, cast
|
|
|
|
import chainlit as cl
|
|
import yaml
|
|
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
|
|
from autogen_agentchat.base import TaskResult
|
|
from autogen_agentchat.conditions import TextMentionTermination
|
|
from autogen_agentchat.messages import ModelClientStreamingChunkEvent, TextMessage
|
|
from autogen_agentchat.teams import RoundRobinGroupChat
|
|
from autogen_core import CancellationToken
|
|
from autogen_core.models import ChatCompletionClient
|
|
|
|
|
|
async def user_input_func(prompt: str, cancellation_token: CancellationToken | None = None) -> str:
    """Ask the user for free-text input through the chainlit UI.

    Args:
        prompt: Text shown to the user in the ask-message.
        cancellation_token: Accepted for signature compatibility; it is not
            used by this function.

    Returns:
        The ``"output"`` field of the response, or a fixed fallback sentence
        when ``TimeoutError`` is raised or the response is empty.
    """
    try:
        reply = await cl.AskUserMessage(content=prompt).send()
    except TimeoutError:
        return "User did not provide any input within the time limit."

    # A falsy reply means there is nothing to forward to the agent.
    if not reply:
        return "User did not provide any input."
    return reply["output"]  # type: ignore
|
|
|
|
|
|
async def user_action_func(prompt: str, cancellation_token: CancellationToken | None = None) -> str:
    """Ask the user to approve or reject through chainlit action buttons.

    Args:
        prompt: Accepted for signature compatibility; the message shown to
            the user is the fixed text "Pick an action".
        cancellation_token: Accepted for signature compatibility; it is not
            used by this function.

    Returns:
        ``"APPROVE."`` when the selected action's payload value is
        ``"approve"``, ``"REJECT."`` for any other payload value, or a fixed
        fallback sentence when ``TimeoutError`` is raised or no payload is
        present.
    """
    choices = [
        cl.Action(name="approve", label="Approve", payload={"value": "approve"}),
        cl.Action(name="reject", label="Reject", payload={"value": "reject"}),
    ]
    try:
        reply = await cl.AskActionMessage(content="Pick an action", actions=choices).send()
    except TimeoutError:
        return "User did not provide any input within the time limit."

    payload = reply.get("payload") if reply else None  # type: ignore
    if not payload:
        return "User did not provide any input."

    # "APPROVE." contains the text the termination condition looks for.
    return "APPROVE." if payload.get("value") == "approve" else "REJECT."  # type: ignore
|
|
|
|
|
|
@cl.on_chat_start  # type: ignore
async def start_chat() -> None:
    """Build the assistant/critic/user team and store it in the user session.

    Reads ``model_config.yaml`` (relative to the current working directory),
    loads a ``ChatCompletionClient`` from it, creates two streaming
    ``AssistantAgent`` instances plus a ``UserProxyAgent``, and chains them in
    a ``RoundRobinGroupChat`` that terminates when a message from ``"user"``
    mentions ``"APPROVE"``. The team is stored under the ``"team"`` session
    key and ``"prompt_history"`` is initialised to an empty string.
    """
    # Load model configuration and create the model client.
    # The encoding is explicit so that parsing the file does not depend on
    # the platform's default locale encoding.
    with open("model_config.yaml", "r", encoding="utf-8") as f:
        model_config = yaml.safe_load(f)
    model_client = ChatCompletionClient.load_component(model_config)

    # Create the assistant agent.
    assistant = AssistantAgent(
        name="assistant",
        model_client=model_client,
        system_message="You are a helpful assistant.",
        model_client_stream=True,  # Enable model client streaming.
    )

    # Create the critic agent.
    critic = AssistantAgent(
        name="critic",
        model_client=model_client,
        system_message="You are a critic. Provide constructive feedback. "
        "Respond with 'APPROVE' if your feedback has been addressed.",
        model_client_stream=True,  # Enable model client streaming.
    )

    # Create the user proxy agent.
    user = UserProxyAgent(
        name="user",
        # input_func=user_input_func,  # Use this line instead to collect user input as text.
        input_func=user_action_func,  # Collect user input as an approve/reject action.
    )

    # Termination condition: only an "APPROVE" mention from the user agent
    # counts, so the critic's own "APPROVE" does not end the run.
    termination = TextMentionTermination("APPROVE", sources=["user"])

    # Chain the assistant, critic and user agents using RoundRobinGroupChat.
    group_chat = RoundRobinGroupChat([assistant, critic, user], termination_condition=termination)

    # Store the prompt history and the team in the user session.
    cl.user_session.set("prompt_history", "")  # type: ignore
    cl.user_session.set("team", group_chat)  # type: ignore
|
|
|
|
|
|
@cl.set_starters  # type: ignore
async def set_starts() -> List[cl.Starter]:
    """Return the starter prompts registered with chainlit.

    Returns:
        Three ``cl.Starter`` objects, in order: poem writing, story writing,
        and code writing.
    """
    # (label, message) pairs, in the order they are returned.
    starter_specs = (
        ("Poem Writing", "Write a poem about the ocean."),
        ("Story Writing", "Write a story about a detective solving a mystery."),
        ("Write Code", "Write a function that merge two list of numbers into single sorted list."),
    )
    return [cl.Starter(label=label, message=message) for label, message in starter_specs]
|
|
|
|
|
|
@cl.on_message  # type: ignore
async def chat(message: cl.Message) -> None:
    """Run the session's team on an incoming message and relay its output.

    Streaming chunk events are appended token by token to a chainlit message
    authored by the chunk's source. The first non-chunk message that follows
    a run of chunks flushes that chainlit message and is otherwise skipped.
    A ``TaskResult`` produces a final "Task terminated. " message with the
    stop reason appended when one is set. All other messages are ignored.

    Args:
        message: The chainlit message whose content becomes the team's task.
    """
    # Get the team from the user session.
    team = cast(RoundRobinGroupChat, cl.user_session.get("team"))  # type: ignore

    # The chainlit message currently being streamed into, if any.
    streaming_response: cl.Message | None = None

    task_messages = [TextMessage(content=message.content, source="user")]
    stream = team.run_stream(task=task_messages, cancellation_token=CancellationToken())

    async for msg in stream:
        if isinstance(msg, ModelClientStreamingChunkEvent):
            # Open a new streamed message on the first chunk, then append.
            if streaming_response is None:
                streaming_response = cl.Message(content="", author=msg.source)
            await streaming_response.stream_token(msg.content)
            continue

        if streaming_response is not None:
            # Streaming just finished: the current message is only the
            # complete form of what was streamed, so flush and skip it.
            await streaming_response.send()
            # Reset so this branch is not taken again until the next
            # streaming response completes.
            streaming_response = None
            continue

        if isinstance(msg, TaskResult):
            # Send the task termination message.
            final_message = "Task terminated. " + (msg.stop_reason or "")
            await cl.Message(content=final_message).send()

        # Every other message type is skipped.
|