Agentchat canvas (#6215)


## Why are these changes needed?

This is an initial exploration of a possible solution for #6214. It implements a simple text canvas using difflib, along with a memory component and a tool component for interacting with the canvas. It is still in early testing, but feedback on the design is very welcome.
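
Below is a rough sketch of how the new pieces fit together when used directly, without a model or agent in the loop (the file name and content are placeholders):

```python
import asyncio

from autogen_core import CancellationToken
from autogen_core.model_context import UnboundedChatCompletionContext
from autogen_ext.memory.canvas import TextCanvasMemory
from autogen_ext.memory.canvas._canvas_writer import UpdateFileArgs


async def main() -> None:
    memory = TextCanvasMemory()

    # The tool component writes to the canvas; an agent would call it via
    # tool-calling, but it can also be invoked directly.
    await memory.get_update_file_tool().run(
        UpdateFileArgs(filename="notes.md", new_content="# Draft\n"),
        CancellationToken(),
    )

    # The memory component injects a snapshot of every canvas file into the
    # model context before inference.
    ctx = UnboundedChatCompletionContext()
    await memory.update_context(ctx)

    print(memory.canvas.list_files())  # {'notes.md': 1}


asyncio.run(main())
```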

## Related issue number

#6214

## Checks

- [ ] I've included any doc changes needed for
<https://microsoft.github.io/autogen/>. See
<https://github.com/microsoft/autogen/blob/main/CONTRIBUTING.md> to
build and test documentation locally.
- [ ] I've added tests (if relevant) corresponding to the changes
introduced in this PR.
- [ ] I've made sure all auto checks have passed.

---------

Co-authored-by: Leonardo Pinheiro <lpinheiro@microsoft.com>
Co-authored-by: Eric Zhu <ekzhu@users.noreply.github.com>
Leonardo Pinheiro authored 2025-04-21 18:03:29 +10:00, committed by GitHub
parent 183dbb8dd1
commit 99aac24dd3
10 changed files with 672 additions and 0 deletions

View File

@@ -62,6 +62,7 @@ python/autogen_ext.tools.graphrag
python/autogen_ext.tools.http
python/autogen_ext.tools.langchain
python/autogen_ext.tools.mcp
python/autogen_ext.memory.canvas
python/autogen_ext.tools.semantic_kernel
python/autogen_ext.code_executors.local
python/autogen_ext.code_executors.docker

View File

@@ -0,0 +1,8 @@
autogen\_ext.memory.canvas
==========================
.. automodule:: autogen_ext.memory.canvas
:members:
:undoc-members:
:show-inheritance:

View File

@@ -144,6 +144,9 @@ semantic-kernel-all = [
rich = ["rich>=13.9.4"]
mcp = ["mcp>=1.6.0"]
canvas = [
"unidiff>=0.7.5",
]
[tool.hatch.build.targets.wheel]
packages = ["src/autogen_ext"]

View File

@@ -0,0 +1,4 @@
from ._text_canvas import TextCanvas
from ._text_canvas_memory import TextCanvasMemory
__all__ = ["TextCanvas", "TextCanvasMemory"]

View File

@@ -0,0 +1,45 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Union
class BaseCanvas(ABC):
"""
An abstract protocol for "canvas" objects that maintain
revision history for file-like data. Concrete subclasses
can handle text, images, structured data, etc.
"""
@abstractmethod
def list_files(self) -> Dict[str, int]:
"""
Returns a dict of filename -> latest revision number.
"""
raise NotImplementedError
@abstractmethod
def get_latest_content(self, filename: str) -> Union[str, bytes, Any]:
"""
Returns the latest version of a file's content.
"""
raise NotImplementedError
@abstractmethod
def add_or_update_file(self, filename: str, new_content: Union[str, bytes, Any]) -> None:
"""
Creates or updates the file content with a new revision.
"""
raise NotImplementedError
@abstractmethod
def get_diff(self, filename: str, from_revision: int, to_revision: int) -> str:
"""
Returns a diff (in some format) between two revisions.
"""
raise NotImplementedError
@abstractmethod
def apply_patch(self, filename: str, patch_data: Union[str, bytes, Any]) -> None:
"""
Applies a patch/diff to the latest revision and increments the revision.
"""
raise NotImplementedError
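
Because `TextCanvas` (next file) is just one concrete implementation, callers can be written against the abstract interface. A small hypothetical helper for illustration (`describe_canvas` is not part of this PR):

```python
from autogen_ext.memory.canvas._canvas import BaseCanvas


def describe_canvas(canvas: BaseCanvas) -> str:
    """Hypothetical helper: summarise every file and its latest revision."""
    files = canvas.list_files()
    if not files:
        return "<empty canvas>"
    return "\n".join(f"{name} (rev {rev})" for name, rev in files.items())
```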

View File

@@ -0,0 +1,64 @@
from autogen_core import CancellationToken
from autogen_core.tools import BaseTool
from pydantic import BaseModel
from ._text_canvas import TextCanvas
class UpdateFileArgs(BaseModel):
filename: str
new_content: str
class UpdateFileResult(BaseModel):
status: str
class UpdateFileTool(BaseTool[UpdateFileArgs, UpdateFileResult]):
"""
Overwrites or creates a file in the canvas.
"""
def __init__(self, canvas: TextCanvas):
super().__init__(
args_type=UpdateFileArgs,
return_type=UpdateFileResult,
name="update_file",
description="Create/update a file on the canvas with the provided content.",
)
self._canvas = canvas
async def run(self, args: UpdateFileArgs, cancellation_token: CancellationToken) -> UpdateFileResult:
self._canvas.add_or_update_file(args.filename, args.new_content)
return UpdateFileResult(status="OK")
class ApplyPatchArgs(BaseModel):
filename: str
patch_text: str
class ApplyPatchResult(BaseModel):
status: str
class ApplyPatchTool(BaseTool[ApplyPatchArgs, ApplyPatchResult]):
"""
Applies a unified diff patch to the given file on the canvas.
"""
def __init__(self, canvas: TextCanvas):
super().__init__(
args_type=ApplyPatchArgs,
return_type=ApplyPatchResult,
name="apply_patch",
description=(
"Apply a unified diff patch to an existing file on the canvas. "
"The patch must be in diff/patch format. The file must exist or be created first."
),
)
self._canvas = canvas
async def run(self, args: ApplyPatchArgs, cancellation_token: CancellationToken) -> ApplyPatchResult:
self._canvas.apply_patch(args.filename, args.patch_text)
return ApplyPatchResult(status="PATCH APPLIED")
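
Both tools are thin wrappers around a shared `TextCanvas`; here is a sketch of invoking them directly, mirroring the unit tests further down (the file name and story text are placeholders):

```python
import asyncio
import difflib

from autogen_core import CancellationToken
from autogen_ext.memory.canvas import TextCanvas
from autogen_ext.memory.canvas._canvas_writer import (
    ApplyPatchArgs,
    ApplyPatchTool,
    UpdateFileArgs,
    UpdateFileTool,
)


async def edit_story(canvas: TextCanvas) -> None:
    update_tool = UpdateFileTool(canvas)
    patch_tool = ApplyPatchTool(canvas)

    v1 = "Once upon a time there was a bunny.\n"
    v2 = "Once upon a time there was a bunny named Bella.\n"

    # Revision 1: create the file via the update tool.
    await update_tool.run(UpdateFileArgs(filename="story.md", new_content=v1), CancellationToken())

    # Revision 2: apply a unified diff produced with difflib.
    patch = "".join(
        difflib.unified_diff(
            v1.splitlines(keepends=True),
            v2.splitlines(keepends=True),
            fromfile="story.md",
            tofile="story.md",
        )
    )
    await patch_tool.run(ApplyPatchArgs(filename="story.md", patch_text=patch), CancellationToken())

    print(canvas.list_files())  # {'story.md': 2}


asyncio.run(edit_story(TextCanvas()))
```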

View File

@@ -0,0 +1,188 @@
import difflib
from typing import Any, Dict, List, Union
try: # pragma: no cover
from unidiff import PatchSet
except ModuleNotFoundError: # pragma: no cover
PatchSet = None # type: ignore
from ._canvas import BaseCanvas
class FileRevision:
"""Tracks the history of one file's content."""
__slots__ = ("content", "revision")
def __init__(self, content: str, revision: int) -> None:
self.content: str = content
self.revision: int = revision # e.g. an integer, a timestamp, or git hash
class TextCanvas(BaseCanvas):
"""An inmemory canvas that stores *text* files with full revision history.
Besides the original CRUDlike operations, this enhanced implementation adds:
* **apply_patch** applies patches using the ``unidiff`` library for accurate
hunk application and context line validation.
* **get_revision_content** random access to any historical revision.
* **get_revision_diffs** obtain the list of diffs applied between every
consecutive pair of revisions so that a caller can replay or audit the
full change history.
"""
# ----------------------------------------------------------------------------------
# Construction helpers
# ----------------------------------------------------------------------------------
def __init__(self) -> None:
# For each file we keep an *ordered* list of FileRevision where the last
# element is the most recent. Using a list keeps the memory footprint
# small and preserves order without any extra bookkeeping.
self._files: Dict[str, List[FileRevision]] = {}
# ----------------------------------------------------------------------------------
# Internal utilities
# ----------------------------------------------------------------------------------
def _latest_idx(self, filename: str) -> int:
"""Return the index (not revision number) of the newest revision."""
return len(self._files.get(filename, [])) - 1
def _ensure_file(self, filename: str) -> None:
if filename not in self._files:
raise ValueError(f"File '{filename}' does not exist on the canvas; create it first.")
# ----------------------------------------------------------------------------------
# Revision inspection helpers
# ----------------------------------------------------------------------------------
def get_revision_content(self, filename: str, revision: int) -> str: # NEW 🚀
"""Return the exact content stored in *revision*.
If the revision does not exist an empty string is returned so that
downstream code can handle the "not found" case without exceptions.
"""
for rev in self._files.get(filename, []):
if rev.revision == revision:
return rev.content
return ""
def get_revision_diffs(self, filename: str) -> List[str]: # NEW 🚀
"""Return a *chronological* list of unifieddiffs for *filename*.
Each element in the returned list represents the diff that transformed
revision *n* into revision *n+1* (starting at revision 1 2).
"""
revisions = self._files.get(filename, [])
diffs: List[str] = []
for i in range(1, len(revisions)):
older, newer = revisions[i - 1], revisions[i]
diff = difflib.unified_diff(
older.content.splitlines(keepends=True),
newer.content.splitlines(keepends=True),
fromfile=f"{filename}@r{older.revision}",
tofile=f"{filename}@r{newer.revision}",
)
diffs.append("".join(diff))
return diffs
# ----------------------------------------------------------------------------------
# BaseCanvas interface implementation
# ----------------------------------------------------------------------------------
def list_files(self) -> Dict[str, int]:
"""Return a mapping of *filename → latest revision number*."""
return {fname: revs[-1].revision for fname, revs in self._files.items() if revs}
def get_latest_content(self, filename: str) -> str:  # noqa: D401 - keep API identical
"""Return the most recent content or an empty string if the file is new."""
revs = self._files.get(filename, [])
return revs[-1].content if revs else ""
def add_or_update_file(self, filename: str, new_content: Union[str, bytes, Any]) -> None:
"""Create *filename* or append a new revision containing *new_content*."""
if isinstance(new_content, bytes):
new_content = new_content.decode("utf-8")
if not isinstance(new_content, str):
raise ValueError(f"Expected str or bytes, got {type(new_content)}")
if filename not in self._files:
self._files[filename] = [FileRevision(new_content, 1)]
else:
last_rev_num = self._files[filename][-1].revision
self._files[filename].append(FileRevision(new_content, last_rev_num + 1))
def get_diff(self, filename: str, from_revision: int, to_revision: int) -> str:
"""Return a unified diff between *from_revision* and *to_revision*."""
revisions = self._files.get(filename, [])
if not revisions:
return ""
# Fetch the contents for the requested revisions.
from_content = self.get_revision_content(filename, from_revision)
to_content = self.get_revision_content(filename, to_revision)
if from_content == "" and to_content == "": # one (or both) revision ids not found
return ""
diff = difflib.unified_diff(
from_content.splitlines(keepends=True),
to_content.splitlines(keepends=True),
fromfile=f"{filename}@r{from_revision}",
tofile=f"{filename}@r{to_revision}",
)
return "".join(diff)
def apply_patch(self, filename: str, patch_data: Union[str, bytes, Any]) -> None:
"""Apply *patch_text* (unified diff) to the latest revision and save a new revision.
Uses the *unidiff* library to accurately apply hunks and validate context lines.
"""
if isinstance(patch_data, bytes):
patch_data = patch_data.decode("utf-8")
if not isinstance(patch_data, str):
raise ValueError(f"Expected str or bytes, got {type(patch_data)}")
self._ensure_file(filename)
original_content = self.get_latest_content(filename)
if PatchSet is None:
raise ImportError(
"The 'unidiff' package is required for patch application. Install with 'pip install unidiff'."
)
patch = PatchSet(patch_data)
# Our canvas stores exactly one file per patch operation so we
# use the first (and only) patched_file object.
if not patch:
raise ValueError("Empty patch text provided.")
patched_file = patch[0]
working_lines = original_content.splitlines(keepends=True)
line_offset = 0
for hunk in patched_file:
# Calculate the slice boundaries in the *current* working copy.
start = hunk.source_start - 1 + line_offset
end = start + hunk.source_length
# Build the replacement block for this hunk.
replacement: List[str] = []
for line in hunk:
if line.is_added or line.is_context:
replacement.append(line.value)
# removed lines (line.is_removed) are *not* added.
# Replace the slice with the hunk result.
working_lines[start:end] = replacement
line_offset += len(replacement) - (end - start)
new_content = "".join(working_lines)
# Finally commit the new revision.
self.add_or_update_file(filename, new_content)
# ----------------------------------------------------------------------------------
# Convenience helpers
# ----------------------------------------------------------------------------------
def get_all_contents_for_context(self) -> str:  # noqa: D401 - keep public API stable
"""Return a summarised view of every file and its *latest* revision."""
out: List[str] = ["=== CANVAS FILES ==="]
for fname, revs in self._files.items():
latest = revs[-1]
out.append(f"File: {fname} (rev {latest.revision}):\n{latest.content}\n")
out.append("=== END OF CANVAS ===")
return "\n".join(out)

View File

@@ -0,0 +1,225 @@
from typing import Any, Optional
from autogen_core import CancellationToken
from autogen_core.memory import (
Memory,
MemoryContent,
MemoryMimeType,
MemoryQueryResult,
UpdateContextResult,
)
from autogen_core.model_context import ChatCompletionContext
from autogen_core.models import SystemMessage
from ._canvas_writer import ApplyPatchTool, UpdateFileTool
from ._text_canvas import TextCanvas
class TextCanvasMemory(Memory):
"""
A memory implementation that uses a Canvas for storing file-like content.
Inserts the current state of the canvas into the ChatCompletionContext on each turn.
The TextCanvasMemory provides a persistent, file-like storage mechanism that can be used
by agents to read and write content. It automatically injects the current state of all files
in the canvas into the model context before each inference.
This is particularly useful for:
- Allowing agents to create and modify documents over multiple turns
- Enabling collaborative document editing between multiple agents
- Maintaining persistent state across conversation turns
- Working with content too large to fit in a single message
The canvas provides tools for:
- Creating or updating files with new content
- Applying patches (unified diff format) to existing files
Examples:
**Example: Using TextCanvasMemory with an AssistantAgent**
The following example demonstrates how to create a TextCanvasMemory and use it with
an AssistantAgent to write and update a story file.
.. code-block:: python
import asyncio
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage
from autogen_ext.memory.canvas import TextCanvasMemory
async def main():
# Create a model client
model_client = OpenAIChatCompletionClient(
model="gpt-4o",
# api_key = "your_openai_api_key"
)
# Create the canvas memory
text_canvas_memory = TextCanvasMemory()
# Get tools for working with the canvas
update_file_tool = text_canvas_memory.get_update_file_tool()
apply_patch_tool = text_canvas_memory.get_apply_patch_tool()
# Create an agent with the canvas memory and tools
writer_agent = AssistantAgent(
name="Writer",
model_client=model_client,
description="A writer agent that creates and updates stories.",
system_message='''
You are a Writer Agent. Your focus is to generate a story based on the user's request.
Instructions for using the canvas:
- The story should be stored on the canvas in a file named "story.md".
- If "story.md" does not exist, create it by calling the 'update_file' tool.
- If "story.md" already exists, generate a unified diff (patch) from the current
content to the new version, and call the 'apply_patch' tool to apply the changes.
IMPORTANT: Do not include the full story text in your chat messages.
Only write the story content to the canvas using the tools.
''',
tools=[update_file_tool, apply_patch_tool],
memory=[text_canvas_memory],
)
# Send a message to the agent
await writer_agent.on_messages(
[TextMessage(content="Write a short story about a bunny and a sunflower.", source="user")],
CancellationToken(),
)
# Retrieve the content from the canvas
story_content = text_canvas_memory.canvas.get_latest_content("story.md")
print("Story content from canvas:")
print(story_content)
if __name__ == "__main__":
asyncio.run(main())
**Example: Using TextCanvasMemory with multiple agents**
The following example shows how to use TextCanvasMemory with multiple agents
collaborating on the same document.
.. code-block:: python
import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_ext.memory.canvas import TextCanvasMemory
async def main():
# Create a model client
model_client = OpenAIChatCompletionClient(
model="gpt-4o",
# api_key = "your_openai_api_key"
)
# Create the shared canvas memory
text_canvas_memory = TextCanvasMemory()
update_file_tool = text_canvas_memory.get_update_file_tool()
apply_patch_tool = text_canvas_memory.get_apply_patch_tool()
# Create a writer agent
writer_agent = AssistantAgent(
name="Writer",
model_client=model_client,
description="A writer agent that creates stories.",
system_message="You write children's stories on the canvas in story.md.",
tools=[update_file_tool, apply_patch_tool],
memory=[text_canvas_memory],
)
# Create a critique agent
critique_agent = AssistantAgent(
name="Critique",
model_client=model_client,
description="A critique agent that provides feedback on stories.",
system_message="You review the story.md file and provide constructive feedback.",
memory=[text_canvas_memory],
)
# Create a team with both agents
team = RoundRobinGroupChat(
participants=[writer_agent, critique_agent],
termination_condition=TextMentionTermination("TERMINATE"),
max_turns=10,
)
# Run the team on a task
await team.run(task="Create a children's book about a bunny and a sunflower")
# Get the final story
story = text_canvas_memory.canvas.get_latest_content("story.md")
print(story)
if __name__ == "__main__":
asyncio.run(main())
"""
def __init__(self, canvas: Optional[TextCanvas] = None):
super().__init__()
self.canvas = canvas if canvas is not None else TextCanvas()
async def update_context(self, model_context: ChatCompletionContext) -> UpdateContextResult:
"""
Inject the entire canvas summary (or a selected subset) as reference data.
Here, we just put it into a system message, but you could customize.
"""
snapshot = self.canvas.get_all_contents_for_context()
if snapshot.strip():
msg = SystemMessage(content=snapshot)
await model_context.add_message(msg)
# Return it for debugging/logging
memory_content = MemoryContent(content=snapshot, mime_type=MemoryMimeType.TEXT)
return UpdateContextResult(memories=MemoryQueryResult(results=[memory_content]))
return UpdateContextResult(memories=MemoryQueryResult(results=[]))
async def query(
self, query: str | MemoryContent, cancellation_token: Optional[CancellationToken] = None, **kwargs: Any
) -> MemoryQueryResult:
"""
Potentially search for matching filenames or file content.
This example returns empty.
"""
return MemoryQueryResult(results=[])
async def add(self, content: MemoryContent, cancellation_token: Optional[CancellationToken] = None) -> None:
"""
Example usage: Possibly interpret content as a patch or direct file update.
Could also be done by a specialized "CanvasTool" instead.
"""
# NO-OP here, leaving actual changes to the CanvasTool
pass
async def clear(self) -> None:
"""Clear the entire canvas by replacing it with a new empty instance."""
# Create a new TextCanvas instance instead of calling __init__ directly
self.canvas = TextCanvas()
async def close(self) -> None:
pass
def get_update_file_tool(self) -> UpdateFileTool:
"""
Returns an UpdateFileTool instance that works with this memory's canvas.
"""
return UpdateFileTool(self.canvas)
def get_apply_patch_tool(self) -> ApplyPatchTool:
"""
Returns an ApplyPatchTool instance that works with this memory's canvas.
"""
return ApplyPatchTool(self.canvas)

View File

@@ -0,0 +1,121 @@
import difflib
import pytest
from autogen_core import CancellationToken
from autogen_core.model_context import UnboundedChatCompletionContext
from autogen_ext.memory.canvas import TextCanvasMemory
from autogen_ext.memory.canvas._canvas_writer import (
ApplyPatchArgs,
UpdateFileArgs,
)
# ── Fixtures ─────────────────────────────────────────────────────────────────────
@pytest.fixture()
def story_v1() -> str:
# Extracted (slightly trimmed) from the sample output
return (
"# The Bunny and the Sunflower\n\n"
"## Beginning\n"
"Once upon a time, in a bright and cheerful meadow, Bella the bunny came "
"across **a beautiful sunflower** waving in the sunshine.\n"
)
@pytest.fixture()
def story_v2(story_v1: str) -> str:
# A small edit: give the sunflower a name (mirrors the first patch in the log)
return story_v1.replace(
"a beautiful sunflower",
"a beautiful sunflower named Sunny",
)
@pytest.fixture()
def memory() -> TextCanvasMemory:
return TextCanvasMemory()
# ── Tests ────────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_canvas_initial_state(memory: TextCanvasMemory) -> None:
assert memory.canvas.list_files() == {}
snapshot = memory.canvas.get_all_contents_for_context()
assert snapshot.startswith("=== CANVAS FILES ===")
@pytest.mark.asyncio
async def test_update_file_tool_creates_file(
memory: TextCanvasMemory,
story_v1: str,
) -> None:
update_tool = memory.get_update_file_tool()
await update_tool.run(
UpdateFileArgs(filename="story.md", new_content=story_v1),
CancellationToken(),
)
assert memory.canvas.get_latest_content("story.md") == story_v1
assert memory.canvas.list_files()["story.md"] == 1
@pytest.mark.asyncio
async def test_apply_patch_increments_revision(
memory: TextCanvasMemory,
story_v1: str,
story_v2: str,
) -> None:
# Set up revision 1
await memory.get_update_file_tool().run(
UpdateFileArgs(filename="story.md", new_content=story_v1),
CancellationToken(),
)
# Create a unified diff for the patch tool
diff_text = "".join(
difflib.unified_diff(
story_v1.splitlines(keepends=True),
story_v2.splitlines(keepends=True),
fromfile="story.md",
tofile="story.md",
)
)
# Apply the patch → revision 2
await memory.get_apply_patch_tool().run(
ApplyPatchArgs(filename="story.md", patch_text=diff_text),
CancellationToken(),
)
assert memory.canvas.get_latest_content("story.md") == story_v2
# The revision number should now be 2
assert memory.canvas.list_files()["story.md"] == 2
# And the diff history should contain exactly one patch
assert len(memory.canvas.get_revision_diffs("story.md")) == 1
@pytest.mark.asyncio
async def test_update_context_injects_snapshot(
memory: TextCanvasMemory,
story_v2: str,
) -> None:
# Seed with some content
await memory.get_update_file_tool().run(
UpdateFileArgs(filename="story.md", new_content=story_v2),
CancellationToken(),
)
chat_ctx = UnboundedChatCompletionContext()
result = await memory.update_context(chat_ctx)
# A single SystemMessage should have been added to the context
assert len(chat_ctx._messages) == 1 # type: ignore
injected_text = chat_ctx._messages[0].content # type: ignore
assert "=== CANVAS FILES ===" in injected_text
assert "story.md" in injected_text
# The UpdateContextResult should surface the same snapshot via MemoryContent
assert result.memories.results
assert isinstance(result.memories.results[0].content, str)
assert story_v2.strip() in result.memories.results[0].content

python/uv.lock (generated, 13 changes)
View File

@@ -601,6 +601,9 @@ azure = [
{ name = "azure-identity" },
{ name = "azure-search-documents" },
]
canvas = [
{ name = "unidiff" },
]
chromadb = [
{ name = "chromadb" },
]
@@ -792,6 +795,7 @@ requires-dist = [
{ name = "semantic-kernel", extras = ["pandas"], marker = "extra == 'semantic-kernel-pandas'", specifier = ">=1.17.1" },
{ name = "tiktoken", marker = "extra == 'ollama'", specifier = ">=0.8.0" },
{ name = "tiktoken", marker = "extra == 'openai'", specifier = ">=0.8.0" },
{ name = "unidiff", marker = "extra == 'canvas'", specifier = ">=0.7.5" },
{ name = "websockets", marker = "extra == 'docker-jupyter-executor'", specifier = ">=15.0.1" },
]
@@ -7966,6 +7970,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/3c/8f/671c0e1f2572ba625cbcc1faeba9435e00330c3d6962858711445cf1e817/umap_learn-0.5.7-py3-none-any.whl", hash = "sha256:6a7e0be2facfa365a5ed6588447102bdbef32a0ef449535c25c97ea7e680073c", size = 88815 },
]
[[package]]
name = "unidiff"
version = "0.7.5"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/a3/48/81be0ac96e423a877754153699731ef439fd7b80b4c8b5425c94ed079ebd/unidiff-0.7.5.tar.gz", hash = "sha256:2e5f0162052248946b9f0970a40e9e124236bf86c82b70821143a6fc1dea2574", size = 20931 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/8a/54/57c411a6e8f7bd7848c8b66e4dcaffa586bf4c02e63f2280db0327a4e6eb/unidiff-0.7.5-py2.py3-none-any.whl", hash = "sha256:c93bf2265cc1ba2a520e415ab05da587370bc2a3ae9e0414329f54f0c2fc09e8", size = 14386 },
]
[[package]]
name = "uptrace"
version = "1.29.0"