UncleCode 3a234ec950 fix(auth): make JWT authentication optional with fallback
Modify authentication system to gracefully handle cases where JWT is not enabled or token is missing. This includes:
- Making HTTPBearer auto_error=False to prevent automatic 403 errors
- Updating token dependency to return None when JWT is disabled
- Fixing model deserialization in CrawlResult
- Updating documentation links
- Cleaning up imports

BREAKING CHANGE: Authentication behavior changed to be more permissive when JWT is disabled
2025-03-05 17:14:42 +08:00

55 lines
2.0 KiB
Python

import os
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional
from jwt import JWT, jwk_from_dict
from jwt.utils import get_int_from_datetime
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import EmailStr
from pydantic.main import BaseModel
import base64
# Shared JWT codec object; create_access_token and verify_token call its
# encode()/decode() methods.
instance = JWT()
# Bearer-token scheme used as the dependency of verify_token. auto_error=False
# is set so that missing credentials reach verify_token as None instead of the
# scheme rejecting the request on its own.
security = HTTPBearer(auto_error=False)
# NOTE(review): when the SECRET_KEY environment variable is unset this falls
# back to the hard-coded value "mysecret"; tokens signed with a publicly known
# default can be forged. Confirm every deployment sets SECRET_KEY.
SECRET_KEY = os.environ.get("SECRET_KEY", "mysecret")
# Default token lifetime in minutes, used by create_access_token when no
# explicit expires_delta is given.
ACCESS_TOKEN_EXPIRE_MINUTES = 60
def get_jwk_from_secret(secret: str):
    """Build a symmetric ("oct") JWK object from a plain-text secret.

    The secret is UTF-8 encoded, base64url-encoded, and stripped of its
    trailing ``=`` padding before being handed to ``jwk_from_dict`` as the
    ``k`` member of the key dictionary.
    """
    raw = secret.encode('utf-8')
    encoded = base64.urlsafe_b64encode(raw)
    key_material = encoded.rstrip(b'=').decode('utf-8')
    jwk_fields = {"kty": "oct", "k": key_material}
    return jwk_from_dict(jwk_fields)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Encode *data* as an HS256-signed JWT that carries an ``exp`` claim.

    Args:
        data: Claims to embed. A shallow copy is taken, so the caller's dict
            is not modified.
        expires_delta: Lifetime of the token. When omitted or falsy (this
            includes a zero-length delta) the lifetime defaults to
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token string.
    """
    claims = data.copy()
    lifetime = expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    expires_at = datetime.now(timezone.utc) + lifetime
    # "exp" is stored as an integer timestamp derived from the aware datetime.
    claims["exp"] = get_int_from_datetime(expires_at)
    return instance.encode(claims, get_jwk_from_secret(SECRET_KEY), alg='HS256')
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Optional[Dict]:
    """Verify the JWT bearer token from the Authorization header.

    Args:
        credentials: Bearer credentials resolved through the ``security``
            dependency, or ``None`` when no credentials were supplied.

    Returns:
        The decoded token payload, or ``None`` when ``credentials`` is
        ``None`` (no token was presented).

    Raises:
        HTTPException: With status 401 when the token cannot be decoded or
            does not pass verification (including the expiry time check).
    """
    if credentials is None:
        return None

    token = credentials.credentials
    verifying_key = get_jwk_from_secret(SECRET_KEY)
    try:
        # Pass the allowed algorithms as a set: with a bare string, a
        # membership test on it degrades to a substring match.
        return instance.decode(
            token,
            verifying_key,
            do_time_check=True,
            algorithms={'HS256'},
        )
    except Exception as exc:
        # Any decode failure is reported uniformly as 401; the original
        # error is kept as the cause for server-side debugging.
        raise HTTPException(status_code=401, detail="Invalid or expired token") from exc
def get_token_dependency(config: Dict):
    """Return the token dependency if JWT is enabled, else a no-op dependency.

    Args:
        config: Application configuration. JWT is considered enabled only
            when ``config["security"]["jwt_enabled"]`` is truthy.

    Returns:
        ``verify_token`` when JWT is enabled; otherwise a zero-argument
        callable that returns ``None``.
    """
    # A "security" key that is present but null/empty is treated the same as
    # a missing one, instead of failing with AttributeError on ``None.get``.
    security_cfg = config.get("security") or {}
    if security_cfg.get("jwt_enabled", False):
        return verify_token
    return lambda: None
class TokenRequest(BaseModel):
    # Payload model with a single field: an email address validated through
    # pydantic's EmailStr type.
    # NOTE(review): presumably the request body of the token-issuing endpoint;
    # that endpoint is not defined in this file, so confirm against the caller.
    email: EmailStr