Add example for termination using an intervention handler (#109)

This commit is contained in:
Jack Gerrits 2024-06-24 13:05:46 -04:00 committed by GitHub
parent 60acf8757d
commit bb202058ef
2 changed files with 88 additions and 0 deletions

View File

@ -0,0 +1,87 @@
# Termination using Intervention Handler
```{note}
This method is only really valid for single-tenant applications. If multiple parallel users are using the application via namespaces this approach will not work without modification.
```
There are many different ways to handle termination in `agnext`. Ultimately, the goal is to detect that the runtime no longer needs to be executed and you can proceed to finalization tasks. One way to do this is to use an `InterventionHandler` to detect a termination message and then act on it.
```python
import asyncio
from dataclasses import dataclass
from typing import Any, Generic, Type, TypeVar
from agnext.application import SingleThreadedAgentRuntime
from agnext.components import TypeRoutedAgent, message_handler
from agnext.core import AgentId, CancellationToken
from agnext.core.intervention import DefaultInterventionHandler
```
First, we define a dataclass that will be used to signal termination.
```python
@dataclass
class Termination:
reason: str
```
We code our agent to publish a termination message when it decides it is time to terminate.
```python
class AnAgent(TypeRoutedAgent):
def __init__(self) -> None:
super().__init__("MyAgent")
self.received = 0
@message_handler
async def on_new_message(self, message: str, cancellation_token: CancellationToken) -> None:
self.received += 1
if self.received > 3:
self.publish_message(Termination(reason="Reached maximum number of messages"))
```
Next, we create an InterventionHandler that will detect the termination message and act on it. This one hooks into publishes and when it encounters `Termination` it alters its internal state to indicate that termination has been requested.
```python
T = TypeVar("T")
class TerminationHandler(Generic[T], DefaultInterventionHandler):
def __init__(self, termination_type: Type[T]):
self.termination_type = termination_type
self.termination_value: T | None = None
async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:
if isinstance(message, Termination):
self.termination_value = self.termination_type(message.reason)
return message
@property
def termination_value(self) -> T | None:
return self.termination_value
@property
def has_terminated(self) -> bool:
return self.termination_value is not None
```
Finally, we add this handler to the runtime and use it to detect termination and cease running the `process_next` loop once it has encountered termination.
```python
async def main() -> None:
termination_handler = TerminationHandler(Termination)
runtime = SingleThreadedAgentRuntime(
before_send=termination_handler
)
# Add Agents and kick off task
while not termination_handler.has_terminated:
await runtime.process_next()
print(termination_handler.termination_value)
if __name__ == "__main__":
asyncio.run(main())
```

View File

@ -47,6 +47,7 @@ AGNext's developer API consists of the following layers:
guides/type-routed-agent
guides/group-chat-coder-reviewer
guides/azure-openai-with-aad-auth
guides/termination-with-intervention
.. toctree::