feat: Ensure thread safety for graph write operations

Add a lock to delete, adelete_by_entity, and adelete_by_relation methods to prevent race conditions and ensure data consistency during concurrent modifications to the knowledge graph.
This commit is contained in:
yangdx 2025-06-23 09:57:56 +08:00
parent c947b20bb1
commit 9fae0eadff

View File

@ -35,6 +35,7 @@ from lightrag.kg import (
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
get_graph_db_lock,
)
from .base import (
@ -1728,109 +1729,113 @@ class LightRAG:
relationships_to_delete = set()
relationships_to_rebuild = {} # (src, tgt) -> remaining_chunk_ids
# Process entities
all_labels = await self.chunk_entity_relation_graph.get_all_labels()
for node_label in all_labels:
node_data = await self.chunk_entity_relation_graph.get_node(node_label)
if node_data and "source_id" in node_data:
# Split source_id using GRAPH_FIELD_SEP
sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
remaining_sources = sources - chunk_ids
# Use graph database lock to ensure atomic merges and updates
graph_db_lock = get_graph_db_lock(enable_logging=False)
async with graph_db_lock:
if not remaining_sources:
entities_to_delete.add(node_label)
logger.debug(
f"Entity {node_label} marked for deletion - no remaining sources"
)
elif remaining_sources != sources:
# Entity needs to be rebuilt from remaining chunks
entities_to_rebuild[node_label] = remaining_sources
logger.debug(
f"Entity {node_label} will be rebuilt from {len(remaining_sources)} remaining chunks"
)
# Process entities
all_labels = await self.chunk_entity_relation_graph.get_all_labels()
for node_label in all_labels:
node_data = await self.chunk_entity_relation_graph.get_node(node_label)
if node_data and "source_id" in node_data:
# Split source_id using GRAPH_FIELD_SEP
sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
remaining_sources = sources - chunk_ids
# Process relationships
for node_label in all_labels:
node_edges = await self.chunk_entity_relation_graph.get_node_edges(
node_label
)
if node_edges:
for src, tgt in node_edges:
edge_data = await self.chunk_entity_relation_graph.get_edge(
src, tgt
)
if edge_data and "source_id" in edge_data:
# Split source_id using GRAPH_FIELD_SEP
sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
remaining_sources = sources - chunk_ids
if not remaining_sources:
entities_to_delete.add(node_label)
logger.debug(
f"Entity {node_label} marked for deletion - no remaining sources"
)
elif remaining_sources != sources:
# Entity needs to be rebuilt from remaining chunks
entities_to_rebuild[node_label] = remaining_sources
logger.debug(
f"Entity {node_label} will be rebuilt from {len(remaining_sources)} remaining chunks"
)
if not remaining_sources:
relationships_to_delete.add((src, tgt))
logger.debug(
f"Relationship {src}-{tgt} marked for deletion - no remaining sources"
)
elif remaining_sources != sources:
# Relationship needs to be rebuilt from remaining chunks
relationships_to_rebuild[(src, tgt)] = remaining_sources
logger.debug(
f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks"
)
# 5. Delete chunks from storage
if chunk_ids:
await self.chunks_vdb.delete(chunk_ids)
await self.text_chunks.delete(chunk_ids)
logger.info(f"Deleted {len(chunk_ids)} chunks from storage")
# 6. Delete entities that have no remaining sources
if entities_to_delete:
# Delete from vector database
entity_vdb_ids = [
compute_mdhash_id(entity, prefix="ent-")
for entity in entities_to_delete
]
await self.entities_vdb.delete(entity_vdb_ids)
# Delete from graph
await self.chunk_entity_relation_graph.remove_nodes(
list(entities_to_delete)
)
logger.info(f"Deleted {len(entities_to_delete)} entities")
# 7. Delete relationships that have no remaining sources
if relationships_to_delete:
# Delete from vector database
rel_ids_to_delete = []
for src, tgt in relationships_to_delete:
rel_ids_to_delete.extend(
[
compute_mdhash_id(src + tgt, prefix="rel-"),
compute_mdhash_id(tgt + src, prefix="rel-"),
]
# Process relationships
for node_label in all_labels:
node_edges = await self.chunk_entity_relation_graph.get_node_edges(
node_label
)
await self.relationships_vdb.delete(rel_ids_to_delete)
if node_edges:
for src, tgt in node_edges:
edge_data = await self.chunk_entity_relation_graph.get_edge(
src, tgt
)
if edge_data and "source_id" in edge_data:
# Split source_id using GRAPH_FIELD_SEP
sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
remaining_sources = sources - chunk_ids
# Delete from graph
await self.chunk_entity_relation_graph.remove_edges(
list(relationships_to_delete)
)
logger.info(f"Deleted {len(relationships_to_delete)} relationships")
if not remaining_sources:
relationships_to_delete.add((src, tgt))
logger.debug(
f"Relationship {src}-{tgt} marked for deletion - no remaining sources"
)
elif remaining_sources != sources:
# Relationship needs to be rebuilt from remaining chunks
relationships_to_rebuild[(src, tgt)] = remaining_sources
logger.debug(
f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks"
)
# 8. **OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks
if entities_to_rebuild or relationships_to_rebuild:
logger.info(
f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..."
)
await _rebuild_knowledge_from_chunks(
entities_to_rebuild=entities_to_rebuild,
relationships_to_rebuild=relationships_to_rebuild,
knowledge_graph_inst=self.chunk_entity_relation_graph,
entities_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb,
text_chunks=self.text_chunks,
llm_response_cache=self.llm_response_cache,
global_config=asdict(self),
)
# 5. Delete chunks from storage
if chunk_ids:
await self.chunks_vdb.delete(chunk_ids)
await self.text_chunks.delete(chunk_ids)
logger.info(f"Deleted {len(chunk_ids)} chunks from storage")
# 6. Delete entities that have no remaining sources
if entities_to_delete:
# Delete from vector database
entity_vdb_ids = [
compute_mdhash_id(entity, prefix="ent-")
for entity in entities_to_delete
]
await self.entities_vdb.delete(entity_vdb_ids)
# Delete from graph
await self.chunk_entity_relation_graph.remove_nodes(
list(entities_to_delete)
)
logger.info(f"Deleted {len(entities_to_delete)} entities")
# 7. Delete relationships that have no remaining sources
if relationships_to_delete:
# Delete from vector database
rel_ids_to_delete = []
for src, tgt in relationships_to_delete:
rel_ids_to_delete.extend(
[
compute_mdhash_id(src + tgt, prefix="rel-"),
compute_mdhash_id(tgt + src, prefix="rel-"),
]
)
await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from graph
await self.chunk_entity_relation_graph.remove_edges(
list(relationships_to_delete)
)
logger.info(f"Deleted {len(relationships_to_delete)} relationships")
# 8. **OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks
if entities_to_rebuild or relationships_to_rebuild:
logger.info(
f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..."
)
await _rebuild_knowledge_from_chunks(
entities_to_rebuild=entities_to_rebuild,
relationships_to_rebuild=relationships_to_rebuild,
knowledge_graph_inst=self.chunk_entity_relation_graph,
entities_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb,
text_chunks=self.text_chunks,
llm_response_cache=self.llm_response_cache,
global_config=asdict(self),
)
# 9. Delete original document and status
await self.full_docs.delete([doc_id])
@ -1857,12 +1862,15 @@ class LightRAG:
"""
from .utils_graph import adelete_by_entity
return await adelete_by_entity(
self.chunk_entity_relation_graph,
self.entities_vdb,
self.relationships_vdb,
entity_name,
)
# Use graph database lock to ensure atomic merges and updates
graph_db_lock = get_graph_db_lock(enable_logging=False)
async with graph_db_lock:
return await adelete_by_entity(
self.chunk_entity_relation_graph,
self.entities_vdb,
self.relationships_vdb,
entity_name,
)
def delete_by_entity(self, entity_name: str) -> None:
loop = always_get_an_event_loop()
@ -1877,12 +1885,15 @@ class LightRAG:
"""
from .utils_graph import adelete_by_relation
return await adelete_by_relation(
self.chunk_entity_relation_graph,
self.relationships_vdb,
source_entity,
target_entity,
)
# Use graph database lock to ensure atomic merges and updates
graph_db_lock = get_graph_db_lock(enable_logging=False)
async with graph_db_lock:
return await adelete_by_relation(
self.chunk_entity_relation_graph,
self.relationships_vdb,
source_entity,
target_entity,
)
def delete_by_relation(self, source_entity: str, target_entity: str) -> None:
loop = always_get_an_event_loop()