# Mirror of https://github.com/HKUDS/LightRAG.git
# Synced 2025-07-04 07:26:17 +00:00
# 100 lines, 3.0 KiB, Python
import os
from datetime import datetime, timedelta, timezone

import jwt
from dotenv import load_dotenv
from fastapi import HTTPException, status
from pydantic import BaseModel
|
|
|
|
# Runs at import time, before AuthHandler reads its settings via os.getenv.
# Presumably this loads a local .env file into the process environment
# (python-dotenv behaviour) - confirm against the deployment setup.
load_dotenv()
|
|
|
|
class TokenPayload(BaseModel):
    """Claims carried in a JWT issued by this module.

    Field order is kept as declared because it determines the order of the
    keys in the serialized payload.
    """

    # Username the token was issued for
    sub: str

    # Expiration time of the token
    exp: datetime

    # User role; a regular user unless stated otherwise
    role: str = "user"

    # Additional, free-form metadata attached to the token
    metadata: dict = {}
|
|
|
|
|
|
class AuthHandler:
    """Create and validate HS256-signed JWT tokens.

    Settings are read from the environment once, when the instance is built:

    * ``TOKEN_SECRET`` - signing key.
    * ``TOKEN_EXPIRE_HOURS`` - lifetime of regular tokens in hours (default 4).
    * ``GUEST_TOKEN_EXPIRE_HOURS`` - lifetime of guest tokens in hours
      (default 2).
    """

    def __init__(self):
        # NOTE(review): the fallback secret is a hard-coded literal that is
        # visible in the source. Any deployment that does not set
        # TOKEN_SECRET signs its tokens with a publicly known key. It is kept
        # only so existing setups keep working - set TOKEN_SECRET explicitly.
        self.secret = os.getenv("TOKEN_SECRET", "4f85ds4f56dsf46")
        self.algorithm = "HS256"
        self.expire_hours = int(os.getenv("TOKEN_EXPIRE_HOURS", 4))
        # Guest token default expiration time
        self.guest_expire_hours = int(os.getenv("GUEST_TOKEN_EXPIRE_HOURS", 2))

    def _resolve_expire_hours(self, role, custom_expire_hours):
        """Return the token lifetime in hours for ``role``.

        An explicit ``custom_expire_hours`` (including 0) always wins;
        otherwise guests get the guest default and every other role gets the
        regular default.
        """
        if custom_expire_hours is not None:
            return custom_expire_hours
        if role == "guest":
            return self.guest_expire_hours
        return self.expire_hours

    def create_token(
        self,
        username: str,
        role: str = "user",
        custom_expire_hours: int = None,
        metadata: dict = None,
    ) -> str:
        """
        Create JWT token

        Args:
            username: Username
            role: User role, default is "user", guest is "guest"
            custom_expire_hours: Custom expiration time (hours), if None use default value
            metadata: Additional metadata

        Returns:
            str: Encoded JWT token
        """
        expire_hours = self._resolve_expire_hours(role, custom_expire_hours)

        # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
        # naive value that is easy to misinterpret as local time.
        expire = datetime.now(timezone.utc) + timedelta(hours=expire_hours)

        # Create payload
        payload = TokenPayload(
            sub=username, exp=expire, role=role, metadata=metadata or {}
        )

        return jwt.encode(payload.dict(), self.secret, algorithm=self.algorithm)

    def validate_token(self, token: str) -> dict:
        """
        Validate JWT token

        Args:
            token: JWT token

        Returns:
            dict: Dictionary containing user information with the keys
                "username", "role", "metadata" and "exp" (naive UTC datetime)

        Raises:
            HTTPException: 401 "Token expired" if the token is past its
                expiration time, 401 "Invalid token" if it cannot be decoded
                or lacks the "sub" / "exp" claims
        """
        try:
            payload = jwt.decode(token, self.secret, algorithms=[self.algorithm])
        except jwt.ExpiredSignatureError as exc:
            # jwt.decode() checks "exp" itself. This subclass of PyJWTError
            # has to be caught first, otherwise an expired token is reported
            # as merely "Invalid token".
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired"
            ) from exc
        except jwt.PyJWTError as exc:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
            ) from exc

        # A correctly signed token may still lack the claims we rely on;
        # treat that as an invalid token instead of leaking a KeyError.
        try:
            username = payload["sub"]
            expire_timestamp = payload["exp"]
        except KeyError as exc:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
            ) from exc

        # Convert via an aware datetime (utcfromtimestamp() is deprecated),
        # then drop tzinfo so callers keep receiving a naive UTC value.
        expire_time = datetime.fromtimestamp(
            expire_timestamp, tz=timezone.utc
        ).replace(tzinfo=None)

        # Return complete payload instead of just username
        return {
            "username": username,
            "role": payload.get("role", "user"),
            "metadata": payload.get("metadata", {}),
            "exp": expire_time,
        }
|
|
|
|
|
|
# Module-level instance built at import time; AuthHandler.__init__ reads its
# environment settings once, here.
auth_handler = AuthHandler()
|