diff --git a/admin/admin_client.py b/admin/admin_client.py index e0c29fb7f..59e0893a1 100644 --- a/admin/admin_client.py +++ b/admin/admin_client.py @@ -429,6 +429,13 @@ class AdminCLI: username_tree: Tree = command['username'] username: str = username_tree.children[0].strip("'\"") print(f"Drop user: {username}") + url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}' + response = requests.delete(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) + res_json = response.json() + if response.status_code == 200: + print(res_json["message"]) + else: + print(f"Fail to drop user, code: {res_json['code']}, message: {res_json['message']}") def _handle_alter_user(self, command): username_tree: Tree = command['username'] @@ -531,6 +538,7 @@ Commands: DROP USER CREATE USER ALTER USER PASSWORD + ALTER USER ACTIVE LIST DATASETS OF LIST AGENTS OF diff --git a/admin/routes.py b/admin/routes.py index e93d830ef..8cd9633b1 100644 --- a/admin/routes.py +++ b/admin/routes.py @@ -57,8 +57,11 @@ def create_user(): @login_verify def delete_user(username): try: - UserMgr.delete_user(username) - return success_response(None, "User and all data deleted successfully") + res = UserMgr.delete_user(username) + if res["success"]: + return success_response(None, res["message"]) + else: + return error_response(res["message"]) except AdminException as e: return error_response(e.message, e.code) diff --git a/admin/services.py b/admin/services.py index 708d24fbc..c5db5d90a 100644 --- a/admin/services.py +++ b/admin/services.py @@ -2,7 +2,7 @@ import re from werkzeug.security import check_password_hash from api.db import ActiveEnum from api.db.services import UserService -from api.db.joint_services.user_account_service import create_new_user +from api.db.joint_services.user_account_service import create_new_user, delete_user_data from api.db.services.canvas_service import UserCanvasService from api.db.services.user_service import TenantService from api.db.services.knowledgebase_service import KnowledgebaseService @@ -61,7 +61,13 @@ class UserMgr: @staticmethod def delete_user(username): # use email to delete - raise AdminException("delete_user: not implemented") + user_list = UserService.query_user_by_email(username) + if not user_list: + raise UserNotFoundError(username) + if len(user_list) > 1: + raise AdminException(f"Exist more than 1 user: {username}!") + usr = user_list[0] + return delete_user_data(usr.id) @staticmethod def update_user_password(username, new_password) -> str: @@ -134,7 +140,13 @@ class UserServiceMgr: tenants = TenantService.get_joined_tenants_by_user_id(usr.id) tenant_ids = [m["tenant_id"] for m in tenants] # filter permitted agents and owned agents - return UserCanvasService.get_all_agents_by_tenant_ids(tenant_ids, usr.id) + res = UserCanvasService.get_all_agents_by_tenant_ids(tenant_ids, usr.id) + return [{ + 'title': r['title'], + 'permission': r['permission'], + 'canvas_type': r['canvas_type'], + 'canvas_category': r['canvas_category'] + } for r in res] class ServiceMgr: diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py index bba0d0094..57de77613 100644 --- a/api/apps/canvas_app.py +++ b/api/apps/canvas_app.py @@ -101,7 +101,7 @@ def save(): def get(canvas_id): if not UserCanvasService.accessible(canvas_id, current_user.id): return get_data_error_result(message="canvas not found.") - e, c = UserCanvasService.get_by_tenant_id(canvas_id) + e, c = UserCanvasService.get_by_canvas_id(canvas_id) return get_json_result(data=c) @@ -198,7 +198,7 @@ def reset(): @manager.route("/upload/", methods=["POST"]) # noqa: F821 def upload(canvas_id): - e, cvs = UserCanvasService.get_by_tenant_id(canvas_id) + e, cvs = UserCanvasService.get_by_canvas_id(canvas_id) if not e: return get_data_error_result(message="canvas not found.") diff --git a/api/apps/dataflow_app.py b/api/apps/dataflow_app.py index 36ccf57a2..cb0fe98b3 100644 --- a/api/apps/dataflow_app.py +++ b/api/apps/dataflow_app.py @@ -94,7 +94,7 @@ def save(): def get(canvas_id): if not UserCanvasService.accessible(canvas_id, current_user.id): return get_data_error_result(message="canvas not found.") - e, c = UserCanvasService.get_by_tenant_id(canvas_id) + e, c = UserCanvasService.get_by_canvas_id(canvas_id) return get_json_result(data=c) @@ -161,7 +161,7 @@ def reset(): @manager.route("/upload/", methods=["POST"]) # noqa: F821 def upload(canvas_id): - e, cvs = UserCanvasService.get_by_tenant_id(canvas_id) + e, cvs = UserCanvasService.get_by_canvas_id(canvas_id) if not e: return get_data_error_result(message="canvas not found.") diff --git a/api/db/joint_services/user_account_service.py b/api/db/joint_services/user_account_service.py index e94cf4776..61f23ccd5 100644 --- a/api/db/joint_services/user_account_service.py +++ b/api/db/joint_services/user_account_service.py @@ -17,13 +17,26 @@ import logging import uuid from api import settings -from api.db import FileType, UserTenantRole -from api.db.db_models import TenantLLM +from api.utils.api_utils import group_by +from api.db import FileType, UserTenantRole, ActiveEnum +from api.db.services.api_service import APITokenService, API4ConversationService +from api.db.services.canvas_service import UserCanvasService +from api.db.services.conversation_service import ConversationService +from api.db.services.dialog_service import DialogService +from api.db.services.document_service import DocumentService +from api.db.services.file2document_service import File2DocumentService +from api.db.services.knowledgebase_service import KnowledgebaseService +from api.db.services.langfuse_service import TenantLangfuseService from api.db.services.llm_service import get_init_tenant_llm from api.db.services.file_service import FileService +from api.db.services.mcp_server_service import MCPServerService +from api.db.services.search_service import SearchService +from api.db.services.task_service import TaskService from api.db.services.tenant_llm_service import TenantLLMService +from api.db.services.user_canvas_version import UserCanvasVersionService from api.db.services.user_service import TenantService, UserService, UserTenantService - +from rag.utils.storage_factory import STORAGE_IMPL +from rag.nlp import search def create_new_user(user_info: dict) -> dict: @@ -104,7 +117,7 @@ def create_new_user(user_info: dict) -> dict: except Exception as e: logging.exception(e) try: - TenantLLM.delete().where(TenantLLM.tenant_id == user_id).execute() + TenantLLMService.delete_by_tenant_id(user_id) except Exception as e: logging.exception(e) try: @@ -118,3 +131,197 @@ def create_new_user(user_info: dict) -> dict: logging.exception(e) # reraise raise create_error + + +def delete_user_data(user_id: str) -> dict: + # use user_id to delete + usr = UserService.filter_by_id(user_id) + if not usr: + return {"success": False, "message": f"{user_id} can't be found."} + # check is inactive and not admin + if usr.is_active == ActiveEnum.ACTIVE.value: + return {"success": False, "message": f"{user_id} is active and can't be deleted."} + if usr.is_superuser: + return {"success": False, "message": "Can't delete the super user."} + # tenant info + tenants = UserTenantService.get_user_tenant_relation_by_user_id(usr.id) + owned_tenant = [t for t in tenants if t["role"] == UserTenantRole.OWNER.value] + + done_msg = '' + try: + # step1. delete owned tenant info + if owned_tenant: + done_msg += "Start to delete owned tenant.\n" + tenant_id = owned_tenant[0]["tenant_id"] + kb_ids = KnowledgebaseService.get_kb_ids(usr.id) + # step1.1 delete knowledgebase related file and info + if kb_ids: + # step1.1.1 delete files in storage, remove bucket + for kb_id in kb_ids: + if STORAGE_IMPL.bucket_exists(kb_id): + STORAGE_IMPL.remove_bucket(kb_id) + done_msg += f"- Removed {len(kb_ids)} dataset's buckets.\n" + # step1.1.2 delete file and document info in db + doc_ids = DocumentService.get_all_doc_ids_by_kb_ids(kb_ids) + if doc_ids: + doc_delete_res = DocumentService.delete_by_ids([i["id"] for i in doc_ids]) + done_msg += f"- Deleted {doc_delete_res} document records.\n" + task_delete_res = TaskService.delete_by_doc_ids([i["id"] for i in doc_ids]) + done_msg += f"- Deleted {task_delete_res} task records.\n" + file_ids = FileService.get_all_file_ids_by_tenant_id(usr.id) + if file_ids: + file_delete_res = FileService.delete_by_ids([f["id"] for f in file_ids]) + done_msg += f"- Deleted {file_delete_res} file records.\n" + if doc_ids or file_ids: + file2doc_delete_res = File2DocumentService.delete_by_document_ids_or_file_ids( + [i["id"] for i in doc_ids], + [f["id"] for f in file_ids] + ) + done_msg += f"- Deleted {file2doc_delete_res} document-file relation records.\n" + # step1.1.3 delete chunk in es + r = settings.docStoreConn.delete({"kb_id": kb_ids}, + search.index_name(tenant_id), kb_ids) + done_msg += f"- Deleted {r} chunk records.\n" + kb_delete_res = KnowledgebaseService.delete_by_ids(kb_ids) + done_msg += f"- Deleted {kb_delete_res} knowledgebase records.\n" + # step1.1.4 delete agents + agent_delete_res = delete_user_agents(usr.id) + done_msg += f"- Deleted {agent_delete_res['agents_deleted_count']} agent, {agent_delete_res['version_deleted_count']} versions records.\n" + # step1.1.5 delete dialogs + dialog_delete_res = delete_user_dialogs(usr.id) + done_msg += f"- Deleted {dialog_delete_res['dialogs_deleted_count']} dialogs, {dialog_delete_res['conversations_deleted_count']} conversations, {dialog_delete_res['api_token_deleted_count']} api tokens, {dialog_delete_res['api4conversation_deleted_count']} api4conversations.\n" + # step1.1.6 delete mcp server + mcp_delete_res = MCPServerService.delete_by_tenant_id(usr.id) + done_msg += f"- Deleted {mcp_delete_res} MCP server.\n" + # step1.1.7 delete search + search_delete_res = SearchService.delete_by_tenant_id(usr.id) + done_msg += f"- Deleted {search_delete_res} search records.\n" + # step1.2 delete tenant_llm and tenant_langfuse + llm_delete_res = TenantLLMService.delete_by_tenant_id(tenant_id) + done_msg += f"- Deleted {llm_delete_res} tenant-LLM records.\n" + langfuse_delete_res = TenantLangfuseService.delete_ty_tenant_id(tenant_id) + done_msg += f"- Deleted {langfuse_delete_res} langfuse records.\n" + # step1.3 delete own tenant + tenant_delete_res = TenantService.delete_by_id(tenant_id) + done_msg += f"- Deleted {tenant_delete_res} tenant.\n" + # step2 delete user-tenant relation + if tenants: + # step2.1 delete docs and files in joined team + joined_tenants = [t for t in tenants if t["role"] == UserTenantRole.NORMAL.value] + if joined_tenants: + done_msg += "Start to delete data in joined tenants.\n" + created_documents = DocumentService.get_all_docs_by_creator_id(usr.id) + if created_documents: + # step2.1.1 delete files + doc_file_info = File2DocumentService.get_by_document_ids([d['id'] for d in created_documents]) + created_files = FileService.get_by_ids([f['file_id'] for f in doc_file_info]) + if created_files: + # step2.1.1.1 delete file in storage + for f in created_files: + STORAGE_IMPL.rm(f.parent_id, f.location) + done_msg += f"- Deleted {len(created_files)} uploaded file.\n" + # step2.1.1.2 delete file record + file_delete_res = FileService.delete_by_ids([f.id for f in created_files]) + done_msg += f"- Deleted {file_delete_res} file records.\n" + # step2.1.2 delete document-file relation record + file2doc_delete_res = File2DocumentService.delete_by_document_ids_or_file_ids( + [d['id'] for d in created_documents], + [f.id for f in created_files] + ) + done_msg += f"- Deleted {file2doc_delete_res} document-file relation records.\n" + # step2.1.3 delete chunks + doc_groups = group_by(created_documents, "tenant_id") + kb_grouped_doc = {k: group_by(v, "kb_id") for k, v in doc_groups.items()} + # chunks in {'tenant_id': {'kb_id': [{'id': doc_id}]}} structure + chunk_delete_res = 0 + kb_doc_info = {} + for _tenant_id, kb_doc in kb_grouped_doc.items(): + for _kb_id, docs in kb_doc.items(): + chunk_delete_res += settings.docStoreConn.delete( + {"doc_id": [d["id"] for d in docs]}, + search.index_name(_tenant_id), _kb_id + ) + # record doc info + if _kb_id in kb_doc_info.keys(): + kb_doc_info[_kb_id]['doc_num'] += 1 + kb_doc_info[_kb_id]['token_num'] += sum([d["token_num"] for d in docs]) + kb_doc_info[_kb_id]['chunk_num'] += sum([d["chunk_num"] for d in docs]) + else: + kb_doc_info[_kb_id] = { + 'doc_num': 1, + 'token_num': sum([d["token_num"] for d in docs]), + 'chunk_num': sum([d["chunk_num"] for d in docs]) + } + done_msg += f"- Deleted {chunk_delete_res} chunks.\n" + # step2.1.4 delete tasks + task_delete_res = TaskService.delete_by_doc_ids([d['id'] for d in created_documents]) + done_msg += f"- Deleted {task_delete_res} tasks.\n" + # step2.1.5 delete document record + doc_delete_res = DocumentService.delete_by_ids([d['id'] for d in created_documents]) + done_msg += f"- Deleted {doc_delete_res} documents.\n" + # step2.1.6 update knowledge base doc&chunk&token cnt + for kb_id, doc_num in kb_doc_info.items(): + KnowledgebaseService.decrease_document_num_in_delete(kb_id, doc_num) + + # step2.2 delete relation + user_tenant_delete_res = UserTenantService.delete_by_ids([t["id"] for t in tenants]) + done_msg += f"- Deleted {user_tenant_delete_res} user-tenant records.\n" + # step3 finally delete user + user_delete_res = UserService.delete_by_id(usr.id) + done_msg += f"- Deleted {user_delete_res} user.\nDelete done!" + + return {"success": True, "message": f"Successfully deleted user. Details:\n{done_msg}"} + + except Exception as e: + logging.exception(e) + return {"success": False, "message": f"Error: {str(e)}. Already done:\n{done_msg}"} + + +def delete_user_agents(user_id: str) -> dict: + """ + use user_id to delete + :return: { + "agents_deleted_count": 1, + "version_deleted_count": 2 + } + """ + agents_deleted_count, agents_version_deleted_count = 0, 0 + user_agents = UserCanvasService.get_all_agents_by_tenant_ids([user_id], user_id) + if user_agents: + agents_version = UserCanvasVersionService.get_all_canvas_version_by_canvas_ids([a['id'] for a in user_agents]) + agents_version_deleted_count = UserCanvasVersionService.delete_by_ids([v['id'] for v in agents_version]) + agents_deleted_count = UserCanvasService.delete_by_ids([a['id'] for a in user_agents]) + return { + "agents_deleted_count": agents_deleted_count, + "version_deleted_count": agents_version_deleted_count + } + + +def delete_user_dialogs(user_id: str) -> dict: + """ + use user_id to delete + :return: { + "dialogs_deleted_count": 1, + "conversations_deleted_count": 1, + "api_token_deleted_count": 2, + "api4conversation_deleted_count": 2 + } + """ + dialog_deleted_count, conversations_deleted_count, api_token_deleted_count, api4conversation_deleted_count = 0, 0, 0, 0 + user_dialogs = DialogService.get_all_dialogs_by_tenant_id(user_id) + if user_dialogs: + # delete conversation + conversations = ConversationService.get_all_conversation_by_dialog_ids([ud['id'] for ud in user_dialogs]) + conversations_deleted_count = ConversationService.delete_by_ids([c['id'] for c in conversations]) + # delete api token + api_token_deleted_count = APITokenService.delete_by_tenant_id(user_id) + # delete api for conversation + api4conversation_deleted_count = API4ConversationService.delete_by_dialog_ids([ud['id'] for ud in user_dialogs]) + # delete dialog at last + dialog_deleted_count = DialogService.delete_by_ids([ud['id'] for ud in user_dialogs]) + return { + "dialogs_deleted_count": dialog_deleted_count, + "conversations_deleted_count": conversations_deleted_count, + "api_token_deleted_count": api_token_deleted_count, + "api4conversation_deleted_count": api4conversation_deleted_count + } diff --git a/api/db/services/api_service.py b/api/db/services/api_service.py index 2fcbe0329..9a23547f7 100644 --- a/api/db/services/api_service.py +++ b/api/db/services/api_service.py @@ -35,6 +35,11 @@ class APITokenService(CommonService): cls.model.token == token ) + @classmethod + @DB.connection_context() + def delete_by_tenant_id(cls, tenant_id): + return cls.model.delete().where(cls.model.tenant_id == tenant_id).execute() + class API4ConversationService(CommonService): model = API4Conversation @@ -100,3 +105,8 @@ class API4ConversationService(CommonService): cls.model.create_date <= to_date, cls.model.source == source ).group_by(cls.model.create_date.truncate("day")).dicts() + + @classmethod + @DB.connection_context() + def delete_by_dialog_ids(cls, dialog_ids): + return cls.model.delete().where(cls.model.dialog_id.in_(dialog_ids)).execute() diff --git a/api/db/services/canvas_service.py b/api/db/services/canvas_service.py index 800a71187..7b2150c3c 100644 --- a/api/db/services/canvas_service.py +++ b/api/db/services/canvas_service.py @@ -66,6 +66,7 @@ class UserCanvasService(CommonService): def get_all_agents_by_tenant_ids(cls, tenant_ids, user_id): # will get all permitted agents, be cautious fields = [ + cls.model.id, cls.model.title, cls.model.permission, cls.model.canvas_type, @@ -93,7 +94,7 @@ class UserCanvasService(CommonService): @classmethod @DB.connection_context() - def get_by_tenant_id(cls, pid): + def get_by_canvas_id(cls, pid): try: fields = [ @@ -165,7 +166,7 @@ class UserCanvasService(CommonService): @DB.connection_context() def accessible(cls, canvas_id, tenant_id): from api.db.services.user_service import UserTenantService - e, c = UserCanvasService.get_by_tenant_id(canvas_id) + e, c = UserCanvasService.get_by_canvas_id(canvas_id) if not e: return False diff --git a/api/db/services/conversation_service.py b/api/db/services/conversation_service.py index 26361fc7b..53913f442 100644 --- a/api/db/services/conversation_service.py +++ b/api/db/services/conversation_service.py @@ -48,6 +48,21 @@ class ConversationService(CommonService): return list(sessions.dicts()) + @classmethod + @DB.connection_context() + def get_all_conversation_by_dialog_ids(cls, dialog_ids): + sessions = cls.model.select().where(cls.model.dialog_id.in_(dialog_ids)) + sessions.order_by(cls.model.create_time.asc()) + offset, limit = 0, 100 + res = [] + while True: + s_batch = sessions.offset(offset).limit(limit) + _temp = list(s_batch.dicts()) + if not _temp: + break + res.extend(_temp) + offset += limit + return res def structure_answer(conv, ans, message_id, session_id): reference = ans["reference"] diff --git a/api/db/services/dialog_service.py b/api/db/services/dialog_service.py index a51d6b925..673000ff9 100644 --- a/api/db/services/dialog_service.py +++ b/api/db/services/dialog_service.py @@ -159,6 +159,22 @@ class DialogService(CommonService): return list(dialogs.dicts()), count + @classmethod + @DB.connection_context() + def get_all_dialogs_by_tenant_id(cls, tenant_id): + fields = [cls.model.id] + dialogs = cls.model.select(*fields).where(cls.model.tenant_id == tenant_id) + dialogs.order_by(cls.model.create_time.asc()) + offset, limit = 0, 100 + res = [] + while True: + d_batch = dialogs.offset(offset).limit(limit) + _temp = list(d_batch.dicts()) + if not _temp: + break + res.extend(_temp) + offset += limit + return res def chat_solo(dialog, messages, stream=True): if TenantLLMService.llm_id2llm_type(dialog.llm_id) == "image2text": diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index ffb373d96..9624bf4dc 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -228,6 +228,46 @@ class DocumentService(CommonService): return int(query.scalar()) or 0 + @classmethod + @DB.connection_context() + def get_all_doc_ids_by_kb_ids(cls, kb_ids): + fields = [cls.model.id] + docs = cls.model.select(*fields).where(cls.model.kb_id.in_(kb_ids)) + docs.order_by(cls.model.create_time.asc()) + # maybe cause slow query by deep paginate, optimize later + offset, limit = 0, 100 + res = [] + while True: + doc_batch = docs.offset(offset).limit(limit) + _temp = list(doc_batch.dicts()) + if not _temp: + break + res.extend(_temp) + offset += limit + return res + + @classmethod + @DB.connection_context() + def get_all_docs_by_creator_id(cls, creator_id): + fields = [ + cls.model.id, cls.model.kb_id, cls.model.token_num, cls.model.chunk_num, Knowledgebase.tenant_id + ] + docs = cls.model.select(*fields).join(Knowledgebase, on=(Knowledgebase.id == cls.model.kb_id)).where( + cls.model.created_by == creator_id + ) + docs.order_by(cls.model.create_time.asc()) + # maybe cause slow query by deep paginate, optimize later + offset, limit = 0, 100 + res = [] + while True: + doc_batch = docs.offset(offset).limit(limit) + _temp = list(doc_batch.dicts()) + if not _temp: + break + res.extend(_temp) + offset += limit + return res + @classmethod @DB.connection_context() def insert(cls, doc): diff --git a/api/db/services/file2document_service.py b/api/db/services/file2document_service.py index c03dbf928..31d75accd 100644 --- a/api/db/services/file2document_service.py +++ b/api/db/services/file2document_service.py @@ -38,6 +38,12 @@ class File2DocumentService(CommonService): objs = cls.model.select().where(cls.model.document_id == document_id) return objs + @classmethod + @DB.connection_context() + def get_by_document_ids(cls, document_ids): + objs = cls.model.select().where(cls.model.document_id.in_(document_ids)) + return list(objs.dicts()) + @classmethod @DB.connection_context() def insert(cls, obj): @@ -50,6 +56,15 @@ class File2DocumentService(CommonService): def delete_by_file_id(cls, file_id): return cls.model.delete().where(cls.model.file_id == file_id).execute() + @classmethod + @DB.connection_context() + def delete_by_document_ids_or_file_ids(cls, document_ids, file_ids): + if not document_ids: + return cls.model.delete().where(cls.model.file_id.in_(file_ids)).execute() + elif not file_ids: + return cls.model.delete().where(cls.model.document_id.in_(document_ids)).execute() + return cls.model.delete().where(cls.model.document_id.in_(document_ids) | cls.model.file_id.in_(file_ids)).execute() + @classmethod @DB.connection_context() def delete_by_document_id(cls, doc_id): diff --git a/api/db/services/file_service.py b/api/db/services/file_service.py index 24f923d38..e65e34cc5 100644 --- a/api/db/services/file_service.py +++ b/api/db/services/file_service.py @@ -161,6 +161,23 @@ class FileService(CommonService): result_ids.append(folder_id) return result_ids + @classmethod + @DB.connection_context() + def get_all_file_ids_by_tenant_id(cls, tenant_id): + fields = [cls.model.id] + files = cls.model.select(*fields).where(cls.model.tenant_id == tenant_id) + files.order_by(cls.model.create_time.asc()) + offset, limit = 0, 100 + res = [] + while True: + file_batch = files.offset(offset).limit(limit) + _temp = list(file_batch.dicts()) + if not _temp: + break + res.extend(_temp) + offset += limit + return res + @classmethod @DB.connection_context() def create_folder(cls, file, parent_id, name, count): diff --git a/api/db/services/knowledgebase_service.py b/api/db/services/knowledgebase_service.py index a49ab7701..d0bd8faa4 100644 --- a/api/db/services/knowledgebase_service.py +++ b/api/db/services/knowledgebase_service.py @@ -471,3 +471,17 @@ class KnowledgebaseService(CommonService): else: raise e + @classmethod + @DB.connection_context() + def decrease_document_num_in_delete(cls, kb_id, doc_num_info: dict): + kb_row = cls.model.get_by_id(kb_id) + if not kb_row: + raise RuntimeError(f"kb_id {kb_id} does not exist") + update_dict = { + 'doc_num': kb_row.doc_num - doc_num_info['doc_num'], + 'chunk_num': kb_row.chunk_num - doc_num_info['chunk_num'], + 'token_num': kb_row.token_num - doc_num_info['token_num'], + 'update_time': current_timestamp(), + 'update_date': datetime_format(datetime.now()) + } + return cls.model.update(update_dict).where(cls.model.id == kb_id).execute() diff --git a/api/db/services/langfuse_service.py b/api/db/services/langfuse_service.py index c75f3d12e..6f4646900 100644 --- a/api/db/services/langfuse_service.py +++ b/api/db/services/langfuse_service.py @@ -51,6 +51,11 @@ class TenantLangfuseService(CommonService): except peewee.DoesNotExist: return None + @classmethod + @DB.connection_context() + def delete_ty_tenant_id(cls, tenant_id): + return cls.model.delete().where(cls.model.tenant_id == tenant_id).execute() + @classmethod def update_by_tenant(cls, tenant_id, langfuse_keys): langfuse_keys["update_time"] = current_timestamp() diff --git a/api/db/services/mcp_server_service.py b/api/db/services/mcp_server_service.py index 869350094..101555f4b 100644 --- a/api/db/services/mcp_server_service.py +++ b/api/db/services/mcp_server_service.py @@ -84,3 +84,8 @@ class MCPServerService(CommonService): return bool(mcp_server), mcp_server except Exception: return False, None + + @classmethod + @DB.connection_context() + def delete_by_tenant_id(cls, tenant_id: str): + return cls.model.delete().where(cls.model.tenant_id == tenant_id).execute() diff --git a/api/db/services/search_service.py b/api/db/services/search_service.py index 674cfcf1f..acb07da57 100644 --- a/api/db/services/search_service.py +++ b/api/db/services/search_service.py @@ -110,3 +110,8 @@ class SearchService(CommonService): query = query.paginate(page_number, items_per_page) return list(query.dicts()), count + + @classmethod + @DB.connection_context() + def delete_by_tenant_id(cls, tenant_id): + return cls.model.delete().where(cls.model.tenant_id == tenant_id).execute() diff --git a/api/db/services/task_service.py b/api/db/services/task_service.py index 46087f8ba..07cfbb600 100644 --- a/api/db/services/task_service.py +++ b/api/db/services/task_service.py @@ -308,6 +308,12 @@ class TaskService(CommonService): ) ).execute() + @classmethod + @DB.connection_context() + def delete_by_doc_ids(cls, doc_ids): + """Delete task associated with a document.""" + return cls.model.delete().where(cls.model.doc_id.in_(doc_ids)).execute() + def queue_tasks(doc: dict, bucket: str, name: str, priority: int): """Create and queue document processing tasks. diff --git a/api/db/services/tenant_llm_service.py b/api/db/services/tenant_llm_service.py index ec023f115..4eca970ec 100644 --- a/api/db/services/tenant_llm_service.py +++ b/api/db/services/tenant_llm_service.py @@ -209,6 +209,11 @@ class TenantLLMService(CommonService): objs = cls.model.select().where((cls.model.llm_factory == "OpenAI"), ~(cls.model.llm_name == "text-embedding-3-small"), ~(cls.model.llm_name == "text-embedding-3-large")).dicts() return list(objs) + @classmethod + @DB.connection_context() + def delete_by_tenant_id(cls, tenant_id): + return cls.model.delete().where(cls.model.tenant_id == tenant_id).execute() + @staticmethod def llm_id2llm_type(llm_id: str) -> str | None: from api.db.services.llm_service import LLMService diff --git a/api/db/services/user_canvas_version.py b/api/db/services/user_canvas_version.py index 9fe12e32e..9696a7834 100644 --- a/api/db/services/user_canvas_version.py +++ b/api/db/services/user_canvas_version.py @@ -24,7 +24,24 @@ class UserCanvasVersionService(CommonService): return None except Exception: return None - + + @classmethod + @DB.connection_context() + def get_all_canvas_version_by_canvas_ids(cls, canvas_ids): + fields = [cls.model.id] + versions = cls.model.select(*fields).where(cls.model.user_canvas_id.in_(canvas_ids)) + versions.order_by(cls.model.create_time.asc()) + offset, limit = 0, 100 + res = [] + while True: + version_batch = versions.offset(offset).limit(limit) + _temp = list(version_batch.dicts()) + if not _temp: + break + res.extend(_temp) + offset += limit + return res + @classmethod @DB.connection_context() def delete_all_versions(cls, user_canvas_id): diff --git a/api/db/services/user_service.py b/api/db/services/user_service.py index a0b3ea843..12c405bd6 100644 --- a/api/db/services/user_service.py +++ b/api/db/services/user_service.py @@ -288,6 +288,17 @@ class UserTenantService(CommonService): .join(User, on=((cls.model.tenant_id == User.id) & (UserTenant.user_id == user_id) & (UserTenant.status == StatusEnum.VALID.value))) .where(cls.model.status == StatusEnum.VALID.value).dicts()) + @classmethod + @DB.connection_context() + def get_user_tenant_relation_by_user_id(cls, user_id): + fields = [ + cls.model.id, + cls.model.user_id, + cls.model.tenant_id, + cls.model.role + ] + return list(cls.model.select(*fields).where(cls.model.user_id == user_id).dicts().dicts()) + @classmethod @DB.connection_context() def get_num_members(cls, user_id: str): diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py index 1aaaee997..1b7452495 100644 --- a/api/utils/api_utils.py +++ b/api/utils/api_utils.py @@ -659,6 +659,16 @@ def remap_dictionary_keys(source_data: dict, key_aliases: dict = None) -> dict: return transformed_data +def group_by(list_of_dict, key): + res = {} + for item in list_of_dict: + if item[key] in res.keys(): + res[item[key]].append(item) + else: + res[item[key]] = [item] + return res + + def get_mcp_tools(mcp_servers: list, timeout: float | int = 10) -> tuple[dict, str]: results = {} tool_call_sessions = [] diff --git a/rag/utils/minio_conn.py b/rag/utils/minio_conn.py index 80a723a5c..c26e5606d 100644 --- a/rag/utils/minio_conn.py +++ b/rag/utils/minio_conn.py @@ -108,6 +108,19 @@ class RAGFlowMinio: logging.exception(f"obj_exist {bucket}/{filename} got exception") return False + def bucket_exists(self, bucket): + try: + if not self.conn.bucket_exists(bucket): + return False + else: + return True + except S3Error as e: + if e.code in ["NoSuchKey", "NoSuchBucket", "ResourceNotFound"]: + return False + except Exception: + logging.exception(f"bucket_exist {bucket} got exception") + return False + def get_presigned_url(self, bucket, fnm, expires): for _ in range(10): try: