Feat: add admin CLI and admin service (#10186)

### What problem does this PR solve?

Introduce new feature: RAGFlow system admin service and CLI

### Introduction

Admin Service is a dedicated management component designed to monitor,
maintain, and administrate the RAGFlow system. It provides comprehensive
tools for ensuring system stability, performing operational tasks, and
managing users and permissions efficiently.

The service offers monitoring of critical components, including the
RAGFlow server, Task Executor processes, and dependent services such as
MySQL, Infinity / Elasticsearch, Redis, and MinIO. It automatically
checks their health status, resource usage, and uptime, and performs
restarts in case of failures to minimize downtime.

For user and system management, it supports listing, creating,
modifying, and deleting users and their associated resources like
knowledge bases and Agents.

Built with scalability and reliability in mind, the Admin Service
ensures smooth system operation and simplifies maintenance workflows.

It consists of a server-side Service and a command-line client (CLI),
both implemented in Python. User commands are parsed using the Lark
parsing toolkit.

- **Admin Service**: A backend service that interfaces with the RAGFlow
system to execute administrative operations and monitor its status.
- **Admin CLI**: A command-line interface that allows users to connect
to the Admin Service and issue commands for system management.

### Starting the Admin Service

1. Before start Admin Service, please make sure RAGFlow system is
already started.

2.  Run the service script:
    ```bash
    python admin/admin_server.py
    ```
The service will start and listen for incoming connections from the CLI
on the configured port.

### Using the Admin CLI

1.  Ensure the Admin Service is running.
2.  Launch the CLI client:
    ```bash
    python admin/admin_client.py -h 0.0.0.0 -p 9381
## Supported Commands
Commands are case-insensitive and must be terminated with a semicolon
(`;`).
### Service Management Commands
-  [x] `LIST SERVICES;`
    -   Lists all available services within the RAGFlow system.
-  [ ] `SHOW SERVICE <id>;`
- Shows detailed status information for the service identified by
`<id>`.
-  [ ] `STARTUP SERVICE <id>;`
    -   Attempts to start the service identified by `<id>`.
-  [ ] `SHUTDOWN SERVICE <id>;`
- Attempts to gracefully shut down the service identified by `<id>`.
-  [ ] `RESTART SERVICE <id>;`
    -   Attempts to restart the service identified by `<id>`.
### User Management Commands
-  [x] `LIST USERS;`
    -   Lists all users known to the system.
-  [ ] `SHOW USER '<username>';`
- Shows details and permissions for the specified user. The username
must be enclosed in single or double quotes.
-  [ ] `DROP USER '<username>';`
    -   Removes the specified user from the system. Use with caution.
-  [ ] `ALTER USER PASSWORD '<username>' '<new_password>';`
    -   Changes the password for the specified user.
### Data and Agent Commands
-  [ ] `LIST DATASETS OF '<username>';`
    -   Lists the datasets associated with the specified user.
-  [ ] `LIST AGENTS OF '<username>';`
    -   Lists the agents associated with the specified user.
### Meta-Commands
Meta-commands are prefixed with a backslash (`\`).
-   `\?` or `\help`
    -   Shows help information for the available commands.
-   `\q` or `\quit`
    -   Exits the CLI application.
## Examples
```commandline
admin> list users;
+-------------------------------+------------------------+-----------+-------------+
| create_date                   | email                  | is_active | nickname    |
+-------------------------------+------------------------+-----------+-------------+
| Fri, 22 Nov 2024 16:03:41 GMT | jeffery@infiniflow.org | 1         | Jeffery     |
| Fri, 22 Nov 2024 16:10:55 GMT | aya@infiniflow.org     | 1         | Waterdancer |
+-------------------------------+------------------------+-----------+-------------+
admin> list services;
+-------------------------------------------------------------------------------------------+-----------+----+---------------+-------+----------------+
| extra                                                                                     | host      | id | name          | port  | service_type   |
+-------------------------------------------------------------------------------------------+-----------+----+---------------+-------+----------------+
| {}                                                                                        | 0.0.0.0   | 0  | ragflow_0     | 9380  | ragflow_server |
| {'meta_type': 'mysql', 'password': 'infini_rag_flow', 'username': 'root'}                 | localhost | 1  | mysql         | 5455  | meta_data      |
| {'password': 'infini_rag_flow', 'store_type': 'minio', 'user': 'rag_flow'}                | localhost | 2  | minio         | 9000  | file_store     |
| {'password': 'infini_rag_flow', 'retrieval_type': 'elasticsearch', 'username': 'elastic'} | localhost | 3  | elasticsearch | 1200  | retrieval      |
| {'db_name': 'default_db', 'retrieval_type': 'infinity'}                                   | localhost | 4  | infinity      | 23817 | retrieval      |
| {'database': 1, 'mq_type': 'redis', 'password': 'infini_rag_flow'}                        | localhost | 5  | redis         | 6379  | message_queue  |
+-------------------------------------------------------------------------------------------+-----------+----+---------------+-------+----------------+
```

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

Signed-off-by: jinhai <haijin.chn@gmail.com>
This commit is contained in:
Jin Hai 2025-09-22 10:37:49 +08:00 committed by GitHub
parent 45f9f428db
commit d11b1628a1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 1210 additions and 6 deletions

101
admin/README.md Normal file
View File

@ -0,0 +1,101 @@
# RAGFlow Admin Service & CLI
### Introduction
Admin Service is a dedicated management component designed to monitor, maintain, and administrate the RAGFlow system. It provides comprehensive tools for ensuring system stability, performing operational tasks, and managing users and permissions efficiently.
The service offers real-time monitoring of critical components, including the RAGFlow server, Task Executor processes, and dependent services such as MySQL, Elasticsearch, Redis, and MinIO. It automatically checks their health status, resource usage, and uptime, and performs restarts in case of failures to minimize downtime.
For user and system management, it supports listing, creating, modifying, and deleting users and their associated resources like knowledge bases and Agents.
Built with scalability and reliability in mind, the Admin Service ensures smooth system operation and simplifies maintenance workflows.
It consists of a server-side Service and a command-line client (CLI), both implemented in Python. User commands are parsed using the Lark parsing toolkit.
- **Admin Service**: A backend service that interfaces with the RAGFlow system to execute administrative operations and monitor its status.
- **Admin CLI**: A command-line interface that allows users to connect to the Admin Service and issue commands for system management.
### Starting the Admin Service
1. Before start Admin Service, please make sure RAGFlow system is already started.
2. Run the service script:
```bash
python admin/admin_server.py
```
The service will start and listen for incoming connections from the CLI on the configured port.
### Using the Admin CLI
1. Ensure the Admin Service is running.
2. Launch the CLI client:
```bash
python admin/admin_client.py -h 0.0.0.0 -p 9381
## Supported Commands
Commands are case-insensitive and must be terminated with a semicolon (`;`).
### Service Management Commands
- `LIST SERVICES;`
- Lists all available services within the RAGFlow system.
- `SHOW SERVICE <id>;`
- Shows detailed status information for the service identified by `<id>`.
- `STARTUP SERVICE <id>;`
- Attempts to start the service identified by `<id>`.
- `SHUTDOWN SERVICE <id>;`
- Attempts to gracefully shut down the service identified by `<id>`.
- `RESTART SERVICE <id>;`
- Attempts to restart the service identified by `<id>`.
### User Management Commands
- `LIST USERS;`
- Lists all users known to the system.
- `SHOW USER '<username>';`
- Shows details and permissions for the specified user. The username must be enclosed in single or double quotes.
- `DROP USER '<username>';`
- Removes the specified user from the system. Use with caution.
- `ALTER USER PASSWORD '<username>' '<new_password>';`
- Changes the password for the specified user.
### Data and Agent Commands
- `LIST DATASETS OF '<username>';`
- Lists the datasets associated with the specified user.
- `LIST AGENTS OF '<username>';`
- Lists the agents associated with the specified user.
### Meta-Commands
Meta-commands are prefixed with a backslash (`\`).
- `\?` or `\help`
- Shows help information for the available commands.
- `\q` or `\quit`
- Exits the CLI application.
## Examples
```commandline
admin> list users;
+-------------------------------+------------------------+-----------+-------------+
| create_date | email | is_active | nickname |
+-------------------------------+------------------------+-----------+-------------+
| Fri, 22 Nov 2024 16:03:41 GMT | jeffery@infiniflow.org | 1 | Jeffery |
| Fri, 22 Nov 2024 16:10:55 GMT | aya@infiniflow.org | 1 | Waterdancer |
+-------------------------------+------------------------+-----------+-------------+
admin> list services;
+-------------------------------------------------------------------------------------------+-----------+----+---------------+-------+----------------+
| extra | host | id | name | port | service_type |
+-------------------------------------------------------------------------------------------+-----------+----+---------------+-------+----------------+
| {} | 0.0.0.0 | 0 | ragflow_0 | 9380 | ragflow_server |
| {'meta_type': 'mysql', 'password': 'infini_rag_flow', 'username': 'root'} | localhost | 1 | mysql | 5455 | meta_data |
| {'password': 'infini_rag_flow', 'store_type': 'minio', 'user': 'rag_flow'} | localhost | 2 | minio | 9000 | file_store |
| {'password': 'infini_rag_flow', 'retrieval_type': 'elasticsearch', 'username': 'elastic'} | localhost | 3 | elasticsearch | 1200 | retrieval |
| {'db_name': 'default_db', 'retrieval_type': 'infinity'} | localhost | 4 | infinity | 23817 | retrieval |
| {'database': 1, 'mq_type': 'redis', 'password': 'infini_rag_flow'} | localhost | 5 | redis | 6379 | message_queue |
+-------------------------------------------------------------------------------------------+-----------+----+---------------+-------+----------------+
```

471
admin/admin_client.py Normal file
View File

@ -0,0 +1,471 @@
import argparse
import base64
from typing import Dict, List, Any
from lark import Lark, Transformer, Tree
import requests
from requests.auth import HTTPBasicAuth
GRAMMAR = r"""
start: command
command: sql_command | meta_command
sql_command: list_services
| show_service
| startup_service
| shutdown_service
| restart_service
| list_users
| show_user
| drop_user
| alter_user
| list_datasets
| list_agents
// meta command definition
meta_command: "\\" meta_command_name [meta_args]
meta_command_name: /[a-zA-Z?]+/
meta_args: (meta_arg)+
meta_arg: /[^\\s"']+/ | quoted_string
// command definition
LIST: "LIST"i
SERVICES: "SERVICES"i
SHOW: "SHOW"i
SERVICE: "SERVICE"i
SHUTDOWN: "SHUTDOWN"i
STARTUP: "STARTUP"i
RESTART: "RESTART"i
USERS: "USERS"i
DROP: "DROP"i
USER: "USER"i
ALTER: "ALTER"i
PASSWORD: "PASSWORD"i
DATASETS: "DATASETS"i
OF: "OF"i
AGENTS: "AGENTS"i
list_services: LIST SERVICES ";"
show_service: SHOW SERVICE NUMBER ";"
startup_service: STARTUP SERVICE NUMBER ";"
shutdown_service: SHUTDOWN SERVICE NUMBER ";"
restart_service: RESTART SERVICE NUMBER ";"
list_users: LIST USERS ";"
drop_user: DROP USER quoted_string ";"
alter_user: ALTER USER PASSWORD quoted_string quoted_string ";"
show_user: SHOW USER quoted_string ";"
list_datasets: LIST DATASETS OF quoted_string ";"
list_agents: LIST AGENTS OF quoted_string ";"
identifier: WORD
quoted_string: QUOTED_STRING
QUOTED_STRING: /'[^']+'/ | /"[^"]+"/
WORD: /[a-zA-Z0-9_\-\.]+/
NUMBER: /[0-9]+/
%import common.WS
%ignore WS
"""
class AdminTransformer(Transformer):
def start(self, items):
return items[0]
def command(self, items):
return items[0]
def list_services(self, items):
result = {'type': 'list_services'}
return result
def show_service(self, items):
service_id = int(items[2])
return {"type": "show_service", "number": service_id}
def startup_service(self, items):
service_id = int(items[2])
return {"type": "startup_service", "number": service_id}
def shutdown_service(self, items):
service_id = int(items[2])
return {"type": "shutdown_service", "number": service_id}
def restart_service(self, items):
service_id = int(items[2])
return {"type": "restart_service", "number": service_id}
def list_users(self, items):
return {"type": "list_users"}
def show_user(self, items):
user_name = items[2]
return {"type": "show_user", "username": user_name}
def drop_user(self, items):
user_name = items[2]
return {"type": "drop_user", "username": user_name}
def alter_user(self, items):
user_name = items[3]
new_password = items[4]
return {"type": "alter_user", "username": user_name, "password": new_password}
def list_datasets(self, items):
user_name = items[3]
return {"type": "list_datasets", "username": user_name}
def list_agents(self, items):
user_name = items[3]
return {"type": "list_agents", "username": user_name}
def meta_command(self, items):
command_name = str(items[0]).lower()
args = items[1:] if len(items) > 1 else []
# handle quoted parameter
parsed_args = []
for arg in args:
if hasattr(arg, 'value'):
parsed_args.append(arg.value)
else:
parsed_args.append(str(arg))
return {'type': 'meta', 'command': command_name, 'args': parsed_args}
def meta_command_name(self, items):
return items[0]
def meta_args(self, items):
return items
def encode_to_base64(input_string):
base64_encoded = base64.b64encode(input_string.encode('utf-8'))
return base64_encoded.decode('utf-8')
class AdminCommandParser:
def __init__(self):
self.parser = Lark(GRAMMAR, start='start', parser='lalr', transformer=AdminTransformer())
self.command_history = []
def parse_command(self, command_str: str) -> Dict[str, Any]:
if not command_str.strip():
return {'type': 'empty'}
self.command_history.append(command_str)
try:
result = self.parser.parse(command_str)
return result
except Exception as e:
return {'type': 'error', 'message': f'Parse error: {str(e)}'}
class AdminCLI:
def __init__(self):
self.parser = AdminCommandParser()
self.is_interactive = False
self.admin_account = "admin@ragflow.io"
self.admin_password: str = "admin"
self.host: str = ""
self.port: int = 0
def verify_admin(self, args):
conn_info = self._parse_connection_args(args)
if 'error' in conn_info:
print(f"Error: {conn_info['error']}")
return
self.host = conn_info['host']
self.port = conn_info['port']
print(f"Attempt to access ip: {self.host}, port: {self.port}")
url = f'http://{self.host}:{self.port}/api/v1/admin/auth'
try_count = 0
while True:
try_count += 1
if try_count > 3:
return False
admin_passwd = input(f"password for {self.admin_account}: ").strip()
try:
self.admin_password = encode_to_base64(admin_passwd)
response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password))
if response.status_code == 200:
res_json = response.json()
error_code = res_json.get('code', -1)
if error_code == 0:
print("Authentication successful.")
return True
else:
error_message = res_json.get('message', 'Unknown error')
print(f"Authentication failed: {error_message}, try again")
continue
else:
print(f"Bad responsestatus: {response.status_code}, try again")
except Exception:
print(f"Can't access {self.host}, port: {self.port}")
def _print_table_simple(self, data):
if not data:
print("No data to print")
return
columns = list(data[0].keys())
col_widths = {}
for col in columns:
max_width = len(str(col))
for item in data:
value_len = len(str(item.get(col, '')))
if value_len > max_width:
max_width = value_len
col_widths[col] = max(2, max_width)
# Generate delimiter
separator = "+" + "+".join(["-" * (col_widths[col] + 2) for col in columns]) + "+"
# Print header
print(separator)
header = "|" + "|".join([f" {col:<{col_widths[col]}} " for col in columns]) + "|"
print(header)
print(separator)
# Print data
for item in data:
row = "|"
for col in columns:
value = str(item.get(col, ''))
if len(value) > col_widths[col]:
value = value[:col_widths[col] - 3] + "..."
row += f" {value:<{col_widths[col]}} |"
print(row)
print(separator)
def run_interactive(self):
self.is_interactive = True
print("RAGFlow Admin command line interface - Type '\\?' for help, '\\q' to quit")
while True:
try:
command = input("admin> ").strip()
if not command:
continue
print(f"command: {command}")
result = self.parser.parse_command(command)
self.execute_command(result)
if isinstance(result, Tree):
continue
if result.get('type') == 'meta' and result.get('command') in ['q', 'quit', 'exit']:
break
except KeyboardInterrupt:
print("\nUse '\\q' to quit")
except EOFError:
print("\nGoodbye!")
break
def run_single_command(self, args):
conn_info = self._parse_connection_args(args)
if 'error' in conn_info:
print(f"Error: {conn_info['error']}")
return
def _parse_connection_args(self, args: List[str]) -> Dict[str, Any]:
parser = argparse.ArgumentParser(description='Admin CLI Client', add_help=False)
parser.add_argument('-h', '--host', default='localhost', help='Admin service host')
parser.add_argument('-p', '--port', type=int, default=8080, help='Admin service port')
try:
parsed_args, remaining_args = parser.parse_known_args(args)
return {
'host': parsed_args.host,
'port': parsed_args.port,
}
except SystemExit:
return {'error': 'Invalid connection arguments'}
def execute_command(self, parsed_command: Dict[str, Any]):
command_dict: dict
if isinstance(parsed_command, Tree):
command_dict = parsed_command.children[0]
else:
if parsed_command['type'] == 'error':
print(f"Error: {parsed_command['message']}")
return
else:
command_dict = parsed_command
# print(f"Parsed command: {command_dict}")
command_type = command_dict['type']
match command_type:
case 'list_services':
self._handle_list_services(command_dict)
case 'show_service':
self._handle_show_service(command_dict)
case 'restart_service':
self._handle_restart_service(command_dict)
case 'shutdown_service':
self._handle_shutdown_service(command_dict)
case 'startup_service':
self._handle_startup_service(command_dict)
case 'list_users':
self._handle_list_users(command_dict)
case 'show_user':
self._handle_show_user(command_dict)
case 'drop_user':
self._handle_drop_user(command_dict)
case 'alter_user':
self._handle_alter_user(command_dict)
case 'list_datasets':
self._handle_list_datasets(command_dict)
case 'list_agents':
self._handle_list_agents(command_dict)
case 'meta':
self._handle_meta_command(command_dict)
case _:
print(f"Command '{command_type}' would be executed with API")
def _handle_list_services(self, command):
print("Listing all services")
url = f'http://{self.host}:{self.port}/api/v1/admin/services'
response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password))
res_json = dict
if response.status_code == 200:
res_json = response.json()
self._print_table_simple(res_json['data'])
else:
print(f"Fail to get all users, code: {res_json['code']}, message: {res_json['message']}")
def _handle_show_service(self, command):
service_id: int = command['number']
print(f"Showing service: {service_id}")
def _handle_restart_service(self, command):
service_id: int = command['number']
print(f"Restart service {service_id}")
def _handle_shutdown_service(self, command):
service_id: int = command['number']
print(f"Shutdown service {service_id}")
def _handle_startup_service(self, command):
service_id: int = command['number']
print(f"Startup service {service_id}")
def _handle_list_users(self, command):
print("Listing all users")
url = f'http://{self.host}:{self.port}/api/v1/admin/users'
response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password))
res_json = dict
if response.status_code == 200:
res_json = response.json()
self._print_table_simple(res_json['data'])
else:
print(f"Fail to get all users, code: {res_json['code']}, message: {res_json['message']}")
def _handle_show_user(self, command):
username_tree: Tree = command['username']
username: str = username_tree.children[0].strip("'\"")
print(f"Showing user: {username}")
def _handle_drop_user(self, command):
username_tree: Tree = command['username']
username: str = username_tree.children[0].strip("'\"")
print(f"Drop user: {username}")
def _handle_alter_user(self, command):
username_tree: Tree = command['username']
username: str = username_tree.children[0].strip("'\"")
password_tree: Tree = command['password']
password: str = password_tree.children[0].strip("'\"")
print(f"Alter user: {username}, password: {password}")
def _handle_list_datasets(self, command):
username_tree: Tree = command['username']
username: str = username_tree.children[0].strip("'\"")
print(f"Listing all datasets of user: {username}")
def _handle_list_agents(self, command):
username_tree: Tree = command['username']
username: str = username_tree.children[0].strip("'\"")
print(f"Listing all agents of user: {username}")
def _handle_meta_command(self, command):
meta_command = command['command']
args = command.get('args', [])
if meta_command in ['?', 'h', 'help']:
self.show_help()
elif meta_command in ['q', 'quit', 'exit']:
print("Goodbye!")
else:
print(f"Meta command '{meta_command}' with args {args}")
def show_help(self):
"""Help info"""
help_text = """
Commands:
LIST SERVICES
SHOW SERVICE <service>
STARTUP SERVICE <service>
SHUTDOWN SERVICE <service>
RESTART SERVICE <service>
LIST USERS
SHOW USER <user>
DROP USER <user>
CREATE USER <user> <password>
ALTER USER PASSWORD <user> <new_password>
LIST DATASETS OF <user>
LIST AGENTS OF <user>
Meta Commands:
\\?, \\h, \\help Show this help
\\q, \\quit, \\exit Quit the CLI
"""
print(help_text)
def main():
import sys
cli = AdminCLI()
if len(sys.argv) == 1 or (len(sys.argv) > 1 and sys.argv[1] == '-'):
print(r"""
____ ___ ______________ ___ __ _
/ __ \/ | / ____/ ____/ /___ _ __ / | ____/ /___ ___ (_)___
/ /_/ / /| |/ / __/ /_ / / __ \ | /| / / / /| |/ __ / __ `__ \/ / __ \
/ _, _/ ___ / /_/ / __/ / / /_/ / |/ |/ / / ___ / /_/ / / / / / / / / / /
/_/ |_/_/ |_\____/_/ /_/\____/|__/|__/ /_/ |_\__,_/_/ /_/ /_/_/_/ /_/
""")
if cli.verify_admin(sys.argv):
cli.run_interactive()
else:
if cli.verify_admin(sys.argv):
cli.run_interactive()
# cli.run_single_command(sys.argv[1:])
if __name__ == '__main__':
main()

46
admin/admin_server.py Normal file
View File

@ -0,0 +1,46 @@
import os
import signal
import logging
import time
import threading
import traceback
from werkzeug.serving import run_simple
from flask import Flask
from routes import admin_bp
from api.utils.log_utils import init_root_logger
from api.constants import SERVICE_CONF
from config import load_configurations, SERVICE_CONFIGS
stop_event = threading.Event()
if __name__ == '__main__':
init_root_logger("admin_service")
logging.info(r"""
____ ___ ______________ ___ __ _
/ __ \/ | / ____/ ____/ /___ _ __ / | ____/ /___ ___ (_)___
/ /_/ / /| |/ / __/ /_ / / __ \ | /| / / / /| |/ __ / __ `__ \/ / __ \
/ _, _/ ___ / /_/ / __/ / / /_/ / |/ |/ / / ___ / /_/ / / / / / / / / / /
/_/ |_/_/ |_\____/_/ /_/\____/|__/|__/ /_/ |_\__,_/_/ /_/ /_/_/_/ /_/
""")
app = Flask(__name__)
app.register_blueprint(admin_bp)
SERVICE_CONFIGS.configs = load_configurations(SERVICE_CONF)
try:
logging.info("RAGFlow Admin service start...")
run_simple(
hostname="0.0.0.0",
port=9381,
application=app,
threaded=True,
use_reloader=True,
use_debugger=True,
)
except Exception:
traceback.print_exc()
stop_event.set()
time.sleep(1)
os.kill(os.getpid(), signal.SIGKILL)

57
admin/auth.py Normal file
View File

@ -0,0 +1,57 @@
import logging
import uuid
from functools import wraps
from flask import request, jsonify
from exceptions import AdminException
from api.db.init_data import encode_to_base64
from api.db.services import UserService
def check_admin(username: str, password: str):
users = UserService.query(email=username)
if not users:
logging.info(f"Username: {username} is not registered!")
user_info = {
"id": uuid.uuid1().hex,
"password": encode_to_base64("admin"),
"nickname": "admin",
"is_superuser": True,
"email": "admin@ragflow.io",
"creator": "system",
"status": "1",
}
if not UserService.save(**user_info):
raise AdminException("Can't init admin.", 500)
user = UserService.query_user(username, password)
if user:
return True
else:
return False
def login_verify(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if not auth or 'username' not in auth.parameters or 'password' not in auth.parameters:
return jsonify({
"code": 401,
"message": "Authentication required",
"data": None
}), 200
username = auth.parameters['username']
password = auth.parameters['password']
# TODO: to check the username and password from DB
if check_admin(username, password) is False:
return jsonify({
"code": 403,
"message": "Access denied",
"data": None
}), 200
return f(*args, **kwargs)
return decorated

280
admin/config.py Normal file
View File

@ -0,0 +1,280 @@
import logging
import threading
from enum import Enum
from pydantic import BaseModel
from typing import Any
from api.utils import read_config
from urllib.parse import urlparse
class ServiceConfigs:
def __init__(self):
self.configs = []
self.lock = threading.Lock()
SERVICE_CONFIGS = ServiceConfigs
class ServiceType(Enum):
METADATA = "metadata"
RETRIEVAL = "retrieval"
MESSAGE_QUEUE = "message_queue"
RAGFLOW_SERVER = "ragflow_server"
TASK_EXECUTOR = "task_executor"
FILE_STORE = "file_store"
class BaseConfig(BaseModel):
id: int
name: str
host: str
port: int
service_type: str
def to_dict(self) -> dict[str, Any]:
return {'id': self.id, 'name': self.name, 'host': self.host, 'port': self.port, 'service_type': self.service_type}
class MetaConfig(BaseConfig):
meta_type: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if 'extra' not in result:
result['extra'] = dict()
extra_dict = result['extra'].copy()
extra_dict['meta_type'] = self.meta_type
result['extra'] = extra_dict
return result
class MySQLConfig(MetaConfig):
username: str
password: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if 'extra' not in result:
result['extra'] = dict()
extra_dict = result['extra'].copy()
extra_dict['username'] = self.username
extra_dict['password'] = self.password
result['extra'] = extra_dict
return result
class PostgresConfig(MetaConfig):
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if 'extra' not in result:
result['extra'] = dict()
return result
class RetrievalConfig(BaseConfig):
retrieval_type: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if 'extra' not in result:
result['extra'] = dict()
extra_dict = result['extra'].copy()
extra_dict['retrieval_type'] = self.retrieval_type
result['extra'] = extra_dict
return result
class InfinityConfig(RetrievalConfig):
db_name: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if 'extra' not in result:
result['extra'] = dict()
extra_dict = result['extra'].copy()
extra_dict['db_name'] = self.db_name
result['extra'] = extra_dict
return result
class ElasticsearchConfig(RetrievalConfig):
username: str
password: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if 'extra' not in result:
result['extra'] = dict()
extra_dict = result['extra'].copy()
extra_dict['username'] = self.username
extra_dict['password'] = self.password
result['extra'] = extra_dict
return result
class MessageQueueConfig(BaseConfig):
mq_type: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if 'extra' not in result:
result['extra'] = dict()
extra_dict = result['extra'].copy()
extra_dict['mq_type'] = self.mq_type
result['extra'] = extra_dict
return result
class RedisConfig(MessageQueueConfig):
database: int
password: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if 'extra' not in result:
result['extra'] = dict()
extra_dict = result['extra'].copy()
extra_dict['database'] = self.database
extra_dict['password'] = self.password
result['extra'] = extra_dict
return result
class RabbitMQConfig(MessageQueueConfig):
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if 'extra' not in result:
result['extra'] = dict()
return result
class RAGFlowServerConfig(BaseConfig):
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if 'extra' not in result:
result['extra'] = dict()
return result
class TaskExecutorConfig(BaseConfig):
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if 'extra' not in result:
result['extra'] = dict()
return result
class FileStoreConfig(BaseConfig):
store_type: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if 'extra' not in result:
result['extra'] = dict()
extra_dict = result['extra'].copy()
extra_dict['store_type'] = self.store_type
result['extra'] = extra_dict
return result
class MinioConfig(FileStoreConfig):
user: str
password: str
def to_dict(self) -> dict[str, Any]:
result = super().to_dict()
if 'extra' not in result:
result['extra'] = dict()
extra_dict = result['extra'].copy()
extra_dict['user'] = self.user
extra_dict['password'] = self.password
result['extra'] = extra_dict
return result
def load_configurations(config_path: str) -> list[BaseConfig]:
raw_configs = read_config(config_path)
configurations = []
ragflow_count = 0
id_count = 0
for k, v in raw_configs.items():
match (k):
case "ragflow":
name: str = f'ragflow_{ragflow_count}'
host: str = v['host']
http_port: int = v['http_port']
config = RAGFlowServerConfig(id=id_count, name=name, host=host, port=http_port, service_type="ragflow_server")
configurations.append(config)
id_count += 1
case "es":
name: str = 'elasticsearch'
url = v['hosts']
parsed = urlparse(url)
host: str = parsed.hostname
port: int = parsed.port
username: str = v.get('username')
password: str = v.get('password')
config = ElasticsearchConfig(id=id_count, name=name, host=host, port=port, service_type="retrieval",
retrieval_type="elasticsearch",
username=username, password=password)
configurations.append(config)
id_count += 1
case "infinity":
name: str = 'infinity'
url = v['uri']
parts = url.split(':', 1)
host = parts[0]
port = int(parts[1])
database: str = v.get('db_name', 'default_db')
config = InfinityConfig(id=id_count, name=name, host=host, port=port, service_type="retrieval", retrieval_type="infinity",
db_name=database)
configurations.append(config)
id_count += 1
case "minio":
name: str = 'minio'
url = v['host']
parts = url.split(':', 1)
host = parts[0]
port = int(parts[1])
user = v.get('user')
password = v.get('password')
config = MinioConfig(id=id_count, name=name, host=host, port=port, user=user, password=password, service_type="file_store",
store_type="minio")
configurations.append(config)
id_count += 1
case "redis":
name: str = 'redis'
url = v['host']
parts = url.split(':', 1)
host = parts[0]
port = int(parts[1])
password = v.get('password')
db: int = v.get('db')
config = RedisConfig(id=id_count, name=name, host=host, port=port, password=password, database=db,
service_type="message_queue", mq_type="redis")
configurations.append(config)
id_count += 1
case "mysql":
name: str = 'mysql'
host: str = v.get('host')
port: int = v.get('port')
username = v.get('user')
password = v.get('password')
config = MySQLConfig(id=id_count, name=name, host=host, port=port, username=username, password=password,
service_type="meta_data", meta_type="mysql")
configurations.append(config)
id_count += 1
case "admin":
pass
case _:
logging.warning(f"Unknown configuration key: {k}")
continue
return configurations

17
admin/exceptions.py Normal file
View File

@ -0,0 +1,17 @@
class AdminException(Exception):
def __init__(self, message, code=400):
super().__init__(message)
self.code = code
self.message = message
class UserNotFoundError(AdminException):
def __init__(self, username):
super().__init__(f"User '{username}' not found", 404)
class UserAlreadyExistsError(AdminException):
def __init__(self, username):
super().__init__(f"User '{username}' already exists", 409)
class CannotDeleteAdminError(AdminException):
def __init__(self):
super().__init__("Cannot delete admin account", 403)

0
admin/models.py Normal file
View File

15
admin/responses.py Normal file
View File

@ -0,0 +1,15 @@
from flask import jsonify
def success_response(data=None, message="Success", code = 0):
return jsonify({
"code": code,
"message": message,
"data": data
}), 200
def error_response(message="Error", code=-1, data=None):
return jsonify({
"code": code,
"message": message,
"data": data
}), 400

141
admin/routes.py Normal file
View File

@ -0,0 +1,141 @@
from flask import Blueprint, request
from auth import login_verify
from responses import success_response, error_response
from services import UserMgr, ServiceMgr
from exceptions import AdminException
admin_bp = Blueprint('admin', __name__, url_prefix='/api/v1/admin')
@admin_bp.route('/auth', methods=['GET'])
@login_verify
def auth_admin():
try:
return success_response(None, "Admin is authorized", 0)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/users', methods=['GET'])
@login_verify
def list_users():
try:
users = UserMgr.get_all_users()
return success_response(users, "Get all users", 0)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/users', methods=['POST'])
@login_verify
def create_user():
try:
data = request.get_json()
if not data or 'username' not in data or 'password' not in data:
return error_response("Username and password are required", 400)
username = data['username']
password = data['password']
role = data.get('role', 'user')
user = UserMgr.create_user(username, password, role)
return success_response(user, "User created successfully", 201)
except AdminException as e:
return error_response(e.message, e.code)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/users/<username>', methods=['DELETE'])
@login_verify
def delete_user(username):
try:
UserMgr.delete_user(username)
return success_response(None, "User and all data deleted successfully")
except AdminException as e:
return error_response(e.message, e.code)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/users/<username>/password', methods=['PUT'])
@login_verify
def change_password(username):
try:
data = request.get_json()
if not data or 'new_password' not in data:
return error_response("New password is required", 400)
new_password = data['new_password']
UserMgr.update_user_password(username, new_password)
return success_response(None, "Password updated successfully")
except AdminException as e:
return error_response(e.message, e.code)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/users/<username>', methods=['GET'])
@login_verify
def get_user_details(username):
try:
user_details = UserMgr.get_user_details(username)
return success_response(user_details)
except AdminException as e:
return error_response(e.message, e.code)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/services', methods=['GET'])
@login_verify
def get_services():
try:
services = ServiceMgr.get_all_services()
return success_response(services, "Get all services", 0)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/service_types/<service_type>', methods=['GET'])
@login_verify
def get_services_by_type(service_type_str):
try:
services = ServiceMgr.get_services_by_type(service_type_str)
return success_response(services)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/services/<service_id>', methods=['GET'])
@login_verify
def get_service(service_id):
try:
services = ServiceMgr.get_service_details(service_id)
return success_response(services)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/services/<service_id>', methods=['DELETE'])
@login_verify
def shutdown_service(service_id):
try:
services = ServiceMgr.shutdown_service(service_id)
return success_response(services)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/services/<service_id>', methods=['PUT'])
@login_verify
def restart_service(service_id):
try:
services = ServiceMgr.restart_service(service_id)
return success_response(services)
except Exception as e:
return error_response(str(e), 500)

54
admin/services.py Normal file
View File

@ -0,0 +1,54 @@
from api.db.services import UserService
from exceptions import AdminException
from config import SERVICE_CONFIGS
class UserMgr:
@staticmethod
def get_all_users():
users = UserService.get_all_users()
result = []
for user in users:
result.append({'email': user.email, 'nickname': user.nickname, 'create_date': user.create_date, 'is_active': user.is_active})
return result
@staticmethod
def get_user_details(username):
raise AdminException("get_user_details: not implemented")
@staticmethod
def create_user(username, password, role="user"):
raise AdminException("create_user: not implemented")
@staticmethod
def delete_user(username):
raise AdminException("delete_user: not implemented")
@staticmethod
def update_user_password(username, new_password):
raise AdminException("update_user_password: not implemented")
class ServiceMgr:
@staticmethod
def get_all_services():
result = []
configs = SERVICE_CONFIGS.configs
for config in configs:
result.append(config.to_dict())
return result
@staticmethod
def get_services_by_type(service_type_str: str):
raise AdminException("get_services_by_type: not implemented")
@staticmethod
def get_service_details(service_id: int):
raise AdminException("get_service_details: not implemented")
@staticmethod
def shutdown_service(service_id: int):
raise AdminException("shutdown_service: not implemented")
@staticmethod
def restart_service(service_id: int):
raise AdminException("restart_service: not implemented")

View File

@ -45,22 +45,22 @@ class UserService(CommonService):
def query(cls, cols=None, reverse=None, order_by=None, **kwargs):
if 'access_token' in kwargs:
access_token = kwargs['access_token']
# Reject empty, None, or whitespace-only access tokens
if not access_token or not str(access_token).strip():
logging.warning("UserService.query: Rejecting empty access_token query")
return cls.model.select().where(cls.model.id == "INVALID_EMPTY_TOKEN") # Returns empty result
# Reject tokens that are too short (should be UUID, 32+ chars)
if len(str(access_token).strip()) < 32:
logging.warning(f"UserService.query: Rejecting short access_token query: {len(str(access_token))} chars")
return cls.model.select().where(cls.model.id == "INVALID_SHORT_TOKEN") # Returns empty result
# Reject tokens that start with "INVALID_" (from logout)
if str(access_token).startswith("INVALID_"):
logging.warning("UserService.query: Rejecting invalidated access_token")
return cls.model.select().where(cls.model.id == "INVALID_LOGOUT_TOKEN") # Returns empty result
# Call parent query method for valid requests
return super().query(cols=cols, reverse=reverse, order_by=order_by, **kwargs)
@ -140,6 +140,12 @@ class UserService(CommonService):
cls.model.id == user_id,
cls.model.is_superuser == 1).count() > 0
@classmethod
@DB.connection_context()
def get_all_users(cls):
users = cls.model.select()
return list(users)
class TenantService(CommonService):
"""Service class for managing tenant-related database operations.

View File

@ -1,6 +1,9 @@
ragflow:
host: 0.0.0.0
http_port: 9380
admin:
host: 0.0.0.0
http_port: 9381
mysql:
name: 'rag_flow'
user: 'root'

View File

@ -1,6 +1,9 @@
ragflow:
host: ${RAGFLOW_HOST:-0.0.0.0}
http_port: 9380
admin:
host: ${RAGFLOW_HOST:-0.0.0.0}
http_port: 9381
mysql:
name: '${MYSQL_DBNAME:-rag_flow}'
user: '${MYSQL_USER:-root}'

View File

@ -131,6 +131,7 @@ dependencies = [
"python-calamine>=0.4.0",
"litellm>=1.74.15.post1",
"flask-mail>=0.10.0",
"lark>=1.2.2",
]
[project.optional-dependencies]

13
uv.lock generated
View File

@ -1,5 +1,4 @@
version = 1
revision = 1
requires-python = ">=3.10, <3.13"
resolution-markers = [
"python_full_version >= '3.12' and sys_platform == 'darwin'",
@ -2893,6 +2892,15 @@ wheels = [
{ url = "https://mirrors.aliyun.com/pypi/packages/92/b0/8f08df3f0fa584c4132937690c6dd33e0a116f963ecf2b35567f614e0ca7/langfuse-3.2.1-py3-none-any.whl", hash = "sha256:07a84e8c1eed6ac8e149bdda1431fd866e4aee741b66124316336fb2bc7e6a32" },
]
[[package]]
name = "lark"
version = "1.2.2"
source = { registry = "https://mirrors.aliyun.com/pypi/simple" }
sdist = { url = "https://mirrors.aliyun.com/pypi/packages/af/60/bc7622aefb2aee1c0b4ba23c1446d3e30225c8770b38d7aedbfb65ca9d5a/lark-1.2.2.tar.gz", hash = "sha256:ca807d0162cd16cef15a8feecb862d7319e7a09bdb13aef927968e45040fed80" }
wheels = [
{ url = "https://mirrors.aliyun.com/pypi/packages/2d/00/d90b10b962b4277f5e64a78b6609968859ff86889f5b898c1a778c06ec00/lark-1.2.2-py3-none-any.whl", hash = "sha256:c2276486b02f0f1b90be155f2c8ba4a8e194d42775786db622faccd652d8e80c" },
]
[[package]]
name = "litellm"
version = "1.75.5.post1"
@ -5320,6 +5328,7 @@ dependencies = [
{ name = "itsdangerous" },
{ name = "json-repair" },
{ name = "langfuse" },
{ name = "lark" },
{ name = "litellm" },
{ name = "markdown" },
{ name = "markdown-to-json" },
@ -5475,6 +5484,7 @@ requires-dist = [
{ name = "itsdangerous", specifier = "==2.1.2" },
{ name = "json-repair", specifier = "==0.35.0" },
{ name = "langfuse", specifier = ">=2.60.0" },
{ name = "lark", specifier = ">=1.2.2" },
{ name = "litellm", specifier = ">=1.74.15.post1" },
{ name = "markdown", specifier = "==3.6" },
{ name = "markdown-to-json", specifier = "==2.1.1" },
@ -5553,7 +5563,6 @@ requires-dist = [
{ name = "yfinance", specifier = "==0.2.65" },
{ name = "zhipuai", specifier = "==2.0.1" },
]
provides-extras = ["full"]
[package.metadata.requires-dev]
test = [