LightRAG/lightrag/operate.py

3218 lines
116 KiB
Python
Raw Normal View History

2025-02-15 22:37:12 +01:00
from __future__ import annotations
from functools import partial
2025-02-15 22:37:12 +01:00
2024-10-10 15:02:30 +08:00
import asyncio
import json
import re
import os
2025-02-15 22:37:12 +01:00
from typing import Any, AsyncIterator
2024-10-10 15:02:30 +08:00
from collections import Counter, defaultdict
2024-10-10 15:02:30 +08:00
from .utils import (
logger,
clean_str,
compute_mdhash_id,
Tokenizer,
2024-10-10 15:02:30 +08:00
is_float_regex,
normalize_extracted_info,
2024-10-10 15:02:30 +08:00
pack_user_ass_to_openai_messages,
split_string_by_multi_markers,
truncate_list_by_token_size,
compute_args_hash,
handle_cache,
save_to_cache,
CacheData,
2025-01-24 18:59:24 +08:00
get_conversation_turns,
use_llm_func_with_cache,
update_chunk_cache_list,
remove_think_tags,
linear_gradient_weighted_polling,
process_chunks_unified,
build_file_path,
2024-10-10 15:02:30 +08:00
)
from .base import (
BaseGraphStorage,
BaseKVStorage,
BaseVectorStorage,
TextChunkSchema,
QueryParam,
)
from .prompt import PROMPTS
from .constants import (
GRAPH_FIELD_SEP,
DEFAULT_MAX_ENTITY_TOKENS,
DEFAULT_MAX_RELATION_TOKENS,
DEFAULT_MAX_TOTAL_TOKENS,
2025-07-15 21:35:12 +08:00
DEFAULT_RELATED_CHUNK_NUMBER,
)
from .kg.shared_storage import get_storage_keyed_lock
import time
from dotenv import load_dotenv
# use the .env that is inside the current folder
# allows to use different .env file for each lightrag instance
# the OS environment variables take precedence over the .env file
load_dotenv(dotenv_path=".env", override=False)
2024-10-10 15:02:30 +08:00
2024-10-10 15:02:30 +08:00
def chunking_by_token_size(
tokenizer: Tokenizer,
2025-01-07 16:26:12 +08:00
content: str,
2025-02-15 22:37:12 +01:00
split_by_character: str | None = None,
2025-02-09 13:18:47 +01:00
split_by_character_only: bool = False,
overlap_token_size: int = 128,
max_token_size: int = 1024,
2025-02-09 10:39:48 +01:00
) -> list[dict[str, Any]]:
tokens = tokenizer.encode(content)
2025-02-09 10:39:48 +01:00
results: list[dict[str, Any]] = []
if split_by_character:
raw_chunks = content.split(split_by_character)
new_chunks = []
if split_by_character_only:
for chunk in raw_chunks:
_tokens = tokenizer.encode(chunk)
new_chunks.append((len(_tokens), chunk))
else:
for chunk in raw_chunks:
_tokens = tokenizer.encode(chunk)
if len(_tokens) > max_token_size:
for start in range(
0, len(_tokens), max_token_size - overlap_token_size
):
chunk_content = tokenizer.decode(
_tokens[start : start + max_token_size]
)
new_chunks.append(
(min(max_token_size, len(_tokens) - start), chunk_content)
)
else:
new_chunks.append((len(_tokens), chunk))
for index, (_len, chunk) in enumerate(new_chunks):
results.append(
{
"tokens": _len,
"content": chunk.strip(),
"chunk_order_index": index,
}
)
else:
for index, start in enumerate(
2025-01-07 16:26:12 +08:00
range(0, len(tokens), max_token_size - overlap_token_size)
):
2025-04-18 16:14:31 +02:00
chunk_content = tokenizer.decode(tokens[start : start + max_token_size])
results.append(
{
"tokens": min(max_token_size, len(tokens) - start),
"content": chunk_content.strip(),
"chunk_order_index": index,
}
)
2024-10-10 15:02:30 +08:00
return results
2024-10-10 15:02:30 +08:00
async def _handle_entity_relation_summary(
2025-01-07 16:26:12 +08:00
entity_or_relation_name: str,
description: str,
global_config: dict,
llm_response_cache: BaseKVStorage | None = None,
2024-10-10 15:02:30 +08:00
) -> str:
"""Handle entity relation summary
For each entity or relation, input is the combined description of already existing description and new description.
If too long, use LLM to summarize.
"""
2024-10-10 15:02:30 +08:00
use_llm_func: callable = global_config["llm_model_func"]
# Apply higher priority (8) to entity/relation summary tasks
use_llm_func = partial(use_llm_func, _priority=8)
tokenizer: Tokenizer = global_config["tokenizer"]
2024-10-10 15:02:30 +08:00
llm_max_tokens = global_config["llm_model_max_token_size"]
2025-04-10 17:29:07 +08:00
2024-11-28 14:28:29 +01:00
language = global_config["addon_params"].get(
"language", PROMPTS["DEFAULT_LANGUAGE"]
)
2024-10-10 15:02:30 +08:00
tokens = tokenizer.encode(description)
### summarize is not determined here anymore (It's determined by num_fragment now)
# if len(tokens) < summary_max_tokens: # No need for summary
# return description
2024-10-10 15:02:30 +08:00
prompt_template = PROMPTS["summarize_entity_descriptions"]
2025-04-18 16:14:31 +02:00
use_description = tokenizer.decode(tokens[:llm_max_tokens])
2024-10-10 15:02:30 +08:00
context_base = dict(
entity_name=entity_or_relation_name,
description_list=use_description.split(GRAPH_FIELD_SEP),
2024-11-28 14:28:29 +01:00
language=language,
2024-10-10 15:02:30 +08:00
)
use_prompt = prompt_template.format(**context_base)
logger.debug(f"Trigger summary: {entity_or_relation_name}")
# Use LLM function with cache (higher priority for summary generation)
summary = await use_llm_func_with_cache(
2025-04-10 03:58:04 +08:00
use_prompt,
use_llm_func,
llm_response_cache=llm_response_cache,
# max_tokens=summary_max_tokens,
cache_type="extract",
)
2024-10-10 15:02:30 +08:00
return summary
async def _handle_single_entity_extraction(
2025-01-07 16:26:12 +08:00
record_attributes: list[str],
chunk_key: str,
2025-03-17 23:32:35 +08:00
file_path: str = "unknown_source",
2024-10-10 15:02:30 +08:00
):
2025-04-22 15:22:37 +08:00
if len(record_attributes) < 4 or '"entity"' not in record_attributes[0]:
2024-10-10 15:02:30 +08:00
return None
# Clean and validate entity name
2025-04-17 22:58:36 +08:00
entity_name = clean_str(record_attributes[1]).strip()
if not entity_name:
logger.warning(
f"Entity extraction error: empty entity name in: {record_attributes}"
)
2024-10-10 15:02:30 +08:00
return None
# Normalize entity name
2025-04-12 20:45:41 +08:00
entity_name = normalize_extracted_info(entity_name, is_entity=True)
2025-07-05 12:07:37 +08:00
# Check if entity name became empty after normalization
if not entity_name or not entity_name.strip():
logger.warning(
f"Entity extraction error: entity name became empty after normalization. Original: '{record_attributes[1]}'"
)
return None
# Clean and validate entity type
2025-03-01 17:45:06 +08:00
entity_type = clean_str(record_attributes[2]).strip('"')
if not entity_type.strip() or entity_type.startswith('("'):
logger.warning(
f"Entity extraction error: invalid entity type in: {record_attributes}"
)
return None
# Clean and validate description
2025-04-12 20:45:41 +08:00
entity_description = clean_str(record_attributes[3])
entity_description = normalize_extracted_info(entity_description)
if not entity_description.strip():
logger.warning(
f"Entity extraction error: empty description for entity '{entity_name}' of type '{entity_type}'"
)
return None
2024-10-10 15:02:30 +08:00
return dict(
entity_name=entity_name,
entity_type=entity_type,
description=entity_description,
source_id=chunk_key,
2025-03-20 16:29:24 +08:00
file_path=file_path,
2024-10-10 15:02:30 +08:00
)
async def _handle_single_relationship_extraction(
2025-01-07 16:26:12 +08:00
record_attributes: list[str],
chunk_key: str,
2025-03-17 23:32:35 +08:00
file_path: str = "unknown_source",
2024-10-10 15:02:30 +08:00
):
2025-04-22 15:22:37 +08:00
if len(record_attributes) < 5 or '"relationship"' not in record_attributes[0]:
2024-10-10 15:02:30 +08:00
return None
# add this record as edge
2025-04-12 20:45:41 +08:00
source = clean_str(record_attributes[1])
target = clean_str(record_attributes[2])
# Normalize source and target entity names
2025-04-12 20:45:41 +08:00
source = normalize_extracted_info(source, is_entity=True)
target = normalize_extracted_info(target, is_entity=True)
2025-07-05 12:07:37 +08:00
# Check if source or target became empty after normalization
if not source or not source.strip():
logger.warning(
f"Relationship extraction error: source entity became empty after normalization. Original: '{record_attributes[1]}'"
)
return None
2025-07-05 12:07:37 +08:00
if not target or not target.strip():
logger.warning(
f"Relationship extraction error: target entity became empty after normalization. Original: '{record_attributes[2]}'"
)
return None
2025-07-05 12:07:37 +08:00
if source == target:
logger.debug(
f"Relationship source and target are the same in: {record_attributes}"
)
return None
2025-04-12 20:45:41 +08:00
edge_description = clean_str(record_attributes[3])
edge_description = normalize_extracted_info(edge_description)
2025-05-08 16:05:52 +08:00
edge_keywords = normalize_extracted_info(
clean_str(record_attributes[4]), is_entity=True
)
edge_keywords = edge_keywords.replace("", ",")
2024-10-10 15:02:30 +08:00
edge_source_id = chunk_key
weight = (
2025-04-12 20:45:41 +08:00
float(record_attributes[-1].strip('"').strip("'"))
if is_float_regex(record_attributes[-1].strip('"').strip("'"))
2025-03-01 17:45:06 +08:00
else 1.0
2024-10-10 15:02:30 +08:00
)
return dict(
src_id=source,
tgt_id=target,
weight=weight,
description=edge_description,
keywords=edge_keywords,
source_id=edge_source_id,
2025-03-20 16:29:24 +08:00
file_path=file_path,
2024-10-10 15:02:30 +08:00
)
2025-06-09 18:52:34 +08:00
async def _rebuild_knowledge_from_chunks(
entities_to_rebuild: dict[str, set[str]],
relationships_to_rebuild: dict[tuple[str, str], set[str]],
knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage,
relationships_vdb: BaseVectorStorage,
text_chunks_storage: BaseKVStorage,
2025-06-09 18:52:34 +08:00
llm_response_cache: BaseKVStorage,
global_config: dict[str, str],
pipeline_status: dict | None = None,
pipeline_status_lock=None,
2025-06-09 18:52:34 +08:00
) -> None:
"""Rebuild entity and relationship descriptions from cached extraction results with parallel processing
2025-06-09 18:52:34 +08:00
This method uses cached LLM extraction results instead of calling LLM again,
following the same approach as the insert process. Now with parallel processing
controlled by llm_model_max_async and using get_storage_keyed_lock for data consistency.
2025-06-09 18:52:34 +08:00
Args:
entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids
relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids
knowledge_graph_inst: Knowledge graph storage
entities_vdb: Entity vector database
relationships_vdb: Relationship vector database
text_chunks_storage: Text chunks storage
llm_response_cache: LLM response cache
global_config: Global configuration containing llm_model_max_async
pipeline_status: Pipeline status dictionary
pipeline_status_lock: Lock for pipeline status
2025-06-09 18:52:34 +08:00
"""
if not entities_to_rebuild and not relationships_to_rebuild:
return
# Get all referenced chunk IDs
all_referenced_chunk_ids = set()
for chunk_ids in entities_to_rebuild.values():
all_referenced_chunk_ids.update(chunk_ids)
for chunk_ids in relationships_to_rebuild.values():
all_referenced_chunk_ids.update(chunk_ids)
status_message = f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions (parallel processing)"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
2025-06-09 18:52:34 +08:00
# Get cached extraction results for these chunks using storage
# cached_results chunk_id -> [list of extraction result from LLM cache sorted by created_at]
2025-06-09 18:52:34 +08:00
cached_results = await _get_cached_extraction_results(
llm_response_cache,
all_referenced_chunk_ids,
text_chunks_storage=text_chunks_storage,
2025-06-09 18:52:34 +08:00
)
if not cached_results:
status_message = "No cached extraction results found, cannot rebuild"
logger.warning(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
2025-06-09 18:52:34 +08:00
return
# Process cached results to get entities and relationships for each chunk
chunk_entities = {} # chunk_id -> {entity_name: [entity_data]}
chunk_relationships = {} # chunk_id -> {(src, tgt): [relationship_data]}
for chunk_id, extraction_results in cached_results.items():
2025-06-09 18:52:34 +08:00
try:
# Handle multiple extraction results per chunk
chunk_entities[chunk_id] = defaultdict(list)
chunk_relationships[chunk_id] = defaultdict(list)
# process multiple LLM extraction results for a single chunk_id
for extraction_result in extraction_results:
entities, relationships = await _parse_extraction_result(
text_chunks_storage=text_chunks_storage,
extraction_result=extraction_result,
chunk_id=chunk_id,
)
# Merge entities and relationships from this extraction result
# Only keep the first occurrence of each entity_name in the same chunk_id
for entity_name, entity_list in entities.items():
if (
entity_name not in chunk_entities[chunk_id]
or len(chunk_entities[chunk_id][entity_name]) == 0
):
chunk_entities[chunk_id][entity_name].extend(entity_list)
# Only keep the first occurrence of each rel_key in the same chunk_id
for rel_key, rel_list in relationships.items():
if (
rel_key not in chunk_relationships[chunk_id]
or len(chunk_relationships[chunk_id][rel_key]) == 0
):
chunk_relationships[chunk_id][rel_key].extend(rel_list)
2025-06-09 18:52:34 +08:00
except Exception as e:
status_message = (
2025-06-09 18:52:34 +08:00
f"Failed to parse cached extraction result for chunk {chunk_id}: {e}"
)
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
2025-06-09 18:52:34 +08:00
continue
# Get max async tasks limit from global_config for semaphore control
graph_max_async = global_config.get("llm_model_max_async", 4) * 2
semaphore = asyncio.Semaphore(graph_max_async)
# Counters for tracking progress
rebuilt_entities_count = 0
rebuilt_relationships_count = 0
failed_entities_count = 0
failed_relationships_count = 0
async def _locked_rebuild_entity(entity_name, chunk_ids):
nonlocal rebuilt_entities_count, failed_entities_count
async with semaphore:
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
[entity_name], namespace=namespace, enable_logging=False
):
try:
await _rebuild_single_entity(
knowledge_graph_inst=knowledge_graph_inst,
entities_vdb=entities_vdb,
entity_name=entity_name,
chunk_ids=chunk_ids,
chunk_entities=chunk_entities,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
rebuilt_entities_count += 1
status_message = (
f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks"
)
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
failed_entities_count += 1
status_message = f"Failed to rebuild entity {entity_name}: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
async def _locked_rebuild_relationship(src, tgt, chunk_ids):
nonlocal rebuilt_relationships_count, failed_relationships_count
async with semaphore:
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
# Sort src and tgt to ensure order-independent lock key generation
sorted_key_parts = sorted([src, tgt])
async with get_storage_keyed_lock(
sorted_key_parts,
namespace=namespace,
enable_logging=False,
):
try:
await _rebuild_single_relationship(
knowledge_graph_inst=knowledge_graph_inst,
relationships_vdb=relationships_vdb,
src=src,
tgt=tgt,
chunk_ids=chunk_ids,
chunk_relationships=chunk_relationships,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
rebuilt_relationships_count += 1
status_message = f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
failed_relationships_count += 1
status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
# Create tasks for parallel processing
tasks = []
# Add entity rebuilding tasks
2025-06-09 18:52:34 +08:00
for entity_name, chunk_ids in entities_to_rebuild.items():
task = asyncio.create_task(_locked_rebuild_entity(entity_name, chunk_ids))
tasks.append(task)
2025-06-09 18:52:34 +08:00
# Add relationship rebuilding tasks
2025-06-09 18:52:34 +08:00
for (src, tgt), chunk_ids in relationships_to_rebuild.items():
task = asyncio.create_task(_locked_rebuild_relationship(src, tgt, chunk_ids))
tasks.append(task)
# Log parallel processing start
status_message = f"Starting parallel rebuild of {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships (async: {graph_max_async})"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
# Execute all tasks in parallel with semaphore control and early failure detection
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
# Check if any task raised an exception
for task in done:
if task.exception():
# If a task failed, cancel all pending tasks
for pending_task in pending:
pending_task.cancel()
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# Re-raise the exception to notify the caller
raise task.exception()
# If all tasks completed successfully, collect results
# (No need to collect results since these tasks don't return values)
# Final status report
status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships rebuilt successfully."
if failed_entities_count > 0 or failed_relationships_count > 0:
status_message += f" Failed: {failed_entities_count} entities, {failed_relationships_count} relationships."
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
2025-06-09 18:52:34 +08:00
async def _get_cached_extraction_results(
llm_response_cache: BaseKVStorage,
chunk_ids: set[str],
text_chunks_storage: BaseKVStorage,
) -> dict[str, list[str]]:
2025-06-09 18:52:34 +08:00
"""Get cached extraction results for specific chunk IDs
Args:
llm_response_cache: LLM response cache storage
2025-06-09 18:52:34 +08:00
chunk_ids: Set of chunk IDs to get cached results for
text_chunks_data: Pre-loaded chunk data (optional, for performance)
text_chunks_storage: Text chunks storage (fallback if text_chunks_data is None)
2025-06-09 18:52:34 +08:00
Returns:
Dict mapping chunk_id -> list of extraction_result_text
2025-06-09 18:52:34 +08:00
"""
cached_results = {}
# Collect all LLM cache IDs from chunks
all_cache_ids = set()
# Read from storage
chunk_data_list = await text_chunks_storage.get_by_ids(list(chunk_ids))
for chunk_id, chunk_data in zip(chunk_ids, chunk_data_list):
if chunk_data and isinstance(chunk_data, dict):
llm_cache_list = chunk_data.get("llm_cache_list", [])
if llm_cache_list:
all_cache_ids.update(llm_cache_list)
else:
logger.warning(
f"Chunk {chunk_id} data is invalid or None: {type(chunk_data)}"
)
if not all_cache_ids:
logger.warning(f"No LLM cache IDs found for {len(chunk_ids)} chunk IDs")
return cached_results
# Batch get LLM cache entries
cache_data_list = await llm_response_cache.get_by_ids(list(all_cache_ids))
2025-06-09 18:52:34 +08:00
# Process cache entries and group by chunk_id
valid_entries = 0
for cache_id, cache_entry in zip(all_cache_ids, cache_data_list):
2025-06-09 18:52:34 +08:00
if (
cache_entry is not None
and isinstance(cache_entry, dict)
2025-06-09 18:52:34 +08:00
and cache_entry.get("cache_type") == "extract"
and cache_entry.get("chunk_id") in chunk_ids
):
chunk_id = cache_entry["chunk_id"]
extraction_result = cache_entry["return"]
create_time = cache_entry.get(
"create_time", 0
) # Get creation time, default to 0
valid_entries += 1
# Support multiple LLM caches per chunk
if chunk_id not in cached_results:
cached_results[chunk_id] = []
# Store tuple with extraction result and creation time for sorting
cached_results[chunk_id].append((extraction_result, create_time))
# Sort extraction results by create_time for each chunk
for chunk_id in cached_results:
# Sort by create_time (x[1]), then extract only extraction_result (x[0])
cached_results[chunk_id].sort(key=lambda x: x[1])
cached_results[chunk_id] = [item[0] for item in cached_results[chunk_id]]
2025-06-09 18:52:34 +08:00
logger.info(
f"Found {valid_entries} valid cache entries, {len(cached_results)} chunks with results"
2025-06-09 18:52:34 +08:00
)
return cached_results
async def _parse_extraction_result(
text_chunks_storage: BaseKVStorage, extraction_result: str, chunk_id: str
2025-06-09 18:52:34 +08:00
) -> tuple[dict, dict]:
"""Parse cached extraction result using the same logic as extract_entities
Args:
text_chunks_storage: Text chunks storage to get chunk data
2025-06-09 18:52:34 +08:00
extraction_result: The cached LLM extraction result
chunk_id: The chunk ID for source tracking
Returns:
Tuple of (entities_dict, relationships_dict)
"""
# Get chunk data for file_path from storage
chunk_data = await text_chunks_storage.get_by_id(chunk_id)
2025-06-09 18:52:34 +08:00
file_path = (
chunk_data.get("file_path", "unknown_source")
if chunk_data
else "unknown_source"
)
context_base = dict(
tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
)
maybe_nodes = defaultdict(list)
maybe_edges = defaultdict(list)
# Parse the extraction result using the same logic as in extract_entities
records = split_string_by_multi_markers(
extraction_result,
[context_base["record_delimiter"], context_base["completion_delimiter"]],
)
for record in records:
record = re.search(r"\((.*)\)", record)
if record is None:
continue
record = record.group(1)
record_attributes = split_string_by_multi_markers(
record, [context_base["tuple_delimiter"]]
)
# Try to parse as entity
entity_data = await _handle_single_entity_extraction(
record_attributes, chunk_id, file_path
)
if entity_data is not None:
maybe_nodes[entity_data["entity_name"]].append(entity_data)
continue
# Try to parse as relationship
relationship_data = await _handle_single_relationship_extraction(
record_attributes, chunk_id, file_path
)
if relationship_data is not None:
maybe_edges[
(relationship_data["src_id"], relationship_data["tgt_id"])
].append(relationship_data)
return dict(maybe_nodes), dict(maybe_edges)
async def _rebuild_single_entity(
knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage,
entity_name: str,
chunk_ids: set[str],
chunk_entities: dict,
llm_response_cache: BaseKVStorage,
global_config: dict[str, str],
) -> None:
"""Rebuild a single entity from cached extraction results"""
# Get current entity data
current_entity = await knowledge_graph_inst.get_node(entity_name)
if not current_entity:
return
2025-06-09 19:40:29 +08:00
# Helper function to update entity in both graph and vector storage
async def _update_entity_storage(
2025-06-09 19:47:29 +08:00
final_description: str, entity_type: str, file_paths: set[str]
2025-06-09 19:40:29 +08:00
):
# Update entity in graph storage
updated_entity_data = {
**current_entity,
"description": final_description,
"entity_type": entity_type,
"source_id": GRAPH_FIELD_SEP.join(chunk_ids),
"file_path": build_file_path(
current_entity.get("file_path", "").split(GRAPH_FIELD_SEP)
if current_entity.get("file_path")
else [],
[{"file_path": fp} for fp in file_paths],
entity_name,
)
2025-06-09 19:47:29 +08:00
if file_paths
else current_entity.get("file_path", "unknown_source"),
2025-06-09 19:40:29 +08:00
}
await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)
# Update entity in vector database
entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-")
# Delete old vector record first
try:
await entities_vdb.delete([entity_vdb_id])
except Exception as e:
2025-06-09 19:47:29 +08:00
logger.debug(
f"Could not delete old entity vector record {entity_vdb_id}: {e}"
)
2025-06-09 19:40:29 +08:00
# Insert new vector record
entity_content = f"{entity_name}\n{final_description}"
await entities_vdb.upsert(
{
entity_vdb_id: {
"content": entity_content,
"entity_name": entity_name,
"source_id": updated_entity_data["source_id"],
"description": final_description,
"entity_type": entity_type,
"file_path": updated_entity_data["file_path"],
}
}
)
# Helper function to generate final description with optional LLM summary
async def _generate_final_description(combined_description: str) -> str:
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
if num_fragment >= force_llm_summary_on_merge:
2025-06-09 19:40:29 +08:00
return await _handle_entity_relation_summary(
entity_name,
combined_description,
global_config,
llm_response_cache=llm_response_cache,
)
else:
return combined_description
2025-06-09 18:52:34 +08:00
# Collect all entity data from relevant chunks
all_entity_data = []
for chunk_id in chunk_ids:
if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]:
all_entity_data.extend(chunk_entities[chunk_id][entity_name])
if not all_entity_data:
2025-06-09 19:47:29 +08:00
logger.warning(
f"No cached entity data found for {entity_name}, trying to rebuild from relationships"
)
2025-06-09 19:40:29 +08:00
# Get all edges connected to this entity
edges = await knowledge_graph_inst.get_node_edges(entity_name)
if not edges:
logger.warning(f"No relationships found for entity {entity_name}")
return
2025-06-09 19:47:29 +08:00
2025-06-09 19:40:29 +08:00
# Collect relationship data to extract entity information
relationship_descriptions = []
file_paths = set()
2025-06-09 19:47:29 +08:00
2025-06-09 19:40:29 +08:00
# Get edge data for all connected relationships
for src_id, tgt_id in edges:
edge_data = await knowledge_graph_inst.get_edge(src_id, tgt_id)
if edge_data:
if edge_data.get("description"):
relationship_descriptions.append(edge_data["description"])
2025-06-09 19:47:29 +08:00
2025-06-09 19:40:29 +08:00
if edge_data.get("file_path"):
edge_file_paths = edge_data["file_path"].split(GRAPH_FIELD_SEP)
file_paths.update(edge_file_paths)
2025-06-09 19:47:29 +08:00
2025-06-09 19:40:29 +08:00
# Generate description from relationships or fallback to current
if relationship_descriptions:
combined_description = GRAPH_FIELD_SEP.join(relationship_descriptions)
final_description = await _generate_final_description(combined_description)
else:
final_description = current_entity.get("description", "")
2025-06-09 19:47:29 +08:00
2025-06-09 19:40:29 +08:00
entity_type = current_entity.get("entity_type", "UNKNOWN")
await _update_entity_storage(final_description, entity_type, file_paths)
2025-06-09 18:52:34 +08:00
return
2025-06-09 19:40:29 +08:00
# Process cached entity data
2025-06-09 18:52:34 +08:00
descriptions = []
entity_types = []
file_paths = set()
for entity_data in all_entity_data:
if entity_data.get("description"):
descriptions.append(entity_data["description"])
if entity_data.get("entity_type"):
entity_types.append(entity_data["entity_type"])
if entity_data.get("file_path"):
file_paths.add(entity_data["file_path"])
# Combine all descriptions
combined_description = (
GRAPH_FIELD_SEP.join(descriptions)
if descriptions
else current_entity.get("description", "")
)
# Get most common entity type
entity_type = (
max(set(entity_types), key=entity_types.count)
if entity_types
else current_entity.get("entity_type", "UNKNOWN")
)
2025-06-09 19:40:29 +08:00
# Generate final description and update storage
final_description = await _generate_final_description(combined_description)
await _update_entity_storage(final_description, entity_type, file_paths)
2025-06-09 18:52:34 +08:00
async def _rebuild_single_relationship(
knowledge_graph_inst: BaseGraphStorage,
relationships_vdb: BaseVectorStorage,
src: str,
tgt: str,
chunk_ids: set[str],
chunk_relationships: dict,
llm_response_cache: BaseKVStorage,
global_config: dict[str, str],
) -> None:
"""Rebuild a single relationship from cached extraction results
Note: This function assumes the caller has already acquired the appropriate
keyed lock for the relationship pair to ensure thread safety.
"""
2025-06-09 18:52:34 +08:00
# Get current relationship data
current_relationship = await knowledge_graph_inst.get_edge(src, tgt)
if not current_relationship:
return
# Collect all relationship data from relevant chunks
all_relationship_data = []
for chunk_id in chunk_ids:
if chunk_id in chunk_relationships:
# Check both (src, tgt) and (tgt, src) since relationships can be bidirectional
for edge_key in [(src, tgt), (tgt, src)]:
if edge_key in chunk_relationships[chunk_id]:
all_relationship_data.extend(
chunk_relationships[chunk_id][edge_key]
)
if not all_relationship_data:
logger.warning(f"No cached relationship data found for {src}-{tgt}")
return
# Merge descriptions and keywords
descriptions = []
keywords = []
weights = []
file_paths = set()
for rel_data in all_relationship_data:
if rel_data.get("description"):
descriptions.append(rel_data["description"])
if rel_data.get("keywords"):
keywords.append(rel_data["keywords"])
if rel_data.get("weight"):
weights.append(rel_data["weight"])
if rel_data.get("file_path"):
file_paths.add(rel_data["file_path"])
# Combine descriptions and keywords
combined_description = (
GRAPH_FIELD_SEP.join(descriptions)
if descriptions
else current_relationship.get("description", "")
)
combined_keywords = (
", ".join(set(keywords))
if keywords
else current_relationship.get("keywords", "")
)
2025-06-09 19:47:29 +08:00
# weight = (
# sum(weights) / len(weights)
# if weights
# else current_relationship.get("weight", 1.0)
# )
weight = sum(weights) if weights else current_relationship.get("weight", 1.0)
2025-06-09 18:52:34 +08:00
# Use summary if description has too many fragments
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
if num_fragment >= force_llm_summary_on_merge:
2025-06-09 18:52:34 +08:00
final_description = await _handle_entity_relation_summary(
f"{src}-{tgt}",
combined_description,
global_config,
llm_response_cache=llm_response_cache,
)
else:
final_description = combined_description
# Update relationship in graph storage
updated_relationship_data = {
**current_relationship,
"description": final_description,
"keywords": combined_keywords,
2025-06-09 19:47:29 +08:00
"weight": weight,
2025-06-09 18:52:34 +08:00
"source_id": GRAPH_FIELD_SEP.join(chunk_ids),
"file_path": build_file_path(
current_relationship.get("file_path", "").split(GRAPH_FIELD_SEP)
if current_relationship.get("file_path")
else [],
[{"file_path": fp} for fp in file_paths if fp],
f"{src}-{tgt}",
)
2025-06-09 18:52:34 +08:00
if file_paths
else current_relationship.get("file_path", "unknown_source"),
}
await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
# Update relationship in vector database
rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-")
rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-")
# Delete old vector records first (both directions to be safe)
try:
await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse])
except Exception as e:
logger.debug(
f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}"
)
# Insert new vector record
rel_content = f"{combined_keywords}\t{src}\n{tgt}\n{final_description}"
await relationships_vdb.upsert(
{
rel_vdb_id: {
"src_id": src,
"tgt_id": tgt,
"source_id": updated_relationship_data["source_id"],
"content": rel_content,
"keywords": combined_keywords,
"description": final_description,
2025-06-09 19:47:29 +08:00
"weight": weight,
2025-06-09 18:52:34 +08:00
"file_path": updated_relationship_data["file_path"],
}
}
)
2024-10-10 15:02:30 +08:00
async def _merge_nodes_then_upsert(
2025-01-07 16:26:12 +08:00
entity_name: str,
nodes_data: list[dict],
knowledge_graph_inst: BaseGraphStorage,
global_config: dict,
pipeline_status: dict = None,
pipeline_status_lock=None,
llm_response_cache: BaseKVStorage | None = None,
2024-10-10 15:02:30 +08:00
):
"""Get existing nodes from knowledge graph use name,if exists, merge data, else create, then upsert."""
already_entity_types = []
2024-10-10 15:02:30 +08:00
already_source_ids = []
already_description = []
2025-03-17 23:32:35 +08:00
already_file_paths = []
2024-10-10 15:02:30 +08:00
2024-10-26 00:11:21 -04:00
already_node = await knowledge_graph_inst.get_node(entity_name)
2025-05-30 11:03:59 +08:00
if already_node:
already_entity_types.append(already_node["entity_type"])
2024-10-10 15:02:30 +08:00
already_source_ids.extend(
split_string_by_multi_markers(already_node["source_id"], [GRAPH_FIELD_SEP])
)
2025-03-17 23:32:35 +08:00
already_file_paths.extend(
2025-03-20 16:29:24 +08:00
split_string_by_multi_markers(already_node["file_path"], [GRAPH_FIELD_SEP])
2025-03-17 23:32:35 +08:00
)
2024-10-10 15:02:30 +08:00
already_description.append(already_node["description"])
entity_type = sorted(
Counter(
[dp["entity_type"] for dp in nodes_data] + already_entity_types
2024-10-10 15:02:30 +08:00
).items(),
key=lambda x: x[1],
reverse=True,
)[0][0]
description = GRAPH_FIELD_SEP.join(
sorted(set([dp["description"] for dp in nodes_data] + already_description))
)
source_id = GRAPH_FIELD_SEP.join(
set([dp["source_id"] for dp in nodes_data] + already_source_ids)
)
2025-07-26 00:46:02 +08:00
file_path = build_file_path(already_file_paths, nodes_data, entity_name)
2025-04-10 17:29:07 +08:00
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
num_fragment = description.count(GRAPH_FIELD_SEP) + 1
num_new_fragment = len(set([dp["description"] for dp in nodes_data]))
if num_fragment > 1:
if num_fragment >= force_llm_summary_on_merge:
status_message = f"LLM merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
description = await _handle_entity_relation_summary(
entity_name,
description,
global_config,
llm_response_cache,
)
else:
status_message = f"Merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
2024-10-10 15:02:30 +08:00
node_data = dict(
entity_id=entity_name,
2024-10-10 15:02:30 +08:00
entity_type=entity_type,
description=description,
source_id=source_id,
2025-03-17 23:32:35 +08:00
file_path=file_path,
2025-05-01 15:14:15 +08:00
created_at=int(time.time()),
2024-10-10 15:02:30 +08:00
)
2024-10-26 00:11:21 -04:00
await knowledge_graph_inst.upsert_node(
2024-10-10 15:02:30 +08:00
entity_name,
node_data=node_data,
)
node_data["entity_name"] = entity_name
return node_data
async def _merge_edges_then_upsert(
2025-01-07 16:26:12 +08:00
src_id: str,
tgt_id: str,
2025-02-17 23:21:14 +01:00
edges_data: list[dict],
2025-01-07 16:26:12 +08:00
knowledge_graph_inst: BaseGraphStorage,
2025-02-17 23:21:14 +01:00
global_config: dict,
pipeline_status: dict = None,
pipeline_status_lock=None,
llm_response_cache: BaseKVStorage | None = None,
2024-10-10 15:02:30 +08:00
):
if src_id == tgt_id:
return None
2025-02-17 23:21:14 +01:00
already_weights = []
already_source_ids = []
already_description = []
already_keywords = []
2025-03-17 23:32:35 +08:00
already_file_paths = []
2024-10-10 15:02:30 +08:00
2024-10-26 00:11:21 -04:00
if await knowledge_graph_inst.has_edge(src_id, tgt_id):
already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id)
2025-02-17 23:20:10 +01:00
# Handle the case where get_edge returns None or missing fields
2025-02-17 23:26:51 +01:00
if already_edge:
# Get weight with default 1.0 if missing
already_weights.append(already_edge.get("weight", 1.0))
2025-02-17 23:26:51 +01:00
# Get source_id with empty string default if missing or None
if already_edge.get("source_id") is not None:
already_source_ids.extend(
split_string_by_multi_markers(
already_edge["source_id"], [GRAPH_FIELD_SEP]
)
)
2025-03-17 23:36:00 +08:00
2025-03-17 23:32:35 +08:00
# Get file_path with empty string default if missing or None
if already_edge.get("file_path") is not None:
already_file_paths.extend(
split_string_by_multi_markers(
2025-03-20 16:29:24 +08:00
already_edge["file_path"], [GRAPH_FIELD_SEP]
2025-03-17 23:32:35 +08:00
)
)
2024-10-10 15:02:30 +08:00
2025-02-17 23:26:51 +01:00
# Get description with empty string default if missing or None
if already_edge.get("description") is not None:
already_description.append(already_edge["description"])
2025-02-17 23:26:51 +01:00
# Get keywords with empty string default if missing or None
if already_edge.get("keywords") is not None:
already_keywords.extend(
split_string_by_multi_markers(
already_edge["keywords"], [GRAPH_FIELD_SEP]
)
)
2024-10-10 15:02:30 +08:00
# Process edges_data with None checks
2025-06-09 19:47:29 +08:00
weight = sum([dp["weight"] for dp in edges_data] + already_weights)
2024-10-10 15:02:30 +08:00
description = GRAPH_FIELD_SEP.join(
sorted(
set(
[dp["description"] for dp in edges_data if dp.get("description")]
+ already_description
)
)
2024-10-10 15:02:30 +08:00
)
2025-05-08 15:52:18 +08:00
# Split all existing and new keywords into individual terms, then combine and deduplicate
all_keywords = set()
# Process already_keywords (which are comma-separated)
for keyword_str in already_keywords:
if keyword_str: # Skip empty strings
all_keywords.update(k.strip() for k in keyword_str.split(",") if k.strip())
# Process new keywords from edges_data
for edge in edges_data:
if edge.get("keywords"):
all_keywords.update(
k.strip() for k in edge["keywords"].split(",") if k.strip()
)
2025-05-08 15:52:18 +08:00
# Join all unique keywords with commas
keywords = ",".join(sorted(all_keywords))
2024-10-10 15:02:30 +08:00
source_id = GRAPH_FIELD_SEP.join(
set(
[dp["source_id"] for dp in edges_data if dp.get("source_id")]
+ already_source_ids
)
2024-10-10 15:02:30 +08:00
)
file_path = build_file_path(already_file_paths, edges_data, f"{src_id}-{tgt_id}")
2024-10-10 15:02:30 +08:00
for need_insert_id in [src_id, tgt_id]:
if not (await knowledge_graph_inst.has_node(need_insert_id)):
await knowledge_graph_inst.upsert_node(
need_insert_id,
node_data={
"entity_id": need_insert_id,
"source_id": source_id,
"description": description,
"entity_type": "UNKNOWN",
"file_path": file_path,
"created_at": int(time.time()),
},
)
2025-04-10 17:29:07 +08:00
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
num_fragment = description.count(GRAPH_FIELD_SEP) + 1
num_new_fragment = len(
set([dp["description"] for dp in edges_data if dp.get("description")])
2024-10-10 15:02:30 +08:00
)
2025-04-10 17:29:07 +08:00
if num_fragment > 1:
if num_fragment >= force_llm_summary_on_merge:
status_message = f"LLM merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
description = await _handle_entity_relation_summary(
f"({src_id}, {tgt_id})",
description,
global_config,
llm_response_cache,
)
else:
status_message = f"Merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
2024-10-26 00:11:21 -04:00
await knowledge_graph_inst.upsert_edge(
2024-10-10 15:02:30 +08:00
src_id,
tgt_id,
edge_data=dict(
weight=weight,
description=description,
keywords=keywords,
source_id=source_id,
2025-03-17 23:32:35 +08:00
file_path=file_path,
2025-05-01 15:14:15 +08:00
created_at=int(time.time()),
2024-10-10 15:02:30 +08:00
),
)
edge_data = dict(
src_id=src_id,
tgt_id=tgt_id,
description=description,
keywords=keywords,
2025-02-27 23:34:57 +07:00
source_id=source_id,
2025-03-17 23:32:35 +08:00
file_path=file_path,
2025-05-27 16:07:04 +08:00
created_at=int(time.time()),
2024-10-10 15:02:30 +08:00
)
2024-10-10 15:02:30 +08:00
return edge_data
async def merge_nodes_and_edges(
chunk_results: list,
knowledge_graph_inst: BaseGraphStorage,
entity_vdb: BaseVectorStorage,
relationships_vdb: BaseVectorStorage,
global_config: dict[str, str],
pipeline_status: dict = None,
pipeline_status_lock=None,
llm_response_cache: BaseKVStorage | None = None,
current_file_number: int = 0,
total_files: int = 0,
file_path: str = "unknown_source",
) -> None:
"""Merge nodes and edges from extraction results
2025-04-28 02:15:25 +08:00
Args:
chunk_results: List of tuples (maybe_nodes, maybe_edges) containing extracted entities and relationships
knowledge_graph_inst: Knowledge graph storage
entity_vdb: Entity vector database
relationships_vdb: Relationship vector database
global_config: Global configuration
pipeline_status: Pipeline status dictionary
pipeline_status_lock: Lock for pipeline status
llm_response_cache: LLM response cache
"""
2025-04-28 02:15:25 +08:00
# Collect all nodes and edges from all chunks
all_nodes = defaultdict(list)
all_edges = defaultdict(list)
for maybe_nodes, maybe_edges in chunk_results:
# Collect nodes
for entity_name, entities in maybe_nodes.items():
all_nodes[entity_name].extend(entities)
# Collect edges with sorted keys for undirected graph
for edge_key, edges in maybe_edges.items():
sorted_edge_key = tuple(sorted(edge_key))
all_edges[sorted_edge_key].extend(edges)
# Centralized processing of all nodes and edges
total_entities_count = len(all_nodes)
total_relations_count = len(all_edges)
# Merge nodes and edges
log_message = f"Merging stage {current_file_number}/{total_files}: {file_path}"
logger.info(log_message)
async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Get max async tasks limit from global_config for semaphore control
graph_max_async = global_config.get("llm_model_max_async", 4) * 2
semaphore = asyncio.Semaphore(graph_max_async)
# Process and update all entities and relationships in parallel
log_message = f"Processing: {total_entities_count} entities and {total_relations_count} relations (async: {graph_max_async})"
logger.info(log_message)
async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
async def _locked_process_entity_name(entity_name, entities):
async with semaphore:
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
[entity_name], namespace=namespace, enable_logging=False
):
entity_data = await _merge_nodes_then_upsert(
entity_name,
entities,
knowledge_graph_inst,
global_config,
pipeline_status,
pipeline_status_lock,
llm_response_cache,
)
if entity_vdb is not None:
data_for_vdb = {
compute_mdhash_id(entity_data["entity_name"], prefix="ent-"): {
"entity_name": entity_data["entity_name"],
"entity_type": entity_data["entity_type"],
"content": f"{entity_data['entity_name']}\n{entity_data['description']}",
"source_id": entity_data["source_id"],
"file_path": entity_data.get("file_path", "unknown_source"),
}
}
await entity_vdb.upsert(data_for_vdb)
return entity_data
async def _locked_process_edges(edge_key, edges):
async with semaphore:
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
# Sort the edge_key components to ensure consistent lock key generation
sorted_edge_key = sorted([edge_key[0], edge_key[1]])
# logger.info(f"Processing edge: {sorted_edge_key[0]} - {sorted_edge_key[1]}")
async with get_storage_keyed_lock(
sorted_edge_key,
namespace=namespace,
enable_logging=False,
):
edge_data = await _merge_edges_then_upsert(
edge_key[0],
edge_key[1],
edges,
knowledge_graph_inst,
global_config,
pipeline_status,
pipeline_status_lock,
llm_response_cache,
)
if edge_data is None:
return None
if relationships_vdb is not None:
data_for_vdb = {
compute_mdhash_id(
edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"
): {
"src_id": edge_data["src_id"],
"tgt_id": edge_data["tgt_id"],
"keywords": edge_data["keywords"],
"content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
"source_id": edge_data["source_id"],
"file_path": edge_data.get("file_path", "unknown_source"),
"weight": edge_data.get("weight", 1.0),
}
}
await relationships_vdb.upsert(data_for_vdb)
return edge_data
# Create a single task queue for both entities and edges
tasks = []
2025-07-14 16:27:49 +08:00
# Add entity processing tasks
for entity_name, entities in all_nodes.items():
tasks.append(
asyncio.create_task(_locked_process_entity_name(entity_name, entities))
)
2025-07-14 16:27:49 +08:00
# Add edge processing tasks
for edge_key, edges in all_edges.items():
tasks.append(asyncio.create_task(_locked_process_edges(edge_key, edges)))
2025-07-14 16:27:49 +08:00
# Check if there are any tasks to process
if not tasks:
log_message = f"No entities or relationships to process for {file_path}"
logger.info(log_message)
if pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
return
# Execute all tasks in parallel with semaphore control and early failure detection
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
# Check if any task raised an exception
for task in done:
if task.exception():
# If a task failed, cancel all pending tasks
for pending_task in pending:
pending_task.cancel()
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# Re-raise the exception to notify the caller
raise task.exception()
# If all tasks completed successfully, collect results
# (No need to collect results since these tasks don't return values)
2024-10-10 15:02:30 +08:00
async def extract_entities(
2025-01-07 16:26:12 +08:00
chunks: dict[str, TextChunkSchema],
global_config: dict[str, str],
pipeline_status: dict = None,
2025-03-10 17:30:40 +08:00
pipeline_status_lock=None,
llm_response_cache: BaseKVStorage | None = None,
text_chunks_storage: BaseKVStorage | None = None,
) -> list:
2024-10-10 15:02:30 +08:00
use_llm_func: callable = global_config["llm_model_func"]
entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
ordered_chunks = list(chunks.items())
2024-11-25 13:29:55 +08:00
# add language and example number params to prompt
2024-11-25 13:40:38 +08:00
language = global_config["addon_params"].get(
"language", PROMPTS["DEFAULT_LANGUAGE"]
)
entity_types = global_config["addon_params"].get(
"entity_types", PROMPTS["DEFAULT_ENTITY_TYPES"]
)
2024-11-25 13:29:55 +08:00
example_number = global_config["addon_params"].get("example_number", None)
2024-11-25 13:40:38 +08:00
if example_number and example_number < len(PROMPTS["entity_extraction_examples"]):
examples = "\n".join(
PROMPTS["entity_extraction_examples"][: int(example_number)]
)
2024-11-25 13:29:55 +08:00
else:
2024-11-25 13:40:38 +08:00
examples = "\n".join(PROMPTS["entity_extraction_examples"])
2024-12-03 22:25:50 +08:00
example_context_base = dict(
tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
2025-02-26 23:04:21 +08:00
entity_types=", ".join(entity_types),
2024-12-03 22:25:50 +08:00
language=language,
)
# add example's format
examples = examples.format(**example_context_base)
2024-10-10 15:02:30 +08:00
entity_extract_prompt = PROMPTS["entity_extraction"]
context_base = dict(
tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
record_delimiter=PROMPTS["DEFAULT_RECORD_DELIMITER"],
completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
entity_types=",".join(entity_types),
2024-11-25 13:29:55 +08:00
examples=examples,
2024-11-25 13:40:38 +08:00
language=language,
)
2025-03-17 16:58:04 +08:00
continue_prompt = PROMPTS["entity_continue_extraction"].format(**context_base)
2025-03-09 01:21:39 +08:00
if_loop_prompt = PROMPTS["entity_if_loop_extraction"]
2024-10-10 15:02:30 +08:00
processed_chunks = 0
total_chunks = len(ordered_chunks)
2025-03-17 23:36:00 +08:00
async def _process_extraction_result(
result: str, chunk_key: str, file_path: str = "unknown_source"
):
"""Process a single extraction result (either initial or gleaning)
Args:
result (str): The extraction result to process
chunk_key (str): The chunk key for source tracking
2025-03-17 23:32:35 +08:00
file_path (str): The file path for citation
Returns:
tuple: (nodes_dict, edges_dict) containing the extracted entities and relationships
"""
maybe_nodes = defaultdict(list)
maybe_edges = defaultdict(list)
2025-03-11 12:23:51 +08:00
2024-10-10 15:02:30 +08:00
records = split_string_by_multi_markers(
result,
2024-10-10 15:02:30 +08:00
[context_base["record_delimiter"], context_base["completion_delimiter"]],
)
2025-03-11 12:23:51 +08:00
2024-10-10 15:02:30 +08:00
for record in records:
record = re.search(r"\((.*)\)", record)
if record is None:
continue
record = record.group(1)
record_attributes = split_string_by_multi_markers(
record, [context_base["tuple_delimiter"]]
)
2025-03-11 12:23:51 +08:00
2024-10-10 15:02:30 +08:00
if_entities = await _handle_single_entity_extraction(
2025-03-17 23:32:35 +08:00
record_attributes, chunk_key, file_path
2024-10-10 15:02:30 +08:00
)
if if_entities is not None:
maybe_nodes[if_entities["entity_name"]].append(if_entities)
continue
2025-03-11 12:23:51 +08:00
2024-10-10 15:02:30 +08:00
if_relation = await _handle_single_relationship_extraction(
2025-03-17 23:32:35 +08:00
record_attributes, chunk_key, file_path
2024-10-10 15:02:30 +08:00
)
if if_relation is not None:
maybe_edges[(if_relation["src_id"], if_relation["tgt_id"])].append(
if_relation
)
2025-03-11 12:23:51 +08:00
return maybe_nodes, maybe_edges
async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
"""Process a single chunk
Args:
chunk_key_dp (tuple[str, TextChunkSchema]):
("chunk-xxxxxx", {"tokens": int, "content": str, "full_doc_id": str, "chunk_order_index": int})
Returns:
tuple: (maybe_nodes, maybe_edges) containing extracted entities and relationships
"""
nonlocal processed_chunks
chunk_key = chunk_key_dp[0]
chunk_dp = chunk_key_dp[1]
content = chunk_dp["content"]
2025-03-17 23:32:35 +08:00
# Get file path from chunk data or use default
file_path = chunk_dp.get("file_path", "unknown_source")
2025-03-11 12:23:51 +08:00
# Create cache keys collector for batch processing
cache_keys_collector = []
# Get initial extraction
2025-04-23 19:35:05 +08:00
hint_prompt = entity_extract_prompt.format(
**{**context_base, "input_text": content}
)
2025-03-11 12:23:51 +08:00
final_result = await use_llm_func_with_cache(
hint_prompt,
use_llm_func,
llm_response_cache=llm_response_cache,
cache_type="extract",
2025-06-09 18:52:34 +08:00
chunk_id=chunk_key,
cache_keys_collector=cache_keys_collector,
)
# Store LLM cache reference in chunk (will be handled by use_llm_func_with_cache)
history = pack_user_ass_to_openai_messages(hint_prompt, final_result)
2025-03-11 12:23:51 +08:00
2025-03-17 23:32:35 +08:00
# Process initial extraction with file path
2025-03-11 12:23:51 +08:00
maybe_nodes, maybe_edges = await _process_extraction_result(
2025-03-17 23:32:35 +08:00
final_result, chunk_key, file_path
2025-03-11 12:23:51 +08:00
)
# Process additional gleaning results
for now_glean_index in range(entity_extract_max_gleaning):
glean_result = await use_llm_func_with_cache(
continue_prompt,
use_llm_func,
llm_response_cache=llm_response_cache,
history_messages=history,
cache_type="extract",
2025-06-09 18:52:34 +08:00
chunk_id=chunk_key,
cache_keys_collector=cache_keys_collector,
)
2025-03-11 12:23:51 +08:00
history += pack_user_ass_to_openai_messages(continue_prompt, glean_result)
2025-03-11 12:23:51 +08:00
2025-03-17 23:32:35 +08:00
# Process gleaning result separately with file path
2025-03-11 12:23:51 +08:00
glean_nodes, glean_edges = await _process_extraction_result(
2025-03-17 23:32:35 +08:00
glean_result, chunk_key, file_path
2025-03-11 12:23:51 +08:00
)
# Merge results - only add entities and edges with new names
for entity_name, entities in glean_nodes.items():
2025-04-10 20:32:40 +08:00
if (
entity_name not in maybe_nodes
): # Only accetp entities with new name in gleaning stage
maybe_nodes[entity_name].extend(entities)
for edge_key, edges in glean_edges.items():
2025-04-10 20:32:40 +08:00
if (
edge_key not in maybe_edges
): # Only accetp edges with new name in gleaning stage
maybe_edges[edge_key].extend(edges)
2025-03-11 12:23:51 +08:00
if now_glean_index == entity_extract_max_gleaning - 1:
break
2025-03-11 12:23:51 +08:00
if_loop_result: str = await use_llm_func_with_cache(
if_loop_prompt,
use_llm_func,
llm_response_cache=llm_response_cache,
history_messages=history,
cache_type="extract",
cache_keys_collector=cache_keys_collector,
)
if_loop_result = if_loop_result.strip().strip('"').strip("'").lower()
if if_loop_result != "yes":
break
2025-03-11 12:23:51 +08:00
# Batch update chunk's llm_cache_list with all collected cache keys
if cache_keys_collector and text_chunks_storage:
await update_chunk_cache_list(
chunk_key,
text_chunks_storage,
cache_keys_collector,
"entity_extraction",
)
processed_chunks += 1
entities_count = len(maybe_nodes)
relations_count = len(maybe_edges)
2025-04-29 18:20:35 +08:00
log_message = f"Chunk {processed_chunks} of {total_chunks} extracted {entities_count} Ent + {relations_count} Rel"
logger.info(log_message)
if pipeline_status is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
2024-10-10 15:02:30 +08:00
# Return the extracted nodes and edges for centralized processing
return maybe_nodes, maybe_edges
# Get max async tasks limit from global_config
chunk_max_async = global_config.get("llm_model_max_async", 4)
semaphore = asyncio.Semaphore(chunk_max_async)
async def _process_with_semaphore(chunk):
async with semaphore:
return await _process_single_content(chunk)
tasks = []
for c in ordered_chunks:
task = asyncio.create_task(_process_with_semaphore(c))
tasks.append(task)
# Wait for tasks to complete or for the first exception to occur
# This allows us to cancel remaining tasks if any task fails
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
# Check if any task raised an exception
for task in done:
if task.exception():
# If a task failed, cancel all pending tasks
# This prevents unnecessary processing since the parent function will abort anyway
for pending_task in pending:
pending_task.cancel()
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# Re-raise the exception to notify the caller
raise task.exception()
# If all tasks completed successfully, collect results
chunk_results = [task.result() for task in tasks]
2025-04-28 02:15:25 +08:00
# Return the chunk_results for later processing in merge_nodes_and_edges
return chunk_results
2024-10-10 15:02:30 +08:00
2024-11-25 13:40:38 +08:00
2024-11-25 13:29:55 +08:00
async def kg_query(
query: str,
2025-01-07 16:26:12 +08:00
knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage,
relationships_vdb: BaseVectorStorage,
2025-02-08 23:33:11 +01:00
text_chunks_db: BaseKVStorage,
2025-01-07 16:26:12 +08:00
query_param: QueryParam,
global_config: dict[str, str],
hashing_kv: BaseKVStorage | None = None,
system_prompt: str | None = None,
chunks_vdb: BaseVectorStorage = None,
2025-02-20 14:47:31 +01:00
) -> str | AsyncIterator[str]:
if query_param.model_func:
use_model_func = query_param.model_func
else:
use_model_func = global_config["llm_model_func"]
# Apply higher priority (5) to query relation LLM function
use_model_func = partial(use_model_func, _priority=5)
# Handle cache
args_hash = compute_args_hash(query_param.mode, query)
cached_response, quantized, min_val, max_val = await handle_cache(
2025-01-24 18:59:24 +08:00
hashing_kv, args_hash, query, query_param.mode, cache_type="query"
)
if cached_response is not None:
return cached_response
hl_keywords, ll_keywords = await get_keywords_from_query(
2025-01-24 18:59:24 +08:00
query, query_param, global_config, hashing_kv
2024-11-28 14:28:29 +01:00
)
2024-11-25 13:40:38 +08:00
2025-01-29 21:00:42 +08:00
logger.debug(f"High-level keywords: {hl_keywords}")
logger.debug(f"Low-level keywords: {ll_keywords}")
2025-01-24 18:59:24 +08:00
# Handle empty keywords
2024-11-25 13:29:55 +08:00
if hl_keywords == [] and ll_keywords == []:
logger.warning("low_level_keywords and high_level_keywords is empty")
2024-11-25 13:40:38 +08:00
return PROMPTS["fail_response"]
if ll_keywords == [] and query_param.mode in ["local", "hybrid"]:
2025-01-07 22:02:34 +08:00
logger.warning(
"low_level_keywords is empty, switching from %s mode to global mode",
query_param.mode,
)
query_param.mode = "global"
2024-11-25 13:40:38 +08:00
if hl_keywords == [] and query_param.mode in ["global", "hybrid"]:
2025-01-07 22:02:34 +08:00
logger.warning(
"high_level_keywords is empty, switching from %s mode to local mode",
query_param.mode,
)
query_param.mode = "local"
2025-02-18 09:09:12 +01:00
ll_keywords_str = ", ".join(ll_keywords) if ll_keywords else ""
hl_keywords_str = ", ".join(hl_keywords) if hl_keywords else ""
2024-11-25 13:40:38 +08:00
2024-11-25 13:29:55 +08:00
# Build context
context = await _build_query_context(
2025-07-08 13:31:05 +08:00
query,
2025-02-18 09:09:12 +01:00
ll_keywords_str,
hl_keywords_str,
2024-11-25 13:40:38 +08:00
knowledge_graph_inst,
entities_vdb,
relationships_vdb,
text_chunks_db,
query_param,
chunks_vdb,
2024-11-25 13:40:38 +08:00
)
2024-10-10 15:02:30 +08:00
if query_param.only_need_context:
return context if context is not None else PROMPTS["fail_response"]
2024-10-10 15:02:30 +08:00
if context is None:
return PROMPTS["fail_response"]
2025-01-24 18:59:24 +08:00
# Process conversation history
history_context = ""
if query_param.conversation_history:
history_context = get_conversation_turns(
query_param.conversation_history, query_param.history_turns
2025-01-14 22:23:14 +05:30
)
2025-05-08 03:38:47 +08:00
# Build system prompt
2025-05-08 04:29:43 +08:00
user_prompt = (
query_param.user_prompt
if query_param.user_prompt
else PROMPTS["DEFAULT_USER_PROMPT"]
)
sys_prompt_temp = system_prompt if system_prompt else PROMPTS["rag_response"]
sys_prompt = sys_prompt_temp.format(
2025-01-24 18:59:24 +08:00
context_data=context,
response_type=query_param.response_type,
history=history_context,
2025-05-08 03:38:47 +08:00
user_prompt=user_prompt,
)
if query_param.only_need_prompt:
return sys_prompt
2025-01-14 22:23:14 +05:30
tokenizer: Tokenizer = global_config["tokenizer"]
len_of_prompts = len(tokenizer.encode(query + sys_prompt))
2025-07-14 15:53:48 +08:00
logger.debug(
f"[kg_query] Sending to LLM: {len_of_prompts:,} tokens (Query: {len(tokenizer.encode(query))}, System: {len(tokenizer.encode(sys_prompt))})"
)
response = await use_model_func(
query,
system_prompt=sys_prompt,
stream=query_param.stream,
)
if isinstance(response, str) and len(response) > len(sys_prompt):
response = (
response.replace(sys_prompt, "")
.replace("user", "")
.replace("model", "")
.replace(query, "")
.replace("<system>", "")
.replace("</system>", "")
.strip()
)
2025-04-16 01:24:59 +08:00
if hashing_kv.global_config.get("enable_llm_cache"):
# Save to cache
await save_to_cache(
hashing_kv,
CacheData(
args_hash=args_hash,
content=response,
prompt=query,
quantized=quantized,
min_val=min_val,
max_val=max_val,
mode=query_param.mode,
cache_type="query",
),
)
return response
2025-01-14 22:23:14 +05:30
async def get_keywords_from_query(
query: str,
query_param: QueryParam,
global_config: dict[str, str],
hashing_kv: BaseKVStorage | None = None,
) -> tuple[list[str], list[str]]:
"""
Retrieves high-level and low-level keywords for RAG operations.
2025-04-02 21:52:06 +03:00
This function checks if keywords are already provided in query parameters,
and if not, extracts them from the query text using LLM.
2025-04-02 21:52:06 +03:00
Args:
query: The user's query text
query_param: Query parameters that may contain pre-defined keywords
global_config: Global configuration dictionary
hashing_kv: Optional key-value storage for caching results
2025-04-02 21:52:06 +03:00
Returns:
A tuple containing (high_level_keywords, low_level_keywords)
"""
2025-04-03 17:46:28 +08:00
# Check if pre-defined keywords are already provided
if query_param.hl_keywords or query_param.ll_keywords:
return query_param.hl_keywords, query_param.ll_keywords
# Extract keywords using extract_keywords_only function which already supports conversation history
hl_keywords, ll_keywords = await extract_keywords_only(
query, query_param, global_config, hashing_kv
)
return hl_keywords, ll_keywords
async def extract_keywords_only(
text: str,
param: QueryParam,
global_config: dict[str, str],
hashing_kv: BaseKVStorage | None = None,
) -> tuple[list[str], list[str]]:
"""
Extract high-level and low-level keywords from the given 'text' using the LLM.
This method does NOT build the final RAG context or provide a final answer.
It ONLY extracts keywords (hl_keywords, ll_keywords).
"""
2025-01-24 18:59:24 +08:00
# 1. Handle cache if needed - add cache type for keywords
args_hash = compute_args_hash(param.mode, text)
cached_response, quantized, min_val, max_val = await handle_cache(
2025-01-24 18:59:24 +08:00
hashing_kv, args_hash, text, param.mode, cache_type="keywords"
)
if cached_response is not None:
2025-01-24 18:59:24 +08:00
try:
keywords_data = json.loads(cached_response)
return keywords_data["high_level_keywords"], keywords_data[
"low_level_keywords"
]
except (json.JSONDecodeError, KeyError):
logger.warning(
"Invalid cache format for keywords, proceeding with extraction"
)
# 2. Build the examples
example_number = global_config["addon_params"].get("example_number", None)
if example_number and example_number < len(PROMPTS["keywords_extraction_examples"]):
examples = "\n".join(
PROMPTS["keywords_extraction_examples"][: int(example_number)]
)
else:
examples = "\n".join(PROMPTS["keywords_extraction_examples"])
language = global_config["addon_params"].get(
"language", PROMPTS["DEFAULT_LANGUAGE"]
)
2025-01-24 18:59:24 +08:00
# 3. Process conversation history
history_context = ""
if param.conversation_history:
history_context = get_conversation_turns(
param.conversation_history, param.history_turns
)
2025-01-24 18:59:24 +08:00
# 4. Build the keyword-extraction prompt
kw_prompt = PROMPTS["keywords_extraction"].format(
query=text, examples=examples, language=language, history=history_context
)
tokenizer: Tokenizer = global_config["tokenizer"]
len_of_prompts = len(tokenizer.encode(kw_prompt))
2025-07-14 15:53:48 +08:00
logger.debug(
f"[extract_keywords] Sending to LLM: {len_of_prompts:,} tokens (Prompt: {len_of_prompts})"
)
2025-01-24 18:59:24 +08:00
# 5. Call the LLM for keyword extraction
if param.model_func:
use_model_func = param.model_func
else:
use_model_func = global_config["llm_model_func"]
# Apply higher priority (5) to query relation LLM function
use_model_func = partial(use_model_func, _priority=5)
result = await use_model_func(kw_prompt, keyword_extraction=True)
2025-01-24 18:59:24 +08:00
# 6. Parse out JSON from the LLM response
result = remove_think_tags(result)
match = re.search(r"\{.*?\}", result, re.DOTALL)
if not match:
logger.error("No JSON-like structure found in the LLM respond.")
return [], []
try:
keywords_data = json.loads(match.group(0))
except json.JSONDecodeError as e:
logger.error(f"JSON parsing error: {e}")
return [], []
hl_keywords = keywords_data.get("high_level_keywords", [])
ll_keywords = keywords_data.get("low_level_keywords", [])
2025-01-24 18:59:24 +08:00
# 7. Cache only the processed keywords with cache type
if hl_keywords or ll_keywords:
2025-02-02 01:56:32 +08:00
cache_data = {
"high_level_keywords": hl_keywords,
"low_level_keywords": ll_keywords,
}
2025-04-16 01:24:59 +08:00
if hashing_kv.global_config.get("enable_llm_cache"):
await save_to_cache(
hashing_kv,
CacheData(
args_hash=args_hash,
content=json.dumps(cache_data),
prompt=text,
quantized=quantized,
min_val=min_val,
max_val=max_val,
mode=param.mode,
cache_type="keywords",
),
)
return hl_keywords, ll_keywords
2025-01-14 22:23:14 +05:30
async def _get_vector_context(
query: str,
chunks_vdb: BaseVectorStorage,
query_param: QueryParam,
2025-07-08 13:31:05 +08:00
) -> list[dict]:
"""
2025-07-08 13:31:05 +08:00
Retrieve text chunks from the vector database without reranking or truncation.
2025-07-08 13:31:05 +08:00
This function performs vector search to find relevant text chunks for a query.
Reranking and truncation will be handled later in the unified processing.
Args:
query: The query string to search for
chunks_vdb: Vector database containing document chunks
2025-07-08 13:31:05 +08:00
query_param: Query parameters including chunk_top_k and ids
Returns:
2025-07-08 13:31:05 +08:00
List of text chunks with metadata
"""
try:
2025-07-08 13:31:05 +08:00
# Use chunk_top_k if specified, otherwise fall back to top_k
search_top_k = query_param.chunk_top_k or query_param.top_k
results = await chunks_vdb.query(query, top_k=search_top_k, ids=query_param.ids)
if not results:
2025-07-08 13:31:05 +08:00
return []
valid_chunks = []
for result in results:
if "content" in result:
2025-07-08 13:31:05 +08:00
chunk_with_metadata = {
"content": result["content"],
"created_at": result.get("created_at", None),
"file_path": result.get("file_path", "unknown_source"),
2025-07-08 13:31:05 +08:00
"source_type": "vector", # Mark the source type
"chunk_id": result.get("id"), # Add chunk_id for deduplication
}
2025-07-08 13:31:05 +08:00
valid_chunks.append(chunk_with_metadata)
2025-07-09 04:36:52 +08:00
logger.info(
f"Naive query: {len(valid_chunks)} chunks (chunk_top_k: {search_top_k})"
)
2025-07-08 13:31:05 +08:00
return valid_chunks
except Exception as e:
logger.error(f"Error in _get_vector_context: {e}")
2025-07-08 13:31:05 +08:00
return []
2024-11-25 13:29:55 +08:00
async def _build_query_context(
2025-07-08 13:31:05 +08:00
query: str,
2025-02-18 09:05:51 +01:00
ll_keywords: str,
hl_keywords: str,
2025-01-07 16:26:12 +08:00
knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage,
relationships_vdb: BaseVectorStorage,
2025-02-08 23:33:11 +01:00
text_chunks_db: BaseKVStorage,
2025-01-07 16:26:12 +08:00
query_param: QueryParam,
2025-07-08 13:31:05 +08:00
chunks_vdb: BaseVectorStorage = None,
2024-11-25 13:40:38 +08:00
):
logger.info(f"Process {os.getpid()} building query context...")
# Collect chunks from different sources separately
vector_chunks = []
entity_chunks = []
relation_chunks = []
2025-07-08 13:31:05 +08:00
entities_context = []
relations_context = []
2025-07-15 18:57:57 +08:00
# Store original data for later text chunk retrieval
local_entities = []
local_relations = []
global_entities = []
global_relations = []
2025-07-15 18:57:57 +08:00
2025-07-08 13:31:05 +08:00
# Handle local and global modes
if query_param.mode == "local":
local_entities, local_relations = await _get_node_data(
ll_keywords,
knowledge_graph_inst,
entities_vdb,
query_param,
2024-11-25 13:40:38 +08:00
)
2025-07-08 13:31:05 +08:00
elif query_param.mode == "global":
global_relations, global_entities = await _get_edge_data(
hl_keywords,
knowledge_graph_inst,
relationships_vdb,
query_param,
)
2025-07-08 13:31:05 +08:00
else: # hybrid or mix mode
local_entities, local_relations = await _get_node_data(
ll_keywords,
knowledge_graph_inst,
entities_vdb,
query_param,
)
global_relations, global_entities = await _get_edge_data(
hl_keywords,
knowledge_graph_inst,
relationships_vdb,
query_param,
2025-01-24 16:06:04 +01:00
)
2025-07-15 10:56:48 +08:00
# Get vector chunks first if in mix mode
2025-07-08 13:31:05 +08:00
if query_param.mode == "mix" and chunks_vdb:
vector_chunks = await _get_vector_context(
query,
chunks_vdb,
query_param,
)
# Use round-robin merge to combine local and global data fairly
final_entities = []
seen_entities = set()
# Round-robin merge entities
max_len = max(len(local_entities), len(global_entities))
for i in range(max_len):
# First from local
if i < len(local_entities):
entity = local_entities[i]
entity_name = entity.get("entity_name")
if entity_name and entity_name not in seen_entities:
final_entities.append(entity)
seen_entities.add(entity_name)
# Then from global
if i < len(global_entities):
entity = global_entities[i]
entity_name = entity.get("entity_name")
if entity_name and entity_name not in seen_entities:
final_entities.append(entity)
seen_entities.add(entity_name)
# Round-robin merge relations
final_relations = []
seen_relations = set()
max_len = max(len(local_relations), len(global_relations))
for i in range(max_len):
# First from local
if i < len(local_relations):
relation = local_relations[i]
# Build relation unique identifier
if "src_tgt" in relation:
rel_key = tuple(sorted(relation["src_tgt"]))
else:
rel_key = tuple(
sorted([relation.get("src_id"), relation.get("tgt_id")])
)
if rel_key not in seen_relations:
final_relations.append(relation)
seen_relations.add(rel_key)
# Then from global
if i < len(global_relations):
relation = global_relations[i]
# Build relation unique identifier
if "src_tgt" in relation:
rel_key = tuple(sorted(relation["src_tgt"]))
else:
rel_key = tuple(
sorted([relation.get("src_id"), relation.get("tgt_id")])
)
if rel_key not in seen_relations:
final_relations.append(relation)
seen_relations.add(rel_key)
# Generate entities context
entities_context = []
for i, n in enumerate(final_entities):
created_at = n.get("created_at", "UNKNOWN")
if isinstance(created_at, (int, float)):
created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
# Get file path from node data
file_path = n.get("file_path", "unknown_source")
2025-07-15 10:56:48 +08:00
entities_context.append(
{
"id": i + 1,
"entity": n["entity_name"],
"type": n.get("entity_type", "UNKNOWN"),
"description": n.get("description", "UNKNOWN"),
"created_at": created_at,
"file_path": file_path,
}
)
# Generate relations context
relations_context = []
for i, e in enumerate(final_relations):
created_at = e.get("created_at", "UNKNOWN")
# Convert timestamp to readable format
if isinstance(created_at, (int, float)):
created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
# Get file path from edge data
file_path = e.get("file_path", "unknown_source")
# Handle different relation data formats
if "src_tgt" in e:
entity1, entity2 = e["src_tgt"]
else:
entity1, entity2 = e.get("src_id"), e.get("tgt_id")
relations_context.append(
{
"id": i + 1,
"entity1": entity1,
"entity2": entity2,
"description": e.get("description", "UNKNOWN"),
"created_at": created_at,
"file_path": file_path,
}
)
2025-07-08 13:31:05 +08:00
2025-07-24 04:31:06 +08:00
logger.debug(
f"Initial KG query results: {len(entities_context)} entities, {len(relations_context)} relations"
2025-07-08 13:31:05 +08:00
)
2025-07-14 15:53:48 +08:00
# Unified token control system - Apply precise token limits to entities and relations
tokenizer = text_chunks_db.global_config.get("tokenizer")
2025-07-23 10:19:45 +08:00
# Get new token limits from query_param (with fallback to global_config)
max_entity_tokens = getattr(
query_param,
"max_entity_tokens",
text_chunks_db.global_config.get(
"max_entity_tokens", DEFAULT_MAX_ENTITY_TOKENS
),
)
max_relation_tokens = getattr(
query_param,
"max_relation_tokens",
text_chunks_db.global_config.get(
"max_relation_tokens", DEFAULT_MAX_RELATION_TOKENS
),
)
max_total_tokens = getattr(
query_param,
"max_total_tokens",
2025-07-23 16:16:37 +08:00
text_chunks_db.global_config.get("max_total_tokens", DEFAULT_MAX_TOTAL_TOKENS),
2025-07-23 10:19:45 +08:00
)
2025-07-14 15:53:48 +08:00
2025-07-23 10:19:45 +08:00
# Truncate entities based on complete JSON serialization
if entities_context:
# Process entities context to replace GRAPH_FIELD_SEP with : in file_path fields
for entity in entities_context:
if "file_path" in entity and entity["file_path"]:
2025-07-23 16:16:37 +08:00
entity["file_path"] = entity["file_path"].replace(GRAPH_FIELD_SEP, ";")
2025-07-15 11:52:34 +08:00
2025-07-23 10:19:45 +08:00
entities_context = truncate_list_by_token_size(
entities_context,
key=lambda x: json.dumps(x, ensure_ascii=False),
max_token_size=max_entity_tokens,
tokenizer=tokenizer,
)
2025-07-14 15:53:48 +08:00
2025-07-23 10:19:45 +08:00
# Truncate relations based on complete JSON serialization
if relations_context:
# Process relations context to replace GRAPH_FIELD_SEP with : in file_path fields
for relation in relations_context:
if "file_path" in relation and relation["file_path"]:
relation["file_path"] = relation["file_path"].replace(
GRAPH_FIELD_SEP, ";"
)
2025-07-15 11:52:34 +08:00
2025-07-23 10:19:45 +08:00
relations_context = truncate_list_by_token_size(
relations_context,
key=lambda x: json.dumps(x, ensure_ascii=False),
max_token_size=max_relation_tokens,
tokenizer=tokenizer,
)
2025-07-14 15:53:48 +08:00
2025-07-15 18:57:57 +08:00
# After truncation, get text chunks based on final entities and relations
logger.info(
f"Truncated KG query results: {len(entities_context)} entities, {len(relations_context)} relations"
)
2025-07-15 18:57:57 +08:00
# Create filtered data based on truncated context
final_node_datas = []
if entities_context and final_entities:
2025-07-15 21:35:12 +08:00
final_entity_names = {e["entity"] for e in entities_context}
seen_nodes = set()
for node in final_entities:
2025-07-15 21:35:12 +08:00
name = node.get("entity_name")
if name in final_entity_names and name not in seen_nodes:
final_node_datas.append(node)
seen_nodes.add(name)
2025-07-15 18:57:57 +08:00
2025-07-15 21:35:12 +08:00
final_edge_datas = []
if relations_context and final_relations:
2025-07-15 21:35:12 +08:00
final_relation_pairs = {(r["entity1"], r["entity2"]) for r in relations_context}
seen_edges = set()
for edge in final_relations:
2025-07-15 21:35:12 +08:00
src, tgt = edge.get("src_id"), edge.get("tgt_id")
if src is None or tgt is None:
src, tgt = edge.get("src_tgt", (None, None))
pair = (src, tgt)
if pair in final_relation_pairs and pair not in seen_edges:
final_edge_datas.append(edge)
seen_edges.add(pair)
2025-07-15 18:57:57 +08:00
# Get text chunks based on final filtered data
if final_node_datas:
entity_chunks = await _find_most_related_text_unit_from_entities(
final_node_datas,
query_param,
text_chunks_db,
knowledge_graph_inst,
2025-07-15 18:57:57 +08:00
)
if final_edge_datas:
relation_chunks = await _find_related_text_unit_from_relationships(
final_edge_datas,
query_param,
text_chunks_db,
entity_chunks,
2025-07-15 18:57:57 +08:00
)
# Round-robin merge chunks from different sources with deduplication by chunk_id
merged_chunks = []
seen_chunk_ids = set()
max_len = max(len(vector_chunks), len(entity_chunks), len(relation_chunks))
2025-07-24 01:31:49 +08:00
origin_len = len(vector_chunks) + len(entity_chunks) + len(relation_chunks)
for i in range(max_len):
# Add from vector chunks first (Naive mode)
if i < len(vector_chunks):
chunk = vector_chunks[i]
chunk_id = chunk.get("chunk_id") or chunk.get("id")
if chunk_id and chunk_id not in seen_chunk_ids:
seen_chunk_ids.add(chunk_id)
merged_chunks.append(
{
"content": chunk["content"],
"file_path": chunk.get("file_path", "unknown_source"),
}
)
# Add from entity chunks (Local mode)
if i < len(entity_chunks):
chunk = entity_chunks[i]
chunk_id = chunk.get("chunk_id") or chunk.get("id")
if chunk_id and chunk_id not in seen_chunk_ids:
seen_chunk_ids.add(chunk_id)
merged_chunks.append(
{
"content": chunk["content"],
"file_path": chunk.get("file_path", "unknown_source"),
}
)
2025-07-15 18:57:57 +08:00
# Add from relation chunks (Global mode)
if i < len(relation_chunks):
chunk = relation_chunks[i]
chunk_id = chunk.get("chunk_id") or chunk.get("id")
if chunk_id and chunk_id not in seen_chunk_ids:
seen_chunk_ids.add(chunk_id)
merged_chunks.append(
{
"content": chunk["content"],
"file_path": chunk.get("file_path", "unknown_source"),
}
)
2025-07-24 01:31:49 +08:00
logger.debug(
f"Round-robin merged total chunks from {origin_len} to {len(merged_chunks)}"
)
# Apply token processing to merged chunks
2025-07-15 18:57:57 +08:00
text_units_context = []
if merged_chunks:
2025-07-14 15:53:48 +08:00
# Calculate dynamic token limit for text chunks
entities_str = json.dumps(entities_context, ensure_ascii=False)
relations_str = json.dumps(relations_context, ensure_ascii=False)
# Calculate base context tokens (entities + relations + template)
kg_context_template = """-----Entities(KG)-----
```json
{entities_str}
```
-----Relationships(KG)-----
```json
{relations_str}
```
-----Document Chunks(DC)-----
```json
[]
```
"""
kg_context = kg_context_template.format(
entities_str=entities_str, relations_str=relations_str
)
kg_context_tokens = len(tokenizer.encode(kg_context))
# Calculate actual system prompt overhead dynamically
# 1. Calculate conversation history tokens
history_context = ""
if query_param.conversation_history:
history_context = get_conversation_turns(
query_param.conversation_history, query_param.history_turns
)
history_tokens = (
len(tokenizer.encode(history_context)) if history_context else 0
)
# 2. Calculate system prompt template tokens (excluding context_data)
user_prompt = query_param.user_prompt if query_param.user_prompt else ""
response_type = (
query_param.response_type
if query_param.response_type
else "Multiple Paragraphs"
)
# Get the system prompt template from PROMPTS
sys_prompt_template = text_chunks_db.global_config.get(
"system_prompt_template", PROMPTS["rag_response"]
)
# Create a sample system prompt with placeholders filled (excluding context_data)
sample_sys_prompt = sys_prompt_template.format(
history=history_context,
context_data="", # Empty for overhead calculation
response_type=response_type,
user_prompt=user_prompt,
)
sys_prompt_template_tokens = len(tokenizer.encode(sample_sys_prompt))
# Total system prompt overhead = template + query tokens
query_tokens = len(tokenizer.encode(query))
sys_prompt_overhead = sys_prompt_template_tokens + query_tokens
buffer_tokens = 100 # Safety buffer as requested
# Calculate available tokens for text chunks
used_tokens = kg_context_tokens + sys_prompt_overhead + buffer_tokens
available_chunk_tokens = max_total_tokens - used_tokens
logger.debug(
f"Token allocation - Total: {max_total_tokens}, History: {history_tokens}, SysPrompt: {sys_prompt_overhead}, KG: {kg_context_tokens}, Buffer: {buffer_tokens}, Available for chunks: {available_chunk_tokens}"
)
# Apply token truncation to chunks using the dynamic limit
truncated_chunks = await process_chunks_unified(
query=query,
unique_chunks=merged_chunks,
query_param=query_param,
global_config=text_chunks_db.global_config,
source_type=query_param.mode,
chunk_token_limit=available_chunk_tokens, # Pass dynamic limit
)
2025-07-14 15:53:48 +08:00
# Rebuild text_units_context with truncated chunks
for i, chunk in enumerate(truncated_chunks):
text_units_context.append(
{
"id": i + 1,
"content": chunk["content"],
"file_path": chunk.get("file_path", "unknown_source"),
}
2025-07-14 15:53:48 +08:00
)
logger.debug(
f"Final chunk processing: {len(merged_chunks)} -> {len(text_units_context)} (chunk available tokens: {available_chunk_tokens})"
)
2025-07-14 15:53:48 +08:00
2025-07-14 18:43:22 +08:00
logger.info(
f"Final context: {len(entities_context)} entities, {len(relations_context)} relations, {len(text_units_context)} chunks"
)
# not necessary to use LLM to generate a response
if not entities_context and not relations_context:
return None
2025-01-29 22:14:18 +08:00
entities_str = json.dumps(entities_context, ensure_ascii=False)
relations_str = json.dumps(relations_context, ensure_ascii=False)
text_units_str = json.dumps(text_units_context, ensure_ascii=False)
2025-05-08 12:53:28 +08:00
result = f"""-----Entities(KG)-----
2025-04-22 21:17:01 +08:00
```json
{entities_str}
```
2025-05-08 12:53:28 +08:00
-----Relationships(KG)-----
2025-04-22 21:17:01 +08:00
```json
{relations_str}
```
2025-05-08 12:53:28 +08:00
-----Document Chunks(DC)-----
2025-04-22 21:17:01 +08:00
```json
{text_units_str}
```
"""
return result
2024-11-25 13:29:55 +08:00
async def _get_node_data(
2025-02-18 09:05:51 +01:00
query: str,
2025-01-07 16:26:12 +08:00
knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage,
query_param: QueryParam,
2024-10-10 15:02:30 +08:00
):
# get similar entities
2025-02-13 04:12:00 +08:00
logger.info(
f"Query nodes: {query}, top_k: {query_param.top_k}, cosine: {entities_vdb.cosine_better_than_threshold}"
)
results = await entities_vdb.query(
query, top_k=query_param.top_k, ids=query_param.ids
)
2024-10-10 15:02:30 +08:00
if not len(results):
return [], []
2025-04-15 12:34:04 +08:00
# Extract all entity IDs from your results list
node_ids = [r["entity_name"] for r in results]
# Call the batch node retrieval and degree functions concurrently.
nodes_dict, degrees_dict = await asyncio.gather(
2025-04-15 12:34:04 +08:00
knowledge_graph_inst.get_nodes_batch(node_ids),
knowledge_graph_inst.node_degrees_batch(node_ids),
2024-10-10 15:02:30 +08:00
)
2025-01-24 16:06:04 +01:00
# Now, if you need the node data and degree in order:
node_datas = [nodes_dict.get(nid) for nid in node_ids]
node_degrees = [degrees_dict.get(nid, 0) for nid in node_ids]
2024-10-10 15:02:30 +08:00
if not all([n is not None for n in node_datas]):
logger.warning("Some nodes are missing, maybe the storage is damaged")
2024-11-25 13:40:38 +08:00
2024-10-10 15:02:30 +08:00
node_datas = [
2025-05-01 15:14:15 +08:00
{
**n,
"entity_name": k["entity_name"],
"rank": d,
"created_at": k.get("created_at"),
}
2024-10-10 15:02:30 +08:00
for k, n, d in zip(results, node_datas, node_degrees)
if n is not None
2025-07-15 18:57:57 +08:00
]
use_relations = await _find_most_related_edges_from_entities(
2025-04-18 16:14:31 +02:00
node_datas,
query_param,
knowledge_graph_inst,
2024-10-10 15:02:30 +08:00
)
2024-10-10 15:02:30 +08:00
logger.info(
2025-07-15 18:57:57 +08:00
f"Local query: {len(node_datas)} entites, {len(use_relations)} relations"
2024-11-25 13:40:38 +08:00
)
2024-11-25 13:29:55 +08:00
# Entities are sorted by cosine similarity
# Relations are sorted by rank + weight
return node_datas, use_relations
2024-10-10 15:02:30 +08:00
2024-10-10 15:02:30 +08:00
async def _find_most_related_text_unit_from_entities(
2025-01-07 16:26:12 +08:00
node_datas: list[dict],
query_param: QueryParam,
2025-02-08 23:33:11 +01:00
text_chunks_db: BaseKVStorage,
2025-01-07 16:26:12 +08:00
knowledge_graph_inst: BaseGraphStorage,
2024-10-10 15:02:30 +08:00
):
"""
Find text chunks related to entities using linear gradient weighted polling algorithm.
This function implements the optimized text chunk selection strategy:
1. Sort text chunks for each entity by occurrence count in other entities
2. Use linear gradient weighted polling to select chunks fairly
"""
logger.debug(f"Searching text chunks for {len(node_datas)} entities")
2024-11-11 10:45:22 +08:00
if not node_datas:
return []
2025-04-15 12:34:04 +08:00
# Step 1: Collect all text chunks for each entity
entities_with_chunks = []
for entity in node_datas:
if entity.get("source_id"):
chunks = split_string_by_multi_markers(
entity["source_id"], [GRAPH_FIELD_SEP]
)
if chunks:
entities_with_chunks.append(
{
"entity_name": entity["entity_name"],
"chunks": chunks,
"entity_data": entity,
}
)
2024-11-11 10:45:22 +08:00
if not entities_with_chunks:
logger.warning("No entities with text chunks found")
return []
2024-11-11 10:45:22 +08:00
# Step 2: Count chunk occurrences and deduplicate (keep chunks from earlier positioned entities)
chunk_occurrence_count = {}
for entity_info in entities_with_chunks:
deduplicated_chunks = []
for chunk_id in entity_info["chunks"]:
chunk_occurrence_count[chunk_id] = (
chunk_occurrence_count.get(chunk_id, 0) + 1
)
2025-03-05 15:12:01 +08:00
# If this is the first occurrence (count == 1), keep it; otherwise skip (duplicate from later position)
if chunk_occurrence_count[chunk_id] == 1:
deduplicated_chunks.append(chunk_id)
# count > 1 means this chunk appeared in an earlier entity, so skip it
2024-11-21 14:35:18 +08:00
# Update entity's chunks to deduplicated chunks
entity_info["chunks"] = deduplicated_chunks
2025-04-06 17:45:32 +08:00
# Step 3: Sort chunks for each entity by occurrence count (higher count = higher priority)
for entity_info in entities_with_chunks:
sorted_chunks = sorted(
entity_info["chunks"],
key=lambda chunk_id: chunk_occurrence_count.get(chunk_id, 0),
reverse=True,
)
entity_info["sorted_chunks"] = sorted_chunks
2025-01-24 16:06:04 +01:00
# Step 4: Apply linear gradient weighted polling algorithm
max_related_chunks = text_chunks_db.global_config.get(
"related_chunk_number", DEFAULT_RELATED_CHUNK_NUMBER
)
2025-01-24 16:06:04 +01:00
selected_chunk_ids = linear_gradient_weighted_polling(
entities_with_chunks, max_related_chunks, min_related_chunks=1
)
2024-11-11 10:45:22 +08:00
2025-07-24 01:31:49 +08:00
logger.debug(
f"Found {len(selected_chunk_ids)} entity-related chunks using linear gradient weighted polling"
)
if not selected_chunk_ids:
2024-11-05 18:36:59 -08:00
return []
2024-11-11 10:45:22 +08:00
# Step 5: Batch retrieve chunk data
unique_chunk_ids = list(
dict.fromkeys(selected_chunk_ids)
) # Remove duplicates while preserving order
chunk_data_list = await text_chunks_db.get_by_ids(unique_chunk_ids)
2024-11-11 10:45:22 +08:00
# Step 6: Build result chunks with valid data
2025-07-08 13:31:05 +08:00
result_chunks = []
for chunk_id, chunk_data in zip(unique_chunk_ids, chunk_data_list):
if chunk_data is not None and "content" in chunk_data:
chunk_data_copy = chunk_data.copy()
chunk_data_copy["source_type"] = "entity"
chunk_data_copy["chunk_id"] = chunk_id # Add chunk_id for deduplication
result_chunks.append(chunk_data_copy)
2025-07-08 13:31:05 +08:00
return result_chunks
2024-10-10 15:02:30 +08:00
2024-10-10 15:02:30 +08:00
async def _find_most_related_edges_from_entities(
2025-01-07 16:26:12 +08:00
node_datas: list[dict],
query_param: QueryParam,
knowledge_graph_inst: BaseGraphStorage,
2024-10-10 15:02:30 +08:00
):
node_names = [dp["entity_name"] for dp in node_datas]
batch_edges_dict = await knowledge_graph_inst.get_nodes_edges_batch(node_names)
2024-11-14 15:59:37 +08:00
all_edges = []
seen = set()
for node_name in node_names:
this_edges = batch_edges_dict.get(node_name, [])
2024-11-14 15:59:37 +08:00
for e in this_edges:
sorted_edge = tuple(sorted(e))
if sorted_edge not in seen:
seen.add(sorted_edge)
all_edges.append(sorted_edge)
# Prepare edge pairs in two forms:
# For the batch edge properties function, use dicts.
edge_pairs_dicts = [{"src": e[0], "tgt": e[1]} for e in all_edges]
# For edge degrees, use tuples.
edge_pairs_tuples = list(all_edges) # all_edges is already a list of tuples
2025-04-15 12:34:04 +08:00
# Call the batched functions concurrently.
edge_data_dict, edge_degrees_dict = await asyncio.gather(
knowledge_graph_inst.get_edges_batch(edge_pairs_dicts),
2025-04-15 12:34:04 +08:00
knowledge_graph_inst.edge_degrees_batch(edge_pairs_tuples),
2024-10-10 15:02:30 +08:00
)
2025-04-15 12:34:04 +08:00
# Reconstruct edge_datas list in the same order as the deduplicated results.
all_edges_data = []
for pair in all_edges:
edge_props = edge_data_dict.get(pair)
if edge_props is not None:
if "weight" not in edge_props:
2025-05-11 11:24:52 +08:00
logger.warning(
f"Edge {pair} missing 'weight' attribute, using default value 1.0"
2025-05-11 11:24:52 +08:00
)
edge_props["weight"] = 1.0
2025-05-11 11:24:52 +08:00
combined = {
"src_tgt": pair,
"rank": edge_degrees_dict.get(pair, 0),
**edge_props,
}
all_edges_data.append(combined)
2024-10-10 15:02:30 +08:00
all_edges_data = sorted(
all_edges_data, key=lambda x: (x["rank"], x["weight"]), reverse=True
)
2024-10-10 15:02:30 +08:00
return all_edges_data
2024-11-25 13:29:55 +08:00
async def _get_edge_data(
2025-01-07 16:26:12 +08:00
keywords,
knowledge_graph_inst: BaseGraphStorage,
relationships_vdb: BaseVectorStorage,
query_param: QueryParam,
2024-10-10 15:02:30 +08:00
):
2025-02-13 04:12:00 +08:00
logger.info(
f"Query edges: {keywords}, top_k: {query_param.top_k}, cosine: {relationships_vdb.cosine_better_than_threshold}"
)
results = await relationships_vdb.query(
keywords, top_k=query_param.top_k, ids=query_param.ids
)
2024-10-10 15:02:30 +08:00
if not len(results):
return [], []
# Prepare edge pairs in two forms:
# For the batch edge properties function, use dicts.
edge_pairs_dicts = [{"src": r["src_id"], "tgt": r["tgt_id"]} for r in results]
edge_data_dict = await knowledge_graph_inst.get_edges_batch(edge_pairs_dicts)
# Reconstruct edge_datas list in the same order as results.
edge_datas = []
for k in results:
pair = (k["src_id"], k["tgt_id"])
edge_props = edge_data_dict.get(pair)
if edge_props is not None:
if "weight" not in edge_props:
2025-05-11 11:24:52 +08:00
logger.warning(
f"Edge {pair} missing 'weight' attribute, using default value 1.0"
2025-05-11 11:24:52 +08:00
)
edge_props["weight"] = 1.0
2025-05-11 11:24:52 +08:00
# Keep edge data without rank, maintain vector search order
combined = {
"src_id": k["src_id"],
"tgt_id": k["tgt_id"],
"created_at": k.get("created_at", None),
**edge_props,
}
edge_datas.append(combined)
2025-04-15 12:34:04 +08:00
# Relations maintain vector search order (sorted by similarity)
2025-07-15 18:57:57 +08:00
use_entities = await _find_most_related_entities_from_relationships(
edge_datas,
query_param,
knowledge_graph_inst,
2024-10-10 15:02:30 +08:00
)
2025-07-15 18:57:57 +08:00
2024-10-10 15:02:30 +08:00
logger.info(
2025-07-15 18:57:57 +08:00
f"Global query: {len(use_entities)} entites, {len(edge_datas)} relations"
2024-10-10 15:02:30 +08:00
)
2024-11-25 13:29:55 +08:00
return edge_datas, use_entities
2024-10-10 15:02:30 +08:00
2024-10-10 15:02:30 +08:00
async def _find_most_related_entities_from_relationships(
2025-01-07 16:26:12 +08:00
edge_datas: list[dict],
query_param: QueryParam,
knowledge_graph_inst: BaseGraphStorage,
2024-10-10 15:02:30 +08:00
):
2024-11-14 15:59:37 +08:00
entity_names = []
seen = set()
2024-10-10 15:02:30 +08:00
for e in edge_datas:
2024-11-14 15:59:37 +08:00
if e["src_id"] not in seen:
entity_names.append(e["src_id"])
seen.add(e["src_id"])
if e["tgt_id"] not in seen:
entity_names.append(e["tgt_id"])
seen.add(e["tgt_id"])
# Only get nodes data, no need for node degrees
nodes_dict = await knowledge_graph_inst.get_nodes_batch(entity_names)
# Rebuild the list in the same order as entity_names
node_datas = []
for entity_name in entity_names:
node = nodes_dict.get(entity_name)
if node is None:
logger.warning(f"Node '{entity_name}' not found in batch retrieval.")
continue
# Combine the node data with the entity name, no rank needed
combined = {**node, "entity_name": entity_name}
node_datas.append(combined)
2024-10-10 15:02:30 +08:00
return node_datas
2024-10-10 15:02:30 +08:00
async def _find_related_text_unit_from_relationships(
2025-01-07 16:26:12 +08:00
edge_datas: list[dict],
query_param: QueryParam,
2025-02-08 23:33:11 +01:00
text_chunks_db: BaseKVStorage,
entity_chunks: list[dict] = None,
2024-10-10 15:02:30 +08:00
):
"""
Find text chunks related to relationships using linear gradient weighted polling algorithm.
This function implements the optimized text chunk selection strategy:
1. Sort text chunks for each relationship by occurrence count in other relationships
2. Use linear gradient weighted polling to select chunks fairly
"""
2025-07-15 18:59:52 +08:00
logger.debug(f"Searching text chunks for {len(edge_datas)} relationships")
if not edge_datas:
return []
# Step 1: Collect all text chunks for each relationship
relations_with_chunks = []
for relation in edge_datas:
if relation.get("source_id"):
chunks = split_string_by_multi_markers(
relation["source_id"], [GRAPH_FIELD_SEP]
2025-07-15 23:57:24 +08:00
)
if chunks:
# Build relation identifier
if "src_tgt" in relation:
rel_key = tuple(sorted(relation["src_tgt"]))
else:
rel_key = tuple(
sorted([relation.get("src_id"), relation.get("tgt_id")])
)
2025-01-24 16:06:04 +01:00
relations_with_chunks.append(
{
"relation_key": rel_key,
"chunks": chunks,
"relation_data": relation,
}
)
if not relations_with_chunks:
logger.warning("No relationships with text chunks found")
return []
# Step 2: Count chunk occurrences and deduplicate (keep chunks from earlier positioned relationships)
chunk_occurrence_count = {}
for relation_info in relations_with_chunks:
deduplicated_chunks = []
for chunk_id in relation_info["chunks"]:
chunk_occurrence_count[chunk_id] = (
chunk_occurrence_count.get(chunk_id, 0) + 1
)
# If this is the first occurrence (count == 1), keep it; otherwise skip (duplicate from later position)
if chunk_occurrence_count[chunk_id] == 1:
deduplicated_chunks.append(chunk_id)
# count > 1 means this chunk appeared in an earlier relationship, so skip it
# Update relationship's chunks to deduplicated chunks
relation_info["chunks"] = deduplicated_chunks
# Step 3: Sort chunks for each relationship by occurrence count (higher count = higher priority)
for relation_info in relations_with_chunks:
sorted_chunks = sorted(
relation_info["chunks"],
key=lambda chunk_id: chunk_occurrence_count.get(chunk_id, 0),
reverse=True,
)
relation_info["sorted_chunks"] = sorted_chunks
# Step 4: Apply linear gradient weighted polling algorithm
max_related_chunks = text_chunks_db.global_config.get(
"related_chunk_number", DEFAULT_RELATED_CHUNK_NUMBER
)
selected_chunk_ids = linear_gradient_weighted_polling(
relations_with_chunks, max_related_chunks, min_related_chunks=1
)
2025-07-24 01:31:49 +08:00
logger.debug(
f"Found {len(selected_chunk_ids)} relationship-related chunks using linear gradient weighted polling"
)
logger.info(
f"KG related chunks: {len(entity_chunks)} from entitys, {len(selected_chunk_ids)} from relations"
)
if not selected_chunk_ids:
return []
# Step 4.5: Remove duplicates with entity_chunks before batch retrieval
if entity_chunks:
# Extract chunk IDs from entity_chunks
entity_chunk_ids = set()
for chunk in entity_chunks:
2025-07-24 02:13:39 +08:00
chunk_id = chunk.get("chunk_id")
if chunk_id:
entity_chunk_ids.add(chunk_id)
# Filter out duplicate chunk IDs
original_count = len(selected_chunk_ids)
selected_chunk_ids = [
chunk_id
for chunk_id in selected_chunk_ids
if chunk_id not in entity_chunk_ids
]
logger.debug(
2025-07-24 02:13:39 +08:00
f"Deduplication relation-chunks with entity-chunks: {original_count} -> {len(selected_chunk_ids)} chunks "
)
# Early return if no chunks remain after deduplication
if not selected_chunk_ids:
return []
# Step 5: Batch retrieve chunk data
unique_chunk_ids = list(
dict.fromkeys(selected_chunk_ids)
) # Remove duplicates while preserving order
chunk_data_list = await text_chunks_db.get_by_ids(unique_chunk_ids)
# Step 6: Build result chunks with valid data
2025-07-08 13:31:05 +08:00
result_chunks = []
for chunk_id, chunk_data in zip(unique_chunk_ids, chunk_data_list):
if chunk_data is not None and "content" in chunk_data:
chunk_data_copy = chunk_data.copy()
chunk_data_copy["source_type"] = "relationship"
chunk_data_copy["chunk_id"] = chunk_id # Add chunk_id for deduplication
result_chunks.append(chunk_data_copy)
2024-10-10 15:02:30 +08:00
2025-07-08 13:31:05 +08:00
return result_chunks
2024-10-10 15:02:30 +08:00
2024-10-10 15:02:30 +08:00
async def naive_query(
query: str,
2025-01-07 16:26:12 +08:00
chunks_vdb: BaseVectorStorage,
query_param: QueryParam,
global_config: dict[str, str],
hashing_kv: BaseKVStorage | None = None,
system_prompt: str | None = None,
2025-02-14 23:49:39 +01:00
) -> str | AsyncIterator[str]:
if query_param.model_func:
use_model_func = query_param.model_func
else:
use_model_func = global_config["llm_model_func"]
# Apply higher priority (5) to query relation LLM function
use_model_func = partial(use_model_func, _priority=5)
# Handle cache
args_hash = compute_args_hash(query_param.mode, query)
cached_response, quantized, min_val, max_val = await handle_cache(
2025-02-11 11:42:46 +08:00
hashing_kv, args_hash, query, query_param.mode, cache_type="query"
)
if cached_response is not None:
return cached_response
tokenizer: Tokenizer = global_config["tokenizer"]
2025-05-07 03:57:14 +08:00
2025-07-08 13:31:05 +08:00
chunks = await _get_vector_context(query, chunks_vdb, query_param)
2025-07-08 13:31:05 +08:00
if chunks is None or len(chunks) == 0:
return PROMPTS["fail_response"]
2025-07-14 15:53:48 +08:00
# Calculate dynamic token limit for chunks
# Get token limits from query_param (with fallback to global_config)
max_total_tokens = getattr(
2025-07-15 17:24:57 +08:00
query_param,
"max_total_tokens",
global_config.get("max_total_tokens", DEFAULT_MAX_TOTAL_TOKENS),
2025-07-14 15:53:48 +08:00
)
# Calculate conversation history tokens
history_context = ""
if query_param.conversation_history:
history_context = get_conversation_turns(
query_param.conversation_history, query_param.history_turns
)
history_tokens = len(tokenizer.encode(history_context)) if history_context else 0
# Calculate system prompt template tokens (excluding content_data)
user_prompt = query_param.user_prompt if query_param.user_prompt else ""
response_type = (
query_param.response_type
if query_param.response_type
else "Multiple Paragraphs"
)
# Use the provided system prompt or default
sys_prompt_template = (
system_prompt if system_prompt else PROMPTS["naive_rag_response"]
)
# Create a sample system prompt with empty content_data to calculate overhead
sample_sys_prompt = sys_prompt_template.format(
content_data="", # Empty for overhead calculation
response_type=response_type,
history=history_context,
user_prompt=user_prompt,
)
sys_prompt_template_tokens = len(tokenizer.encode(sample_sys_prompt))
# Total system prompt overhead = template + query tokens
query_tokens = len(tokenizer.encode(query))
sys_prompt_overhead = sys_prompt_template_tokens + query_tokens
buffer_tokens = 100 # Safety buffer
# Calculate available tokens for chunks
used_tokens = sys_prompt_overhead + buffer_tokens
available_chunk_tokens = max_total_tokens - used_tokens
logger.debug(
f"Naive query token allocation - Total: {max_total_tokens}, History: {history_tokens}, SysPrompt: {sys_prompt_overhead}, Buffer: {buffer_tokens}, Available for chunks: {available_chunk_tokens}"
)
# Process chunks using unified processing with dynamic token limit
2025-07-08 13:31:05 +08:00
processed_chunks = await process_chunks_unified(
query=query,
unique_chunks=chunks,
2025-07-08 13:31:05 +08:00
query_param=query_param,
global_config=global_config,
source_type="vector",
2025-07-14 15:53:48 +08:00
chunk_token_limit=available_chunk_tokens, # Pass dynamic limit
2025-07-08 13:31:05 +08:00
)
2025-07-09 12:10:06 +08:00
logger.info(f"Final context: {len(processed_chunks)} chunks")
2025-07-08 13:31:05 +08:00
# Build text_units_context from processed chunks
text_units_context = []
for i, chunk in enumerate(processed_chunks):
text_units_context.append(
{
"id": i + 1,
"content": chunk["content"],
"file_path": chunk.get("file_path", "unknown_source"),
}
)
text_units_str = json.dumps(text_units_context, ensure_ascii=False)
2024-10-10 15:02:30 +08:00
if query_param.only_need_context:
return f"""
---Document Chunks---
```json
{text_units_str}
```
"""
# Process conversation history
history_context = ""
if query_param.conversation_history:
history_context = get_conversation_turns(
query_param.conversation_history, query_param.history_turns
)
2025-05-08 03:38:47 +08:00
# Build system prompt
2025-05-08 04:29:43 +08:00
user_prompt = (
query_param.user_prompt
if query_param.user_prompt
else PROMPTS["DEFAULT_USER_PROMPT"]
)
sys_prompt_temp = system_prompt if system_prompt else PROMPTS["naive_rag_response"]
2024-10-10 15:02:30 +08:00
sys_prompt = sys_prompt_temp.format(
content_data=text_units_str,
2025-01-25 16:57:47 +08:00
response_type=query_param.response_type,
history=history_context,
2025-05-08 03:38:47 +08:00
user_prompt=user_prompt,
2024-10-10 15:02:30 +08:00
)
if query_param.only_need_prompt:
return sys_prompt
len_of_prompts = len(tokenizer.encode(query + sys_prompt))
2025-07-14 15:53:48 +08:00
logger.debug(
f"[naive_query] Sending to LLM: {len_of_prompts:,} tokens (Query: {len(tokenizer.encode(query))}, System: {len(tokenizer.encode(sys_prompt))})"
)
2024-10-10 15:02:30 +08:00
response = await use_model_func(
query,
system_prompt=sys_prompt,
stream=query_param.stream,
2024-10-10 15:02:30 +08:00
)
2024-10-14 19:41:07 +08:00
if isinstance(response, str) and len(response) > len(sys_prompt):
response = (
2025-01-07 16:26:12 +08:00
response[len(sys_prompt) :]
.replace(sys_prompt, "")
.replace("user", "")
.replace("model", "")
.replace(query, "")
.replace("<system>", "")
.replace("</system>", "")
.strip()
)
2024-10-10 15:02:30 +08:00
2025-04-16 01:24:59 +08:00
if hashing_kv.global_config.get("enable_llm_cache"):
# Save to cache
await save_to_cache(
hashing_kv,
CacheData(
args_hash=args_hash,
content=response,
prompt=query,
quantized=quantized,
min_val=min_val,
max_val=max_val,
mode=query_param.mode,
cache_type="query",
),
)
2024-11-06 11:18:14 -05:00
return response
2025-05-08 03:38:47 +08:00
# TODO: Deprecated, use user_prompt in QueryParam instead
2025-01-24 18:59:24 +08:00
async def kg_query_with_keywords(
query: str,
2025-01-07 16:26:12 +08:00
knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage,
relationships_vdb: BaseVectorStorage,
2025-02-08 23:33:11 +01:00
text_chunks_db: BaseKVStorage,
2025-01-07 16:26:12 +08:00
query_param: QueryParam,
global_config: dict[str, str],
hashing_kv: BaseKVStorage | None = None,
ll_keywords: list[str] = [],
hl_keywords: list[str] = [],
chunks_vdb: BaseVectorStorage | None = None,
2025-02-15 00:10:37 +01:00
) -> str | AsyncIterator[str]:
"""
2025-01-24 18:59:24 +08:00
Refactored kg_query that does NOT extract keywords by itself.
It expects hl_keywords and ll_keywords to be set in query_param, or defaults to empty.
Then it uses those to build context and produce a final LLM response.
"""
if query_param.model_func:
use_model_func = query_param.model_func
else:
use_model_func = global_config["llm_model_func"]
# Apply higher priority (5) to query relation LLM function
use_model_func = partial(use_model_func, _priority=5)
2025-01-24 18:59:24 +08:00
args_hash = compute_args_hash(query_param.mode, query)
cached_response, quantized, min_val, max_val = await handle_cache(
2025-01-24 18:59:24 +08:00
hashing_kv, args_hash, query, query_param.mode, cache_type="query"
)
if cached_response is not None:
return cached_response
2025-01-24 18:59:24 +08:00
# If neither has any keywords, you could handle that logic here.
if not hl_keywords and not ll_keywords:
logger.warning(
"No keywords found in query_param. Could default to global mode or fail."
)
return PROMPTS["fail_response"]
if not ll_keywords and query_param.mode in ["local", "hybrid"]:
logger.warning("low_level_keywords is empty, switching to global mode.")
query_param.mode = "global"
if not hl_keywords and query_param.mode in ["global", "hybrid"]:
logger.warning("high_level_keywords is empty, switching to local mode.")
query_param.mode = "local"
ll_keywords_str = ", ".join(ll_keywords) if ll_keywords else ""
hl_keywords_str = ", ".join(hl_keywords) if hl_keywords else ""
2025-01-24 18:59:24 +08:00
context = await _build_query_context(
2025-07-08 13:31:05 +08:00
query,
2025-02-18 09:05:51 +01:00
ll_keywords_str,
hl_keywords_str,
2025-01-24 18:59:24 +08:00
knowledge_graph_inst,
entities_vdb,
relationships_vdb,
text_chunks_db,
query_param,
chunks_vdb=chunks_vdb,
)
2025-01-24 18:59:24 +08:00
if not context:
return PROMPTS["fail_response"]
if query_param.only_need_context:
2025-01-24 18:59:24 +08:00
return context
2025-01-24 18:59:24 +08:00
# Process conversation history
history_context = ""
if query_param.conversation_history:
history_context = get_conversation_turns(
query_param.conversation_history, query_param.history_turns
)
sys_prompt_temp = PROMPTS["rag_response"]
sys_prompt = sys_prompt_temp.format(
context_data=context,
response_type=query_param.response_type,
2025-01-24 18:59:24 +08:00
history=history_context,
)
if query_param.only_need_prompt:
return sys_prompt
tokenizer: Tokenizer = global_config["tokenizer"]
len_of_prompts = len(tokenizer.encode(query + sys_prompt))
2025-07-14 15:53:48 +08:00
logger.debug(
f"[kg_query_with_keywords] Sending to LLM: {len_of_prompts:,} tokens (Query: {len(tokenizer.encode(query))}, System: {len(tokenizer.encode(sys_prompt))})"
)
2025-03-17 23:32:35 +08:00
# 6. Generate response
response = await use_model_func(
query,
system_prompt=sys_prompt,
stream=query_param.stream,
)
2025-03-01 17:45:06 +08:00
2025-03-17 23:32:35 +08:00
# Clean up response content
if isinstance(response, str) and len(response) > len(sys_prompt):
response = (
response.replace(sys_prompt, "")
.replace("user", "")
.replace("model", "")
.replace(query, "")
.replace("<system>", "")
.replace("</system>", "")
.strip()
)
2025-04-16 01:24:59 +08:00
if hashing_kv.global_config.get("enable_llm_cache"):
await save_to_cache(
hashing_kv,
CacheData(
args_hash=args_hash,
content=response,
prompt=query,
quantized=quantized,
min_val=min_val,
max_val=max_val,
mode=query_param.mode,
cache_type="query",
),
)
2025-03-01 17:45:06 +08:00
return response
2025-03-11 15:43:04 +08:00
2025-03-11 15:44:01 +08:00
2025-05-08 03:38:47 +08:00
# TODO: Deprecated, use user_prompt in QueryParam instead
2025-03-11 15:43:04 +08:00
async def query_with_keywords(
query: str,
prompt: str,
param: QueryParam,
knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage,
relationships_vdb: BaseVectorStorage,
chunks_vdb: BaseVectorStorage,
text_chunks_db: BaseKVStorage,
global_config: dict[str, str],
hashing_kv: BaseKVStorage | None = None,
) -> str | AsyncIterator[str]:
"""
Extract keywords from the query and then use them for retrieving information.
2025-03-11 15:44:01 +08:00
2025-03-11 15:43:04 +08:00
1. Extracts high-level and low-level keywords from the query
2. Formats the query with the extracted keywords and prompt
3. Uses the appropriate query method based on param.mode
2025-03-11 15:44:01 +08:00
2025-03-11 15:43:04 +08:00
Args:
query: The user's query
prompt: Additional prompt to prepend to the query
param: Query parameters
knowledge_graph_inst: Knowledge graph storage
entities_vdb: Entities vector database
relationships_vdb: Relationships vector database
chunks_vdb: Document chunks vector database
text_chunks_db: Text chunks storage
global_config: Global configuration
hashing_kv: Cache storage
2025-03-11 15:44:01 +08:00
2025-03-11 15:43:04 +08:00
Returns:
Query response or async iterator
"""
# Extract keywords
hl_keywords, ll_keywords = await get_keywords_from_query(
query=query,
query_param=param,
2025-03-11 15:43:04 +08:00
global_config=global_config,
hashing_kv=hashing_kv,
)
# Create a new string with the prompt and the keywords
keywords_str = ", ".join(ll_keywords + hl_keywords)
formatted_question = (
f"{prompt}\n\n### Keywords\n\n{keywords_str}\n\n### Query\n\n{query}"
)
2025-03-11 15:43:04 +08:00
# Use appropriate query method based on mode
if param.mode in ["local", "global", "hybrid", "mix"]:
2025-03-11 15:43:04 +08:00
return await kg_query_with_keywords(
formatted_question,
knowledge_graph_inst,
entities_vdb,
relationships_vdb,
text_chunks_db,
param,
global_config,
hashing_kv=hashing_kv,
hl_keywords=hl_keywords,
ll_keywords=ll_keywords,
chunks_vdb=chunks_vdb,
2025-03-11 15:43:04 +08:00
)
elif param.mode == "naive":
return await naive_query(
formatted_question,
chunks_vdb,
text_chunks_db,
param,
global_config,
hashing_kv=hashing_kv,
)
else:
raise ValueError(f"Unknown mode {param.mode}")