mirror of
https://github.com/microsoft/autogen.git
synced 2025-09-02 12:57:21 +00:00
Core API example showing multiple agents concurrently (#4461)
* Core API example showing multiple agents concurrently (#4427) * Apply feedback * Add different topics for closure agent example * Address feedback * Mypy fix * Modify import path based on refactoring --------- Co-authored-by: Eric Zhu <ekzhu@users.noreply.github.com>
This commit is contained in:
parent
8b05e03520
commit
39ff183fad
@ -0,0 +1,403 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Concurrent Agents\n",
|
||||
"\n",
|
||||
"In this section, we explore the use of multiple agents working concurrently. We cover three main patterns:\n",
|
||||
"\n",
|
||||
"1. **Single Message & Multiple Processors** \n",
|
||||
" Demonstrates how a single message can be processed by multiple agents subscribed to the same topic simultaneously.\n",
|
||||
"\n",
|
||||
"2. **Multiple Messages & Multiple Processors** \n",
|
||||
" Illustrates how specific message types can be routed to dedicated agents based on topics.\n",
|
||||
"\n",
|
||||
"3. **Direct Messaging** \n",
|
||||
" Focuses on sending messages between agents and from the runtime to agents."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 21,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"import asyncio\n",
|
||||
"from dataclasses import dataclass\n",
|
||||
"\n",
|
||||
"from autogen_core import (\n",
|
||||
" AgentId,\n",
|
||||
" ClosureAgent,\n",
|
||||
" ClosureContext,\n",
|
||||
" DefaultTopicId,\n",
|
||||
" MessageContext,\n",
|
||||
" RoutedAgent,\n",
|
||||
" TopicId,\n",
|
||||
" TypeSubscription,\n",
|
||||
" default_subscription,\n",
|
||||
" message_handler,\n",
|
||||
" type_subscription,\n",
|
||||
")\n",
|
||||
"from autogen_core.application import SingleThreadedAgentRuntime"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"@dataclass\n",
|
||||
"class Task:\n",
|
||||
" task_id: str\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"@dataclass\n",
|
||||
"class TaskResponse:\n",
|
||||
" task_id: str\n",
|
||||
" result: str"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Single Message & Multiple Processors\n",
|
||||
"The first pattern shows how a single message can be processed by multiple agents simultaneously:\n",
|
||||
"\n",
|
||||
"- Each `Processor` agent subscribes to the default topic using the {py:meth}`~autogen_core.components.default_subscription` decorator.\n",
|
||||
"- When publishing a message to the default topic, all registered agents will process the message independently.\n",
|
||||
"```{note}\n",
|
||||
"Below, we are subscribing `Processor` using the {py:meth}`~autogen_core.components.default_subscription` decorator, there's an alternative way to subscribe an agent without using decorators altogether as shown in [Subscribe and Publish to Topics](../framework/message-and-communication.ipynb#subscribe-and-publish-to-topics), this way the same agent class can be subscribed to different topics.\n",
|
||||
"```\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 3,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"@default_subscription\n",
|
||||
"class Processor(RoutedAgent):\n",
|
||||
" @message_handler\n",
|
||||
" async def on_task(self, message: Task, ctx: MessageContext) -> None:\n",
|
||||
" print(f\"{self._description} starting task {message.task_id}\")\n",
|
||||
" await asyncio.sleep(2) # Simulate work\n",
|
||||
" print(f\"{self._description} finished task {message.task_id}\")"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 4,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"Agent 1 starting task task-1\n",
|
||||
"Agent 2 starting task task-1\n",
|
||||
"Agent 1 finished task task-1\n",
|
||||
"Agent 2 finished task task-1\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"runtime = SingleThreadedAgentRuntime()\n",
|
||||
"\n",
|
||||
"await Processor.register(runtime, \"agent_1\", lambda: Processor(\"Agent 1\"))\n",
|
||||
"await Processor.register(runtime, \"agent_2\", lambda: Processor(\"Agent 2\"))\n",
|
||||
"\n",
|
||||
"runtime.start()\n",
|
||||
"\n",
|
||||
"await runtime.publish_message(Task(task_id=\"task-1\"), topic_id=DefaultTopicId())\n",
|
||||
"\n",
|
||||
"await runtime.stop_when_idle()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Multiple messages & Multiple Processors\n",
|
||||
"Second, this pattern demonstrates routing different types of messages to specific processors:\n",
|
||||
"- `UrgentProcessor` subscribes to the \"urgent\" topic\n",
|
||||
"- `NormalProcessor` subscribes to the \"normal\" topic\n",
|
||||
"\n",
|
||||
"We make an agent subscribe to a specific topic type using the {py:meth}`~autogen_core.components.type_subscription` decorator."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 50,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"TASK_RESULTS_TOPIC_TYPE = \"task-results\"\n",
|
||||
"task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source=\"default\")\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"@type_subscription(topic_type=\"urgent\")\n",
|
||||
"class UrgentProcessor(RoutedAgent):\n",
|
||||
" @message_handler\n",
|
||||
" async def on_task(self, message: Task, ctx: MessageContext) -> None:\n",
|
||||
" print(f\"Urgent processor starting task {message.task_id}\")\n",
|
||||
" await asyncio.sleep(1) # Simulate work\n",
|
||||
" print(f\"Urgent processor finished task {message.task_id}\")\n",
|
||||
"\n",
|
||||
" task_response = TaskResponse(task_id=message.task_id, result=\"Results by Urgent Processor\")\n",
|
||||
" await self.publish_message(task_response, topic_id=task_results_topic_id)\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"@type_subscription(topic_type=\"normal\")\n",
|
||||
"class NormalProcessor(RoutedAgent):\n",
|
||||
" @message_handler\n",
|
||||
" async def on_task(self, message: Task, ctx: MessageContext) -> None:\n",
|
||||
" print(f\"Normal processor starting task {message.task_id}\")\n",
|
||||
" await asyncio.sleep(3) # Simulate work\n",
|
||||
" print(f\"Normal processor finished task {message.task_id}\")\n",
|
||||
"\n",
|
||||
" task_response = TaskResponse(task_id=message.task_id, result=\"Results by Normal Processor\")\n",
|
||||
" await self.publish_message(task_response, topic_id=task_results_topic_id)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"After registering the agents, we can publish messages to the \"urgent\" and \"normal\" topics:"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 51,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"Normal processor starting task normal-1\n",
|
||||
"Urgent processor starting task urgent-1\n",
|
||||
"Urgent processor finished task urgent-1\n",
|
||||
"Normal processor finished task normal-1\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"runtime = SingleThreadedAgentRuntime()\n",
|
||||
"\n",
|
||||
"await UrgentProcessor.register(runtime, \"urgent_processor\", lambda: UrgentProcessor(\"Urgent Processor\"))\n",
|
||||
"await NormalProcessor.register(runtime, \"normal_processor\", lambda: NormalProcessor(\"Normal Processor\"))\n",
|
||||
"\n",
|
||||
"runtime.start()\n",
|
||||
"\n",
|
||||
"await runtime.publish_message(Task(task_id=\"normal-1\"), topic_id=TopicId(type=\"normal\", source=\"default\"))\n",
|
||||
"await runtime.publish_message(Task(task_id=\"urgent-1\"), topic_id=TopicId(type=\"urgent\", source=\"default\"))\n",
|
||||
"\n",
|
||||
"await runtime.stop_when_idle()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Collecting Results"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"In the previous example, we relied on console printing to verify task completion. However, in real applications, we typically want to collect and process the results programmatically.\n",
|
||||
"\n",
|
||||
"To collect these messages, we'll use a {py:class}`~autogen_core.components.ClosureAgent`. We've defined a dedicated topic `TASK_RESULTS_TOPIC_TYPE` where both `UrgentProcessor` and `NormalProcessor` publish their results. The ClosureAgent will then process messages from this topic."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 52,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"Normal processor starting task normal-1\n",
|
||||
"Urgent processor starting task urgent-1\n",
|
||||
"Urgent processor finished task urgent-1\n",
|
||||
"Normal processor finished task normal-1\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"queue = asyncio.Queue[TaskResponse]()\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"async def collect_result(_agent: ClosureContext, message: TaskResponse, ctx: MessageContext) -> None:\n",
|
||||
" await queue.put(message)\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"runtime.start()\n",
|
||||
"\n",
|
||||
"CLOSURE_AGENT_TYPE = \"collect_result_agent\"\n",
|
||||
"await ClosureAgent.register_closure(\n",
|
||||
" runtime,\n",
|
||||
" CLOSURE_AGENT_TYPE,\n",
|
||||
" collect_result,\n",
|
||||
" subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)],\n",
|
||||
")\n",
|
||||
"\n",
|
||||
"await runtime.publish_message(Task(task_id=\"normal-1\"), topic_id=TopicId(type=\"normal\", source=\"default\"))\n",
|
||||
"await runtime.publish_message(Task(task_id=\"urgent-1\"), topic_id=TopicId(type=\"urgent\", source=\"default\"))\n",
|
||||
"\n",
|
||||
"await runtime.stop_when_idle()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 53,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"TaskResponse(task_id='urgent-1', result='Results by Urgent Processor')\n",
|
||||
"TaskResponse(task_id='normal-1', result='Results by Normal Processor')\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"while not queue.empty():\n",
|
||||
" print(await queue.get())"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Direct Messages\n",
|
||||
"\n",
|
||||
"In contrast to the previous patterns, this pattern focuses on direct messages. Here we demonstrate two ways to send them:\n",
|
||||
"\n",
|
||||
"- Direct messaging between agents \n",
|
||||
"- Sending messages from the runtime to specific agents \n",
|
||||
"\n",
|
||||
"Things to consider in the example below:\n",
|
||||
"\n",
|
||||
"- Messages are addressed using the {py:class}`~autogen_core.components.AgentId`. \n",
|
||||
"- The sender can expect to receive a response from the target agent. \n",
|
||||
"- We register the `WorkerAgent` class only once; however, we send tasks to two different workers.\n",
|
||||
" - How? As stated in [Agent lifecycle](../core-concepts/agent-identity-and-lifecycle.md#agent-lifecycle), when delivering a message using an {py:class}`~autogen_core.components.AgentId`, the runtime will either fetch the instance or create one if it doesn't exist. In this case, the runtime creates two instances of workers when sending those two messages."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 36,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"class WorkerAgent(RoutedAgent):\n",
|
||||
" @message_handler\n",
|
||||
" async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:\n",
|
||||
" print(f\"{self.id} starting task {message.task_id}\")\n",
|
||||
" await asyncio.sleep(2) # Simulate work\n",
|
||||
" print(f\"{self.id} finished task {message.task_id}\")\n",
|
||||
" return TaskResponse(task_id=message.task_id, result=f\"Results by {self.id}\")\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"class DelegatorAgent(RoutedAgent):\n",
|
||||
" def __init__(self, description: str, worker_type: str):\n",
|
||||
" super().__init__(description)\n",
|
||||
" self.worker_instances = [AgentId(worker_type, f\"{worker_type}-1\"), AgentId(worker_type, f\"{worker_type}-2\")]\n",
|
||||
"\n",
|
||||
" @message_handler\n",
|
||||
" async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:\n",
|
||||
" print(f\"Delegator received task {message.task_id}.\")\n",
|
||||
"\n",
|
||||
" subtask1 = Task(task_id=\"task-part-1\")\n",
|
||||
" subtask2 = Task(task_id=\"task-part-2\")\n",
|
||||
"\n",
|
||||
" worker1_result, worker2_result = await asyncio.gather(\n",
|
||||
" self.send_message(subtask1, self.worker_instances[0]), self.send_message(subtask2, self.worker_instances[1])\n",
|
||||
" )\n",
|
||||
"\n",
|
||||
" combined_result = f\"Part 1: {worker1_result.result}, \" f\"Part 2: {worker2_result.result}\"\n",
|
||||
" task_response = TaskResponse(task_id=message.task_id, result=combined_result)\n",
|
||||
" return task_response"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 37,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"Delegator received task main-task.\n",
|
||||
"worker/worker-1 starting task task-part-1\n",
|
||||
"worker/worker-2 starting task task-part-2\n",
|
||||
"worker/worker-1 finished task task-part-1\n",
|
||||
"worker/worker-2 finished task task-part-2\n",
|
||||
"Final result: Part 1: Results by worker/worker-1, Part 2: Results by worker/worker-2\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"runtime = SingleThreadedAgentRuntime()\n",
|
||||
"\n",
|
||||
"await WorkerAgent.register(runtime, \"worker\", lambda: WorkerAgent(\"Worker Agent\"))\n",
|
||||
"await DelegatorAgent.register(runtime, \"delegator\", lambda: DelegatorAgent(\"Delegator Agent\", \"worker\"))\n",
|
||||
"\n",
|
||||
"runtime.start()\n",
|
||||
"\n",
|
||||
"delegator = AgentId(\"delegator\", \"default\")\n",
|
||||
"response = await runtime.send_message(Task(task_id=\"main-task\"), recipient=delegator)\n",
|
||||
"\n",
|
||||
"print(f\"Final result: {response.result}\")\n",
|
||||
"await runtime.stop_when_idle()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Additional Resources"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"If you're interested in more about concurrent processing, check out the [Mixture of Agents](./mixture-of-agents.ipynb) pattern, which relies heavily on concurrent agents."
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "autogen",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.12.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -20,6 +20,7 @@ group chat for task decomposition, and reflection for robustness.
|
||||
```{toctree}
|
||||
:maxdepth: 1
|
||||
|
||||
concurrent-agents
|
||||
group-chat
|
||||
handoffs
|
||||
mixture-of-agents
|
||||
|
Loading…
x
Reference in New Issue
Block a user