Add env FORCE_LLM_SUMMARY_ON_MERGE

This commit is contained in:
yangdx 2025-04-10 17:29:07 +08:00
parent 35431644ad
commit 3007dff153
5 changed files with 74 additions and 50 deletions

View File

@ -43,11 +43,15 @@ WEBUI_DESCRIPTION="Simple and Fast Graph Based RAG System"
SUMMARY_LANGUAGE=English
# CHUNK_SIZE=1200
# CHUNK_OVERLAP_SIZE=100
### Max tokens for entity or relations summary
# MAX_TOKEN_SUMMARY=500
### Number of parallel processing documents in one patch
# MAX_PARALLEL_INSERT=2
### Max tokens for entity/relations description after merge
# MAX_TOKEN_SUMMARY=500
### Number of entities/edges to trigger LLM re-summary on merge ( at least 3 is recommented)
# FORCE_LLM_SUMMARY_ON_MERGE=6
### Num of chunks send to Embedding in single request
# EMBEDDING_BATCH_NUM=32
### Max concurrency requests for Embedding

View File

@ -261,8 +261,12 @@ def display_splash_screen(args: argparse.Namespace) -> None:
ASCIIColors.yellow(f"{args.chunk_overlap_size}")
ASCIIColors.white(" ├─ Cosine Threshold: ", end="")
ASCIIColors.yellow(f"{args.cosine_threshold}")
ASCIIColors.white(" ─ Top-K: ", end="")
ASCIIColors.white(" ─ Top-K: ", end="")
ASCIIColors.yellow(f"{args.top_k}")
ASCIIColors.white(" ├─ Max Token Summary: ", end="")
ASCIIColors.yellow(f"{int(os.getenv('MAX_TOKEN_SUMMARY', 500))}")
ASCIIColors.white(" └─ Force LLM Summary on Merge: ", end="")
ASCIIColors.yellow(f"{int(os.getenv('FORCE_LLM_SUMMARY_ON_MERGE', 6))}")
# System Configuration
ASCIIColors.magenta("\n💾 Storage Configuration:")

View File

@ -103,8 +103,10 @@ class LightRAG:
entity_extract_max_gleaning: int = field(default=1)
"""Maximum number of entity extraction attempts for ambiguous content."""
entity_summary_to_max_tokens: int = field(
default=int(os.getenv("MAX_TOKEN_SUMMARY", 500))
summary_to_max_tokens: int = field(default=int(os.getenv("MAX_TOKEN_SUMMARY", 500)))
force_llm_summary_on_merge: int = field(
default=int(os.getenv("FORCE_LLM_SUMMARY_ON_MERGE", 6))
)
# Text chunking

View File

@ -117,15 +117,13 @@ async def _handle_entity_relation_summary(
use_llm_func: callable = global_config["llm_model_func"]
llm_max_tokens = global_config["llm_model_max_token_size"]
tiktoken_model_name = global_config["tiktoken_model_name"]
summary_max_tokens = global_config["entity_summary_to_max_tokens"]
summary_max_tokens = global_config["summary_to_max_tokens"]
language = global_config["addon_params"].get(
"language", PROMPTS["DEFAULT_LANGUAGE"]
)
tokens = encode_string_by_tiktoken(description, model_name=tiktoken_model_name)
if len(tokens) < summary_max_tokens: # No need for summary
return description
prompt_template = PROMPTS["summarize_entity_descriptions"]
use_description = decode_tokens_by_tiktoken(
tokens[:llm_max_tokens], model_name=tiktoken_model_name
@ -138,14 +136,6 @@ async def _handle_entity_relation_summary(
use_prompt = prompt_template.format(**context_base)
logger.debug(f"Trigger summary: {entity_or_relation_name}")
# Update pipeline status when LLM summary is needed
status_message = " == Use LLM == to re-summary description..."
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
# Use LLM function with cache
summary = await use_llm_func_with_cache(
use_prompt,
@ -270,23 +260,34 @@ async def _merge_nodes_then_upsert(
set([dp["file_path"] for dp in nodes_data] + already_file_paths)
)
if len(nodes_data) > 1 or len(already_entity_types) > 0:
# Update pipeline status when a node that needs merging
status_message = f"Merging entity: {entity_name} | {len(nodes_data)}+{len(already_entity_types)}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
description = await _handle_entity_relation_summary(
entity_name,
description,
global_config,
pipeline_status,
pipeline_status_lock,
llm_response_cache,
)
num_fragment = description.count(GRAPH_FIELD_SEP) + 1
num_new_fragment = len(set([dp["description"] for dp in nodes_data]))
if num_fragment > 1:
if num_fragment >= force_llm_summary_on_merge:
status_message = f"LLM merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
description = await _handle_entity_relation_summary(
entity_name,
description,
global_config,
pipeline_status,
pipeline_status_lock,
llm_response_cache,
)
else:
status_message = f"Merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
node_data = dict(
entity_id=entity_name,
@ -398,23 +399,36 @@ async def _merge_edges_then_upsert(
},
)
if len(edges_data) > 1 or len(already_weights) > 0:
# Update pipeline status when a edge that needs merging
status_message = f"Merging edge::: {src_id} - {tgt_id} | {len(edges_data)}+{len(already_weights)}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
description = await _handle_entity_relation_summary(
f"({src_id}, {tgt_id})",
description,
global_config,
pipeline_status,
pipeline_status_lock,
llm_response_cache,
)
num_fragment = description.count(GRAPH_FIELD_SEP) + 1
num_new_fragment = len(
set([dp["description"] for dp in edges_data if dp.get("description")])
)
if num_fragment > 1:
if num_fragment >= force_llm_summary_on_merge:
status_message = f"LLM merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
description = await _handle_entity_relation_summary(
f"({src_id}, {tgt_id})",
description,
global_config,
pipeline_status,
pipeline_status_lock,
llm_response_cache,
)
else:
status_message = f"Merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
await knowledge_graph_inst.upsert_edge(
src_id,

View File

@ -967,7 +967,7 @@ async def use_llm_func_with_cache(
res: str = await use_llm_func(input_text, **kwargs)
# Save to cache
logger.info(f"Saving LLM cache for {arg_hash}")
logger.info(f" == LLM cache == saving {arg_hash}")
await save_to_cache(
llm_response_cache,
CacheData(