Sample for integrating Core API with chainlit (#6422)

## Why are these changes needed?

This pull request adds new samples that integrate the AutoGen Core API
with Chainlit. It closely follows the structure of the
AgentChat + Chainlit sample and provides examples of using a single agent
and of multiple agents in a group chat.
## Related issue number


Closes: #5345

---------

Co-authored-by: Eric Zhu <ekzhu@users.noreply.github.com>
Commit: bacfa98aa6 (parent 2864fbfc2c), authored by DavidYu00 on 2025-05-06 17:19:31 -04:00, committed via GitHub.
6 changed files with 628 additions and 0 deletions

`.gitignore` (new file, 3 lines):
model_config.yaml
.chainlit
chainlit.md

`README.md` (new file, 64 lines):

# Core Chainlit Integration Sample
In this sample, we will demonstrate how to build a simple chat interface that
interacts with a [Core](https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/index.html)
agent or a team, using [Chainlit](https://github.com/Chainlit/chainlit),
with support for streaming messages.
## Overview
The `core_chainlit` sample is designed to illustrate a simple use case of Chainlit integrated with a single-threaded agent runtime. It includes the following components:

- **Single Agent**: A single agent that operates within the Chainlit environment.
- **Group Chat**: A group chat setup featuring two agents:
  - **Assistant Agent**: This agent responds to user inputs.
  - **Critic Agent**: This agent reflects on and critiques the responses from the Assistant Agent.
- **Closure Agent**: Uses a closure agent to aggregate output messages into an output queue.
- **Token Streaming**: Demonstrates how to stream tokens to the user interface.
- **Session Management**: Manages the runtime and output queue within the Chainlit user session (see the sketch below).
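The last three components work together: agents publish `StreamResult` messages (streamed tokens and final results) to a shared `task-results` topic, a closure agent copies each message into an `asyncio.Queue` kept in the Chainlit user session, and the UI loop drains that queue. Below is a condensed sketch of that wiring using the same names as the sample files; `wire_up_streaming` is only an illustrative helper, as the apps do this inside `@cl.on_chat_start`.

```python
import asyncio
from typing import cast

import chainlit as cl
from autogen_core import ClosureAgent, ClosureContext, MessageContext, SingleThreadedAgentRuntime, TypeSubscription

from SimpleAssistantAgent import StreamResult

TASK_RESULTS_TOPIC_TYPE = "task-results"
CLOSURE_AGENT_TYPE = "collect_result_agent"


# The closure agent's callback: every StreamResult published to the
# task-results topic is pushed onto the per-session output queue.
async def output_result(_agent: ClosureContext, message: StreamResult, ctx: MessageContext) -> None:
    queue = cast(asyncio.Queue[StreamResult], cl.user_session.get("queue_stream"))
    await queue.put(message)


# Illustrative helper: store the queue in the Chainlit user session and
# register the closure agent against the task-results topic.
async def wire_up_streaming(runtime: SingleThreadedAgentRuntime) -> None:
    cl.user_session.set("queue_stream", asyncio.Queue[StreamResult]())
    await ClosureAgent.register_closure(
        runtime,
        CLOSURE_AGENT_TYPE,
        output_result,
        subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)],
    )
```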
## Requirements
To run this sample, you will need:
- Python 3.10 or higher
- Installation of necessary Python packages as listed in `requirements.txt`
## Installation
To run this sample, you will need to install the following packages:
```shell
pip install -U chainlit autogen-core "autogen-ext[openai]" pyyaml
```
To use other model providers, you will need to install a different extra
for the `autogen-ext` package.
See the [Models documentation](https://microsoft.github.io/autogen/stable/user-guide/agentchat-user-guide/tutorial/models.html) for more information.
## Model Configuration
Create a configuration file named `model_config.yaml` to configure the model
you want to use. Use `model_config_template.yaml` as a template.
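Both apps read this file when a chat session starts and build the model client from it with `ChatCompletionClient.load_component`:

```python
import yaml
from autogen_core.models import ChatCompletionClient

# Load the YAML created from model_config_template.yaml and
# instantiate the configured chat completion client.
with open("model_config.yaml", "r") as f:
    model_config = yaml.safe_load(f)
model_client = ChatCompletionClient.load_component(model_config)
```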
## Running the Agent Sample
The first sample demonstrates how to interact with a single AssistantAgent
from the chat interface.
Note: change into the sample directory before running the commands below.
```shell
chainlit run app_agent.py
```
## Running the Team Sample
The second sample demonstrates how to interact with a team of agents from the
chat interface.
```shell
chainlit run app_team.py -h
```
There are two agents in the team: one is instructed to be generally helpful
and the other one is instructed to be a critic and provide feedback.
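The critic is told to reply with "APPROVE" once its feedback has been addressed, and the sample's `GroupChatManager` ends the conversation when a reply from the user or the critic ends with that word (case-insensitive, ignoring trailing punctuation). A standalone sketch of that termination check follows; the `is_approval` helper is illustrative, since the sample inlines this logic in the manager.

```python
import string


def is_approval(text: str) -> bool:
    # Mirrors the manager's check: lowercase, strip surrounding punctuation,
    # and see whether the reply ends with "approve".
    return text.lower().strip(string.punctuation).endswith("approve")


assert is_approval("Looks good to me. APPROVE!")
assert not is_approval("Please tighten the second stanza.")
```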

`SimpleAssistantAgent.py` (new file, 222 lines):
from typing import AsyncGenerator, List, Optional
import asyncio
import json
from dataclasses import dataclass

from autogen_core import (
    CancellationToken,
    DefaultTopicId,
    FunctionCall,
    message_handler,
    MessageContext,
    RoutedAgent,
    TopicId,
)
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    CreateResult,
    LLMMessage,
    SystemMessage,
    UserMessage,
    FunctionExecutionResult,
    FunctionExecutionResultMessage,
)
from autogen_core.tools import Tool
from pydantic import BaseModel


@dataclass
class Message:
    content: str


class StreamResult(BaseModel):
    content: str | CreateResult | AssistantMessage
    source: str


class GroupChatMessage(BaseModel):
    body: UserMessage


class RequestToSpeak(BaseModel):
    pass


TASK_RESULTS_TOPIC_TYPE = "task-results"
task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source="default")


class SimpleAssistantAgent(RoutedAgent):
    def __init__(
        self,
        name: str,
        system_message: str,
        model_client: ChatCompletionClient,
        tool_schema: List[Tool] = [],
        model_client_stream: bool = False,
        reflect_on_tool_use: bool | None = None,
        group_chat_topic_type: str = "Default",
    ) -> None:
        super().__init__(name)
        self._system_message = SystemMessage(content=system_message)
        self._model_client = model_client
        self._tools = tool_schema
        self._model_client_stream = model_client_stream
        self._reflect_on_tool_use = reflect_on_tool_use
        self._group_chat_topic_type = group_chat_topic_type
        self._chat_history: List[LLMMessage] = []

    async def _call_model_client(
        self, cancellation_token: CancellationToken
    ) -> AsyncGenerator[str | CreateResult, None]:
        # Call the model client, yielding streamed tokens followed by the final CreateResult.
        model_result = None
        async for chunk in self._model_client.create_stream(
            messages=[self._system_message] + self._chat_history,
            tools=self._tools,
            cancellation_token=cancellation_token,
        ):
            if isinstance(chunk, CreateResult):
                model_result = chunk
            elif isinstance(chunk, str):
                yield chunk
            else:
                raise RuntimeError(f"Invalid chunk type: {type(chunk)}")
        if model_result is None:  # No final result in the model client response.
            raise RuntimeError("No final model result in streaming mode.")
        yield model_result
        return

    async def _execute_tool_call(
        self, call: FunctionCall, cancellation_token: CancellationToken
    ) -> FunctionExecutionResult:
        # Find the tool by name.
        tool = next((tool for tool in self._tools if tool.name == call.name), None)
        assert tool is not None
        # Run the tool and capture the result.
        try:
            arguments = json.loads(call.arguments)
            result = await tool.run_json(arguments, cancellation_token)
            return FunctionExecutionResult(
                call_id=call.id, content=tool.return_value_as_string(result), is_error=False, name=tool.name
            )
        except Exception as e:
            return FunctionExecutionResult(call_id=call.id, content=str(e), is_error=True, name=tool.name)

    @message_handler
    async def handle_user_message(self, message: UserMessage, ctx: MessageContext) -> Message:
        # Append the message to the chat history.
        self._chat_history.append(message)
        model_result: Optional[CreateResult] = None
        async for chunk in self._call_model_client(
            cancellation_token=ctx.cancellation_token,
        ):
            if isinstance(chunk, CreateResult):
                model_result = chunk
            elif isinstance(chunk, str):
                # Forward the streamed token to the output queue.
                await self.runtime.publish_message(StreamResult(content=chunk, source=self.id.type), topic_id=task_results_topic_id)
            else:
                raise RuntimeError(f"Invalid chunk type: {type(chunk)}")
        if model_result is None:  # No final result in the model client response.
            raise RuntimeError("No final model result in streaming mode.")
        # Add the first model create result to the session.
        self._chat_history.append(AssistantMessage(content=model_result.content, source=self.id.type))
        if isinstance(model_result.content, str):  # No tool calls, return the result.
            await self.runtime.publish_message(StreamResult(content=model_result, source=self.id.type), topic_id=task_results_topic_id)
            return Message(content=model_result.content)
        # Execute the tool calls.
        assert isinstance(model_result.content, list) and all(
            isinstance(call, FunctionCall) for call in model_result.content
        )
        results = await asyncio.gather(
            *[self._execute_tool_call(call, ctx.cancellation_token) for call in model_result.content]
        )
        # Add the function execution results to the session.
        self._chat_history.append(FunctionExecutionResultMessage(content=results))
        # if not self._reflect_on_tool_use:
        #     return Message(content=model_result.content)
        # Run the chat completion client again to reflect on the history and function execution results.
        model_result2: Optional[CreateResult] = None
        async for chunk in self._call_model_client(
            cancellation_token=ctx.cancellation_token,
        ):
            if isinstance(chunk, CreateResult):
                model_result2 = chunk
            elif isinstance(chunk, str):
                # Forward the streamed token to the output queue.
                await self.runtime.publish_message(StreamResult(content=chunk, source=self.id.type), topic_id=task_results_topic_id)
            else:
                raise RuntimeError(f"Invalid chunk type: {type(chunk)}")
        if model_result2 is None:
            raise RuntimeError("No final model result in streaming mode.")
        assert model_result2.content is not None
        assert isinstance(model_result2.content, str)
        await self.runtime.publish_message(StreamResult(content=model_result2, source=self.id.type), topic_id=task_results_topic_id)
        return Message(content=model_result2.content)

    # Message handler for group chat messages. It just adds the message to the agent's message history.
    # The message will be processed when the agent receives a RequestToSpeak.
    @message_handler
    async def handle_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:
        self._chat_history.extend(
            [
                UserMessage(content=f"Transferred to {message.body.source}", source="system"),
                message.body,
            ]
        )

    # Message handler for the request-to-speak message.
    @message_handler
    async def handle_request_to_speak(self, message: RequestToSpeak, ctx: MessageContext) -> None:
        self._chat_history.append(
            UserMessage(content=f"Transferred to {self.id.type}, adopt the persona immediately.", source="system")
        )
        # Run the chat completion client to respond based on the group chat history.
        model_result: Optional[CreateResult] = None
        async for chunk in self._call_model_client(
            cancellation_token=ctx.cancellation_token,
        ):
            if isinstance(chunk, CreateResult):
                model_result = chunk
                await self.runtime.publish_message(StreamResult(content=model_result, source=self.id.type), topic_id=task_results_topic_id)
            elif isinstance(chunk, str):
                # Forward the streamed token to the output queue.
                await self.runtime.publish_message(StreamResult(content=chunk, source=self.id.type), topic_id=task_results_topic_id)
            else:
                raise RuntimeError(f"Invalid chunk type: {type(chunk)}")
        if model_result is None:
            raise RuntimeError("No final model result in streaming mode.")
        assert isinstance(model_result.content, str)
        assert model_result.content is not None
        self._chat_history.append(AssistantMessage(content=model_result.content, source=self.id.type))
        await self.publish_message(
            GroupChatMessage(body=UserMessage(content=model_result.content, source=self.id.type)),
            topic_id=DefaultTopicId(type=self._group_chat_topic_type),
        )

`app_agent.py` (new file, 112 lines):
from typing import List, cast
import asyncio

import chainlit as cl
import yaml
from autogen_core import (
    AgentId,
    MessageContext,
    SingleThreadedAgentRuntime,
    ClosureAgent,
    ClosureContext,
    TopicId,
    TypeSubscription,
)
from autogen_core.models import (
    ChatCompletionClient,
    CreateResult,
    UserMessage,
)
from autogen_core.tools import FunctionTool, Tool

from SimpleAssistantAgent import SimpleAssistantAgent, StreamResult

TASK_RESULTS_TOPIC_TYPE = "task-results"
task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source="default")
CLOSURE_AGENT_TYPE = "collect_result_agent"


@cl.set_starters  # type: ignore
async def set_starts() -> List[cl.Starter]:
    return [
        cl.Starter(
            label="Greetings",
            message="Hello! What can you help me with today?",
        ),
        cl.Starter(
            label="Weather",
            message="Find the weather in New York City.",
        ),
    ]


# Called when the closure agent receives a message. It puts the message on the output queue.
async def output_result(_agent: ClosureContext, message: StreamResult, ctx: MessageContext) -> None:
    queue = cast(asyncio.Queue[StreamResult], cl.user_session.get("queue_stream"))  # type: ignore
    await queue.put(message)


@cl.step(type="tool")  # type: ignore
async def get_weather(city: str) -> str:
    return f"The weather in {city} is 73 degrees and Sunny."


@cl.on_chat_start  # type: ignore
async def start_chat() -> None:
    # Load model configuration and create the model client.
    with open("model_config.yaml", "r") as f:
        model_config = yaml.safe_load(f)
    model_client = ChatCompletionClient.load_component(model_config)

    # Create a runtime and save it to the Chainlit session.
    runtime = SingleThreadedAgentRuntime()
    cl.user_session.set("run_time", runtime)  # type: ignore

    # Create tools.
    tools: List[Tool] = [FunctionTool(get_weather, description="Get weather tool.")]

    # Create a queue for streamed output data and save it to the Chainlit session.
    queue_stream = asyncio.Queue[StreamResult]()
    cl.user_session.set("queue_stream", queue_stream)  # type: ignore

    # Create the assistant agent with the get_weather tool.
    await SimpleAssistantAgent.register(runtime, "weather_agent", lambda: SimpleAssistantAgent(
        name="weather_agent",
        tool_schema=tools,
        model_client=model_client,
        system_message="You are a helpful assistant",
        model_client_stream=True,  # Enable model client streaming.
        reflect_on_tool_use=True,  # Reflect on tool use.
    ))

    # Register the closure agent to process streaming chunks from agents by executing the
    # output_result function, which sends the streamed response to the output queue.
    await ClosureAgent.register_closure(
        runtime, CLOSURE_AGENT_TYPE, output_result, subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)]
    )
    runtime.start()  # Start processing messages in the background.


@cl.on_message  # type: ignore
async def chat(message: cl.Message) -> None:
    # Construct the response message for the user message received.
    ui_resp = cl.Message("")

    # Get the runtime and queue from the session.
    runtime = cast(SingleThreadedAgentRuntime, cl.user_session.get("run_time"))  # type: ignore
    queue = cast(asyncio.Queue[StreamResult], cl.user_session.get("queue_stream"))  # type: ignore
    task1 = asyncio.create_task(runtime.send_message(UserMessage(content=message.content, source="User"), AgentId("weather_agent", "default")))

    # Consume items from the response queue until the stream ends or an error occurs.
    while True:
        stream_msg = await queue.get()
        if isinstance(stream_msg.content, str):
            await ui_resp.stream_token(stream_msg.content)
        elif isinstance(stream_msg.content, CreateResult):
            await ui_resp.send()
            break
    await task1

`app_team.py` (new file, 201 lines):
from typing import List, cast
import asyncio
import string
import uuid

import chainlit as cl
import yaml
from autogen_core import (
    ClosureAgent,
    ClosureContext,
    DefaultTopicId,
    MessageContext,
    message_handler,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
)
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    CreateResult,
    UserMessage,
)

from SimpleAssistantAgent import SimpleAssistantAgent, StreamResult, GroupChatMessage, RequestToSpeak

assistant_topic_type = "assistant"
critic_topic_type = "critic"
group_chat_topic_type = "group_chat"

TASK_RESULTS_TOPIC_TYPE = "task-results"
task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source="default")
CLOSURE_AGENT_TYPE = "collect_result_agent"


class GroupChatManager(RoutedAgent):
    def __init__(
        self,
        participant_topic_types: List[str],
        model_client: ChatCompletionClient,
    ) -> None:
        super().__init__("Group chat manager")
        self._participant_topic_types = participant_topic_types
        self._model_client = model_client
        self._chat_history: List[UserMessage] = []
        self._previous_participant_idx = -1

    @message_handler
    async def handle_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:
        assert isinstance(message.body, UserMessage)
        self._chat_history.append(message.body)

        # If the message is an approval message from the user, stop the chat.
        if message.body.source == "User":
            assert isinstance(message.body.content, str)
            if message.body.content.lower().strip(string.punctuation).endswith("approve"):  # type: ignore
                await self.runtime.publish_message(StreamResult(content="stop", source=self.id.type), topic_id=task_results_topic_id)
                return

        # If the critic approves the response, stop the chat.
        if message.body.source == "Critic":
            if message.body.content.lower().strip(string.punctuation).endswith("approve"):  # type: ignore
                stop_msg = AssistantMessage(content="Task Finished", source=self.id.type)
                await self.runtime.publish_message(StreamResult(content=stop_msg, source=self.id.type), topic_id=task_results_topic_id)
                return

        # Simple round-robin algorithm to select the next participant to speak.
        selected_topic_type: str
        idx = self._previous_participant_idx + 1
        if idx == len(self._participant_topic_types):
            idx = 0
        selected_topic_type = self._participant_topic_types[idx]
        self._previous_participant_idx = idx

        # Send the RequestToSpeak message to the next agent.
        await self.publish_message(RequestToSpeak(), DefaultTopicId(type=selected_topic_type))


# Called when the closure agent receives a message. It puts the message on the output queue.
async def output_result(_agent: ClosureContext, message: StreamResult, ctx: MessageContext) -> None:
    queue = cast(asyncio.Queue[StreamResult], cl.user_session.get("queue_stream"))  # type: ignore
    await queue.put(message)


@cl.on_chat_start  # type: ignore
async def start_chat() -> None:
    # Load model configuration and create the model client.
    with open("model_config.yaml", "r") as f:
        model_config = yaml.safe_load(f)
    model_client = ChatCompletionClient.load_component(model_config)

    # Create a runtime and an output queue, and save both to the Chainlit session.
    runtime = SingleThreadedAgentRuntime()
    cl.user_session.set("run_time", runtime)  # type: ignore
    queue = asyncio.Queue[StreamResult]()
    cl.user_session.set("queue_stream", queue)  # type: ignore

    # Create the assistant agent.
    assistant_agent_type = await SimpleAssistantAgent.register(runtime, "Assistant", lambda: SimpleAssistantAgent(
        name="Assistant",
        group_chat_topic_type=group_chat_topic_type,
        model_client=model_client,
        system_message="You are a helpful assistant",
        model_client_stream=True,  # Enable model client streaming.
    ))
    # The assistant agent listens to the assistant topic and the group chat topic.
    await runtime.add_subscription(TypeSubscription(topic_type=assistant_topic_type, agent_type=assistant_agent_type.type))
    await runtime.add_subscription(TypeSubscription(topic_type=group_chat_topic_type, agent_type=assistant_agent_type.type))

    # Create the critic agent.
    critic_agent_type = await SimpleAssistantAgent.register(runtime, "Critic", lambda: SimpleAssistantAgent(
        name="Critic",
        group_chat_topic_type=group_chat_topic_type,
        model_client=model_client,
        system_message="You are a critic. Provide constructive feedback. Respond with 'APPROVE' if your feedback has been addressed.",
        model_client_stream=True,  # Enable model client streaming.
    ))
    # The critic agent listens to the critic topic and the group chat topic.
    await runtime.add_subscription(TypeSubscription(topic_type=critic_topic_type, agent_type=critic_agent_type.type))
    await runtime.add_subscription(TypeSubscription(topic_type=group_chat_topic_type, agent_type=critic_agent_type.type))

    # Chain the assistant and critic agents using the group chat manager.
    group_chat_manager_type = await GroupChatManager.register(
        runtime,
        "group_chat_manager",
        lambda: GroupChatManager(
            participant_topic_types=[assistant_topic_type, critic_topic_type],
            model_client=model_client,
        ),
    )
    await runtime.add_subscription(
        TypeSubscription(topic_type=group_chat_topic_type, agent_type=group_chat_manager_type.type)
    )

    # Register the closure agent; it places streamed responses into the output queue by calling output_result.
    await ClosureAgent.register_closure(
        runtime, CLOSURE_AGENT_TYPE, output_result, subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)]
    )
    runtime.start()  # Start processing messages in the background.
    cl.user_session.set("prompt_history", "")  # type: ignore


@cl.set_starters  # type: ignore
async def set_starts() -> List[cl.Starter]:
    return [
        cl.Starter(
            label="Poem Writing",
            message="Write a poem about the ocean.",
        ),
        cl.Starter(
            label="Story Writing",
            message="Write a story about a detective solving a mystery.",
        ),
        cl.Starter(
            label="Write Code",
            message="Write a function that merges two lists of numbers into a single sorted list.",
        ),
    ]


async def pass_msg_to_ui() -> None:
    # Drain the output queue and stream the messages to the Chainlit UI.
    queue = cast(asyncio.Queue[StreamResult], cl.user_session.get("queue_stream"))  # type: ignore
    ui_resp = cl.Message("")
    first_message = True
    while True:
        stream_msg = await queue.get()
        if isinstance(stream_msg.content, str):
            if first_message:
                ui_resp = cl.Message(content=stream_msg.source + ": ")
                first_message = False
            await ui_resp.stream_token(stream_msg.content)
        elif isinstance(stream_msg.content, CreateResult):
            await ui_resp.send()
            ui_resp = cl.Message("")
            first_message = True
        else:
            # This is a stop message.
            if stream_msg.content.content == "stop":
                break
            break


@cl.on_message  # type: ignore
async def chat(message: cl.Message) -> None:
    # Get the runtime and queue from the session, and construct the response message.
    runtime = cast(SingleThreadedAgentRuntime, cl.user_session.get("run_time"))  # type: ignore
    queue = cast(asyncio.Queue[StreamResult], cl.user_session.get("queue_stream"))  # type: ignore
    output_msg = cl.Message(content="")
    cl.user_session.set("output_msg", output_msg)  # type: ignore

    # Publish the user message to the group chat.
    session_id = str(uuid.uuid4())
    await runtime.publish_message(
        GroupChatMessage(body=UserMessage(content=message.content, source="User")),
        TopicId(type=group_chat_topic_type, source=session_id),
    )
    task1 = asyncio.create_task(pass_msg_to_ui())
    await task1

`model_config_template.yaml` (new file, 26 lines):
# Use OpenAI with key
provider: autogen_ext.models.openai.OpenAIChatCompletionClient
config:
  model: gpt-4o
  api_key: REPLACE_WITH_YOUR_API_KEY

# Use Azure OpenAI with key
# provider: autogen_ext.models.openai.AzureOpenAIChatCompletionClient
# config:
#   model: gpt-4o
#   azure_endpoint: https://{your-custom-endpoint}.openai.azure.com/
#   azure_deployment: {your-azure-deployment}
#   api_version: {your-api-version}
#   api_key: REPLACE_WITH_YOUR_API_KEY

# Use Azure OpenAI with AD token provider.
# provider: autogen_ext.models.openai.AzureOpenAIChatCompletionClient
# config:
#   model: gpt-4o
#   azure_endpoint: https://{your-custom-endpoint}.openai.azure.com/
#   azure_deployment: {your-azure-deployment}
#   api_version: {your-api-version}
#   azure_ad_token_provider:
#     provider: autogen_ext.auth.azure.AzureTokenProvider
#     config:
#       provider_kind: DefaultAzureCredential
#       scopes:
#         - https://cognitiveservices.azure.com/.default