Refac: Add KG rebuild logging with pipeline status

- Logs detailed progress, including warnings and failures, to the pipeline status.
- Adds counters to report the total number of successfully rebuilt entities and relationships upon completion.
Author: yangdx  2025-06-29 21:27:12 +08:00
Parent: 1800a169b6
Commit: e70f5a35e5
2 changed files with 58 additions and 18 deletions
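
Both files funnel progress through the same guard-then-lock update. Below is a minimal sketch of that pattern, assuming only the dict keys visible in the diff; the helper name _emit_status is hypothetical, since the commit inlines the pattern at each call site:

import asyncio
import logging

logger = logging.getLogger("lightrag")

async def _emit_status(
    message: str,
    pipeline_status: dict | None = None,
    pipeline_status_lock: asyncio.Lock | None = None,
) -> None:
    # Always log; mirror into shared state only when both the dict and its
    # lock were supplied, so callers without status tracking pass neither.
    logger.info(message)
    if pipeline_status is not None and pipeline_status_lock is not None:
        async with pipeline_status_lock:
            pipeline_status["latest_message"] = message
            pipeline_status["history_messages"].append(message)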

lightrag/lightrag.py

@@ -1943,14 +1943,10 @@ class LightRAG:
                 text_chunks=self.text_chunks,
                 llm_response_cache=self.llm_response_cache,
                 global_config=asdict(self),
+                pipeline_status=pipeline_status,
+                pipeline_status_lock=pipeline_status_lock,
             )
-            async with pipeline_status_lock:
-                log_message = f"Successfully rebuilt {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relations"
-                logger.info(log_message)
-                pipeline_status["latest_message"] = log_message
-                pipeline_status["history_messages"].append(log_message)
         except Exception as e:
             logger.error(f"Failed to rebuild knowledge from chunks: {e}")
             raise Exception(
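
The caller above now forwards the shared status objects into _rebuild_knowledge_from_chunks instead of logging completion itself. For context, a consumer polling that shared state could look like the sketch below; reading under the same lock the rebuild uses for writes, and keying completion off the final message prefix, are assumptions of this sketch:

import asyncio

async def watch_rebuild_progress(
    pipeline_status: dict,
    pipeline_status_lock: asyncio.Lock,
    poll_seconds: float = 1.0,
) -> None:
    # Read under the same lock the rebuild holds for writes, so latest_message
    # and history_messages are never observed mid-update.
    last_seen = ""
    while True:
        async with pipeline_status_lock:
            latest = pipeline_status.get("latest_message", "")
        if latest != last_seen:
            print(latest)
            last_seen = latest
        # Keying completion off the message prefix is an assumption; the diff
        # offers no dedicated "done" flag.
        if latest.startswith("KG rebuild completed"):
            return
        await asyncio.sleep(poll_seconds)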

lightrag/operate.py

@@ -250,6 +250,8 @@ async def _rebuild_knowledge_from_chunks(
     text_chunks: BaseKVStorage,
     llm_response_cache: BaseKVStorage,
     global_config: dict[str, str],
+    pipeline_status: dict | None = None,
+    pipeline_status_lock=None,
 ) -> None:
     """Rebuild entity and relationship descriptions from cached extraction results
@@ -262,6 +264,8 @@ async def _rebuild_knowledge_from_chunks(
     """
     if not entities_to_rebuild and not relationships_to_rebuild:
         return
 
+    rebuilt_entities_count = 0
+    rebuilt_relationships_count = 0
     # Get all referenced chunk IDs
     all_referenced_chunk_ids = set()
@@ -270,9 +274,12 @@ async def _rebuild_knowledge_from_chunks(
     for chunk_ids in relationships_to_rebuild.values():
         all_referenced_chunk_ids.update(chunk_ids)
 
-    logger.debug(
-        f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
-    )
+    status_message = f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
+    logger.info(status_message)
+    if pipeline_status is not None and pipeline_status_lock is not None:
+        async with pipeline_status_lock:
+            pipeline_status["latest_message"] = status_message
+            pipeline_status["history_messages"].append(status_message)
 
     # Get cached extraction results for these chunks
     cached_results = await _get_cached_extraction_results(
@@ -280,7 +287,12 @@ async def _rebuild_knowledge_from_chunks(
     )
 
     if not cached_results:
-        logger.warning("No cached extraction results found, cannot rebuild")
+        status_message = "No cached extraction results found, cannot rebuild"
+        logger.warning(status_message)
+        if pipeline_status is not None and pipeline_status_lock is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = status_message
+                pipeline_status["history_messages"].append(status_message)
         return
 
     # Process cached results to get entities and relationships for each chunk
@@ -297,9 +309,14 @@ async def _rebuild_knowledge_from_chunks(
             chunk_entities[chunk_id] = entities
             chunk_relationships[chunk_id] = relationships
         except Exception as e:
-            logger.error(
+            status_message = (
                 f"Failed to parse cached extraction result for chunk {chunk_id}: {e}"
             )
+            logger.info(status_message)  # info, not error: the failure is also reported via pipeline status
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
             continue
 
     # Rebuild entities
@@ -314,11 +331,22 @@ async def _rebuild_knowledge_from_chunks(
                 llm_response_cache=llm_response_cache,
                 global_config=global_config,
             )
-            logger.debug(
-                f"Rebuilt entity {entity_name} from {len(chunk_ids)} cached extractions"
+            rebuilt_entities_count += 1
+            status_message = (
+                f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks"
             )
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
         except Exception as e:
-            logger.error(f"Failed to rebuild entity {entity_name}: {e}")
+            status_message = f"Failed to rebuild entity {entity_name}: {e}"
+            logger.info(status_message)  # info, not error: the failure is also reported via pipeline status
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
 
     # Rebuild relationships
     for (src, tgt), chunk_ids in relationships_to_rebuild.items():
@@ -333,13 +361,29 @@ async def _rebuild_knowledge_from_chunks(
                 llm_response_cache=llm_response_cache,
                 global_config=global_config,
             )
-            logger.debug(
-                f"Rebuilt relationship {src}-{tgt} from {len(chunk_ids)} cached extractions"
+            rebuilt_relationships_count += 1
+            status_message = (
+                f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks"
             )
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
         except Exception as e:
-            logger.error(f"Failed to rebuild relationship {src}-{tgt}: {e}")
+            status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
 
-    logger.debug("Completed rebuilding knowledge from cached extractions")
+    status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships."
+    logger.info(status_message)
+    if pipeline_status is not None and pipeline_status_lock is not None:
+        async with pipeline_status_lock:
+            pipeline_status["latest_message"] = status_message
+            pipeline_status["history_messages"].append(status_message)
 
 
 async def _get_cached_extraction_results(
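
A self-contained sketch of the wiring these changes assume: a plain dict plus an asyncio.Lock, with a stand-in for _rebuild_knowledge_from_chunks that replays the kinds of messages the diff emits (the real function also needs the graph, vector, and KV storages omitted here):

import asyncio

async def main() -> None:
    pipeline_status: dict = {"latest_message": "", "history_messages": []}
    pipeline_status_lock = asyncio.Lock()

    # Stand-in for _rebuild_knowledge_from_chunks: replays the message shapes
    # the real function emits, without the storages and LLM cache it requires.
    async def fake_rebuild() -> None:
        for message in (
            "Rebuilding knowledge from 3 cached chunk extractions",
            "Rebuilt entity: Tesla from 2 chunks",
            "Rebuilt relationship: Tesla->Edison from 1 chunks",
            "KG rebuild completed: 1 entities and 1 relationships.",
        ):
            async with pipeline_status_lock:
                pipeline_status["latest_message"] = message
                pipeline_status["history_messages"].append(message)

    await fake_rebuild()
    # history_messages keeps the full trail; latest_message only the tail.
    print("\n".join(pipeline_status["history_messages"]))

asyncio.run(main())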