import argparse import base64 from Cryptodome.PublicKey import RSA from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 from typing import Dict, List, Any from lark import Lark, Transformer, Tree import requests from requests.auth import HTTPBasicAuth from api.common.base64 import encode_to_base64 GRAMMAR = r""" start: command command: sql_command | meta_command sql_command: list_services | show_service | startup_service | shutdown_service | restart_service | list_users | show_user | drop_user | alter_user | create_user | activate_user | list_datasets | list_agents // meta command definition meta_command: "\\" meta_command_name [meta_args] meta_command_name: /[a-zA-Z?]+/ meta_args: (meta_arg)+ meta_arg: /[^\\s"']+/ | quoted_string // command definition LIST: "LIST"i SERVICES: "SERVICES"i SHOW: "SHOW"i CREATE: "CREATE"i SERVICE: "SERVICE"i SHUTDOWN: "SHUTDOWN"i STARTUP: "STARTUP"i RESTART: "RESTART"i USERS: "USERS"i DROP: "DROP"i USER: "USER"i ALTER: "ALTER"i ACTIVE: "ACTIVE"i PASSWORD: "PASSWORD"i DATASETS: "DATASETS"i OF: "OF"i AGENTS: "AGENTS"i list_services: LIST SERVICES ";" show_service: SHOW SERVICE NUMBER ";" startup_service: STARTUP SERVICE NUMBER ";" shutdown_service: SHUTDOWN SERVICE NUMBER ";" restart_service: RESTART SERVICE NUMBER ";" list_users: LIST USERS ";" drop_user: DROP USER quoted_string ";" alter_user: ALTER USER PASSWORD quoted_string quoted_string ";" show_user: SHOW USER quoted_string ";" create_user: CREATE USER quoted_string quoted_string ";" activate_user: ALTER USER ACTIVE quoted_string status ";" list_datasets: LIST DATASETS OF quoted_string ";" list_agents: LIST AGENTS OF quoted_string ";" identifier: WORD quoted_string: QUOTED_STRING status: WORD QUOTED_STRING: /'[^']+'/ | /"[^"]+"/ WORD: /[a-zA-Z0-9_\-\.]+/ NUMBER: /[0-9]+/ %import common.WS %ignore WS """ class AdminTransformer(Transformer): def start(self, items): return items[0] def command(self, items): return items[0] def list_services(self, items): result = {'type': 'list_services'} return result def show_service(self, items): service_id = int(items[2]) return {"type": "show_service", "number": service_id} def startup_service(self, items): service_id = int(items[2]) return {"type": "startup_service", "number": service_id} def shutdown_service(self, items): service_id = int(items[2]) return {"type": "shutdown_service", "number": service_id} def restart_service(self, items): service_id = int(items[2]) return {"type": "restart_service", "number": service_id} def list_users(self, items): return {"type": "list_users"} def show_user(self, items): user_name = items[2] return {"type": "show_user", "username": user_name} def drop_user(self, items): user_name = items[2] return {"type": "drop_user", "username": user_name} def alter_user(self, items): user_name = items[3] new_password = items[4] return {"type": "alter_user", "username": user_name, "password": new_password} def create_user(self, items): user_name = items[2] password = items[3] return {"type": "create_user", "username": user_name, "password": password, "role": "user"} def activate_user(self, items): user_name = items[3] activate_status = items[4] return {"type": "activate_user", "activate_status": activate_status, "username": user_name} def list_datasets(self, items): user_name = items[3] return {"type": "list_datasets", "username": user_name} def list_agents(self, items): user_name = items[3] return {"type": "list_agents", "username": user_name} def meta_command(self, items): command_name = str(items[0]).lower() args = items[1:] if len(items) > 1 else [] # handle quoted parameter parsed_args = [] for arg in args: if hasattr(arg, 'value'): parsed_args.append(arg.value) else: parsed_args.append(str(arg)) return {'type': 'meta', 'command': command_name, 'args': parsed_args} def meta_command_name(self, items): return items[0] def meta_args(self, items): return items def encrypt(input_string): pub = '-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArq9XTUSeYr2+N1h3Afl/z8Dse/2yD0ZGrKwx+EEEcdsBLca9Ynmx3nIB5obmLlSfmskLpBo0UACBmB5rEjBp2Q2f3AG3Hjd4B+gNCG6BDaawuDlgANIhGnaTLrIqWrrcm4EMzJOnAOI1fgzJRsOOUEfaS318Eq9OVO3apEyCCt0lOQK6PuksduOjVxtltDav+guVAA068NrPYmRNabVKRNLJpL8w4D44sfth5RvZ3q9t+6RTArpEtc5sh5ChzvqPOzKGMXW83C95TxmXqpbK6olN4RevSfVjEAgCydH6HN6OhtOQEcnrU97r9H0iZOWwbw3pVrZiUkuRD1R56Wzs2wIDAQAB\n-----END PUBLIC KEY-----' pub_key = RSA.importKey(pub) cipher = Cipher_pkcs1_v1_5.new(pub_key) cipher_text = cipher.encrypt(base64.b64encode(input_string.encode('utf-8'))) return base64.b64encode(cipher_text).decode("utf-8") class AdminCommandParser: def __init__(self): self.parser = Lark(GRAMMAR, start='start', parser='lalr', transformer=AdminTransformer()) self.command_history = [] def parse_command(self, command_str: str) -> Dict[str, Any]: if not command_str.strip(): return {'type': 'empty'} self.command_history.append(command_str) try: result = self.parser.parse(command_str) return result except Exception as e: return {'type': 'error', 'message': f'Parse error: {str(e)}'} class AdminCLI: def __init__(self): self.parser = AdminCommandParser() self.is_interactive = False self.admin_account = "admin@ragflow.io" self.admin_password: str = "admin" self.host: str = "" self.port: int = 0 def verify_admin(self, args): conn_info = self._parse_connection_args(args) if 'error' in conn_info: print(f"Error: {conn_info['error']}") return self.host = conn_info['host'] self.port = conn_info['port'] print(f"Attempt to access ip: {self.host}, port: {self.port}") url = f'http://{self.host}:{self.port}/api/v1/admin/auth' try_count = 0 while True: try_count += 1 if try_count > 3: return False admin_passwd = input(f"password for {self.admin_account}: ").strip() try: self.admin_password = encode_to_base64(admin_passwd) response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) if response.status_code == 200: res_json = response.json() error_code = res_json.get('code', -1) if error_code == 0: print("Authentication successful.") return True else: error_message = res_json.get('message', 'Unknown error') print(f"Authentication failed: {error_message}, try again") continue else: print(f"Bad response,status: {response.status_code}, try again") except Exception: print(f"Can't access {self.host}, port: {self.port}") def _print_table_simple(self, data): if not data: print("No data to print") return if isinstance(data, dict): # handle single row data data = [data] columns = list(data[0].keys()) col_widths = {} for col in columns: max_width = len(str(col)) for item in data: value_len = len(str(item.get(col, ''))) if value_len > max_width: max_width = value_len col_widths[col] = max(2, max_width) # Generate delimiter separator = "+" + "+".join(["-" * (col_widths[col] + 2) for col in columns]) + "+" # Print header print(separator) header = "|" + "|".join([f" {col:<{col_widths[col]}} " for col in columns]) + "|" print(header) print(separator) # Print data for item in data: row = "|" for col in columns: value = str(item.get(col, '')) if len(value) > col_widths[col]: value = value[:col_widths[col] - 3] + "..." row += f" {value:<{col_widths[col]}} |" print(row) print(separator) def run_interactive(self): self.is_interactive = True print("RAGFlow Admin command line interface - Type '\\?' for help, '\\q' to quit") while True: try: command = input("admin> ").strip() if not command: continue print(f"command: {command}") result = self.parser.parse_command(command) self.execute_command(result) if isinstance(result, Tree): continue if result.get('type') == 'meta' and result.get('command') in ['q', 'quit', 'exit']: break except KeyboardInterrupt: print("\nUse '\\q' to quit") except EOFError: print("\nGoodbye!") break def run_single_command(self, args): conn_info = self._parse_connection_args(args) if 'error' in conn_info: print(f"Error: {conn_info['error']}") return def _parse_connection_args(self, args: List[str]) -> Dict[str, Any]: parser = argparse.ArgumentParser(description='Admin CLI Client', add_help=False) parser.add_argument('-h', '--host', default='localhost', help='Admin service host') parser.add_argument('-p', '--port', type=int, default=8080, help='Admin service port') try: parsed_args, remaining_args = parser.parse_known_args(args) return { 'host': parsed_args.host, 'port': parsed_args.port, } except SystemExit: return {'error': 'Invalid connection arguments'} def execute_command(self, parsed_command: Dict[str, Any]): command_dict: dict if isinstance(parsed_command, Tree): command_dict = parsed_command.children[0] else: if parsed_command['type'] == 'error': print(f"Error: {parsed_command['message']}") return else: command_dict = parsed_command # print(f"Parsed command: {command_dict}") command_type = command_dict['type'] match command_type: case 'list_services': self._handle_list_services(command_dict) case 'show_service': self._handle_show_service(command_dict) case 'restart_service': self._handle_restart_service(command_dict) case 'shutdown_service': self._handle_shutdown_service(command_dict) case 'startup_service': self._handle_startup_service(command_dict) case 'list_users': self._handle_list_users(command_dict) case 'show_user': self._handle_show_user(command_dict) case 'drop_user': self._handle_drop_user(command_dict) case 'alter_user': self._handle_alter_user(command_dict) case 'create_user': self._handle_create_user(command_dict) case 'activate_user': self._handle_activate_user(command_dict) case 'list_datasets': self._handle_list_datasets(command_dict) case 'list_agents': self._handle_list_agents(command_dict) case 'meta': self._handle_meta_command(command_dict) case _: print(f"Command '{command_type}' would be executed with API") def _handle_list_services(self, command): print("Listing all services") url = f'http://{self.host}:{self.port}/api/v1/admin/services' response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) res_json = response.json() if response.status_code == 200: self._print_table_simple(res_json['data']) else: print(f"Fail to get all users, code: {res_json['code']}, message: {res_json['message']}") def _handle_show_service(self, command): service_id: int = command['number'] print(f"Showing service: {service_id}") def _handle_restart_service(self, command): service_id: int = command['number'] print(f"Restart service {service_id}") def _handle_shutdown_service(self, command): service_id: int = command['number'] print(f"Shutdown service {service_id}") def _handle_startup_service(self, command): service_id: int = command['number'] print(f"Startup service {service_id}") def _handle_list_users(self, command): print("Listing all users") url = f'http://{self.host}:{self.port}/api/v1/admin/users' response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) res_json = response.json() if response.status_code == 200: self._print_table_simple(res_json['data']) else: print(f"Fail to get all users, code: {res_json['code']}, message: {res_json['message']}") def _handle_show_user(self, command): username_tree: Tree = command['username'] username: str = username_tree.children[0].strip("'\"") print(f"Showing user: {username}") url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}' response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) res_json = response.json() if response.status_code == 200: self._print_table_simple(res_json['data']) else: print(f"Fail to get user {username}, code: {res_json['code']}, message: {res_json['message']}") def _handle_drop_user(self, command): username_tree: Tree = command['username'] username: str = username_tree.children[0].strip("'\"") print(f"Drop user: {username}") url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}' response = requests.delete(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) res_json = response.json() if response.status_code == 200: print(res_json["message"]) else: print(f"Fail to drop user, code: {res_json['code']}, message: {res_json['message']}") def _handle_alter_user(self, command): username_tree: Tree = command['username'] username: str = username_tree.children[0].strip("'\"") password_tree: Tree = command['password'] password: str = password_tree.children[0].strip("'\"") print(f"Alter user: {username}, password: {password}") url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}/password' response = requests.put(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password), json={'new_password': encrypt(password)}) res_json = response.json() if response.status_code == 200: print(res_json["message"]) else: print(f"Fail to alter password, code: {res_json['code']}, message: {res_json['message']}") def _handle_create_user(self, command): username_tree: Tree = command['username'] username: str = username_tree.children[0].strip("'\"") password_tree: Tree = command['password'] password: str = password_tree.children[0].strip("'\"") role: str = command['role'] print(f"Create user: {username}, password: {password}, role: {role}") url = f'http://{self.host}:{self.port}/api/v1/admin/users' response = requests.post( url, auth=HTTPBasicAuth(self.admin_account, self.admin_password), json={'username': username, 'password': encrypt(password), 'role': role} ) res_json = response.json() if response.status_code == 200: self._print_table_simple(res_json['data']) else: print(f"Fail to create user {username}, code: {res_json['code']}, message: {res_json['message']}") def _handle_activate_user(self, command): username_tree: Tree = command['username'] username: str = username_tree.children[0].strip("'\"") activate_tree: Tree = command['activate_status'] activate_status: str = activate_tree.children[0].strip("'\"") if activate_status.lower() in ['on', 'off']: print(f"Alter user {username} activate status, turn {activate_status.lower()}.") url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}/activate' response = requests.put(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password), json={'activate_status': activate_status}) res_json = response.json() if response.status_code == 200: print(res_json["message"]) else: print(f"Fail to alter activate status, code: {res_json['code']}, message: {res_json['message']}") else: print(f"Unknown activate status: {activate_status}.") def _handle_list_datasets(self, command): username_tree: Tree = command['username'] username: str = username_tree.children[0].strip("'\"") print(f"Listing all datasets of user: {username}") url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}/datasets' response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) res_json = response.json() if response.status_code == 200: self._print_table_simple(res_json['data']) else: print(f"Fail to get all datasets of {username}, code: {res_json['code']}, message: {res_json['message']}") def _handle_list_agents(self, command): username_tree: Tree = command['username'] username: str = username_tree.children[0].strip("'\"") print(f"Listing all agents of user: {username}") url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}/agents' response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) res_json = response.json() if response.status_code == 200: self._print_table_simple(res_json['data']) else: print(f"Fail to get all agents of {username}, code: {res_json['code']}, message: {res_json['message']}") def _handle_meta_command(self, command): meta_command = command['command'] args = command.get('args', []) if meta_command in ['?', 'h', 'help']: self.show_help() elif meta_command in ['q', 'quit', 'exit']: print("Goodbye!") else: print(f"Meta command '{meta_command}' with args {args}") def show_help(self): """Help info""" help_text = """ Commands: LIST SERVICES SHOW SERVICE STARTUP SERVICE SHUTDOWN SERVICE RESTART SERVICE LIST USERS SHOW USER DROP USER CREATE USER ALTER USER PASSWORD ALTER USER ACTIVE LIST DATASETS OF LIST AGENTS OF Meta Commands: \\?, \\h, \\help Show this help \\q, \\quit, \\exit Quit the CLI """ print(help_text) def main(): import sys cli = AdminCLI() if len(sys.argv) == 1 or (len(sys.argv) > 1 and sys.argv[1] == '-'): print(r""" ____ ___ ______________ ___ __ _ / __ \/ | / ____/ ____/ /___ _ __ / | ____/ /___ ___ (_)___ / /_/ / /| |/ / __/ /_ / / __ \ | /| / / / /| |/ __ / __ `__ \/ / __ \ / _, _/ ___ / /_/ / __/ / / /_/ / |/ |/ / / ___ / /_/ / / / / / / / / / / /_/ |_/_/ |_\____/_/ /_/\____/|__/|__/ /_/ |_\__,_/_/ /_/ /_/_/_/ /_/ """) if cli.verify_admin(sys.argv): cli.run_interactive() else: if cli.verify_admin(sys.argv): cli.run_interactive() # cli.run_single_command(sys.argv[1:]) if __name__ == '__main__': main()