From 196c6169fc88804683f1359518b91726d76c2ee6 Mon Sep 17 00:00:00 2001 From: Mohammad Mazraeh Date: Sat, 16 Nov 2024 03:51:04 +0000 Subject: [PATCH] Feat add UI streaming to distributed group chat (#4181) * add ui streaming to distributed group chat example Signed-off-by: Mohammad Mazraeh * fix pyright error after updating dependencies Signed-off-by: Mohammad Mazraeh --------- Signed-off-by: Mohammad Mazraeh --- .../samples/distributed-group-chat/README.md | 32 +++--- .../samples/distributed-group-chat/_agents.py | 99 ++++++++++++++++--- .../samples/distributed-group-chat/_types.py | 31 +++++- .../distributed-group-chat/config.yaml | 12 ++- .../samples/distributed-group-chat/run.sh | 15 +-- .../run_editor_agent.py | 7 +- .../run_group_chat_manager.py | 61 +++++------- .../samples/distributed-group-chat/run_ui.py | 67 +++++++++++++ .../run_writer_agent.py | 5 +- 9 files changed, 254 insertions(+), 75 deletions(-) create mode 100644 python/packages/autogen-core/samples/distributed-group-chat/run_ui.py diff --git a/python/packages/autogen-core/samples/distributed-group-chat/README.md b/python/packages/autogen-core/samples/distributed-group-chat/README.md index 43d17ee0e..b4cf16583 100644 --- a/python/packages/autogen-core/samples/distributed-group-chat/README.md +++ b/python/packages/autogen-core/samples/distributed-group-chat/README.md @@ -31,7 +31,7 @@ The [run.sh](./run.sh) file provides commands to run the host and agents using [ Here is a screen recording of the execution: -[![Distributed Group Chat Demo with Simple UI Integration](https://img.youtube.com/vi/kLTzI-3VgPQ/0.jpg)](https://youtu.be/kLTzI-3VgPQ) +[![Distributed Group Chat Demo with Simple UI Integration](https://img.youtube.com/vi/503QJ1onV8I/0.jpg)](https://youtu.be/503QJ1onV8I?feature=shared) **Note**: Some `asyncio.sleep` commands have been added to the example code to make the `./run.sh` execution look sequential and visually easy to follow. In practice, these lines are not necessary. @@ -40,14 +40,16 @@ Here is a screen recording of the execution: If you prefer to run Python files individually, follow these steps. Note that each step must be run in a different terminal process, and the virtual environment should be activated using `source .venv/bin/activate`. 1. `python run_host.py`: Starts the host and listens for agent connections. -2. `python run_editor.py`: Starts the editor agent and connects it to the host. -3. `python run_writer.py`: Starts the writer agent and connects it to the host. -4. `chainlit run run_group_chat_manager.py --port 8001`: Run chainlit app which starts group chat manager agent and sends the initial message to start the conversation. We're using port 8001 as the default port 8000 is used to run host (assuming using same machine to run all of the agents) +2. `chainlit run run_ui.py --port 8001`: Starts the Chainlit app and UI agent and listens on UI topic to display messages. We're using port 8001 as the default port 8000 is used to run host (assuming using same machine to run all of the agents) +3. `python run_editor.py`: Starts the editor agent and connects it to the host. +4. `python run_writer.py`: Starts the writer agent and connects it to the host. +5. `python run_group_chat_manager.py`: Run chainlit app which starts group chat manager agent and sends the initial message to start the conversation. ## What's Going On? The general flow of this example is as follows: +0. The UI Agent runs starts the UI App, listens for stream of messages in the UI topic and displays them in the UI. 1. The Group Chat Manager, on behalf of `User`, sends a `RequestToSpeak` request to the `writer_agent`. 2. The `writer_agent` writes a short sentence into the group chat topic. 3. The `editor_agent` receives the message in the group chat topic and updates its memory. @@ -64,41 +66,49 @@ graph TD; A1[GRPC Server] wt[Writer Topic] et[Editor Topic] + ut[UI Topic] gct[Group Chat Topic] end + all_agents[All Agents - Simplified Arrows!] --> A1 subgraph Distributed Writer Runtime - writer_agent[ Writer Agent] --> A1 wt -.->|2 - Subscription| writer_agent gct -.->|4 - Subscription| writer_agent - writer_agent -.->|3 - Publish: Group Chat Message| gct + writer_agent -.->|3.1 - Publish: UI Message| ut + writer_agent -.->|3.2 - Publish: Group Chat Message| gct end subgraph Distributed Editor Runtime - editor_agent[ Editor Agent] --> A1 et -.->|6 - Subscription| editor_agent gct -.->|4 - Subscription| editor_agent - editor_agent -.->|7 - Publish: Group Chat Message| gct + editor_agent -.->|7.1 - Publish: UI Message| ut + editor_agent -.->|7.2 - Publish: Group Chat Message| gct end subgraph Distributed Group Chat Manager Runtime - group_chat_manager[ Group Chat Manager Agent] --> A1 gct -.->|4 - Subscription| group_chat_manager group_chat_manager -.->|1 - Request To Speak| wt group_chat_manager -.->|5 - Request To Speak| et + group_chat_manager -.->|\* - Publish Some of to UI Message| ut end + subgraph Distributed UI Runtime + ut -.->|\* - Subscription| ui_agent + end + + style wt fill:#beb2c3,color:#000 style et fill:#beb2c3,color:#000 style gct fill:#beb2c3,color:#000 + style ut fill:#beb2c3,color:#000 style writer_agent fill:#b7c4d7,color:#000 style editor_agent fill:#b7c4d7,color:#000 style group_chat_manager fill:#b7c4d7,color:#000 + style ui_agent fill:#b7c4d7,color:#000 ``` ## TODO: - [ ] Properly handle chat restarts. It complains about group chat manager being already registered -- [ ] Send Chainlit messages within each agent (Currently the manager can just sends messages in the group chat topic) -- [ ] Add streaming to the UI like [this example](https://docs.chainlit.io/advanced-features/streaming) but Autogen's Open AI Client [does not supporting streaming yet](https://github.com/microsoft/autogen/blob/0f4dd0cc6dd3eea303ad3d2063979b4b9a1aacfc/python/packages/autogen-ext/src/autogen_ext/models/_openai/_openai_client.py#L81) +- [ ] Add streaming to the UI like [this example](https://docs.chainlit.io/advanced-features/streaming) when [this bug](https://github.com/microsoft/autogen/issues/4213) is resolved diff --git a/python/packages/autogen-core/samples/distributed-group-chat/_agents.py b/python/packages/autogen-core/samples/distributed-group-chat/_agents.py index 89ac74555..8ff935600 100644 --- a/python/packages/autogen-core/samples/distributed-group-chat/_agents.py +++ b/python/packages/autogen-core/samples/distributed-group-chat/_agents.py @@ -1,6 +1,10 @@ +import asyncio +import random from typing import Awaitable, Callable, List +from uuid import uuid4 -from _types import GroupChatMessage, RequestToSpeak +from _types import GroupChatMessage, MessageChunk, RequestToSpeak, UIAgentConfig +from autogen_core.application import WorkerAgentRuntime from autogen_core.base import MessageContext from autogen_core.components import DefaultTopicId, RoutedAgent, message_handler from autogen_core.components.models import ( @@ -23,12 +27,15 @@ class BaseGroupChatAgent(RoutedAgent): group_chat_topic_type: str, model_client: ChatCompletionClient, system_message: str, + ui_config: UIAgentConfig, ) -> None: super().__init__(description=description) self._group_chat_topic_type = group_chat_topic_type self._model_client = model_client self._system_message = SystemMessage(system_message) self._chat_history: List[LLMMessage] = [] + self._ui_config = ui_config + self.console = Console() @message_handler async def handle_message(self, message: GroupChatMessage, ctx: MessageContext) -> None: @@ -47,11 +54,16 @@ class BaseGroupChatAgent(RoutedAgent): completion = await self._model_client.create([self._system_message] + self._chat_history) assert isinstance(completion.content, str) self._chat_history.append(AssistantMessage(content=completion.content, source=self.id.type)) - Console().print(Markdown(f"**{self.id.type}**: {completion.content}\n")) - await self.publish_message( - GroupChatMessage(body=UserMessage(content=completion.content, source=self.id.type)), - topic_id=DefaultTopicId(type=self._group_chat_topic_type), + console_message = f"\n{'-'*80}\n**{self.id.type}**: {completion.content}" + self.console.print(Markdown(console_message)) + + await publish_message_to_ui_and_backend( + runtime=self, + source=self.id.type, + user_message=completion.content, + ui_config=self._ui_config, + group_chat_topic_type=self._group_chat_topic_type, ) @@ -61,7 +73,7 @@ class GroupChatManager(RoutedAgent): model_client: ChatCompletionClient, participant_topic_types: List[str], participant_descriptions: List[str], - on_message_func: Callable[[str, str], Awaitable[None]], + ui_config: UIAgentConfig, max_rounds: int = 3, ) -> None: super().__init__("Group chat manager") @@ -71,14 +83,14 @@ class GroupChatManager(RoutedAgent): self._chat_history: List[GroupChatMessage] = [] self._max_rounds = max_rounds self.console = Console() - self._on_message_func = on_message_func self._participant_descriptions = participant_descriptions self._previous_participant_topic_type: str | None = None + self._ui_config = ui_config @message_handler async def handle_message(self, message: GroupChatMessage, ctx: MessageContext) -> None: assert isinstance(message.body, UserMessage) - await self._on_message_func(message.body.content, message.body.source) # type: ignore[arg-type] + self._chat_history.append(message.body) # type: ignore[reportargumenttype,arg-type] # Format message history. @@ -117,11 +129,17 @@ Read the above conversation. Then select the next role from {participants} to pl """ system_message = SystemMessage(selector_prompt) completion = await self._model_client.create([system_message], cancellation_token=ctx.cancellation_token) - assert isinstance(completion.content, str) + + assert isinstance( + completion.content, str + ), f"Completion content must be a string, but is: {type(completion.content)}" if completion.content.upper() == "FINISH": - manager_message = f"\n{'-'*80}\n Manager ({id(self)}): I think it's enough iterations on the story! Thanks for collaborating!" - await self._on_message_func(manager_message, "group_chat_manager") + finish_msg = "I think it's enough iterations on the story! Thanks for collaborating!" + manager_message = f"\n{'-'*80}\n Manager ({id(self)}): {finish_msg}" + await publish_message_to_ui( + runtime=self, source=self.id.type, user_message=finish_msg, ui_config=self._ui_config + ) self.console.print(Markdown(manager_message)) return @@ -136,3 +154,62 @@ Read the above conversation. Then select the next role from {participants} to pl await self.publish_message(RequestToSpeak(), DefaultTopicId(type=selected_topic_type)) return raise ValueError(f"Invalid role selected: {completion.content}") + + +class UIAgent(RoutedAgent): + """Handles UI-related tasks and message processing for the distributed group chat system.""" + + def __init__(self, on_message_chunk_func: Callable[[MessageChunk], Awaitable[None]]) -> None: + super().__init__("UI Agent") + self._on_message_chunk_func = on_message_chunk_func + + @message_handler + async def handle_message_chunk(self, message: MessageChunk, ctx: MessageContext) -> None: + await self._on_message_chunk_func(message) + + +async def publish_message_to_ui( + runtime: RoutedAgent | WorkerAgentRuntime, + source: str, + user_message: str, + ui_config: UIAgentConfig, +) -> None: + message_id = str(uuid4()) + # Stream the message to UI + message_chunks = ( + MessageChunk(message_id=message_id, text=token + " ", author=source, finished=False) + for token in user_message.split() + ) + for chunk in message_chunks: + await runtime.publish_message( + chunk, + DefaultTopicId(type=ui_config.topic_type), + ) + await asyncio.sleep(random.uniform(ui_config.min_delay, ui_config.max_delay)) + + await runtime.publish_message( + MessageChunk(message_id=message_id, text=" ", author=source, finished=True), + DefaultTopicId(type=ui_config.topic_type), + ) + + +async def publish_message_to_ui_and_backend( + runtime: RoutedAgent | WorkerAgentRuntime, + source: str, + user_message: str, + ui_config: UIAgentConfig, + group_chat_topic_type: str, +) -> None: + # Publish messages for ui + await publish_message_to_ui( + runtime=runtime, + source=source, + user_message=user_message, + ui_config=ui_config, + ) + + # Publish message to backend + await runtime.publish_message( + GroupChatMessage(body=UserMessage(content=user_message, source=source)), + topic_id=DefaultTopicId(type=group_chat_topic_type), + ) diff --git a/python/packages/autogen-core/samples/distributed-group-chat/_types.py b/python/packages/autogen-core/samples/distributed-group-chat/_types.py index 343c264f1..178446ca8 100644 --- a/python/packages/autogen-core/samples/distributed-group-chat/_types.py +++ b/python/packages/autogen-core/samples/distributed-group-chat/_types.py @@ -1,3 +1,6 @@ +from dataclasses import dataclass +from typing import Dict + from autogen_core.components.models import ( LLMMessage, ) @@ -17,6 +20,17 @@ class RequestToSpeak(BaseModel): pass +@dataclass +class MessageChunk: + message_id: str + text: str + author: str + finished: bool + + def __str__(self) -> str: + return f"{self.author}({self.message_id}): {self.text}" + + # Define Host configuration model class HostConfig(BaseModel): hostname: str @@ -40,10 +54,25 @@ class ChatAgentConfig(BaseModel): system_message: str +# Define UI Agent configuration model +class UIAgentConfig(BaseModel): + topic_type: str + artificial_stream_delay_seconds: Dict[str, float] + + @property + def min_delay(self) -> float: + return self.artificial_stream_delay_seconds.get("min", 0.0) + + @property + def max_delay(self) -> float: + return self.artificial_stream_delay_seconds.get("max", 0.0) + + # Define the overall AppConfig model class AppConfig(BaseModel): host: HostConfig group_chat_manager: GroupChatManagerConfig writer_agent: ChatAgentConfig editor_agent: ChatAgentConfig - client_config: AzureOpenAIClientConfiguration = None # type: ignore[assignment] # This was required to do custom instantiation in `load_config`` + ui_agent: UIAgentConfig + client_config: AzureOpenAIClientConfiguration = None # type: ignore[assignment] # This was required to do custom instantiation in `load_config` diff --git a/python/packages/autogen-core/samples/distributed-group-chat/config.yaml b/python/packages/autogen-core/samples/distributed-group-chat/config.yaml index 26eaf5b3b..f18b45455 100644 --- a/python/packages/autogen-core/samples/distributed-group-chat/config.yaml +++ b/python/packages/autogen-core/samples/distributed-group-chat/config.yaml @@ -4,17 +4,23 @@ host: group_chat_manager: topic_type: "group_chat" - max_rounds: 7 + max_rounds: 3 writer_agent: topic_type: "Writer" description: "Writer for creating any text content." - system_message: "You are a one sentence Writer and provide one line content each time" + system_message: "You are a one sentence Writer and provide one sentence content each time" editor_agent: topic_type: "Editor" description: "Editor for planning and reviewing the content." - system_message: "You are an Editor. You provide just max 10 words as feedback on writers content." + system_message: "You are an Editor. You provide just max 15 words as feedback on writers content." + +ui_agent: + topic_type: "ui_events" + artificial_stream_delay_seconds: + min: 0.05 + max: 0.1 client_config: model: "gpt-4o" diff --git a/python/packages/autogen-core/samples/distributed-group-chat/run.sh b/python/packages/autogen-core/samples/distributed-group-chat/run.sh index 859096e94..d4b8c1b1b 100755 --- a/python/packages/autogen-core/samples/distributed-group-chat/run.sh +++ b/python/packages/autogen-core/samples/distributed-group-chat/run.sh @@ -5,12 +5,14 @@ tmux new-session -d -s distributed_group_chat # # Split the terminal into 2 vertical panes tmux split-window -h -# # Split the left pane horizontally +# # Split the left pane into 3 windows tmux select-pane -t distributed_group_chat:0.0 -tmux split-window -v +tmux split-window -v +tmux select-pane -t distributed_group_chat:0.0 +tmux split-window -v # # Split the right pane horizontally -tmux select-pane -t distributed_group_chat:0.2 +tmux select-pane -t distributed_group_chat:0.3 tmux split-window -v # Select the first pane to start @@ -18,9 +20,10 @@ tmux select-pane -t distributed_group_chat:0.0 # Activate the virtual environment and run the scripts in each pane tmux send-keys -t distributed_group_chat:0.0 "python run_host.py" C-m -tmux send-keys -t distributed_group_chat:0.2 "python run_writer_agent.py" C-m -tmux send-keys -t distributed_group_chat:0.3 "python run_editor_agent.py" C-m -tmux send-keys -t distributed_group_chat:0.1 "chainlit run run_group_chat_manager.py --port 8001" C-m +tmux send-keys -t distributed_group_chat:0.1 "chainlit run run_ui.py --port 8001" C-m +tmux send-keys -t distributed_group_chat:0.3 "python run_writer_agent.py" C-m +tmux send-keys -t distributed_group_chat:0.4 "python run_editor_agent.py" C-m +tmux send-keys -t distributed_group_chat:0.2 "python run_group_chat_manager.py" C-m # # Attach to the session tmux attach-session -t distributed_group_chat diff --git a/python/packages/autogen-core/samples/distributed-group-chat/run_editor_agent.py b/python/packages/autogen-core/samples/distributed-group-chat/run_editor_agent.py index a452db6b9..8a08bfe6b 100644 --- a/python/packages/autogen-core/samples/distributed-group-chat/run_editor_agent.py +++ b/python/packages/autogen-core/samples/distributed-group-chat/run_editor_agent.py @@ -3,13 +3,13 @@ import logging import warnings from _agents import BaseGroupChatAgent -from _types import AppConfig, GroupChatMessage, RequestToSpeak +from _types import AppConfig, GroupChatMessage, MessageChunk, RequestToSpeak from _utils import get_serializers, load_config, set_all_log_levels from autogen_core.application import WorkerAgentRuntime from autogen_core.components import ( TypeSubscription, ) -from autogen_core.components.models._openai_client import AzureOpenAIChatCompletionClient +from autogen_ext.models import AzureOpenAIChatCompletionClient from rich.console import Console from rich.markdown import Markdown @@ -17,7 +17,7 @@ from rich.markdown import Markdown async def main(config: AppConfig): set_all_log_levels(logging.ERROR) editor_agent_runtime = WorkerAgentRuntime(host_address=config.host.address) - editor_agent_runtime.add_message_serializer(get_serializers([RequestToSpeak, GroupChatMessage])) # type: ignore[arg-type] + editor_agent_runtime.add_message_serializer(get_serializers([RequestToSpeak, GroupChatMessage, MessageChunk])) # type: ignore[arg-type] await asyncio.sleep(4) Console().print(Markdown("Starting **`Editor Agent`**")) editor_agent_runtime.start() @@ -29,6 +29,7 @@ async def main(config: AppConfig): group_chat_topic_type=config.group_chat_manager.topic_type, system_message=config.editor_agent.system_message, model_client=AzureOpenAIChatCompletionClient(**config.client_config), + ui_config=config.ui_agent, ), ) await editor_agent_runtime.add_subscription( diff --git a/python/packages/autogen-core/samples/distributed-group-chat/run_group_chat_manager.py b/python/packages/autogen-core/samples/distributed-group-chat/run_group_chat_manager.py index 5af446ab1..e0b2880aa 100644 --- a/python/packages/autogen-core/samples/distributed-group-chat/run_group_chat_manager.py +++ b/python/packages/autogen-core/samples/distributed-group-chat/run_group_chat_manager.py @@ -2,18 +2,13 @@ import asyncio import logging import warnings -import chainlit as cl # type: ignore [reportUnknownMemberType] # This dependency is installed through instructions -from _agents import GroupChatManager -from _types import AppConfig, GroupChatMessage, RequestToSpeak +from _agents import GroupChatManager, publish_message_to_ui, publish_message_to_ui_and_backend +from _types import AppConfig, GroupChatMessage, MessageChunk, RequestToSpeak from _utils import get_serializers, load_config, set_all_log_levels from autogen_core.application import WorkerAgentRuntime from autogen_core.components import ( - DefaultTopicId, TypeSubscription, ) -from autogen_core.components.models import ( - UserMessage, -) from autogen_ext.models import AzureOpenAIChatCompletionClient from rich.console import Console from rich.markdown import Markdown @@ -21,22 +16,16 @@ from rich.markdown import Markdown set_all_log_levels(logging.ERROR) -# TODO: This is the simple hack to send messages to the UI, needs to be improved once we get some help in https://github.com/Chainlit/chainlit/issues/1491 -async def send_cl(msg: str, author: str) -> None: - await cl.Message(content=msg, author=author).send() # type: ignore [reportAttributeAccessIssue,reportUnknownMemberType] - - async def main(config: AppConfig): set_all_log_levels(logging.ERROR) group_chat_manager_runtime = WorkerAgentRuntime(host_address=config.host.address) - # Add group chat manager runtime - - group_chat_manager_runtime.add_message_serializer(get_serializers([RequestToSpeak, GroupChatMessage])) # type: ignore[arg-type] + group_chat_manager_runtime.add_message_serializer(get_serializers([RequestToSpeak, GroupChatMessage, MessageChunk])) # type: ignore[arg-type] await asyncio.sleep(1) Console().print(Markdown("Starting **`Group Chat Manager`**")) group_chat_manager_runtime.start() set_all_log_levels(logging.ERROR) + group_chat_manager_type = await GroupChatManager.register( group_chat_manager_runtime, "group_chat_manager", @@ -45,7 +34,7 @@ async def main(config: AppConfig): participant_topic_types=[config.writer_agent.topic_type, config.editor_agent.topic_type], participant_descriptions=[config.writer_agent.description, config.editor_agent.description], max_rounds=config.group_chat_manager.max_rounds, - on_message_func=send_cl, + ui_config=config.ui_agent, ), ) @@ -53,36 +42,32 @@ async def main(config: AppConfig): TypeSubscription(topic_type=config.group_chat_manager.topic_type, agent_type=group_chat_manager_type.type) ) - # This is a simple way to make sure first message gets send after all of the agents have joined await asyncio.sleep(5) - user_message: str = "Please write a one line story about the gingerbread in halloween!" + + await publish_message_to_ui( + runtime=group_chat_manager_runtime, + source="System", + user_message="[ **Due to responsible AI considerations of this sample, group chat manager is sending an initiator message on behalf of user** ]", + ui_config=config.ui_agent, + ) + await asyncio.sleep(3) + + user_message: str = "Please write a short story about the gingerbread in halloween!" Console().print(f"Simulating User input in group chat topic:\n\t'{user_message}'") - await group_chat_manager_runtime.publish_message( - GroupChatMessage( - body=UserMessage( - content=user_message, - source="User", - ) - ), - DefaultTopicId(type=config.group_chat_manager.topic_type), + + await publish_message_to_ui_and_backend( + runtime=group_chat_manager_runtime, + source="User", + user_message=user_message, + ui_config=config.ui_agent, + group_chat_topic_type=config.group_chat_manager.topic_type, ) await group_chat_manager_runtime.stop_when_signal() Console().print("Manager left the chat!") -@cl.on_chat_start # type: ignore -async def start_chat(): +if __name__ == "__main__": set_all_log_levels(logging.ERROR) warnings.filterwarnings("ignore", category=UserWarning, message="Resolved model mismatch.*") asyncio.run(main(load_config())) - - -# This can be used for debugging, you can run this file using python -# if __name__ == "__main__": -# from chainlit.cli import run_chainlit - -# set_all_log_levels(logging.ERROR) -# run_chainlit( -# __file__, -# ) diff --git a/python/packages/autogen-core/samples/distributed-group-chat/run_ui.py b/python/packages/autogen-core/samples/distributed-group-chat/run_ui.py new file mode 100644 index 000000000..aeb24e8d1 --- /dev/null +++ b/python/packages/autogen-core/samples/distributed-group-chat/run_ui.py @@ -0,0 +1,67 @@ +import asyncio +import logging +import warnings + +import chainlit as cl # type: ignore [reportUnknownMemberType] # This dependency is installed through instructions +from _agents import MessageChunk, UIAgent +from _types import AppConfig, GroupChatMessage, RequestToSpeak +from _utils import get_serializers, load_config, set_all_log_levels +from autogen_core.application import WorkerAgentRuntime +from autogen_core.components import ( + TypeSubscription, +) +from chainlit import Message # type: ignore [reportAttributeAccessIssue] +from rich.console import Console +from rich.markdown import Markdown + +set_all_log_levels(logging.ERROR) + + +message_chunks: dict[str, Message] = {} # type: ignore [reportUnknownVariableType] + + +async def send_cl_stream(msg: MessageChunk) -> None: + if msg.message_id not in message_chunks: + message_chunks[msg.message_id] = Message(content="", author=msg.author) + + if not msg.finished: + await message_chunks[msg.message_id].stream_token(msg.text) # type: ignore [reportUnknownVariableType] + else: + await message_chunks[msg.message_id].stream_token(msg.text) # type: ignore [reportUnknownVariableType] + await message_chunks[msg.message_id].update() # type: ignore [reportUnknownMemberType] + await asyncio.sleep(3) + cl_msg = message_chunks[msg.message_id] # type: ignore [reportUnknownVariableType] + await cl_msg.send() # type: ignore [reportUnknownMemberType] + + +async def main(config: AppConfig): + set_all_log_levels(logging.ERROR) + ui_agent_runtime = WorkerAgentRuntime(host_address=config.host.address) + + ui_agent_runtime.add_message_serializer(get_serializers([RequestToSpeak, GroupChatMessage, MessageChunk])) # type: ignore[arg-type] + + Console().print(Markdown("Starting **`UI Agent`**")) + ui_agent_runtime.start() + set_all_log_levels(logging.ERROR) + + ui_agent_type = await UIAgent.register( + ui_agent_runtime, + "ui_agent", + lambda: UIAgent( + on_message_chunk_func=send_cl_stream, + ), + ) + + await ui_agent_runtime.add_subscription( + TypeSubscription(topic_type=config.ui_agent.topic_type, agent_type=ui_agent_type.type) + ) # TODO: This could be a great example of using agent_id to route to sepecific element in the ui. Can replace MessageChunk.message_id + + await ui_agent_runtime.stop_when_signal() + Console().print("UI Agent left the chat!") + + +@cl.on_chat_start # type: ignore +async def start_chat(): + set_all_log_levels(logging.ERROR) + warnings.filterwarnings("ignore", category=UserWarning, message="Resolved model mismatch.*") + asyncio.run(main(load_config())) diff --git a/python/packages/autogen-core/samples/distributed-group-chat/run_writer_agent.py b/python/packages/autogen-core/samples/distributed-group-chat/run_writer_agent.py index 1c6935de3..674ed59ce 100644 --- a/python/packages/autogen-core/samples/distributed-group-chat/run_writer_agent.py +++ b/python/packages/autogen-core/samples/distributed-group-chat/run_writer_agent.py @@ -3,7 +3,7 @@ import logging import warnings from _agents import BaseGroupChatAgent -from _types import AppConfig, GroupChatMessage, RequestToSpeak +from _types import AppConfig, GroupChatMessage, MessageChunk, RequestToSpeak from _utils import get_serializers, load_config, set_all_log_levels from autogen_core.application import WorkerAgentRuntime from autogen_core.components import ( @@ -17,7 +17,7 @@ from rich.markdown import Markdown async def main(config: AppConfig) -> None: set_all_log_levels(logging.ERROR) writer_agent_runtime = WorkerAgentRuntime(host_address=config.host.address) - writer_agent_runtime.add_message_serializer(get_serializers([RequestToSpeak, GroupChatMessage])) # type: ignore[arg-type] + writer_agent_runtime.add_message_serializer(get_serializers([RequestToSpeak, GroupChatMessage, MessageChunk])) # type: ignore[arg-type] await asyncio.sleep(3) Console().print(Markdown("Starting **`Writer Agent`**")) @@ -30,6 +30,7 @@ async def main(config: AppConfig) -> None: group_chat_topic_type=config.group_chat_manager.topic_type, system_message=config.writer_agent.system_message, model_client=AzureOpenAIChatCompletionClient(**config.client_config), + ui_config=config.ui_agent, ), ) await writer_agent_runtime.add_subscription(