106 lines
3.2 KiB
Python
Raw Normal View History

import os
from datetime import datetime, timedelta
import jwt
from fastapi import HTTPException, status
from pydantic import BaseModel
from dotenv import load_dotenv
load_dotenv()
2025-03-21 16:56:47 +08:00
class TokenPayload(BaseModel):
sub: str # Username
exp: datetime # Expiration time
role: str = "user" # User role, default is regular user
metadata: dict = {} # Additional metadata
class AuthHandler:
def __init__(self):
self.secret = os.getenv("TOKEN_SECRET", "4f85ds4f56dsf46")
self.algorithm = "HS256"
self.expire_hours = int(os.getenv("TOKEN_EXPIRE_HOURS", 4))
self.guest_expire_hours = int(os.getenv("GUEST_TOKEN_EXPIRE_HOURS", 2))
self.accounts = {}
auth_accounts = os.getenv("AUTH_ACCOUNTS")
if auth_accounts:
for account in auth_accounts.split(','):
username, password = account.split(':', 1)
self.accounts[username] = password
2025-03-18 03:30:43 +08:00
def create_token(
self,
username: str,
role: str = "user",
custom_expire_hours: int = None,
metadata: dict = None,
) -> str:
"""
Create JWT token
2025-03-18 03:30:43 +08:00
Args:
username: Username
role: User role, default is "user", guest is "guest"
custom_expire_hours: Custom expiration time (hours), if None use default value
metadata: Additional metadata
2025-03-18 03:30:43 +08:00
Returns:
str: Encoded JWT token
"""
# Choose default expiration time based on role
if custom_expire_hours is None:
if role == "guest":
expire_hours = self.guest_expire_hours
else:
expire_hours = self.expire_hours
else:
expire_hours = custom_expire_hours
2025-03-18 03:30:43 +08:00
expire = datetime.utcnow() + timedelta(hours=expire_hours)
2025-03-18 03:30:43 +08:00
# Create payload
payload = TokenPayload(
2025-03-18 03:30:43 +08:00
sub=username, exp=expire, role=role, metadata=metadata or {}
)
2025-03-18 03:30:43 +08:00
return jwt.encode(payload.dict(), self.secret, algorithm=self.algorithm)
def validate_token(self, token: str) -> dict:
"""
Validate JWT token
2025-03-18 03:30:43 +08:00
Args:
token: JWT token
2025-03-18 03:30:43 +08:00
Returns:
dict: Dictionary containing user information
2025-03-18 03:30:43 +08:00
Raises:
HTTPException: If token is invalid or expired
"""
try:
payload = jwt.decode(token, self.secret, algorithms=[self.algorithm])
expire_timestamp = payload["exp"]
expire_time = datetime.utcfromtimestamp(expire_timestamp)
if datetime.utcnow() > expire_time:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired"
)
2025-03-18 03:30:43 +08:00
# Return complete payload instead of just username
return {
"username": payload["sub"],
"role": payload.get("role", "user"),
"metadata": payload.get("metadata", {}),
2025-03-18 03:30:43 +08:00
"exp": expire_time,
}
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
)
auth_handler = AuthHandler()