Add self-debugging loop to CodeExecutionAgent (#6306)

## Why are these changes needed?
This PR introduces a baseline self-debugging loop to the
`CodeExecutionAgent`.

The loop automatically retries code generation and execution up to a
configurable number of times (`max_retries_on_error`) until the execution
succeeds or the retry limit is reached.

This enables the agent to recover from failures such as syntax and runtime
errors by using its own reasoning to iteratively improve the generated code,
laying the foundation for more robust autonomous behavior.
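
For illustration, a minimal usage sketch mirroring the new test (the OpenAI client and model name are assumptions; any model client whose `model_info` reports `structured_output=True` works):

```python
import asyncio

from autogen_agentchat.agents import CodeExecutorAgent
from autogen_agentchat.messages import TextMessage
from autogen_core import CancellationToken
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    # Assumption: gpt-4o is just an example of a structured-output-capable model.
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = CodeExecutorAgent(
        name="code_executor_agent",
        code_executor=LocalCommandLineCodeExecutor(),
        model_client=model_client,
        max_retries_on_error=3,  # up to 3 self-debugging retries after the first attempt
    )
    response = await agent.on_messages(
        [TextMessage(content="Calculate the mean of 10, 20, 30, 40, 50.", source="user")],
        CancellationToken(),
    )
    print(response.chat_message.to_text())


asyncio.run(main())
```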

## Related issue number

Closes #6207

## Checks

- [x] I've included any doc changes needed for
<https://microsoft.github.io/autogen/>. See
<https://github.com/microsoft/autogen/blob/main/CONTRIBUTING.md> to
build and test documentation locally.
- [x] I've added tests (if relevant) corresponding to the changes
introduced in this PR.
- [x] I've made sure all auto checks have passed.

---------

Signed-off-by: Abhijeetsingh Meena <abhijeet040403@gmail.com>
Co-authored-by: Eric Zhu <ekzhu@users.noreply.github.com>
Abhijeetsingh Meena committed on 2025-04-22 21:54:05 +05:30 (via GitHub)
parent b3f37319e3
commit aad6caa768
3 changed files with 281 additions and 102 deletions


@@ -9,8 +9,7 @@ from typing import (
)
from autogen_core import CancellationToken, Component, ComponentModel
from autogen_core.code_executor import CodeBlock, CodeExecutor
from autogen_core.memory import Memory
from autogen_core.code_executor import CodeBlock, CodeExecutor, CodeResult
from autogen_core.model_context import (
ChatCompletionContext,
UnboundedChatCompletionContext,
@@ -34,7 +33,6 @@ from ..messages import (
CodeExecutionEvent,
CodeGenerationEvent,
HandoffMessage,
MemoryQueryEvent,
ModelClientStreamingChunkEvent,
TextMessage,
ThoughtEvent,
@@ -58,6 +56,11 @@ class CodeExecutorAgentConfig(BaseModel):
model_context: ComponentModel | None = None
class RetryDecision(BaseModel):
reason: str
retry: bool
class CodeExecutorAgent(BaseChatAgent, Component[CodeExecutorAgentConfig]):
"""(Experimental) An agent that generates and executes code snippets based on user instructions.
@@ -91,6 +94,8 @@ class CodeExecutorAgent(BaseChatAgent, Component[CodeExecutorAgentConfig]):
(:py:class:`~autogen_ext.code_executors.docker.DockerCommandLineCodeExecutor` recommended. See example below)
model_client (ChatCompletionClient, optional): The model client to use for inference and generating code.
If not provided, the agent will only execute code blocks found in input messages.
Currently, the model must support structured output mode, which is required for
the automatic retry mechanism to work.
model_client_stream (bool, optional): If `True`, the model client will be used in streaming mode.
:meth:`on_messages_stream` and :meth:`BaseChatAgent.run_stream` methods will
also yield :class:`~autogen_agentchat.messages.ModelClientStreamingChunkEvent`
@@ -103,6 +108,8 @@ class CodeExecutorAgent(BaseChatAgent, Component[CodeExecutorAgentConfig]):
This is useful when the agent is part of a group chat and you want to limit the code execution to messages from specific agents.
If not provided, all messages will be checked for code blocks.
This is only used if `model_client` is not provided.
max_retries_on_error (int, optional): The maximum number of retries on error. If the code execution fails, the agent will retry up to this number of times.
If the code execution fails after this number of retries, the agent will yield a reflection result.
.. note::
@@ -334,6 +341,7 @@ class CodeExecutorAgent(BaseChatAgent, Component[CodeExecutorAgentConfig]):
model_client: ChatCompletionClient | None = None,
model_context: ChatCompletionContext | None = None,
model_client_stream: bool = False,
max_retries_on_error: int = 0,
description: str | None = None,
system_message: str | None = DEFAULT_SYSTEM_MESSAGE,
sources: Sequence[str] | None = None,
@@ -348,6 +356,7 @@ class CodeExecutorAgent(BaseChatAgent, Component[CodeExecutorAgentConfig]):
self._code_executor = code_executor
self._sources = sources
self._model_client_stream = model_client_stream
self._max_retries_on_error = max_retries_on_error
self._model_client = None
if model_client is not None:
@@ -364,6 +373,12 @@ class CodeExecutorAgent(BaseChatAgent, Component[CodeExecutorAgentConfig]):
else:
self._system_messages = [SystemMessage(content=system_message)]
if self._max_retries_on_error > 0:
if not self._model_client or not self._model_client.model_info:
raise ValueError("model_client.model_info must be provided when max_retries_on_error > 0")
if not self._model_client.model_info["structured_output"]:
raise ValueError("Specified model_client doesn't support structured output mode.")
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
"""The types of messages that the code executor agent produces."""
@@ -395,8 +410,9 @@ class CodeExecutorAgent(BaseChatAgent, Component[CodeExecutorAgentConfig]):
system_messages = self._system_messages
model_client = self._model_client
model_client_stream = self._model_client_stream
max_retries_on_error = self._max_retries_on_error
execution_result: CodeExecutionEvent | None = None
execution_result: CodeResult | None = None
if model_client is None: # default behaviour for backward compatibility
# execute generated code if present
code_blocks: List[CodeBlock] = await self.extract_code_blocks_from_messages(messages)
@@ -409,93 +425,130 @@ class CodeExecutorAgent(BaseChatAgent, Component[CodeExecutorAgentConfig]):
)
return
execution_result = await self.execute_code_block(code_blocks, cancellation_token)
yield Response(chat_message=TextMessage(content=execution_result.to_text(), source=execution_result.source))
yield Response(chat_message=TextMessage(content=execution_result.output, source=self.name))
return
# STEP 1: Add new user/handoff messages to the model context
await self._add_messages_to_context(
model_context=model_context,
messages=messages,
)
# STEP 2: Update model context with any relevant memory
inner_messages: List[BaseAgentEvent | BaseChatMessage] = []
for event_msg in await self._update_model_context_with_memory(
memory=None,
model_context=model_context,
agent_name=agent_name,
):
inner_messages.append(event_msg)
yield event_msg
# STEP 3: Run the first inference
model_result = None
async for inference_output in self._call_llm(
model_client=model_client,
model_client_stream=model_client_stream,
system_messages=system_messages,
model_context=model_context,
agent_name=agent_name,
cancellation_token=cancellation_token,
):
if isinstance(inference_output, CreateResult):
model_result = inference_output
else:
# Streaming chunk event
yield inference_output
assert model_result is not None, "No model result was produced."
# --- NEW: If the model produced a hidden "thought," yield it as an event ---
if model_result.thought:
thought_event = ThoughtEvent(content=model_result.thought, source=agent_name)
yield thought_event
inner_messages.append(thought_event)
# Add the assistant message to the model context (including thought if present)
await model_context.add_message(
AssistantMessage(
content=model_result.content,
source=agent_name,
thought=getattr(model_result, "thought", None),
for nth_try in range(max_retries_on_error + 1): # One default generation/execution pass, plus up to max_retries_on_error retries
# Step 1: Add new user/handoff messages to the model context
await self._add_messages_to_context(
model_context=model_context,
messages=messages,
)
)
code_blocks = self._extract_markdown_code_blocks(str(model_result.content))
# Step 2: Run inference with the model context
model_result = None
async for inference_output in self._call_llm(
model_client=model_client,
model_client_stream=model_client_stream,
system_messages=system_messages,
model_context=model_context,
agent_name=agent_name,
cancellation_token=cancellation_token,
):
if isinstance(inference_output, CreateResult):
model_result = inference_output
else:
# Streaming chunk event
yield inference_output
if not code_blocks:
yield Response(
chat_message=TextMessage(
content=str(model_result.content),
assert model_result is not None, "No model result was produced."
# Step 3: [NEW] If the model produced a hidden "thought," yield it as an event
if model_result.thought:
thought_event = ThoughtEvent(content=model_result.thought, source=agent_name)
yield thought_event
inner_messages.append(thought_event)
# Step 4: Add the assistant message to the model context (including thought if present)
await model_context.add_message(
AssistantMessage(
content=model_result.content,
source=agent_name,
thought=getattr(model_result, "thought", None),
)
)
# Step 5: Extract the code blocks from inferred text
assert isinstance(model_result.content, str), "Expected inferred model_result.content to be of type str."
code_blocks = self._extract_markdown_code_blocks(str(model_result.content))
# Step 6: Exit the loop if no code blocks found
if not code_blocks:
yield Response(
chat_message=TextMessage(
content=str(model_result.content),
source=agent_name,
)
)
return
# Step 7: Yield a CodeGenerationEvent
inferred_text_message: CodeGenerationEvent = CodeGenerationEvent(
retry_attempt=nth_try,
content=model_result.content,
code_blocks=code_blocks,
source=agent_name,
)
yield inferred_text_message
# Step 8: Execute the extracted code blocks
execution_result = await self.execute_code_block(inferred_text_message.code_blocks, cancellation_token)
# Step 9: Update model context with the code execution result
await model_context.add_message(
UserMessage(
content=execution_result.output,
source=agent_name,
)
)
return
# NOTE: error: Argument of type "str | List[FunctionCall]" cannot be assigned to parameter "content" of type "str" in function "__init__".
# For now we can assume that there are no FunctionCalls in the response because we are not providing tools to the CodeExecutorAgent.
# So, for now we cast model_result.content to string
inferred_text_message: CodeGenerationEvent = CodeGenerationEvent(
content=str(model_result.content),
code_blocks=code_blocks,
source=agent_name,
)
# Step 10: Yield a CodeExecutionEvent
yield CodeExecutionEvent(retry_attempt=nth_try, result=execution_result, source=self.name)
yield inferred_text_message
# If execution was successful or last retry, then exit
if execution_result.exit_code == 0 or nth_try == max_retries_on_error:
break
execution_result = await self.execute_code_block(inferred_text_message.code_blocks, cancellation_token)
# Step 11: If exit code is non-zero and retries are available then
# make an inference asking if we should retry or not
chat_context = await model_context.get_messages()
# Add the code execution result to the model context
await model_context.add_message(
UserMessage(
content=execution_result.result.output,
retry_prompt = (
f"The most recent code execution resulted in an error:\n{execution_result.output}\n\n"
"Should we attempt to resolve it? Please respond with:\n"
"- A boolean value for 'retry' indicating whether it should be retried.\n"
"- A detailed explanation in 'reason' that identifies the issue, justifies your decision to retry or not, and outlines how you would resolve the error if a retry is attempted."
)
chat_context = chat_context + [
UserMessage(
content=retry_prompt,
source=agent_name,
)
]
response = await model_client.create(messages=chat_context, json_output=RetryDecision)
assert isinstance(
response.content, str
), "Expected structured response for retry decision to be of type str."
should_retry_generation = RetryDecision.model_validate_json(str(response.content))
# Exit if no-retry is needed
if not should_retry_generation.retry:
break
yield CodeGenerationEvent(
retry_attempt=nth_try,
content=f"Attempt number: {nth_try + 1}\nProposed correction: {should_retry_generation.reason}",
code_blocks=[],
source=agent_name,
)
)
yield execution_result
# always reflect on the execution result
# Always reflect on the execution result
async for reflection_response in CodeExecutorAgent._reflect_on_code_block_results_flow(
system_messages=system_messages,
model_client=model_client,
@@ -504,7 +557,7 @@ class CodeExecutorAgent(BaseChatAgent, Component[CodeExecutorAgentConfig]):
agent_name=agent_name,
inner_messages=inner_messages,
):
yield reflection_response # last reflection_response is of type Response so it will finish the routine
yield reflection_response # Last reflection_response is of type Response so it will finish the routine
async def extract_code_blocks_from_messages(self, messages: Sequence[BaseChatMessage]) -> List[CodeBlock]:
# Extract code blocks from the messages.
@@ -518,7 +571,7 @@ class CodeExecutorAgent(BaseChatAgent, Component[CodeExecutorAgentConfig]):
async def execute_code_block(
self, code_blocks: List[CodeBlock], cancellation_token: CancellationToken
) -> CodeExecutionEvent:
) -> CodeResult:
# Execute the code blocks.
result = await self._code_executor.execute_code_blocks(code_blocks, cancellation_token=cancellation_token)
@@ -529,7 +582,7 @@ class CodeExecutorAgent(BaseChatAgent, Component[CodeExecutorAgentConfig]):
# Error
result.output = f"The script ran, then exited with an error (POSIX exit code: {result.exit_code})\nIts output was:\n{result.output}"
return CodeExecutionEvent(result=result, source=self.name)
return result
async def on_reset(self, cancellation_token: CancellationToken) -> None:
"""Its a no-op as the code executor agent has no mutable state."""
@@ -618,27 +671,6 @@ class CodeExecutorAgent(BaseChatAgent, Component[CodeExecutorAgentConfig]):
model_result = await model_client.create(llm_messages, tools=[], cancellation_token=cancellation_token)
yield model_result
@staticmethod
async def _update_model_context_with_memory(
memory: Optional[Sequence[Memory]],
model_context: ChatCompletionContext,
agent_name: str,
) -> List[MemoryQueryEvent]:
"""
If memory modules are present, update the model context and return the events produced.
"""
events: List[MemoryQueryEvent] = []
if memory:
for mem in memory:
update_context_result = await mem.update_context(model_context)
if update_context_result and len(update_context_result.memories.results) > 0:
memory_query_event_msg = MemoryQueryEvent(
content=update_context_result.memories.results,
source=agent_name,
)
events.append(memory_query_event_msg)
return events
@staticmethod
async def _add_messages_to_context(
model_context: ChatCompletionContext,

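The retry decision in Step 11 hinges on the model client's structured output mode. Below is a self-contained sketch of just that call, using the `ReplayChatCompletionClient` from the test suite as a stand-in for a real model; `RetryDecision` is redeclared here because the PR defines it privately inside the agent module, and the canned JSON reply is purely illustrative:

```python
import asyncio

from autogen_core.models import ModelFamily, ModelInfo, UserMessage
from autogen_ext.models.replay import ReplayChatCompletionClient
from pydantic import BaseModel


class RetryDecision(BaseModel):
    reason: str
    retry: bool


async def main() -> None:
    # The canned reply plays the role of the model's structured answer.
    model_client = ReplayChatCompletionClient(
        ['{"retry": true, "reason": "Unclosed parenthesis on line 2; add the missing )."}'],
        model_info=ModelInfo(
            vision=False,
            function_calling=False,
            json_output=True,
            family=ModelFamily.UNKNOWN,
            structured_output=True,
        ),
    )
    # json_output=RetryDecision requests a response conforming to the schema.
    response = await model_client.create(
        messages=[UserMessage(content="The most recent code execution resulted in an error. Should we retry?", source="user")],
        json_output=RetryDecision,
    )
    decision = RetryDecision.model_validate_json(str(response.content))
    print(decision.retry, decision.reason)


asyncio.run(main())
```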

@@ -433,22 +433,33 @@ class ToolCallRequestEvent(BaseAgentEvent):
class CodeGenerationEvent(BaseAgentEvent):
"""An event signaling code generation for execution."""
"""An event signaling code generation event."""
retry_attempt: int
"Retry number, 0 means first generation"
content: str
"The complete content as string."
type: Literal["CodeGenerationEvent"] = "CodeGenerationEvent"
code_blocks: List[CodeBlock]
"List of code blocks present in content"
type: Literal["CodeGenerationEvent"] = "CodeGenerationEvent"
def to_text(self) -> str:
return self.content
class CodeExecutionEvent(BaseAgentEvent):
type: Literal["CodeExecutionEvent"] = "CodeExecutionEvent"
"""An event signaling code execution event."""
retry_attempt: int
"Retry number, 0 means first execution"
result: CodeResult
"Code Execution Result"
type: Literal["CodeExecutionEvent"] = "CodeExecutionEvent"
def to_text(self) -> str:
return self.result.output

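Both events now carry a `retry_attempt` counter. A small sketch of how they construct and render through `to_text()` (field values are illustrative):

```python
from autogen_agentchat.messages import CodeExecutionEvent, CodeGenerationEvent
from autogen_core.code_executor import CodeBlock, CodeResult

generation = CodeGenerationEvent(
    retry_attempt=0,  # 0 means the first generation, before any retry
    content="```python\nprint('hi')\n```",
    code_blocks=[CodeBlock(code="print('hi')", language="python")],
    source="code_executor_agent",
)
execution = CodeExecutionEvent(
    retry_attempt=0,  # 0 means the first execution
    result=CodeResult(exit_code=0, output="hi\n"),
    source="code_executor_agent",
)
print(generation.to_text())  # the full generated content, fences included
print(execution.to_text())   # the execution output only
```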

@@ -7,6 +7,7 @@ from autogen_agentchat.messages import (
TextMessage,
)
from autogen_core import CancellationToken
from autogen_core.models import ModelFamily, ModelInfo
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
from autogen_ext.models.replay import ReplayChatCompletionClient
@@ -131,6 +132,141 @@ async def test_no_code_response_with_model_client() -> None:
assert response is not None, "Response was not received"
@pytest.mark.asyncio
async def test_self_debugging_loop() -> None:
"""
Tests the self-debugging loop when the model client responds with incorrect code.
"""
language = "python"
incorrect_code_block = """
numbers = [10, 20, 30, 40, 50]
mean = sum(numbers) / len(numbers
print("The mean is:", mean)
""".strip()
incorrect_code_result = """
mean = sum(numbers) / len(numbers
^
SyntaxError: '(' was never closed
""".strip()
correct_code_block = """
numbers = [10, 20, 30, 40, 50]
mean = sum(numbers) / len(numbers)
print("The mean is:", mean)
""".strip()
correct_code_result = """
The mean is: 30.0
""".strip()
model_client = ReplayChatCompletionClient(
[
f"""
Here is the code to calculate the mean of 10, 20, 30, 40, 50
```{language}
{incorrect_code_block}
```
""",
"""{"retry": "true", "reason": "Retry 1: It is a test environment"}""",
f"""
Here is the updated code to calculate the mean of 10, 20, 30, 40, 50
```{language}
{correct_code_block}
```""",
"Final Response",
"TERMINATE",
],
model_info=ModelInfo(
vision=False,
function_calling=False,
json_output=True,
family=ModelFamily.UNKNOWN,
structured_output=True,
),
)
agent = CodeExecutorAgent(
name="code_executor_agent",
code_executor=LocalCommandLineCodeExecutor(),
model_client=model_client,
max_retries_on_error=1,
)
messages = [
TextMessage(
content="Calculate the mean of 10, 20, 30, 40, 50.",
source="assistant",
)
]
incorrect_code_generation_event: CodeGenerationEvent | None = None
correct_code_generation_event: CodeGenerationEvent | None = None
retry_decision_event: CodeGenerationEvent | None = None
incorrect_code_execution_event: CodeExecutionEvent | None = None
correct_code_execution_event: CodeExecutionEvent | None = None
response: Response | None = None
message_id: int = 0
async for message in agent.on_messages_stream(messages, CancellationToken()):
if isinstance(message, CodeGenerationEvent) and message_id == 0:
# Step 1: First code generation
code_block = message.code_blocks[0]
assert code_block.code.strip() == incorrect_code_block, "Incorrect code block does not match"
assert code_block.language == language, "Language does not match"
incorrect_code_generation_event = message
elif isinstance(message, CodeExecutionEvent) and message_id == 1:
# Step 2: First code execution
assert (
incorrect_code_result in message.to_text().strip()
), f"Expected {incorrect_code_result} in execution result, got: {message.to_text().strip()}"
incorrect_code_execution_event = message
elif isinstance(message, CodeGenerationEvent) and message_id == 2:
# Step 3: Retry generation with proposed correction
retry_response = "Attempt number: 1\nProposed correction: Retry 1: It is a test environment"
assert (
message.to_text().strip() == retry_response
), f"Expected {retry_response}, got: {message.to_text().strip()}"
retry_decision_event = message
elif isinstance(message, CodeGenerationEvent) and message_id == 3:
# Step 4: Second retry code generation
code_block = message.code_blocks[0]
assert code_block.code.strip() == correct_code_block, "Correct code block does not match"
assert code_block.language == language, "Language does not match"
correct_code_generation_event = message
elif isinstance(message, CodeExecutionEvent) and message_id == 4:
# Step 5: Second retry code execution
assert (
message.to_text().strip() == correct_code_result
), f"Expected {correct_code_result} in execution result, got: {message.to_text().strip()}"
correct_code_execution_event = message
elif isinstance(message, Response) and message_id == 5:
# Step 6: Final response
assert isinstance(
message.chat_message, TextMessage
), f"Expected TextMessage, got: {type(message.chat_message)}"
assert (
message.chat_message.source == "code_executor_agent"
), f"Expected source 'code_executor_agent', got: {message.chat_message.source}"
response = message
else:
raise AssertionError(f"Unexpected message type: {type(message)}")
message_id += 1
assert incorrect_code_generation_event is not None, "Incorrect code generation event was not received"
assert incorrect_code_execution_event is not None, "Incorrect code execution event was not received"
assert retry_decision_event is not None, "Retry decision event was not received"
assert correct_code_generation_event is not None, "Correct code generation event was not received"
assert correct_code_execution_event is not None, "Correct code execution event was not received"
assert response is not None, "Response was not received"
@pytest.mark.asyncio
async def test_code_execution_error() -> None:
"""Test basic code execution"""