
Modify authentication system to gracefully handle cases where JWT is not enabled or token is missing. This includes: - Making HTTPBearer auto_error=False to prevent automatic 403 errors - Updating token dependency to return None when JWT is disabled - Fixing model deserialization in CrawlResult - Updating documentation links - Cleaning up imports BREAKING CHANGE: Authentication behavior changed to be more permissive when JWT is disabled
55 lines
2.0 KiB
Python
55 lines
2.0 KiB
Python
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Dict, Optional
|
|
from jwt import JWT, jwk_from_dict
|
|
from jwt.utils import get_int_from_datetime
|
|
from fastapi import Depends, HTTPException
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from pydantic import EmailStr
|
|
from pydantic.main import BaseModel
|
|
import base64
|
|
|
|
instance = JWT()
|
|
security = HTTPBearer(auto_error=False)
|
|
SECRET_KEY = os.environ.get("SECRET_KEY", "mysecret")
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 60
|
|
|
|
def get_jwk_from_secret(secret: str):
|
|
"""Convert a secret string into a JWK object."""
|
|
secret_bytes = secret.encode('utf-8')
|
|
b64_secret = base64.urlsafe_b64encode(secret_bytes).rstrip(b'=').decode('utf-8')
|
|
return jwk_from_dict({"kty": "oct", "k": b64_secret})
|
|
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
|
|
"""Create a JWT access token with an expiration."""
|
|
to_encode = data.copy()
|
|
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
|
|
to_encode.update({"exp": get_int_from_datetime(expire)})
|
|
signing_key = get_jwk_from_secret(SECRET_KEY)
|
|
return instance.encode(to_encode, signing_key, alg='HS256')
|
|
|
|
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict:
|
|
"""Verify the JWT token from the Authorization header."""
|
|
|
|
if credentials is None:
|
|
return None
|
|
token = credentials.credentials
|
|
verifying_key = get_jwk_from_secret(SECRET_KEY)
|
|
try:
|
|
payload = instance.decode(token, verifying_key, do_time_check=True, algorithms='HS256')
|
|
return payload
|
|
except Exception:
|
|
raise HTTPException(status_code=401, detail="Invalid or expired token")
|
|
|
|
|
|
def get_token_dependency(config: Dict):
|
|
"""Return the token dependency if JWT is enabled, else a function that returns None."""
|
|
|
|
if config.get("security", {}).get("jwt_enabled", False):
|
|
return verify_token
|
|
else:
|
|
return lambda: None
|
|
|
|
|
|
class TokenRequest(BaseModel):
|
|
email: EmailStr |