make code execution async (#219)

* make code execution async

* python 3.10 does not support asyncio.timeout()

* make code execution cancellable

* make code execution async

* python 3.10 does not support asyncio.timeout()

* make code execution cancellable

* make entire callstack for code_executor async

* Update python/src/agnext/components/code_executor/_impl/local_commandline_code_executor.py

Co-authored-by: Jack Gerrits <jackgerrits@users.noreply.github.com>

* fix variable description

* remove unnecessary code

* fix usage of execute_code_blocks

* fix usage of execute_code_blocks

---------

Co-authored-by: Jack Gerrits <jackgerrits@users.noreply.github.com>
Co-authored-by: Eric Zhu <ekzhu@users.noreply.github.com>
This commit is contained in:
peterychang 2024-07-26 18:37:34 -04:00 committed by GitHub
parent 53343972f0
commit 070a97ceaa
8 changed files with 100 additions and 82 deletions

View File

@ -169,11 +169,7 @@ class Executor(TypeRoutedAgent):
code = self._extract_execution_request(message.execution_request)
if code is not None:
execution_requests = [CodeBlock(code=code, language="python")]
future = asyncio.get_event_loop().run_in_executor(
None, self._executor.execute_code_blocks, execution_requests
)
cancellation_token.link_future(future)
result = await future
result = await self._executor.execute_code_blocks(execution_requests)
await self.publish_message(
CodeExecutionResultMessage(
output=result.output,

View File

@ -155,9 +155,7 @@ class Executor(TypeRoutedAgent):
)
return
# Execute code blocks.
future = asyncio.get_event_loop().run_in_executor(None, self._executor.execute_code_blocks, code_blocks)
cancellation_token.link_future(future)
result = await future
result = await self._executor.execute_code_blocks(code_blocks=code_blocks)
# Publish the code execution result.
await self.publish_message(
CodeExecutionTaskResult(output=result.output, exit_code=result.exit_code, session_id=message.session_id),

View File

@ -27,7 +27,7 @@ class CodeResult:
class CodeExecutor(Protocol):
"""Executes code blocks and returns the result."""
def execute_code_blocks(self, code_blocks: List[CodeBlock]) -> CodeResult:
async def execute_code_blocks(self, code_blocks: List[CodeBlock]) -> CodeResult:
"""Execute code blocks and return the result.
This method should be implemented by the code executor.

View File

@ -1,17 +1,18 @@
# File based from: https://github.com/microsoft/autogen/blob/main/autogen/coding/local_commandline_code_executor.py
# Credit to original authors
import asyncio
import logging
import subprocess
import sys
import warnings
from hashlib import md5
from pathlib import Path
from string import Template
from typing import Any, Callable, ClassVar, List, Sequence, Union
from typing import Any, Callable, ClassVar, List, Sequence, Union, Optional
from typing_extensions import ParamSpec
from ....core import CancellationToken
from .._base import CodeBlock, CodeExecutor
from .._func_with_reqs import (
FunctionWithRequirements,
@ -21,6 +22,7 @@ from .._func_with_reqs import (
)
from .command_line_code_result import CommandLineCodeResult
from .utils import PYTHON_VARIANTS, get_file_name_from_content, lang_to_cmd, silence_pip # type: ignore
from ....core import CancellationToken
__all__ = ("LocalCommandLineCodeExecutor",)
@ -75,7 +77,7 @@ $functions"""
block.
Args:
timeout (int): The timeout for code execution. Default is 60.
timeout (int): The timeout for the execution of any single code block. Default is 60.
work_dir (str): The working directory for the code execution. If None,
a default working directory will be used. The default working
directory is the current directory ".".
@ -144,7 +146,7 @@ $functions"""
"""(Experimental) The working directory for the code execution."""
return self._work_dir
def _setup_functions(self) -> None:
async def _setup_functions(self, cancellation_token: Optional[CancellationToken]) -> None:
func_file_content = build_python_functions_file(self._functions)
func_file = self._work_dir / f"{self._functions_module}.py"
func_file.write_text(func_file_content)
@ -156,46 +158,61 @@ $functions"""
if len(required_packages) > 0:
logging.info("Ensuring packages are installed in executor.")
cmd = [sys.executable, "-m", "pip", "install"]
cmd.extend(required_packages)
cmd_args = ["-m", "pip", "install"]
cmd_args.extend(required_packages)
try:
result = subprocess.run(
cmd,
task = asyncio.create_task(
asyncio.create_subprocess_exec(
sys.executable,
*cmd_args,
cwd=self._work_dir,
capture_output=True,
text=True,
timeout=float(self._timeout),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except subprocess.TimeoutExpired as e:
)
if cancellation_token:
cancellation_token.link_future(task)
try:
proc = await task
stdout, stderr = await asyncio.wait_for(proc.communicate(), self._timeout)
except asyncio.TimeoutError as e:
raise ValueError("Pip install timed out") from e
except asyncio.CancelledError as e:
raise ValueError("Pip install was cancelled") from e
if result.returncode != 0:
raise ValueError(f"Pip install failed. {result.stdout}, {result.stderr}")
if proc.returncode is not None and proc.returncode != 0:
raise ValueError(f"Pip install failed. {stdout.decode()}, {stderr.decode()}")
# Attempt to load the function file to check for syntax errors, imports etc.
exec_result = self._execute_code_dont_check_setup([CodeBlock(code=func_file_content, language="python")])
exec_result = await self._execute_code_dont_check_setup(
[CodeBlock(code=func_file_content, language="python")], cancellation_token
)
if exec_result.exit_code != 0:
raise ValueError(f"Functions failed to load: {exec_result.output}")
self._setup_functions_complete = True
def execute_code_blocks(self, code_blocks: List[CodeBlock]) -> CommandLineCodeResult:
async def execute_code_blocks(
self, code_blocks: List[CodeBlock], cancellation_token: Optional[CancellationToken] = None
) -> CommandLineCodeResult:
"""(Experimental) Execute the code blocks and return the result.
Args:
code_blocks (List[CodeBlock]): The code blocks to execute.
cancellation_token (CancellationToken|None): an optional token to cancel the operation
Returns:
CommandLineCodeResult: The result of the code execution."""
if not self._setup_functions_complete:
self._setup_functions()
await self._setup_functions(cancellation_token)
return self._execute_code_dont_check_setup(code_blocks)
return await self._execute_code_dont_check_setup(code_blocks, cancellation_token)
def _execute_code_dont_check_setup(self, code_blocks: List[CodeBlock]) -> CommandLineCodeResult:
async def _execute_code_dont_check_setup(
self, code_blocks: List[CodeBlock], cancellation_token: Optional[CancellationToken]
) -> CommandLineCodeResult:
logs_all: str = ""
file_names: List[Path] = []
exitcode = 0
@ -235,25 +252,38 @@ $functions"""
file_names.append(written_file)
program = sys.executable if lang.startswith("python") else lang_to_cmd(lang)
cmd = [program, str(written_file.absolute())]
try:
result = subprocess.run(
cmd,
# Wrap in a task to make it cancellable
task = asyncio.create_task(
asyncio.create_subprocess_exec(
program,
str(written_file.absolute()),
cwd=self._work_dir,
capture_output=True,
text=True,
timeout=float(self._timeout),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except subprocess.TimeoutExpired:
)
if cancellation_token:
cancellation_token.link_future(task)
try:
proc = await task
stdout, stderr = await asyncio.wait_for(proc.communicate(), self._timeout)
exitcode = proc.returncode or 0
except asyncio.TimeoutError:
logs_all += "\n Timeout"
# Same exit code as the timeout command on linux.
exitcode = 124
break
except asyncio.CancelledError:
logs_all += "\n Cancelled"
# TODO: which exit code? 125 is Operation Canceled
exitcode = 125
break
logs_all += result.stderr
logs_all += result.stdout
exitcode = result.returncode
self._running_cmd_task = None
logs_all += stderr.decode()
logs_all += stdout.decode()
if exitcode != 0:
break

View File

@ -1,6 +1,3 @@
import asyncio
import functools
from pydantic import BaseModel, Field, model_serializer
from ...core import CancellationToken
@ -28,10 +25,6 @@ class PythonCodeExecutionTool(BaseTool[CodeExecutionInput, CodeExecutionResult])
async def run(self, args: CodeExecutionInput, cancellation_token: CancellationToken) -> CodeExecutionResult:
code_blocks = [CodeBlock(code=args.code, language="python")]
future = asyncio.get_event_loop().run_in_executor(
None, functools.partial(self._executor.execute_code_blocks, code_blocks=code_blocks)
)
cancellation_token.link_future(future)
result = await future
result = await self._executor.execute_code_blocks(code_blocks=code_blocks)
return CodeExecutionResult(success=result.exit_code == 0, output=result.output)

View File

@ -1,4 +1,3 @@
import asyncio
import re
from typing import List, Optional, Tuple, Union
@ -81,11 +80,7 @@ class Executor(BaseAgent):
code = self._extract_execution_request(message_content_to_str(message.content))
if code is not None:
execution_requests = [CodeBlock(code=code, language="python")]
future = asyncio.get_event_loop().run_in_executor(
None, self._executor.execute_code_blocks, execution_requests
)
cancellation_token.link_future(future)
result = await future
result = await self._executor.execute_code_blocks(execution_requests)
if result.output.strip() == "":
# Sometimes agents forget to print(). Remind the to print something

View File

@ -12,15 +12,15 @@ UNIX_SHELLS = ["bash", "sh", "shell"]
WINDOWS_SHELLS = ["ps1", "pwsh", "powershell"]
PYTHON_VARIANTS = ["python", "Python", "py"]
def test_execute_code() -> None:
@pytest.mark.asyncio
async def test_execute_code() -> None:
with tempfile.TemporaryDirectory() as temp_dir:
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir)
# Test single code block.
code_blocks = [CodeBlock(code="import sys; print('hello world!')", language="python")]
code_result = executor.execute_code_blocks(code_blocks)
code_result = await executor.execute_code_blocks(code_blocks)
assert code_result.exit_code == 0 and "hello world!" in code_result.output and code_result.code_file is not None
# Test multiple code blocks.
@ -28,7 +28,7 @@ def test_execute_code() -> None:
CodeBlock(code="import sys; print('hello world!')", language="python"),
CodeBlock(code="a = 100 + 100; print(a)", language="python"),
]
code_result = executor.execute_code_blocks(code_blocks)
code_result = await executor.execute_code_blocks(code_blocks)
assert (
code_result.exit_code == 0
and "hello world!" in code_result.output
@ -39,13 +39,13 @@ def test_execute_code() -> None:
# Test bash script.
if sys.platform not in ["win32"]:
code_blocks = [CodeBlock(code="echo 'hello world!'", language="bash")]
code_result = executor.execute_code_blocks(code_blocks)
code_result = await executor.execute_code_blocks(code_blocks)
assert code_result.exit_code == 0 and "hello world!" in code_result.output and code_result.code_file is not None
# Test running code.
file_lines = ["import sys", "print('hello world!')", "a = 100 + 100", "print(a)"]
code_blocks = [CodeBlock(code="\n".join(file_lines), language="python")]
code_result = executor.execute_code_blocks(code_blocks)
code_result = await executor.execute_code_blocks(code_blocks)
assert (
code_result.exit_code == 0
and "hello world!" in code_result.output
@ -59,12 +59,12 @@ def test_execute_code() -> None:
for file_line, code_line in zip(file_lines, code_lines):
assert file_line.strip() == code_line.strip()
def test_commandline_code_executor_timeout() -> None:
@pytest.mark.asyncio
async def test_commandline_code_executor_timeout() -> None:
with tempfile.TemporaryDirectory() as temp_dir:
executor = LocalCommandLineCodeExecutor(timeout=1, work_dir=temp_dir)
code_blocks = [CodeBlock(code="import time; time.sleep(10); print('hello world!')", language="python")]
code_result = executor.execute_code_blocks(code_blocks)
code_result = await executor.execute_code_blocks(code_blocks)
assert code_result.exit_code and "Timeout" in code_result.output
@ -75,18 +75,18 @@ def test_local_commandline_code_executor_restart() -> None:
def test_invalid_relative_path() -> None:
@pytest.mark.asyncio
async def test_invalid_relative_path() -> None:
executor = LocalCommandLineCodeExecutor()
code = """# filename: /tmp/test.py
print("hello world")
"""
result = executor.execute_code_blocks([CodeBlock(code=code, language="python")])
result = await executor.execute_code_blocks([CodeBlock(code=code, language="python")])
assert result.exit_code == 1 and "Filename is not in the workspace" in result.output
def test_valid_relative_path() -> None:
@pytest.mark.asyncio
async def test_valid_relative_path() -> None:
with tempfile.TemporaryDirectory() as temp_dir_str:
temp_dir = Path(temp_dir_str)
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir)
@ -94,7 +94,7 @@ def test_valid_relative_path() -> None:
print("hello world")
"""
result = executor.execute_code_blocks([CodeBlock(code=code, language="python")])
result = await executor.execute_code_blocks([CodeBlock(code=code, language="python")])
assert result.exit_code == 0
assert "hello world" in result.output
assert result.code_file is not None

View File

@ -47,7 +47,8 @@ def function_missing_reqs() -> "polars.DataFrame":
return polars.DataFrame()
def test_can_load_function_with_reqs() -> None:
@pytest.mark.asyncio
async def test_can_load_function_with_reqs() -> None:
with tempfile.TemporaryDirectory() as temp_dir:
executor = LocalCommandLineCodeExecutor(
work_dir=temp_dir, functions=[load_data]
@ -59,7 +60,7 @@ import polars
data = load_data()
print(data['name'][0])"""
result = executor.execute_code_blocks(
result = await executor.execute_code_blocks(
code_blocks=[
CodeBlock(language="python", code=code),
]
@ -68,7 +69,8 @@ print(data['name'][0])"""
assert result.exit_code == 0
def test_can_load_function() -> None:
@pytest.mark.asyncio
async def test_can_load_function() -> None:
with tempfile.TemporaryDirectory() as temp_dir:
executor = LocalCommandLineCodeExecutor(
work_dir=temp_dir, functions=[add_two_numbers]
@ -76,7 +78,7 @@ def test_can_load_function() -> None:
code = f"""from {executor.functions_module} import add_two_numbers
print(add_two_numbers(1, 2))"""
result = executor.execute_code_blocks(
result = await executor.execute_code_blocks(
code_blocks=[
CodeBlock(language="python", code=code),
]
@ -85,7 +87,8 @@ print(add_two_numbers(1, 2))"""
assert result.exit_code == 0
def test_fails_for_function_incorrect_import() -> None:
@pytest.mark.asyncio
async def test_fails_for_function_incorrect_import() -> None:
with tempfile.TemporaryDirectory() as temp_dir:
executor = LocalCommandLineCodeExecutor(
work_dir=temp_dir, functions=[function_incorrect_import]
@ -94,14 +97,15 @@ def test_fails_for_function_incorrect_import() -> None:
function_incorrect_import()"""
with pytest.raises(ValueError):
executor.execute_code_blocks(
await executor.execute_code_blocks(
code_blocks=[
CodeBlock(language="python", code=code),
]
)
def test_fails_for_function_incorrect_dep() -> None:
@pytest.mark.asyncio
async def test_fails_for_function_incorrect_dep() -> None:
with tempfile.TemporaryDirectory() as temp_dir:
executor = LocalCommandLineCodeExecutor(
work_dir=temp_dir, functions=[function_incorrect_dep]
@ -110,7 +114,7 @@ def test_fails_for_function_incorrect_dep() -> None:
function_incorrect_dep()"""
with pytest.raises(ValueError):
executor.execute_code_blocks(
await executor.execute_code_blocks(
code_blocks=[
CodeBlock(language="python", code=code),
]
@ -152,7 +156,8 @@ def add_two_numbers(a: int, b: int) -> int:
)
def test_can_load_str_function_with_reqs() -> None:
@pytest.mark.asyncio
async def test_can_load_str_function_with_reqs() -> None:
with tempfile.TemporaryDirectory() as temp_dir:
func = FunctionWithRequirements.from_str(
'''
@ -166,7 +171,7 @@ def add_two_numbers(a: int, b: int) -> int:
code = f"""from {executor.functions_module} import add_two_numbers
print(add_two_numbers(1, 2))"""
result = executor.execute_code_blocks(
result = await executor.execute_code_blocks(
code_blocks=[
CodeBlock(language="python", code=code),
]
@ -187,7 +192,8 @@ invaliddef add_two_numbers(a: int, b: int) -> int:
)
def test_cant_run_broken_str_function_with_reqs() -> None:
@pytest.mark.asyncio
async def test_cant_run_broken_str_function_with_reqs() -> None:
with tempfile.TemporaryDirectory() as temp_dir:
func = FunctionWithRequirements.from_str(
'''
@ -201,7 +207,7 @@ def add_two_numbers(a: int, b: int) -> int:
code = f"""from {executor.functions_module} import add_two_numbers
print(add_two_numbers(object(), False))"""
result = executor.execute_code_blocks(
result = await executor.execute_code_blocks(
code_blocks=[
CodeBlock(language="python", code=code),
]