add tool use and core examples; refactor example directory (#112)

* add tool use example; refactor example directory

* update

* add more examples

* fix

* fix

* doc
Eric Zhu 2024-06-24 15:05:47 -07:00 committed by GitHub
parent 6189fdb05c
commit 2ab3ce48b1
18 changed files with 794 additions and 35 deletions

View File

@@ -7,28 +7,36 @@ This directory contains examples and demos of how to use AGNext.
We provide examples to illustrate the core concepts of AGNext:
agents, runtime, and message passing APIs.
- [`inner_outer.py`](inner_outer.py): A simple example of how to create custom agent and message type.
- [`mixture_of_agents_direct.py`](mixture_of_agents_direct.py): An example of how to create a [mixture of agents](https://github.com/togethercomputer/moa) that communicate using async direct messaging API.
- [`mixture_of_agents_pub_sub.py`](mixture_of_agents_pub_sub.py): An example of how to create a [mixture of agents](https://github.com/togethercomputer/moa) that communicate using publish-subscribe API.
- [`coder_reviewer_direct.py`](coder_reviewer_direct.py): An example of how to create a coder-reviewer reflection pattern using async direct messaging API.
- [`coder_reviewer_pub_sub.py`](coder_reviewer_pub_sub.py): An example of how to create a coder-reviewer reflection pattern using publish-subscribe API.
- [`one_agent_direct.py`](core/one_agent_direct.py): A simple example of how to create a single agent powered by a ChatCompletion model client. Communicate with the agent using async direct messaging API.
- [`inner_outer_direct.py`](core/inner_outer_direct.py): A simple example of how to create an agent that calls an inner agent using async direct messaging API.
- [`two_agents_pub_sub.py`](core/two_agents_pub_sub.py): An example of how to create two agents that communicate using publish-subscribe API.
- [`mixture_of_agents_direct.py`](core/mixture_of_agents_direct.py): An example of how to create a [mixture of agents](https://github.com/togethercomputer/moa) that communicate using async direct messaging API.
- [`mixture_of_agents_pub_sub.py`](core/mixture_of_agents_pub_sub.py): An example of how to create a [mixture of agents](https://github.com/togethercomputer/moa) that communicate using publish-subscribe API.
- [`coder_reviewer_direct.py`](core/coder_reviewer_direct.py): An example of how to create a coder-reviewer reflection pattern using async direct messaging API.
- [`coder_reviewer_pub_sub.py`](core/coder_reviewer_pub_sub.py): An example of how to create a coder-reviewer reflection pattern using publish-subscribe API.
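The common shape of these core examples — a dataclass message type plus a handler routed by message type — can be sketched without AGNext installed. This is a schematic stand-in for `TypeRoutedAgent` and `@message_handler`, not the actual API:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Message:
    body: str
    sender: str


class MiniAgent:
    """Schematic stand-in for TypeRoutedAgent: dispatch each incoming
    message to the handler registered for its type."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._handlers = {Message: self.on_new_message}

    async def on_new_message(self, message: Message) -> Message:
        return Message(body=f"Inner: {message.body}", sender=self.name)

    async def dispatch(self, message):
        # Route by the message's type, as type-routed agents do.
        return await self._handlers[type(message)](message)


async def demo() -> str:
    agent = MiniAgent("inner")
    reply = await agent.dispatch(Message(body="hello", sender="user"))
    return reply.body


print(asyncio.run(demo()))  # → Inner: hello
```

In the real examples, the runtime delivers messages and the `@message_handler` decorator registers handlers; this sketch only shows the type-to-handler routing idea.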
## Tool use examples
We provide examples to illustrate how to use tools in AGNext:
- [`coding_one_agent_direct.py`](tool-use/coding_one_agent_direct.py): a code execution example with one agent that calls and executes tools to demonstrate tool use and reflection on tool use. This example uses the async direct messaging API.
- [`coding_two_agent_direct.py`](tool-use/coding_two_agent_direct.py): a code execution example with two agents, one for calling tools and one for executing them, to demonstrate tool use and reflection on tool use. This example uses the async direct messaging API.
- [`coding_two_agent_pub_sub.py`](tool-use/coding_two_agent_pub_sub.py): a code execution example with two agents, one for calling tools and one for executing them, to demonstrate tool use and reflection on tool use. This example uses the publish-subscribe API.
- [`custom_function_tool_one_agent_direct.py`](tool-use/custom_function_tool_one_agent_direct.py): a custom function tool example with one agent that calls and executes tools to demonstrate tool use and reflection on tool use. This example uses the async direct messaging API.
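The custom function tool example wraps a plain typed Python function as a tool. What a function-tool wrapper derives from such a function can be sketched with the standard library alone — a rough stand-in for illustration, not the real `FunctionTool` internals:

```python
import inspect
import random
from typing import Annotated


async def get_stock_price(
    ticker: str,
    date: Annotated[str, "The date in YYYY/MM/DD format."],
) -> float:
    """Get the stock price of a company."""
    # Placeholder implementation that returns a random number.
    return random.uniform(10, 100)


def describe_tool(func) -> dict:
    """Rough stand-in for what a function tool extracts from the signature
    so the model client can advertise the tool's name and parameters."""
    signature = inspect.signature(func)
    return {
        "name": func.__name__,
        "description": inspect.getdoc(func),
        "parameters": list(signature.parameters),
    }


spec = describe_tool(get_stock_price)
print(spec)  # name, description, and parameter names
```

The real `FunctionTool` additionally builds a JSON schema from the type annotations (including the `Annotated` descriptions) and handles invoking the function with model-supplied arguments.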
## Demos
We provide interactive demos that showcase the capabilities of AGNext:
We provide interactive demos that showcase applications that can be built using AGNext:
- `assistant.py`: a demonstration of how to use the OpenAI Assistant API to create
- [`assistant.py`](demos/assistant.py): a demonstration of how to use the OpenAI Assistant API to create
a ChatGPT agent.
- `chat_room.py`: An example of how to create a chat room of custom agents without
- [`chat_room.py`](demos/chat_room.py): An example of how to create a chat room of custom agents without
a centralized orchestrator.
- `illustrator_critics.py`: a demo that uses an illustrator, critics and descriptor agent
- [`illustrator_critics.py`](demos/illustrator_critics.py): a demo that uses an illustrator, critics, and a descriptor agent
to implement the reflection pattern for image generation.
- `chest_game.py`: a demo with two chess player agents to demonstrate tool use and reflection
on tool use.
- `software_consultancy.py`: a demonstration of multi-agent interaction using
- [`software_consultancy.py`](demos/software_consultancy.py): a demonstration of multi-agent interaction using
the group chat pattern.
- `orchestrator.py`: a demonstration of multi-agent problem solving using
the orchestrator pattern.
- [`chess_game.py`](tool-use/chess_game.py): an example with two chess player agents that execute their own tools to demonstrate tool use and reflection on tool use.
## Running the examples and demos
@@ -38,23 +46,23 @@ First, you need a shell with AGNext and the examples dependencies installed. To
hatch shell
```
To run an example, just run the corresponding Python script. For example, to run the `coder_reviewer.py` example, run:
To run an example, just run the corresponding Python script. For example, to run the `coder_reviewer_pub_sub.py` example, run:
```bash
hatch shell
python coder_reviewer.py
python core/coder_reviewer_pub_sub.py
```
Or simply:
```bash
hatch run python coder_reviewer.py
hatch run python core/coder_reviewer_pub_sub.py
```
To enable logging, turn on verbose mode by passing the `--verbose` flag:
```bash
hatch run python coder_reviewer.py --verbose
hatch run python core/coder_reviewer_pub_sub.py --verbose
```
By default, the log file is saved in the same directory with the same filename as the script.
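Concretely, the example scripts configure logging roughly like this (a sketch based on the scripts below; `example.log` is a placeholder for the per-script log file name):

```python
import logging

# Quiet everything by default; enable debug-level detail for AGNext.
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("agnext")
logger.setLevel(logging.DEBUG)

# With --verbose, the scripts also write the log to a file named
# after the script; "example.log" stands in for that name here.
handler = logging.FileHandler("example.log")
logger.addHandler(handler)
```

This keeps third-party library output at warning level while capturing the full AGNext message-passing trace in the log file.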

View File

@@ -1,4 +1,3 @@
import argparse
import asyncio
import logging
from dataclasses import dataclass
@@ -14,22 +13,22 @@ class MessageType:
sender: str
class Inner(TypeRoutedAgent): # type: ignore
def __init__(self) -> None: # type: ignore
class Inner(TypeRoutedAgent):
def __init__(self) -> None:
super().__init__("The inner agent")
@message_handler() # type: ignore
async def on_new_message(self, message: MessageType, cancellation_token: CancellationToken) -> MessageType: # type: ignore
@message_handler()
async def on_new_message(self, message: MessageType, cancellation_token: CancellationToken) -> MessageType:
return MessageType(body=f"Inner: {message.body}", sender=self.metadata["name"])
class Outer(TypeRoutedAgent): # type: ignore
def __init__(self, inner: AgentId) -> None: # type: ignore
class Outer(TypeRoutedAgent):
def __init__(self, inner: AgentId) -> None:
super().__init__("The outer agent")
self._inner = inner
@message_handler() # type: ignore
async def on_new_message(self, message: MessageType, cancellation_token: CancellationToken) -> MessageType: # type: ignore
@message_handler()
async def on_new_message(self, message: MessageType, cancellation_token: CancellationToken) -> MessageType:
inner_response = self.send_message(message, self._inner)
inner_message = await inner_response
assert isinstance(inner_message, MessageType)
@@ -49,12 +48,8 @@ async def main() -> None:
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Inner-Outer agent example.")
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging.")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.WARNING)
logging.getLogger("agnext").setLevel(logging.DEBUG)
handler = logging.FileHandler("inner_outter.log")
logging.getLogger("agnext").addHandler(handler)
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("agnext").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@@ -0,0 +1,59 @@
import asyncio
from dataclasses import dataclass
from agnext.application import SingleThreadedAgentRuntime
from agnext.components import TypeRoutedAgent, message_handler
from agnext.components.models import (
ChatCompletionClient,
OpenAI,
SystemMessage,
UserMessage,
)
from agnext.core import CancellationToken
@dataclass
class Message:
content: str
class ChatCompletionAgent(TypeRoutedAgent):
def __init__(self, description: str, model_client: ChatCompletionClient) -> None:
super().__init__(description)
self._system_messages = [SystemMessage("You are a helpful AI assistant.")]
self._model_client = model_client
@message_handler
async def handle_user_message(self, message: Message, cancellation_token: CancellationToken) -> Message:
user_message = UserMessage(content=message.content, source="User")
response = await self._model_client.create(self._system_messages + [user_message])
assert isinstance(response.content, str)
return Message(content=response.content)
async def main() -> None:
runtime = SingleThreadedAgentRuntime()
agent = runtime.register_and_get(
"chat_agent", lambda: ChatCompletionAgent("Chat agent", OpenAI(model="gpt-3.5-turbo"))
)
# Send a message to the agent.
message = Message(content="Can you tell me something fun about SF?")
result = runtime.send_message(message, agent)
# Process messages until the agent responds.
while result.done() is False:
await runtime.process_next()
# Get the response from the agent.
response = await result
assert isinstance(response, Message)
print(response.content)
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("agnext").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@@ -0,0 +1,97 @@
import asyncio
from dataclasses import dataclass
from typing import List
from agnext.application import SingleThreadedAgentRuntime
from agnext.components import TypeRoutedAgent, message_handler
from agnext.components.models import (
AssistantMessage,
ChatCompletionClient,
LLMMessage,
OpenAI,
SystemMessage,
UserMessage,
)
from agnext.core import CancellationToken
@dataclass
class Message:
source: str
content: str
class ChatCompletionAgent(TypeRoutedAgent):
"""An agent that uses a chat completion model to respond to messages.
It keeps a memory of the conversation and uses it to generate responses.
It terminates the conversation when the termination word is mentioned."""
def __init__(
self,
description: str,
system_messages: List[SystemMessage],
model_client: ChatCompletionClient,
termination_word: str,
) -> None:
super().__init__(description)
self._system_messages = system_messages
self._model_client = model_client
self._memory: List[Message] = []
self._termination_word = termination_word
@message_handler
async def handle_message(self, message: Message, cancellation_token: CancellationToken) -> None:
self._memory.append(message)
if self._termination_word in message.content:
return
llm_messages: List[LLMMessage] = []
for m in self._memory[-10:]:
if m.source == self.metadata["name"]:
llm_messages.append(AssistantMessage(content=m.content, source=self.metadata["name"]))
else:
llm_messages.append(UserMessage(content=m.content, source=m.source))
response = await self._model_client.create(self._system_messages + llm_messages)
assert isinstance(response.content, str)
await self.publish_message(Message(content=response.content, source=self.metadata["name"]))
async def main() -> None:
runtime = SingleThreadedAgentRuntime()
jack = runtime.register_and_get(
"Jack",
lambda: ChatCompletionAgent(
description="Jack, a comedian",
model_client=OpenAI(model="gpt-3.5-turbo"),
system_messages=[
SystemMessage("You are a comedian who likes to make jokes. " "When you are done talking, say 'TERMINATE'.")
],
termination_word="TERMINATE",
),
)
runtime.register_and_get(
"Cathy",
lambda: ChatCompletionAgent(
description="Cathy, a poet",
model_client=OpenAI(model="gpt-3.5-turbo"),
system_messages=[
SystemMessage("You are a poet who likes to write poems. " "When you are done talking, say 'TERMINATE'.")
],
termination_word="TERMINATE",
),
)
# Send a message to Jack to start the conversation.
message = Message(content="Can you tell me something fun about SF?", source="User")
runtime.send_message(message, jack)
# Process messages until the agent responds.
while True:
await runtime.process_next()
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("agnext").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@@ -0,0 +1,144 @@
import asyncio
import json
from dataclasses import dataclass
from typing import List
from agnext.application import SingleThreadedAgentRuntime
from agnext.components import FunctionCall, TypeRoutedAgent, message_handler
from agnext.components.code_executor import LocalCommandLineCodeExecutor
from agnext.components.models import (
AssistantMessage,
ChatCompletionClient,
FunctionExecutionResult,
FunctionExecutionResultMessage,
LLMMessage,
OpenAI,
SystemMessage,
UserMessage,
)
from agnext.components.tools import PythonCodeExecutionTool, Tool
from agnext.core import CancellationToken
@dataclass
class ToolExecutionTask:
function_call: FunctionCall
@dataclass
class ToolExecutionTaskResult:
result: FunctionExecutionResult
@dataclass
class UserRequest:
content: str
@dataclass
class AIResponse:
content: str
class ToolEnabledAgent(TypeRoutedAgent):
"""An agent that uses tools to perform tasks. It executes the tools
itself by sending each tool execution task to its own address."""
def __init__(
self,
description: str,
system_messages: List[SystemMessage],
model_client: ChatCompletionClient,
tools: List[Tool],
) -> None:
super().__init__(description)
self._model_client = model_client
self._system_messages = system_messages
self._tools = tools
@message_handler
async def handle_user_message(self, message: UserRequest, cancellation_token: CancellationToken) -> AIResponse:
"""Handle a user message, execute the model and tools, and return the response."""
session: List[LLMMessage] = []
session.append(UserMessage(content=message.content, source="User"))
response = await self._model_client.create(self._system_messages + session, tools=self._tools)
session.append(AssistantMessage(content=response.content, source=self.metadata["name"]))
# Keep executing the tools until the response is not a list of function calls.
while isinstance(response.content, list) and all(isinstance(item, FunctionCall) for item in response.content):
results = await asyncio.gather(
*[self.send_message(ToolExecutionTask(function_call=call), self.id) for call in response.content]
)
# Combine the results into a single response.
result = FunctionExecutionResultMessage(content=[result.result for result in results])
session.append(result)
# Execute the model again with the new response.
response = await self._model_client.create(self._system_messages + session, tools=self._tools)
session.append(AssistantMessage(content=response.content, source=self.metadata["name"]))
assert isinstance(response.content, str)
return AIResponse(content=response.content)
@message_handler
async def handle_tool_call(
self, message: ToolExecutionTask, cancellation_token: CancellationToken
) -> ToolExecutionTaskResult:
"""Handle a tool execution task. This method executes the tool and returns the result."""
# Find the tool
tool = next((tool for tool in self._tools if tool.name == message.function_call.name), None)
if tool is None:
result_as_str = f"Error: Tool not found: {message.function_call.name}"
else:
try:
arguments = json.loads(message.function_call.arguments)
result = await tool.run_json(args=arguments, cancellation_token=cancellation_token)
result_as_str = tool.return_value_as_string(result)
except json.JSONDecodeError:
result_as_str = f"Error: Invalid arguments: {message.function_call.arguments}"
except Exception as e:
result_as_str = f"Error: {e}"
return ToolExecutionTaskResult(
result=FunctionExecutionResult(content=result_as_str, call_id=message.function_call.id),
)
async def main() -> None:
# Create the runtime.
runtime = SingleThreadedAgentRuntime()
# Define the tools.
tools: List[Tool] = [
# A tool that executes Python code.
PythonCodeExecutionTool(
LocalCommandLineCodeExecutor(),
)
]
# Register agents.
tool_agent = runtime.register_and_get(
"tool_enabled_agent",
lambda: ToolEnabledAgent(
description="Tool Use Agent",
system_messages=[SystemMessage("You are a helpful AI Assistant. Use your tools to solve problems.")],
model_client=OpenAI(model="gpt-3.5-turbo"),
tools=tools,
),
)
# Send a task to the tool user.
result = runtime.send_message(UserRequest("Run the following Python code: print('Hello, World!')"), tool_agent)
# Run the runtime until the task is completed.
while not result.done():
await runtime.process_next()
# Print the result.
ai_response = result.result()
assert isinstance(ai_response, AIResponse)
print(ai_response.content)
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("agnext").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@@ -0,0 +1,160 @@
import asyncio
import json
from dataclasses import dataclass
from typing import List
from agnext.application import SingleThreadedAgentRuntime
from agnext.components import FunctionCall, TypeRoutedAgent, message_handler
from agnext.components.code_executor import LocalCommandLineCodeExecutor
from agnext.components.models import (
AssistantMessage,
ChatCompletionClient,
FunctionExecutionResult,
FunctionExecutionResultMessage,
LLMMessage,
OpenAI,
SystemMessage,
UserMessage,
)
from agnext.components.tools import PythonCodeExecutionTool, Tool
from agnext.core import AgentId, CancellationToken
@dataclass
class ToolExecutionTask:
function_call: FunctionCall
@dataclass
class ToolExecutionTaskResult:
result: FunctionExecutionResult
@dataclass
class UserRequest:
content: str
@dataclass
class AIResponse:
content: str
class ToolExecutorAgent(TypeRoutedAgent):
"""An agent that executes tools."""
def __init__(self, description: str, tools: List[Tool]) -> None:
super().__init__(description)
self._tools = tools
@message_handler
async def handle_tool_call(
self, message: ToolExecutionTask, cancellation_token: CancellationToken
) -> ToolExecutionTaskResult:
"""Handle a tool execution task. This method executes the tool and returns the result."""
# Find the tool
tool = next((tool for tool in self._tools if tool.name == message.function_call.name), None)
if tool is None:
result_as_str = f"Error: Tool not found: {message.function_call.name}"
else:
try:
arguments = json.loads(message.function_call.arguments)
result = await tool.run_json(args=arguments, cancellation_token=cancellation_token)
result_as_str = tool.return_value_as_string(result)
except json.JSONDecodeError:
result_as_str = f"Error: Invalid arguments: {message.function_call.arguments}"
except Exception as e:
result_as_str = f"Error: {e}"
return ToolExecutionTaskResult(
result=FunctionExecutionResult(content=result_as_str, call_id=message.function_call.id),
)
class ToolUserAgent(TypeRoutedAgent):
"""An agent that uses tools to perform tasks. It doesn't execute the tools
itself, but delegates execution to a ToolExecutorAgent via direct
messaging."""
def __init__(
self,
description: str,
system_messages: List[SystemMessage],
model_client: ChatCompletionClient,
tools: List[Tool],
tool_executor: AgentId,
) -> None:
super().__init__(description)
self._model_client = model_client
self._system_messages = system_messages
self._tools = tools
self._tool_executor = tool_executor
@message_handler
async def handle_user_message(self, message: UserRequest, cancellation_token: CancellationToken) -> AIResponse:
"""Handle a user message, execute the model and tools, and return the response."""
session: List[LLMMessage] = []
session.append(UserMessage(content=message.content, source="User"))
response = await self._model_client.create(self._system_messages + session, tools=self._tools)
session.append(AssistantMessage(content=response.content, source=self.metadata["name"]))
# Keep executing the tools until the response is not a list of function calls.
while isinstance(response.content, list) and all(isinstance(item, FunctionCall) for item in response.content):
results = await asyncio.gather(
*[
self.send_message(ToolExecutionTask(function_call=call), self._tool_executor)
for call in response.content
]
)
# Combine the results into a single response.
result = FunctionExecutionResultMessage(content=[result.result for result in results])
session.append(result)
# Execute the model again with the new response.
response = await self._model_client.create(self._system_messages + session, tools=self._tools)
session.append(AssistantMessage(content=response.content, source=self.metadata["name"]))
assert isinstance(response.content, str)
return AIResponse(content=response.content)
async def main() -> None:
# Create the runtime.
runtime = SingleThreadedAgentRuntime()
# Define the tools.
tools: List[Tool] = [
# A tool that executes Python code.
PythonCodeExecutionTool(
LocalCommandLineCodeExecutor(),
)
]
# Register agents.
executor = runtime.register_and_get("tool_executor", lambda: ToolExecutorAgent("Tool Executor", tools))
tool_user = runtime.register_and_get(
"tool_use_agent",
lambda: ToolUserAgent(
description="Tool Use Agent",
system_messages=[SystemMessage("You are a helpful AI Assistant. Use your tools to solve problems.")],
model_client=OpenAI(model="gpt-3.5-turbo"),
tools=tools,
tool_executor=executor,
),
)
# Send a task to the tool user.
result = runtime.send_message(UserRequest("Run the following Python code: print('Hello, World!')"), tool_user)
# Run the runtime until the task is completed.
while not result.done():
await runtime.process_next()
# Print the result.
ai_response = result.result()
assert isinstance(ai_response, AIResponse)
print(ai_response.content)
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("agnext").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@@ -0,0 +1,232 @@
import asyncio
import json
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List
from agnext.application import SingleThreadedAgentRuntime
from agnext.components import FunctionCall, TypeRoutedAgent, message_handler
from agnext.components.code_executor import LocalCommandLineCodeExecutor
from agnext.components.models import (
AssistantMessage,
ChatCompletionClient,
FunctionExecutionResult,
FunctionExecutionResultMessage,
LLMMessage,
OpenAI,
SystemMessage,
UserMessage,
)
from agnext.components.tools import PythonCodeExecutionTool, Tool
from agnext.core import AgentId, CancellationToken
from agnext.core.intervention import DefaultInterventionHandler
@dataclass
class ToolExecutionTask:
session_id: str
function_call: FunctionCall
@dataclass
class ToolExecutionTaskResult:
session_id: str
result: FunctionExecutionResult
@dataclass
class UserRequest:
content: str
@dataclass
class AIResponse:
content: str
@dataclass
class Termination:
pass
class ToolExecutorAgent(TypeRoutedAgent):
"""An agent that executes tools."""
def __init__(self, description: str, tools: List[Tool]) -> None:
super().__init__(description)
self._tools = tools
@message_handler
async def handle_tool_call(self, message: ToolExecutionTask, cancellation_token: CancellationToken) -> None:
"""Handle a tool execution task. This method executes the tool and publishes the result."""
# Find the tool
tool = next((tool for tool in self._tools if tool.name == message.function_call.name), None)
if tool is None:
result_as_str = f"Error: Tool not found: {message.function_call.name}"
else:
try:
arguments = json.loads(message.function_call.arguments)
result = await tool.run_json(args=arguments, cancellation_token=cancellation_token)
result_as_str = tool.return_value_as_string(result)
except json.JSONDecodeError:
result_as_str = f"Error: Invalid arguments: {message.function_call.arguments}"
except Exception as e:
result_as_str = f"Error: {e}"
task_result = ToolExecutionTaskResult(
session_id=message.session_id,
result=FunctionExecutionResult(content=result_as_str, call_id=message.function_call.id),
)
await self.publish_message(task_result)
class ToolUseAgent(TypeRoutedAgent):
"""An agent that uses tools to perform tasks. It doesn't execute the tools
itself, but delegates execution to a ToolExecutorAgent via the pub/sub
mechanism."""
def __init__(
self,
description: str,
system_messages: List[SystemMessage],
model_client: ChatCompletionClient,
tools: List[Tool],
) -> None:
super().__init__(description)
self._model_client = model_client
self._system_messages = system_messages
self._tools = tools
self._sessions: Dict[str, List[LLMMessage]] = {}
self._tool_results: Dict[str, List[ToolExecutionTaskResult]] = {}
self._tool_counter: Dict[str, int] = {}
@message_handler
async def handle_user_message(self, message: UserRequest, cancellation_token: CancellationToken) -> None:
"""Handle a user message. This method calls the model. If the model response is a string,
it publishes the response. If the model response is a list of function calls, it publishes
the function calls to the tool executor agent."""
session_id = str(uuid.uuid4())
self._sessions.setdefault(session_id, []).append(UserMessage(content=message.content, source="User"))
response = await self._model_client.create(
self._system_messages + self._sessions[session_id], tools=self._tools
)
self._sessions[session_id].append(AssistantMessage(content=response.content, source=self.metadata["name"]))
if isinstance(response.content, str):
# If the response is a string, just publish the response and return.
response_message = AIResponse(content=response.content)
await self.publish_message(response_message)
return
# Handle the response as a list of function calls.
assert isinstance(response.content, list) and all(isinstance(item, FunctionCall) for item in response.content)
self._tool_results.setdefault(session_id, [])
self._tool_counter.setdefault(session_id, 0)
# Publish the function calls to the tool executor agent.
for function_call in response.content:
task = ToolExecutionTask(session_id=session_id, function_call=function_call)
self._tool_counter[session_id] += 1
await self.publish_message(task)
@message_handler
async def handle_tool_result(self, message: ToolExecutionTaskResult, cancellation_token: CancellationToken) -> None:
"""Handle a tool execution result. This method aggregates the tool results and
calls the model again to get another response. If the response is a string, it
publishes the response. If the response is a list of function calls, it publishes
the function calls to the tool executor agent."""
self._tool_results[message.session_id].append(message)
self._tool_counter[message.session_id] -= 1
if self._tool_counter[message.session_id] > 0:
# Not all tools have finished execution.
return
# All tools have finished execution.
# Aggregate tool results into a single LLM message.
result = FunctionExecutionResultMessage(content=[r.result for r in self._tool_results[message.session_id]])
# Clear the tool results.
self._tool_results[message.session_id].clear()
# Get another response from the model.
self._sessions[message.session_id].append(result)
response = await self._model_client.create(
self._system_messages + self._sessions[message.session_id], tools=self._tools
)
self._sessions[message.session_id].append(
AssistantMessage(content=response.content, source=self.metadata["name"])
)
# If the response is a string, just publish the response.
if isinstance(response.content, str):
response_message = AIResponse(content=response.content)
await self.publish_message(response_message)
self._tool_results.pop(message.session_id)
self._tool_counter.pop(message.session_id)
return
# Handle the response as a list of function calls.
assert isinstance(response.content, list) and all(isinstance(item, FunctionCall) for item in response.content)
# Publish the function calls to the tool executor agent.
for function_call in response.content:
task = ToolExecutionTask(session_id=message.session_id, function_call=function_call)
self._tool_counter[message.session_id] += 1
await self.publish_message(task)
class TerminationHandler(DefaultInterventionHandler):
"""A handler that listens for termination messages."""
def __init__(self) -> None:
self._terminated = False
async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:
if isinstance(message, Termination):
self._terminated = True
return message
@property
def terminated(self) -> bool:
return self._terminated
class DisplayAgent(TypeRoutedAgent):
"""An agent that displays the AI response to the console and publishes a
termination message to the runtime."""
@message_handler
async def handle_code_writing_result(self, message: AIResponse, cancellation_token: CancellationToken) -> None:
print("AI Response:", message.content)
# Terminate the runtime.
await self.publish_message(Termination())
async def main() -> None:
termination_handler = TerminationHandler()
runtime = SingleThreadedAgentRuntime(intervention_handler=termination_handler)
# Define the tools.
tools: List[Tool] = [
PythonCodeExecutionTool(
LocalCommandLineCodeExecutor(),
)
]
# Register agents.
runtime.register("tool_executor", lambda: ToolExecutorAgent("Tool Executor", tools))
runtime.register(
"tool_use_agent",
lambda: ToolUseAgent(
description="Tool Use Agent",
system_messages=[SystemMessage("You are a helpful AI Assistant. Use your tools to solve problems.")],
model_client=OpenAI(model="gpt-3.5-turbo"),
tools=tools,
),
)
runtime.register("display_agent", lambda: DisplayAgent("Display Agent"))
# Publish a task.
runtime.publish_message(UserRequest("Run the following Python code: print('Hello, World!')"), namespace="default")
# Run the runtime until termination.
while not termination_handler.terminated:
await runtime.process_next()
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("agnext").setLevel(logging.DEBUG)
asyncio.run(main())

View File

@@ -0,0 +1,64 @@
import asyncio
import os
import random
import sys
from agnext.application import SingleThreadedAgentRuntime
from agnext.components.models import (
OpenAI,
SystemMessage,
)
from agnext.components.tools import FunctionTool
from typing_extensions import Annotated
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__))))
from coding_one_agent_direct import AIResponse, ToolEnabledAgent, UserRequest
async def get_stock_price(ticker: str, date: Annotated[str, "The date in YYYY/MM/DD format."]) -> float:
"""Get the stock price of a company."""
# This is a placeholder function that returns a random number.
return random.uniform(10, 100)
async def main() -> None:
# Create the runtime.
runtime = SingleThreadedAgentRuntime()
# Register agents.
tool_agent = runtime.register_and_get(
"tool_enabled_agent",
lambda: ToolEnabledAgent(
description="Tool Use Agent",
system_messages=[SystemMessage("You are a helpful AI Assistant. Use your tools to solve problems.")],
model_client=OpenAI(model="gpt-3.5-turbo"),
tools=[
# Define a tool that gets the stock price.
FunctionTool(
get_stock_price,
description="Get the stock price of a company given the ticker and date.",
name="get_stock_price",
)
],
),
)
# Send a task to the tool user.
result = runtime.send_message(UserRequest("What is the stock price of NVDA on 2024/06/01?"), tool_agent)
# Run the runtime until the task is completed.
while not result.done():
await runtime.process_next()
# Print the result.
ai_response = result.result()
assert isinstance(ai_response, AIResponse)
print(ai_response.content)
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("agnext").setLevel(logging.DEBUG)
asyncio.run(main())