LightRAG/lightrag/api/lightrag_server.py

494 lines
16 KiB
Python
Raw Normal View History

"""
LightRAG FastAPI Server
"""
2025-01-31 23:35:42 +08:00
from fastapi import (
FastAPI,
Depends,
2025-01-31 23:35:42 +08:00
)
from fastapi.responses import FileResponse
import asyncio
import threading
import os
2025-01-24 13:55:50 +01:00
from fastapi.staticfiles import StaticFiles
import logging
2025-02-20 04:12:21 +08:00
from typing import Dict
from pathlib import Path
import configparser
from ascii_colors import ASCIIColors
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
2025-01-17 02:34:29 +01:00
from dotenv import load_dotenv
from .utils_api import (
get_api_key_dependency,
parse_args,
get_default_host,
display_splash_screen,
)
from lightrag import LightRAG
from lightrag.types import GPTKeywordExtractionFormat
from lightrag.api import __api_version__
from lightrag.utils import EmbeddingFunc
from lightrag.utils import logger
from .routers.document_routes import (
DocumentManager,
create_document_routes,
run_scanning_process,
)
from .routers.query_routes import create_query_routes
from .routers.graph_routes import create_graph_routes
from .routers.ollama_api import OllamaAPI
2025-02-18 16:33:57 +08:00
2025-02-01 15:22:40 +08:00
# Load environment variables
2025-02-17 15:10:15 +08:00
# Environment bootstrap: load_dotenv is called with override=True; a failure
# here is logged as a warning and startup continues.
try:
    load_dotenv(override=True)
except Exception as e:
    logger.warning(f"Failed to load .env file: {e}")

# Parser fed from "config.ini" (path is relative to the current directory)
config = configparser.ConfigParser()
config.read("config.ini")

# Module-wide top_k; create_app() overwrites it with args.top_k
global_top_k = 60  # default value
2025-02-01 15:22:40 +08:00
# Module-level record of the document-scanning state. The startup hook in
# create_app() reads and updates it while holding progress_lock.
scan_progress: Dict = dict(
    is_scanning=False,
    current_file="",
    indexed_count=0,
    total_files=0,
    progress=0,
)

# Mutex taken around updates to scan_progress
progress_lock = threading.Lock()
2025-02-20 04:12:21 +08:00
class AccessLogFilter(logging.Filter):
    """Logging filter that hides successful GET records for a few fixed paths.

    A record is dropped only when its ``args`` tuple has at least five items
    and carries method ``"GET"`` at index 1, a path at index 2 that exactly
    equals one of ``filtered_paths``, and status 200 or 304 at index 4.
    Every other record passes through.
    """

    def __init__(self):
        super().__init__()
        # Exact request paths whose successful GET records are suppressed
        self.filtered_paths = ["/documents", "/health", "/webui/"]

    def filter(self, record):
        """Return False to drop ``record``, True to keep it."""
        try:
            fields = getattr(record, "args", None)
            # Records without a long-enough positional args tuple are kept.
            if not isinstance(fields, tuple) or len(fields) < 5:
                return True

            method, path, status = fields[1], fields[2], fields[4]
            suppress = (
                method == "GET"
                and status in (200, 304)
                and path in self.filtered_paths
            )
            return not suppress
        except Exception:
            # Best effort: if inspection fails, let the record through.
            return True
