Add an example using autogen-core and FastAPI to create streaming responses (#6335)

## Why are these changes needed?

This PR adds an example that demonstrates how to build a streaming chat API
with multi-turn conversation history using `autogen-core` and FastAPI.

## Related issue number


## Checks

- [x] I've included any doc changes needed for
<https://microsoft.github.io/autogen/>. See
<https://github.com/microsoft/autogen/blob/main/CONTRIBUTING.md> to
build and test documentation locally.
- [x] I've added tests (if relevant) corresponding to the changes
introduced in this PR.
- [x] I've made sure all auto checks have passed.

---------

Co-authored-by: Eric Zhu <ekzhu@users.noreply.github.com>
Commit 89d77c77c5 (parent 9b0a0bd6b8)
ToryPan authored on 2025-04-22 07:55:03 +08:00, committed by GitHub
4 changed files with 269 additions and 0 deletions


@@ -0,0 +1,5 @@
model_config.yaml
agent_state.json
agent_history.json
team_state.json
team_history.json


@@ -0,0 +1,97 @@
# AutoGen-Core Streaming Chat API with FastAPI
This sample demonstrates how to build a streaming chat API with multi-turn conversation history using `autogen-core` and FastAPI.
## Key Features
1. **Streaming Response**: Streams LLM output to the client in real time using FastAPI's `StreamingResponse`, `autogen-core`'s asynchronous features, and a global `asyncio.Queue()` that carries chunks from the agent to the HTTP handler, reducing user-perceived latency (see the minimal sketch after this list).
2. **Multi-Turn Conversation**: The agent (`MyAgent`) receives the full chat history (`UserRequest`) with each request, enabling context-aware, continuous conversations.
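
The streaming pattern from feature 1 is sketched below. This is a minimal, illustrative sketch: `fake_llm_stream` is a hypothetical stand-in for the real `autogen-core` agent, and the names here are not part of the sample; the full implementation is in `app.py`.

```python
import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

# Global queue: the producer (the agent in app.py) puts text chunks or the
# STREAM_DONE sentinel here; the HTTP handler drains it into the response.
response_queue = asyncio.Queue()
STREAM_DONE = object()


async def fake_llm_stream() -> None:
    """Stand-in for the agent's streaming model call."""
    for chunk in ["Hel", "lo", ", world!"]:
        await response_queue.put(chunk)
    await response_queue.put(STREAM_DONE)


@app.post("/chat/completions")
async def chat_completions_stream():
    async def stream():
        producer = asyncio.create_task(fake_llm_stream())
        while True:
            item = await response_queue.get()
            if item is STREAM_DONE:
                break
            # Each chunk is sent as one newline-delimited JSON object.
            yield json.dumps({"content": item}) + "\n"
        await producer

    return StreamingResponse(stream(), media_type="text/plain")
```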
## File Structure
* `app.py`: FastAPI application code, including API endpoints, Agent definitions, runtime settings, and streaming logic.
* `README.md`: (This document) Project introduction and usage instructions.
## Installation
First, make sure you have Python installed (3.10 or higher is recommended, since the sample uses the `X | Y` union syntax). Then, in your project directory, install the necessary libraries via pip:
```bash
pip install "fastapi" "uvicorn[standard]" "autogen-core" "autogen-ext[openai]"
```
## Configuration
Create a new file named `model_config.yaml` in the same directory as this README file to configure your model settings.
See `model_config_template.yaml` for an example.
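For reference, a minimal OpenAI configuration (mirroring the template shipped with this sample; replace the placeholder with your own key) looks like this:
```yaml
provider: autogen_ext.models.openai.OpenAIChatCompletionClient
config:
  model: gpt-4o
  api_key: REPLACE_WITH_YOUR_API_KEY
```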
**Note**: Hardcoding API keys directly in the configuration file is only suitable for local testing. For production environments, it is strongly recommended to use environment variables or other secure methods to manage keys.
## Running the Application
In the directory containing `app.py`, run the following command to start the FastAPI application:
```bash
uvicorn app:app --host 0.0.0.0 --port 8501 --reload
```
After the service starts, the API endpoint will be available at `http://<your-server-ip>:8501/chat/completions`.
## Using the API
You can interact with the Agent by sending a POST request to the `/chat/completions` endpoint. The request body must be JSON and contain a `messages` field whose value is a list, where each element represents one turn of the conversation. Conversation history is kept on the client side: include all previous turns in `messages` with each request.
**Request Body Format**:
```json
{
"messages": [
{"source": "user", "content": "Hello!"},
{"source": "assistant", "content": "Hello! How can I help you?"},
{"source": "user", "content": "Introduce yourself."}
]
}
```
**Example (using curl)**:
```bash
curl -N -X POST http://localhost:8501/chat/completions \
-H "Content-Type: application/json" \
-d '{
"messages": [
{"source": "user", "content": "Hello, I'\''m Tory."},
{"source": "assistant", "content": "Hello Tory, nice to meet you!"},
{"source": "user", "content": "Say hello by my name and introduce yourself."}
]
}'
```
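The response is streamed as newline-delimited JSON objects, one per text chunk. The exact chunking depends on the model, but the output has roughly this shape:
```
{"content": "Hello"}
{"content": " Tory"}
{"content": "!"}
```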
**Example (using Python requests)**:
```python
import requests
import json
url = "http://localhost:8501/chat/completions"
data = {
'stream': True,
'messages': [
        {'source': 'user', 'content': "Hello, I'm Tory."},
        {'source': 'assistant', 'content': "Hello Tory, nice to meet you!"},
{'source': 'user', 'content': "Say hello by my name and introduce yourself."}
]
}
headers = {'Content-Type': 'application/json'}
try:
response = requests.post(url, json=data, headers=headers, stream=True)
response.raise_for_status()
    # The server streams newline-delimited JSON objects; read them line by line.
    for line in response.iter_lines(decode_unicode=True):
        if line:
            print(json.loads(line)["content"], end='', flush=True)
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
except json.JSONDecodeError as e:
print(f"JSON Decode Error: {e}")
```


@@ -0,0 +1,141 @@
import asyncio
import json
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncGenerator, Dict, List
import aiofiles
import yaml
from autogen_core import (
AgentId,
MessageContext,
RoutedAgent,
SingleThreadedAgentRuntime,
message_handler,
)
from autogen_core.models import AssistantMessage, ChatCompletionClient, LLMMessage, SystemMessage, UserMessage
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
@dataclass
class AgentResponse:
"""
Represents the final accumulated response content from the LLM agent.
    Note: The 'content' field holds the final response content.
"""
content: str
@dataclass
class UserRequest:
"""
Represents the chat history, containing a list of messages.
Each message is expected to be a dictionary with 'source' and 'content' keys.
"""
messages: List[Dict[str, str]]
# Runtime for the agent.
runtime = SingleThreadedAgentRuntime()
# Queue for streaming results from the agent back to the request handler
response_queue: asyncio.Queue[str | object] = asyncio.Queue()
# Sentinel object to signal the end of the stream
STREAM_DONE = object()
class MyAgent(RoutedAgent):
def __init__(self, name: str, model_client: ChatCompletionClient) -> None:
super().__init__(name)
self._system_messages = [SystemMessage(content="You are a helpful assistant.")]
self._model_client = model_client
self._response_queue = response_queue
@message_handler
async def handle_user_message(self, message: UserRequest, ctx: MessageContext) -> AgentResponse:
accumulated_content = "" # To store the full response.
try:
_message = message.messages
user_messages: List[LLMMessage] = []
            for m in _message:
                # Map each history entry to the appropriate LLM message type.
                if m["source"] == "user":
                    user_messages.append(UserMessage(content=m["content"], source=m["source"]))
                else:
                    user_messages.append(AssistantMessage(content=m["content"], source=m["source"]))
            # Stream the model's response, forwarding each text chunk to the response queue.
            async for i in self._model_client.create_stream(user_messages, cancellation_token=ctx.cancellation_token):
if isinstance(i, str):
accumulated_content += i
await self._response_queue.put(i)
else:
break
await self._response_queue.put(STREAM_DONE)
return AgentResponse(content=accumulated_content)
except Exception as e:
await self._response_queue.put("ERROR:" + str(e))
return AgentResponse(content=str(e))
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
# Get model client from config.
async with aiofiles.open("model_config.yaml", "r") as file:
model_config = yaml.safe_load(await file.read())
model_client = ChatCompletionClient.load_component(model_config)
# Register the agent with the runtime.
await MyAgent.register(
runtime,
"simple_agent",
lambda: MyAgent(
"myagent",
model_client=model_client,
),
)
# Start the agent runtime.
runtime.start()
yield
await runtime.stop()
app = FastAPI(lifespan=lifespan)
@app.post("/chat/completions")
async def chat_completions_stream(request: Request):
json_data = await request.json()
messages = json_data.get("messages", "")
if not isinstance(messages, list):
raise HTTPException(status_code=400, detail="Invalid input: 'messages' must be a list.")
user_request = UserRequest(messages=messages) # type: ignore
async def response_stream() -> AsyncGenerator[str, None]:
task1 = asyncio.create_task(runtime.send_message(user_request, AgentId("simple_agent", "default")))
# Consume items from the response queue until the stream ends or an error occurs
while True:
item = await response_queue.get()
if item is STREAM_DONE:
print(f"{time.time():.2f} - MAIN: Received STREAM_DONE. Exiting loop.")
break
elif isinstance(item, str) and item.startswith("ERROR:"):
print(f"{time.time():.2f} - MAIN: Received error message from agent: {item}")
break
else:
yield json.dumps({"content": item}) + "\n"
# Wait for the task to finish.
await task1
return StreamingResponse(response_stream(), media_type="text/plain") # type: ignore
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8501)


@@ -0,0 +1,26 @@
# Use Open AI with key
provider: autogen_ext.models.openai.OpenAIChatCompletionClient
config:
model: gpt-4o
api_key: REPLACE_WITH_YOUR_API_KEY
# Use Azure Open AI with key
# provider: autogen_ext.models.openai.AzureOpenAIChatCompletionClient
# config:
# model: gpt-4o
# azure_endpoint: https://{your-custom-endpoint}.openai.azure.com/
# azure_deployment: {your-azure-deployment}
# api_version: {your-api-version}
# api_key: REPLACE_WITH_YOUR_API_KEY
# Use Azure OpenAI with AD token provider.
# provider: autogen_ext.models.openai.AzureOpenAIChatCompletionClient
# config:
# model: gpt-4o
# azure_endpoint: https://{your-custom-endpoint}.openai.azure.com/
# azure_deployment: {your-azure-deployment}
# api_version: {your-api-version}
# azure_ad_token_provider:
# provider: autogen_ext.auth.azure.AzureTokenProvider
# config:
# provider_kind: DefaultAzureCredential
# scopes:
# - https://cognitiveservices.azure.com/.default