diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index b94709f2..967ba600 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -1703,9 +1703,15 @@ class LightRAG:
                 - `message` (str): A summary of the operation's result.
                 - `status_code` (int): HTTP status code (e.g., 200, 404, 500).
         """
+        deletion_operations_started = False
+        original_exception = None
+
         try:
+            logger.info(f"Starting deletion process for document {doc_id}")
+
             # 1. Get the document status and related data
-            if not await self.doc_status.get_by_id(doc_id):
+            doc_status_data = await self.doc_status.get_by_id(doc_id)
+            if not doc_status_data:
                 logger.warning(f"Document {doc_id} not found")
                 return DeletionResult(
                     status="not_found",
@@ -1717,19 +1723,32 @@ class LightRAG:
             logger.info(f"Starting optimized deletion for document {doc_id}")
 
             # 2. Get all chunks related to this document
-            all_chunks = await self.text_chunks.get_all()
-            related_chunks = {
-                chunk_id: chunk_data
-                for chunk_id, chunk_data in all_chunks.items()
-                if isinstance(chunk_data, dict)
-                and chunk_data.get("full_doc_id") == doc_id
-            }
+            try:
+                all_chunks = await self.text_chunks.get_all()
+                related_chunks = {
+                    chunk_id: chunk_data
+                    for chunk_id, chunk_data in all_chunks.items()
+                    if isinstance(chunk_data, dict)
+                    and chunk_data.get("full_doc_id") == doc_id
+                }
+                logger.info(f"Retrieved {len(all_chunks)} total chunks, {len(related_chunks)} related to document {doc_id}")
+            except Exception as e:
+                logger.error(f"Failed to retrieve chunks for document {doc_id}: {e}")
+                raise Exception(f"Failed to retrieve document chunks: {e}") from e
 
             if not related_chunks:
                 logger.warning(f"No chunks found for document {doc_id}")
-                # Still need to delete the doc status and full doc
-                await self.full_docs.delete([doc_id])
-                await self.doc_status.delete([doc_id])
+                # Mark that deletion operations have started
+                deletion_operations_started = True
+                try:
+                    # Still need to delete the doc status and full doc
+                    await self.full_docs.delete([doc_id])
+                    await self.doc_status.delete([doc_id])
+                    logger.info(f"Deleted document {doc_id} with no associated chunks")
+                except Exception as e:
+                    logger.error(f"Failed to delete document {doc_id} with no chunks: {e}")
+                    raise Exception(f"Failed to delete document entry: {e}") from e
+
                 return DeletionResult(
                     status="success",
                     doc_id=doc_id,
@@ -1740,17 +1759,8 @@ class LightRAG:
             chunk_ids = set(related_chunks.keys())
             logger.info(f"Found {len(chunk_ids)} chunks to delete")
 
-            # # 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks
-            # logger.info("Clearing LLM cache for related chunks...")
-            # cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids(
-            #     list(chunk_ids)
-            # )
-            # if cache_cleared:
-            #     logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks")
-            # else:
-            #     logger.warning(
-            #         "Failed to clear chunk cache or cache clearing not supported"
-            #     )
+            # Mark that deletion operations have started
+            deletion_operations_started = True
 
             # 4. Analyze entities and relationships that will be affected
             entities_to_delete = set()
@@ -1761,122 +1771,151 @@ class LightRAG:
             # Use graph database lock to ensure atomic merges and updates
             graph_db_lock = get_graph_db_lock(enable_logging=False)
             async with graph_db_lock:
-                # Get all affected nodes and edges in batch
-                affected_nodes = (
-                    await self.chunk_entity_relation_graph.get_nodes_by_chunk_ids(
-                        list(chunk_ids)
+                try:
+                    # Get all affected nodes and edges in batch
+                    logger.info(f"Analyzing affected entities and relationships for {len(chunk_ids)} chunks")
+                    affected_nodes = (
+                        await self.chunk_entity_relation_graph.get_nodes_by_chunk_ids(
+                            list(chunk_ids)
+                        )
                     )
-                )
-                affected_edges = (
-                    await self.chunk_entity_relation_graph.get_edges_by_chunk_ids(
-                        list(chunk_ids)
+                    affected_edges = (
+                        await self.chunk_entity_relation_graph.get_edges_by_chunk_ids(
+                            list(chunk_ids)
+                        )
                     )
-                )
+                    logger.info(f"Found {len(affected_nodes)} affected nodes and {len(affected_edges)} affected edges")
+                except Exception as e:
+                    logger.error(f"Failed to analyze affected graph elements: {e}")
+                    raise Exception(f"Failed to analyze graph dependencies: {e}") from e
 
-                # logger.info(f"chunk_ids: {chunk_ids}")
-                # logger.info(f"affected_nodes: {affected_nodes}")
-                # logger.info(f"affected_edges: {affected_edges}")
+                try:
+                    # Process entities
+                    for node_data in affected_nodes:
+                        node_label = node_data.get("entity_id")
+                        if node_label and "source_id" in node_data:
+                            sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
+                            remaining_sources = sources - chunk_ids
 
-                # Process entities
-                for node_data in affected_nodes:
-                    node_label = node_data.get("entity_id")
-                    if node_label and "source_id" in node_data:
-                        sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
-                        remaining_sources = sources - chunk_ids
+                            if not remaining_sources:
+                                entities_to_delete.add(node_label)
+                            elif remaining_sources != sources:
+                                entities_to_rebuild[node_label] = remaining_sources
 
-                        if not remaining_sources:
-                            entities_to_delete.add(node_label)
-                        elif remaining_sources != sources:
-                            entities_to_rebuild[node_label] = remaining_sources
+                    # Process relationships
+                    for edge_data in affected_edges:
+                        src = edge_data.get("source")
+                        tgt = edge_data.get("target")
 
-                # Process relationships
-                for edge_data in affected_edges:
-                    src = edge_data.get("source")
-                    tgt = edge_data.get("target")
+                        if src and tgt and "source_id" in edge_data:
+                            edge_tuple = tuple(sorted((src, tgt)))
+                            if (
+                                edge_tuple in relationships_to_delete
+                                or edge_tuple in relationships_to_rebuild
+                            ):
+                                continue
 
-                    if src and tgt and "source_id" in edge_data:
-                        edge_tuple = tuple(sorted((src, tgt)))
-                        if (
-                            edge_tuple in relationships_to_delete
-                            or edge_tuple in relationships_to_rebuild
-                        ):
-                            continue
+                            sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
+                            remaining_sources = sources - chunk_ids
 
-                        sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
-                        remaining_sources = sources - chunk_ids
-
-                        if not remaining_sources:
-                            relationships_to_delete.add(edge_tuple)
-                        elif remaining_sources != sources:
-                            relationships_to_rebuild[edge_tuple] = remaining_sources
+                            if not remaining_sources:
+                                relationships_to_delete.add(edge_tuple)
+                            elif remaining_sources != sources:
+                                relationships_to_rebuild[edge_tuple] = remaining_sources
+
+                    logger.info(f"Analysis complete: {len(entities_to_delete)} entities to delete, "
+                               f"{len(entities_to_rebuild)} entities to rebuild, "
+                               f"{len(relationships_to_delete)} relationships to delete, "
+                               f"{len(relationships_to_rebuild)} relationships to rebuild")
+                except Exception as e:
+                    logger.error(f"Failed to process graph analysis results: {e}")
+                    raise Exception(f"Failed to process graph dependencies: {e}") from e
 
                 # 5. Delete chunks from storage
                 if chunk_ids:
-                    await self.chunks_vdb.delete(chunk_ids)
-                    await self.text_chunks.delete(chunk_ids)
-                    logger.info(f"Deleted {len(chunk_ids)} chunks from storage")
+                    try:
+                        logger.info(f"Deleting {len(chunk_ids)} chunks from storage")
+                        await self.chunks_vdb.delete(chunk_ids)
+                        await self.text_chunks.delete(chunk_ids)
+                        logger.info(f"Successfully deleted {len(chunk_ids)} chunks from storage")
+                    except Exception as e:
+                        logger.error(f"Failed to delete chunks: {e}")
+                        raise Exception(f"Failed to delete document chunks: {e}") from e
 
                 # 6. Delete entities that have no remaining sources
                 if entities_to_delete:
-                    # Delete from vector database
-                    entity_vdb_ids = [
-                        compute_mdhash_id(entity, prefix="ent-")
-                        for entity in entities_to_delete
-                    ]
-                    await self.entities_vdb.delete(entity_vdb_ids)
+                    try:
+                        logger.info(f"Deleting {len(entities_to_delete)} entities")
+                        # Delete from vector database
+                        entity_vdb_ids = [
+                            compute_mdhash_id(entity, prefix="ent-")
+                            for entity in entities_to_delete
+                        ]
+                        await self.entities_vdb.delete(entity_vdb_ids)
 
-                    # Delete from graph
-                    await self.chunk_entity_relation_graph.remove_nodes(
-                        list(entities_to_delete)
-                    )
-                    logger.info(f"Deleted {len(entities_to_delete)} entities")
+                        # Delete from graph
+                        await self.chunk_entity_relation_graph.remove_nodes(
+                            list(entities_to_delete)
+                        )
+                        logger.info(f"Successfully deleted {len(entities_to_delete)} entities")
+                    except Exception as e:
+                        logger.error(f"Failed to delete entities: {e}")
+                        raise Exception(f"Failed to delete entities: {e}") from e
 
                 # 7. Delete relationships that have no remaining sources
                 if relationships_to_delete:
-                    # Delete from vector database
-                    rel_ids_to_delete = []
-                    for src, tgt in relationships_to_delete:
-                        rel_ids_to_delete.extend(
-                            [
-                                compute_mdhash_id(src + tgt, prefix="rel-"),
-                                compute_mdhash_id(tgt + src, prefix="rel-"),
-                            ]
+                    try:
+                        logger.info(f"Deleting {len(relationships_to_delete)} relationships")
+                        # Delete from vector database
+                        rel_ids_to_delete = []
+                        for src, tgt in relationships_to_delete:
+                            rel_ids_to_delete.extend(
+                                [
+                                    compute_mdhash_id(src + tgt, prefix="rel-"),
+                                    compute_mdhash_id(tgt + src, prefix="rel-"),
+                                ]
+                            )
+                        await self.relationships_vdb.delete(rel_ids_to_delete)
+
+                        # Delete from graph
+                        await self.chunk_entity_relation_graph.remove_edges(
+                            list(relationships_to_delete)
                         )
-                    await self.relationships_vdb.delete(rel_ids_to_delete)
+                        logger.info(f"Successfully deleted {len(relationships_to_delete)} relationships")
+                    except Exception as e:
+                        logger.error(f"Failed to delete relationships: {e}")
+                        raise Exception(f"Failed to delete relationships: {e}") from e
 
-                    # Delete from graph
-                    await self.chunk_entity_relation_graph.remove_edges(
-                        list(relationships_to_delete)
-                    )
-                    logger.info(f"Deleted {len(relationships_to_delete)} relationships")
-
-                # 8. **OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks
+                # 8. Rebuild entities and relationships from remaining chunks
                 if entities_to_rebuild or relationships_to_rebuild:
-                    logger.info(
-                        f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..."
-                    )
-                    await _rebuild_knowledge_from_chunks(
-                        entities_to_rebuild=entities_to_rebuild,
-                        relationships_to_rebuild=relationships_to_rebuild,
-                        knowledge_graph_inst=self.chunk_entity_relation_graph,
-                        entities_vdb=self.entities_vdb,
-                        relationships_vdb=self.relationships_vdb,
-                        text_chunks=self.text_chunks,
-                        llm_response_cache=self.llm_response_cache,
-                        global_config=asdict(self),
-                    )
+                    try:
+                        logger.info(f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships")
+                        await _rebuild_knowledge_from_chunks(
+                            entities_to_rebuild=entities_to_rebuild,
+                            relationships_to_rebuild=relationships_to_rebuild,
+                            knowledge_graph_inst=self.chunk_entity_relation_graph,
+                            entities_vdb=self.entities_vdb,
+                            relationships_vdb=self.relationships_vdb,
+                            text_chunks=self.text_chunks,
+                            llm_response_cache=self.llm_response_cache,
+                            global_config=asdict(self),
+                        )
+                        logger.info(f"Successfully rebuilt {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships")
+                    except Exception as e:
+                        logger.error(f"Failed to rebuild knowledge from chunks: {e}")
+                        raise Exception(f"Failed to rebuild knowledge graph: {e}") from e
 
             # 9. Delete original document and status
-            await self.full_docs.delete([doc_id])
-            await self.doc_status.delete([doc_id])
-
-            # 10. Ensure all indexes are updated
-            await self._insert_done()
-
-            success_message = f"""Successfully deleted document {doc_id}.
-Deleted: {len(entities_to_delete)} entities, {len(relationships_to_delete)} relationships.
-Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} relationships."""
+            try:
+                logger.info(f"Deleting original document {doc_id} and its status")
+                await self.full_docs.delete([doc_id])
+                await self.doc_status.delete([doc_id])
+                logger.info(f"Successfully deleted document {doc_id} and its status")
+            except Exception as e:
+                logger.error(f"Failed to delete document and status: {e}")
+                raise Exception(f"Failed to delete document and status: {e}") from e
+            success_message = f"Successfully deleted document {doc_id}"
             logger.info(success_message)
             return DeletionResult(
                 status="success",
@@ -1886,6 +1925,7 @@ Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} re
             )
 
         except Exception as e:
+            original_exception = e
             error_message = f"Error while deleting document {doc_id}: {e}"
             logger.error(error_message)
             logger.error(traceback.format_exc())
@@ -1895,6 +1935,31 @@ Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} re
                 message=error_message,
                 status_code=500,
             )
+
+        finally:
+            # ALWAYS ensure persistence if any deletion operations were started
+            if deletion_operations_started:
+                try:
+                    logger.info(f"Ensuring data persistence for document {doc_id} deletion")
+                    await self._insert_done()
+                    logger.info(f"Data persistence completed successfully for document {doc_id} deletion")
+                except Exception as persistence_error:
+                    persistence_error_msg = f"Failed to persist data after deletion attempt for {doc_id}: {persistence_error}"
+                    logger.error(persistence_error_msg)
+                    logger.error(traceback.format_exc())
+
+                    # If there was no original exception, this persistence error becomes the main error
+                    if original_exception is None:
+                        return DeletionResult(
+                            status="fail",
+                            doc_id=doc_id,
+                            message=f"Deletion completed but failed to persist changes: {persistence_error}",
+                            status_code=500,
+                        )
+                    # If there was an original exception, log the persistence error but don't override the original error
+                    # The original error result was already returned in the except block
+            else:
+                logger.debug(f"No deletion operations were started for document {doc_id}, skipping persistence")
 
     async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
         """Asynchronously delete an entity and all its relationships.
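
Usage sketch (for reviewers; not part of the patch): a minimal, hypothetical example of calling the updated adelete_by_doc_id and inspecting the DeletionResult fields documented in the docstring above (status, doc_id, message, status_code). The "rag" instance and the document id are placeholders, not values from this change.

    import asyncio

    from lightrag import LightRAG


    async def delete_doc(rag: LightRAG, doc_id: str) -> None:
        # adelete_by_doc_id returns a DeletionResult describing the outcome
        result = await rag.adelete_by_doc_id(doc_id)
        if result.status == "success":
            print(f"Deleted {doc_id}: {result.message}")
        elif result.status == "not_found":
            print(f"Document {doc_id} was not found ({result.status_code})")
        else:
            # On failure the finally block has already attempted to persist any partial deletions
            print(f"Deletion failed ({result.status_code}): {result.message}")

    # asyncio.run(delete_doc(rag, "doc-123"))  # "rag": an already-initialized LightRAG instance (placeholder)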