def create_app(args):
    """Build and return the configured LightRAG FastAPI application.

    Args:
        args: Parsed settings object. This function reads, among others,
            ``top_k``, ``verbose``, ``llm_binding``, ``embedding_binding``,
            the ``*_binding_host`` / ``*_binding_api_key`` values, the
            ``ssl*`` options, ``log_level``, ``key``, ``input_dir``,
            ``working_dir``, the storage selectors and the chunking/token
            limits. ``llm_binding_host`` and ``embedding_binding_host`` are
            filled in on ``args`` when they are ``None``.

    Returns:
        The FastAPI application with CORS middleware, the document, query,
        graph and Ollama-compatible routers, a ``/health`` endpoint and the
        ``/webui`` static mount installed.

    Raises:
        Exception: If the LLM or embedding binding is not supported, or if
            SSL is enabled but the certificate/key paths are missing or do
            not exist on disk.
    """
    # Set global top_k
    global global_top_k
    global_top_k = args.top_k  # save top_k from args

    # Initialize verbose debug setting
    from lightrag.utils import set_verbose_debug

    set_verbose_debug(args.verbose)

    # Verify that bindings are correctly setup
    if args.llm_binding not in [
        "lollms",
        "ollama",
        "openai",
        "openai-ollama",
        "azure_openai",
    ]:
        raise Exception("llm binding not supported")

    if args.embedding_binding not in ["lollms", "ollama", "openai", "azure_openai"]:
        raise Exception("embedding binding not supported")

    # Set default hosts if not provided
    if args.llm_binding_host is None:
        args.llm_binding_host = get_default_host(args.llm_binding)

    if args.embedding_binding_host is None:
        args.embedding_binding_host = get_default_host(args.embedding_binding)

    # Add SSL validation
    if args.ssl:
        if not args.ssl_certfile or not args.ssl_keyfile:
            raise Exception(
                "SSL certificate and key files must be provided when SSL is enabled"
            )
        if not os.path.exists(args.ssl_certfile):
            raise Exception(f"SSL certificate file not found: {args.ssl_certfile}")
        if not os.path.exists(args.ssl_keyfile):
            raise Exception(f"SSL key file not found: {args.ssl_keyfile}")

    # Setup logging
    logging.basicConfig(
        format="%(levelname)s:%(message)s", level=getattr(logging, args.log_level)
    )

    # Check if API key is provided either through env var or args
    # (the environment variable takes precedence when it is non-empty)
    api_key = os.getenv("LIGHTRAG_API_KEY") or args.key

    # Initialize document manager
    doc_manager = DocumentManager(args.input_dir)

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Lifespan context manager for startup and shutdown events"""
        # Keep strong references to background tasks so they are not
        # garbage-collected while running; each task removes itself when done.
        app.state.background_tasks = set()
        try:
            # `rag` is bound later in create_app(); it is resolved when the
            # lifespan actually runs.
            await rag.initialize_storages()

            # Auto scan documents if enabled
            if args.auto_scan_at_startup:
                # Start scanning in background unless a scan is already flagged
                with progress_lock:
                    if not scan_progress["is_scanning"]:
                        scan_progress["is_scanning"] = True
                        scan_progress["indexed_count"] = 0
                        scan_progress["progress"] = 0
                        # Create background task
                        task = asyncio.create_task(
                            run_scanning_process(rag, doc_manager)
                        )
                        app.state.background_tasks.add(task)
                        task.add_done_callback(app.state.background_tasks.discard)
                        ASCIIColors.info(
                            f"Started background scanning of documents from {args.input_dir}"
                        )
                    else:
                        ASCIIColors.info(
                            "Skip document scanning(another scanning is active)"
                        )

            yield

        finally:
            # Clean up database connections
            await rag.finalize_storages()

    # Initialize FastAPI
    app = FastAPI(
        title="LightRAG API",
        # The conditional is parenthesized on purpose: a conditional expression
        # binds looser than "+", so without the parentheses the whole
        # description would become "" whenever no API key is configured.
        description=(
            "API for querying text using LightRAG with separate storage and input directories"
            + ("(With authentication)" if api_key else "")
        ),
        version=__api_version__,
        openapi_tags=[{"name": "api"}],
        lifespan=lifespan,
    )

    def get_cors_origins():
        """Get allowed origins from environment variable

        Returns a list of allowed origins, defaults to ["*"] if not set
        """
        origins_str = os.getenv("CORS_ORIGINS", "*")
        if origins_str == "*":
            return ["*"]
        return [origin.strip() for origin in origins_str.split(",")]

    # Add CORS middleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=get_cors_origins(),
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Create the optional API key dependency
    optional_api_key = get_api_key_dependency(api_key)

    # Create working directory if it doesn't exist
    Path(args.working_dir).mkdir(parents=True, exist_ok=True)

    # Import only the backends the selected bindings need, so unused
    # provider modules are never loaded.
    if args.llm_binding == "lollms" or args.embedding_binding == "lollms":
        from lightrag.llm.lollms import lollms_model_complete, lollms_embed
    if args.llm_binding == "ollama" or args.embedding_binding == "ollama":
        from lightrag.llm.ollama import ollama_model_complete, ollama_embed
    if args.llm_binding == "openai" or args.embedding_binding == "openai":
        from lightrag.llm.openai import openai_complete_if_cache, openai_embed
    if args.llm_binding == "azure_openai" or args.embedding_binding == "azure_openai":
        from lightrag.llm.azure_openai import (
            azure_openai_complete_if_cache,
            azure_openai_embed,
        )
    # The check must look at the binding *name*, not the host URL; otherwise
    # openai_complete_if_cache is never imported for the "openai-ollama"
    # binding and the first completion call fails with a NameError.
    if args.llm_binding == "openai-ollama" or args.embedding_binding == "ollama":
        from lightrag.llm.openai import openai_complete_if_cache
        from lightrag.llm.ollama import ollama_embed

    async def openai_alike_model_complete(
        prompt,
        system_prompt=None,
        history_messages=None,
        keyword_extraction=False,
        **kwargs,
    ) -> str:
        """Complete ``prompt`` via openai_complete_if_cache using the configured model, host and API key."""
        # NOTE(review): `keyword_extraction` is a named parameter, so it can
        # never be present in **kwargs; this pop always yields None and the
        # response_format branch below never runs. Left unchanged on purpose:
        # enabling it alters what is sent to the backend - confirm the
        # intended behavior before changing.
        keyword_extraction = kwargs.pop("keyword_extraction", None)
        if keyword_extraction:
            kwargs["response_format"] = GPTKeywordExtractionFormat
        if history_messages is None:
            history_messages = []
        return await openai_complete_if_cache(
            args.llm_model,
            prompt,
            system_prompt=system_prompt,
            history_messages=history_messages,
            base_url=args.llm_binding_host,
            api_key=args.llm_binding_api_key,
            **kwargs,
        )

    async def azure_openai_model_complete(
        prompt,
        system_prompt=None,
        history_messages=None,
        keyword_extraction=False,
        **kwargs,
    ) -> str:
        """Complete ``prompt`` via azure_openai_complete_if_cache; key and API version come from the environment."""
        # NOTE(review): same shadowing as in openai_alike_model_complete -
        # the named parameter is overwritten with None, so the
        # response_format branch is currently unreachable.
        keyword_extraction = kwargs.pop("keyword_extraction", None)
        if keyword_extraction:
            kwargs["response_format"] = GPTKeywordExtractionFormat
        if history_messages is None:
            history_messages = []
        return await azure_openai_complete_if_cache(
            args.llm_model,
            prompt,
            system_prompt=system_prompt,
            history_messages=history_messages,
            base_url=args.llm_binding_host,
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_OPENAI_API_VERSION", "2024-08-01-preview"),
            **kwargs,
        )

    def embed_texts(texts):
        """Dispatch ``texts`` to the embedding backend selected by args.embedding_binding.

        Returns whatever the backend function returns (not awaited here).
        """
        binding = args.embedding_binding
        if binding == "lollms":
            return lollms_embed(
                texts,
                embed_model=args.embedding_model,
                host=args.embedding_binding_host,
                api_key=args.embedding_binding_api_key,
            )
        if binding == "ollama":
            return ollama_embed(
                texts,
                embed_model=args.embedding_model,
                host=args.embedding_binding_host,
                api_key=args.embedding_binding_api_key,
            )
        if binding == "azure_openai":
            return azure_openai_embed(
                texts,
                model=args.embedding_model,  # no host is passed for azure_openai
                api_key=args.embedding_binding_api_key,
            )
        return openai_embed(
            texts,
            model=args.embedding_model,
            base_url=args.embedding_binding_host,
            api_key=args.embedding_binding_api_key,
        )

    embedding_func = EmbeddingFunc(
        embedding_dim=args.embedding_dim,
        max_token_size=args.max_embed_tokens,
        func=embed_texts,
    )

    # Pick the completion function for the selected LLM binding
    if args.llm_binding == "lollms":
        llm_model_func = lollms_model_complete
    elif args.llm_binding == "ollama":
        llm_model_func = ollama_model_complete
    elif args.llm_binding == "azure_openai":
        llm_model_func = azure_openai_model_complete
    else:  # "openai" and "openai-ollama"
        llm_model_func = openai_alike_model_complete

    # Extra keyword arguments forwarded with each LLM call
    if args.llm_binding in ("lollms", "ollama"):
        llm_model_kwargs = {
            "host": args.llm_binding_host,
            "timeout": args.timeout,
            "options": {"num_ctx": args.max_tokens},
            "api_key": args.llm_binding_api_key,
        }
    elif args.llm_binding == "openai-ollama":
        llm_model_kwargs = {}
    else:  # "openai" and "azure_openai"
        llm_model_kwargs = {
            "timeout": args.timeout,
        }

    # Initialize RAG
    rag = LightRAG(
        working_dir=args.working_dir,
        llm_model_func=llm_model_func,
        llm_model_name=args.llm_model,
        llm_model_max_async=args.max_async,
        llm_model_max_token_size=args.max_tokens,
        chunk_token_size=int(args.chunk_size),
        chunk_overlap_token_size=int(args.chunk_overlap_size),
        llm_model_kwargs=llm_model_kwargs,
        embedding_func=embedding_func,
        kv_storage=args.kv_storage,
        graph_storage=args.graph_storage,
        vector_storage=args.vector_storage,
        doc_status_storage=args.doc_status_storage,
        vector_db_storage_cls_kwargs={
            "cosine_better_than_threshold": args.cosine_threshold
        },
        enable_llm_cache_for_entity_extract=False,  # set to True for debugging to reduce LLM fees
        embedding_cache_config={
            "enabled": True,
            "similarity_threshold": 0.95,
            "use_llm_check": False,
        },
        log_level=args.log_level,
        namespace_prefix=args.namespace_prefix,
        # Storage init/teardown is driven explicitly by lifespan() above
        auto_manage_storages_states=False,
    )

    # Add routes
    app.include_router(create_document_routes(rag, doc_manager, api_key))
    app.include_router(create_query_routes(rag, api_key, args.top_k))
    app.include_router(create_graph_routes(rag, api_key))

    # Add Ollama API routes
    ollama_api = OllamaAPI(rag, top_k=args.top_k)
    app.include_router(ollama_api.router, prefix="/api")

    @app.get("/health", dependencies=[Depends(optional_api_key)])
    async def get_status():
        """Get current system status"""
        return {
            "status": "healthy",
            "working_directory": str(args.working_dir),
            "input_directory": str(args.input_dir),
            "configuration": {
                # LLM configuration binding/host address (if applicable)/model (if applicable)
                "llm_binding": args.llm_binding,
                "llm_binding_host": args.llm_binding_host,
                "llm_model": args.llm_model,
                # embedding model configuration binding/host address (if applicable)/model (if applicable)
                "embedding_binding": args.embedding_binding,
                "embedding_binding_host": args.embedding_binding_host,
                "embedding_model": args.embedding_model,
                "max_tokens": args.max_tokens,
                "kv_storage": args.kv_storage,
                "doc_status_storage": args.doc_status_storage,
                "graph_storage": args.graph_storage,
                "vector_storage": args.vector_storage,
            },
        }

    # Webui mount webui/index.html
    static_dir = Path(__file__).parent / "webui"
    # Created up front because StaticFiles is constructed with check_dir=True
    static_dir.mkdir(exist_ok=True)
    app.mount(
        "/webui",
        StaticFiles(directory=static_dir, html=True, check_dir=True),
        name="webui",
    )

    @app.get("/webui/")
    async def webui_root():
        return FileResponse(static_dir / "index.html")

    return app
2025-01-24 21:01:34 +01:00
def main():
    """Command-line entry point: parse arguments, build the app and serve it with uvicorn.

    Installs a stderr handler and an AccessLogFilter on the "uvicorn.access"
    logger, and passes ``log_config=None`` to uvicorn so that this logging
    setup is the one in effect. SSL certificate and key paths are forwarded
    only when ``args.ssl`` is set.
    """
    args = parse_args()
    import uvicorn
    import logging.config

    # Logging layout for uvicorn's access logger
    access_logging = {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "default": {
                "format": "%(levelname)s: %(message)s",
            },
        },
        "handlers": {
            "default": {
                "formatter": "default",
                "class": "logging.StreamHandler",
                "stream": "ext://sys.stderr",
            },
        },
        "loggers": {
            "uvicorn.access": {
                "handlers": ["default"],
                "level": "INFO",
                "propagate": False,
            },
        },
    }
    logging.config.dictConfig(access_logging)

    # Attach the record filter to the access logger
    logging.getLogger("uvicorn.access").addFilter(AccessLogFilter())

    app = create_app(args)
    display_splash_screen(args)

    server_options = {
        "app": app,
        "host": args.host,
        "port": args.port,
        "log_config": None,  # Disable default config
    }
    if args.ssl:
        server_options["ssl_certfile"] = args.ssl_certfile
        server_options["ssl_keyfile"] = args.ssl_keyfile

    uvicorn.run(**server_options)
2024-12-24 10:35:00 +01:00
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()