""" This module contains all graph-related routes for the LightRAG API. """ from typing import Optional, Dict, Any from fastapi import APIRouter, Depends, Query, HTTPException from pydantic import BaseModel from ..utils_api import get_combined_auth_dependency router = APIRouter(tags=["graph"]) class EntityUpdateRequest(BaseModel): entity_name: str updated_data: Dict[str, Any] allow_rename: bool = False def create_graph_routes(rag, api_key: Optional[str] = None): combined_auth = get_combined_auth_dependency(api_key) @router.get("/graph/label/list", dependencies=[Depends(combined_auth)]) async def get_graph_labels(): """ Get all graph labels Returns: List[str]: List of graph labels """ return await rag.get_graph_labels() @router.get("/graphs", dependencies=[Depends(combined_auth)]) async def get_knowledge_graph( label: str = Query(..., description="Label to get knowledge graph for"), max_depth: int = Query(3, description="Maximum depth of graph", ge=1), max_nodes: int = Query(1000, description="Maximum nodes to return", ge=1), ): """ Retrieve a connected subgraph of nodes where the label includes the specified label. When reducing the number of nodes, the prioritization criteria are as follows: 1. Hops(path) to the staring node take precedence 2. Followed by the degree of the nodes Args: label (str): Label of the starting node max_depth (int, optional): Maximum depth of the subgraph,Defaults to 3 max_nodes: Maxiumu nodes to return Returns: Dict[str, List[str]]: Knowledge graph for label """ return await rag.get_knowledge_graph( node_label=label, max_depth=max_depth, max_nodes=max_nodes, ) @router.get("/graph/entity/exists", dependencies=[Depends(combined_auth)]) async def check_entity_exists( name: str = Query(..., description="Entity name to check"), ): """ Check if an entity with the given name exists in the knowledge graph Args: name (str): Name of the entity to check Returns: Dict[str, bool]: Dictionary with 'exists' key indicating if entity exists """ try: exists = await rag.chunk_entity_relation_graph.has_node(name) return {"exists": exists} except Exception as e: raise HTTPException( status_code=500, detail=f"Error checking entity existence: {str(e)}" ) @router.post("/graph/entity/edit", dependencies=[Depends(combined_auth)]) async def update_entity(request: EntityUpdateRequest): """ Update an entity's properties in the knowledge graph Args: request (EntityUpdateRequest): Request containing entity name, updated data, and rename flag Returns: Dict: Updated entity information """ try: print(request.entity_name, request.updated_data, request.allow_rename) result = await rag.aedit_entity( entity_name=request.entity_name, updated_data=request.updated_data, allow_rename=request.allow_rename, ) return { "status": "success", "message": "Entity updated successfully", "data": result, } except ValueError as ve: raise HTTPException(status_code=400, detail=str(ve)) except Exception as e: raise HTTPException( status_code=500, detail=f"Error updating entity: {str(e)}" ) return router