From 499b3fcbbfc99808c45fcf5a7c202836bca11d82 Mon Sep 17 00:00:00 2001 From: Jacob Alber Date: Tue, 1 Oct 2024 15:36:05 -0400 Subject: [PATCH] fix: Make cleanup code in docker CodeExecutor asyncio aware (#669) Co-authored-by: Jack Gerrits --- python/packages/autogen-core/pyproject.toml | 3 +- .../docker_command_line_code_executor.py | 11 +- .../tests/regressions/test_clean_terminate.py | 121 ++++++++++++++++++ python/uv.lock | 11 ++ 4 files changed, 139 insertions(+), 7 deletions(-) create mode 100644 python/packages/autogen-core/tests/regressions/test_clean_terminate.py diff --git a/python/packages/autogen-core/pyproject.toml b/python/packages/autogen-core/pyproject.toml index d3416b697..fc3127f99 100644 --- a/python/packages/autogen-core/pyproject.toml +++ b/python/packages/autogen-core/pyproject.toml @@ -24,7 +24,8 @@ dependencies = [ "tiktoken", "azure-core", "docker~=7.0", - "opentelemetry-api~=1.27.0" + "opentelemetry-api~=1.27.0", + "asyncio_atexit" ] [tool.uv] diff --git a/python/packages/autogen-core/src/autogen_core/components/code_executor/_impl/docker_command_line_code_executor.py b/python/packages/autogen-core/src/autogen_core/components/code_executor/_impl/docker_command_line_code_executor.py index 96fd291cb..9a7385606 100644 --- a/python/packages/autogen-core/src/autogen_core/components/code_executor/_impl/docker_command_line_code_executor.py +++ b/python/packages/autogen-core/src/autogen_core/components/code_executor/_impl/docker_command_line_code_executor.py @@ -4,7 +4,6 @@ from __future__ import annotations import asyncio -import atexit import logging import shlex import sys @@ -15,6 +14,7 @@ from pathlib import Path from types import TracebackType from typing import Any, Callable, ClassVar, List, Optional, ParamSpec, Type, Union +import asyncio_atexit import docker import docker.models import docker.models.containers @@ -326,13 +326,12 @@ $functions""" await _wait_for_ready(self._container) - def cleanup() -> None: - loop = 
asyncio.get_event_loop() - loop.run_until_complete(self.stop()) - atexit.unregister(cleanup) + async def cleanup() -> None: + await self.stop() + asyncio_atexit.unregister(cleanup) # type: ignore if self._stop_container: - atexit.register(cleanup) + asyncio_atexit.register(cleanup) # type: ignore # Check if the container is running if self._container.status != "running": diff --git a/python/packages/autogen-core/tests/regressions/test_clean_terminate.py b/python/packages/autogen-core/tests/regressions/test_clean_terminate.py new file mode 100644 index 000000000..5ecd02fdc --- /dev/null +++ b/python/packages/autogen-core/tests/regressions/test_clean_terminate.py @@ -0,0 +1,121 @@ +import asyncio +import typing as t +from functools import partial +from typing import Protocol + +import asyncio_atexit +import pytest + + +class AtExitImpl(Protocol): + def register(self, func: t.Callable[..., t.Any], /, *args: t.Any, **kwargs: t.Any) -> t.Callable[..., t.Any]: ... + def unregister(self, func: t.Callable[..., t.Any], /) -> None: ... 
+ + +class AtExitSimulator(AtExitImpl): + def __init__(self) -> None: + self._funcs: t.List[t.Callable[..., t.Any]] = [] + + def complete(self) -> None: + for func in self._funcs: + func() + + self._funcs.clear() + + def register(self, func: t.Callable[..., t.Any], /, *args: t.Any, **kwargs: t.Any) -> t.Callable[..., t.Any]: + self._funcs.append(func) + return func + + def unregister(self, func: t.Callable[..., t.Any], /) -> None: + self._funcs.remove(func) + + +class AsyncioAtExitWrapper(AtExitImpl): + """This only exists to make mypy happy""" + + def register(self, func: t.Callable[..., t.Any], /, *args: t.Any, **kwargs: t.Any) -> t.Callable[..., t.Any]: + loop = None + if "loop" in kwargs: + loop = kwargs["loop"] + kwargs.pop("loop") + + wrapper = partial(func, *args, **kwargs) + + asyncio_atexit.register(wrapper, loop=loop) # type: ignore + + return func + + def unregister(self, func: t.Callable[..., t.Any], /, **kwargs: t.Any) -> None: + loop = None + if "loop" in kwargs: + loop = kwargs["loop"] + kwargs.pop("loop") + + asyncio_atexit.unregister(func, loop=loop) # type: ignore + + +# From Issue #584: No EventLoop error when agents exit. +# see: https://github.com/microsoft/agnext/issues/584 + + +# This is a minimal implementation of a component that requires cleanup on exit. 
+class CleanupComponent: + def __init__(self, atexit_impl: AtExitImpl, use_async_cleanup: bool) -> None: + self.atexit_impl = atexit_impl + self.cleanup_has_run = False + self.stop_has_run = False + + self.cleanup = self._acleanup if use_async_cleanup else self._cleanup + self.atexit_impl.register(self.cleanup) + + async def stop(self) -> None: + self.stop_has_run = True + + async def _acleanup(self) -> None: + self.cleanup_has_run = True + await self.stop() + + def _cleanup(self) -> None: + self.cleanup_has_run = True + loop = asyncio.get_running_loop() + loop.run_until_complete(self.stop()) + + +async def create_component(atexit_impl: AtExitImpl, /, use_async_cleanup: bool) -> CleanupComponent: + await asyncio.sleep(0.001) + return CleanupComponent(atexit_impl, use_async_cleanup) + + +def run_test_impl(debug_printer: t.Callable[[str], t.Any] | None = None) -> None: + def validate(component: CleanupComponent, expect_exception: bool, expect_stop: bool) -> None: + if debug_printer is not None: + debug_printer(f"Cleanup ran: {component.cleanup_has_run} (expected True)") + debug_printer(f"Stop ran: {component.stop_has_run} (expected {expect_stop})") + + assert component.cleanup_has_run, "Cleanup should always run to be a faithful simulation." + assert component.stop_has_run == expect_stop + + # AtExitSimulator behaves like atexit.register, which causes cleanup relying on it to fail. 
+ atexit_simulator = AtExitSimulator() + loop = asyncio.new_event_loop() + component = loop.run_until_complete(create_component(atexit_simulator, use_async_cleanup=False)) + loop.close() + + with pytest.raises(RuntimeError): + atexit_simulator.complete() + + validate(component, expect_exception=True, expect_stop=False) + + loop = asyncio.new_event_loop() + component = loop.run_until_complete(create_component(AsyncioAtExitWrapper(), use_async_cleanup=True)) + loop.close() + validate(component, expect_exception=False, expect_stop=True) + + +def test_asyncio_atexit_assumptions() -> None: + run_test_impl() + + +if __name__ == "__main__": + debug_printer = print + run_test_impl(debug_printer=debug_printer) diff --git a/python/uv.lock b/python/uv.lock index 6faa99604..f226e4ee7 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -324,6 +324,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a7/fa/e01228c2938de91d47b307831c62ab9e4001e747789d0b05baf779a6488c/async_timeout-4.0.3-py3-none-any.whl", hash = "sha256:7405140ff1230c310e51dc27b3145b9092d659ce68ff733fb0cefe3ee42be028", size = 5721 }, ] +[[package]] +name = "asyncio-atexit" +version = "1.0.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/22/d3/dd2974be3f67c7ec96e0d6ab454429d0372cb7c7bffa3d0ac67a483cb801/asyncio-atexit-1.0.1.tar.gz", hash = "sha256:1d0c71544b8ee2c484d322844ee72c0875dde6f250c0ed5b6993592ab9f7d436", size = 4373 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/65/10/d6abaefa57a52646651fd0383c056280b0853c0106229ece6bb38cd14463/asyncio_atexit-1.0.1-py3-none-any.whl", hash = "sha256:d93d5f7d5633a534abd521ce2896ed0fbe8de170bb1e65ec871d1c20eac9d376", size = 3752 }, +] + [[package]] name = "attrs" version = "24.2.0" @@ -350,6 +359,7 @@ version = "0.4.0.dev0" source = { editable = "packages/autogen-core" } dependencies = [ { name = "aiohttp" }, + { name = "asyncio-atexit" }, { name = "azure-core" }, { name = "docker" 
}, { name = "grpcio" }, @@ -406,6 +416,7 @@ dev = [ [package.metadata] requires-dist = [ { name = "aiohttp" }, + { name = "asyncio-atexit" }, { name = "azure-core" }, { name = "docker", specifier = "~=7.0" }, { name = "grpcio", specifier = "~=1.62.0" },