Refac: pipeline message

yangdx 2025-06-26 01:00:54 +08:00
parent 88c528ec94
commit 8fb1c09b08
2 changed files with 26 additions and 35 deletions
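
The change is twofold: every per-document status update in background_delete_documents now writes pipeline_status["latest_message"] in the same locked block that appends to pipeline_status["history_messages"], and adelete_by_doc_id now returns the document's file_path on DeletionResult so those messages can include it. Below is a minimal sketch of the locking pattern, using simplified stand-in state and a hypothetical update_pipeline_status helper (the actual code applies the pattern inline at each call site):

import asyncio

# Simplified stand-ins for the shared pipeline state used by the deletion task.
pipeline_status = {"busy": False, "latest_message": "", "history_messages": []}
pipeline_status_lock = asyncio.Lock()

async def update_pipeline_status(msg: str) -> None:
    # Hypothetical helper illustrating the pattern this commit applies inline:
    # both fields are written under one lock so readers never see them diverge.
    async with pipeline_status_lock:
        pipeline_status["latest_message"] = msg
        pipeline_status["history_messages"].append(msg)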

View File

@@ -836,24 +836,24 @@ async def background_delete_documents(
         # Loop through each document ID and delete them one by one
         for i, doc_id in enumerate(doc_ids, 1):
             async with pipeline_status_lock:
+                start_msg = f"Deleting document {i}/{total_docs}: {doc_id}"
+                logger.info(start_msg)
                 pipeline_status["cur_batch"] = i
-                pipeline_status["latest_message"] = (
-                    f"Deleting document {i}/{total_docs}: {doc_id}"
-                )
-                pipeline_status["history_messages"].append(
-                    f"Processing document {i}/{total_docs}: {doc_id}"
-                )
+                pipeline_status["latest_message"] = start_msg
+                pipeline_status["history_messages"].append(start_msg)
 
+            file_path = "#"
             try:
                 result = await rag.adelete_by_doc_id(doc_id)
+                file_path = (
+                    getattr(result, "file_path", "-") if "result" in locals() else "-"
+                )
                 if result.status == "success":
                     successful_deletions.append(doc_id)
                     success_msg = (
-                        f"Successfully deleted document {i}/{total_docs}: {doc_id}"
+                        f"Deleted document {i}/{total_docs}: {doc_id}[{file_path}]"
                     )
                     logger.info(success_msg)
                     async with pipeline_status_lock:
                         pipeline_status["history_messages"].append(success_msg)
@@ -872,6 +872,7 @@ async def background_delete_documents(
                                 )
                                 logger.info(file_delete_msg)
                                 async with pipeline_status_lock:
+                                    pipeline_status["latest_message"] = file_delete_msg
                                     pipeline_status["history_messages"].append(
                                         file_delete_msg
                                     )
@@ -881,6 +882,9 @@
                                 )
                                 logger.warning(file_not_found_msg)
                                 async with pipeline_status_lock:
+                                    pipeline_status["latest_message"] = (
+                                        file_not_found_msg
+                                    )
                                     pipeline_status["history_messages"].append(
                                         file_not_found_msg
                                     )
@@ -888,6 +892,7 @@
                             file_error_msg = f"Failed to delete file {result.file_path}: {str(file_error)}"
                             logger.error(file_error_msg)
                             async with pipeline_status_lock:
+                                pipeline_status["latest_message"] = file_error_msg
                                 pipeline_status["history_messages"].append(
                                     file_error_msg
                                 )
@@ -895,52 +900,36 @@
                         no_file_msg = f"No valid file path found for document {doc_id}"
                         logger.warning(no_file_msg)
                         async with pipeline_status_lock:
+                            pipeline_status["latest_message"] = no_file_msg
                             pipeline_status["history_messages"].append(no_file_msg)
                 else:
                     failed_deletions.append(doc_id)
-                    error_msg = f"Failed to delete document {i}/{total_docs}: {doc_id} - {result.message}"
+                    error_msg = f"Failed to delete {i}/{total_docs}: {doc_id}[{file_path}] - {result.message}"
                     logger.error(error_msg)
                     async with pipeline_status_lock:
+                        pipeline_status["latest_message"] = error_msg
                         pipeline_status["history_messages"].append(error_msg)
 
             except Exception as e:
                 failed_deletions.append(doc_id)
-                error_msg = (
-                    f"Error deleting document {i}/{total_docs}: {doc_id} - {str(e)}"
-                )
+                error_msg = f"Error deleting document {i}/{total_docs}: {doc_id}[{file_path}] - {str(e)}"
                 logger.error(error_msg)
                 logger.error(traceback.format_exc())
                 async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = error_msg
                     pipeline_status["history_messages"].append(error_msg)
 
-        # Final summary
-        summary_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
-        logger.info(summary_msg)
-        async with pipeline_status_lock:
-            pipeline_status["history_messages"].append(summary_msg)
-
-        if successful_deletions:
-            pipeline_status["history_messages"].append(
-                f"Successfully deleted: {', '.join(successful_deletions)}"
-            )
-        if failed_deletions:
-            pipeline_status["history_messages"].append(
-                f"Failed to delete: {', '.join(failed_deletions)}"
-            )
-
     except Exception as e:
         error_msg = f"Critical error during batch deletion: {str(e)}"
         logger.error(error_msg)
         logger.error(traceback.format_exc())
         async with pipeline_status_lock:
             pipeline_status["history_messages"].append(error_msg)
     finally:
+        # Final summary
         async with pipeline_status_lock:
             pipeline_status["busy"] = False
-            completion_msg = "Document deletion process completed."
+            completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
             pipeline_status["latest_message"] = completion_msg
             pipeline_status["history_messages"].append(completion_msg)
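
The loop is also defensive about where the file path comes from: it starts each iteration with a "#" placeholder and only picks up result.file_path once adelete_by_doc_id has returned, falling back to "-" when the attribute is absent. A condensed, self-contained sketch of that fallback, where fake_delete stands in for rag.adelete_by_doc_id:

import asyncio
from types import SimpleNamespace

async def fake_delete(doc_id: str) -> SimpleNamespace:
    # Stand-in for rag.adelete_by_doc_id; returns an object carrying file_path.
    return SimpleNamespace(status="success", file_path="report.pdf", message="ok")

async def main() -> None:
    doc_id = "doc-123"
    file_path = "#"  # placeholder used if the call below raises before returning
    try:
        result = await fake_delete(doc_id)
        # Fall back to "-" when the result carries no file_path attribute.
        file_path = getattr(result, "file_path", "-")
        print(f"Deleted document 1/1: {doc_id}[{file_path}]")
    except Exception as e:
        print(f"Error deleting document 1/1: {doc_id}[{file_path}] - {e}")

asyncio.run(main())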

View File

@@ -1712,6 +1712,7 @@ class LightRAG:
         try:
             # 1. Get the document status and related data
             doc_status_data = await self.doc_status.get_by_id(doc_id)
+            file_path = doc_status_data.get("file_path") if doc_status_data else None
             if not doc_status_data:
                 logger.warning(f"Document {doc_id} not found")
                 return DeletionResult(
@@ -1719,6 +1720,7 @@
                     doc_id=doc_id,
                     message=f"Document {doc_id} not found.",
                     status_code=404,
+                    file_path="",
                 )
 
             # 2. Get all chunks related to this document
@@ -1770,6 +1772,7 @@
                     doc_id=doc_id,
                     message=log_message,
                     status_code=200,
+                    file_path=file_path,
                 )
 
             chunk_ids = set(related_chunks.keys())
@@ -1962,9 +1965,6 @@
                 logger.error(f"Failed to delete document and status: {e}")
                 raise Exception(f"Failed to delete document and status: {e}") from e
 
-            # Get file path from document status for return value
-            file_path = doc_status_data.get("file_path") if doc_status_data else None
-
             return DeletionResult(
                 status="success",
                 doc_id=doc_id,
@@ -1983,6 +1983,7 @@
                 doc_id=doc_id,
                 message=error_message,
                 status_code=500,
+                file_path=file_path,
             )
 
         finally:
@@ -2002,6 +2003,7 @@
                     doc_id=doc_id,
                     message=f"Deletion completed but failed to persist changes: {persistence_error}",
                     status_code=500,
+                    file_path=file_path,
                 )
             # If there was an original exception, log the persistence error but don't override the original error
             # The original error result was already returned in the except block
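
For context, the call sites above all pass the same five keyword arguments to DeletionResult, with file_path newly added by this commit. A minimal sketch of the shape those calls imply, written as a dataclass for illustration only (the project's actual class may be defined differently, e.g. as a Pydantic model):

from dataclasses import dataclass
from typing import Optional

@dataclass
class DeletionResult:
    # Fields inferred from the constructor calls in this diff.
    status: str                      # "success" in the happy path above
    doc_id: str
    message: str
    status_code: int                 # 200, 404, 500 appear in this diff
    file_path: Optional[str] = None  # newly threaded through by this commit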