Mirror of https://github.com/HKUDS/LightRAG.git (synced 2025-06-26 22:00:19 +00:00)
Add comprehensive error handling for document deletion
This commit is contained in:
parent
922484915b
commit
8a365533d7
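
At a glance, the commit wraps each stage of the document-deletion flow in its own try/except, re-raises with a stage-specific message, and adds a finally block that persists storage whenever any destructive step has already run. Below is a condensed, hedged sketch of that pattern as standalone Python; the storage calls and the dict result are stand-ins for the real LightRAG storages and DeletionResult, not the actual implementation.

    import asyncio
    import logging

    logger = logging.getLogger(__name__)


    async def delete_document(doc_id: str) -> dict:
        """Hedged sketch of the commit's error-handling pattern, not the real code."""
        deletion_operations_started = False  # flipped once the first destructive step runs
        original_exception = None

        async def persist() -> None:
            # stand-in for LightRAG's self._insert_done(), which flushes index updates
            await asyncio.sleep(0)

        try:
            # ... look up the document; return a "not_found" result if it is missing ...
            deletion_operations_started = True  # from here on, partial work must be persisted
            # ... delete chunks, entities and relationships; rebuild whatever remains ...
            return {"status": "success", "doc_id": doc_id, "status_code": 200}
        except Exception as exc:
            original_exception = exc
            logger.error("Error while deleting document %s: %s", doc_id, exc)
            return {"status": "fail", "doc_id": doc_id, "message": str(exc), "status_code": 500}
        finally:
            # runs on success and on failure alike
            if deletion_operations_started:
                try:
                    await persist()
                except Exception as persistence_error:
                    logger.error("Failed to persist deletion of %s: %s", doc_id, persistence_error)
                    # the diff returns a "fail" result here only when original_exception is None

The diff below applies this shape to every stage: chunk retrieval, chunk deletion, entity and relationship deletion, knowledge rebuild, and the final document/status removal.
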
@@ -1703,9 +1703,15 @@ class LightRAG:
             - `message` (str): A summary of the operation's result.
             - `status_code` (int): HTTP status code (e.g., 200, 404, 500).
         """
+        deletion_operations_started = False
+        original_exception = None
+
         try:
+            logger.info(f"Starting deletion process for document {doc_id}")
+
             # 1. Get the document status and related data
-            if not await self.doc_status.get_by_id(doc_id):
+            doc_status_data = await self.doc_status.get_by_id(doc_id)
+            if not doc_status_data:
                 logger.warning(f"Document {doc_id} not found")
                 return DeletionResult(
                     status="not_found",
@@ -1717,19 +1723,32 @@ class LightRAG:
             logger.info(f"Starting optimized deletion for document {doc_id}")

             # 2. Get all chunks related to this document
-            all_chunks = await self.text_chunks.get_all()
-            related_chunks = {
-                chunk_id: chunk_data
-                for chunk_id, chunk_data in all_chunks.items()
-                if isinstance(chunk_data, dict)
-                and chunk_data.get("full_doc_id") == doc_id
-            }
+            try:
+                all_chunks = await self.text_chunks.get_all()
+                related_chunks = {
+                    chunk_id: chunk_data
+                    for chunk_id, chunk_data in all_chunks.items()
+                    if isinstance(chunk_data, dict)
+                    and chunk_data.get("full_doc_id") == doc_id
+                }
+                logger.info(f"Retrieved {len(all_chunks)} total chunks, {len(related_chunks)} related to document {doc_id}")
+            except Exception as e:
+                logger.error(f"Failed to retrieve chunks for document {doc_id}: {e}")
+                raise Exception(f"Failed to retrieve document chunks: {e}") from e
+
             if not related_chunks:
                 logger.warning(f"No chunks found for document {doc_id}")
-                # Still need to delete the doc status and full doc
-                await self.full_docs.delete([doc_id])
-                await self.doc_status.delete([doc_id])
+                # Mark that deletion operations have started
+                deletion_operations_started = True
+                try:
+                    # Still need to delete the doc status and full doc
+                    await self.full_docs.delete([doc_id])
+                    await self.doc_status.delete([doc_id])
+                    logger.info(f"Deleted document {doc_id} with no associated chunks")
+                except Exception as e:
+                    logger.error(f"Failed to delete document {doc_id} with no chunks: {e}")
+                    raise Exception(f"Failed to delete document entry: {e}") from e

                 return DeletionResult(
                     status="success",
                     doc_id=doc_id,
@@ -1740,17 +1759,8 @@ class LightRAG:
             chunk_ids = set(related_chunks.keys())
             logger.info(f"Found {len(chunk_ids)} chunks to delete")

-            # # 3. **OPTIMIZATION 1**: Clear LLM cache for related chunks
-            # logger.info("Clearing LLM cache for related chunks...")
-            # cache_cleared = await self.llm_response_cache.drop_cache_by_chunk_ids(
-            #     list(chunk_ids)
-            # )
-            # if cache_cleared:
-            #     logger.info(f"Successfully cleared cache for {len(chunk_ids)} chunks")
-            # else:
-            #     logger.warning(
-            #         "Failed to clear chunk cache or cache clearing not supported"
-            #     )
+            # Mark that deletion operations have started
+            deletion_operations_started = True

             # 4. Analyze entities and relationships that will be affected
             entities_to_delete = set()
@@ -1761,122 +1771,151 @@ class LightRAG:
             # Use graph database lock to ensure atomic merges and updates
             graph_db_lock = get_graph_db_lock(enable_logging=False)
             async with graph_db_lock:
-                # Get all affected nodes and edges in batch
-                affected_nodes = (
-                    await self.chunk_entity_relation_graph.get_nodes_by_chunk_ids(
-                        list(chunk_ids)
-                    )
-                )
-                affected_edges = (
-                    await self.chunk_entity_relation_graph.get_edges_by_chunk_ids(
-                        list(chunk_ids)
-                    )
-                )
-
-                # logger.info(f"chunk_ids: {chunk_ids}")
-                # logger.info(f"affected_nodes: {affected_nodes}")
-                # logger.info(f"affected_edges: {affected_edges}")
-
-                # Process entities
-                for node_data in affected_nodes:
-                    node_label = node_data.get("entity_id")
-                    if node_label and "source_id" in node_data:
-                        sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
-                        remaining_sources = sources - chunk_ids
-
-                        if not remaining_sources:
-                            entities_to_delete.add(node_label)
-                        elif remaining_sources != sources:
-                            entities_to_rebuild[node_label] = remaining_sources
-
-                # Process relationships
-                for edge_data in affected_edges:
-                    src = edge_data.get("source")
-                    tgt = edge_data.get("target")
-
-                    if src and tgt and "source_id" in edge_data:
-                        edge_tuple = tuple(sorted((src, tgt)))
-                        if (
-                            edge_tuple in relationships_to_delete
-                            or edge_tuple in relationships_to_rebuild
-                        ):
-                            continue
-
-                        sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
-                        remaining_sources = sources - chunk_ids
-
-                        if not remaining_sources:
-                            relationships_to_delete.add(edge_tuple)
-                        elif remaining_sources != sources:
-                            relationships_to_rebuild[edge_tuple] = remaining_sources
+                try:
+                    # Get all affected nodes and edges in batch
+                    logger.info(f"Analyzing affected entities and relationships for {len(chunk_ids)} chunks")
+                    affected_nodes = (
+                        await self.chunk_entity_relation_graph.get_nodes_by_chunk_ids(
+                            list(chunk_ids)
+                        )
+                    )
+                    affected_edges = (
+                        await self.chunk_entity_relation_graph.get_edges_by_chunk_ids(
+                            list(chunk_ids)
+                        )
+                    )
+                    logger.info(f"Found {len(affected_nodes)} affected nodes and {len(affected_edges)} affected edges")
+                except Exception as e:
+                    logger.error(f"Failed to analyze affected graph elements: {e}")
+                    raise Exception(f"Failed to analyze graph dependencies: {e}") from e
+
+                try:
+                    # Process entities
+                    for node_data in affected_nodes:
+                        node_label = node_data.get("entity_id")
+                        if node_label and "source_id" in node_data:
+                            sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
+                            remaining_sources = sources - chunk_ids
+
+                            if not remaining_sources:
+                                entities_to_delete.add(node_label)
+                            elif remaining_sources != sources:
+                                entities_to_rebuild[node_label] = remaining_sources
+
+                    # Process relationships
+                    for edge_data in affected_edges:
+                        src = edge_data.get("source")
+                        tgt = edge_data.get("target")
+
+                        if src and tgt and "source_id" in edge_data:
+                            edge_tuple = tuple(sorted((src, tgt)))
+                            if (
+                                edge_tuple in relationships_to_delete
+                                or edge_tuple in relationships_to_rebuild
+                            ):
+                                continue
+
+                            sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
+                            remaining_sources = sources - chunk_ids
+
+                            if not remaining_sources:
+                                relationships_to_delete.add(edge_tuple)
+                            elif remaining_sources != sources:
+                                relationships_to_rebuild[edge_tuple] = remaining_sources
+
+                    logger.info(f"Analysis complete: {len(entities_to_delete)} entities to delete, "
+                                f"{len(entities_to_rebuild)} entities to rebuild, "
+                                f"{len(relationships_to_delete)} relationships to delete, "
+                                f"{len(relationships_to_rebuild)} relationships to rebuild")
+                except Exception as e:
+                    logger.error(f"Failed to process graph analysis results: {e}")
+                    raise Exception(f"Failed to process graph dependencies: {e}") from e

                 # 5. Delete chunks from storage
                 if chunk_ids:
-                    await self.chunks_vdb.delete(chunk_ids)
-                    await self.text_chunks.delete(chunk_ids)
-                    logger.info(f"Deleted {len(chunk_ids)} chunks from storage")
+                    try:
+                        logger.info(f"Deleting {len(chunk_ids)} chunks from storage")
+                        await self.chunks_vdb.delete(chunk_ids)
+                        await self.text_chunks.delete(chunk_ids)
+                        logger.info(f"Successfully deleted {len(chunk_ids)} chunks from storage")
+                    except Exception as e:
+                        logger.error(f"Failed to delete chunks: {e}")
+                        raise Exception(f"Failed to delete document chunks: {e}") from e

                 # 6. Delete entities that have no remaining sources
                 if entities_to_delete:
-                    # Delete from vector database
-                    entity_vdb_ids = [
-                        compute_mdhash_id(entity, prefix="ent-")
-                        for entity in entities_to_delete
-                    ]
-                    await self.entities_vdb.delete(entity_vdb_ids)
-
-                    # Delete from graph
-                    await self.chunk_entity_relation_graph.remove_nodes(
-                        list(entities_to_delete)
-                    )
-                    logger.info(f"Deleted {len(entities_to_delete)} entities")
+                    try:
+                        logger.info(f"Deleting {len(entities_to_delete)} entities")
+                        # Delete from vector database
+                        entity_vdb_ids = [
+                            compute_mdhash_id(entity, prefix="ent-")
+                            for entity in entities_to_delete
+                        ]
+                        await self.entities_vdb.delete(entity_vdb_ids)
+
+                        # Delete from graph
+                        await self.chunk_entity_relation_graph.remove_nodes(
+                            list(entities_to_delete)
+                        )
+                        logger.info(f"Successfully deleted {len(entities_to_delete)} entities")
+                    except Exception as e:
+                        logger.error(f"Failed to delete entities: {e}")
+                        raise Exception(f"Failed to delete entities: {e}") from e

                 # 7. Delete relationships that have no remaining sources
                 if relationships_to_delete:
-                    # Delete from vector database
-                    rel_ids_to_delete = []
-                    for src, tgt in relationships_to_delete:
-                        rel_ids_to_delete.extend(
-                            [
-                                compute_mdhash_id(src + tgt, prefix="rel-"),
-                                compute_mdhash_id(tgt + src, prefix="rel-"),
-                            ]
-                        )
-                    await self.relationships_vdb.delete(rel_ids_to_delete)
-
-                    # Delete from graph
-                    await self.chunk_entity_relation_graph.remove_edges(
-                        list(relationships_to_delete)
-                    )
-                    logger.info(f"Deleted {len(relationships_to_delete)} relationships")
+                    try:
+                        logger.info(f"Deleting {len(relationships_to_delete)} relationships")
+                        # Delete from vector database
+                        rel_ids_to_delete = []
+                        for src, tgt in relationships_to_delete:
+                            rel_ids_to_delete.extend(
+                                [
+                                    compute_mdhash_id(src + tgt, prefix="rel-"),
+                                    compute_mdhash_id(tgt + src, prefix="rel-"),
+                                ]
+                            )
+                        await self.relationships_vdb.delete(rel_ids_to_delete)
+
+                        # Delete from graph
+                        await self.chunk_entity_relation_graph.remove_edges(
+                            list(relationships_to_delete)
+                        )
+                        logger.info(f"Successfully deleted {len(relationships_to_delete)} relationships")
+                    except Exception as e:
+                        logger.error(f"Failed to delete relationships: {e}")
+                        raise Exception(f"Failed to delete relationships: {e}") from e

-                # 8. **OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks
+                # 8. Rebuild entities and relationships from remaining chunks
                 if entities_to_rebuild or relationships_to_rebuild:
-                    logger.info(
-                        f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..."
-                    )
-                    await _rebuild_knowledge_from_chunks(
-                        entities_to_rebuild=entities_to_rebuild,
-                        relationships_to_rebuild=relationships_to_rebuild,
-                        knowledge_graph_inst=self.chunk_entity_relation_graph,
-                        entities_vdb=self.entities_vdb,
-                        relationships_vdb=self.relationships_vdb,
-                        text_chunks=self.text_chunks,
-                        llm_response_cache=self.llm_response_cache,
-                        global_config=asdict(self),
-                    )
+                    try:
+                        logger.info(f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships")
+                        await _rebuild_knowledge_from_chunks(
+                            entities_to_rebuild=entities_to_rebuild,
+                            relationships_to_rebuild=relationships_to_rebuild,
+                            knowledge_graph_inst=self.chunk_entity_relation_graph,
+                            entities_vdb=self.entities_vdb,
+                            relationships_vdb=self.relationships_vdb,
+                            text_chunks=self.text_chunks,
+                            llm_response_cache=self.llm_response_cache,
+                            global_config=asdict(self),
+                        )
+                        logger.info(f"Successfully rebuilt {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships")
+                    except Exception as e:
+                        logger.error(f"Failed to rebuild knowledge from chunks: {e}")
+                        raise Exception(f"Failed to rebuild knowledge graph: {e}") from e

             # 9. Delete original document and status
-            await self.full_docs.delete([doc_id])
-            await self.doc_status.delete([doc_id])
-
-            # 10. Ensure all indexes are updated
-            await self._insert_done()
-
-            success_message = f"""Successfully deleted document {doc_id}.
-Deleted: {len(entities_to_delete)} entities, {len(relationships_to_delete)} relationships.
-Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} relationships."""
-
+            try:
+                logger.info(f"Deleting original document {doc_id} and its status")
+                await self.full_docs.delete([doc_id])
+                await self.doc_status.delete([doc_id])
+                logger.info(f"Successfully deleted document {doc_id} and its status")
+            except Exception as e:
+                logger.error(f"Failed to delete document and status: {e}")
+                raise Exception(f"Failed to delete document and status: {e}") from e
+
+            success_message = f"Successfully deleted document {doc_id}"
             logger.info(success_message)
             return DeletionResult(
                 status="success",
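
The analysis stage in the hunk above decides, per graph node and per edge, whether it can be deleted outright or must be rebuilt from the chunks that survive the deletion. A minimal, self-contained illustration of that source_id bookkeeping follows; the chunk IDs are made up, and the separator value is a stand-in for the GRAPH_FIELD_SEP constant imported in lightrag.py.

    GRAPH_FIELD_SEP = "<SEP>"  # stand-in value; the real constant is imported in lightrag.py

    deleted_chunk_ids = {"chunk-1", "chunk-2"}            # hypothetical chunks being removed
    node_source_id = "chunk-1<SEP>chunk-2<SEP>chunk-9"    # hypothetical entity provenance

    sources = set(node_source_id.split(GRAPH_FIELD_SEP))
    remaining_sources = sources - deleted_chunk_ids

    if not remaining_sources:
        action = "delete"    # nothing left to support this entity
    elif remaining_sources != sources:
        action = "rebuild"   # re-derive it from the surviving chunks only
    else:
        action = "keep"      # this document never contributed to it

    print(action, remaining_sources)  # -> rebuild {'chunk-9'}
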
@@ -1886,6 +1925,7 @@ Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} re
             )

         except Exception as e:
+            original_exception = e
             error_message = f"Error while deleting document {doc_id}: {e}"
             logger.error(error_message)
             logger.error(traceback.format_exc())
@@ -1895,6 +1935,31 @@ Rebuilt: {len(entities_to_rebuild)} entities, {len(relationships_to_rebuild)} re
                 message=error_message,
                 status_code=500,
             )

+        finally:
+            # ALWAYS ensure persistence if any deletion operations were started
+            if deletion_operations_started:
+                try:
+                    logger.info(f"Ensuring data persistence for document {doc_id} deletion")
+                    await self._insert_done()
+                    logger.info(f"Data persistence completed successfully for document {doc_id} deletion")
+                except Exception as persistence_error:
+                    persistence_error_msg = f"Failed to persist data after deletion attempt for {doc_id}: {persistence_error}"
+                    logger.error(persistence_error_msg)
+                    logger.error(traceback.format_exc())
+
+                    # If there was no original exception, this persistence error becomes the main error
+                    if original_exception is None:
+                        return DeletionResult(
+                            status="fail",
+                            doc_id=doc_id,
+                            message=f"Deletion completed but failed to persist changes: {persistence_error}",
+                            status_code=500,
+                        )
+                    # If there was an original exception, log the persistence error but don't override the original error
+                    # The original error result was already returned in the except block
+            else:
+                logger.debug(f"No deletion operations were started for document {doc_id}, skipping persistence")
+
     async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
         """Asynchronously delete an entity and all its relationships.