Add type prefix subscription (#4383)

* Add type prefix subscription

* update example

---------

Co-authored-by: Ryan Sweet <rysweet@microsoft.com>
This commit is contained in:
Jack Gerrits 2024-11-26 15:58:26 -05:00 committed by GitHub
parent 74bcd5e0f6
commit 1f07e5bea5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 181 additions and 59 deletions

View File

@ -34,7 +34,7 @@ This document describes the structure, constraints, and behavior of Agent IDs an
- Type: `string` - Type: `string`
- Description: Topic type is usually defined by application code to mark the type of messages the topic is for. - Description: Topic type is usually defined by application code to mark the type of messages the topic is for.
- Constraints: UTF8 and only contain alphanumeric letters (a-z) and (0-9), or underscores (\_). A valid identifier cannot start with a number, or contain any spaces. - Constraints: UTF8 and only contain alphanumeric letters (a-z) and (0-9), ':', '=', or underscores (\_). A valid identifier cannot start with a number, or contain any spaces.
- Examples: - Examples:
- `GitHub_Issues` - `GitHub_Issues`

View File

@ -63,9 +63,15 @@ message TypeSubscription {
string agent_type = 2; string agent_type = 2;
} }
message TypePrefixSubscription {
string topic_type_prefix = 1;
string agent_type = 2;
}
message Subscription { message Subscription {
oneof subscription { oneof subscription {
TypeSubscription typeSubscription = 1; TypeSubscription typeSubscription = 1;
TypePrefixSubscription typePrefixSubscription = 2;
} }
} }

View File

@ -47,7 +47,7 @@ from ..base import (
) )
from ..base._serialization import MessageSerializer, SerializationRegistry from ..base._serialization import MessageSerializer, SerializationRegistry
from ..base._type_helpers import ChannelArgumentType from ..base._type_helpers import ChannelArgumentType
from ..components import TypeSubscription from ..components import TypePrefixSubscription, TypeSubscription
from ._helpers import SubscriptionManager, get_impl from ._helpers import SubscriptionManager, get_impl
from ._utils import GRPC_IMPORT_ERROR_STR from ._utils import GRPC_IMPORT_ERROR_STR
from .protos import agent_worker_pb2, agent_worker_pb2_grpc from .protos import agent_worker_pb2, agent_worker_pb2_grpc
@ -705,27 +705,44 @@ class WorkerAgentRuntime(AgentRuntime):
async def add_subscription(self, subscription: Subscription) -> None: async def add_subscription(self, subscription: Subscription) -> None:
if self._host_connection is None: if self._host_connection is None:
raise RuntimeError("Host connection is not set.") raise RuntimeError("Host connection is not set.")
if not isinstance(subscription, TypeSubscription):
raise ValueError("Only TypeSubscription is supported.")
# Add to local subscription manager.
await self._subscription_manager.add_subscription(subscription)
# Create a future for the subscription response. # Create a future for the subscription response.
future = asyncio.get_event_loop().create_future() future = asyncio.get_event_loop().create_future()
request_id = await self._get_new_request_id() request_id = await self._get_new_request_id()
self._pending_requests[request_id] = future
# Send the subscription to the host. match subscription:
case TypeSubscription(topic_type=topic_type, agent_type=agent_type):
message = agent_worker_pb2.Message( message = agent_worker_pb2.Message(
addSubscriptionRequest=agent_worker_pb2.AddSubscriptionRequest( addSubscriptionRequest=agent_worker_pb2.AddSubscriptionRequest(
request_id=request_id, request_id=request_id,
subscription=agent_worker_pb2.Subscription( subscription=agent_worker_pb2.Subscription(
typeSubscription=agent_worker_pb2.TypeSubscription( typeSubscription=agent_worker_pb2.TypeSubscription(
topic_type=subscription.topic_type, agent_type=subscription.agent_type topic_type=topic_type, agent_type=agent_type
) )
), ),
) )
) )
case TypePrefixSubscription(topic_type_prefix=topic_type_prefix, agent_type=agent_type):
message = agent_worker_pb2.Message(
addSubscriptionRequest=agent_worker_pb2.AddSubscriptionRequest(
request_id=request_id,
subscription=agent_worker_pb2.Subscription(
typePrefixSubscription=agent_worker_pb2.TypePrefixSubscription(
topic_type_prefix=topic_type_prefix, agent_type=agent_type
)
),
)
)
case _:
raise ValueError("Unsupported subscription type.")
# Add the future to the pending requests.
self._pending_requests[request_id] = future
# Add to local subscription manager.
await self._subscription_manager.add_subscription(subscription)
# Send the subscription to the host.
await self._host_connection.send(message) await self._host_connection.send(message)
# Wait for the subscription response. # Wait for the subscription response.

