# LightRAG Multi-Document Processing: Concurrent Control Strategy Analysis

LightRAG employs a multi-layered concurrent control strategy when processing multiple documents. This article provides an in-depth analysis of the concurrent control mechanisms at the document, chunk, graph, and LLM request levels, helping you understand why specific concurrent behaviors occur.

## Overview

LightRAG's concurrent control is divided into four layers:

1. **Document-level concurrency**: Controls the number of documents processed simultaneously
2. **Chunk-level concurrency**: Controls the number of chunks processed simultaneously within a single document
3. **Graph-level concurrency**: Controls the number of entities and relations merged simultaneously within a single document
4. **LLM request-level concurrency**: Controls the global number of concurrent LLM requests
## 1. Document-Level Concurrent Control

**Control Parameter**: `max_parallel_insert`

Document-level concurrency is controlled by the `max_parallel_insert` attribute, which defaults to 2 and is configurable via the `MAX_PARALLEL_INSERT` environment variable. It limits the number of documents processed simultaneously, preventing excessive parallelism from overwhelming system resources and stretching the processing time of individual files.

```python
# lightrag/lightrag.py
max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 2)))
```
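As a quick usage sketch (assumptions: a real instance also needs LLM and embedding functions, and `./rag_storage` is a placeholder path), both configuration routes look like this:

```python
import os

# Set before importing LightRAG: the dataclass default is evaluated at import time.
os.environ["MAX_PARALLEL_INSERT"] = "2"

from lightrag import LightRAG

rag = LightRAG(
    working_dir="./rag_storage",  # placeholder path
    max_parallel_insert=2,        # explicit argument overrides the env-based default
)
```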
### Implementation Mechanism

In the `apipeline_process_enqueue_documents` method, a semaphore is used to control document concurrency:

```python
# lightrag/lightrag.py - apipeline_process_enqueue_documents method
async def process_document(
    doc_id: str,
    status_doc: DocProcessingStatus,
    split_by_character: str | None,
    split_by_character_only: bool,
    pipeline_status: dict,
    pipeline_status_lock: asyncio.Lock,
    semaphore: asyncio.Semaphore,  # Document-level semaphore
) -> None:
    """Process single document"""
    async with semaphore:  # 🔥 Document-level concurrent control
        ...  # Process all chunks of a single document

# Create document-level semaphore
semaphore = asyncio.Semaphore(self.max_parallel_insert)  # Default 2

# Create a processing task for each document
doc_tasks = []
for doc_id, status_doc in to_process_docs.items():
    doc_tasks.append(
        process_document(
            doc_id, status_doc, split_by_character, split_by_character_only,
            pipeline_status, pipeline_status_lock, semaphore
        )
    )

# Wait for all documents to complete processing
await asyncio.gather(*doc_tasks)
```
## 2. Chunk-Level Concurrent Control

**Control Parameter**: `llm_model_max_async`

This parameter controls the number of chunks processed simultaneously during the extraction stage of a single document. It is governed by the `llm_model_max_async` attribute, which defaults to 4 and is configurable via the `MAX_ASYNC` environment variable; its purpose is to fully leverage the LLM's concurrency capabilities when processing an individual document.

**Key Point**: Each document independently creates its own chunk semaphore!

```python
# lightrag/lightrag.py
llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 4)))
```
### Implementation Mechanism

In the `extract_entities` function, **each document independently creates** its own chunk semaphore:

```python
# lightrag/operate.py - extract_entities function
async def extract_entities(chunks: dict[str, TextChunkSchema], global_config: dict[str, str], ...):
    # 🔥 Key: each document independently creates this semaphore!
    llm_model_max_async = global_config.get("llm_model_max_async", 4)
    semaphore = asyncio.Semaphore(llm_model_max_async)  # Chunk semaphore for this document

    async def _process_with_semaphore(chunk):
        async with semaphore:  # 🔥 Chunk-level concurrent control within the document
            return await _process_single_content(chunk)

    # Create a task for each chunk
    tasks = []
    for c in ordered_chunks:
        task = asyncio.create_task(_process_with_semaphore(c))
        tasks.append(task)

    # Wait for all chunks to complete processing
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    chunk_results = [task.result() for task in tasks]
    return chunk_results
```
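One subtlety in the excerpt above: with `return_when=asyncio.FIRST_EXCEPTION`, `asyncio.wait` returns as soon as any chunk task raises, while slower healthy tasks may still be pending. A tiny standard-library demo, illustrative only and not LightRAG code:

```python
import asyncio

async def ok(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay

async def boom() -> None:
    await asyncio.sleep(0.05)
    raise RuntimeError("chunk failed")

async def main() -> None:
    tasks = [
        asyncio.create_task(ok(0.01)),
        asyncio.create_task(ok(0.30)),
        asyncio.create_task(boom()),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    print(len(done), len(pending))  # -> 2 1: ok(0.30) is still pending
    for t in done:
        if t.exception():
            print("failed:", t.exception())  # retrieve the error
    for t in pending:
        t.cancel()  # the caller is responsible for the stragglers

asyncio.run(main())
```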
### Important Inference: System-Wide Chunk Concurrency

Since each document independently creates its own chunk semaphore, the theoretical chunk concurrency of the system is:

$$
\text{Chunk Concurrency} = \text{max\_parallel\_insert} \times \text{llm\_model\_max\_async}
$$

For example:

- `max_parallel_insert = 2` (process 2 documents simultaneously)
- `llm_model_max_async = 4` (at most 4 concurrent chunks per document)
- **Theoretical result**: at most 2 × 4 = 8 chunks are in the "processing" state simultaneously
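To see how the two semaphore layers compose with the global LLM limit described later, here is a self-contained simulation of 3 documents with 10 chunks each; it is illustrative code with made-up names, not LightRAG's implementation:

```python
import asyncio

MAX_PARALLEL_INSERT = 2  # document-level limit
MAX_ASYNC = 4            # per-document chunk limit, also the global LLM limit

stats = {"processing": 0, "peak_processing": 0, "executing": 0, "peak_executing": 0}

async def process_chunk(llm_sem: asyncio.Semaphore) -> None:
    stats["processing"] += 1
    stats["peak_processing"] = max(stats["peak_processing"], stats["processing"])
    async with llm_sem:  # only MAX_ASYNC LLM calls run at once, system-wide
        stats["executing"] += 1
        stats["peak_executing"] = max(stats["peak_executing"], stats["executing"])
        await asyncio.sleep(0.01)  # stand-in for an LLM round trip
        stats["executing"] -= 1
    stats["processing"] -= 1

async def process_document(doc_sem: asyncio.Semaphore, llm_sem: asyncio.Semaphore) -> None:
    async with doc_sem:  # document-level control
        chunk_sem = asyncio.Semaphore(MAX_ASYNC)  # created independently per document!

        async def run_chunk() -> None:
            async with chunk_sem:  # chunk-level control inside this document
                await process_chunk(llm_sem)

        await asyncio.gather(*(run_chunk() for _ in range(10)))

async def main() -> None:
    doc_sem = asyncio.Semaphore(MAX_PARALLEL_INSERT)
    llm_sem = asyncio.Semaphore(MAX_ASYNC)
    await asyncio.gather(*(process_document(doc_sem, llm_sem) for _ in range(3)))
    print("peak chunks in 'processing':", stats["peak_processing"])  # ≤ 2 × 4 = 8
    print("peak LLM requests executing:", stats["peak_executing"])   # ≤ 4

asyncio.run(main())
```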
## 3. Graph-Level Concurrent Control

**Control Parameter**: `llm_model_max_async * 2`

This parameter controls the number of entities and relations processed simultaneously during the merging stage of a single document, preventing a flood of merge requests from monopolizing LLM resources and impeding the parallel processing of multiple files. It is derived from the `llm_model_max_async` attribute (default 4, configurable via the `MAX_ASYNC` environment variable). The same graph-level setting also governs parallelism during the entity-relationship rebuilding phase after a document is deleted.

Because not every operation in the entity and relation merging phase requires an LLM call, its parallelism is set to double the LLM's parallelism. This improves machine utilization while preventing excessive queuing contention for the LLM.
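A minimal sketch of this design choice, assuming for illustration that only about half of the merge operations need an LLM call (the actual ratio depends on the data):

```python
import asyncio
import random

async def merge_entity(merge_sem: asyncio.Semaphore, llm_sem: asyncio.Semaphore) -> None:
    async with merge_sem:
        # Assumption for illustration: only some merges need an LLM call
        # (e.g. re-summarizing an entity that has many source chunks).
        if random.random() < 0.5:
            async with llm_sem:
                await asyncio.sleep(0.01)  # stand-in for an LLM call
        else:
            await asyncio.sleep(0.001)     # pure graph/storage work, no LLM

async def main() -> None:
    llm_model_max_async = 4
    llm_sem = asyncio.Semaphore(llm_model_max_async)        # global LLM limit
    merge_sem = asyncio.Semaphore(llm_model_max_async * 2)  # merge-stage limit = 2x
    await asyncio.gather(*(merge_entity(merge_sem, llm_sem) for _ in range(32)))

asyncio.run(main())
```

Doubling the merge-stage limit keeps the LLM workers saturated even while half of the merge slots are occupied by non-LLM work.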
## 4. LLM Request-Level Concurrent Control (The Real Bottleneck)

**Control Parameter**: `llm_model_max_async` (globally shared)

This parameter governs the number of concurrent LLM requests dispatched by the entire LightRAG system, encompassing the document extraction stage, the merging stage, and user query handling. It defaults to 4 and is configurable via the `MAX_ASYNC` environment variable.

**Key**: Although there may be 8 chunks "in processing", all LLM requests share the same global priority queue!

LLM request prioritization is managed via this global priority queue, which systematically prioritizes user queries over merging-related requests, and merging-related requests over extraction-related requests. This prioritization minimizes user query latency.

```python
# lightrag/lightrag.py - __post_init__ method
self.llm_model_func = priority_limit_async_func_call(self.llm_model_max_async)(
    partial(
        self.llm_model_func,
        hashing_kv=hashing_kv,
        **self.llm_model_kwargs,
    )
)
# 🔥 Number of global LLM workers = llm_model_max_async = 4
```
### Priority Queue Implementation

> The extraction and merge stages share a global prioritized LLM queue, regulated by `llm_model_max_async`. While numerous entity and relation extraction and merging operations may be "actively processing", **only a limited number concurrently execute LLM requests**; the remainder are queued, awaiting their turn.

```python
# lightrag/utils.py - priority_limit_async_func_call function
def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
    def final_decro(func):
        queue = asyncio.PriorityQueue(maxsize=max_queue_size)
        tasks = set()

        async def worker():
            """Worker that processes tasks in the priority queue"""
            while not shutdown_event.is_set():
                try:
                    priority, count, future, args, kwargs = await asyncio.wait_for(
                        queue.get(), timeout=1.0
                    )
                    result = await func(*args, **kwargs)  # 🔥 Actual LLM call
                    if not future.done():
                        future.set_result(result)
                except Exception as e:
                    ...  # Error handling elided
                finally:
                    queue.task_done()

        # 🔥 Create a fixed number of workers (max_size); this is the real concurrency limit
        for _ in range(max_size):
            task = asyncio.create_task(worker())
            tasks.add(task)
```
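To illustrate the queuing discipline, the following sketch shows how `(priority, count, ...)` tuples make an `asyncio.PriorityQueue` serve lower-numbered priorities first and equal priorities in FIFO order. The concrete priority values are assumptions for the demo, not LightRAG's constants:

```python
import asyncio
import itertools

async def main() -> None:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    counter = itertools.count()  # monotonic tiebreaker: equal priorities stay FIFO

    # Lower value = higher priority. Assumed mapping for the demo:
    # 0 = user query, 1 = merge request, 2 = extraction request.
    await queue.put((2, next(counter), "extract chunk A1"))
    await queue.put((0, next(counter), "user query"))
    await queue.put((1, next(counter), "merge entity 'LightRAG'"))
    await queue.put((2, next(counter), "extract chunk A2"))

    while not queue.empty():
        priority, count, payload = await queue.get()
        print(priority, payload)
    # Output order: user query, merge entity 'LightRAG',
    # extract chunk A1, extract chunk A2

asyncio.run(main())
```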
## 5. Chunk Internal Processing Mechanism (Serial)
### Why Serial?

Internal processing of each chunk strictly follows this serial execution order:

```python
# lightrag/operate.py - _process_single_content function
async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
    # Step 1: Initial entity extraction
    hint_prompt = entity_extract_prompt.format(**{**context_base, "input_text": content})
    final_result = await use_llm_func_with_cache(hint_prompt, use_llm_func, ...)

    # Process initial extraction results
    maybe_nodes, maybe_edges = await _process_extraction_result(final_result, chunk_key, file_path)

    # Step 2: Gleaning phase
    for now_glean_index in range(entity_extract_max_gleaning):
        # 🔥 Serially wait for gleaning results
        glean_result = await use_llm_func_with_cache(
            continue_prompt, use_llm_func,
            llm_response_cache=llm_response_cache,
            history_messages=history, cache_type="extract"
        )

        # Process gleaning results
        glean_nodes, glean_edges = await _process_extraction_result(glean_result, chunk_key, file_path)

        # Merge results...

        # Step 3: Decide whether to continue the loop
        if now_glean_index == entity_extract_max_gleaning - 1:
            break

        # 🔥 Serially wait for the loop-decision result
        if_loop_result = await use_llm_func_with_cache(
            if_loop_prompt, use_llm_func,
            llm_response_cache=llm_response_cache,
            history_messages=history, cache_type="extract"
        )

        if if_loop_result.strip().strip('"').strip("'").lower() != "yes":
            break

    return maybe_nodes, maybe_edges
```
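Because these steps are strictly sequential, gleaning multiplies the serial LLM round trips per chunk. A quick worked estimate that follows the control flow above (an upper bound, since the loop-decision call can end the loop early):

```python
# Worst-case serial LLM calls per chunk, mirroring the control flow above.
def llm_calls_per_chunk(max_gleaning: int) -> int:
    calls = 1  # initial extraction
    for i in range(max_gleaning):
        calls += 1  # gleaning call
        if i == max_gleaning - 1:
            break
        calls += 1  # loop-decision call
    return calls

for g in (0, 1, 2):
    print(g, llm_calls_per_chunk(g))  # 0 -> 1, 1 -> 2, 2 -> 4
```

This is why the performance section below suggests `entity_extract_max_gleaning=0` when per-document latency matters.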
## 6. Complete Concurrent Hierarchy Diagram

```mermaid
graph TD
    classDef doc fill:#e6f3ff,stroke:#5b9bd5,stroke-width:2px;
    classDef chunk fill:#fbe5d6,stroke:#ed7d31,stroke-width:1px;
    classDef merge fill:#e2f0d9,stroke:#70ad47,stroke-width:2px;

    A["Multiple Documents<br>max_parallel_insert = 2"] --> A1
    A --> B1

    A1[DocA: split into n chunks] --> A_chunk;
    B1[DocB: split into m chunks] --> B_chunk;

    subgraph A_chunk[Extraction Stage]
        A_chunk_title[Entity Relation Extraction<br>llm_model_max_async = 4];
        A_chunk_title --> A_chunk1[Chunk A1]:::chunk;
        A_chunk_title --> A_chunk2[Chunk A2]:::chunk;
        A_chunk_title --> A_chunk3[Chunk A3]:::chunk;
        A_chunk_title --> A_chunk4[Chunk A4]:::chunk;
        A_chunk1 & A_chunk2 & A_chunk3 & A_chunk4 --> A_chunk_done([Extraction Complete]);
    end

    subgraph B_chunk[Extraction Stage]
        B_chunk_title[Entity Relation Extraction<br>llm_model_max_async = 4];
        B_chunk_title --> B_chunk1[Chunk B1]:::chunk;
        B_chunk_title --> B_chunk2[Chunk B2]:::chunk;
        B_chunk_title --> B_chunk3[Chunk B3]:::chunk;
        B_chunk_title --> B_chunk4[Chunk B4]:::chunk;
        B_chunk1 & B_chunk2 & B_chunk3 & B_chunk4 --> B_chunk_done([Extraction Complete]);
    end

    A_chunk -.->|LLM Request| LLM_Queue;

    A_chunk --> A_merge;
    B_chunk --> B_merge;

    subgraph A_merge[Merge Stage]
        A_merge_title[Entity Relation Merging<br>llm_model_max_async * 2 = 8];
        A_merge_title --> A1_entity[Ent a1]:::merge;
        A_merge_title --> A2_entity[Ent a2]:::merge;
        A_merge_title --> A3_entity[Rel a3]:::merge;
        A_merge_title --> A4_entity[Rel a4]:::merge;
        A1_entity & A2_entity & A3_entity & A4_entity --> A_done([Merge Complete])
    end

    subgraph B_merge[Merge Stage]
        B_merge_title[Entity Relation Merging<br>llm_model_max_async * 2 = 8];
        B_merge_title --> B1_entity[Ent b1]:::merge;
        B_merge_title --> B2_entity[Ent b2]:::merge;
        B_merge_title --> B3_entity[Rel b3]:::merge;
        B_merge_title --> B4_entity[Rel b4]:::merge;
        B1_entity & B2_entity & B3_entity & B4_entity --> B_done([Merge Complete])
    end

    A_merge -.->|LLM Request| LLM_Queue["LLM Request Priority Queue<br>llm_model_max_async = 4"];
    B_merge -.->|LLM Request| LLM_Queue;
    B_chunk -.->|LLM Request| LLM_Queue;
```

### Chunk Internal Processing (Serial)

```
Initial Extraction → Gleaning → Loop Decision → Complete
```
## 7. Real-World Scenario Analysis

### Scenario 1: Single Document with Multiple Chunks

Assume 1 document with 6 chunks:

- **Document level**: Only 1 document, so `max_parallel_insert` is not a limiting factor
- **Chunk level**: At most 4 chunks processed simultaneously (limited by `llm_model_max_async = 4`)
- **LLM level**: At most 4 concurrent LLM requests globally

**Expected behavior**: 4 chunks process concurrently; the remaining 2 chunks wait.

### Scenario 2: Multiple Documents with Multiple Chunks

Assume 3 documents, each with 10 chunks:

- **Document level**: At most 2 documents processed simultaneously
- **Chunk level**: At most 4 chunks per document processed simultaneously
- **Theoretical chunk concurrency**: 2 × 4 = 8 chunks processed simultaneously
- **Actual LLM concurrency**: Only 4 LLM requests actually execute

**Actual state distribution**:

```
# A possible system state:
Document 1: 4 chunks "processing" (2 executing LLM requests, 2 waiting for LLM responses)
Document 2: 4 chunks "processing" (2 executing LLM requests, 2 waiting for LLM responses)
Document 3: Waiting for the document-level semaphore

Total:
- 8 chunks in the "processing" state
- 4 LLM requests actually executing
- 4 chunks waiting for LLM responses
```
## 8. Performance Optimization Recommendations

### Understanding the Bottleneck

**The real bottleneck is the global LLM queue, not the chunk semaphores!**

### Adjustment Strategies

**Strategy 1: Increase the LLM concurrency setting based on the capabilities of your LLM server or API provider**

During the file processing phase, the performance and concurrency capabilities of the LLM are the critical bottleneck. When deploying an LLM locally, the service's concurrency capacity must account for LightRAG's context length requirements: LightRAG recommends that the LLM support a context length of at least 32KB, so server concurrency should be calculated against this benchmark. For API providers, LightRAG retries a request up to three times if it is rejected due to concurrent request limits; LLM retries in the backend logs therefore indicate that `MAX_ASYNC` exceeds the API provider's limits.

```bash
# Environment variable configuration
export MAX_PARALLEL_INSERT=2  # Keep document concurrency
export MAX_ASYNC=8            # 🔥 Increase LLM request concurrency
```

**Strategy 2: Align parallel document insertion with the LLM concurrency configuration**

The recommended number of parallel document processing tasks is 1/4 of the LLM concurrency, with a minimum of 2 and a maximum of 10. Setting it higher typically does not accelerate overall document processing, because even a small number of concurrently processed documents can fully utilize the LLM's parallel capacity, while excessive document parallelism significantly increases the processing time of each individual document. Since LightRAG commits processing results file by file, a large number of in-flight files also requires caching a substantial amount of data, and after a system error all half-processed documents must be reprocessed, increasing error-handling costs. For instance, setting `MAX_PARALLEL_INSERT` to 3 is appropriate when `MAX_ASYNC` is configured to 12.

```python
rag = LightRAG(
    max_parallel_insert=3,          # Moderately increase document concurrency
    llm_model_max_async=12,         # Significantly increase LLM concurrency
    entity_extract_max_gleaning=0,  # Reduce serial steps within chunks
)
```
## 9. Summary

Key characteristics of LightRAG's multi-document concurrent processing mechanism:

### Concurrency Layers

1. **Inter-document competition**: Controlled by `max_parallel_insert`; by default 2 documents are processed concurrently
2. **Theoretical chunk concurrency**: Each document creates its own semaphore, so the total = `max_parallel_insert × llm_model_max_async`
3. **Actual LLM concurrency**: All chunks share the global LLM queue, controlled by `llm_model_max_async`
4. **Intra-chunk serial execution**: The multiple LLM requests within each chunk execute strictly serially

### Key Insights

- **Theoretical vs. actual**: The system may have many chunks "in processing", but only a few are actually executing LLM requests
- **Real bottleneck**: The global LLM request queue is the performance bottleneck, not the chunk semaphores
- **Optimization focus**: Increasing `llm_model_max_async` is more effective than increasing `max_parallel_insert`