Victor Dibia 05fc763b8a
add anthropic native support (#5695)
<!-- Thank you for your contribution! Please review
https://microsoft.github.io/autogen/docs/Contribute before opening a
pull request. -->

Claude 3.7 just came out. It's a pretty capable model and it would be
great to support it in AutoGen.
This could augment the already excellent support we have for
Anthropic via the SKAdapters in the following ways:

- Based on the ChatCompletion API similar to the ollama and openai
client
- Configurable/serializable (can be dumped), which means it can be used
easily in AGS.

## What is Supported 

(video below shows the client being used in autogen studio)

https://github.com/user-attachments/assets/8fb7c17c-9f9c-4525-aa9c-f256aad0f40b



- streaming 
- tool calling / function calling
- drop in integration with assistant agent. 
- multimodal support

```python

from dotenv import load_dotenv
import os 

load_dotenv()

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.anthropic import AnthropicChatCompletionClient 
model_client =   AnthropicChatCompletionClient(
        model="claude-3-7-sonnet-20250219" 
    )

async def get_weather(city: str) -> str:
    """Get the weather for a given city."""
    return f"The weather in {city} is 73 degrees and Sunny."

 
agent = AssistantAgent(
    name="weather_agent",
    model_client=model_client,
    tools=[get_weather],
    system_message="You are a helpful assistant.", 
    # model_client_stream=True,   
)

# Run the agent and stream the messages to the console.
async def main() -> None:
    await Console(agent.run_stream(task="What is the weather in New York?"))
await main()
```

result 

```
messages = [
    UserMessage(content="Write a very short story about a dragon.", source="user"),
]

# Create a stream.
stream = model_client.create_stream(messages=messages)

# Iterate over the stream and print the responses.
print("Streamed responses:")
async for response in stream:  # type: ignore
    if isinstance(response, str):
        # A partial response is a string.
        print(response, flush=True, end="")
    else:
        # The last response is a CreateResult object with the complete message.
        print("\n\n------------\n")
        print("The complete response:", flush=True)
        print(response.content, flush=True)
        print("\n\n------------\n")
        print("The token usage was:", flush=True)
        print(response.usage, flush=True)
```

 

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

## Why are these changes needed?

<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes #1234" --> 

Closes #5205 
Closes #5708

## Checks

- [ ] I've included any doc changes needed for
<https://microsoft.github.io/autogen/>. See
<https://github.com/microsoft/autogen/blob/main/CONTRIBUTING.md> to
build and test documentation locally.
- [ ] I've added tests (if relevant) corresponding to the changes
introduced in this PR.
- [ ] I've made sure all auto checks have passed. 



cc @rohanthacker
2025-02-26 07:27:41 +00:00

163 lines
5.7 KiB
Python

import asyncio
import json
import logging
import os
import time
from pathlib import Path
from typing import AsyncGenerator, Callable, List, Optional, Union
import aiofiles
import yaml
from autogen_agentchat.base import TaskResult, Team
from autogen_agentchat.messages import AgentEvent, ChatMessage
from autogen_core import EVENT_LOGGER_NAME, CancellationToken, Component, ComponentModel
from autogen_core.logging import LLMCallEvent
from ..datamodel.types import EnvironmentVariable, LLMCallEventMessage, TeamResult
# Module-level logger for this file's own diagnostics (config loading, env-var setup).
logger = logging.getLogger(__name__)
class RunEventLogger(logging.Handler):
    """Logging handler that queues LLMCallEvents for streaming.

    Records whose ``msg`` is an ``LLMCallEvent`` are wrapped in an
    ``LLMCallEventMessage`` and placed on :attr:`events`; every other
    record is ignored.
    """

    def __init__(self) -> None:
        super().__init__()
        # Created with the default maxsize, i.e. unbounded, so put_nowait()
        # in emit() cannot fail with QueueFull.
        self.events: asyncio.Queue = asyncio.Queue()

    def emit(self, record: logging.LogRecord) -> None:
        """Queue ``record`` as an ``LLMCallEventMessage`` if it carries an LLM call event.

        Args:
            record: The log record handed over by the logging framework.
        """
        try:
            if isinstance(record.msg, LLMCallEvent):
                self.events.put_nowait(LLMCallEventMessage(content=str(record.msg)))
        except Exception:
            # A handler must not let its own failure (e.g. while stringifying
            # the event) propagate into the code that issued the log call;
            # delegate to the standard logging error hook instead.
            self.handleError(record)
class TeamManager:
    """Manages team operations including loading configs and running teams."""

    @staticmethod
    async def load_from_file(path: Union[str, Path]) -> dict:
        """Load a team configuration from a JSON or YAML file.

        Args:
            path: Location of the configuration file.

        Returns:
            The parsed content of the file.

        Raises:
            FileNotFoundError: If ``path`` does not exist.
            ValueError: If the extension is not ``.json``, ``.yml`` or ``.yaml``
                (compared case-insensitively).
        """
        path = Path(path)
        if not path.exists():
            raise FileNotFoundError(f"Config file not found: {path}")

        async with aiofiles.open(path) as f:
            content = await f.read()

        # Lower-case the suffix so this agrees with load_from_directory, which
        # already selects files case-insensitively (e.g. "TEAM.JSON").
        suffix = path.suffix.lower()
        if suffix == ".json":
            return json.loads(content)
        if suffix in (".yml", ".yaml"):
            return yaml.safe_load(content)
        raise ValueError(f"Unsupported file format: {path.suffix}")

    @staticmethod
    async def load_from_directory(directory: Union[str, Path]) -> List[dict]:
        """Load all team configurations found directly inside a directory.

        Only regular files ending in ``.json``, ``.yaml`` or ``.yml`` are
        considered. Loading is best-effort: a file that fails to load is
        logged and skipped rather than aborting the scan.

        Args:
            directory: Directory to scan (not recursive).

        Returns:
            The configurations that loaded successfully.
        """
        directory = Path(directory)
        configs: List[dict] = []
        valid_extensions = {".json", ".yaml", ".yml"}

        for path in directory.iterdir():
            if path.is_file() and path.suffix.lower() in valid_extensions:
                try:
                    config = await TeamManager.load_from_file(path)
                    configs.append(config)
                except Exception as e:
                    # Deliberately broad: one bad file must not hide the rest.
                    logger.error(f"Failed to load {path}: {e}")

        return configs

    @staticmethod
    async def _close_participants(team: Optional[Component]) -> None:
        """Call ``close()`` on every participant of ``team`` that provides it."""
        if team and hasattr(team, "_participants"):
            for agent in team._participants:
                if hasattr(agent, "close"):
                    await agent.close()

    async def _create_team(
        self,
        team_config: Union[str, Path, dict, ComponentModel],
        input_func: Optional[Callable] = None,
        env_vars: Optional[List[EnvironmentVariable]] = None,
    ) -> Component:
        """Create a team instance from a config.

        Args:
            team_config: A path to a config file, an already-parsed dict, or an
                object exposing ``model_dump()``.
            input_func: Assigned to every participant that has an
                ``input_func`` attribute.
            env_vars: Name/value pairs written into ``os.environ`` before the
                team is instantiated.

        Returns:
            The team built by ``Team.load_component``.
        """
        if isinstance(team_config, (str, Path)):
            config = await self.load_from_file(team_config)
        elif isinstance(team_config, dict):
            config = team_config
        else:
            config = team_config.model_dump()

        # Load env vars into the environment if provided. Note this mutates
        # the process-wide environment and is not undone afterwards.
        if env_vars:
            logger.info("Loading environment variables")
            for var in env_vars:
                os.environ[var.name] = var.value

        team = Team.load_component(config)

        # Guarded the same way as the cleanup path, which also treats
        # `_participants` as optional.
        for agent in getattr(team, "_participants", []):
            if hasattr(agent, "input_func"):
                agent.input_func = input_func

        return team

    async def run_stream(
        self,
        task: str,
        team_config: Union[str, Path, dict, ComponentModel],
        input_func: Optional[Callable] = None,
        cancellation_token: Optional[CancellationToken] = None,
        env_vars: Optional[List[EnvironmentVariable]] = None,
    ) -> AsyncGenerator[Union[AgentEvent | ChatMessage | LLMCallEvent, ChatMessage, TeamResult], None]:
        """Stream team execution results.

        Yields each message produced by the team as it arrives. A
        ``TaskResult`` is wrapped in a ``TeamResult`` carrying the elapsed
        wall-clock time (``usage`` is always the empty string here). After
        every team message, any LLM call events captured by the event logger
        in the meantime are yielded as well.

        Args:
            task: The task passed to the team.
            team_config: See ``_create_team``.
            input_func: See ``_create_team``.
            cancellation_token: Forwarded to the team; iteration also stops as
                soon as the token reports cancellation.
            env_vars: See ``_create_team``.
        """
        start_time = time.time()
        team = None

        # Capture LLM call events emitted on the shared event logger. This
        # intentionally replaces whatever handlers were installed before.
        event_logger = logging.getLogger(EVENT_LOGGER_NAME)
        event_logger.setLevel(logging.INFO)
        llm_event_logger = RunEventLogger()
        event_logger.handlers = [llm_event_logger]  # Replace all handlers

        try:
            team = await self._create_team(team_config, input_func, env_vars)

            async for message in team.run_stream(task=task, cancellation_token=cancellation_token):
                if cancellation_token and cancellation_token.is_cancelled():
                    break

                if isinstance(message, TaskResult):
                    yield TeamResult(task_result=message, usage="", duration=time.time() - start_time)
                else:
                    yield message

                # Check for any LLM events
                while not llm_event_logger.events.empty():
                    event = await llm_event_logger.events.get()
                    yield event
        finally:
            # Cleanup - remove our handler. removeHandler() is a no-op when
            # the handler is no longer installed (e.g. another run replaced
            # the handler list in the meantime), whereas list.remove() would
            # raise ValueError here, masking the real error and skipping the
            # participant cleanup below.
            event_logger.removeHandler(llm_event_logger)

            # Ensure cleanup happens
            await self._close_participants(team)

    async def run(
        self,
        task: str,
        team_config: Union[str, Path, dict, ComponentModel],
        input_func: Optional[Callable] = None,
        cancellation_token: Optional[CancellationToken] = None,
        env_vars: Optional[List[EnvironmentVariable]] = None,
    ) -> TeamResult:
        """Run the team to completion and return the final result (non-streaming).

        Args:
            task: The task passed to the team.
            team_config: See ``_create_team``.
            input_func: See ``_create_team``.
            cancellation_token: Forwarded to the team's ``run``.
            env_vars: See ``_create_team``.

        Returns:
            A ``TeamResult`` wrapping the team's result and the elapsed
            wall-clock time (``usage`` is always the empty string here).
        """
        start_time = time.time()
        team = None

        try:
            team = await self._create_team(team_config, input_func, env_vars)
            result = await team.run(task=task, cancellation_token=cancellation_token)
            return TeamResult(task_result=result, usage="", duration=time.time() - start_time)
        finally:
            await self._close_participants(team)