View File

@ -4,7 +4,9 @@ from _collections_abc import AsyncIterator, Iterator
from asyncio import Future, Task from asyncio import Future, Task
from typing import Any, Dict, Set from typing import Any, Dict, Set
from ..base import TopicId from autogen_core.components._type_prefix_subscription import TypePrefixSubscription
from ..base import Subscription, TopicId
from ..components import TypeSubscription from ..components import TypeSubscription
from ._helpers import SubscriptionManager from ._helpers import SubscriptionManager
from ._utils import GRPC_IMPORT_ERROR_STR from ._utils import GRPC_IMPORT_ERROR_STR
@ -221,18 +223,32 @@ class WorkerAgentRuntimeHostServicer(agent_worker_pb2_grpc.AgentRpcServicer):
self, add_subscription_req: agent_worker_pb2.AddSubscriptionRequest, client_id: int self, add_subscription_req: agent_worker_pb2.AddSubscriptionRequest, client_id: int
) -> None: ) -> None:
oneofcase = add_subscription_req.subscription.WhichOneof("subscription") oneofcase = add_subscription_req.subscription.WhichOneof("subscription")
subscription: Subscription | None = None
match oneofcase: match oneofcase:
case "typeSubscription": case "typeSubscription":
type_subscription_msg: agent_worker_pb2.TypeSubscription = ( type_subscription_msg: agent_worker_pb2.TypeSubscription = (
add_subscription_req.subscription.typeSubscription add_subscription_req.subscription.typeSubscription
) )
type_subscription = TypeSubscription( subscription = TypeSubscription(
topic_type=type_subscription_msg.topic_type, agent_type=type_subscription_msg.agent_type topic_type=type_subscription_msg.topic_type, agent_type=type_subscription_msg.agent_type
) )
case "typePrefixSubscription":
type_prefix_subscription_msg: agent_worker_pb2.TypePrefixSubscription = (
add_subscription_req.subscription.typePrefixSubscription
)
subscription = TypePrefixSubscription(
topic_type_prefix=type_prefix_subscription_msg.topic_type_prefix,
agent_type=type_prefix_subscription_msg.agent_type,
)
case None:
logger.warning("Received empty subscription message")
if subscription is not None:
try: try:
await self._subscription_manager.add_subscription(type_subscription) await self._subscription_manager.add_subscription(subscription)
subscription_ids = self._client_id_to_subscription_id_mapping.setdefault(client_id, set()) subscription_ids = self._client_id_to_subscription_id_mapping.setdefault(client_id, set())
subscription_ids.add(type_subscription.id) subscription_ids.add(subscription.id)
success = True success = True
error = None error = None
except ValueError as e: except ValueError as e:
@ -246,8 +262,6 @@ class WorkerAgentRuntimeHostServicer(agent_worker_pb2_grpc.AgentRpcServicer):
) )
) )
) )
case None:
logger.warning("Received empty subscription message")
async def GetState( # type: ignore async def GetState( # type: ignore
self, self,

View File

@ -16,7 +16,7 @@ import cloudevent_pb2 as cloudevent__pb2
from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2 from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_worker.proto\x12\x06\x61gents\x1a\x10\x63loudevent.proto\x1a\x19google/protobuf/any.proto\"\'\n\x07TopicId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"$\n\x07\x41gentId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"E\n\x07Payload\x12\x11\n\tdata_type\x18\x01 \x01(\t\x12\x19\n\x11\x64\x61ta_content_type\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x89\x02\n\nRpcRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12$\n\x06source\x18\x02 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12\x1f\n\x06target\x18\x03 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0e\n\x06method\x18\x04 \x01(\t\x12 \n\x07payload\x18\x05 \x01(\x0b\x32\x0f.agents.Payload\x12\x32\n\x08metadata\x18\x06 \x03(\x0b\x32 .agents.RpcRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"\xb8\x01\n\x0bRpcResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12 \n\x07payload\x18\x02 \x01(\x0b\x32\x0f.agents.Payload\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x33\n\x08metadata\x18\x04 \x03(\x0b\x32!.agents.RpcResponse.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe4\x01\n\x05\x45vent\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x14\n\x0ctopic_source\x18\x02 \x01(\t\x12$\n\x06source\x18\x03 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12 \n\x07payload\x18\x04 \x01(\x0b\x32\x0f.agents.Payload\x12-\n\x08metadata\x18\x05 \x03(\x0b\x32\x1b.agents.Event.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"<\n\x18RegisterAgentTypeRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\"^\n\x19RegisterAgentTypeResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\":\n\x10TypeSubscription\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"T\n\x0cSubscription\x12\x34\n\x10typeSubscription\x18\x01 \x01(\x0b\x32\x18.agents.TypeSubscriptionH\x00\x42\x0e\n\x0csubscription\"X\n\x16\x41\x64\x64SubscriptionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12*\n\x0csubscription\x18\x02 \x01(\x0b\x32\x14.agents.Subscription\"\\\n\x17\x41\x64\x64SubscriptionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x9d\x01\n\nAgentState\x12!\n\x08\x61gent_id\x18\x01 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0c\n\x04\x65Tag\x18\x02 \x01(\t\x12\x15\n\x0b\x62inary_data\x18\x03 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x04 \x01(\tH\x00\x12*\n\nproto_data\x18\x05 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x42\x06\n\x04\x64\x61ta\"j\n\x10GetStateResponse\x12\'\n\x0b\x61gent_state\x18\x01 \x01(\x0b\x32\x12.agents.AgentState\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"B\n\x11SaveStateResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\xc6\x03\n\x07Message\x12%\n\x07request\x18\x01 \x01(\x0b\x32\x12.agents.RpcRequestH\x00\x12\'\n\x08response\x18\x02 \x01(\x0b\x32\x13.agents.RpcResponseH\x00\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\r.agents.EventH\x00\x12\x44\n\x18registerAgentTypeRequest\x18\x04 \x01(\x0b\x32 .agents.RegisterAgentTypeRequestH\x00\x12\x46\n\x19registerAgentTypeResponse\x18\x05 \x01(\x0b\x32!.agents.RegisterAgentTypeResponseH\x00\x12@\n\x16\x61\x64\x64SubscriptionRequest\x18\x06 \x01(\x0b\x32\x1e.agents.AddSubscriptionRequestH\x00\x12\x42\n\x17\x61\x64\x64SubscriptionResponse\x18\x07 \x01(\x0b\x32\x1f.agents.AddSubscriptionResponseH\x00\x12,\n\ncloudEvent\x18\x08 \x01(\x0b\x32\x16.cloudevent.CloudEventH\x00\x42\t\n\x07message2\xb2\x01\n\x08\x41gentRpc\x12\x33\n\x0bOpenChannel\x12\x0f.agents.Message\x1a\x0f.agents.Message(\x01\x30\x01\x12\x35\n\x08GetState\x12\x0f.agents.AgentId\x1a\x18.agents.GetStateResponse\x12:\n\tSaveState\x12\x12.agents.AgentState\x1a\x19.agents.SaveStateResponseB!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3') DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_worker.proto\x12\x06\x61gents\x1a\x10\x63loudevent.proto\x1a\x19google/protobuf/any.proto\"\'\n\x07TopicId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"$\n\x07\x41gentId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"E\n\x07Payload\x12\x11\n\tdata_type\x18\x01 \x01(\t\x12\x19\n\x11\x64\x61ta_content_type\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x89\x02\n\nRpcRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12$\n\x06source\x18\x02 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12\x1f\n\x06target\x18\x03 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0e\n\x06method\x18\x04 \x01(\t\x12 \n\x07payload\x18\x05 \x01(\x0b\x32\x0f.agents.Payload\x12\x32\n\x08metadata\x18\x06 \x03(\x0b\x32 .agents.RpcRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"\xb8\x01\n\x0bRpcResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12 \n\x07payload\x18\x02 \x01(\x0b\x32\x0f.agents.Payload\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x33\n\x08metadata\x18\x04 \x03(\x0b\x32!.agents.RpcResponse.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe4\x01\n\x05\x45vent\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x14\n\x0ctopic_source\x18\x02 \x01(\t\x12$\n\x06source\x18\x03 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12 \n\x07payload\x18\x04 \x01(\x0b\x32\x0f.agents.Payload\x12-\n\x08metadata\x18\x05 \x03(\x0b\x32\x1b.agents.Event.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"<\n\x18RegisterAgentTypeRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\"^\n\x19RegisterAgentTypeResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\":\n\x10TypeSubscription\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"G\n\x16TypePrefixSubscription\x12\x19\n\x11topic_type_prefix\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"\x96\x01\n\x0cSubscription\x12\x34\n\x10typeSubscription\x18\x01 \x01(\x0b\x32\x18.agents.TypeSubscriptionH\x00\x12@\n\x16typePrefixSubscription\x18\x02 \x01(\x0b\x32\x1e.agents.TypePrefixSubscriptionH\x00\x42\x0e\n\x0csubscription\"X\n\x16\x41\x64\x64SubscriptionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12*\n\x0csubscription\x18\x02 \x01(\x0b\x32\x14.agents.Subscription\"\\\n\x17\x41\x64\x64SubscriptionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x9d\x01\n\nAgentState\x12!\n\x08\x61gent_id\x18\x01 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0c\n\x04\x65Tag\x18\x02 \x01(\t\x12\x15\n\x0b\x62inary_data\x18\x03 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x04 \x01(\tH\x00\x12*\n\nproto_data\x18\x05 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x42\x06\n\x04\x64\x61ta\"j\n\x10GetStateResponse\x12\'\n\x0b\x61gent_state\x18\x01 \x01(\x0b\x32\x12.agents.AgentState\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"B\n\x11SaveStateResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\xc6\x03\n\x07Message\x12%\n\x07request\x18\x01 \x01(\x0b\x32\x12.agents.RpcRequestH\x00\x12\'\n\x08response\x18\x02 \x01(\x0b\x32\x13.agents.RpcResponseH\x00\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\r.agents.EventH\x00\x12\x44\n\x18registerAgentTypeRequest\x18\x04 \x01(\x0b\x32 .agents.RegisterAgentTypeRequestH\x00\x12\x46\n\x19registerAgentTypeResponse\x18\x05 \x01(\x0b\x32!.agents.RegisterAgentTypeResponseH\x00\x12@\n\x16\x61\x64\x64SubscriptionRequest\x18\x06 \x01(\x0b\x32\x1e.agents.AddSubscriptionRequestH\x00\x12\x42\n\x17\x61\x64\x64SubscriptionResponse\x18\x07 \x01(\x0b\x32\x1f.agents.AddSubscriptionResponseH\x00\x12,\n\ncloudEvent\x18\x08 \x01(\x0b\x32\x16.cloudevent.CloudEventH\x00\x42\t\n\x07message2\xb2\x01\n\x08\x41gentRpc\x12\x33\n\x0bOpenChannel\x12\x0f.agents.Message\x1a\x0f.agents.Message(\x01\x30\x01\x12\x35\n\x08GetState\x12\x0f.agents.AgentId\x1a\x18.agents.GetStateResponse\x12:\n\tSaveState\x12\x12.agents.AgentState\x1a\x19.agents.SaveStateResponseB!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3')
_globals = globals() _globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@ -54,20 +54,22 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_globals['_REGISTERAGENTTYPERESPONSE']._serialized_end=1067 _globals['_REGISTERAGENTTYPERESPONSE']._serialized_end=1067
_globals['_TYPESUBSCRIPTION']._serialized_start=1069 _globals['_TYPESUBSCRIPTION']._serialized_start=1069
_globals['_TYPESUBSCRIPTION']._serialized_end=1127 _globals['_TYPESUBSCRIPTION']._serialized_end=1127
_globals['_SUBSCRIPTION']._serialized_start=1129 _globals['_TYPEPREFIXSUBSCRIPTION']._serialized_start=1129
_globals['_SUBSCRIPTION']._serialized_end=1213 _globals['_TYPEPREFIXSUBSCRIPTION']._serialized_end=1200
_globals['_ADDSUBSCRIPTIONREQUEST']._serialized_start=1215 _globals['_SUBSCRIPTION']._serialized_start=1203
_globals['_ADDSUBSCRIPTIONREQUEST']._serialized_end=1303 _globals['_SUBSCRIPTION']._serialized_end=1353
_globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_start=1305 _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_start=1355
_globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_end=1397 _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_end=1443
_globals['_AGENTSTATE']._serialized_start=1400 _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_start=1445
_globals['_AGENTSTATE']._serialized_end=1557 _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_end=1537
_globals['_GETSTATERESPONSE']._serialized_start=1559 _globals['_AGENTSTATE']._serialized_start=1540
_globals['_GETSTATERESPONSE']._serialized_end=1665 _globals['_AGENTSTATE']._serialized_end=1697
_globals['_SAVESTATERESPONSE']._serialized_start=1667 _globals['_GETSTATERESPONSE']._serialized_start=1699
_globals['_SAVESTATERESPONSE']._serialized_end=1733 _globals['_GETSTATERESPONSE']._serialized_end=1805
_globals['_MESSAGE']._serialized_start=1736 _globals['_SAVESTATERESPONSE']._serialized_start=1807
_globals['_MESSAGE']._serialized_end=2190 _globals['_SAVESTATERESPONSE']._serialized_end=1873
_globals['_AGENTRPC']._serialized_start=2193 _globals['_MESSAGE']._serialized_start=1876
_globals['_AGENTRPC']._serialized_end=2371 _globals['_MESSAGE']._serialized_end=2330
_globals['_AGENTRPC']._serialized_start=2333
_globals['_AGENTRPC']._serialized_end=2511
# @@protoc_insertion_point(module_scope) # @@protoc_insertion_point(module_scope)

View File

@ -273,21 +273,43 @@ class TypeSubscription(google.protobuf.message.Message):
global___TypeSubscription = TypeSubscription global___TypeSubscription = TypeSubscription
@typing.final
class TypePrefixSubscription(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
TOPIC_TYPE_PREFIX_FIELD_NUMBER: builtins.int
AGENT_TYPE_FIELD_NUMBER: builtins.int
topic_type_prefix: builtins.str
agent_type: builtins.str
def __init__(
self,
*,
topic_type_prefix: builtins.str = ...,
agent_type: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["agent_type", b"agent_type", "topic_type_prefix", b"topic_type_prefix"]) -> None: ...
global___TypePrefixSubscription = TypePrefixSubscription
@typing.final @typing.final
class Subscription(google.protobuf.message.Message): class Subscription(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor DESCRIPTOR: google.protobuf.descriptor.Descriptor
TYPESUBSCRIPTION_FIELD_NUMBER: builtins.int TYPESUBSCRIPTION_FIELD_NUMBER: builtins.int
TYPEPREFIXSUBSCRIPTION_FIELD_NUMBER: builtins.int
@property @property
def typeSubscription(self) -> global___TypeSubscription: ... def typeSubscription(self) -> global___TypeSubscription: ...
@property
def typePrefixSubscription(self) -> global___TypePrefixSubscription: ...
def __init__( def __init__(
self, self,
*, *,
typeSubscription: global___TypeSubscription | None = ..., typeSubscription: global___TypeSubscription | None = ...,
typePrefixSubscription: global___TypePrefixSubscription | None = ...,
) -> None: ... ) -> None: ...
def HasField(self, field_name: typing.Literal["subscription", b"subscription", "typeSubscription", b"typeSubscription"]) -> builtins.bool: ... def HasField(self, field_name: typing.Literal["subscription", b"subscription", "typePrefixSubscription", b"typePrefixSubscription", "typeSubscription", b"typeSubscription"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["subscription", b"subscription", "typeSubscription", b"typeSubscription"]) -> None: ... def ClearField(self, field_name: typing.Literal["subscription", b"subscription", "typePrefixSubscription", b"typePrefixSubscription", "typeSubscription", b"typeSubscription"]) -> None: ...
def WhichOneof(self, oneof_group: typing.Literal["subscription", b"subscription"]) -> typing.Literal["typeSubscription"] | None: ... def WhichOneof(self, oneof_group: typing.Literal["subscription", b"subscription"]) -> typing.Literal["typeSubscription", "typePrefixSubscription"] | None: ...
global___Subscription = Subscription global___Subscription = Subscription

View File

@ -7,6 +7,7 @@ from ._default_subscription import DefaultSubscription, default_subscription, ty
from ._default_topic import DefaultTopicId from ._default_topic import DefaultTopicId
from ._image import Image from ._image import Image
from ._routed_agent import RoutedAgent, TypeRoutedAgent, event, message_handler, rpc from ._routed_agent import RoutedAgent, TypeRoutedAgent, event, message_handler, rpc
from ._type_prefix_subscription import TypePrefixSubscription
from ._type_subscription import TypeSubscription from ._type_subscription import TypeSubscription
from ._types import FunctionCall from ._types import FunctionCall
@ -24,4 +25,5 @@ __all__ = [
"DefaultTopicId", "DefaultTopicId",
"default_subscription", "default_subscription",
"type_subscription", "type_subscription",
"TypePrefixSubscription",
] ]

View File

@ -0,0 +1,63 @@
import uuid
from ..base import AgentId, Subscription, TopicId
from ..base.exceptions import CantHandleException
class TypePrefixSubscription(Subscription):
"""This subscription matches on topics based on a prefix of the type and maps to agents using the source of the topic as the agent key.
This subscription causes each source to have its own agent instance.
Example:
.. code-block:: python
from autogen_core.components import TypePrefixSubscription
subscription = TypePrefixSubscription(topic_type_prefix="t1", agent_type="a1")
In this case:
- A topic_id with type `t1` and source `s1` will be handled by an agent of type `a1` with key `s1`
- A topic_id with type `t1` and source `s2` will be handled by an agent of type `a1` with key `s2`.
- A topic_id with type `t1SUFFIX` and source `s2` will be handled by an agent of type `a1` with key `s2`.
Args:
topic_type_prefix (str): Topic type prefix to match against
agent_type (str): Agent type to handle this subscription
"""
def __init__(self, topic_type_prefix: str, agent_type: str):
self._topic_type_prefix = topic_type_prefix
self._agent_type = agent_type
self._id = str(uuid.uuid4())
@property
def id(self) -> str:
return self._id
@property
def topic_type_prefix(self) -> str:
return self._topic_type_prefix
@property
def agent_type(self) -> str:
return self._agent_type
def is_match(self, topic_id: TopicId) -> bool:
return topic_id.type.startswith(self._topic_type_prefix)
def map_to_agent(self, topic_id: TopicId) -> AgentId:
if not self.is_match(topic_id):
raise CantHandleException("TopicId does not match the subscription")
return AgentId(type=self._agent_type, key=topic_id.source)
def __eq__(self, other: object) -> bool:
if not isinstance(other, TypePrefixSubscription):
return False
return self.id == other.id or (
self.agent_type == other.agent_type and self.topic_type_prefix == other.topic_type_prefix
)

View File

@ -1,7 +1,6 @@
import uuid import uuid
from typing import TypeVar
from ..base import AgentId, BaseAgent, Subscription, TopicId from ..base import AgentId, Subscription, TopicId
from ..base.exceptions import CantHandleException from ..base.exceptions import CantHandleException
@ -59,6 +58,3 @@ class TypeSubscription(Subscription):
return False return False
return self.id == other.id or (self.agent_type == other.agent_type and self.topic_type == other.topic_type) return self.id == other.id or (self.agent_type == other.agent_type and self.topic_type == other.topic_type)
BaseAgentType = TypeVar("BaseAgentType", bound="BaseAgent")