Add core components for topics (#278)

* Add core components for topics

* rename topic to topic id
This commit is contained in:
Jack Gerrits 2024-07-26 14:38:08 -04:00 committed by GitHub
parent 61dec0a3ca
commit 7e75dc8df8
6 changed files with 117 additions and 1 deletions

View File

@ -5,6 +5,7 @@ The :mod:`agnext.components` module provides building blocks for creating single
from ._closure_agent import ClosureAgent
from ._image import Image
from ._type_routed_agent import TypeRoutedAgent, message_handler
from ._type_subscription import TypeSubscription
from ._types import FunctionCall
__all__ = ["Image", "TypeRoutedAgent", "ClosureAgent", "message_handler", "FunctionCall"]
__all__ = ["Image", "TypeRoutedAgent", "ClosureAgent", "message_handler", "FunctionCall", "TypeSubscription"]

View File

@ -0,0 +1,39 @@
from agnext.core.exceptions import CantHandleException
from ..core import AgentId, Subscription, TopicId
class TypeSubscription(Subscription):
def __init__(self, topic_type: str, agent_type: str):
"""This subscription matches on topics based on the type and maps to agents using the source of the topic as the agent key.
This subscription causes each source to have its own agent instance.
Example:
.. code-block:: python
subscription = TypeSubscription(topic_type="t1", agent_type="a1")
In this case:
- A topic_id with type `t1` and source `s1` will be handled by an agent of type `a1` with key `s1`
- A topic_id with type `t1` and source `s2` will be handled by an agent of type `a1` with key `s2`.
Args:
topic_type (str): Topic type to match against
agent_type (str): Agent type to handle this subscription
"""
self._topic_type = topic_type
self._agent_type = agent_type
def is_match(self, topic_id: TopicId) -> bool:
return topic_id.type == self._topic_type
def map_to_agent(self, topic_id: TopicId) -> AgentId:
if not self.is_match(topic_id):
raise CantHandleException("TopicId does not match the subscription")
# TODO: Update agentid to reflect agent type and key
return AgentId(name=self._agent_type, namespace=topic_id.source)

View File

@ -11,6 +11,8 @@ from ._agent_runtime import AGENT_INSTANTIATION_CONTEXT_VAR, AgentRuntime, agent
from ._base_agent import BaseAgent
from ._cancellation_token import CancellationToken
from ._serialization import MESSAGE_TYPE_REGISTRY, TypeDeserializer, TypeSerializer
from ._subscription import Subscription
from ._topic import TopicId
__all__ = [
"Agent",
@ -26,4 +28,6 @@ __all__ = [
"MESSAGE_TYPE_REGISTRY",
"TypeSerializer",
"TypeDeserializer",
"TopicId",
"Subscription",
]

View File

@ -0,0 +1,34 @@
from typing import Protocol
from agnext.core._agent_id import AgentId
from ._topic import TopicId
class Subscription(Protocol):
"""Subscriptions define the topics that an agent is interested in."""
def is_match(self, topic_id: TopicId) -> bool:
"""Check if a given topic_id matches the subscription.
Args:
topic_id (TopicId): TopicId to check.
Returns:
bool: True if the topic_id matches the subscription, False otherwise.
"""
...
def map_to_agent(self, topic_id: TopicId) -> AgentId:
"""Map a topic_id to an agent. Should only be called if `is_match` returns True for the given topic_id.
Args:
topic_id (TopicId): TopicId to map.
Returns:
AgentId: ID of the agent that should handle the topic_id.
Raises:
CantHandleException: If the subscription cannot handle the topic_id.
"""
...

View File

@ -0,0 +1,16 @@
from dataclasses import dataclass
@dataclass
class TopicId:
type: str
"""Type of the event that this topic_id contains. Adhere's to the cloud event spec.
Learn more here: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#type
"""
source: str
"""Identifies the context in which an event happened. Adhere's to the cloud event spec.
Learn more here: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#source-1
"""

View File

@ -0,0 +1,22 @@
from agnext.components import TypeSubscription
from agnext.core import TopicId, AgentId
import pytest
from agnext.core.exceptions import CantHandleException
def test_type_subscription_match() -> None:
sub = TypeSubscription(topic_type="t1", agent_type="a1")
assert sub.is_match(TopicId(type="t0", source="s1")) == False
assert sub.is_match(TopicId(type="t1", source="s1")) == True
assert sub.is_match(TopicId(type="t1", source="s2")) == True
def test_type_subscription_map() -> None:
sub = TypeSubscription(topic_type="t1", agent_type="a1")
assert sub.map_to_agent(TopicId(type="t1", source="s1")) == AgentId(name="a1", namespace="s1")
with pytest.raises(CantHandleException):
_agent_id = sub.map_to_agent(TopicId(type="t0", source="s1"))