from typing import List, cast import chainlit as cl import yaml from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.base import TaskResult from autogen_agentchat.conditions import TextMentionTermination from autogen_agentchat.messages import ModelClientStreamingChunkEvent, TextMessage from autogen_agentchat.teams import RoundRobinGroupChat from autogen_core import CancellationToken from autogen_core.models import ChatCompletionClient @cl.on_chat_start # type: ignore async def start_chat() -> None: # Load model configuration and create the model client. with open("model_config.yaml", "r") as f: model_config = yaml.safe_load(f) model_client = ChatCompletionClient.load_component(model_config) # Create the assistant agent. assistant = AssistantAgent( name="assistant", model_client=model_client, system_message="You are a helpful assistant.", model_client_stream=True, # Enable model client streaming. ) # Create the critic agent. critic = AssistantAgent( name="critic", model_client=model_client, system_message="You are a critic. Provide constructive feedback. " "Respond with 'APPROVE' if your feedback has been addressed.", model_client_stream=True, # Enable model client streaming. ) # Termination condition. termination = TextMentionTermination("APPROVE", sources=["critic"]) # Chain the assistant and critic agents using RoundRobinGroupChat. group_chat = RoundRobinGroupChat([assistant, critic], termination_condition=termination) # Set the assistant agent in the user session. cl.user_session.set("prompt_history", "") # type: ignore cl.user_session.set("team", group_chat) # type: ignore @cl.set_starters # type: ignore async def set_starts() -> List[cl.Starter]: return [ cl.Starter( label="Poem Writing", message="Write a poem about the ocean.", ), cl.Starter( label="Story Writing", message="Write a story about a detective solving a mystery.", ), cl.Starter( label="Write Code", message="Write a function that merge two list of numbers into single sorted list.", ), ] @cl.on_message # type: ignore async def chat(message: cl.Message) -> None: # Get the team from the user session. team = cast(RoundRobinGroupChat, cl.user_session.get("team")) # type: ignore # Streaming response message. streaming_response: cl.Message | None = None # Stream the messages from the team. async for msg in team.run_stream( task=[TextMessage(content=message.content, source="user")], cancellation_token=CancellationToken(), ): if isinstance(msg, ModelClientStreamingChunkEvent): # Stream the model client response to the user. if streaming_response is None: # Start a new streaming response. streaming_response = cl.Message(content=msg.source + ": ", author=msg.source) await streaming_response.stream_token(msg.content) elif streaming_response is not None: # Done streaming the model client response. # We can skip the current message as it is just the complete message # of the streaming response. await streaming_response.send() # Reset the streaming response so we won't enter this block again # until the next streaming response is complete. streaming_response = None elif isinstance(msg, TaskResult): # Send the task termination message. final_message = "Task terminated. " if msg.stop_reason: final_message += msg.stop_reason await cl.Message(content=final_message).send() else: # Skip all other message types. pass