Victor Dibia 7fc7f383f0
Enable LLM Call Observability in AGS (#5457)
<!-- Thank you for your contribution! Please review
https://microsoft.github.io/autogen/docs/Contribute before opening a
pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

It is often helpful to inspect the raw request and response to/from an
LLM as agents act.
This PR, does the following:

- Updates TeamManager to yield LLMCallEvents from core library
- Run in an async background task to listen for LLMCallEvent JIT style
when a team is run
   - Add events to   an async queue and 
- yield those events in addition to whatever actual agentchat
team.run_stream yields.
- Update the AGS UI to show those LLMCallEvents in the messages section
as a team runs
   - Add settings panel to show/hide llm call events in messages. 
- Minor updates to default team

<img width="1539" alt="image"
src="https://github.com/user-attachments/assets/bfbb19fe-3560-4faa-b600-7dd244e0e974"
/>
<img width="1554" alt="image"
src="https://github.com/user-attachments/assets/775624f5-ba83-46e8-81ff-512bfeed6bab"
/>
<img width="1538" alt="image"
src="https://github.com/user-attachments/assets/3becbf85-b75a-4506-adb7-e5575ebbaec4"
/>


## Why are these changes needed?

<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes #1234" -->

Closes #5440 

## Checks

- [ ] I've included any doc changes needed for
https://microsoft.github.io/autogen/. See
https://microsoft.github.io/autogen/docs/Contribute#documentation to
build and test documentation locally.
- [ ] I've added tests (if relevant) corresponding to the changes
introduced in this PR.
- [ ] I've made sure all auto checks have passed.
2025-02-09 04:50:12 +00:00

164 lines
4.0 KiB
Python

# api/app.py
import os
from contextlib import asynccontextmanager
from typing import AsyncGenerator
# import logging
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from loguru import logger
from ..version import VERSION
from .config import settings
from .deps import cleanup_managers, init_managers
from .initialization import AppInitializer
from .routes import runs, sessions, teams, ws
# Initialize application
app_file_path = os.path.dirname(os.path.abspath(__file__))
initializer = AppInitializer(settings, app_file_path)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""
Lifecycle manager for the FastAPI application.
Handles initialization and cleanup of application resources.
"""
try:
# Initialize managers (DB, Connection, Team)
await init_managers(initializer.database_uri, initializer.config_dir, initializer.app_root)
# Any other initialization code
logger.info(
f"Application startup complete. Navigate to http://{os.environ.get('AUTOGENSTUDIO_HOST', '127.0.0.1')}:{os.environ.get('AUTOGENSTUDIO_PORT', '8081')}"
)
except Exception as e:
logger.error(f"Failed to initialize application: {str(e)}")
raise
yield # Application runs here
# Shutdown
try:
logger.info("Cleaning up application resources...")
await cleanup_managers()
logger.info("Application shutdown complete")
except Exception as e:
logger.error(f"Error during shutdown: {str(e)}")
# Create FastAPI application
app = FastAPI(lifespan=lifespan, debug=True)
# CORS middleware configuration
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:8000",
"http://127.0.0.1:8000",
"http://localhost:8001",
"http://localhost:8081",
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Create API router with version and documentation
api = FastAPI(
root_path="/api",
title="AutoGen Studio API",
version=VERSION,
description="AutoGen Studio is a low-code tool for building and testing multi-agent workflows.",
docs_url="/docs" if settings.API_DOCS else None,
)
# Include all routers with their prefixes
api.include_router(
sessions.router,
prefix="/sessions",
tags=["sessions"],
responses={404: {"description": "Not found"}},
)
api.include_router(
runs.router,
prefix="/runs",
tags=["runs"],
responses={404: {"description": "Not found"}},
)
api.include_router(
teams.router,
prefix="/teams",
tags=["teams"],
responses={404: {"description": "Not found"}},
)
api.include_router(
ws.router,
prefix="/ws",
tags=["websocket"],
responses={404: {"description": "Not found"}},
)
# Version endpoint
@api.get("/version")
async def get_version():
"""Get API version"""
return {
"status": True,
"message": "Version retrieved successfully",
"data": {"version": VERSION},
}
# Health check endpoint
@api.get("/health")
async def health_check():
"""API health check endpoint"""
return {
"status": True,
"message": "Service is healthy",
}
# Mount static file directories
app.mount("/api", api)
app.mount(
"/files",
StaticFiles(directory=initializer.static_root, html=True),
name="files",
)
app.mount("/", StaticFiles(directory=initializer.ui_root, html=True), name="ui")
# Error handlers
@app.exception_handler(500)
async def internal_error_handler(request, exc):
logger.error(f"Internal error: {str(exc)}")
return {
"status": False,
"message": "Internal server error",
"detail": str(exc) if settings.API_DOCS else "Internal server error",
}
def create_app() -> FastAPI:
"""
Factory function to create and configure the FastAPI application.
Useful for testing and different deployment scenarios.
"""
return app