Remove samples that have duplicates in documentation (#631)

This commit is contained in:
Eric Zhu 2024-09-25 04:26:26 -07:00 committed by GitHub
parent 09501a0134
commit 6c14c7859d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 16 additions and 1605 deletions

View File

@ -1,63 +1,17 @@
# Examples
This directory contains examples and demos of how to use AutoGen core.
- `common`: Contains common implementations and utilities used by the examples.
- `core`: Contains examples that illustrate the core concepts of AutoGen core.
- `tool-use`: Contains examples that illustrate tool use in AutoGen core.
- `patterns`: Contains examples that illustrate how multi-agent patterns can be implemented in AutoGen core.
- `demos`: Contains interactive demos that showcase applications that can be built using AutoGen core.
This directory contains examples of how to use AutoGen core.
See [Running the examples](#running-the-examples) for instructions on how to run the examples.
## Core examples
We provide examples to illustrate the core concepts of AutoGen core: agents, runtime, and message passing.
- [`one_agent_direct.py`](core/one_agent_direct.py): A simple example of how to create a single agent powered by ChatCompletion model client. Communicate with the agent using direct communication.
- [`inner_outer_direct.py`](core/inner_outer_direct.py): A simple example of how to create an agent that calls an inner agent using direct communication.
- [`two_agents_pub_sub.py`](core/two_agents_pub_sub.py): An example of how to create two agents that communicate using broadcast communication (i.e., pub/sub).
## Tool use examples
We provide examples to illustrate how to use tools in AutoGen core:
- [`coding_direct.py`](tool-use/coding_direct.py): a code execution example with one agent that calls and executes tools to demonstrate tool use and reflection on tool use. This example uses direct communication.
- [`coding_pub_sub.py`](tool-use/coding_pub_sub.py): a code execution example with two agents, one for calling tool and one for executing the tool, to demonstrate tool use and reflection on tool use. This example uses broadcast communication.
- [`custom_tool_direct.py`](tool-use/custom_tool_direct.py): a custom function tool example with one agent that calls and executes tools to demonstrate tool use and reflection on tool use. This example uses direct communication.
- [`coding_direct_with_intercept.py`](tool-use/coding_direct_with_intercept.py): an example showing human-in-the-loop for approving or denying tool execution.
## Pattern examples
We provide examples to illustrate how multi-agent patterns can be implemented in AutoGen core:
- [`coder_executor.py`](patterns/coder_executor.py): An example of how to create a coder-executor reflection pattern. This example creates a plot of stock prices using the Yahoo Finance API.
- [`coder_reviewer.py`](patterns/coder_reviewer.py): An example of how to create a coder-reviewer reflection pattern.
- [`group_chat.py`](patterns/group_chat.py): An example of how to create a round-robin group chat among three agents.
- [`mixture_of_agents.py`](patterns/mixture_of_agents.py): An example of how to create a [mixture of agents](https://github.com/togethercomputer/moa).
- [`multi_agent_debate.py`](patterns/multi_agent_debate.py): An example of how to create a [sparse multi-agent debate](https://arxiv.org/abs/2406.11776) pattern.
## Demos
We provide interactive demos that showcase applications that can be built using AutoGen core:
- [`assistant.py`](demos/assistant.py): a demonstration of how to use the OpenAI Assistant API to create
- [`coding_pub_sub.py`](coding_pub_sub.py): a code execution example with two agents, one for calling tool and one for executing the tool, to demonstrate tool use and reflection on tool use. This example uses broadcast communication.
- [`coding_direct_with_intercept.py`](coding_direct_with_intercept.py): an example showing human-in-the-loop for approving or denying tool execution.
- [`mixture_of_agents.py`](mixture_of_agents.py): An example of how to create a [mixture of agents](https://github.com/togethercomputer/moa).
- [`multi_agent_debate.py`](multi_agent_debate.py): An example of how to create a [sparse multi-agent debate](https://arxiv.org/abs/2406.11776) pattern.
- [`assistant.py`](assistant.py): a demonstration of how to use the OpenAI Assistant API to create
a ChatGPT agent.
- [`chat_room.py`](demos/chat_room.py): An example of how to create a chat room of custom agents without
a centralized orchestrator.
- [`illustrator_critics.py`](demos/illustrator_critics.py): a demo that uses an illustrator, critics and descriptor agent
to implement the reflection pattern for image generation.
- [`software_consultancy.py`](demos/software_consultancy.py): a demonstration of multi-agent interaction using
the group chat pattern.
- [`chest_game.py`](demos/chess_game.py): an example with two chess player agents that executes its own tools to demonstrate tool use and reflection on tool use.
- [`slow_human_in_loop.py`](demos/slow_human_in_loop.py): an example showing human-in-the-loop which waits for human input before making the tool call.
## Bring Your Own Agent
We provide examples on how to integrate other agents with the platform:
- [`llamaindex_agent.py`](byoa/llamaindex_agent.py): An example that shows how to consume a LlamaIndex agent.
- [`langgraph_agent.py`](byoa/langgraph_agent.py): An example that shows how to consume a LangGraph agent.
- [`chest_game.py`](chess_game.py): an example with two chess player agents that executes its own tools to demonstrate tool use and reflection on tool use.
- [`slow_human_in_loop.py`](slow_human_in_loop.py): an example showing human-in-the-loop which waits for human input before making the tool call.
## Running the examples

View File

@ -6,28 +6,23 @@ import asyncio
import logging
import os
import re
import sys
from typing import List
import aiofiles
import openai
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import AgentId, AgentRuntime, MessageContext
from autogen_core.base import AgentId, AgentInstantiationContext, AgentRuntime, MessageContext
from autogen_core.components import DefaultSubscription, DefaultTopicId, RoutedAgent, message_handler
from autogen_core.components.model_context import BufferedChatCompletionContext
from common.agents import OpenAIAssistantAgent
from common.patterns._group_chat_manager import GroupChatManager
from common.types import PublishNow, TextMessage
from openai import AsyncAssistantEventHandler
from openai.types.beta.thread import ToolResources
from openai.types.beta.threads import Message, Text, TextDelta
from openai.types.beta.threads.runs import RunStep, RunStepDelta
from typing_extensions import override
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from autogen_core.base import AgentInstantiationContext
from common.agents import OpenAIAssistantAgent
from common.patterns._group_chat_manager import GroupChatManager
from common.types import PublishNow, TextMessage
sep = "-" * 50

View File

@ -1,136 +0,0 @@
"""
This example demonstrates how to create an AI agent using LangGraph.
Based on the example in the LangGraph documentation:
https://langchain-ai.github.io/langgraph/
"""
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, List, Literal
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import AgentId, MessageContext
from autogen_core.components import RoutedAgent, message_handler
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool # pyright: ignore
from langchain_openai import ChatOpenAI
from langgraph.graph import END, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode
@dataclass
class Message:
content: str
# Define the tools for the agent to use
@tool # pyright: ignore
def get_weather(location: str) -> str:
"""Call to surf the web."""
# This is a placeholder, but don't tell the LLM that...
if "sf" in location.lower() or "san francisco" in location.lower():
return "It's 60 degrees and foggy."
return "It's 90 degrees and sunny."
# Define the tool-use agent using LangGraph.
class LangGraphToolUseAgent(RoutedAgent):
def __init__(self, description: str, model: ChatOpenAI, tools: List[Callable[..., Any]]) -> None: # pyright: ignore
super().__init__(description)
self._model = model.bind_tools(tools) # pyright: ignore
# Define the function that determines whether to continue or not
def should_continue(state: MessagesState) -> Literal["tools", END]: # type: ignore
messages = state["messages"]
last_message = messages[-1]
# If the LLM makes a tool call, then we route to the "tools" node
if last_message.tool_calls: # type: ignore
return "tools"
# Otherwise, we stop (reply to the user)
return END
# Define the function that calls the model
async def call_model(state: MessagesState): # type: ignore
messages = state["messages"]
response = await self._model.ainvoke(messages)
# We return a list, because this will get added to the existing list
return {"messages": [response]}
tool_node = ToolNode(tools) # pyright: ignore
# Define a new graph
self._workflow = StateGraph(MessagesState)
# Define the two nodes we will cycle between
self._workflow.add_node("agent", call_model) # pyright: ignore
self._workflow.add_node("tools", tool_node) # pyright: ignore
# Set the entrypoint as `agent`
# This means that this node is the first one called
self._workflow.set_entry_point("agent")
# We now add a conditional edge
self._workflow.add_conditional_edges(
# First, we define the start node. We use `agent`.
# This means these are the edges taken after the `agent` node is called.
"agent",
# Next, we pass in the function that will determine which node is called next.
should_continue, # type: ignore
)
# We now add a normal edge from `tools` to `agent`.
# This means that after `tools` is called, `agent` node is called next.
self._workflow.add_edge("tools", "agent")
# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable.
# Note that we're (optionally) passing the memory when compiling the graph
self._app = self._workflow.compile()
@message_handler
async def handle_user_message(self, message: Message, ctx: MessageContext) -> Message:
# Use the Runnable
final_state = await self._app.ainvoke(
{
"messages": [
SystemMessage(
content="You are a helpful AI assistant. You can use tools to help answer questions."
),
HumanMessage(content=message.content),
]
},
config={"configurable": {"thread_id": 42}},
)
response = Message(content=final_state["messages"][-1].content)
return response
async def main() -> None:
# Create runtime.
runtime = SingleThreadedAgentRuntime()
# Register the agent.
await runtime.register(
"langgraph_tool_use_agent",
lambda: LangGraphToolUseAgent(
"Tool use agent",
ChatOpenAI(model="gpt-4o-mini"),
[get_weather],
),
)
agent = AgentId("langgraph_tool_use_agent", key="default")
# Start the runtime.
runtime.start()
# Send a message to the agent and get a response.
response = await runtime.send_message(Message("What's the weather in SF?"), agent)
print(response.content)
# Stop the runtime.
await runtime.stop()
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@ -1,148 +0,0 @@
"""
This example shows how integrate llamaindex agent.
"""
import asyncio
import os
from dataclasses import dataclass
from typing import List, Optional
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import AgentId, MessageContext
from autogen_core.components import RoutedAgent, message_handler
from llama_index.core import Settings
from llama_index.core.agent import ReActAgent
from llama_index.core.agent.runner.base import AgentRunner
from llama_index.core.base.llms.types import (
ChatMessage,
MessageRole,
)
from llama_index.core.chat_engine.types import AgentChatResponse
from llama_index.core.memory import ChatSummaryMemoryBuffer
from llama_index.core.memory.types import BaseMemory
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.tools.wikipedia import WikipediaToolSpec
@dataclass
class Resource:
content: str
node_id: str
score: Optional[float] = None
@dataclass
class Message:
content: str
sources: Optional[List[Resource]] = None
class LlamaIndexAgent(RoutedAgent):
def __init__(self, description: str, llama_index_agent: AgentRunner, memory: BaseMemory | None = None) -> None:
super().__init__(description)
self._llama_index_agent = llama_index_agent
self._memory = memory
@message_handler
async def handle_user_message(self, message: Message, ctx: MessageContext) -> Message:
# retriever history messages from memory!
history_messages: List[ChatMessage] = []
# type: ignore
# pyright: ignore
response: AgentChatResponse # pyright: ignore
if self._memory is not None:
history_messages = self._memory.get(input=message.content)
response = await self._llama_index_agent.achat(message=message.content, history_messages=history_messages) # pyright: ignore
else:
response = await self._llama_index_agent.achat(message=message.content) # pyright: ignore
if isinstance(response, AgentChatResponse):
if self._memory is not None:
self._memory.put(ChatMessage(role=MessageRole.USER, content=message.content))
self._memory.put(ChatMessage(role=MessageRole.ASSISTANT, content=response.response))
assert isinstance(response.response, str)
resources: List[Resource] = [
Resource(content=source_node.get_text(), score=source_node.score, node_id=source_node.id_)
for source_node in response.source_nodes
]
tools: List[Resource] = [
Resource(content=source.content, node_id=source.tool_name) for source in response.sources
]
resources.extend(tools)
return Message(content=response.response, sources=resources)
else:
return Message(content="I'm sorry, I don't have an answer for you.")
async def main() -> None:
runtime = SingleThreadedAgentRuntime()
# setup llamaindex
llm = AzureOpenAI(
deployment_name=os.environ.get("AZURE_OPENAI_MODEL", ""),
temperature=0.0,
api_key=os.environ.get("AZURE_OPENAI_KEY", ""),
azure_endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT", ""),
api_version=os.environ.get("AZURE_OPENAI_API_VERSION", ""),
)
embed_model = AzureOpenAIEmbedding(
deployment_name=os.environ.get("AZURE_OPENAI_EMBEDDING_MODEL", ""),
temperature=0.0,
api_key=os.environ.get("AZURE_OPENAI_KEY", ""),
azure_endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT", ""),
api_version=os.environ.get("AZURE_OPENAI_API_VERSION", ""),
)
Settings.llm = llm
Settings.embed_model = embed_model
# create a react agent to use wikipedia tool
# Get the wikipedia tool spec for llamaindex agents
wiki_spec = WikipediaToolSpec()
wikipedia_tool = wiki_spec.to_tool_list()[1]
# create a memory buffer for the react agent
memory = ChatSummaryMemoryBuffer(llm=llm, token_limit=16000)
# create the agent using the ReAct agent pattern
llama_index_agent = ReActAgent.from_tools(
tools=[wikipedia_tool], llm=llm, max_iterations=8, memory=memory, verbose=True
)
await runtime.register(
"chat_agent",
lambda: LlamaIndexAgent("Chat agent", llama_index_agent=llama_index_agent),
)
agent = AgentId("chat_agent", key="default")
runtime.start()
# Send a message to the agent and get the response.
message = Message(content="What are the best movies from studio Ghibli?")
response = await runtime.send_message(message, agent)
assert isinstance(response, Message)
print(response.content)
if response.sources is not None:
for source in response.sources:
print(source.content)
await runtime.stop()
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@ -5,22 +5,16 @@ and make moves, and using a group chat manager to orchestrate the conversation."
import argparse
import asyncio
import logging
import os
import sys
from typing import Annotated, Literal
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import AgentInstantiationContext, AgentRuntime
from autogen_core.base import AgentId, AgentInstantiationContext, AgentRuntime
from autogen_core.components import DefaultSubscription, DefaultTopicId
from autogen_core.components.model_context import BufferedChatCompletionContext
from autogen_core.components.models import SystemMessage
from autogen_core.components.tools import FunctionTool
from chess import BLACK, SQUARE_NAMES, WHITE, Board, Move
from chess import piece_name as get_piece_name
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from autogen_core.base import AgentId
from common.agents._chat_completion_agent import ChatCompletionAgent
from common.patterns._group_chat_manager import GroupChatManager
from common.types import TextMessage

View File

@ -6,8 +6,6 @@ before it is sent out, and prompt the user for permission to execute the tool.
"""
import asyncio
import os
import sys
from typing import Any, List
from autogen_core.application import SingleThreadedAgentRuntime
@ -18,9 +16,6 @@ from autogen_core.components.code_executor import DockerCommandLineCodeExecutor
from autogen_core.components.models import SystemMessage
from autogen_core.components.tool_agent import ToolAgent, ToolException
from autogen_core.components.tools import PythonCodeExecutionTool, Tool
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from coding_direct import Message, ToolUseAgent
from common.utils import get_chat_completion_client_from_envs

View File

@ -13,13 +13,12 @@ the results back to the tool use agent.
import asyncio
import json
import os
import sys
import uuid
from dataclasses import dataclass
from typing import Dict, List
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import MessageContext
from autogen_core.components import DefaultSubscription, DefaultTopicId, FunctionCall, RoutedAgent, message_handler
from autogen_core.components.code_executor import DockerCommandLineCodeExecutor
from autogen_core.components.models import (
@ -32,10 +31,6 @@ from autogen_core.components.models import (
UserMessage,
)
from autogen_core.components.tools import PythonCodeExecutionTool, Tool
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from autogen_core.base import MessageContext
from common.utils import get_chat_completion_client_from_envs

View File

@ -1,64 +0,0 @@
"""
This example shows how to use direct messaging to implement
a simple interaction between an inner and an outer agent.
1. The outer agent receives a message, sends a message to the inner agent.
2. The inner agent receives the message, processes it, and sends a response to the outer agent.
3. The outer agent receives the response and processes it, and returns the final response.
"""
import asyncio
import logging
from dataclasses import dataclass
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import AgentId, AgentInstantiationContext, MessageContext
from autogen_core.components import RoutedAgent, message_handler
@dataclass
class MessageType:
body: str
sender: str
class Inner(RoutedAgent):
def __init__(self) -> None:
super().__init__("The inner agent")
@message_handler()
async def on_new_message(self, message: MessageType, ctx: MessageContext) -> MessageType:
return MessageType(body=f"Inner: {message.body}", sender=self.metadata["type"])
class Outer(RoutedAgent):
def __init__(self, inner: AgentId) -> None:
super().__init__("The outer agent")
self._inner = inner
@message_handler()
async def on_new_message(self, message: MessageType, ctx: MessageContext) -> MessageType:
inner_response = self.send_message(message, self._inner)
inner_message = await inner_response
assert isinstance(inner_message, MessageType)
return MessageType(body=f"Outer: {inner_message.body}", sender=self.metadata["type"])
async def main() -> None:
runtime = SingleThreadedAgentRuntime()
await runtime.register("inner", Inner)
await runtime.register("outer", lambda: Outer(AgentId("inner", AgentInstantiationContext.current_agent_id().key)))
outer = AgentId("outer", "default")
runtime.start()
response = await runtime.send_message(MessageType(body="Hello", sender="external"), outer)
print(response)
await runtime.stop()
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@ -1,71 +0,0 @@
"""
This example shows how to use direct messaging to implement
a simple chat completion agent.
The agent receives a message from the main function, sends it to the
chat completion model, and returns the response to the main function.
"""
import asyncio
import os
import sys
from dataclasses import dataclass
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import AgentId
from autogen_core.components import RoutedAgent, message_handler
from autogen_core.components.models import (
ChatCompletionClient,
SystemMessage,
UserMessage,
)
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from autogen_core.base import MessageContext
from common.utils import get_chat_completion_client_from_envs
@dataclass
class Message:
content: str
class ChatCompletionAgent(RoutedAgent):
def __init__(self, description: str, model_client: ChatCompletionClient) -> None:
super().__init__(description)
self._system_messages = [SystemMessage("You are a helpful AI assistant.")]
self._model_client = model_client
@message_handler
async def handle_user_message(self, message: Message, ctx: MessageContext) -> Message:
user_message = UserMessage(content=message.content, source="User")
response = await self._model_client.create(self._system_messages + [user_message])
assert isinstance(response.content, str)
return Message(content=response.content)
async def main() -> None:
runtime = SingleThreadedAgentRuntime()
await runtime.register(
"chat_agent",
lambda: ChatCompletionAgent("Chat agent", get_chat_completion_client_from_envs(model="gpt-4o-mini")),
)
agent = AgentId("chat_agent", "default")
runtime.start()
# Send a message to the agent and get the response.
message = Message(content="Hello, what are some fun things to do in Seattle?")
response = await runtime.send_message(message, agent)
assert isinstance(response, Message)
print(response.content)
await runtime.stop()
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@ -1,127 +0,0 @@
"""
This example shows how to use publish-subscribe to implement a simple
interaction between two agents that use a chat completion model to respond to messages.
1. The main function sends a message to Jack to start the conversation.
2. The Jack agent receives the message, generates a response using a chat completion model,
and publishes the response.
3. The Cathy agent receives the message, generates a response using a chat completion model,
and publishes the response.
4. The conversation continues until a message with termination word is received by any agent.
"""
import asyncio
import os
import sys
from dataclasses import dataclass
from typing import List
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import AgentId, MessageContext
from autogen_core.components import DefaultSubscription, DefaultTopicId, RoutedAgent, message_handler
from autogen_core.components.models import (
AssistantMessage,
ChatCompletionClient,
LLMMessage,
SystemMessage,
UserMessage,
)
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from common.utils import get_chat_completion_client_from_envs
@dataclass
class Message:
source: str
content: str
class ChatCompletionAgent(RoutedAgent):
"""An agent that uses a chat completion model to respond to messages.
It keeps a memory of the conversation and uses it to generate responses.
It publishes a termination message when the termination word is mentioned."""
def __init__(
self,
description: str,
system_messages: List[SystemMessage],
model_client: ChatCompletionClient,
termination_word: str,
) -> None:
super().__init__(description)
self._system_messages = system_messages
self._model_client = model_client
self._memory: List[Message] = []
self._termination_word = termination_word
@message_handler
async def handle_message(self, message: Message, ctx: MessageContext) -> None:
self._memory.append(message)
if self._termination_word in message.content:
return
llm_messages: List[LLMMessage] = []
for m in self._memory[-10:]:
if m.source == self.metadata["type"]:
llm_messages.append(AssistantMessage(content=m.content, source=self.metadata["type"]))
else:
llm_messages.append(UserMessage(content=m.content, source=m.source))
response = await self._model_client.create(self._system_messages + llm_messages)
assert isinstance(response.content, str)
await self.publish_message(
Message(content=response.content, source=self.metadata["type"]), topic_id=DefaultTopicId()
)
async def main() -> None:
# Create the runtime.
runtime = SingleThreadedAgentRuntime()
# Register the agents.
await runtime.register(
"Jack",
lambda: ChatCompletionAgent(
description="Jack a comedian",
model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"),
system_messages=[
SystemMessage(
"You are a comedian that likes to make jokes. " "After multiple turns, respond with 'TERMINATE'"
)
],
termination_word="TERMINATE",
),
lambda: [DefaultSubscription()],
)
await runtime.register(
"Cathy",
lambda: ChatCompletionAgent(
description="Cathy a poet",
model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"),
system_messages=[
SystemMessage(
"You are a poet likes that to write poems. " "After multiple turns, respond with 'TERMINATE'"
)
],
termination_word="TERMINATE",
),
lambda: [DefaultSubscription()],
)
runtime.start()
# Send a message to Jack to start the conversation.
message = Message(content="Can you tell me something fun about SF?", source="User")
await runtime.send_message(message, AgentId("Jack", "default"))
# Process messages.
await runtime.stop_when_idle()
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@ -1,190 +0,0 @@
import asyncio
import os
import random
import sys
from asyncio import Future
from autogen_core.base import AgentRuntime, CancellationToken
from autogen_core.components import DefaultTopicId, Image, RoutedAgent, message_handler
from textual.app import App, ComposeResult
from textual.containers import ScrollableContainer
from textual.widgets import Button, Footer, Header, Input, Markdown, Static
from textual_imageview.viewer import ImageViewer
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from common.types import (
MultiModalMessage,
PublishNow,
RespondNow,
TextMessage,
ToolApprovalRequest,
ToolApprovalResponse,
)
class ChatAppMessage(Static):
def __init__(self, message: TextMessage | MultiModalMessage) -> None: # type: ignore
self._message = message
super().__init__()
def on_mount(self) -> None:
self.styles.margin = 1
self.styles.padding = 1
self.styles.border = ("solid", "blue")
def compose(self) -> ComposeResult:
if isinstance(self._message, TextMessage):
yield Markdown(f"{self._message.source}:")
yield Markdown(self._message.content)
else:
yield Markdown(f"{self._message.source}:")
for content in self._message.content:
if isinstance(content, str):
yield Markdown(content)
elif isinstance(content, Image):
viewer = ImageViewer(content.image)
viewer.styles.min_width = 50
viewer.styles.min_height = 50
yield viewer
class WelcomeMessage(Static):
def on_mount(self) -> None:
self.styles.margin = 1
self.styles.padding = 1
self.styles.border = ("solid", "blue")
class ChatInput(Input):
def on_mount(self) -> None:
self.focus()
def on_input_submitted(self, event: Input.Submitted) -> None:
self.clear()
class ToolApprovalRequestNotice(Static):
def __init__(self, request: ToolApprovalRequest, response_future: Future[ToolApprovalResponse]) -> None: # type: ignore
self._tool_call = request.tool_call
self._future = response_future
super().__init__()
def compose(self) -> ComposeResult:
yield Static(f"Tool call: {self._tool_call.name}, arguments: {self._tool_call.arguments[:50]}")
yield Button("Approve", id="approve", variant="warning")
yield Button("Deny", id="deny", variant="default")
def on_mount(self) -> None:
self.styles.margin = 1
self.styles.padding = 1
self.styles.border = ("solid", "red")
def on_button_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id
assert button_id is not None
if button_id == "approve":
self._future.set_result(ToolApprovalResponse(tool_call_id=self._tool_call.id, approved=True, reason=""))
else:
self._future.set_result(ToolApprovalResponse(tool_call_id=self._tool_call.id, approved=False, reason=""))
self.remove()
class TextualChatApp(App): # type: ignore
"""A Textual app for a chat interface."""
def __init__(self, runtime: AgentRuntime, welcoming_notice: str | None = None, user_name: str = "User") -> None: # type: ignore
self._runtime = runtime
self._welcoming_notice = welcoming_notice
self._user_name = user_name
super().__init__()
def compose(self) -> ComposeResult:
yield Header()
yield Footer()
yield ScrollableContainer(id="chat-messages")
yield ChatInput()
def on_mount(self) -> None:
if self._welcoming_notice is not None:
chat_messages = self.query_one("#chat-messages")
notice = WelcomeMessage(self._welcoming_notice, id="welcome")
chat_messages.mount(notice)
@property
def welcoming_notice(self) -> str | None:
return self._welcoming_notice
@welcoming_notice.setter
def welcoming_notice(self, value: str) -> None:
self._welcoming_notice = value
async def on_input_submitted(self, event: Input.Submitted) -> None:
user_input = event.value
await self.publish_user_message(user_input)
async def post_request_user_input_notice(self) -> None:
chat_messages = self.query_one("#chat-messages")
notice = Static("Please enter your input.", id="typing")
chat_messages.mount(notice)
notice.scroll_visible()
async def publish_user_message(self, user_input: str) -> None:
chat_messages = self.query_one("#chat-messages")
# Remove all typing messages.
chat_messages.query("#typing").remove()
# Publish the user message to the runtime.
await self._runtime.publish_message(
TextMessage(source=self._user_name, content=user_input), topic_id=DefaultTopicId()
)
async def post_runtime_message(self, message: TextMessage | MultiModalMessage) -> None: # type: ignore
"""Post a message from the agent runtime to the message list."""
chat_messages = self.query_one("#chat-messages")
msg = ChatAppMessage(message)
chat_messages.mount(msg)
msg.scroll_visible()
async def handle_tool_approval_request(self, message: ToolApprovalRequest) -> ToolApprovalResponse: # type: ignore
chat_messages = self.query_one("#chat-messages")
future: Future[ToolApprovalResponse] = asyncio.get_event_loop().create_future() # type: ignore
tool_call_approval_notice = ToolApprovalRequestNotice(message, future)
chat_messages.mount(tool_call_approval_notice)
tool_call_approval_notice.scroll_visible()
return await future
class TextualUserAgent(RoutedAgent): # type: ignore
"""An agent that is used to receive messages from the runtime."""
def __init__(self, description: str, app: TextualChatApp) -> None: # type: ignore
super().__init__(description)
self._app = app
@message_handler # type: ignore
async def on_text_message(self, message: TextMessage, cancellation_token: CancellationToken) -> None: # type: ignore
await self._app.post_runtime_message(message)
@message_handler # type: ignore
async def on_multi_modal_message(self, message: MultiModalMessage, cancellation_token: CancellationToken) -> None: # type: ignore
# Save the message to file.
# Generate a ramdom file name.
for content in message.content:
if isinstance(content, Image):
filename = f"{self.metadata['type']}_{message.source}_{random.randbytes(16).hex()}.png"
content.image.save(filename)
await self._app.post_runtime_message(message)
@message_handler # type: ignore
async def on_respond_now(self, message: RespondNow, cancellation_token: CancellationToken) -> None: # type: ignore
await self._app.post_request_user_input_notice()
@message_handler # type: ignore
async def on_publish_now(self, message: PublishNow, cancellation_token: CancellationToken) -> None: # type: ignore
await self._app.post_request_user_input_notice()
@message_handler # type: ignore
async def on_tool_approval_request(
self, message: ToolApprovalRequest, cancellation_token: CancellationToken
) -> ToolApprovalResponse:
return await self._app.handle_tool_approval_request(message)

View File

@ -8,8 +8,6 @@ The reference agents handle each task independently and return the results to th
"""
import asyncio
import os
import sys
import uuid
from dataclasses import dataclass
from typing import Dict, List
@ -18,9 +16,6 @@ from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import MessageContext
from autogen_core.components import DefaultSubscription, DefaultTopicId, RoutedAgent, message_handler
from autogen_core.components.models import ChatCompletionClient, SystemMessage, UserMessage
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from common.utils import get_chat_completion_client_from_envs

View File

@ -32,14 +32,13 @@ to sample a random number of neighbors' responses to use.
import asyncio
import logging
import os
import re
import sys
import uuid
from dataclasses import dataclass
from typing import Dict, List, Tuple
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import MessageContext
from autogen_core.components import DefaultSubscription, DefaultTopicId, RoutedAgent, message_handler
from autogen_core.components.models import (
AssistantMessage,
@ -48,10 +47,6 @@ from autogen_core.components.models import (
SystemMessage,
UserMessage,
)
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from autogen_core.base import MessageContext
from common.utils import get_chat_completion_client_from_envs
logger = logging.getLogger(__name__)

View File

@ -1,218 +0,0 @@
"""
This example shows how to use publish-subscribe to implement
a simple interaction between a coder and an executor agent.
1. The coder agent receives a task message, generates a code block,
and publishes a code execution
task message.
2. The executor agent receives the code execution task message,
executes the code block, and publishes a code execution task result message.
3. The coder agent receives the code execution task result message, depending
on the result: if the task is completed, it publishes a task completion message;
otherwise, it generates a new code block and publishes a code execution task message.
4. The process continues until the coder agent publishes a task completion message.
"""
import asyncio
import os
import re
import sys
import uuid
from dataclasses import dataclass
from typing import Dict, List
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.components import DefaultSubscription, DefaultTopicId, RoutedAgent, message_handler
from autogen_core.components.code_executor import CodeBlock, CodeExecutor, DockerCommandLineCodeExecutor
from autogen_core.components.models import (
AssistantMessage,
ChatCompletionClient,
LLMMessage,
SystemMessage,
UserMessage,
)
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from autogen_core.base import MessageContext
from common.utils import get_chat_completion_client_from_envs
@dataclass
class TaskMessage:
content: str
@dataclass
class TaskCompletion:
content: str
@dataclass
class CodeExecutionTask:
session_id: str
content: str
@dataclass
class CodeExecutionTaskResult:
session_id: str
output: str
exit_code: int
class Coder(RoutedAgent):
"""An agent that writes code."""
def __init__(
self,
model_client: ChatCompletionClient,
) -> None:
super().__init__(description="A Python coder assistant.")
self._model_client = model_client
self._system_messages = [
SystemMessage(
"""You are a helpful AI assistant.
Solve tasks using your coding and language skills.
In the following cases, suggest python code (in a python coding block) or shell script (in a sh coding block) for the user to execute.
1. When you need to collect info, use the code to output the info you need, for example, browse or search the web, download/read a file, print the content of a webpage or a file, get the current date/time, check the operating system. After sufficient info is printed and the task is ready to be solved based on your language skill, you can solve the task by yourself.
2. When you need to perform some task with code, use the code to perform the task and output the result. Finish the task smartly.
Solve the task step by step if you need to. If a plan is not provided, explain your plan first. Be clear which step uses code, and which step uses your language skill.
When using code, you must indicate the script type in the code block. The user cannot provide any other feedback or perform any other action beyond executing the code you suggest. The user can't modify your code. So do not suggest incomplete code which requires users to modify. Don't use a code block if it's not intended to be executed by the user.
If you want the user to save the code in a file before executing it, put # filename: <filename> inside the code block as the first line. Don't include multiple code blocks in one response. Do not ask users to copy and paste the result. Instead, use 'print' function for the output when relevant. Check the execution result returned by the user.
If the result indicates there is an error, fix the error and output the code again. Suggest the full code instead of partial code or code changes. If the error can't be fixed or if the task is not solved even after the code is executed successfully, analyze the problem, revisit your assumption, collect additional info you need, and think of a different approach to try.
When you find an answer, verify the answer carefully. Include verifiable evidence in your response if possible.
Reply "TERMINATE" in the end when everything is done."""
)
]
# A dictionary to store the messages for each task session.
self._session_memory: Dict[str, List[LLMMessage]] = {}
@message_handler
async def handle_task(self, message: TaskMessage, ctx: MessageContext) -> None:
# Create a new session.
session_id = str(uuid.uuid4())
self._session_memory.setdefault(session_id, []).append(UserMessage(content=message.content, source="user"))
# Make an inference to the model.
response = await self._model_client.create(self._system_messages + self._session_memory[session_id])
assert isinstance(response.content, str)
self._session_memory[session_id].append(
AssistantMessage(content=response.content, source=self.metadata["type"])
)
# Publish the code execution task.
await self.publish_message(
CodeExecutionTask(content=response.content, session_id=session_id),
cancellation_token=ctx.cancellation_token,
topic_id=DefaultTopicId(),
)
@message_handler
async def handle_code_execution_result(self, message: CodeExecutionTaskResult, ctx: MessageContext) -> None:
# Store the code execution output.
self._session_memory[message.session_id].append(UserMessage(content=message.output, source="user"))
# Make an inference to the model -- reflection on the code execution output happens here.
response = await self._model_client.create(self._system_messages + self._session_memory[message.session_id])
assert isinstance(response.content, str)
self._session_memory[message.session_id].append(
AssistantMessage(content=response.content, source=self.metadata["type"])
)
if "TERMINATE" in response.content:
# If the task is completed, publish a message with the completion content.
await self.publish_message(
TaskCompletion(content=response.content),
cancellation_token=ctx.cancellation_token,
topic_id=DefaultTopicId(),
)
print("--------------------")
print("Task completed:")
print(response.content)
return
# Publish the code execution task.
await self.publish_message(
CodeExecutionTask(content=response.content, session_id=message.session_id),
cancellation_token=ctx.cancellation_token,
topic_id=DefaultTopicId(),
)
class Executor(RoutedAgent):
"""An agent that executes code."""
def __init__(self, executor: CodeExecutor) -> None:
super().__init__(description="A code executor agent.")
self._executor = executor
@message_handler
async def handle_code_execution(self, message: CodeExecutionTask, ctx: MessageContext) -> None:
# Extract the code block from the message.
code_blocks = self._extract_code_blocks(message.content)
if not code_blocks:
# If no code block is found, publish a message with an error.
await self.publish_message(
CodeExecutionTaskResult(
output="Error: no Markdown code block found.", exit_code=1, session_id=message.session_id
),
cancellation_token=ctx.cancellation_token,
topic_id=DefaultTopicId(),
)
return
# Execute code blocks.
result = await self._executor.execute_code_blocks(
code_blocks=code_blocks, cancellation_token=ctx.cancellation_token
)
# Publish the code execution result.
await self.publish_message(
CodeExecutionTaskResult(output=result.output, exit_code=result.exit_code, session_id=message.session_id),
cancellation_token=ctx.cancellation_token,
topic_id=DefaultTopicId(),
)
def _extract_code_blocks(self, markdown_text: str) -> List[CodeBlock]:
pattern = re.compile(r"```(?:\s*([\w\+\-]+))?\n([\s\S]*?)```")
matches = pattern.findall(markdown_text)
code_blocks: List[CodeBlock] = []
for match in matches:
language = match[0].strip() if match[0] else ""
code_content = match[1]
code_blocks.append(CodeBlock(code=code_content, language=language))
return code_blocks
async def main(task: str, temp_dir: str) -> None:
# Create the runtime with the termination handler.
runtime = SingleThreadedAgentRuntime()
async with DockerCommandLineCodeExecutor(work_dir=temp_dir) as executor:
# Register the agents.
await runtime.register(
"coder",
lambda: Coder(model_client=get_chat_completion_client_from_envs(model="gpt-4-turbo")),
lambda: [DefaultSubscription()],
)
await runtime.register(
"executor",
lambda: Executor(executor=executor),
lambda: [DefaultSubscription()],
)
runtime.start()
# Publish the task message.
await runtime.publish_message(TaskMessage(content=task), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
if __name__ == "__main__":
import logging
from datetime import datetime
logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)
task = f"Today is {datetime.today()}, create a plot of NVDA and TSLA stock prices YTD using yfinance."
asyncio.run(main(task, "."))

View File

@ -1,295 +0,0 @@
"""
This example shows how to use publish-subscribe to implement
a simple interaction between a coder and a reviewer agent.
1. The coder agent receives a code writing task message, generates a code block,
and publishes a code review task message.
2. The reviewer agent receives the code review task message, reviews the code block,
and publishes a code review result message.
3. The coder agent receives the code review result message, depending on the result:
if the code is approved, it publishes a code writing result message; otherwise, it generates
a new code block and publishes a code review task message.
4. The process continues until the coder agent publishes a code writing result message.
"""
import asyncio
import json
import os
import re
import sys
import uuid
from dataclasses import dataclass
from typing import Dict, List, Union
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.components import DefaultSubscription, DefaultTopicId, RoutedAgent, message_handler
from autogen_core.components.models import (
AssistantMessage,
ChatCompletionClient,
LLMMessage,
SystemMessage,
UserMessage,
)
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from autogen_core.base import MessageContext
from common.utils import get_chat_completion_client_from_envs
@dataclass
class CodeWritingTask:
task: str
@dataclass
class CodeWritingResult:
task: str
code: str
review: str
@dataclass
class CodeReviewTask:
session_id: str
code_writing_task: str
code_writing_scratchpad: str
code: str
@dataclass
class CodeReviewResult:
review: str
session_id: str
approved: bool
class ReviewerAgent(RoutedAgent):
"""An agent that performs code review tasks."""
def __init__(
self,
description: str,
model_client: ChatCompletionClient,
) -> None:
super().__init__(description)
self._system_messages = [
SystemMessage(
content="""You are a code reviewer. You focus on correctness, efficiency and safety of the code.
Respond using the following JSON format:
{
"correctness": "<Your comments>",
"efficiency": "<Your comments>",
"safety": "<Your comments>",
"approval": "<APPROVE or REVISE>",
"suggested_changes": "<Your comments>"
}
""",
)
]
self._model_client = model_client
@message_handler
async def handle_code_review_task(self, message: CodeReviewTask, ctx: MessageContext) -> None:
# Format the prompt for the code review.
prompt = f"""The problem statement is: {message.code_writing_task}
The code is:
```
{message.code}
```
Please review the code and provide feedback.
"""
# Generate a response using the chat completion API.
response = await self._model_client.create(
self._system_messages + [UserMessage(content=prompt, source=self.metadata["type"])]
)
assert isinstance(response.content, str)
# TODO: use structured generation library e.g. guidance to ensure the response is in the expected format.
# Parse the response JSON.
review = json.loads(response.content)
# Construct the review text.
review_text = "Code review:\n" + "\n".join([f"{k}: {v}" for k, v in review.items()])
approved = review["approval"].lower().strip() == "approve"
# Publish the review result.
await self.publish_message(
CodeReviewResult(
review=review_text,
approved=approved,
session_id=message.session_id,
),
topic_id=DefaultTopicId(),
)
class CoderAgent(RoutedAgent):
"""An agent that performs code writing tasks."""
def __init__(
self,
description: str,
model_client: ChatCompletionClient,
) -> None:
super().__init__(
description,
)
self._system_messages = [
SystemMessage(
content="""You are a proficient coder. You write code to solve problems.
Work with the reviewer to improve your code.
Always put all finished code in a single Markdown code block.
For example:
```python
def hello_world():
print("Hello, World!")
```
Respond using the following format:
Thoughts: <Your comments>
Code: <Your code>
""",
)
]
self._model_client = model_client
self._session_memory: Dict[str, List[CodeWritingTask | CodeReviewTask | CodeReviewResult]] = {}
@message_handler
async def handle_code_writing_task(
self,
message: CodeWritingTask,
ctx: MessageContext,
) -> None:
# Store the messages in a temporary memory for this request only.
session_id = str(uuid.uuid4())
self._session_memory.setdefault(session_id, []).append(message)
# Generate a response using the chat completion API.
response = await self._model_client.create(
self._system_messages + [UserMessage(content=message.task, source=self.metadata["type"])]
)
assert isinstance(response.content, str)
# Extract the code block from the response.
code_block = self._extract_code_block(response.content)
if code_block is None:
raise ValueError("Code block not found.")
# Create a code review task.
code_review_task = CodeReviewTask(
session_id=session_id,
code_writing_task=message.task,
code_writing_scratchpad=response.content,
code=code_block,
)
# Store the code review task in the session memory.
self._session_memory[session_id].append(code_review_task)
# Publish a code review task.
await self.publish_message(
code_review_task,
topic_id=DefaultTopicId(),
)
@message_handler
async def handle_code_review_result(self, message: CodeReviewResult, ctx: MessageContext) -> None:
# Store the review result in the session memory.
self._session_memory[message.session_id].append(message)
# Obtain the request from previous messages.
review_request = next(
m for m in reversed(self._session_memory[message.session_id]) if isinstance(m, CodeReviewTask)
)
assert review_request is not None
# Check if the code is approved.
if message.approved:
# Publish the code writing result.
await self.publish_message(
CodeWritingResult(
code=review_request.code,
task=review_request.code_writing_task,
review=message.review,
),
topic_id=DefaultTopicId(),
)
print("Code Writing Result:")
print("-" * 80)
print(f"Task:\n{review_request.code_writing_task}")
print("-" * 80)
print(f"Code:\n{review_request.code}")
print("-" * 80)
print(f"Review:\n{message.review}")
print("-" * 80)
else:
# Create a list of LLM messages to send to the model.
messages: List[LLMMessage] = [*self._system_messages]
for m in self._session_memory[message.session_id]:
if isinstance(m, CodeReviewResult):
messages.append(UserMessage(content=m.review, source="Reviewer"))
elif isinstance(m, CodeReviewTask):
messages.append(AssistantMessage(content=m.code_writing_scratchpad, source="Coder"))
elif isinstance(m, CodeWritingTask):
messages.append(UserMessage(content=m.task, source="User"))
else:
raise ValueError(f"Unexpected message type: {m}")
# Generate a revision using the chat completion API.
response = await self._model_client.create(messages)
assert isinstance(response.content, str)
# Extract the code block from the response.
code_block = self._extract_code_block(response.content)
if code_block is None:
raise ValueError("Code block not found.")
# Create a new code review task.
code_review_task = CodeReviewTask(
session_id=message.session_id,
code_writing_task=review_request.code_writing_task,
code_writing_scratchpad=response.content,
code=code_block,
)
# Store the code review task in the session memory.
self._session_memory[message.session_id].append(code_review_task)
# Publish a new code review task.
await self.publish_message(
code_review_task,
topic_id=DefaultTopicId(),
)
def _extract_code_block(self, markdown_text: str) -> Union[str, None]:
pattern = r"```(\w+)\n(.*?)\n```"
# Search for the pattern in the markdown text
match = re.search(pattern, markdown_text, re.DOTALL)
# Extract the language and code block if a match is found
if match:
return match.group(2)
return None
async def main() -> None:
runtime = SingleThreadedAgentRuntime()
await runtime.register(
"ReviewerAgent",
lambda: ReviewerAgent(
description="Code Reviewer",
model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"),
),
lambda: [DefaultSubscription()],
)
await runtime.register(
"CoderAgent",
lambda: CoderAgent(
description="Coder",
model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"),
),
lambda: [DefaultSubscription()],
)
runtime.start()
await runtime.publish_message(
message=CodeWritingTask(
task="Write a program to implement depth first search for a tree. Also implement the tree structure."
),
topic_id=DefaultTopicId(),
)
# Keep processing messages until idle.
await runtime.stop_when_idle()
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@ -1,175 +0,0 @@
"""
This example shows how to use publish-subscribe to implement
a simple round-robin group chat among multiple agents:
each agent in the group chat takes turns speaking in a round-robin fashion.
The conversation ends after a specified number of rounds.
1. Upon receiving a message, the group chat manager selects the next speaker
in a round-robin fashion and sends a request to speak message to the selected speaker.
2. Upon receiving a request to speak message, the speaker generates a response
to the last message in the memory and publishes the response.
3. The conversation continues until the specified number of rounds is reached.
"""
import asyncio
import os
import sys
from dataclasses import dataclass
from typing import List
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import AgentId, AgentInstantiationContext
from autogen_core.components import DefaultSubscription, DefaultTopicId, RoutedAgent, message_handler
from autogen_core.components.models import (
AssistantMessage,
ChatCompletionClient,
LLMMessage,
SystemMessage,
UserMessage,
)
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from autogen_core.base import MessageContext
from common.utils import get_chat_completion_client_from_envs
@dataclass
class Message:
source: str
content: str
@dataclass
class RequestToSpeak:
pass
@dataclass
class Termination:
pass
class RoundRobinGroupChatManager(RoutedAgent):
def __init__(
self,
description: str,
participants: List[AgentId],
num_rounds: int,
) -> None:
super().__init__(description)
self._participants = participants
self._num_rounds = num_rounds
self._round_count = 0
@message_handler
async def handle_message(self, message: Message, ctx: MessageContext) -> None:
# Select the next speaker in a round-robin fashion
speaker = self._participants[self._round_count % len(self._participants)]
self._round_count += 1
if self._round_count > self._num_rounds * len(self._participants):
# End the conversation after the specified number of rounds.
await self.publish_message(Termination(), DefaultTopicId())
return
# Send a request to speak message to the selected speaker.
await self.send_message(RequestToSpeak(), speaker)
class GroupChatParticipant(RoutedAgent):
def __init__(
self,
description: str,
system_messages: List[SystemMessage],
model_client: ChatCompletionClient,
) -> None:
super().__init__(description)
self._system_messages = system_messages
self._model_client = model_client
self._memory: List[Message] = []
@message_handler
async def handle_message(self, message: Message, ctx: MessageContext) -> None:
self._memory.append(message)
@message_handler
async def handle_request_to_speak(self, message: RequestToSpeak, ctx: MessageContext) -> None:
# Generate a response to the last message in the memory
if not self._memory:
return
llm_messages: List[LLMMessage] = []
for m in self._memory[-10:]:
if m.source == self.metadata["type"]:
llm_messages.append(AssistantMessage(content=m.content, source=self.metadata["type"]))
else:
llm_messages.append(UserMessage(content=m.content, source=m.source))
response = await self._model_client.create(self._system_messages + llm_messages)
assert isinstance(response.content, str)
speech = Message(content=response.content, source=self.metadata["type"])
self._memory.append(speech)
await self.publish_message(speech, topic_id=DefaultTopicId())
async def main() -> None:
# Create the runtime.
runtime = SingleThreadedAgentRuntime()
# Register the participants.
await runtime.register(
"DataScientist",
lambda: GroupChatParticipant(
description="A data scientist",
system_messages=[SystemMessage("You are a data scientist.")],
model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"),
),
lambda: [DefaultSubscription()],
)
await runtime.register(
"Engineer",
lambda: GroupChatParticipant(
description="An engineer",
system_messages=[SystemMessage("You are an engineer.")],
model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"),
),
lambda: [DefaultSubscription()],
)
await runtime.register(
"Artist",
lambda: GroupChatParticipant(
description="An artist",
system_messages=[SystemMessage("You are an artist.")],
model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"),
),
lambda: [DefaultSubscription()],
)
# Register the group chat manager.
await runtime.register(
"GroupChatManager",
lambda: RoundRobinGroupChatManager(
description="A group chat manager",
participants=[
AgentId("DataScientist", AgentInstantiationContext.current_agent_id().key),
AgentId("Engineer", AgentInstantiationContext.current_agent_id().key),
AgentId("Artist", AgentInstantiationContext.current_agent_id().key),
],
num_rounds=3,
),
lambda: [DefaultSubscription()],
)
# Start the runtime.
runtime.start()
# Start the conversation.
await runtime.publish_message(Message(content="Hello, everyone!", source="Moderator"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@ -26,8 +26,6 @@ slow external system that the agent needs to interact with.
import asyncio
import datetime
import json
import os
import sys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Mapping, Optional
@ -44,12 +42,9 @@ from autogen_core.components.models import (
UserMessage,
)
from autogen_core.components.tools import BaseTool
from pydantic import BaseModel, Field
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from common.types import TextMessage
from common.utils import get_chat_completion_client_from_envs
from pydantic import BaseModel, Field
@dataclass

View File

@ -1,83 +0,0 @@
"""
This example shows how to use custom function tools with a tool-enabled
agent.
"""
import asyncio
import os
import random
import sys
from typing import List
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import AgentInstantiationContext
from autogen_core.components.models import (
SystemMessage,
)
from autogen_core.components.tool_agent import ToolAgent
from autogen_core.components.tools import FunctionTool, Tool
from typing_extensions import Annotated
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__))))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from autogen_core.base import AgentId
from coding_direct import Message, ToolUseAgent
from common.utils import get_chat_completion_client_from_envs
async def get_stock_price(ticker: str, date: Annotated[str, "The date in YYYY/MM/DD format."]) -> float:
"""Get the stock price of a company."""
# This is a placeholder function that returns a random number.
return random.uniform(10, 100)
async def main() -> None:
# Create the runtime.
runtime = SingleThreadedAgentRuntime()
tools: List[Tool] = [
# A tool that gets the stock price.
FunctionTool(
get_stock_price,
description="Get the stock price of a company given the ticker and date.",
name="get_stock_price",
)
]
# Register agents.
await runtime.register(
"tool_executor_agent",
lambda: ToolAgent(
description="Tool Executor Agent",
tools=tools,
),
)
await runtime.register(
"tool_enabled_agent",
lambda: ToolUseAgent(
description="Tool Use Agent",
system_messages=[SystemMessage("You are a helpful AI Assistant. Use your tools to solve problems.")],
model_client=get_chat_completion_client_from_envs(model="gpt-4o-mini"),
tool_schema=[tool.schema for tool in tools],
tool_agent=AgentId("tool_executor_agent", AgentInstantiationContext.current_agent_id().key),
),
)
tool_use_agent = AgentId("tool_enabled_agent", "default")
runtime.start()
# Send a task to the tool user.
response = await runtime.send_message(Message("What is the stock price of NVDA on 2024/06/01"), tool_use_agent)
# Print the result.
assert isinstance(response, Message)
print(response.content)
# Run the runtime until the task is completed.
await runtime.stop()
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)
asyncio.run(main())