mirror of
https://github.com/HKUDS/LightRAG.git
synced 2025-07-05 08:01:05 +00:00

- Add role-based token system with metadata support - Implement automatic guest mode for unconfigured authentication - Create new /auth-status endpoint for authentication status checking - Modify frontend to auto-detect auth status and bypass login when appropriate - Add guest mode indicator in site header for better UX This change allows users to automatically access the system without manual login when authentication is not configured, while maintaining secure authentication when credentials are properly set up.
93 lines
3.0 KiB
Python
93 lines
3.0 KiB
Python
import os
|
|
from datetime import datetime, timedelta
|
|
import jwt
|
|
from fastapi import HTTPException, status
|
|
from pydantic import BaseModel
|
|
|
|
|
|
class TokenPayload(BaseModel):
|
|
sub: str # Username
|
|
exp: datetime # Expiration time
|
|
role: str = "user" # User role, default is regular user
|
|
metadata: dict = {} # Additional metadata
|
|
|
|
|
|
class AuthHandler:
|
|
def __init__(self):
|
|
self.secret = os.getenv("TOKEN_SECRET", "4f85ds4f56dsf46")
|
|
self.algorithm = "HS256"
|
|
self.expire_hours = int(os.getenv("TOKEN_EXPIRE_HOURS", 4))
|
|
self.guest_expire_hours = int(os.getenv("GUEST_TOKEN_EXPIRE_HOURS", 2)) # Guest token default expiration time
|
|
|
|
def create_token(self, username: str, role: str = "user", custom_expire_hours: int = None, metadata: dict = None) -> str:
|
|
"""
|
|
Create JWT token
|
|
|
|
Args:
|
|
username: Username
|
|
role: User role, default is "user", guest is "guest"
|
|
custom_expire_hours: Custom expiration time (hours), if None use default value
|
|
metadata: Additional metadata
|
|
|
|
Returns:
|
|
str: Encoded JWT token
|
|
"""
|
|
# Choose default expiration time based on role
|
|
if custom_expire_hours is None:
|
|
if role == "guest":
|
|
expire_hours = self.guest_expire_hours
|
|
else:
|
|
expire_hours = self.expire_hours
|
|
else:
|
|
expire_hours = custom_expire_hours
|
|
|
|
expire = datetime.utcnow() + timedelta(hours=expire_hours)
|
|
|
|
# Create payload
|
|
payload = TokenPayload(
|
|
sub=username,
|
|
exp=expire,
|
|
role=role,
|
|
metadata=metadata or {}
|
|
)
|
|
|
|
return jwt.encode(payload.dict(), self.secret, algorithm=self.algorithm)
|
|
|
|
def validate_token(self, token: str) -> dict:
|
|
"""
|
|
Validate JWT token
|
|
|
|
Args:
|
|
token: JWT token
|
|
|
|
Returns:
|
|
dict: Dictionary containing user information
|
|
|
|
Raises:
|
|
HTTPException: If token is invalid or expired
|
|
"""
|
|
try:
|
|
payload = jwt.decode(token, self.secret, algorithms=[self.algorithm])
|
|
expire_timestamp = payload["exp"]
|
|
expire_time = datetime.utcfromtimestamp(expire_timestamp)
|
|
|
|
if datetime.utcnow() > expire_time:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired"
|
|
)
|
|
|
|
# Return complete payload instead of just username
|
|
return {
|
|
"username": payload["sub"],
|
|
"role": payload.get("role", "user"),
|
|
"metadata": payload.get("metadata", {}),
|
|
"exp": expire_time
|
|
}
|
|
except jwt.PyJWTError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
|
|
)
|
|
|
|
|
|
auth_handler = AuthHandler()
|