mirror of
				https://github.com/microsoft/autogen.git
				synced 2025-10-31 09:50:11 +00:00 
			
		
		
		
	 a14aeab6e4
			
		
	
	
		a14aeab6e4
		
			
		
	
	
	
	
		
			
			Resolves #5610 And address various questions regarding to how to use user proxy agent and human-in-the-loop.
		
			
				
	
	
		
			141 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			141 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from typing import List, cast
 | |
| 
 | |
| import chainlit as cl
 | |
| import yaml
 | |
| from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
 | |
| from autogen_agentchat.base import TaskResult
 | |
| from autogen_agentchat.conditions import TextMentionTermination
 | |
| from autogen_agentchat.messages import ModelClientStreamingChunkEvent, TextMessage
 | |
| from autogen_agentchat.teams import RoundRobinGroupChat
 | |
| from autogen_core import CancellationToken
 | |
| from autogen_core.models import ChatCompletionClient
 | |
| 
 | |
| 
 | |
| async def user_input_func(prompt: str, cancellation_token: CancellationToken | None = None) -> str:
 | |
|     """Get user input from the UI for the user proxy agent."""
 | |
|     try:
 | |
|         response = await cl.AskUserMessage(content=prompt).send()
 | |
|     except TimeoutError:
 | |
|         return "User did not provide any input within the time limit."
 | |
|     if response:
 | |
|         return response["output"]  # type: ignore
 | |
|     else:
 | |
|         return "User did not provide any input."
 | |
| 
 | |
| 
 | |
| async def user_action_func(prompt: str, cancellation_token: CancellationToken | None = None) -> str:
 | |
|     """Get user action from the UI for the user proxy agent."""
 | |
|     try:
 | |
|         response = await cl.AskActionMessage(
 | |
|             content="Pick an action",
 | |
|             actions=[
 | |
|                 cl.Action(name="approve", label="Approve", payload={"value": "approve"}),
 | |
|                 cl.Action(name="reject", label="Reject", payload={"value": "reject"}),
 | |
|             ],
 | |
|         ).send()
 | |
|     except TimeoutError:
 | |
|         return "User did not provide any input within the time limit."
 | |
|     if response and response.get("payload"):  # type: ignore
 | |
|         if response.get("payload").get("value") == "approve":  # type: ignore
 | |
|             return "APPROVE."  # This is the termination condition.
 | |
|         else:
 | |
|             return "REJECT."
 | |
|     else:
 | |
|         return "User did not provide any input."
 | |
| 
 | |
| 
 | |
| @cl.on_chat_start  # type: ignore
 | |
| async def start_chat() -> None:
 | |
|     # Load model configuration and create the model client.
 | |
|     with open("model_config.yaml", "r") as f:
 | |
|         model_config = yaml.safe_load(f)
 | |
|     model_client = ChatCompletionClient.load_component(model_config)
 | |
| 
 | |
|     # Create the assistant agent.
 | |
|     assistant = AssistantAgent(
 | |
|         name="assistant",
 | |
|         model_client=model_client,
 | |
|         system_message="You are a helpful assistant.",
 | |
|         model_client_stream=True,  # Enable model client streaming.
 | |
|     )
 | |
| 
 | |
|     # Create the critic agent.
 | |
|     critic = AssistantAgent(
 | |
|         name="critic",
 | |
|         model_client=model_client,
 | |
|         system_message="You are a critic. Provide constructive feedback. "
 | |
|         "Respond with 'APPROVE' if your feedback has been addressed.",
 | |
|         model_client_stream=True,  # Enable model client streaming.
 | |
|     )
 | |
| 
 | |
|     # Create the user proxy agent.
 | |
|     user = UserProxyAgent(
 | |
|         name="user",
 | |
|         # input_func=user_input_func, # Uncomment this line to use user input as text.
 | |
|         input_func=user_action_func,  # Uncomment this line to use user input as action.
 | |
|     )
 | |
| 
 | |
|     # Termination condition.
 | |
|     termination = TextMentionTermination("APPROVE", sources=["user"])
 | |
| 
 | |
|     # Chain the assistant, critic and user agents using RoundRobinGroupChat.
 | |
|     group_chat = RoundRobinGroupChat([assistant, critic, user], termination_condition=termination)
 | |
| 
 | |
|     # Set the assistant agent in the user session.
 | |
|     cl.user_session.set("prompt_history", "")  # type: ignore
 | |
|     cl.user_session.set("team", group_chat)  # type: ignore
 | |
| 
 | |
| 
 | |
| @cl.set_starters  # type: ignore
 | |
| async def set_starts() -> List[cl.Starter]:
 | |
|     return [
 | |
|         cl.Starter(
 | |
|             label="Poem Writing",
 | |
|             message="Write a poem about the ocean.",
 | |
|         ),
 | |
|         cl.Starter(
 | |
|             label="Story Writing",
 | |
|             message="Write a story about a detective solving a mystery.",
 | |
|         ),
 | |
|         cl.Starter(
 | |
|             label="Write Code",
 | |
|             message="Write a function that merge two list of numbers into single sorted list.",
 | |
|         ),
 | |
|     ]
 | |
| 
 | |
| 
 | |
| @cl.on_message  # type: ignore
 | |
| async def chat(message: cl.Message) -> None:
 | |
|     # Get the team from the user session.
 | |
|     team = cast(RoundRobinGroupChat, cl.user_session.get("team"))  # type: ignore
 | |
|     # Streaming response message.
 | |
|     streaming_response: cl.Message | None = None
 | |
|     # Stream the messages from the team.
 | |
|     async for msg in team.run_stream(
 | |
|         task=[TextMessage(content=message.content, source="user")],
 | |
|         cancellation_token=CancellationToken(),
 | |
|     ):
 | |
|         if isinstance(msg, ModelClientStreamingChunkEvent):
 | |
|             # Stream the model client response to the user.
 | |
|             if streaming_response is None:
 | |
|                 # Start a new streaming response.
 | |
|                 streaming_response = cl.Message(content="", author=msg.source)
 | |
|             await streaming_response.stream_token(msg.content)
 | |
|         elif streaming_response is not None:
 | |
|             # Done streaming the model client response.
 | |
|             # We can skip the current message as it is just the complete message
 | |
|             # of the streaming response.
 | |
|             await streaming_response.send()
 | |
|             # Reset the streaming response so we won't enter this block again
 | |
|             # until the next streaming response is complete.
 | |
|             streaming_response = None
 | |
|         elif isinstance(msg, TaskResult):
 | |
|             # Send the task termination message.
 | |
|             final_message = "Task terminated. "
 | |
|             if msg.stop_reason:
 | |
|                 final_message += msg.stop_reason
 | |
|             await cl.Message(content=final_message).send()
 | |
|         else:
 | |
|             # Skip all other message types.
 | |
|             pass
 |