import argparse import asyncio import json import logging import os import sys from agnext.application import SingleThreadedAgentRuntime from agnext.components import TypeRoutedAgent, message_handler from agnext.components.memory import ChatMemory from agnext.components.models import ChatCompletionClient, SystemMessage from agnext.core import AgentRuntime, CancellationToken sys.path.append(os.path.abspath(os.path.dirname(__file__))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from common.memory import BufferedChatMemory from common.types import Message, TextMessage from common.utils import convert_messages_to_llm_messages, get_chat_completion_client_from_envs from utils import TextualChatApp, TextualUserAgent, start_runtime # Define a custom agent that can handle chat room messages. class ChatRoomAgent(TypeRoutedAgent): def __init__( self, name: str, description: str, background_story: str, memory: ChatMemory[Message], model_client: ChatCompletionClient, ) -> None: super().__init__(description) system_prompt = f"""Your name is {name}. Your background story is: {background_story} Now you are in a chat room with other users. You can send messages to the chat room by typing your message below. You do not need to respond to every message. Use the following JSON format to provide your thought on the latest message and choose whether to respond: {{ "thought": "Your thought on the message", "respond": , "response": "Your response to the message or None if you choose not to respond." }} """ self._system_messages = [SystemMessage(system_prompt)] self._memory = memory self._client = model_client @message_handler() async def on_chat_room_message(self, message: TextMessage, cancellation_token: CancellationToken) -> None: # Save the message to memory as structured JSON. from_message = TextMessage( content=json.dumps({"sender": message.source, "content": message.content}), source=message.source ) await self._memory.add_message(from_message) # Get a response from the model. raw_response = await self._client.create( self._system_messages + convert_messages_to_llm_messages(await self._memory.get_messages(), self_name=self.metadata["name"]), json_output=True, ) assert isinstance(raw_response.content, str) # Save the response to memory. await self._memory.add_message(TextMessage(source=self.metadata["name"], content=raw_response.content)) # Parse the response. data = json.loads(raw_response.content) respond = data.get("respond") response = data.get("response") # Publish the response if needed. if respond is True or str(respond).lower().strip() == "true": await self.publish_message(TextMessage(source=self.metadata["name"], content=str(response))) class ChatRoomUserAgent(TextualUserAgent): """An agent that is used to receive messages from the runtime.""" @message_handler async def on_chat_room_message(self, message: TextMessage, cancellation_token: CancellationToken) -> None: await self._app.post_runtime_message(message) # Define a chat room with participants -- the runtime is the chat room. def chat_room(runtime: AgentRuntime, app: TextualChatApp) -> None: runtime.register( "User", lambda: ChatRoomUserAgent( description="The user in the chat room.", app=app, ), ) alice = runtime.register_and_get_proxy( "Alice", lambda rt, id: ChatRoomAgent( name=id.name, description="Alice in the chat room.", background_story="Alice is a software engineer who loves to code.", memory=BufferedChatMemory(buffer_size=10), model_client=get_chat_completion_client_from_envs(model="gpt-4-turbo"), ), ) bob = runtime.register_and_get_proxy( "Bob", lambda rt, id: ChatRoomAgent( name=id.name, description="Bob in the chat room.", background_story="Bob is a data scientist who loves to analyze data.", memory=BufferedChatMemory(buffer_size=10), model_client=get_chat_completion_client_from_envs(model="gpt-4-turbo"), ), ) charlie = runtime.register_and_get_proxy( "Charlie", lambda rt, id: ChatRoomAgent( name=id.name, description="Charlie in the chat room.", background_story="Charlie is a designer who loves to create art.", memory=BufferedChatMemory(buffer_size=10), model_client=get_chat_completion_client_from_envs(model="gpt-4-turbo"), ), ) app.welcoming_notice = f"""Welcome to the chat room demo with the following participants: 1. 👧 {alice.id.name}: {alice.metadata['description']} 2. 👱🏼‍♂️ {bob.id.name}: {bob.metadata['description']} 3. 👨🏾‍🦳 {charlie.id.name}: {charlie.metadata['description']} Each participant decides on its own whether to respond to the latest message. You can greet the chat room by typing your first message below. """ async def main() -> None: runtime = SingleThreadedAgentRuntime() app = TextualChatApp(runtime, user_name="You") chat_room(runtime, app) asyncio.create_task(start_runtime(runtime)) await app.run_async() if __name__ == "__main__": parser = argparse.ArgumentParser(description="Chat room demo with self-driving AI agents.") parser.add_argument("--verbose", action="store_true", help="Enable verbose logging.") args = parser.parse_args() if args.verbose: logging.basicConfig(level=logging.WARNING) logging.getLogger("agnext").setLevel(logging.DEBUG) handler = logging.FileHandler("chat_room.log") logging.getLogger("agnext").addHandler(handler) asyncio.run(main())