Merge branch 'main' into pick-trunk-by-vector

This commit is contained in:
yangdx 2025-08-14 06:19:57 +08:00
commit a3f7bc5b7e

View File

@@ -1116,6 +1116,48 @@ class LightRAG:
return track_id
async def _validate_and_fix_document_consistency(
    self, to_process_docs: dict[str, DocProcessingStatus]
) -> dict[str, DocProcessingStatus]:
    """Drop queued documents whose content is missing from ``full_docs``.

    Each document ID in ``to_process_docs`` is looked up in
    ``self.full_docs``. A document whose lookup returns a falsy value is
    treated as inconsistent: a ``DocStatus.FAILED`` record carrying its
    existing metadata is upserted into ``self.doc_status`` and the document
    is removed from the processing queue.

    Args:
        to_process_docs: Mapping of document ID to its status record.
            Modified in place: inconsistent entries are removed.

    Returns:
        The same ``to_process_docs`` mapping, now holding only documents
        whose content was found.
    """
    # FAILED records keyed by document ID. They are built during the scan
    # so the status records never need a second lookup pass.
    failed_updates = {}
    for doc_id, status_doc in to_process_docs.items():
        content_data = await self.full_docs.get_by_id(doc_id)
        if content_data:
            continue

        logger.warning(
            f"Document {doc_id} has status record but missing content in full_docs"
        )
        failed_updates[doc_id] = {
            "status": DocStatus.FAILED,
            "error_msg": "Document content not found in full_docs storage - data inconsistency detected",
            "content_summary": status_doc.content_summary,
            "content_length": status_doc.content_length,
            "created_at": status_doc.created_at,
            "updated_at": datetime.now(timezone.utc).isoformat(),
            "file_path": status_doc.file_path,
            "track_id": status_doc.track_id,
        }

    if not failed_updates:
        return to_process_docs

    # Persist all failures in one upsert call.
    await self.doc_status.upsert(failed_updates)
    logger.info(
        f"Marked {len(failed_updates)} inconsistent documents as FAILED"
    )

    # Entries are removed only after the scan: a dict must not change size
    # while it is being iterated.
    for doc_id in failed_updates:
        to_process_docs.pop(doc_id, None)
    return to_process_docs
async def apipeline_process_enqueue_documents(
self,
split_by_character: str | None = None,
@@ -1127,9 +1169,10 @@ class LightRAG:
document status.
1. Get all pending, failed, and abnormally terminated processing documents.
2. Split document content into chunks
3. Process each chunk for entity and relation extraction
4. Update the document status
2. Validate document data consistency and fix any issues
3. Split document content into chunks
4. Process each chunk for entity and relation extraction
5. Update the document status
"""
# Get pipeline status shared data and lock
@@ -1155,6 +1198,15 @@ class LightRAG:
logger.info("No documents to process")
return
# Validate document data consistency and fix any issues
to_process_docs = await self._validate_and_fix_document_consistency(
to_process_docs
)
if not to_process_docs:
logger.info("No valid documents to process after consistency check")
return
pipeline_status.update(
{
"busy": True,