add cancellation support to docker executor (#6027)

Resolves #6013
This commit is contained in:
Eric Zhu 2025-03-19 21:29:01 -07:00 committed by GitHub
parent 855bcd711c
commit 9103359ef4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 82 additions and 38 deletions

View File

@ -12,7 +12,7 @@ from collections.abc import Sequence
from hashlib import sha256
from pathlib import Path
from types import TracebackType
from typing import Any, Callable, ClassVar, Dict, List, Optional, ParamSpec, Type, Union
from typing import Any, Callable, ClassVar, Dict, List, Optional, ParamSpec, Tuple, Type, Union
from autogen_core import CancellationToken, Component
from autogen_core.code_executor import (
@ -37,6 +37,17 @@ if sys.version_info >= (3, 11):
else:
from typing_extensions import Self
# Import the optional Docker dependencies. If any of them is missing, raise a
# RuntimeError that tells the user to install the 'docker' extra, chaining the
# original ImportError so the missing package is still visible in the traceback.
try:
    import asyncio_atexit
    import docker
    from docker.errors import DockerException, ImageNotFound, NotFound
    from docker.models.containers import Container
except ImportError as e:
    raise RuntimeError(
        "Missing dependencies for DockerCommandLineCodeExecutor. Please ensure the autogen-ext package was installed with the 'docker' extra."
    ) from e
async def _wait_for_ready(container: Any, timeout: int = 60, stop_time: float = 0.1) -> None:
elapsed_time = 0.0
@ -198,15 +209,9 @@ $functions"""
else:
self._setup_functions_complete = True
try:
from docker.models.containers import Container
except ImportError as e:
raise RuntimeError(
"Missing dependecies for DockerCommandLineCodeExecutor. Please ensure the autogen-ext package was installed with the 'docker' extra."
) from e
self._container: Container | None = None
self._running = False
self._cancellation_tasks: List[asyncio.Task[None]] = []
@property
def timeout(self) -> int:
@ -256,6 +261,31 @@ $functions"""
self._setup_functions_complete = True
async def _kill_running_command(self, command: List[str]) -> None:
    """Ask the container to kill processes matching ``command``.

    Runs ``pkill -f`` inside the container (on a worker thread) using the
    space-joined command as the match pattern. Does nothing when no container
    is attached or the executor is not marked as running.

    Args:
        command: The argv list of the command whose processes should be killed.
    """
    container = self._container
    if container is not None and self._running:
        pattern = " ".join(command)
        await asyncio.to_thread(container.exec_run, ["pkill", "-f", pattern])
async def _execute_command(self, command: List[str], cancellation_token: CancellationToken) -> Tuple[str, int]:
    """Run ``command`` inside the container and return its output and exit code.

    The blocking ``exec_run`` call is executed on a worker thread and wrapped in
    a task that is linked to ``cancellation_token``, so cancelling the token
    cancels the wait.

    Args:
        command: The argv list to execute in the container.
        cancellation_token: Token whose cancellation aborts the wait for the command.

    Returns:
        A ``(output, exit_code)`` tuple. When the exit code is 124 the text
        ``"\\n Timeout"`` is appended to the output. When the wait is cancelled,
        ``("Code execution was cancelled.", 1)`` is returned.

    Raises:
        ValueError: If no container is attached or the executor is not running.
    """
    if self._container is None or not self._running:
        raise ValueError("Container is not running. Must first be started with either start or a context manager.")

    exec_task = asyncio.create_task(asyncio.to_thread(self._container.exec_run, command))
    cancellation_token.link_future(exec_task)

    # Only the await can be cancelled, so keep the try body limited to it.
    try:
        result = await exec_task
    except asyncio.CancelledError:
        # Cancelling the task does not stop the process inside the container;
        # schedule a background task to kill it and keep a handle to that task.
        self._cancellation_tasks.append(asyncio.create_task(self._kill_running_command(command)))
        return "Code execution was cancelled.", 1

    exit_code = result.exit_code
    # The executed program controls these bytes; replace undecodable sequences
    # instead of raising UnicodeDecodeError and losing the output and exit code.
    output = result.output.decode("utf-8", errors="replace")
    if exit_code == 124:
        output += "\n Timeout"
    return output, exit_code
async def _execute_code_dont_check_setup(
self, code_blocks: List[CodeBlock], cancellation_token: CancellationToken
) -> CommandLineCodeResult:
@ -290,13 +320,8 @@ $functions"""
command = ["timeout", str(self._timeout), lang_to_cmd(lang), filename]
result = await asyncio.to_thread(self._container.exec_run, command) # type: ignore
exit_code = result.exit_code
output = result.output.decode("utf-8")
if exit_code == 124:
output += "\n Timeout"
output, exit_code = await self._execute_command(command, cancellation_token)
outputs.append(output)
last_exit_code = exit_code
if exit_code != 0:
break
@ -315,11 +340,6 @@ $functions"""
Returns:
CommandlineCodeResult: The result of the code execution."""
def raise_not_implemented() -> None:
raise NotImplementedError("Cancellation is not yet supported for DockerCommandLineCodeExecutor")
cancellation_token.add_callback(lambda: raise_not_implemented())
if not self._setup_functions_complete:
await self._setup_functions(cancellation_token)
@ -342,17 +362,12 @@ $functions"""
if not self._running:
return
try:
import docker
from docker.errors import NotFound
except ImportError as e:
raise RuntimeError(
"Missing dependecies for DockerCommandLineCodeExecutor. Please ensure the autogen-ext package was installed with the 'docker' extra."
) from e
client = docker.from_env()
try:
container = await asyncio.to_thread(client.containers.get, self.container_name)
# Wait for all cancellation tasks to finish before stopping the container.
await asyncio.gather(*self._cancellation_tasks)
# Stop the container.
await asyncio.to_thread(container.stop)
except NotFound:
pass
@ -360,16 +375,6 @@ $functions"""
self._running = False
async def start(self) -> None:
try:
import asyncio_atexit
import docker
from docker.errors import DockerException, ImageNotFound
except ImportError as e:
raise RuntimeError(
"Missing dependecies for DockerCommandLineCodeExecutor. Please ensure the autogen-ext package was installed with the 'docker' extra."
) from e
# Start a container from the image, ready to exec commands later
try:
client = docker.from_env()

View File

@ -98,7 +98,14 @@ async def test_commandline_code_executor_cancellation() -> None:
with tempfile.TemporaryDirectory() as temp_dir:
cancellation_token = CancellationToken()
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir)
code_blocks = [CodeBlock(code="import time; time.sleep(10); print('hello world!')", language="python")]
# Write code that sleeps for 10 seconds and then writes "hello world!"
# to a file.
code = """import time
time.sleep(10)
with open("hello.txt", "w") as f:
f.write("hello world!")
"""
code_blocks = [CodeBlock(code=code, language="python")]
coro = executor.execute_code_blocks(code_blocks, cancellation_token)
@ -108,6 +115,10 @@ async def test_commandline_code_executor_cancellation() -> None:
assert code_result.exit_code and "Cancelled" in code_result.output
# Check if the file is not created.
hello_file = Path(temp_dir) / "hello.txt"
assert not hello_file.exists()
@pytest.mark.asyncio
async def test_local_commandline_code_executor_restart() -> None:

View File

@ -1,4 +1,5 @@
# mypy: disable-error-code="no-any-unimported"
import asyncio
import os
import sys
import tempfile
@ -107,6 +108,33 @@ async def test_commandline_code_executor_timeout(executor_and_temp_dir: Executor
assert code_result.exit_code and "Timeout" in code_result.output
# Checks that cancelling the token while a code block is executing returns a
# "cancelled" result and that the script's file write never takes place.
@pytest.mark.asyncio
@pytest.mark.parametrize("executor_and_temp_dir", ["docker"], indirect=True)
async def test_commandline_code_executor_cancellation(executor_and_temp_dir: ExecutorFixture) -> None:
_executor, temp_dir = executor_and_temp_dir
cancellation_token = CancellationToken()
# Write code that sleeps for 10 seconds and then writes "hello world!"
# to a file.
code = """import time
time.sleep(10)
with open("hello.txt", "w") as f:
f.write("hello world!")
"""
code_blocks = [CodeBlock(code=code, language="python")]
# Run the execution as a background task so the test can cancel it while it is in flight.
task = asyncio.create_task(_executor.execute_code_blocks(code_blocks, cancellation_token))
# Cancel via the token after 2 seconds, before the script's 10-second sleep ends
await asyncio.sleep(2)
cancellation_token.cancel()
code_result = await task
# A cancelled run must report a non-zero exit code together with the cancellation message.
assert code_result.exit_code and "Code execution was cancelled" in code_result.output
# Check that the file was not created
hello_file_path = Path(temp_dir) / "hello.txt"
assert not hello_file_path.exists(), f"File {hello_file_path} should not exist after cancellation"
@pytest.mark.asyncio
@pytest.mark.parametrize("executor_and_temp_dir", ["docker"], indirect=True)
async def test_invalid_relative_path(executor_and_temp_dir: ExecutorFixture) -> None: