diff --git a/docs/design/04 - Agent and Topic ID Specs.md b/docs/design/04 - Agent and Topic ID Specs.md index ee872ab2a..b0e0e0e94 100644 --- a/docs/design/04 - Agent and Topic ID Specs.md +++ b/docs/design/04 - Agent and Topic ID Specs.md @@ -34,7 +34,7 @@ This document describes the structure, constraints, and behavior of Agent IDs an - Type: `string` - Description: Topic type is usually defined by application code to mark the type of messages the topic is for. -- Constraints: UTF8 and only contain alphanumeric letters (a-z) and (0-9), or underscores (\_). A valid identifier cannot start with a number, or contain any spaces. +- Constraints: UTF8 and only contain alphanumeric letters (a-z) and (0-9), ':', '=', or underscores (\_). A valid identifier cannot start with a number, or contain any spaces. - Examples: - `GitHub_Issues` diff --git a/protos/agent_worker.proto b/protos/agent_worker.proto index 7b0b5245d..61b00333c 100644 --- a/protos/agent_worker.proto +++ b/protos/agent_worker.proto @@ -63,9 +63,15 @@ message TypeSubscription { string agent_type = 2; } +message TypePrefixSubscription { + string topic_type_prefix = 1; + string agent_type = 2; +} + message Subscription { oneof subscription { TypeSubscription typeSubscription = 1; + TypePrefixSubscription typePrefixSubscription = 2; } } diff --git a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py index 2c4057108..f8ad4bbd1 100644 --- a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py +++ b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py @@ -47,7 +47,7 @@ from ..base import ( ) from ..base._serialization import MessageSerializer, SerializationRegistry from ..base._type_helpers import ChannelArgumentType -from ..components import TypeSubscription +from ..components import TypePrefixSubscription, TypeSubscription from ._helpers import SubscriptionManager, get_impl from ._utils import GRPC_IMPORT_ERROR_STR from .protos import agent_worker_pb2, agent_worker_pb2_grpc @@ -705,27 +705,44 @@ class WorkerAgentRuntime(AgentRuntime): async def add_subscription(self, subscription: Subscription) -> None: if self._host_connection is None: raise RuntimeError("Host connection is not set.") - if not isinstance(subscription, TypeSubscription): - raise ValueError("Only TypeSubscription is supported.") - # Add to local subscription manager. - await self._subscription_manager.add_subscription(subscription) # Create a future for the subscription response. future = asyncio.get_event_loop().create_future() request_id = await self._get_new_request_id() + + match subscription: + case TypeSubscription(topic_type=topic_type, agent_type=agent_type): + message = agent_worker_pb2.Message( + addSubscriptionRequest=agent_worker_pb2.AddSubscriptionRequest( + request_id=request_id, + subscription=agent_worker_pb2.Subscription( + typeSubscription=agent_worker_pb2.TypeSubscription( + topic_type=topic_type, agent_type=agent_type + ) + ), + ) + ) + case TypePrefixSubscription(topic_type_prefix=topic_type_prefix, agent_type=agent_type): + message = agent_worker_pb2.Message( + addSubscriptionRequest=agent_worker_pb2.AddSubscriptionRequest( + request_id=request_id, + subscription=agent_worker_pb2.Subscription( + typePrefixSubscription=agent_worker_pb2.TypePrefixSubscription( + topic_type_prefix=topic_type_prefix, agent_type=agent_type + ) + ), + ) + ) + case _: + raise ValueError("Unsupported subscription type.") + + # Add the future to the pending requests. self._pending_requests[request_id] = future + # Add to local subscription manager. + await self._subscription_manager.add_subscription(subscription) + # Send the subscription to the host. - message = agent_worker_pb2.Message( - addSubscriptionRequest=agent_worker_pb2.AddSubscriptionRequest( - request_id=request_id, - subscription=agent_worker_pb2.Subscription( - typeSubscription=agent_worker_pb2.TypeSubscription( - topic_type=subscription.topic_type, agent_type=subscription.agent_type - ) - ), - ) - ) await self._host_connection.send(message) # Wait for the subscription response. diff --git a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py index 3da50c56f..5cd2bf8ea 100644 --- a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py +++ b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py @@ -4,7 +4,9 @@ from _collections_abc import AsyncIterator, Iterator from asyncio import Future, Task from typing import Any, Dict, Set -from ..base import TopicId +from autogen_core.components._type_prefix_subscription import TypePrefixSubscription + +from ..base import Subscription, TopicId from ..components import TypeSubscription from ._helpers import SubscriptionManager from ._utils import GRPC_IMPORT_ERROR_STR @@ -221,34 +223,46 @@ class WorkerAgentRuntimeHostServicer(agent_worker_pb2_grpc.AgentRpcServicer): self, add_subscription_req: agent_worker_pb2.AddSubscriptionRequest, client_id: int ) -> None: oneofcase = add_subscription_req.subscription.WhichOneof("subscription") + subscription: Subscription | None = None match oneofcase: case "typeSubscription": type_subscription_msg: agent_worker_pb2.TypeSubscription = ( add_subscription_req.subscription.typeSubscription ) - type_subscription = TypeSubscription( + subscription = TypeSubscription( topic_type=type_subscription_msg.topic_type, agent_type=type_subscription_msg.agent_type ) - try: - await self._subscription_manager.add_subscription(type_subscription) - subscription_ids = self._client_id_to_subscription_id_mapping.setdefault(client_id, set()) - subscription_ids.add(type_subscription.id) - success = True - error = None - except ValueError as e: - success = False - error = str(e) - # Send a response back to the client. - await self._send_queues[client_id].put( - agent_worker_pb2.Message( - addSubscriptionResponse=agent_worker_pb2.AddSubscriptionResponse( - request_id=add_subscription_req.request_id, success=success, error=error - ) - ) + + case "typePrefixSubscription": + type_prefix_subscription_msg: agent_worker_pb2.TypePrefixSubscription = ( + add_subscription_req.subscription.typePrefixSubscription + ) + subscription = TypePrefixSubscription( + topic_type_prefix=type_prefix_subscription_msg.topic_type_prefix, + agent_type=type_prefix_subscription_msg.agent_type, ) case None: logger.warning("Received empty subscription message") + if subscription is not None: + try: + await self._subscription_manager.add_subscription(subscription) + subscription_ids = self._client_id_to_subscription_id_mapping.setdefault(client_id, set()) + subscription_ids.add(subscription.id) + success = True + error = None + except ValueError as e: + success = False + error = str(e) + # Send a response back to the client. + await self._send_queues[client_id].put( + agent_worker_pb2.Message( + addSubscriptionResponse=agent_worker_pb2.AddSubscriptionResponse( + request_id=add_subscription_req.request_id, success=success, error=error + ) + ) + ) + async def GetState( # type: ignore self, request: agent_worker_pb2.AgentId, diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py index 0637e866c..8f143d770 100644 --- a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py +++ b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py @@ -16,7 +16,7 @@ import cloudevent_pb2 as cloudevent__pb2 from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_worker.proto\x12\x06\x61gents\x1a\x10\x63loudevent.proto\x1a\x19google/protobuf/any.proto\"\'\n\x07TopicId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"$\n\x07\x41gentId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"E\n\x07Payload\x12\x11\n\tdata_type\x18\x01 \x01(\t\x12\x19\n\x11\x64\x61ta_content_type\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x89\x02\n\nRpcRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12$\n\x06source\x18\x02 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12\x1f\n\x06target\x18\x03 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0e\n\x06method\x18\x04 \x01(\t\x12 \n\x07payload\x18\x05 \x01(\x0b\x32\x0f.agents.Payload\x12\x32\n\x08metadata\x18\x06 \x03(\x0b\x32 .agents.RpcRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"\xb8\x01\n\x0bRpcResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12 \n\x07payload\x18\x02 \x01(\x0b\x32\x0f.agents.Payload\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x33\n\x08metadata\x18\x04 \x03(\x0b\x32!.agents.RpcResponse.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe4\x01\n\x05\x45vent\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x14\n\x0ctopic_source\x18\x02 \x01(\t\x12$\n\x06source\x18\x03 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12 \n\x07payload\x18\x04 \x01(\x0b\x32\x0f.agents.Payload\x12-\n\x08metadata\x18\x05 \x03(\x0b\x32\x1b.agents.Event.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"<\n\x18RegisterAgentTypeRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\"^\n\x19RegisterAgentTypeResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\":\n\x10TypeSubscription\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"T\n\x0cSubscription\x12\x34\n\x10typeSubscription\x18\x01 \x01(\x0b\x32\x18.agents.TypeSubscriptionH\x00\x42\x0e\n\x0csubscription\"X\n\x16\x41\x64\x64SubscriptionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12*\n\x0csubscription\x18\x02 \x01(\x0b\x32\x14.agents.Subscription\"\\\n\x17\x41\x64\x64SubscriptionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x9d\x01\n\nAgentState\x12!\n\x08\x61gent_id\x18\x01 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0c\n\x04\x65Tag\x18\x02 \x01(\t\x12\x15\n\x0b\x62inary_data\x18\x03 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x04 \x01(\tH\x00\x12*\n\nproto_data\x18\x05 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x42\x06\n\x04\x64\x61ta\"j\n\x10GetStateResponse\x12\'\n\x0b\x61gent_state\x18\x01 \x01(\x0b\x32\x12.agents.AgentState\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"B\n\x11SaveStateResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\xc6\x03\n\x07Message\x12%\n\x07request\x18\x01 \x01(\x0b\x32\x12.agents.RpcRequestH\x00\x12\'\n\x08response\x18\x02 \x01(\x0b\x32\x13.agents.RpcResponseH\x00\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\r.agents.EventH\x00\x12\x44\n\x18registerAgentTypeRequest\x18\x04 \x01(\x0b\x32 .agents.RegisterAgentTypeRequestH\x00\x12\x46\n\x19registerAgentTypeResponse\x18\x05 \x01(\x0b\x32!.agents.RegisterAgentTypeResponseH\x00\x12@\n\x16\x61\x64\x64SubscriptionRequest\x18\x06 \x01(\x0b\x32\x1e.agents.AddSubscriptionRequestH\x00\x12\x42\n\x17\x61\x64\x64SubscriptionResponse\x18\x07 \x01(\x0b\x32\x1f.agents.AddSubscriptionResponseH\x00\x12,\n\ncloudEvent\x18\x08 \x01(\x0b\x32\x16.cloudevent.CloudEventH\x00\x42\t\n\x07message2\xb2\x01\n\x08\x41gentRpc\x12\x33\n\x0bOpenChannel\x12\x0f.agents.Message\x1a\x0f.agents.Message(\x01\x30\x01\x12\x35\n\x08GetState\x12\x0f.agents.AgentId\x1a\x18.agents.GetStateResponse\x12:\n\tSaveState\x12\x12.agents.AgentState\x1a\x19.agents.SaveStateResponseB!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_worker.proto\x12\x06\x61gents\x1a\x10\x63loudevent.proto\x1a\x19google/protobuf/any.proto\"\'\n\x07TopicId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"$\n\x07\x41gentId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"E\n\x07Payload\x12\x11\n\tdata_type\x18\x01 \x01(\t\x12\x19\n\x11\x64\x61ta_content_type\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x89\x02\n\nRpcRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12$\n\x06source\x18\x02 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12\x1f\n\x06target\x18\x03 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0e\n\x06method\x18\x04 \x01(\t\x12 \n\x07payload\x18\x05 \x01(\x0b\x32\x0f.agents.Payload\x12\x32\n\x08metadata\x18\x06 \x03(\x0b\x32 .agents.RpcRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"\xb8\x01\n\x0bRpcResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12 \n\x07payload\x18\x02 \x01(\x0b\x32\x0f.agents.Payload\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x33\n\x08metadata\x18\x04 \x03(\x0b\x32!.agents.RpcResponse.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe4\x01\n\x05\x45vent\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x14\n\x0ctopic_source\x18\x02 \x01(\t\x12$\n\x06source\x18\x03 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12 \n\x07payload\x18\x04 \x01(\x0b\x32\x0f.agents.Payload\x12-\n\x08metadata\x18\x05 \x03(\x0b\x32\x1b.agents.Event.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"<\n\x18RegisterAgentTypeRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\"^\n\x19RegisterAgentTypeResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\":\n\x10TypeSubscription\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"G\n\x16TypePrefixSubscription\x12\x19\n\x11topic_type_prefix\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"\x96\x01\n\x0cSubscription\x12\x34\n\x10typeSubscription\x18\x01 \x01(\x0b\x32\x18.agents.TypeSubscriptionH\x00\x12@\n\x16typePrefixSubscription\x18\x02 \x01(\x0b\x32\x1e.agents.TypePrefixSubscriptionH\x00\x42\x0e\n\x0csubscription\"X\n\x16\x41\x64\x64SubscriptionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12*\n\x0csubscription\x18\x02 \x01(\x0b\x32\x14.agents.Subscription\"\\\n\x17\x41\x64\x64SubscriptionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x9d\x01\n\nAgentState\x12!\n\x08\x61gent_id\x18\x01 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0c\n\x04\x65Tag\x18\x02 \x01(\t\x12\x15\n\x0b\x62inary_data\x18\x03 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x04 \x01(\tH\x00\x12*\n\nproto_data\x18\x05 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x42\x06\n\x04\x64\x61ta\"j\n\x10GetStateResponse\x12\'\n\x0b\x61gent_state\x18\x01 \x01(\x0b\x32\x12.agents.AgentState\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"B\n\x11SaveStateResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\xc6\x03\n\x07Message\x12%\n\x07request\x18\x01 \x01(\x0b\x32\x12.agents.RpcRequestH\x00\x12\'\n\x08response\x18\x02 \x01(\x0b\x32\x13.agents.RpcResponseH\x00\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\r.agents.EventH\x00\x12\x44\n\x18registerAgentTypeRequest\x18\x04 \x01(\x0b\x32 .agents.RegisterAgentTypeRequestH\x00\x12\x46\n\x19registerAgentTypeResponse\x18\x05 \x01(\x0b\x32!.agents.RegisterAgentTypeResponseH\x00\x12@\n\x16\x61\x64\x64SubscriptionRequest\x18\x06 \x01(\x0b\x32\x1e.agents.AddSubscriptionRequestH\x00\x12\x42\n\x17\x61\x64\x64SubscriptionResponse\x18\x07 \x01(\x0b\x32\x1f.agents.AddSubscriptionResponseH\x00\x12,\n\ncloudEvent\x18\x08 \x01(\x0b\x32\x16.cloudevent.CloudEventH\x00\x42\t\n\x07message2\xb2\x01\n\x08\x41gentRpc\x12\x33\n\x0bOpenChannel\x12\x0f.agents.Message\x1a\x0f.agents.Message(\x01\x30\x01\x12\x35\n\x08GetState\x12\x0f.agents.AgentId\x1a\x18.agents.GetStateResponse\x12:\n\tSaveState\x12\x12.agents.AgentState\x1a\x19.agents.SaveStateResponseB!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -54,20 +54,22 @@ if _descriptor._USE_C_DESCRIPTORS == False: _globals['_REGISTERAGENTTYPERESPONSE']._serialized_end=1067 _globals['_TYPESUBSCRIPTION']._serialized_start=1069 _globals['_TYPESUBSCRIPTION']._serialized_end=1127 - _globals['_SUBSCRIPTION']._serialized_start=1129 - _globals['_SUBSCRIPTION']._serialized_end=1213 - _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_start=1215 - _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_end=1303 - _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_start=1305 - _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_end=1397 - _globals['_AGENTSTATE']._serialized_start=1400 - _globals['_AGENTSTATE']._serialized_end=1557 - _globals['_GETSTATERESPONSE']._serialized_start=1559 - _globals['_GETSTATERESPONSE']._serialized_end=1665 - _globals['_SAVESTATERESPONSE']._serialized_start=1667 - _globals['_SAVESTATERESPONSE']._serialized_end=1733 - _globals['_MESSAGE']._serialized_start=1736 - _globals['_MESSAGE']._serialized_end=2190 - _globals['_AGENTRPC']._serialized_start=2193 - _globals['_AGENTRPC']._serialized_end=2371 + _globals['_TYPEPREFIXSUBSCRIPTION']._serialized_start=1129 + _globals['_TYPEPREFIXSUBSCRIPTION']._serialized_end=1200 + _globals['_SUBSCRIPTION']._serialized_start=1203 + _globals['_SUBSCRIPTION']._serialized_end=1353 + _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_start=1355 + _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_end=1443 + _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_start=1445 + _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_end=1537 + _globals['_AGENTSTATE']._serialized_start=1540 + _globals['_AGENTSTATE']._serialized_end=1697 + _globals['_GETSTATERESPONSE']._serialized_start=1699 + _globals['_GETSTATERESPONSE']._serialized_end=1805 + _globals['_SAVESTATERESPONSE']._serialized_start=1807 + _globals['_SAVESTATERESPONSE']._serialized_end=1873 + _globals['_MESSAGE']._serialized_start=1876 + _globals['_MESSAGE']._serialized_end=2330 + _globals['_AGENTRPC']._serialized_start=2333 + _globals['_AGENTRPC']._serialized_end=2511 # @@protoc_insertion_point(module_scope) diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi index 522124ab8..728bfafcc 100644 --- a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi +++ b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi @@ -273,21 +273,43 @@ class TypeSubscription(google.protobuf.message.Message): global___TypeSubscription = TypeSubscription +@typing.final +class TypePrefixSubscription(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + TOPIC_TYPE_PREFIX_FIELD_NUMBER: builtins.int + AGENT_TYPE_FIELD_NUMBER: builtins.int + topic_type_prefix: builtins.str + agent_type: builtins.str + def __init__( + self, + *, + topic_type_prefix: builtins.str = ..., + agent_type: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["agent_type", b"agent_type", "topic_type_prefix", b"topic_type_prefix"]) -> None: ... + +global___TypePrefixSubscription = TypePrefixSubscription + @typing.final class Subscription(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor TYPESUBSCRIPTION_FIELD_NUMBER: builtins.int + TYPEPREFIXSUBSCRIPTION_FIELD_NUMBER: builtins.int @property def typeSubscription(self) -> global___TypeSubscription: ... + @property + def typePrefixSubscription(self) -> global___TypePrefixSubscription: ... def __init__( self, *, typeSubscription: global___TypeSubscription | None = ..., + typePrefixSubscription: global___TypePrefixSubscription | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["subscription", b"subscription", "typeSubscription", b"typeSubscription"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["subscription", b"subscription", "typeSubscription", b"typeSubscription"]) -> None: ... - def WhichOneof(self, oneof_group: typing.Literal["subscription", b"subscription"]) -> typing.Literal["typeSubscription"] | None: ... + def HasField(self, field_name: typing.Literal["subscription", b"subscription", "typePrefixSubscription", b"typePrefixSubscription", "typeSubscription", b"typeSubscription"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["subscription", b"subscription", "typePrefixSubscription", b"typePrefixSubscription", "typeSubscription", b"typeSubscription"]) -> None: ... + def WhichOneof(self, oneof_group: typing.Literal["subscription", b"subscription"]) -> typing.Literal["typeSubscription", "typePrefixSubscription"] | None: ... global___Subscription = Subscription diff --git a/python/packages/autogen-core/src/autogen_core/components/__init__.py b/python/packages/autogen-core/src/autogen_core/components/__init__.py index 9ad8bdb35..4c4d02f3b 100644 --- a/python/packages/autogen-core/src/autogen_core/components/__init__.py +++ b/python/packages/autogen-core/src/autogen_core/components/__init__.py @@ -7,6 +7,7 @@ from ._default_subscription import DefaultSubscription, default_subscription, ty from ._default_topic import DefaultTopicId from ._image import Image from ._routed_agent import RoutedAgent, TypeRoutedAgent, event, message_handler, rpc +from ._type_prefix_subscription import TypePrefixSubscription from ._type_subscription import TypeSubscription from ._types import FunctionCall @@ -24,4 +25,5 @@ __all__ = [ "DefaultTopicId", "default_subscription", "type_subscription", + "TypePrefixSubscription", ] diff --git a/python/packages/autogen-core/src/autogen_core/components/_type_prefix_subscription.py b/python/packages/autogen-core/src/autogen_core/components/_type_prefix_subscription.py new file mode 100644 index 000000000..d71b587d1 --- /dev/null +++ b/python/packages/autogen-core/src/autogen_core/components/_type_prefix_subscription.py @@ -0,0 +1,63 @@ +import uuid + +from ..base import AgentId, Subscription, TopicId +from ..base.exceptions import CantHandleException + + +class TypePrefixSubscription(Subscription): + """This subscription matches on topics based on a prefix of the type and maps to agents using the source of the topic as the agent key. + + This subscription causes each source to have its own agent instance. + + Example: + + .. code-block:: python + + from autogen_core.components import TypePrefixSubscription + + subscription = TypePrefixSubscription(topic_type_prefix="t1", agent_type="a1") + + In this case: + + - A topic_id with type `t1` and source `s1` will be handled by an agent of type `a1` with key `s1` + - A topic_id with type `t1` and source `s2` will be handled by an agent of type `a1` with key `s2`. + - A topic_id with type `t1SUFFIX` and source `s2` will be handled by an agent of type `a1` with key `s2`. + + Args: + topic_type_prefix (str): Topic type prefix to match against + agent_type (str): Agent type to handle this subscription + """ + + def __init__(self, topic_type_prefix: str, agent_type: str): + self._topic_type_prefix = topic_type_prefix + self._agent_type = agent_type + self._id = str(uuid.uuid4()) + + @property + def id(self) -> str: + return self._id + + @property + def topic_type_prefix(self) -> str: + return self._topic_type_prefix + + @property + def agent_type(self) -> str: + return self._agent_type + + def is_match(self, topic_id: TopicId) -> bool: + return topic_id.type.startswith(self._topic_type_prefix) + + def map_to_agent(self, topic_id: TopicId) -> AgentId: + if not self.is_match(topic_id): + raise CantHandleException("TopicId does not match the subscription") + + return AgentId(type=self._agent_type, key=topic_id.source) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, TypePrefixSubscription): + return False + + return self.id == other.id or ( + self.agent_type == other.agent_type and self.topic_type_prefix == other.topic_type_prefix + ) diff --git a/python/packages/autogen-core/src/autogen_core/components/_type_subscription.py b/python/packages/autogen-core/src/autogen_core/components/_type_subscription.py index 92709a457..94def7659 100644 --- a/python/packages/autogen-core/src/autogen_core/components/_type_subscription.py +++ b/python/packages/autogen-core/src/autogen_core/components/_type_subscription.py @@ -1,7 +1,6 @@ import uuid -from typing import TypeVar -from ..base import AgentId, BaseAgent, Subscription, TopicId +from ..base import AgentId, Subscription, TopicId from ..base.exceptions import CantHandleException @@ -59,6 +58,3 @@ class TypeSubscription(Subscription): return False return self.id == other.id or (self.agent_type == other.agent_type and self.topic_type == other.topic_type) - - -BaseAgentType = TypeVar("BaseAgentType", bound="BaseAgent")