From 495d6c8ccef7a81d8fed35eddcc023b183525c38 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Wed, 25 Jun 2025 15:46:58 +0800
Subject: [PATCH] Improve the pipeline status message for document deletion

---
 lightrag/api/routers/document_routes.py |   6 +-
 lightrag/lightrag.py                    | 144 ++++++++++++++----------
 lightrag/operate.py                     |   6 +-
 3 files changed, 87 insertions(+), 69 deletions(-)

diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 795f3ad6..361bba66 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -803,9 +803,7 @@ async def background_delete_document(rag: LightRAG, doc_id: str):
                 }
             )
             # Use slice assignment to clear the list in place
-            pipeline_status["history_messages"][:] = [
-                f"Starting deletion for doc_id: {doc_id}"
-            ]
+            pipeline_status["history_messages"][:] = ["Starting document deletion process"]
 
     try:
         result = await rag.adelete_by_doc_id(doc_id)
@@ -823,7 +821,7 @@ async def background_delete_document(rag: LightRAG, doc_id: str):
     finally:
         async with pipeline_status_lock:
             pipeline_status["busy"] = False
-            completion_msg = f"Document deletion process for {doc_id} completed."
+            completion_msg = "Document deletion process completed."
             pipeline_status["latest_message"] = completion_msg
             if "history_messages" in pipeline_status:
                 pipeline_status["history_messages"].append(completion_msg)
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 686c4570..ed4cc0d6 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -1683,15 +1683,7 @@ class LightRAG:
 
         This method orchestrates a comprehensive deletion process for a given document ID.
         It ensures that not only the document itself but also all its derived and associated
-        data across different storage layers are removed. This includes:
-        1. **Document and Status**: Deletes the document from `full_docs` and its status from `doc_status`.
-        2. **Chunks**: Removes all associated text chunks from `chunks_vdb`.
-        3. **Graph Data**:
-           - Deletes related entities from `entities_vdb`.
-           - Deletes related relationships from `relationships_vdb`.
-           - Removes corresponding nodes and edges from the `chunk_entity_relation_graph`.
-        4. **Graph Reconstruction**: If entities or relationships are partially affected, it triggers
-           a reconstruction of their data from the remaining chunks to ensure consistency.
+        data across different storage layers are removed. If entities or relationships are partially affected, it triggers a reconstruction of their data from the remaining chunks to ensure consistency.
 
         Args:
             doc_id (str): The unique identifier of the document to be deleted.
@@ -1706,9 +1698,17 @@ class LightRAG:
         deletion_operations_started = False
         original_exception = None
 
-        try:
-            logger.info(f"Starting deletion process for document {doc_id}")
+        # Get pipeline status shared data and lock for status updates
+        pipeline_status = await get_namespace_data("pipeline_status")
+        pipeline_status_lock = get_pipeline_status_lock()
+        async with pipeline_status_lock:
+            log_message = f"Starting deletion process for document {doc_id}"
+            logger.info(log_message)
+            pipeline_status["latest_message"] = log_message
+            pipeline_status["history_messages"].append(log_message)
+
+        try:
             # 1. Get the document status and related data
             doc_status_data = await self.doc_status.get_by_id(doc_id)
             if not doc_status_data:
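Note: every status update this patch adds to `adelete_by_doc_id` follows the same shape as the hunk above: take the shared status lock, log the message, then mirror it into `latest_message` and `history_messages` so clients polling the pipeline can watch deletion progress. A minimal sketch of that pattern as a helper (the `update_deletion_status` name is hypothetical and not part of this patch; the module logger is a stand-in):

```python
import asyncio
import logging

logger = logging.getLogger("lightrag")

async def update_deletion_status(
    pipeline_status: dict, pipeline_status_lock: asyncio.Lock, log_message: str
) -> None:
    # The lock serializes writes against other pipeline tasks sharing the dict.
    async with pipeline_status_lock:
        logger.info(log_message)
        # latest_message is the one-line status; history_messages is the full log.
        pipeline_status["latest_message"] = log_message
        pipeline_status["history_messages"].append(log_message)
```

The patch inlines this block at each step rather than factoring it out, which keeps each hunk self-contained at the cost of some repetition.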
@@ -1720,8 +1720,6 @@ class LightRAG:
                 status_code=404,
             )
 
-            logger.info(f"Starting optimized deletion for document {doc_id}")
-
             # 2. Get all chunks related to this document
             try:
                 all_chunks = await self.text_chunks.get_all()
@@ -1731,9 +1729,14 @@ class LightRAG:
                     if isinstance(chunk_data, dict)
                     and chunk_data.get("full_doc_id") == doc_id
                 }
-                logger.info(
-                    f"Retrieved {len(all_chunks)} total chunks, {len(related_chunks)} related to document {doc_id}"
-                )
+
+                # Update pipeline status after getting chunks count
+                async with pipeline_status_lock:
+                    log_message = f"Retrieved {len(related_chunks)} related chunks from {len(all_chunks)} total chunks"
+                    logger.info(log_message)
+                    pipeline_status["latest_message"] = log_message
+                    pipeline_status["history_messages"].append(log_message)
+
             except Exception as e:
                 logger.error(f"Failed to retrieve chunks for document {doc_id}: {e}")
                 raise Exception(f"Failed to retrieve document chunks: {e}") from e
@@ -1753,16 +1756,22 @@ class LightRAG:
                     )
                     raise Exception(f"Failed to delete document entry: {e}") from e
 
+                async with pipeline_status_lock:
+                    log_message = (
+                        f"Document {doc_id} deleted (no associated chunks found)."
+                    )
+                    logger.info(log_message)
+                    pipeline_status["latest_message"] = log_message
+                    pipeline_status["history_messages"].append(log_message)
+
                 return DeletionResult(
                     status="success",
                     doc_id=doc_id,
-                    message=f"Document {doc_id} found but had no associated chunks. Document entry deleted.",
+                    message=log_message,
                     status_code=200,
                 )
 
             chunk_ids = set(related_chunks.keys())
-            logger.info(f"Found {len(chunk_ids)} chunks to delete")
-
             # Mark that deletion operations have started
             deletion_operations_started = True
 
@@ -1777,22 +1786,35 @@ class LightRAG:
             async with graph_db_lock:
                 try:
                     # Get all affected nodes and edges in batch
-                    logger.info(
-                        f"Analyzing affected entities and relationships for {len(chunk_ids)} chunks"
-                    )
                     affected_nodes = (
                         await self.chunk_entity_relation_graph.get_nodes_by_chunk_ids(
                             list(chunk_ids)
                         )
                     )
+
+                    # Update pipeline status after getting affected_nodes
+                    async with pipeline_status_lock:
+                        log_message = f"Found {len(affected_nodes)} affected entities"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
+
                     affected_edges = (
                         await self.chunk_entity_relation_graph.get_edges_by_chunk_ids(
                             list(chunk_ids)
                         )
                     )
-                    logger.info(
-                        f"Found {len(affected_nodes)} affected nodes and {len(affected_edges)} affected edges"
-                    )
+
+                    # Update pipeline status after getting affected_edges
+                    async with pipeline_status_lock:
+                        log_message = f"Found {len(affected_edges)} affected relations"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
+
                 except Exception as e:
                     logger.error(f"Failed to analyze affected graph elements: {e}")
                     raise Exception(f"Failed to analyze graph dependencies: {e}") from e
@@ -1831,12 +1853,6 @@ class LightRAG:
                         elif remaining_sources != sources:
                             relationships_to_rebuild[edge_tuple] = remaining_sources
 
-                    logger.info(
-                        f"Analysis complete: {len(entities_to_delete)} entities to delete, "
-                        f"{len(entities_to_rebuild)} entities to rebuild, "
-                        f"{len(relationships_to_delete)} relationships to delete, "
-                        f"{len(relationships_to_rebuild)} relationships to rebuild"
-                    )
                 except Exception as e:
                     logger.error(f"Failed to process graph analysis results: {e}")
                     raise Exception(f"Failed to process graph dependencies: {e}") from e
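Note: the analysis hunk above classifies each affected entity and relation with plain set arithmetic over its chunk sources: subtract the chunks being deleted; an empty remainder means the element can be deleted outright, and a shrunken remainder means it must be rebuilt from the chunks that remain. A self-contained sketch of that decision rule (the function name is hypothetical, for illustration only):

```python
def classify_element(sources: set[str], deleted_chunk_ids: set[str]) -> str:
    """Decide the fate of a graph element given the chunks being removed."""
    remaining = sources - deleted_chunk_ids
    if not remaining:
        return "delete"   # no supporting chunks left
    if remaining != sources:
        return "rebuild"  # partially affected: rebuild from remaining chunks
    return "keep"         # untouched by this deletion

assert classify_element({"chunk-a"}, {"chunk-a"}) == "delete"
assert classify_element({"chunk-a", "chunk-b"}, {"chunk-a"}) == "rebuild"
assert classify_element({"chunk-c"}, {"chunk-a"}) == "keep"
```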
@@ -1844,12 +1860,15 @@ class LightRAG:
                 # 5. Delete chunks from storage
                 if chunk_ids:
                     try:
-                        logger.info(f"Deleting {len(chunk_ids)} chunks from storage")
                         await self.chunks_vdb.delete(chunk_ids)
                         await self.text_chunks.delete(chunk_ids)
-                        logger.info(
-                            f"Successfully deleted {len(chunk_ids)} chunks from storage"
-                        )
+
+                        async with pipeline_status_lock:
+                            log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage"
+                            logger.info(log_message)
+                            pipeline_status["latest_message"] = log_message
+                            pipeline_status["history_messages"].append(log_message)
+
                     except Exception as e:
                         logger.error(f"Failed to delete chunks: {e}")
                         raise Exception(f"Failed to delete document chunks: {e}") from e
@@ -1857,7 +1876,6 @@ class LightRAG:
                 # 6. Delete entities that have no remaining sources
                 if entities_to_delete:
                     try:
-                        logger.info(f"Deleting {len(entities_to_delete)} entities")
                         # Delete from vector database
                         entity_vdb_ids = [
                             compute_mdhash_id(entity, prefix="ent-")
@@ -1869,9 +1887,13 @@ class LightRAG:
                         await self.chunk_entity_relation_graph.remove_nodes(
                             list(entities_to_delete)
                         )
-                        logger.info(
-                            f"Successfully deleted {len(entities_to_delete)} entities"
-                        )
+
+                        async with pipeline_status_lock:
+                            log_message = f"Successfully deleted {len(entities_to_delete)} entities"
+                            logger.info(log_message)
+                            pipeline_status["latest_message"] = log_message
+                            pipeline_status["history_messages"].append(log_message)
+
                     except Exception as e:
                         logger.error(f"Failed to delete entities: {e}")
                         raise Exception(f"Failed to delete entities: {e}") from e
@@ -1879,9 +1901,6 @@ class LightRAG:
                 # 7. Delete relationships that have no remaining sources
                 if relationships_to_delete:
                     try:
-                        logger.info(
-                            f"Deleting {len(relationships_to_delete)} relationships"
-                        )
                         # Delete from vector database
                         rel_ids_to_delete = []
                         for src, tgt in relationships_to_delete:
@@ -1897,9 +1916,13 @@ class LightRAG:
                         await self.chunk_entity_relation_graph.remove_edges(
                             list(relationships_to_delete)
                         )
-                        logger.info(
-                            f"Successfully deleted {len(relationships_to_delete)} relationships"
-                        )
+
+                        async with pipeline_status_lock:
+                            log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
+                            logger.info(log_message)
+                            pipeline_status["latest_message"] = log_message
+                            pipeline_status["history_messages"].append(log_message)
+
                     except Exception as e:
                         logger.error(f"Failed to delete relationships: {e}")
                         raise Exception(f"Failed to delete relationships: {e}") from e
@@ -1907,9 +1930,6 @@ class LightRAG:
                 # 8. Rebuild entities and relationships from remaining chunks
                 if entities_to_rebuild or relationships_to_rebuild:
                     try:
-                        logger.info(
-                            f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships"
-                        )
                         await _rebuild_knowledge_from_chunks(
                             entities_to_rebuild=entities_to_rebuild,
                             relationships_to_rebuild=relationships_to_rebuild,
                             knowledge_graph_inst=self.chunk_entity_relation_graph,
                             entities_vdb=self.entities_vdb,
                             relationships_vdb=self.relationships_vdb,
                             text_chunks_storage=self.text_chunks,
                             llm_response_cache=self.llm_response_cache,
                             global_config=asdict(self),
                         )
-                        logger.info(
-                            f"Successfully rebuilt {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships"
-                        )
+
+                        async with pipeline_status_lock:
+                            log_message = f"Successfully rebuilt {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relations"
+                            logger.info(log_message)
+                            pipeline_status["latest_message"] = log_message
+                            pipeline_status["history_messages"].append(log_message)
+
                     except Exception as e:
                         logger.error(f"Failed to rebuild knowledge from chunks: {e}")
                         raise Exception(
                             f"Failed to rebuild knowledge from chunks: {e}"
                         ) from e
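Note: steps 6 and 7 delete vector-store entries by recomputing their IDs rather than querying for them: entity vectors are keyed by a prefixed content hash of the entity name (`compute_mdhash_id(entity, prefix="ent-")` in the hunk above), and relation IDs are derived the same way from the endpoint pair. A rough sketch of the idea; the md5 body below is an assumption standing in for `lightrag.utils.compute_mdhash_id`:

```python
from hashlib import md5

def compute_mdhash_id(content: str, prefix: str = "") -> str:
    # Deterministic ID: the same content always maps to the same vector-store key.
    return prefix + md5(content.encode()).hexdigest()

print(compute_mdhash_id("Alan Turing", prefix="ent-"))
# -> "ent-..." (stable across runs)
```

Because the IDs are deterministic, deletion needs no reverse index from graph elements to vector rows.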
             # 9. Delete original document and status
             try:
-                logger.info(f"Deleting original document {doc_id} and its status")
                 await self.full_docs.delete([doc_id])
                 await self.doc_status.delete([doc_id])
-                logger.info(f"Successfully deleted document {doc_id} and its status")
             except Exception as e:
                 logger.error(f"Failed to delete document and status: {e}")
                 raise Exception(f"Failed to delete document and status: {e}") from e
 
-            success_message = f"Successfully deleted document {doc_id}"
-            logger.info(success_message)
+            async with pipeline_status_lock:
+                log_message = f"Successfully deleted document {doc_id}"
+                logger.info(log_message)
+                pipeline_status["latest_message"] = log_message
+                pipeline_status["history_messages"].append(log_message)
+
             return DeletionResult(
                 status="success",
                 doc_id=doc_id,
-                message=success_message,
+                message=log_message,
                 status_code=200,
             )
 
@@ -1964,13 +1990,7 @@ class LightRAG:
             # ALWAYS ensure persistence if any deletion operations were started
             if deletion_operations_started:
                 try:
-                    logger.info(
-                        f"Ensuring data persistence for document {doc_id} deletion"
-                    )
                     await self._insert_done()
-                    logger.info(
-                        f"Data persistence completed successfully for document {doc_id} deletion"
-                    )
                 except Exception as persistence_error:
                     persistence_error_msg = f"Failed to persist data after deletion attempt for {doc_id}: {persistence_error}"
                     logger.error(persistence_error_msg)
diff --git a/lightrag/operate.py b/lightrag/operate.py
index d5026203..77568161 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -270,7 +270,7 @@ async def _rebuild_knowledge_from_chunks(
     for chunk_ids in relationships_to_rebuild.values():
         all_referenced_chunk_ids.update(chunk_ids)
 
-    logger.info(
+    logger.debug(
         f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
     )
 
@@ -339,7 +339,7 @@ async def _rebuild_knowledge_from_chunks(
         except Exception as e:
             logger.error(f"Failed to rebuild relationship {src}-{tgt}: {e}")
 
-    logger.info("Completed rebuilding knowledge from cached extractions")
+    logger.debug("Completed rebuilding knowledge from cached extractions")
 
 
 async def _get_cached_extraction_results(
@@ -368,7 +368,7 @@ async def _get_cached_extraction_results(
             extraction_result = cache_entry["return"]
             cached_results[chunk_id] = extraction_result
 
-    logger.info(
+    logger.debug(
         f"Found {len(cached_results)} cached extraction results for {len(chunk_ids)} chunk IDs"
     )
     return cached_results
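Note: the net effect of these message changes is visible from the API side. A hypothetical client polling loop that watches a deletion progress through the new messages (endpoint path and default port assumed from the LightRAG API server; adjust to your deployment):

```python
import time
import requests

BASE_URL = "http://localhost:9621"  # assumed default LightRAG server address

last_seen = 0
while True:
    status = requests.get(f"{BASE_URL}/documents/pipeline_status", timeout=10).json()
    history = status.get("history_messages", [])
    for message in history[last_seen:]:
        print(message)  # e.g. "Found 12 affected entities"
    last_seen = len(history)
    if not status.get("busy"):
        break  # deletion finished; final message is "Document deletion process completed."
    time.sleep(1)
```

With this patch the start and completion messages no longer embed the doc ID, and per-step counts (chunks, entities, relations) are surfaced as the deletion proceeds, while the verbose rebuild logging in operate.py is demoted to debug level.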