diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/task/__init__.py b/python/packages/autogen-agentchat/src/autogen_agentchat/task/__init__.py index 2a57b0e9c..dd7b6265a 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/task/__init__.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/task/__init__.py @@ -1,5 +1,6 @@ from ._console import Console from ._terminations import ( + ExternalTermination, HandoffTermination, MaxMessageTermination, StopMessageTermination, @@ -15,5 +16,6 @@ __all__ = [ "TokenUsageTermination", "HandoffTermination", "TimeoutTermination", + "ExternalTermination", "Console", ] diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/task/_terminations.py b/python/packages/autogen-agentchat/src/autogen_agentchat/task/_terminations.py index 16d44b984..31b8d9d3e 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/task/_terminations.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/task/_terminations.py @@ -208,3 +208,45 @@ class TimeoutTermination(TerminationCondition): async def reset(self) -> None: self._start_time = time.monotonic() self._terminated = False + + +class ExternalTermination(TerminationCondition): + """A termination condition that is externally controlled + by calling the :meth:`set` method. + + Example: + + .. code-block:: python + + termination = ExternalTermination() + + # Run the team in an asyncio task. + ... + + # Set the termination condition externally + termination.set() + + """ + + def __init__(self) -> None: + self._terminated = False + self._setted = False + + @property + def terminated(self) -> bool: + return self._terminated + + def set(self) -> None: + self._setted = True + + async def __call__(self, messages: Sequence[AgentMessage]) -> StopMessage | None: + if self._terminated: + raise TerminatedException("Termination condition has already been reached") + if self._setted: + self._terminated = True + return StopMessage(content="External termination requested", source="ExternalTermination") + return None + + async def reset(self) -> None: + self._terminated = False + self._setted = False diff --git a/python/packages/autogen-agentchat/tests/test_termination_condition.py b/python/packages/autogen-agentchat/tests/test_termination_condition.py index a56d8df35..c09e0e1c1 100644 --- a/python/packages/autogen-agentchat/tests/test_termination_condition.py +++ b/python/packages/autogen-agentchat/tests/test_termination_condition.py @@ -3,6 +3,7 @@ import asyncio import pytest from autogen_agentchat.messages import HandoffMessage, StopMessage, TextMessage from autogen_agentchat.task import ( + ExternalTermination, HandoffTermination, MaxMessageTermination, StopMessageTermination, @@ -226,3 +227,18 @@ async def test_timeout_termination() -> None: assert await termination([TextMessage(content="Hello", source="user")]) is None await asyncio.sleep(0.2) assert await termination([TextMessage(content="World", source="user")]) is not None + + +@pytest.mark.asyncio +async def test_external_termination() -> None: + termination = ExternalTermination() + + assert await termination([]) is None + assert not termination.terminated + + termination.set() + assert await termination([]) is not None + assert termination.terminated + + await termination.reset() + assert await termination([]) is None