From aa4a72552912a1ee4486b24d6487bcbebfb456cb Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Tue, 15 Jul 2025 17:19:27 +0800 Subject: [PATCH] Pref: use redis to check if canceled. (#8853) ### What problem does this PR solve? ### Type of change - [x] Performance Improvement --- api/apps/document_app.py | 5 ++++- api/db/services/task_service.py | 17 +++++++++++++++++ rag/svr/task_executor.py | 12 ++++++------ 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/api/apps/document_app.py b/api/apps/document_app.py index c0d1a6be2..976ed4ece 100644 --- a/api/apps/document_app.py +++ b/api/apps/document_app.py @@ -32,7 +32,7 @@ from api.db.services.document_service import DocumentService, doc_upload_and_par from api.db.services.file2document_service import File2DocumentService from api.db.services.file_service import FileService from api.db.services.knowledgebase_service import KnowledgebaseService -from api.db.services.task_service import TaskService, queue_tasks +from api.db.services.task_service import TaskService, queue_tasks, cancel_all_task_of from api.db.services.user_service import UserTenantService from api.utils import get_uuid from api.utils.api_utils import ( @@ -438,6 +438,9 @@ def run(): if settings.docStoreConn.indexExist(search.index_name(tenant_id), doc.kb_id): settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), doc.kb_id) + if str(req["run"]) == TaskStatus.CANCEL.value: + cancel_all_task_of(id) + if str(req["run"]) == TaskStatus.RUNNING.value: e, doc = DocumentService.get_by_id(id) doc = doc.to_dict() diff --git a/api/db/services/task_service.py b/api/db/services/task_service.py index 95317453d..207b6355d 100644 --- a/api/db/services/task_service.py +++ b/api/db/services/task_service.py @@ -453,3 +453,20 @@ def reuse_prev_task_chunks(task: dict, prev_tasks: list[dict], chunking_config: prev_task["chunk_ids"] = "" return len(task["chunk_ids"].split()) + + +def cancel_all_task_of(doc_id): + for t in TaskService.query(doc_id=doc_id): + try: + REDIS_CONN.set(f"{t.id}-cancel", "x") + except Exception as e: + logging.exception(e) + + +def has_canceled(task_id): + try: + if REDIS_CONN.get(f"{task_id}-cancel"): + return True + except Exception as e: + logging.exception(e) + return False diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 094bf6299..b2a308f1b 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -50,7 +50,7 @@ from peewee import DoesNotExist from api.db import LLMType, ParserType from api.db.services.document_service import DocumentService from api.db.services.llm_service import LLMBundle -from api.db.services.task_service import TaskService +from api.db.services.task_service import TaskService, has_canceled from api.db.services.file2document_service import File2DocumentService from api import settings from api.versions import get_ragflow_version @@ -157,7 +157,7 @@ def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing... try: if prog is not None and prog < 0: msg = "[ERROR]" + msg - cancel = TaskService.do_cancel(task_id) + cancel = has_canceled(task_id) if cancel: msg += " [Canceled]" @@ -214,7 +214,7 @@ async def collect(): canceled = False task = TaskService.get_task(msg["id"]) if task: - canceled = DocumentService.do_cancel(task["doc_id"]) + canceled = has_canceled(task["id"]) if not task or canceled: state = "is unknown" if not task else "has been cancelled" FAILED_TASKS += 1 @@ -382,7 +382,7 @@ async def build_chunks(task, progress_callback): docs_to_tag = [] for d in docs: - task_canceled = DocumentService.do_cancel(task["doc_id"]) + task_canceled = has_canceled(task["id"]) if task_canceled: progress_callback(-1, msg="Task has been canceled.") return @@ -531,7 +531,7 @@ async def do_handle_task(task): progress_callback(-1, msg=error_message) raise Exception(error_message) - task_canceled = DocumentService.do_cancel(task_doc_id) + task_canceled = has_canceled(task_id) if task_canceled: progress_callback(-1, msg="Task has been canceled.") return @@ -609,7 +609,7 @@ async def do_handle_task(task): for b in range(0, len(chunks), DOC_BULK_SIZE): doc_store_result = await trio.to_thread.run_sync(lambda: settings.docStoreConn.insert(chunks[b:b + DOC_BULK_SIZE], search.index_name(task_tenant_id), task_dataset_id)) - task_canceled = DocumentService.do_cancel(task_doc_id) + task_canceled = has_canceled(task_id) if task_canceled: progress_callback(-1, msg="Task has been canceled.") return