Make RunContext internal (#386)

* Make RunContext internal

* Mypy
Eric Zhu 2024-08-21 13:59:59 -07:00 committed by GitHub
parent 09ceef4b4a
commit ed0890525d
40 changed files with 2360 additions and 2329 deletions
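
In short: `SingleThreadedAgentRuntime.start()` no longer returns a run context object. The run context is now internal to the runtime, and callers stop message processing through the runtime itself. A minimal before/after sketch of the API change (the `runtime`, `agent_id`, and `Message` names assume a setup like the notebooks below):

# Before this commit: start() returned a run context used to stop processing.
run_context = runtime.start()
await runtime.send_message(Message(content="Hello"), agent_id)
await run_context.stop()

# After this commit: the run context is internal; stop via the runtime.
runtime.start()
await runtime.send_message(Message(content="Hello"), agent_id)
await runtime.stop()  # or: await runtime.stop_when_idle()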

View File

@@ -1,300 +1,300 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Using LangGraph-Backed Agent\n",
"\n",
"This example demonstrates how to create an AI agent using LangGraph.\n",
"Based on the example in the LangGraph documentation:\n",
"https://langchain-ai.github.io/langgraph/.\n",
"\n",
"First install the dependencies:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"vscode": {
"languageId": "shellscript"
}
},
"outputs": [],
"source": [
"# pip install langgraph langchain-openai azure-identity"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's import the modules."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from dataclasses import dataclass\n",
"from typing import Any, Callable, List, Literal\n",
"\n",
"from agnext.application import SingleThreadedAgentRuntime\n",
"from agnext.components import TypeRoutedAgent, message_handler\n",
"from agnext.core import AgentId, MessageContext\n",
"from azure.identity import DefaultAzureCredential, get_bearer_token_provider\n",
"from langchain_core.messages import HumanMessage, SystemMessage\n",
"from langchain_core.tools import tool # pyright: ignore\n",
"from langchain_openai import AzureChatOpenAI, ChatOpenAI\n",
"from langgraph.graph import END, MessagesState, StateGraph\n",
"from langgraph.prebuilt import ToolNode"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Define our message type that will be used to communicate with the agent."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"@dataclass\n",
"class Message:\n",
" content: str"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Define the tools the agent will use."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"@tool # pyright: ignore\n",
"def get_weather(location: str) -> str:\n",
" \"\"\"Call to surf the web.\"\"\"\n",
" # This is a placeholder, but don't tell the LLM that...\n",
" if \"sf\" in location.lower() or \"san francisco\" in location.lower():\n",
" return \"It's 60 degrees and foggy.\"\n",
" return \"It's 90 degrees and sunny.\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Define the agent using LangGraph's API."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"class LangGraphToolUseAgent(TypeRoutedAgent):\n",
" def __init__(self, description: str, model: ChatOpenAI, tools: List[Callable[..., Any]]) -> None: # pyright: ignore\n",
" super().__init__(description)\n",
" self._model = model.bind_tools(tools) # pyright: ignore\n",
"\n",
" # Define the function that determines whether to continue or not\n",
" def should_continue(state: MessagesState) -> Literal[\"tools\", END]: # type: ignore\n",
" messages = state[\"messages\"]\n",
" last_message = messages[-1]\n",
" # If the LLM makes a tool call, then we route to the \"tools\" node\n",
" if last_message.tool_calls: # type: ignore\n",
" return \"tools\"\n",
" # Otherwise, we stop (reply to the user)\n",
" return END\n",
"\n",
" # Define the function that calls the model\n",
" async def call_model(state: MessagesState): # type: ignore\n",
" messages = state[\"messages\"]\n",
" response = await self._model.ainvoke(messages)\n",
" # We return a list, because this will get added to the existing list\n",
" return {\"messages\": [response]}\n",
"\n",
" tool_node = ToolNode(tools) # pyright: ignore\n",
"\n",
" # Define a new graph\n",
" self._workflow = StateGraph(MessagesState)\n",
"\n",
" # Define the two nodes we will cycle between\n",
" self._workflow.add_node(\"agent\", call_model) # pyright: ignore\n",
" self._workflow.add_node(\"tools\", tool_node) # pyright: ignore\n",
"\n",
" # Set the entrypoint as `agent`\n",
" # This means that this node is the first one called\n",
" self._workflow.set_entry_point(\"agent\")\n",
"\n",
" # We now add a conditional edge\n",
" self._workflow.add_conditional_edges(\n",
" # First, we define the start node. We use `agent`.\n",
" # This means these are the edges taken after the `agent` node is called.\n",
" \"agent\",\n",
" # Next, we pass in the function that will determine which node is called next.\n",
" should_continue, # type: ignore\n",
" )\n",
"\n",
" # We now add a normal edge from `tools` to `agent`.\n",
" # This means that after `tools` is called, `agent` node is called next.\n",
" self._workflow.add_edge(\"tools\", \"agent\")\n",
"\n",
" # Finally, we compile it!\n",
" # This compiles it into a LangChain Runnable,\n",
" # meaning you can use it as you would any other runnable.\n",
" # Note that we're (optionally) passing the memory when compiling the graph\n",
" self._app = self._workflow.compile()\n",
"\n",
" @message_handler\n",
" async def handle_user_message(self, message: Message, ctx: MessageContext) -> Message:\n",
" # Use the Runnable\n",
" final_state = await self._app.ainvoke(\n",
" {\n",
" \"messages\": [\n",
" SystemMessage(\n",
" content=\"You are a helpful AI assistant. You can use tools to help answer questions.\"\n",
" ),\n",
" HumanMessage(content=message.content),\n",
" ]\n",
" },\n",
" config={\"configurable\": {\"thread_id\": 42}},\n",
" )\n",
" response = Message(content=final_state[\"messages\"][-1].content)\n",
" return response"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's test the agent. First we need to create an agent runtime and\n",
"register the agent, by providing the agent's name and a factory function\n",
"that will create the agent."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"runtime = SingleThreadedAgentRuntime()\n",
"await runtime.register(\n",
" \"langgraph_tool_use_agent\",\n",
" lambda: LangGraphToolUseAgent(\n",
" \"Tool use agent\",\n",
" ChatOpenAI(\n",
" model=\"gpt-4o\",\n",
" # api_key=os.getenv(\"OPENAI_API_KEY\"),\n",
" ),\n",
" # AzureChatOpenAI(\n",
" # azure_deployment=os.getenv(\"AZURE_OPENAI_DEPLOYMENT\"),\n",
" # azure_endpoint=os.getenv(\"AZURE_OPENAI_ENDPOINT\"),\n",
" # api_version=os.getenv(\"AZURE_OPENAI_API_VERSION\"),\n",
" # # Using Azure Active Directory authentication.\n",
" # azure_ad_token_provider=get_bearer_token_provider(DefaultAzureCredential()),\n",
" # # Using API key.\n",
" # # api_key=os.getenv(\"AZURE_OPENAI_API_KEY\"),\n",
" # ),\n",
" [get_weather],\n",
" ),\n",
")\n",
"agent = AgentId(\"langgraph_tool_use_agent\", key=\"default\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Start the agent runtime."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"run_context = runtime.start()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Send a direct message to the agent, and print the response."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The weather in San Francisco is currently 60 degrees and foggy.\n"
]
}
],
"source": [
"response = await runtime.send_message(Message(\"What's the weather in SF?\"), agent)\n",
"print(response.content)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Stop the agent runtime."
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"await run_context.stop()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "agnext",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Using LangGraph-Backed Agent\n",
"\n",
"This example demonstrates how to create an AI agent using LangGraph.\n",
"Based on the example in the LangGraph documentation:\n",
"https://langchain-ai.github.io/langgraph/.\n",
"\n",
"First install the dependencies:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"vscode": {
"languageId": "shellscript"
}
},
"outputs": [],
"source": [
"# pip install langgraph langchain-openai azure-identity"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's import the modules."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from dataclasses import dataclass\n",
"from typing import Any, Callable, List, Literal\n",
"\n",
"from agnext.application import SingleThreadedAgentRuntime\n",
"from agnext.components import TypeRoutedAgent, message_handler\n",
"from agnext.core import AgentId, MessageContext\n",
"from azure.identity import DefaultAzureCredential, get_bearer_token_provider\n",
"from langchain_core.messages import HumanMessage, SystemMessage\n",
"from langchain_core.tools import tool # pyright: ignore\n",
"from langchain_openai import AzureChatOpenAI, ChatOpenAI\n",
"from langgraph.graph import END, MessagesState, StateGraph\n",
"from langgraph.prebuilt import ToolNode"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Define our message type that will be used to communicate with the agent."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"@dataclass\n",
"class Message:\n",
" content: str"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Define the tools the agent will use."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"@tool # pyright: ignore\n",
"def get_weather(location: str) -> str:\n",
" \"\"\"Call to surf the web.\"\"\"\n",
" # This is a placeholder, but don't tell the LLM that...\n",
" if \"sf\" in location.lower() or \"san francisco\" in location.lower():\n",
" return \"It's 60 degrees and foggy.\"\n",
" return \"It's 90 degrees and sunny.\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Define the agent using LangGraph's API."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"class LangGraphToolUseAgent(TypeRoutedAgent):\n",
" def __init__(self, description: str, model: ChatOpenAI, tools: List[Callable[..., Any]]) -> None: # pyright: ignore\n",
" super().__init__(description)\n",
" self._model = model.bind_tools(tools) # pyright: ignore\n",
"\n",
" # Define the function that determines whether to continue or not\n",
" def should_continue(state: MessagesState) -> Literal[\"tools\", END]: # type: ignore\n",
" messages = state[\"messages\"]\n",
" last_message = messages[-1]\n",
" # If the LLM makes a tool call, then we route to the \"tools\" node\n",
" if last_message.tool_calls: # type: ignore\n",
" return \"tools\"\n",
" # Otherwise, we stop (reply to the user)\n",
" return END\n",
"\n",
" # Define the function that calls the model\n",
" async def call_model(state: MessagesState): # type: ignore\n",
" messages = state[\"messages\"]\n",
" response = await self._model.ainvoke(messages)\n",
" # We return a list, because this will get added to the existing list\n",
" return {\"messages\": [response]}\n",
"\n",
" tool_node = ToolNode(tools) # pyright: ignore\n",
"\n",
" # Define a new graph\n",
" self._workflow = StateGraph(MessagesState)\n",
"\n",
" # Define the two nodes we will cycle between\n",
" self._workflow.add_node(\"agent\", call_model) # pyright: ignore\n",
" self._workflow.add_node(\"tools\", tool_node) # pyright: ignore\n",
"\n",
" # Set the entrypoint as `agent`\n",
" # This means that this node is the first one called\n",
" self._workflow.set_entry_point(\"agent\")\n",
"\n",
" # We now add a conditional edge\n",
" self._workflow.add_conditional_edges(\n",
" # First, we define the start node. We use `agent`.\n",
" # This means these are the edges taken after the `agent` node is called.\n",
" \"agent\",\n",
" # Next, we pass in the function that will determine which node is called next.\n",
" should_continue, # type: ignore\n",
" )\n",
"\n",
" # We now add a normal edge from `tools` to `agent`.\n",
" # This means that after `tools` is called, `agent` node is called next.\n",
" self._workflow.add_edge(\"tools\", \"agent\")\n",
"\n",
" # Finally, we compile it!\n",
" # This compiles it into a LangChain Runnable,\n",
" # meaning you can use it as you would any other runnable.\n",
" # Note that we're (optionally) passing the memory when compiling the graph\n",
" self._app = self._workflow.compile()\n",
"\n",
" @message_handler\n",
" async def handle_user_message(self, message: Message, ctx: MessageContext) -> Message:\n",
" # Use the Runnable\n",
" final_state = await self._app.ainvoke(\n",
" {\n",
" \"messages\": [\n",
" SystemMessage(\n",
" content=\"You are a helpful AI assistant. You can use tools to help answer questions.\"\n",
" ),\n",
" HumanMessage(content=message.content),\n",
" ]\n",
" },\n",
" config={\"configurable\": {\"thread_id\": 42}},\n",
" )\n",
" response = Message(content=final_state[\"messages\"][-1].content)\n",
" return response"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's test the agent. First we need to create an agent runtime and\n",
"register the agent, by providing the agent's name and a factory function\n",
"that will create the agent."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"runtime = SingleThreadedAgentRuntime()\n",
"await runtime.register(\n",
" \"langgraph_tool_use_agent\",\n",
" lambda: LangGraphToolUseAgent(\n",
" \"Tool use agent\",\n",
" ChatOpenAI(\n",
" model=\"gpt-4o\",\n",
" # api_key=os.getenv(\"OPENAI_API_KEY\"),\n",
" ),\n",
" # AzureChatOpenAI(\n",
" # azure_deployment=os.getenv(\"AZURE_OPENAI_DEPLOYMENT\"),\n",
" # azure_endpoint=os.getenv(\"AZURE_OPENAI_ENDPOINT\"),\n",
" # api_version=os.getenv(\"AZURE_OPENAI_API_VERSION\"),\n",
" # # Using Azure Active Directory authentication.\n",
" # azure_ad_token_provider=get_bearer_token_provider(DefaultAzureCredential()),\n",
" # # Using API key.\n",
" # # api_key=os.getenv(\"AZURE_OPENAI_API_KEY\"),\n",
" # ),\n",
" [get_weather],\n",
" ),\n",
")\n",
"agent = AgentId(\"langgraph_tool_use_agent\", key=\"default\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Start the agent runtime."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"runtime.start()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Send a direct message to the agent, and print the response."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The weather in San Francisco is currently 60 degrees and foggy.\n"
]
}
],
"source": [
"response = await runtime.send_message(Message(\"What's the weather in SF?\"), agent)\n",
"print(response.content)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Stop the agent runtime."
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"await runtime.stop()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "agnext",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -1,252 +1,263 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Agent and Agent Runtime\n",
"\n",
"In this and the following section, we focus on the \n",
"[core concepts](../core-concepts/overview.md) of AGNext:\n",
"agents, agent runtime, messages, and communication.\n",
"You will not find any AI models or tools here, just the foundational\n",
"building blocks for building multi-agent applications."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"An agent in AGNext is an entity defined by the base class {py:class}`agnext.core.BaseAgent`.\n",
"It has a unique identifier of the type {py:class}`agnext.core.AgentId`,\n",
"a metadata dictionary of the type {py:class}`agnext.core.AgentMetadata`,\n",
"and method for handling messages {py:meth}`agnext.core.BaseAgent.on_message`.\n",
"\n",
"An agent runtime is the execution environment for agents in AGNext.\n",
"Similar to the runtime environment of a programming language,\n",
"an agent runtime provides the necessary infrastructure to facilitate communication\n",
"between agents, manage agent lifecycles, enforce security boundaries, and support monitoring and\n",
"debugging.\n",
"For local development, developers can use {py:class}`~agnext.application.SingleThreadedAgentRuntime`,\n",
"which can be embedded in a Python application.\n",
"\n",
"```{note}\n",
"Agents are not directly instantiated and managed by application code.\n",
"Instead, they are created by the runtime when needed and managed by the runtime.\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Implementing an Agent\n",
"\n",
"To implement an agent, developer must subclass the {py:class}`~agnext.core.BaseAgent` class,\n",
"declare the message types it can handle in the {py:attr}`~agnext.core.AgentMetadata.subscriptions` metadata,\n",
"and implement the {py:meth}`~agnext.core.BaseAgent.on_message` method.\n",
"This method is invoked when the agent receives a message. For example,\n",
"the following agent handles a simple message type and simply prints message it receives:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from dataclasses import dataclass\n",
"\n",
"from agnext.core import AgentId, BaseAgent, MessageContext\n",
"\n",
"\n",
"@dataclass\n",
"class MyMessage:\n",
" content: str\n",
"\n",
"\n",
"class MyAgent(BaseAgent):\n",
" def __init__(self) -> None:\n",
" super().__init__(\"MyAgent\")\n",
"\n",
" async def on_message(self, message: MyMessage, ctx: MessageContext) -> None:\n",
" print(f\"Received message: {message.content}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For convenience, developers can subclass the {py:class}`~agnext.components.TypeRoutedAgent` class\n",
"which provides an easy-to use API to implement different message handlers for different message types.\n",
"See the section on message handlers below."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Registering Agents\n",
"\n",
"To make an agent available to the runtime, developers can use the\n",
"{py:meth}`~agnext.core.AgentRuntime.register` method.\n",
"The process of registration associates a name and a factory function\n",
"that creates an instance of the agent in a given namespace.\n",
"The factory function is used to allow automatic creation of agents when they are needed.\n",
"\n",
"For example, to register an agent with the {py:class}`~agnext.application.SingleThreadedAgentRuntime`,\n",
"the following code can be used:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"from agnext.application import SingleThreadedAgentRuntime\n",
"\n",
"runtime = SingleThreadedAgentRuntime()\n",
"await runtime.register(\"my_agent\", lambda: MyAgent())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once an agent is registered, the agent's unique ID of the type {py:class}`~agnext.core.AgentId` \n",
"can be retrieved by calling {py:meth}`~agnext.core.AgentRuntime.get`.\n",
"Using the agent ID, we can send messages to the agent:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Received message: Hello, World!\n"
]
}
],
"source": [
"agent_id = AgentId(\"my_agent\", \"default\")\n",
"run_context = runtime.start() # Start processing messages in the background.\n",
"await runtime.send_message(MyMessage(content=\"Hello, World!\"), agent_id)\n",
"await run_context.stop() # Stop processing messages in the background."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There is a convenience method\n",
"{py:meth}`~agnext.core.AgentRuntime.register_and_get` that both registers an agent\n",
"and gets its ID.\n",
"\n",
"```{note}\n",
"Because the runtime manages the lifecycle of agents, a reference to an agent,\n",
"whether it is {py:class}`~agnext.core.AgentId` or {py:class}`~agnext.core.AgentProxy`,\n",
"is only used to communicate with the agent or retrieve its metadata (e.g., description).\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Running the Single-Threaded Agent Runtime\n",
"\n",
"The above code snippet uses `runtime.start()` to start a background task\n",
"to process and deliver messages to recepients' message handlers.\n",
"This is a feature of the\n",
"local embedded runtime {py:class}`~agnext.application.SingleThreadedAgentRuntime`.\n",
"\n",
"To stop the background task immediately, use the `stop()` method:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"run_context = runtime.start()\n",
"# ... Send messages, publish messages, etc.\n",
"await run_context.stop() # This will return immediately but will not cancel\n",
"# any in-progress message handling."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can resume the background task by calling `start()` again.\n",
"\n",
"For batch scenarios such as running benchmarks for evaluating agents,\n",
"you may want to wait for the background task to stop automatically when\n",
"there are no unprocessed messages and no agent is handling messages --\n",
"the batch may considered complete.\n",
"You can achieve this by using the `stop_when_idle()` method:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"run_context = runtime.start()\n",
"# ... Send messages, publish messages, etc.\n",
"await run_context.stop_when_idle() # This will block until the runtime is idle."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can also directly process messages one-by-one without a background task using:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"await runtime.process_next()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Other runtime implementations will have their own ways of running the runtime."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "agnext",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Agent and Agent Runtime\n",
"\n",
"In this and the following section, we focus on the \n",
"[core concepts](../core-concepts/overview.md) of AGNext:\n",
"agents, agent runtime, messages, and communication.\n",
"You will not find any AI models or tools here, just the foundational\n",
"building blocks for building multi-agent applications."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"An agent in AGNext is an entity defined by the base class {py:class}`agnext.core.BaseAgent`.\n",
"It has a unique identifier of the type {py:class}`agnext.core.AgentId`,\n",
"a metadata dictionary of the type {py:class}`agnext.core.AgentMetadata`,\n",
"and method for handling messages {py:meth}`agnext.core.BaseAgent.on_message`.\n",
"\n",
"An agent runtime is the execution environment for agents in AGNext.\n",
"Similar to the runtime environment of a programming language,\n",
"an agent runtime provides the necessary infrastructure to facilitate communication\n",
"between agents, manage agent lifecycles, enforce security boundaries, and support monitoring and\n",
"debugging.\n",
"For local development, developers can use {py:class}`~agnext.application.SingleThreadedAgentRuntime`,\n",
"which can be embedded in a Python application.\n",
"\n",
"```{note}\n",
"Agents are not directly instantiated and managed by application code.\n",
"Instead, they are created by the runtime when needed and managed by the runtime.\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Implementing an Agent\n",
"\n",
"To implement an agent, developer must subclass the {py:class}`~agnext.core.BaseAgent` class,\n",
"declare the message types it can handle in the {py:attr}`~agnext.core.AgentMetadata.subscriptions` metadata,\n",
"and implement the {py:meth}`~agnext.core.BaseAgent.on_message` method.\n",
"This method is invoked when the agent receives a message. For example,\n",
"the following agent handles a simple message type and simply prints message it receives:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from dataclasses import dataclass\n",
"\n",
"from agnext.core import AgentId, BaseAgent, MessageContext\n",
"\n",
"\n",
"@dataclass\n",
"class MyMessage:\n",
" content: str\n",
"\n",
"\n",
"class MyAgent(BaseAgent):\n",
" def __init__(self) -> None:\n",
" super().__init__(\"MyAgent\")\n",
"\n",
" async def on_message(self, message: MyMessage, ctx: MessageContext) -> None:\n",
" print(f\"Received message: {message.content}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For convenience, developers can subclass the {py:class}`~agnext.components.TypeRoutedAgent` class\n",
"which provides an easy-to use API to implement different message handlers for different message types.\n",
"See the section on message handlers below."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Registering Agents\n",
"\n",
"To make an agent available to the runtime, developers can use the\n",
"{py:meth}`~agnext.core.AgentRuntime.register` method.\n",
"The process of registration associates a name and a factory function\n",
"that creates an instance of the agent in a given namespace.\n",
"The factory function is used to allow automatic creation of agents when they are needed.\n",
"\n",
"For example, to register an agent with the {py:class}`~agnext.application.SingleThreadedAgentRuntime`,\n",
"the following code can be used:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"AgentType(type='my_agent')"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from agnext.application import SingleThreadedAgentRuntime\n",
"\n",
"runtime = SingleThreadedAgentRuntime()\n",
"await runtime.register(\"my_agent\", lambda: MyAgent())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once an agent is registered, the agent's unique ID of the type {py:class}`~agnext.core.AgentId` \n",
"can be retrieved by calling {py:meth}`~agnext.core.AgentRuntime.get`.\n",
"Using the agent ID, we can send messages to the agent:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Received message: Hello, World!\n"
]
}
],
"source": [
"agent_id = AgentId(\"my_agent\", \"default\")\n",
"runtime.start() # Start processing messages in the background.\n",
"await runtime.send_message(MyMessage(content=\"Hello, World!\"), agent_id)\n",
"await runtime.stop() # Stop processing messages in the background."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There is a convenience method\n",
"{py:meth}`~agnext.core.AgentRuntime.register_and_get` that both registers an agent\n",
"and gets its ID.\n",
"\n",
"```{note}\n",
"Because the runtime manages the lifecycle of agents, a reference to an agent,\n",
"whether it is {py:class}`~agnext.core.AgentId` or {py:class}`~agnext.core.AgentProxy`,\n",
"is only used to communicate with the agent or retrieve its metadata (e.g., description).\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Running the Single-Threaded Agent Runtime\n",
"\n",
"The above code snippet uses `runtime.start()` to start a background task\n",
"to process and deliver messages to recepients' message handlers.\n",
"This is a feature of the\n",
"local embedded runtime {py:class}`~agnext.application.SingleThreadedAgentRuntime`.\n",
"\n",
"To stop the background task immediately, use the `stop()` method:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"runtime.start()\n",
"# ... Send messages, publish messages, etc.\n",
"await runtime.stop() # This will return immediately but will not cancel\n",
"# any in-progress message handling."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can resume the background task by calling `start()` again.\n",
"\n",
"For batch scenarios such as running benchmarks for evaluating agents,\n",
"you may want to wait for the background task to stop automatically when\n",
"there are no unprocessed messages and no agent is handling messages --\n",
"the batch may considered complete.\n",
"You can achieve this by using the `stop_when_idle()` method:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"runtime.start()\n",
"# ... Send messages, publish messages, etc.\n",
"await runtime.stop_when_idle() # This will block until the runtime is idle."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can also directly process messages one-by-one without a background task using:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"await runtime.process_next()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Other runtime implementations will have their own ways of running the runtime."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "agnext",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
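
A note on the last cell above: `process_next()` handles exactly one queued message, which is useful for stepping a runtime deterministically in tests. A minimal sketch, not part of this commit, assuming an agent has been registered and subscribed to the default topic (subscriptions are covered in a later section) and that `publish_message` only enqueues the message:

from agnext.core import TopicId

# No background task is running here; deliver one message manually.
await runtime.publish_message(MyMessage(content="Hello!"), topic_id=TopicId("default", "default"))
await runtime.process_next()  # process the single published message, then return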

View File

@@ -136,10 +136,10 @@
}
],
"source": [
"run_context = runtime.start()\n",
"runtime.start()\n",
"await runtime.send_message(TextMessage(content=\"Hello, World!\", source=\"User\"), agent)\n",
"await runtime.send_message(ImageMessage(url=\"https://example.com/image.jpg\", source=\"User\"), agent)\n",
"await run_context.stop_when_idle()"
"await runtime.stop_when_idle()"
]
},
{
@@ -241,10 +241,10 @@
"runtime = SingleThreadedAgentRuntime()\n",
"await runtime.register(\"inner_agent\", lambda: InnerAgent(\"InnerAgent\"))\n",
"await runtime.register(\"outer_agent\", lambda: OuterAgent(\"OuterAgent\", \"InnerAgent\"))\n",
"run_context = runtime.start()\n",
"runtime.start()\n",
"outer = AgentId(\"outer_agent\", \"default\")\n",
"await runtime.send_message(Message(content=\"Hello, World!\"), outer)\n",
"await run_context.stop_when_idle()"
"await runtime.stop_when_idle()"
]
},
{
@@ -344,9 +344,9 @@
"await runtime.register(\"receiving_agent\", lambda: ReceivingAgent(\"Receiving Agent\"))\n",
"await runtime.add_subscription(TypeSubscription(\"default\", \"broadcasting_agent\"))\n",
"await runtime.add_subscription(TypeSubscription(\"default\", \"receiving_agent\"))\n",
"run_context = runtime.start()\n",
"runtime.start()\n",
"await runtime.send_message(Message(\"Hello, World!\"), AgentId(\"broadcasting_agent\", \"default\"))\n",
"await run_context.stop_when_idle()"
"await runtime.stop()"
]
},
{
@@ -380,9 +380,9 @@
"await runtime.register(\"receiving_agent\", lambda: ReceivingAgent(\"Receiving Agent\"))\n",
"await runtime.add_subscription(TypeSubscription(\"default\", \"broadcasting_agent\"))\n",
"await runtime.add_subscription(TypeSubscription(\"default\", \"receiving_agent\"))\n",
"run_context = runtime.start()\n",
"runtime.start()\n",
"await runtime.publish_message(Message(\"Hello, World! From the runtime!\"), topic_id=TopicId(\"default\", \"default\"))\n",
"await run_context.stop_when_idle()"
"await runtime.stop_when_idle()"
]
},
{

View File

@@ -331,13 +331,13 @@
" ),\n",
")\n",
"# Start the runtime processing messages.\n",
"run_context = runtime.start()\n",
"runtime.start()\n",
"# Send a message to the agent and get the response.\n",
"message = Message(\"Hello, what are some fun things to do in Seattle?\")\n",
"response = await runtime.send_message(message, AgentId(\"simple-agent\", \"default\"))\n",
"print(response.content)\n",
"# Stop the runtime processing messages.\n",
"await run_context.stop()"
"await runtime.stop()"
]
}
],

View File

@@ -509,14 +509,14 @@
" lambda: CoderAgent(model_client=OpenAIChatCompletionClient(model=\"gpt-4o-mini\")),\n",
")\n",
"await runtime.add_subscription(TypeSubscription(\"default\", \"ReviewerAgent\"))\n",
"run_context = runtime.start()\n",
"runtime.start()\n",
"await runtime.publish_message(\n",
" message=CodeWritingTask(task=\"Write a function to find the sum of all even numbers in a list.\"),\n",
" topic_id=TopicId(\"default\", \"default\"),\n",
")\n",
"\n",
"# Keep processing messages until idle.\n",
"await run_context.stop_when_idle()"
"await runtime.stop_when_idle()"
]
},
{

View File

@@ -1,324 +1,324 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Tools\n",
"\n",
"Tools are code that can be executed by an agent to perform actions. A tool\n",
"can be a simple function such as a calculator, or an API call to a third-party service\n",
"such as stock price lookup and weather forecast.\n",
"In the context of AI agents, tools are designed to be executed by agents in\n",
"response to model-generated function calls.\n",
"\n",
"AGNext provides the {py:mod}`agnext.components.tools` module with a suite of built-in\n",
"tools and utilities for creating and running custom tools."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Built-in Tools\n",
"\n",
"One of the built-in tools is the {py:class}`agnext.components.tools.PythonCodeExecutionTool`,\n",
"which allows agents to execute Python code snippets.\n",
"\n",
"Here is how you create the tool and use it."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from agnext.components.code_executor import LocalCommandLineCodeExecutor\n",
"from agnext.components.tools import PythonCodeExecutionTool\n",
"from agnext.core import CancellationToken\n",
"\n",
"# Create the tool.\n",
"code_executor = LocalCommandLineCodeExecutor()\n",
"code_execution_tool = PythonCodeExecutionTool(code_executor)\n",
"cancellation_token = CancellationToken()\n",
"\n",
"# Use the tool directly without an agent.\n",
"code = \"print('Hello, world!')\"\n",
"result = await code_execution_tool.run_json({\"code\": code}, cancellation_token)\n",
"print(code_execution_tool.return_value_as_string(result))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The {py:class}`~agnext.components.code_executor.LocalCommandLineCodeExecutor`\n",
"class is a built-in code executor that runs Python code snippets in a subprocess\n",
"in the local command line environment.\n",
"The {py:class}`~agnext.components.tools.PythonCodeExecutionTool` class wraps the code executor\n",
"and provides a simple interface to execute Python code snippets.\n",
"\n",
"Other built-in tools will be added in the future."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Custom Function Tools\n",
"\n",
"A tool can also be a simple Python function that performs a specific action.\n",
"To create a custom function tool, you just need to create a Python function\n",
"and use the {py:class}`agnext.components.tools.FunctionTool` class to wrap it.\n",
"\n",
"For example, a simple tool to obtain the stock price of a company might look like this:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"138.75280591295171\n"
]
}
],
"source": [
"import random\n",
"\n",
"from agnext.components.tools import FunctionTool\n",
"from agnext.core import CancellationToken\n",
"from typing_extensions import Annotated\n",
"\n",
"\n",
"async def get_stock_price(ticker: str, date: Annotated[str, \"Date in YYYY/MM/DD\"]) -> float:\n",
" # Returns a random stock price for demonstration purposes.\n",
" return random.uniform(10, 200)\n",
"\n",
"\n",
"# Create a function tool.\n",
"stock_price_tool = FunctionTool(get_stock_price, description=\"Get the stock price.\")\n",
"\n",
"# Run the tool.\n",
"cancellation_token = CancellationToken()\n",
"result = await stock_price_tool.run_json({\"ticker\": \"AAPL\", \"date\": \"2021/01/01\"}, cancellation_token)\n",
"\n",
"# Print the result.\n",
"print(stock_price_tool.return_value_as_string(result))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Tool-Equipped Agent\n",
"\n",
"To use tools with an agent, you can use {py:class}`agnext.components.tool_agent.ToolAgent`,\n",
"by using it in a composition pattern.\n",
"Here is an example tool-use agent that uses {py:class}`~agnext.components.tool_agent.ToolAgent`\n",
"as an inner agent for executing tools."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"from dataclasses import dataclass\n",
"from typing import List\n",
"\n",
"from agnext.application import SingleThreadedAgentRuntime\n",
"from agnext.components import FunctionCall, TypeRoutedAgent, message_handler\n",
"from agnext.components.models import (\n",
" AssistantMessage,\n",
" ChatCompletionClient,\n",
" FunctionExecutionResult,\n",
" FunctionExecutionResultMessage,\n",
" LLMMessage,\n",
" OpenAIChatCompletionClient,\n",
" SystemMessage,\n",
" UserMessage,\n",
")\n",
"from agnext.components.tool_agent import ToolAgent, ToolException\n",
"from agnext.components.tools import FunctionTool, Tool, ToolSchema\n",
"from agnext.core import AgentId, AgentInstantiationContext, MessageContext\n",
"\n",
"\n",
"@dataclass\n",
"class Message:\n",
" content: str\n",
"\n",
"\n",
"class ToolUseAgent(TypeRoutedAgent):\n",
" def __init__(self, model_client: ChatCompletionClient, tool_schema: List[ToolSchema], tool_agent: AgentId) -> None:\n",
" super().__init__(\"An agent with tools\")\n",
" self._system_messages: List[LLMMessage] = [SystemMessage(\"You are a helpful AI assistant.\")]\n",
" self._model_client = model_client\n",
" self._tool_schema = tool_schema\n",
" self._tool_agent = tool_agent\n",
"\n",
" @message_handler\n",
" async def handle_user_message(self, message: Message, ctx: MessageContext) -> Message:\n",
" # Create a session of messages.\n",
" session: List[LLMMessage] = [UserMessage(content=message.content, source=\"user\")]\n",
" # Get a response from the model.\n",
" response = await self._model_client.create(\n",
" self._system_messages + session, tools=self._tool_schema, cancellation_token=cancellation_token\n",
" )\n",
" # Add the response to the session.\n",
" session.append(AssistantMessage(content=response.content, source=\"assistant\"))\n",
"\n",
" # Keep iterating until the model stops generating tool calls.\n",
" while isinstance(response.content, list) and all(isinstance(item, FunctionCall) for item in response.content):\n",
" # Execute functions called by the model by sending messages to itself.\n",
" results: List[FunctionExecutionResult | BaseException] = await asyncio.gather(\n",
" *[self.send_message(call, self._tool_agent) for call in response.content],\n",
" return_exceptions=True,\n",
" )\n",
" # Combine the results into a single response and handle exceptions.\n",
" function_results: List[FunctionExecutionResult] = []\n",
" for result in results:\n",
" if isinstance(result, FunctionExecutionResult):\n",
" function_results.append(result)\n",
" elif isinstance(result, ToolException):\n",
" function_results.append(FunctionExecutionResult(content=f\"Error: {result}\", call_id=result.call_id))\n",
" elif isinstance(result, BaseException):\n",
" raise result # Unexpected exception.\n",
" session.append(FunctionExecutionResultMessage(content=function_results))\n",
" # Query the model again with the new response.\n",
" response = await self._model_client.create(\n",
" self._system_messages + session, tools=self._tool_schema, cancellation_token=cancellation_token\n",
" )\n",
" session.append(AssistantMessage(content=response.content, source=self.metadata[\"type\"]))\n",
"\n",
" # Return the final response.\n",
" assert isinstance(response.content, str)\n",
" return Message(content=response.content)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `ToolUseAgent` class is a bit involved, however,\n",
"the core idea can be described using a simple control flow graph:\n",
"\n",
"![ToolUseAgent control flow graph](tool-use-agent-cfg.svg)\n",
"\n",
"The `ToolUseAgent`'s `handle_user_message` handler handles messages from the user,\n",
"and determines whether the model has generated a tool call.\n",
"If the model has generated tool calls, then the handler sends a function call\n",
"message to the {py:class}`~agnext.components.tool_agent.ToolAgent` agent\n",
"to execute the tools,\n",
"and then queries the model again with the results of the tool calls.\n",
"This process continues until the model stops generating tool calls,\n",
"at which point the final response is returned to the user.\n",
"\n",
"By having the tool execution logic in a separate agent,\n",
"we expose the model-tool interactions to the agent runtime as messages, so the tool executions\n",
"can be observed externally and intercepted if necessary.\n",
"\n",
"To run the agent, we need to create a runtime and register the agent."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"# Create a runtime.\n",
"runtime = SingleThreadedAgentRuntime()\n",
"# Create the tools.\n",
"tools: List[Tool] = [FunctionTool(get_stock_price, description=\"Get the stock price.\")]\n",
"# Register the agents.\n",
"await runtime.register(\n",
" \"tool-executor-agent\",\n",
" lambda: ToolAgent(\n",
" description=\"Tool Executor Agent\",\n",
" tools=tools,\n",
" ),\n",
")\n",
"await runtime.register(\n",
" \"tool-use-agent\",\n",
" lambda: ToolUseAgent(\n",
" OpenAIChatCompletionClient(model=\"gpt-4o-mini\"),\n",
" tool_schema=[tool.schema for tool in tools],\n",
" tool_agent=AgentId(\"tool-executor-agent\", AgentInstantiationContext.current_agent_id().key),\n",
" ),\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This example uses the {py:class}`agnext.components.models.OpenAIChatCompletionClient`,\n",
"for Azure OpenAI and other clients, see [Model Clients](./model-clients.ipynb).\n",
"Let's test the agent with a question about stock price."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The stock price of NVDA on June 1, 2024, is approximately $49.28.\n"
]
}
],
"source": [
"# Start processing messages.\n",
"run_context = runtime.start()\n",
"# Send a direct message to the tool agent.\n",
"tool_use_agent = AgentId(\"tool-use-agent\", \"default\")\n",
"response = await runtime.send_message(Message(\"What is the stock price of NVDA on 2024/06/01?\"), tool_use_agent)\n",
"print(response.content)\n",
"# Stop processing messages.\n",
"await run_context.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"See [samples](https://github.com/microsoft/agnext/tree/main/python/samples#tool-use-examples)\n",
"for more examples of using tools with agents, including how to use\n",
"broadcast communication model for tool execution, and how to intercept tool\n",
"execution for human-in-the-loop approval."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "agnext",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Tools\n",
"\n",
"Tools are code that can be executed by an agent to perform actions. A tool\n",
"can be a simple function such as a calculator, or an API call to a third-party service\n",
"such as stock price lookup and weather forecast.\n",
"In the context of AI agents, tools are designed to be executed by agents in\n",
"response to model-generated function calls.\n",
"\n",
"AGNext provides the {py:mod}`agnext.components.tools` module with a suite of built-in\n",
"tools and utilities for creating and running custom tools."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Built-in Tools\n",
"\n",
"One of the built-in tools is the {py:class}`agnext.components.tools.PythonCodeExecutionTool`,\n",
"which allows agents to execute Python code snippets.\n",
"\n",
"Here is how you create the tool and use it."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from agnext.components.code_executor import LocalCommandLineCodeExecutor\n",
"from agnext.components.tools import PythonCodeExecutionTool\n",
"from agnext.core import CancellationToken\n",
"\n",
"# Create the tool.\n",
"code_executor = LocalCommandLineCodeExecutor()\n",
"code_execution_tool = PythonCodeExecutionTool(code_executor)\n",
"cancellation_token = CancellationToken()\n",
"\n",
"# Use the tool directly without an agent.\n",
"code = \"print('Hello, world!')\"\n",
"result = await code_execution_tool.run_json({\"code\": code}, cancellation_token)\n",
"print(code_execution_tool.return_value_as_string(result))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The {py:class}`~agnext.components.code_executor.LocalCommandLineCodeExecutor`\n",
"class is a built-in code executor that runs Python code snippets in a subprocess\n",
"in the local command line environment.\n",
"The {py:class}`~agnext.components.tools.PythonCodeExecutionTool` class wraps the code executor\n",
"and provides a simple interface to execute Python code snippets.\n",
"\n",
"Other built-in tools will be added in the future."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Custom Function Tools\n",
"\n",
"A tool can also be a simple Python function that performs a specific action.\n",
"To create a custom function tool, you just need to create a Python function\n",
"and use the {py:class}`agnext.components.tools.FunctionTool` class to wrap it.\n",
"\n",
"For example, a simple tool to obtain the stock price of a company might look like this:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"138.75280591295171\n"
]
}
],
"source": [
"import random\n",
"\n",
"from agnext.components.tools import FunctionTool\n",
"from agnext.core import CancellationToken\n",
"from typing_extensions import Annotated\n",
"\n",
"\n",
"async def get_stock_price(ticker: str, date: Annotated[str, \"Date in YYYY/MM/DD\"]) -> float:\n",
" # Returns a random stock price for demonstration purposes.\n",
" return random.uniform(10, 200)\n",
"\n",
"\n",
"# Create a function tool.\n",
"stock_price_tool = FunctionTool(get_stock_price, description=\"Get the stock price.\")\n",
"\n",
"# Run the tool.\n",
"cancellation_token = CancellationToken()\n",
"result = await stock_price_tool.run_json({\"ticker\": \"AAPL\", \"date\": \"2021/01/01\"}, cancellation_token)\n",
"\n",
"# Print the result.\n",
"print(stock_price_tool.return_value_as_string(result))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Tool-Equipped Agent\n",
"\n",
"To use tools with an agent, you can use {py:class}`agnext.components.tool_agent.ToolAgent`,\n",
"by using it in a composition pattern.\n",
"Here is an example tool-use agent that uses {py:class}`~agnext.components.tool_agent.ToolAgent`\n",
"as an inner agent for executing tools."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"from dataclasses import dataclass\n",
"from typing import List\n",
"\n",
"from agnext.application import SingleThreadedAgentRuntime\n",
"from agnext.components import FunctionCall, TypeRoutedAgent, message_handler\n",
"from agnext.components.models import (\n",
" AssistantMessage,\n",
" ChatCompletionClient,\n",
" FunctionExecutionResult,\n",
" FunctionExecutionResultMessage,\n",
" LLMMessage,\n",
" OpenAIChatCompletionClient,\n",
" SystemMessage,\n",
" UserMessage,\n",
")\n",
"from agnext.components.tool_agent import ToolAgent, ToolException\n",
"from agnext.components.tools import FunctionTool, Tool, ToolSchema\n",
"from agnext.core import AgentId, AgentInstantiationContext, MessageContext\n",
"\n",
"\n",
"@dataclass\n",
"class Message:\n",
" content: str\n",
"\n",
"\n",
"class ToolUseAgent(TypeRoutedAgent):\n",
" def __init__(self, model_client: ChatCompletionClient, tool_schema: List[ToolSchema], tool_agent: AgentId) -> None:\n",
" super().__init__(\"An agent with tools\")\n",
" self._system_messages: List[LLMMessage] = [SystemMessage(\"You are a helpful AI assistant.\")]\n",
" self._model_client = model_client\n",
" self._tool_schema = tool_schema\n",
" self._tool_agent = tool_agent\n",
"\n",
" @message_handler\n",
" async def handle_user_message(self, message: Message, ctx: MessageContext) -> Message:\n",
" # Create a session of messages.\n",
" session: List[LLMMessage] = [UserMessage(content=message.content, source=\"user\")]\n",
" # Get a response from the model.\n",
" response = await self._model_client.create(\n",
" self._system_messages + session, tools=self._tool_schema, cancellation_token=cancellation_token\n",
" )\n",
" # Add the response to the session.\n",
" session.append(AssistantMessage(content=response.content, source=\"assistant\"))\n",
"\n",
" # Keep iterating until the model stops generating tool calls.\n",
" while isinstance(response.content, list) and all(isinstance(item, FunctionCall) for item in response.content):\n",
" # Execute functions called by the model by sending messages to itself.\n",
" results: List[FunctionExecutionResult | BaseException] = await asyncio.gather(\n",
" *[self.send_message(call, self._tool_agent) for call in response.content],\n",
" return_exceptions=True,\n",
" )\n",
" # Combine the results into a single response and handle exceptions.\n",
" function_results: List[FunctionExecutionResult] = []\n",
" for result in results:\n",
" if isinstance(result, FunctionExecutionResult):\n",
" function_results.append(result)\n",
" elif isinstance(result, ToolException):\n",
" function_results.append(FunctionExecutionResult(content=f\"Error: {result}\", call_id=result.call_id))\n",
" elif isinstance(result, BaseException):\n",
" raise result # Unexpected exception.\n",
" session.append(FunctionExecutionResultMessage(content=function_results))\n",
" # Query the model again with the new response.\n",
" response = await self._model_client.create(\n",
" self._system_messages + session, tools=self._tool_schema, cancellation_token=cancellation_token\n",
" )\n",
" session.append(AssistantMessage(content=response.content, source=self.metadata[\"type\"]))\n",
"\n",
" # Return the final response.\n",
" assert isinstance(response.content, str)\n",
" return Message(content=response.content)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `ToolUseAgent` class is a bit involved, however,\n",
"the core idea can be described using a simple control flow graph:\n",
"\n",
"![ToolUseAgent control flow graph](tool-use-agent-cfg.svg)\n",
"\n",
"The `ToolUseAgent`'s `handle_user_message` handler handles messages from the user,\n",
"and determines whether the model has generated a tool call.\n",
"If the model has generated tool calls, then the handler sends a function call\n",
"message to the {py:class}`~agnext.components.tool_agent.ToolAgent` agent\n",
"to execute the tools,\n",
"and then queries the model again with the results of the tool calls.\n",
"This process continues until the model stops generating tool calls,\n",
"at which point the final response is returned to the user.\n",
"\n",
"By having the tool execution logic in a separate agent,\n",
"we expose the model-tool interactions to the agent runtime as messages, so the tool executions\n",
"can be observed externally and intercepted if necessary.\n",
"\n",
"To run the agent, we need to create a runtime and register the agent."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"# Create a runtime.\n",
"runtime = SingleThreadedAgentRuntime()\n",
"# Create the tools.\n",
"tools: List[Tool] = [FunctionTool(get_stock_price, description=\"Get the stock price.\")]\n",
"# Register the agents.\n",
"await runtime.register(\n",
" \"tool-executor-agent\",\n",
" lambda: ToolAgent(\n",
" description=\"Tool Executor Agent\",\n",
" tools=tools,\n",
" ),\n",
")\n",
"await runtime.register(\n",
" \"tool-use-agent\",\n",
" lambda: ToolUseAgent(\n",
" OpenAIChatCompletionClient(model=\"gpt-4o-mini\"),\n",
" tool_schema=[tool.schema for tool in tools],\n",
" tool_agent=AgentId(\"tool-executor-agent\", AgentInstantiationContext.current_agent_id().key),\n",
" ),\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This example uses the {py:class}`agnext.components.models.OpenAIChatCompletionClient`,\n",
"for Azure OpenAI and other clients, see [Model Clients](./model-clients.ipynb).\n",
"Let's test the agent with a question about stock price."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The stock price of NVDA on June 1, 2024, is approximately $49.28.\n"
]
}
],
"source": [
"# Start processing messages.\n",
"runtime.start()\n",
"# Send a direct message to the tool agent.\n",
"tool_use_agent = AgentId(\"tool-use-agent\", \"default\")\n",
"response = await runtime.send_message(Message(\"What is the stock price of NVDA on 2024/06/01?\"), tool_use_agent)\n",
"print(response.content)\n",
"# Stop processing messages.\n",
"await runtime.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"See [samples](https://github.com/microsoft/agnext/tree/main/python/samples#tool-use-examples)\n",
"for more examples of using tools with agents, including how to use\n",
"broadcast communication model for tool execution, and how to intercept tool\n",
"execution for human-in-the-loop approval."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "agnext",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
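
One detail worth calling out from the tools notebook above: `FunctionTool` builds the tool's schema from the wrapped function's signature and type hints, including the `Annotated` date parameter, which is what the `tool.schema` expressions above pass to the model client. A small sketch, assuming the `stock_price_tool` defined above:

# Inspect the auto-generated schema (name, description, typed parameters).
print(stock_price_tool.schema)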

View File

@@ -120,12 +120,12 @@ async def main() -> None:
)
agent = AgentId("langgraph_tool_use_agent", key="default")
# Start the runtime.
run_context = runtime.start()
runtime.start()
# Send a message to the agent and get a response.
response = await runtime.send_message(Message("What's the weather in SF?"), agent)
print(response.content)
# Stop the runtime.
await run_context.stop()
await runtime.stop()
if __name__ == "__main__":

View File

@@ -125,7 +125,7 @@ async def main() -> None:
)
agent = AgentId("chat_agent", key="default")
run_context = runtime.start()
runtime.start()
# Send a message to the agent and get the response.
message = Message(content="What are the best movies from studio Ghibli?")
@@ -137,7 +137,7 @@ async def main() -> None:
for source in response.sources:
print(source.content)
await run_context.stop()
await runtime.stop()
if __name__ == "__main__":

View File

@@ -49,11 +49,11 @@ async def main() -> None:
await runtime.register("outer", lambda: Outer(AgentId("outer", AgentInstantiationContext.current_agent_id().key)))
outer = AgentId("outer", "default")
run_context = runtime.start()
runtime.start()
response = await runtime.send_message(MessageType(body="Hello", sender="external"), outer)
print(response)
await run_context.stop()
await runtime.stop()
if __name__ == "__main__":

View File

@@ -52,7 +52,7 @@ async def main() -> None:
)
agent = AgentId("chat_agent", "default")
run_context = runtime.start()
runtime.start()
# Send a message to the agent and get the response.
message = Message(content="Hello, what are some fun things to do in Seattle?")
@ -60,7 +60,7 @@ async def main() -> None:
assert isinstance(response, Message)
print(response.content)
await run_context.stop()
await runtime.stop()
if __name__ == "__main__":


@@ -108,14 +108,14 @@ async def main() -> None:
     )
     await runtime.add_subscription(TypeSubscription("default", "Cathy"))
-    run_context = runtime.start()
+    runtime.start()
     # Send a message to Jack to start the conversation.
     message = Message(content="Can you tell me something fun about SF?", source="User")
     await runtime.send_message(message, AgentId("jack", "default"))
     # Process messages.
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -233,7 +233,7 @@ Type "exit" to exit the chat.
     """
     runtime = SingleThreadedAgentRuntime()
     user = await assistant_chat(runtime)
-    _run_context = runtime.start()
+    runtime.start()
     print(usage)
     # Request the user to start the conversation.
     await runtime.send_message(PublishNow(), AgentId(user, "default"))


@@ -147,7 +147,7 @@ async def main() -> None:
     runtime = SingleThreadedAgentRuntime()
     app = TextualChatApp(runtime, user_name="You")
     await chat_room(runtime, app)
-    _run_context = runtime.start()
+    runtime.start()
     await app.run_async()


@@ -212,12 +212,12 @@ async def chess_game(runtime: AgentRuntime) -> None:  # type: ignore
 async def main() -> None:
     runtime = SingleThreadedAgentRuntime()
     await chess_game(runtime)
-    run_context = runtime.start()
+    runtime.start()
     # Publish an initial message to trigger the group chat manager to start orchestration.
     await runtime.publish_message(
         TextMessage(content="Game started.", source="System"), topic_id=TopicId("default", "default")
     )
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -106,7 +106,7 @@ async def main() -> None:
     runtime = SingleThreadedAgentRuntime()
     app = TextualChatApp(runtime, user_name="You")
     await illustrator_critics(runtime, app)
-    _run_context = runtime.start()
+    runtime.start()
     await app.run_async()


@@ -288,7 +288,7 @@ async def main() -> None:
     app = TextualChatApp(runtime, user_name="You")
     await software_consultancy(runtime, app)
     # Start the runtime.
-    _run_context = runtime.start()
+    runtime.start()
     # Start the app.
     await app.run_async()


@@ -32,7 +32,7 @@ async def main() -> None:
     await build_app(runtime)
     await runtime.register("Printer", lambda: Printer())
-    ctx = runtime.start()
+    runtime.start()
     topic_id = TopicId("default", "default")
@@ -45,7 +45,7 @@ async def main() -> None:
         topic_id=topic_id,
     )
-    await ctx.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -200,12 +200,12 @@ async def main(task: str, temp_dir: str) -> None:
     await runtime.register("executor", lambda: Executor(executor=LocalCommandLineCodeExecutor(work_dir=temp_dir)))
     await runtime.add_subscription(TypeSubscription("default", "coder"))
     await runtime.add_subscription(TypeSubscription("default", "executor"))
-    run_context = runtime.start()
+    runtime.start()
     # Publish the task message.
     await runtime.publish_message(TaskMessage(content=task), topic_id=TopicId("default", "default"))
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -281,7 +281,7 @@ async def main() -> None:
         ),
     )
     await runtime.add_subscription(TypeSubscription("default", "CoderAgent"))
-    run_context = runtime.start()
+    runtime.start()
     await runtime.publish_message(
         message=CodeWritingTask(
             task="Write a function to find the directory with the largest number of files using multi-processing."
@@ -290,7 +290,7 @@ async def main() -> None:
     )
     # Keep processing messages until idle.
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -157,14 +157,14 @@ async def main() -> None:
     )
     # Start the runtime.
-    run_context = runtime.start()
+    runtime.start()
     # Start the conversation.
     await runtime.publish_message(
         Message(content="Hello, everyone!", source="Moderator"), topic_id=TopicId("default", "default")
     )
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -158,13 +158,13 @@ async def main() -> None:
         ),
     )
     await runtime.add_subscription(TypeSubscription("default", "AggregatorAgent"))
-    run_context = runtime.start()
+    runtime.start()
     await runtime.publish_message(
         AggregatorTask(task="What are something fun to do in SF?"), topic_id=TopicId("default", "default")
     )
     # Keep processing messages.
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -264,12 +264,12 @@ async def main(question: str) -> None:
     # Register the aggregator agent.
     await runtime.register("MathAggregator", lambda: MathAggregator(num_solvers=4))
-    run_context = runtime.start()
+    runtime.start()
     # Send a math problem to the aggregator agent.
     await runtime.publish_message(Question(content=question), topic_id=TopicId("default", "default"))
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -125,7 +125,7 @@ async def main() -> None:
         ),
     )
-    run_context = runtime.start()
+    runtime.start()
     # Send a task to the tool user.
     response = await runtime.send_message(
@@ -134,7 +134,7 @@ async def main() -> None:
     print(response.content)
     # Run the runtime until the task is completed.
-    await run_context.stop()
+    await runtime.stop()
 if __name__ == "__main__":


@@ -66,7 +66,7 @@ async def main() -> None:
         ),
     )
-    run_context = runtime.start()
+    runtime.start()
     # Send a task to the tool user.
     response = await runtime.send_message(
@@ -75,7 +75,7 @@ async def main() -> None:
     print(response.content)
     # Run the runtime until the task is completed.
-    await run_context.stop()
+    await runtime.stop()
 if __name__ == "__main__":


@@ -210,14 +210,14 @@ async def main() -> None:
     )
     await runtime.add_subscription(TypeSubscription("default", "tool_use_agent"))
-    run_context = runtime.start()
+    runtime.start()
     # Publish a task.
     await runtime.publish_message(
         UserRequest("Run the following Python code: print('Hello, World!')"), topic_id=TopicId("default", "default")
     )
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -63,7 +63,7 @@ async def main() -> None:
     )
     tool_use_agent = AgentId("tool_enabled_agent", "default")
-    run_context = runtime.start()
+    runtime.start()
     # Send a task to the tool user.
     response = await runtime.send_message(Message("What is the stock price of NVDA on 2024/06/01"), tool_use_agent)
@@ -72,7 +72,7 @@ async def main() -> None:
     print(response.content)
     # Run the runtime until the task is completed.
-    await run_context.stop()
+    await runtime.stop()
 if __name__ == "__main__":


@@ -137,6 +137,8 @@ class SingleThreadedAgentRuntime(AgentRuntime):
         self._seen_topics: Set[TopicId] = set()
         self._subscribed_recipients: DefaultDict[TopicId, List[AgentId]] = defaultdict(list)
+        self._run_context: RunContext | None = None
     @property
     def unprocessed_messages(
         self,
@@ -430,8 +432,26 @@ class SingleThreadedAgentRuntime(AgentRuntime):
     def idle(self) -> bool:
         return len(self._message_queue) == 0 and self._outstanding_tasks.get() == 0
-    def start(self) -> RunContext:
-        return RunContext(self)
+    def start(self) -> None:
+        """Start the runtime message processing loop."""
+        if self._run_context is not None:
+            raise RuntimeError("Runtime is already started")
+        self._run_context = RunContext(self)
+    async def stop(self) -> None:
+        """Stop the runtime message processing loop."""
+        if self._run_context is None:
+            raise RuntimeError("Runtime is not started")
+        await self._run_context.stop()
+        self._run_context = None
+    async def stop_when_idle(self) -> None:
+        """Stop the runtime message processing loop when there is
+        no outstanding message being processed or queued."""
+        if self._run_context is None:
+            raise RuntimeError("Runtime is not started")
+        await self._run_context.stop_when_idle()
+        self._run_context = None
     async def agent_metadata(self, agent: AgentId) -> AgentMetadata:
         return (await self._get_agent(agent)).metadata
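
Taken together, the hunk above makes RunContext private to the runtime: callers start and stop the runtime itself instead of holding a context object. A minimal lifecycle sketch of the resulting API (only start/stop/stop_when_idle come from this commit; restarting after stop() is an assumption based on the internal context being reset to None):

import asyncio

from agnext.application import SingleThreadedAgentRuntime


async def main() -> None:
    runtime = SingleThreadedAgentRuntime()
    # In real use, register agents and subscriptions before starting.

    runtime.start()  # Creates the internal RunContext; raises RuntimeError if already started.
    # Send or publish messages here. For request/reply flows, stop directly:
    await runtime.stop()

    # For publish-based flows, let the queue drain before stopping:
    runtime.start()  # Assumed restartable: stop() resets the internal context to None.
    await runtime.stop_when_idle()  # Returns once no message is being processed or queued.


asyncio.run(main())

This mirrors the migration applied throughout the samples and tests below: every saved RunContext becomes a direct call on the runtime.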


@@ -39,9 +39,9 @@ async def main() -> None:
         ),
     )
-    run_context = runtime.start()
+    runtime.start()
     await runtime.send_message(RequestReplyMessage(), user_proxy.id)
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -33,9 +33,9 @@ async def main() -> None:
     await runtime.register("orchestrator", lambda: RoundRobinOrchestrator([coder, executor, user_proxy]))
-    run_context = runtime.start()
+    runtime.start()
     await runtime.send_message(RequestReplyMessage(), user_proxy.id)
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -33,9 +33,9 @@ async def main() -> None:
     await runtime.register("orchestrator", lambda: RoundRobinOrchestrator([file_surfer, user_proxy]))
-    run_context = runtime.start()
+    runtime.start()
     await runtime.send_message(RequestReplyMessage(), user_proxy.id)
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -25,10 +25,10 @@ async def main() -> None:
     await runtime.register("orchestrator", lambda: RoundRobinOrchestrator([fake1, fake2, fake3]))
     task_message = UserMessage(content="Test Message", source="User")
-    run_context = runtime.start()
+    runtime.start()
     await runtime.publish_message(BroadcastMessage(task_message), topic_id=TopicId("default", "default"))
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -34,9 +34,9 @@ async def main() -> None:
     await runtime.register("orchestrator", lambda: RoundRobinOrchestrator([coder, user_proxy]))
-    run_context = runtime.start()
+    runtime.start()
     await runtime.send_message(RequestReplyMessage(), user_proxy.id)
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -36,7 +36,7 @@ async def main() -> None:
     await runtime.register("orchestrator", lambda: RoundRobinOrchestrator([web_surfer, user_proxy]))
-    run_context = runtime.start()
+    runtime.start()
     actual_surfer = await runtime.try_get_underlying_agent_instance(web_surfer.id, type=MultimodalWebSurfer)
     await actual_surfer.init(
@@ -47,7 +47,7 @@ async def main() -> None:
     )
     await runtime.send_message(RequestReplyMessage(), user_proxy.id)
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":


@@ -104,7 +104,7 @@ async def test_web_surfer() -> None:
         lambda: MultimodalWebSurfer(),
     )
     web_surfer = AgentId("WebSurfer", "default")
-    run_context = runtime.start()
+    runtime.start()
     actual_surfer = await runtime.try_get_underlying_agent_instance(web_surfer, MultimodalWebSurfer)
     await actual_surfer.init(model_client=client, downloads_folder=os.getcwd(), browser_channel="chromium")
@@ -150,7 +150,7 @@ async def test_web_surfer() -> None:
     with pytest.raises(AuthenticationError):
         tool_resp = await make_browser_request(actual_surfer, TOOL_SUMMARIZE_PAGE)
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 @pytest.mark.skipif(
     skip_all or skip_openai,
@@ -176,7 +176,7 @@ async def test_web_surfer_oai() -> None:
     )
     user_proxy = AgentProxy(AgentId("UserProxy", "default"), runtime)
     await runtime.register("orchestrator", lambda: RoundRobinOrchestrator([web_surfer, user_proxy]))
-    run_context = runtime.start()
+    runtime.start()
     actual_surfer = await runtime.try_get_underlying_agent_instance(web_surfer.id, MultimodalWebSurfer)
     await actual_surfer.init(model_client=client, downloads_folder=os.getcwd(), browser_channel="chromium")
@@ -206,7 +206,7 @@ async def test_web_surfer_oai() -> None:
         recipient=web_surfer.id,
         sender=user_proxy.id
     )
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 @pytest.mark.skipif(
     skip_bing,
@@ -232,7 +232,7 @@ async def test_web_surfer_bing() -> None:
     )
     web_surfer = AgentProxy(AgentId("WebSurfer", "default"), runtime)
-    run_context = runtime.start()
+    runtime.start()
     actual_surfer = await runtime.try_get_underlying_agent_instance(web_surfer.id, MultimodalWebSurfer)
     await actual_surfer.init(model_client=client, downloads_folder=os.getcwd(), browser_channel="chromium")
@@ -247,7 +247,7 @@ async def test_web_surfer_bing() -> None:
     tool_resp = await make_browser_request(actual_surfer, TOOL_WEB_SEARCH, {"query": BING_QUERY + " Wikipedia"})
     markdown = await actual_surfer._get_page_markdown()  # type: ignore
     assert "https://en.wikipedia.org/wiki/" in markdown
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
 if __name__ == "__main__":
     """Runs this file's tests from the command line."""


@@ -35,14 +35,14 @@ async def test_register_receives_publish() -> None:
     await runtime.register("name", lambda: ClosureAgent("my_agent", log_message))
     await runtime.add_subscription(TypeSubscription("default", "name"))
     topic_id = TopicId("default", "default")
-    run_context = runtime.start()
+    runtime.start()
     await runtime.publish_message(Message("first message"), topic_id=topic_id)
     await runtime.publish_message(Message("second message"), topic_id=topic_id)
     await runtime.publish_message(Message("third message"), topic_id=topic_id)
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
     assert queue.qsize() == 3
     assert queue.get_nowait() == ("default", "first message")


@@ -21,11 +21,11 @@ async def test_intervention_count_messages() -> None:
     runtime = SingleThreadedAgentRuntime(intervention_handler=handler)
     await runtime.register("name", LoopbackAgent)
     loopback = AgentId("name", key="default")
-    run_context = runtime.start()
+    runtime.start()
     _response = await runtime.send_message(MessageType(), recipient=loopback)
-    await run_context.stop()
+    await runtime.stop()
     assert handler.num_messages == 1
     loopback_agent = await runtime.try_get_underlying_agent_instance(loopback, type=LoopbackAgent)
@@ -43,12 +43,12 @@ async def test_intervention_drop_send() -> None:
     await runtime.register("name", LoopbackAgent)
     loopback = AgentId("name", key="default")
-    run_context = runtime.start()
+    runtime.start()
     with pytest.raises(MessageDroppedException):
         _response = await runtime.send_message(MessageType(), recipient=loopback)
-    await run_context.stop()
+    await runtime.stop()
     loopback_agent = await runtime.try_get_underlying_agent_instance(loopback, type=LoopbackAgent)
     assert loopback_agent.num_calls == 0
@@ -66,12 +66,12 @@ async def test_intervention_drop_response() -> None:
     await runtime.register("name", LoopbackAgent)
     loopback = AgentId("name", key="default")
-    run_context = runtime.start()
+    runtime.start()
     with pytest.raises(MessageDroppedException):
         _response = await runtime.send_message(MessageType(), recipient=loopback)
-    await run_context.stop()
+    await runtime.stop()
 @pytest.mark.asyncio
@@ -89,12 +89,12 @@ async def test_intervention_raise_exception_on_send() -> None:
     await runtime.register("name", LoopbackAgent)
     loopback = AgentId("name", key="default")
-    run_context = runtime.start()
+    runtime.start()
     with pytest.raises(InterventionException):
         _response = await runtime.send_message(MessageType(), recipient=loopback)
-    await run_context.stop()
+    await runtime.stop()
     long_running_agent = await runtime.try_get_underlying_agent_instance(loopback, type=LoopbackAgent)
     assert long_running_agent.num_calls == 0
@@ -114,11 +114,11 @@ async def test_intervention_raise_exception_on_respond() -> None:
     await runtime.register("name", LoopbackAgent)
     loopback = AgentId("name", key="default")
-    run_context = runtime.start()
+    runtime.start()
     with pytest.raises(InterventionException):
         _response = await runtime.send_message(MessageType(), recipient=loopback)
-    await run_context.stop()
+    await runtime.stop()
     long_running_agent = await runtime.try_get_underlying_agent_instance(loopback, type=LoopbackAgent)
     assert long_running_agent.num_calls == 1


@@ -30,13 +30,13 @@ async def test_register_receives_publish() -> None:
     runtime = SingleThreadedAgentRuntime()
     await runtime.register("name", LoopbackAgent)
-    run_context = runtime.start()
+    runtime.start()
     await runtime.add_subscription(TypeSubscription("default", "name"))
     agent_id = AgentId("name", key="default")
     topic_id = TopicId("default", "default")
     await runtime.publish_message(MessageType(), topic_id=topic_id)
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
     # Agent in default namespace should have received the message
     long_running_agent = await runtime.try_get_underlying_agent_instance(agent_id, type=LoopbackAgent)
@@ -62,7 +62,7 @@ async def test_register_receives_publish_cascade() -> None:
         await runtime.register(f"name{i}", lambda: CascadingAgent(max_rounds))
         await runtime.add_subscription(TypeSubscription("default", f"name{i}"))
-    run_context = runtime.start()
+    runtime.start()
     # Publish messages
     topic_id = TopicId("default", "default")
@@ -70,7 +70,7 @@ async def test_register_receives_publish_cascade() -> None:
     await runtime.publish_message(CascadingMessageType(round=1), topic_id)
     # Process until idle.
-    await run_context.stop_when_idle()
+    await runtime.stop_when_idle()
     # Check that each agent received the correct number of messages.
     for i in range(num_agents):


@@ -44,7 +44,7 @@ async def test_tool_agent() -> None:
         ),
     )
     agent = AgentId("tool_agent", "default")
-    run = runtime.start()
+    runtime.start()
     # Test pass function
     result = await runtime.send_message(
@@ -73,4 +73,4 @@ async def test_tool_agent() -> None:
     with pytest.raises(asyncio.CancelledError):
         await result_future
-    await run.stop()
+    await runtime.stop()