From 05bc5cfb64f23e05c13ecea6bd7743b116b62ef6 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 19 Jul 2025 10:14:22 +0800 Subject: [PATCH] Improve task execution with early failure detection - Add early failure detection for async tasks - Cancel pending tasks on first exception --- lightrag/lightrag.py | 2 +- lightrag/operate.py | 42 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 092e06eb..4adb7eda 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1127,7 +1127,7 @@ class LightRAG: } ) - # Concurrency is controlled by graph db lock for individual entities and relationships + # Concurrency is controlled by keyed lock for individual entities and relationships if file_extraction_stage_ok: try: # Get chunk_results from entity_relation_task diff --git a/lightrag/operate.py b/lightrag/operate.py index 5e1cc1b6..909d55a8 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -480,8 +480,25 @@ async def _rebuild_knowledge_from_chunks( pipeline_status["latest_message"] = status_message pipeline_status["history_messages"].append(status_message) - # Execute all tasks in parallel with semaphore control - await asyncio.gather(*tasks) + # Execute all tasks in parallel with semaphore control and early failure detection + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) + + # Check if any task raised an exception + for task in done: + if task.exception(): + # If a task failed, cancel all pending tasks + for pending_task in pending: + pending_task.cancel() + + # Wait for cancellation to complete + if pending: + await asyncio.wait(pending) + + # Re-raise the exception to notify the caller + raise task.exception() + + # If all tasks completed successfully, collect results + # (No need to collect results since these tasks don't return values) # Final status report status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships rebuilt successfully." @@ -1313,8 +1330,25 @@ async def merge_nodes_and_edges( for edge_key, edges in all_edges.items(): tasks.append(asyncio.create_task(_locked_process_edges(edge_key, edges))) - # Execute all tasks in parallel with semaphore control - await asyncio.gather(*tasks) + # Execute all tasks in parallel with semaphore control and early failure detection + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) + + # Check if any task raised an exception + for task in done: + if task.exception(): + # If a task failed, cancel all pending tasks + for pending_task in pending: + pending_task.cancel() + + # Wait for cancellation to complete + if pending: + await asyncio.wait(pending) + + # Re-raise the exception to notify the caller + raise task.exception() + + # If all tasks completed successfully, collect results + # (No need to collect results since these tasks don't return values) async def extract_entities(