mirror of
https://github.com/unclecode/crawl4ai.git
synced 2026-01-01 04:44:45 +00:00
Add JWT token-based authentication to Docker server and client. Refactor server architecture for better code organization and error handling. Move Dockerfile to root deploy directory and update configuration. Add comprehensive documentation and examples. BREAKING CHANGE: Docker server now requires authentication by default. Endpoints require JWT tokens when security.jwt_enabled is true in config.
46 lines
1.9 KiB
Python
46 lines
1.9 KiB
Python
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Dict, Optional
|
|
from jwt import JWT, jwk_from_dict
|
|
from jwt.utils import get_int_from_datetime
|
|
from fastapi import Depends, HTTPException
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from pydantic import EmailStr
|
|
from pydantic.main import BaseModel
|
|
import base64
|
|
|
|
instance = JWT()
|
|
security = HTTPBearer()
|
|
SECRET_KEY = os.environ.get("SECRET_KEY", "mysecret")
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 60
|
|
|
|
def get_jwk_from_secret(secret: str):
|
|
"""Convert a secret string into a JWK object."""
|
|
secret_bytes = secret.encode('utf-8')
|
|
b64_secret = base64.urlsafe_b64encode(secret_bytes).rstrip(b'=').decode('utf-8')
|
|
return jwk_from_dict({"kty": "oct", "k": b64_secret})
|
|
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
|
|
"""Create a JWT access token with an expiration."""
|
|
to_encode = data.copy()
|
|
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
|
|
to_encode.update({"exp": get_int_from_datetime(expire)})
|
|
signing_key = get_jwk_from_secret(SECRET_KEY)
|
|
return instance.encode(to_encode, signing_key, alg='HS256')
|
|
|
|
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict:
|
|
"""Verify the JWT token from the Authorization header."""
|
|
token = credentials.credentials
|
|
verifying_key = get_jwk_from_secret(SECRET_KEY)
|
|
try:
|
|
payload = instance.decode(token, verifying_key, do_time_check=True, algorithms='HS256')
|
|
return payload
|
|
except Exception:
|
|
raise HTTPException(status_code=401, detail="Invalid or expired token")
|
|
|
|
def get_token_dependency(config: Dict):
|
|
"""Return the token dependency if JWT is enabled, else None."""
|
|
return verify_token if config.get("security", {}).get("jwt_enabled", False) else None
|
|
|
|
class TokenRequest(BaseModel):
|
|
email: EmailStr |