from __future__ import annotations

import traceback
import asyncio
import configparser
import os
import time
import warnings
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from functools import partial
from typing import (
    Any,
    AsyncIterator,
    Callable,
    Iterator,
    cast,
    final,
    Literal,
    Optional,
    List,
    Dict,
)

from lightrag.constants import (
    DEFAULT_MAX_GLEANING,
    DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
)
from lightrag.utils import get_env_value

from lightrag.kg import (
    STORAGES,
    verify_storage_implementation,
)

from lightrag.kg.shared_storage import (
    get_namespace_data,
    get_pipeline_status_lock,
    get_graph_db_lock,
)

from .base import (
    BaseGraphStorage,
    BaseKVStorage,
    BaseVectorStorage,
    DocProcessingStatus,
    DocStatus,
    DocStatusStorage,
    QueryParam,
    StorageNameSpace,
    StoragesStatus,
    DeletionResult,
)
from .namespace import NameSpace
from .operate import (
    chunking_by_token_size,
    extract_entities,
    merge_nodes_and_edges,
    kg_query,
    naive_query,
    query_with_keywords,
    _rebuild_knowledge_from_chunks,
)
from .constants import GRAPH_FIELD_SEP
from .utils import (
    Tokenizer,
    TiktokenTokenizer,
    EmbeddingFunc,
    always_get_an_event_loop,
    compute_mdhash_id,
    convert_response_to_json,
    lazy_external_import,
    priority_limit_async_func_call,
    get_content_summary,
    clean_text,
    check_storage_env_vars,
    logger,
)
from .types import KnowledgeGraph
from dotenv import load_dotenv

# Use the .env file in the current working directory, which allows a different
# .env file per LightRAG instance. OS environment variables take precedence
# over values loaded from the .env file.
load_dotenv(dotenv_path=".env", override=False)

# TODO: TO REMOVE @Yannick
config = configparser.ConfigParser()
config.read("config.ini", "utf-8")

@final
@dataclass
class LightRAG:
    """LightRAG: Simple and Fast Retrieval-Augmented Generation."""

    # Directory
    # ---

    working_dir: str = field(default="./rag_storage")
    """Directory where cache and temporary files are stored."""

    # Storage
    # ---

    kv_storage: str = field(default="JsonKVStorage")
    """Storage backend for key-value data."""

    vector_storage: str = field(default="NanoVectorDBStorage")
    """Storage backend for vector embeddings."""

    graph_storage: str = field(default="NetworkXStorage")
    """Storage backend for knowledge graphs."""

    doc_status_storage: str = field(default="JsonDocStatusStorage")
    """Storage type for tracking document processing statuses."""

    # Workspace
    # ---

    workspace: str = field(default_factory=lambda: os.getenv("WORKSPACE", ""))
    """Workspace for data isolation. Defaults to an empty string if the WORKSPACE environment variable is not set."""

    # Logging (Deprecated, use setup_logger in utils.py instead)
    # ---

    log_level: int | None = field(default=None)
    log_file_path: str | None = field(default=None)

    # Entity extraction
    # ---

    entity_extract_max_gleaning: int = field(
        default=get_env_value("MAX_GLEANING", DEFAULT_MAX_GLEANING, int)
    )
    """Maximum number of entity extraction attempts for ambiguous content."""

    force_llm_summary_on_merge: int = field(
        default=get_env_value(
            "FORCE_LLM_SUMMARY_ON_MERGE", DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE, int
        )
    )

    # Text chunking
    # ---

    chunk_token_size: int = field(default=int(os.getenv("CHUNK_SIZE", 1200)))
    """Maximum number of tokens per text chunk when splitting documents."""

    chunk_overlap_token_size: int = field(
        default=int(os.getenv("CHUNK_OVERLAP_SIZE", 100))
    )
    """Number of overlapping tokens between consecutive text chunks to preserve context."""

    tokenizer: Optional[Tokenizer] = field(default=None)
    """
    Tokenizer instance used for chunking.
    If None and a `tiktoken_model_name` is provided, a TiktokenTokenizer is created from that model name.
    If both are None, the default TiktokenTokenizer is used.
    """

    tiktoken_model_name: str = field(default="gpt-4o-mini")
    """Model name used for tokenization when chunking text with tiktoken. Defaults to `gpt-4o-mini`."""

    chunking_func: Callable[
        [
            Tokenizer,
            str,
            Optional[str],
            bool,
            int,
            int,
        ],
        List[Dict[str, Any]],
    ] = field(default_factory=lambda: chunking_by_token_size)
    """
    Custom chunking function for splitting text into chunks before processing.

    The function should take the following parameters (in this order):

        - `tokenizer`: A Tokenizer instance to use for tokenization.
        - `content`: The text to be split into chunks.
        - `split_by_character`: The character to split the text on. If None, the text is split into chunks of `chunk_token_size` tokens.
        - `split_by_character_only`: If True, the text is split only on the specified character.
        - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.
        - `chunk_token_size`: The maximum number of tokens per chunk.

    The function should return a list of dictionaries, where each dictionary contains the following keys:

        - `tokens`: The number of tokens in the chunk.
        - `content`: The text content of the chunk.

    Defaults to `chunking_by_token_size` if not specified.
    """
    # Embedding
    # ---

    embedding_func: EmbeddingFunc | None = field(default=None)
    """Function for computing text embeddings. Must be set before use."""

    embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10)))
    """Batch size for embedding computations."""

    embedding_func_max_async: int = field(
        default=int(os.getenv("EMBEDDING_FUNC_MAX_ASYNC", 8))
    )
    """Maximum number of concurrent embedding function calls."""

    embedding_cache_config: dict[str, Any] = field(
        default_factory=lambda: {
            "enabled": False,
            "similarity_threshold": 0.95,
            "use_llm_check": False,
        }
    )
    """Configuration for the embedding cache.
    - enabled: If True, enables caching to avoid redundant computations.
    - similarity_threshold: Minimum similarity score to use cached embeddings.
    - use_llm_check: If True, validates cached embeddings using an LLM.
    """
    # LLM Configuration
    # ---

    llm_model_func: Callable[..., object] | None = field(default=None)
    """Function for interacting with the large language model (LLM). Must be set before use."""

    llm_model_name: str = field(default="gpt-4o-mini")
    """Name of the LLM model used for generating responses."""

    llm_model_max_token_size: int = field(default=int(os.getenv("MAX_TOKENS", 32000)))
    """Maximum number of tokens allowed per LLM response."""

    llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 4)))
    """Maximum number of concurrent LLM calls."""

    llm_model_kwargs: dict[str, Any] = field(default_factory=dict)
    """Additional keyword arguments passed to the LLM model function."""

    # Rerank Configuration
    # ---

    rerank_model_func: Callable[..., object] | None = field(default=None)
    """Function for reranking retrieved documents. All rerank configuration (model name, API keys, top_k, etc.) should be handled inside this function. Optional."""

    # Storage
    # ---

    vector_db_storage_cls_kwargs: dict[str, Any] = field(default_factory=dict)
    """Additional parameters for vector database storage."""

    enable_llm_cache: bool = field(default=True)
    """Enables caching of LLM responses to avoid redundant computations."""

    enable_llm_cache_for_entity_extract: bool = field(default=True)
    """If True, enables caching for entity extraction steps to reduce LLM costs."""

    # Extensions
    # ---

    max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 2)))
    """Maximum number of parallel insert operations."""

    max_graph_nodes: int = field(default=get_env_value("MAX_GRAPH_NODES", 1000, int))
    """Maximum number of graph nodes to return in knowledge graph queries."""

    addon_params: dict[str, Any] = field(
        default_factory=lambda: {
            "language": get_env_value("SUMMARY_LANGUAGE", "English", str)
        }
    )

    # Storages Management
    # ---

    auto_manage_storages_states: bool = field(default=True)
    """If True, LightRAG automatically calls initialize_storages and finalize_storages at the appropriate times."""

    convert_response_to_json_func: Callable[[str], dict[str, Any]] = field(
        default_factory=lambda: convert_response_to_json
    )
    """
    Custom function for converting LLM responses to JSON format.
    The default function is :func:`.utils.convert_response_to_json`.
    """

    cosine_better_than_threshold: float = field(
        default=float(os.getenv("COSINE_THRESHOLD", 0.2))
    )

    _storages_status: StoragesStatus = field(default=StoragesStatus.NOT_CREATED)

    def __post_init__(self):
        from lightrag.kg.shared_storage import (
            initialize_share_data,
        )

        # Handle deprecated parameters
        if self.log_level is not None:
            warnings.warn(
                "WARNING: log_level parameter is deprecated, use setup_logger in utils.py instead",
                UserWarning,
                stacklevel=2,
            )
        if self.log_file_path is not None:
            warnings.warn(
                "WARNING: log_file_path parameter is deprecated, use setup_logger in utils.py instead",
                UserWarning,
                stacklevel=2,
            )

        # Remove these attributes to prevent their use
        if hasattr(self, "log_level"):
            delattr(self, "log_level")
        if hasattr(self, "log_file_path"):
            delattr(self, "log_file_path")

        initialize_share_data()

        if not os.path.exists(self.working_dir):
            logger.info(f"Creating working directory {self.working_dir}")
            os.makedirs(self.working_dir)

        # Verify storage implementation compatibility and environment variables
        storage_configs = [
            ("KV_STORAGE", self.kv_storage),
            ("VECTOR_STORAGE", self.vector_storage),
            ("GRAPH_STORAGE", self.graph_storage),
            ("DOC_STATUS_STORAGE", self.doc_status_storage),
        ]
        for storage_type, storage_name in storage_configs:
            # Verify storage implementation compatibility
            verify_storage_implementation(storage_type, storage_name)
            # Check environment variables
            check_storage_env_vars(storage_name)

        # Ensure vector_db_storage_cls_kwargs has required fields
        self.vector_db_storage_cls_kwargs = {
            "cosine_better_than_threshold": self.cosine_better_than_threshold,
            **self.vector_db_storage_cls_kwargs,
        }

        # Init Tokenizer
        # Post-initialization hook to handle backward-compatible tokenizer initialization based on provided parameters
        if self.tokenizer is None:
            if self.tiktoken_model_name:
                self.tokenizer = TiktokenTokenizer(self.tiktoken_model_name)
            else:
                self.tokenizer = TiktokenTokenizer()

        # Fix global_config now
        global_config = asdict(self)

        _print_config = ",\n".join([f"{k} = {v}" for k, v in global_config.items()])
        logger.debug(f"LightRAG init with param:\n{_print_config}\n")

        # Init Embedding
        self.embedding_func = priority_limit_async_func_call(
            self.embedding_func_max_async
        )(self.embedding_func)

        # Initialize all storages
        self.key_string_value_json_storage_cls: type[BaseKVStorage] = (
            self._get_storage_class(self.kv_storage)
        )  # type: ignore
        self.vector_db_storage_cls: type[BaseVectorStorage] = self._get_storage_class(
            self.vector_storage
        )  # type: ignore
        self.graph_storage_cls: type[BaseGraphStorage] = self._get_storage_class(
            self.graph_storage
        )  # type: ignore
        self.key_string_value_json_storage_cls = partial(  # type: ignore
            self.key_string_value_json_storage_cls, global_config=global_config
        )
        self.vector_db_storage_cls = partial(  # type: ignore
            self.vector_db_storage_cls, global_config=global_config
        )
        self.graph_storage_cls = partial(  # type: ignore
            self.graph_storage_cls, global_config=global_config
        )

        # Initialize document status storage
        self.doc_status_storage_cls = self._get_storage_class(self.doc_status_storage)

        self.llm_response_cache: BaseKVStorage = self.key_string_value_json_storage_cls(  # type: ignore
            namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE,
            workspace=self.workspace,
            global_config=global_config,
            embedding_func=self.embedding_func,
        )

        self.full_docs: BaseKVStorage = self.key_string_value_json_storage_cls(  # type: ignore
            namespace=NameSpace.KV_STORE_FULL_DOCS,
            workspace=self.workspace,
            embedding_func=self.embedding_func,
        )

        self.text_chunks: BaseKVStorage = self.key_string_value_json_storage_cls(  # type: ignore
            namespace=NameSpace.KV_STORE_TEXT_CHUNKS,
            workspace=self.workspace,
            embedding_func=self.embedding_func,
        )

        self.chunk_entity_relation_graph: BaseGraphStorage = self.graph_storage_cls(  # type: ignore
            namespace=NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION,
            workspace=self.workspace,
            embedding_func=self.embedding_func,
        )

        self.entities_vdb: BaseVectorStorage = self.vector_db_storage_cls(  # type: ignore
            namespace=NameSpace.VECTOR_STORE_ENTITIES,
            workspace=self.workspace,
            embedding_func=self.embedding_func,
            meta_fields={"entity_name", "source_id", "content", "file_path"},
        )

        self.relationships_vdb: BaseVectorStorage = self.vector_db_storage_cls(  # type: ignore
            namespace=NameSpace.VECTOR_STORE_RELATIONSHIPS,
            workspace=self.workspace,
            embedding_func=self.embedding_func,
            meta_fields={"src_id", "tgt_id", "source_id", "content", "file_path"},
        )

        self.chunks_vdb: BaseVectorStorage = self.vector_db_storage_cls(  # type: ignore
            namespace=NameSpace.VECTOR_STORE_CHUNKS,
            workspace=self.workspace,
            embedding_func=self.embedding_func,
            meta_fields={"full_doc_id", "content", "file_path"},
        )

        # Initialize document status storage
        self.doc_status: DocStatusStorage = self.doc_status_storage_cls(
            namespace=NameSpace.DOC_STATUS,
            workspace=self.workspace,
            global_config=global_config,
            embedding_func=None,
        )

        # Directly use llm_response_cache, don't create a new object
        hashing_kv = self.llm_response_cache

        self.llm_model_func = priority_limit_async_func_call(self.llm_model_max_async)(
            partial(
                self.llm_model_func,  # type: ignore
                hashing_kv=hashing_kv,
                **self.llm_model_kwargs,
            )
        )

        # Init Rerank
        if self.rerank_model_func:
            logger.info("Rerank model initialized for improved retrieval quality")
        else:
            logger.warning(
                "Rerank is enabled but no rerank_model_func provided. Reranking will be skipped."
            )

        self._storages_status = StoragesStatus.CREATED

        if self.auto_manage_storages_states:
            self._run_async_safely(self.initialize_storages, "Storage Initialization")

    def __del__(self):
        if self.auto_manage_storages_states:
            self._run_async_safely(self.finalize_storages, "Storage Finalization")

    def _run_async_safely(self, async_func, action_name=""):
        """Safely execute an async function, avoiding event loop conflicts."""
        try:
            loop = always_get_an_event_loop()
            if loop.is_running():
                task = loop.create_task(async_func())
                task.add_done_callback(
                    lambda t: logger.info(f"{action_name} completed!")
                )
            else:
                loop.run_until_complete(async_func())
        except RuntimeError:
            logger.warning(
                f"No running event loop, creating a new loop for {action_name}."
            )
            loop = asyncio.new_event_loop()
            loop.run_until_complete(async_func())
            loop.close()

    async def initialize_storages(self):
        """Asynchronously initialize the storages"""
        if self._storages_status == StoragesStatus.CREATED:
            tasks = []
            for storage in (
                self.full_docs,
                self.text_chunks,
                self.entities_vdb,
                self.relationships_vdb,
                self.chunks_vdb,
                self.chunk_entity_relation_graph,
                self.llm_response_cache,
                self.doc_status,
            ):
                if storage:
                    tasks.append(storage.initialize())

            await asyncio.gather(*tasks)

            self._storages_status = StoragesStatus.INITIALIZED
            logger.debug("Initialized Storages")

    async def finalize_storages(self):
        """Asynchronously finalize the storages"""
        if self._storages_status == StoragesStatus.INITIALIZED:
            tasks = []
            for storage in (
                self.full_docs,
                self.text_chunks,
                self.entities_vdb,
                self.relationships_vdb,
                self.chunks_vdb,
                self.chunk_entity_relation_graph,
                self.llm_response_cache,
                self.doc_status,
            ):
                if storage:
                    tasks.append(storage.finalize())

            await asyncio.gather(*tasks)

            self._storages_status = StoragesStatus.FINALIZED
            logger.debug("Finalized Storages")

    async def get_graph_labels(self):
        text = await self.chunk_entity_relation_graph.get_all_labels()
        return text

    async def get_knowledge_graph(
        self,
        node_label: str,
        max_depth: int = 3,
        max_nodes: int | None = None,
    ) -> KnowledgeGraph:
        """Get the knowledge graph for a given label.

        Args:
            node_label (str): Label to get the knowledge graph for
            max_depth (int): Maximum depth of the graph
            max_nodes (int, optional): Maximum number of nodes to return. Defaults to self.max_graph_nodes.

        Returns:
            KnowledgeGraph: Knowledge graph containing nodes and edges
        """
        # Use self.max_graph_nodes as the default if max_nodes is None
        if max_nodes is None:
            max_nodes = self.max_graph_nodes
        else:
            # Limit max_nodes so it does not exceed self.max_graph_nodes
            max_nodes = min(max_nodes, self.max_graph_nodes)

        return await self.chunk_entity_relation_graph.get_knowledge_graph(
            node_label, max_depth, max_nodes
        )
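    # Usage sketch (illustrative; "Alice" is a hypothetical label and the
    # storages must already be initialized):
    #
    #     kg = await rag.get_knowledge_graph("Alice", max_depth=2, max_nodes=200)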

    def _get_storage_class(self, storage_name: str) -> Callable[..., Any]:
        import_path = STORAGES[storage_name]
        storage_class = lazy_external_import(import_path, storage_name)
        return storage_class

    def insert(
        self,
        input: str | list[str],
        split_by_character: str | None = None,
        split_by_character_only: bool = False,
        ids: str | list[str] | None = None,
        file_paths: str | list[str] | None = None,
    ) -> None:
        """Synchronously insert documents with checkpoint support.

        Args:
            input: Single document string or list of document strings
            split_by_character: if split_by_character is not None, split the string by character; if a chunk is longer than
                chunk_token_size, it will be split again by token size.
            split_by_character_only: if split_by_character_only is True, split the string by character only; when
                split_by_character is None, this parameter is ignored.
            ids: single document ID or list of unique document IDs; if not provided, MD5 hash IDs will be generated
            file_paths: single file path or list of file paths, used for citation
        """
        loop = always_get_an_event_loop()
        loop.run_until_complete(
            self.ainsert(
                input, split_by_character, split_by_character_only, ids, file_paths
            )
        )
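    # Usage sketch (illustrative; assumes embedding_func and llm_model_func were
    # supplied and initialize_storages() has completed; file names are hypothetical):
    #
    #     rag.insert(
    #         ["First document text", "Second document text"],
    #         file_paths=["first.txt", "second.txt"],
    #     )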

    async def ainsert(
        self,
        input: str | list[str],
        split_by_character: str | None = None,
        split_by_character_only: bool = False,
        ids: str | list[str] | None = None,
        file_paths: str | list[str] | None = None,
    ) -> None:
        """Asynchronously insert documents with checkpoint support.

        Args:
            input: Single document string or list of document strings
            split_by_character: if split_by_character is not None, split the string by character; if a chunk is longer than
                chunk_token_size, it will be split again by token size.
            split_by_character_only: if split_by_character_only is True, split the string by character only; when
                split_by_character is None, this parameter is ignored.
            ids: list of unique document IDs; if not provided, MD5 hash IDs will be generated
            file_paths: list of file paths corresponding to each document, used for citation
        """
        await self.apipeline_enqueue_documents(input, ids, file_paths)
        await self.apipeline_process_enqueue_documents(
            split_by_character, split_by_character_only
        )

    # TODO: deprecated, use insert instead
    def insert_custom_chunks(
        self,
        full_text: str,
        text_chunks: list[str],
        doc_id: str | list[str] | None = None,
    ) -> None:
        loop = always_get_an_event_loop()
        loop.run_until_complete(
            self.ainsert_custom_chunks(full_text, text_chunks, doc_id)
        )

    # TODO: deprecated, use ainsert instead
    async def ainsert_custom_chunks(
        self, full_text: str, text_chunks: list[str], doc_id: str | None = None
    ) -> None:
        update_storage = False
        try:
            # Clean input texts
            full_text = clean_text(full_text)
            text_chunks = [clean_text(chunk) for chunk in text_chunks]
            file_path = ""

            # Process cleaned texts
            if doc_id is None:
                doc_key = compute_mdhash_id(full_text, prefix="doc-")
            else:
                doc_key = doc_id
            new_docs = {doc_key: {"content": full_text}}

            _add_doc_keys = await self.full_docs.filter_keys({doc_key})
            new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys}
            if not len(new_docs):
                logger.warning("This document is already in the storage.")
                return

            update_storage = True
            logger.info(f"Inserting {len(new_docs)} docs")

            inserting_chunks: dict[str, Any] = {}
            for index, chunk_text in enumerate(text_chunks):
                chunk_key = compute_mdhash_id(chunk_text, prefix="chunk-")
                tokens = len(self.tokenizer.encode(chunk_text))
                inserting_chunks[chunk_key] = {
                    "content": chunk_text,
                    "full_doc_id": doc_key,
                    "tokens": tokens,
                    "chunk_order_index": index,
                    "file_path": file_path,
                }

            doc_ids = set(inserting_chunks.keys())
            add_chunk_keys = await self.text_chunks.filter_keys(doc_ids)
            inserting_chunks = {
                k: v for k, v in inserting_chunks.items() if k in add_chunk_keys
            }
            if not len(inserting_chunks):
                logger.warning("All chunks are already in the storage.")
                return

            tasks = [
                self.chunks_vdb.upsert(inserting_chunks),
                self._process_entity_relation_graph(inserting_chunks),
                self.full_docs.upsert(new_docs),
                self.text_chunks.upsert(inserting_chunks),
            ]
            await asyncio.gather(*tasks)
        finally:
            if update_storage:
                await self._insert_done()

    async def apipeline_enqueue_documents(
        self,
        input: str | list[str],
        ids: list[str] | None = None,
        file_paths: str | list[str] | None = None,
    ) -> None:
        """
        Pipeline for enqueueing documents:

        1. Validate ids if provided, or generate MD5 hash IDs
        2. Remove duplicate contents
        3. Generate document initial status
        4. Filter out already processed documents
        5. Enqueue documents in the status storage

        Args:
            input: Single document string or list of document strings
            ids: list of unique document IDs; if not provided, MD5 hash IDs will be generated
            file_paths: list of file paths corresponding to each document, used for citation
        """
        if isinstance(input, str):
            input = [input]
        if isinstance(ids, str):
            ids = [ids]
        if isinstance(file_paths, str):
            file_paths = [file_paths]

        # If file_paths is provided, ensure it matches the number of documents
        if file_paths is not None:
            if isinstance(file_paths, str):
                file_paths = [file_paths]
            if len(file_paths) != len(input):
                raise ValueError(
                    "Number of file paths must match the number of documents"
                )
        else:
            # If no file paths provided, use a placeholder
            file_paths = ["unknown_source"] * len(input)

        # 1. Validate ids if provided, or generate MD5 hash IDs
        if ids is not None:
            # Check if the number of IDs matches the number of documents
            if len(ids) != len(input):
                raise ValueError("Number of IDs must match the number of documents")

            # Check if IDs are unique
            if len(ids) != len(set(ids)):
                raise ValueError("IDs must be unique")

            # Generate contents dict of IDs provided by user and documents
            contents = {
                id_: {"content": doc, "file_path": path}
                for id_, doc, path in zip(ids, input, file_paths)
            }
        else:
            # Clean input text and remove duplicates
            cleaned_input = [
                (clean_text(doc), path) for doc, path in zip(input, file_paths)
            ]
            unique_content_with_paths = {}

            # Keep track of unique content and their paths
            for content, path in cleaned_input:
                if content not in unique_content_with_paths:
                    unique_content_with_paths[content] = path

            # Generate contents dict of MD5 hash IDs and documents with paths
            contents = {
                compute_mdhash_id(content, prefix="doc-"): {
                    "content": content,
                    "file_path": path,
                }
                for content, path in unique_content_with_paths.items()
            }

        # 2. Remove duplicate contents
        unique_contents = {}
        for id_, content_data in contents.items():
            content = content_data["content"]
            file_path = content_data["file_path"]
            if content not in unique_contents:
                unique_contents[content] = (id_, file_path)

        # Reconstruct contents with unique content
        contents = {
            id_: {"content": content, "file_path": file_path}
            for content, (id_, file_path) in unique_contents.items()
        }

        # 3. Generate document initial status
        new_docs: dict[str, Any] = {
            id_: {
                "status": DocStatus.PENDING,
                "content": content_data["content"],
                "content_summary": get_content_summary(content_data["content"]),
                "content_length": len(content_data["content"]),
                "created_at": datetime.now(timezone.utc).isoformat(),
                "updated_at": datetime.now(timezone.utc).isoformat(),
                "file_path": content_data[
                    "file_path"
                ],  # Store file path in document status
            }
            for id_, content_data in contents.items()
        }

        # 4. Filter out already processed documents
        # Get doc ids
        all_new_doc_ids = set(new_docs.keys())
        # Exclude IDs of documents that are already in progress
        unique_new_doc_ids = await self.doc_status.filter_keys(all_new_doc_ids)

        # Log ignored document IDs
        ignored_ids = [
            doc_id for doc_id in unique_new_doc_ids if doc_id not in new_docs
        ]
        if ignored_ids:
            logger.warning(
                f"Ignoring {len(ignored_ids)} document IDs not found in new_docs"
            )
            for doc_id in ignored_ids:
                logger.warning(f"Ignored document ID: {doc_id}")

        # Filter new_docs to only include documents with unique IDs
        new_docs = {
            doc_id: new_docs[doc_id]
            for doc_id in unique_new_doc_ids
            if doc_id in new_docs
        }

        if not new_docs:
            logger.info("No new unique documents were found.")
            return

        # 5. Store status documents
        await self.doc_status.upsert(new_docs)
        logger.info(f"Stored {len(new_docs)} new unique documents")

    async def apipeline_process_enqueue_documents(
        self,
        split_by_character: str | None = None,
        split_by_character_only: bool = False,
    ) -> None:
        """
        Process pending documents by splitting them into chunks, processing
        each chunk for entity and relation extraction, and updating the
        document status.

        1. Get all pending, failed, and abnormally terminated processing documents.
        2. Split document content into chunks
        3. Process each chunk for entity and relation extraction
        4. Update the document status
        """
        # Get pipeline status shared data and lock
        pipeline_status = await get_namespace_data("pipeline_status")
        pipeline_status_lock = get_pipeline_status_lock()

        # Check if another process is already processing the queue
        async with pipeline_status_lock:
            # Ensure only one worker is processing documents
            if not pipeline_status.get("busy", False):
                processing_docs, failed_docs, pending_docs = await asyncio.gather(
                    self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
                    self.doc_status.get_docs_by_status(DocStatus.FAILED),
                    self.doc_status.get_docs_by_status(DocStatus.PENDING),
                )

                to_process_docs: dict[str, DocProcessingStatus] = {}
                to_process_docs.update(processing_docs)
                to_process_docs.update(failed_docs)
                to_process_docs.update(pending_docs)

                if not to_process_docs:
                    logger.info("No documents to process")
                    return

                pipeline_status.update(
                    {
                        "busy": True,
                        "job_name": "Default Job",
                        "job_start": datetime.now(timezone.utc).isoformat(),
                        "docs": 0,
                        "batchs": 0,  # Total number of files to be processed
                        "cur_batch": 0,  # Number of files already processed
                        "request_pending": False,  # Clear any previous request
                        "latest_message": "",
                    }
                )
                # Clear history_messages without breaking it as a shared list object
                del pipeline_status["history_messages"][:]
            else:
                # Another process is busy, just set the request flag and return
                pipeline_status["request_pending"] = True
                logger.info(
                    "Another process is already processing the document queue. Request queued."
                )
                return

        try:
            # Process documents until there are no more documents or requests
            while True:
                if not to_process_docs:
                    log_message = "All documents have been processed or are duplicates"
                    logger.info(log_message)
                    pipeline_status["latest_message"] = log_message
                    pipeline_status["history_messages"].append(log_message)
                    break

                log_message = f"Processing {len(to_process_docs)} document(s)"
                logger.info(log_message)

                # Update pipeline_status; batchs now represents the total number of files to be processed
                pipeline_status["docs"] = len(to_process_docs)
                pipeline_status["batchs"] = len(to_process_docs)
                pipeline_status["cur_batch"] = 0
                pipeline_status["latest_message"] = log_message
                pipeline_status["history_messages"].append(log_message)

                # Get the first document's file path and the total count for the job name
                first_doc_id, first_doc = next(iter(to_process_docs.items()))
                first_doc_path = first_doc.file_path

                # Handle cases where first_doc_path is None
                if first_doc_path:
                    path_prefix = first_doc_path[:20] + (
                        "..." if len(first_doc_path) > 20 else ""
                    )
                else:
                    path_prefix = "unknown_source"

                total_files = len(to_process_docs)
                job_name = f"{path_prefix}[{total_files} files]"
                pipeline_status["job_name"] = job_name

                # Create a counter to track the number of processed files
                processed_count = 0
                # Create a semaphore to limit the number of files processed concurrently
                semaphore = asyncio.Semaphore(self.max_parallel_insert)

                async def process_document(
                    doc_id: str,
                    status_doc: DocProcessingStatus,
                    split_by_character: str | None,
                    split_by_character_only: bool,
                    pipeline_status: dict,
                    pipeline_status_lock: asyncio.Lock,
                    semaphore: asyncio.Semaphore,
                ) -> None:
                    """Process a single document"""
                    file_extraction_stage_ok = False
                    async with semaphore:
                        nonlocal processed_count
                        current_file_number = 0
                        try:
                            # Get the file path from the status document
                            file_path = getattr(
                                status_doc, "file_path", "unknown_source"
                            )

                            async with pipeline_status_lock:
                                # Update the processed file count and save the current file number
                                processed_count += 1
                                current_file_number = (
                                    processed_count  # Save the current file number
                                )
                                pipeline_status["cur_batch"] = processed_count

                                log_message = f"Extracting stage {current_file_number}/{total_files}: {file_path}"
                                logger.info(log_message)
                                pipeline_status["history_messages"].append(log_message)
                                log_message = f"Processing d-id: {doc_id}"
                                logger.info(log_message)
                                pipeline_status["latest_message"] = log_message
                                pipeline_status["history_messages"].append(log_message)

                            # Generate chunks from the document
                            chunks: dict[str, Any] = {
                                compute_mdhash_id(dp["content"], prefix="chunk-"): {
                                    **dp,
                                    "full_doc_id": doc_id,
                                    "file_path": file_path,  # Add file path to each chunk
                                    "llm_cache_list": [],  # Initialize empty LLM cache list for each chunk
                                }
                                for dp in self.chunking_func(
                                    self.tokenizer,
                                    status_doc.content,
                                    split_by_character,
                                    split_by_character_only,
                                    self.chunk_overlap_token_size,
                                    self.chunk_token_size,
                                )
                            }

                            if not chunks:
                                logger.warning("No document chunks to process")

                            # Process the document in two stages
                            # Stage 1: Process text chunks and docs (parallel execution)
                            doc_status_task = asyncio.create_task(
                                self.doc_status.upsert(
                                    {
                                        doc_id: {
                                            "status": DocStatus.PROCESSING,
                                            "chunks_count": len(chunks),
                                            "chunks_list": list(
                                                chunks.keys()
                                            ),  # Save chunks list
                                            "content": status_doc.content,
                                            "content_summary": status_doc.content_summary,
                                            "content_length": status_doc.content_length,
                                            "created_at": status_doc.created_at,
                                            "updated_at": datetime.now(
                                                timezone.utc
                                            ).isoformat(),
                                            "file_path": file_path,
                                        }
                                    }
                                )
                            )
                            chunks_vdb_task = asyncio.create_task(
                                self.chunks_vdb.upsert(chunks)
                            )
                            full_docs_task = asyncio.create_task(
                                self.full_docs.upsert(
                                    {doc_id: {"content": status_doc.content}}
                                )
                            )
                            text_chunks_task = asyncio.create_task(
                                self.text_chunks.upsert(chunks)
                            )

                            # First-stage tasks (parallel execution)
                            first_stage_tasks = [
                                doc_status_task,
                                chunks_vdb_task,
                                full_docs_task,
                                text_chunks_task,
                            ]
                            entity_relation_task = None

                            # Execute first-stage tasks
                            await asyncio.gather(*first_stage_tasks)

                            # Stage 2: Process the entity relation graph (after text_chunks are saved)
                            entity_relation_task = asyncio.create_task(
                                self._process_entity_relation_graph(
                                    chunks, pipeline_status, pipeline_status_lock
                                )
                            )
                            await entity_relation_task
                            file_extraction_stage_ok = True

                        except Exception as e:
                            # Log the error and update pipeline status
                            logger.error(traceback.format_exc())
                            error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
                            logger.error(error_msg)
                            async with pipeline_status_lock:
                                pipeline_status["latest_message"] = error_msg
                                pipeline_status["history_messages"].append(
                                    traceback.format_exc()
                                )
                                pipeline_status["history_messages"].append(error_msg)

                            # Cancel tasks that are not yet completed
                            all_tasks = first_stage_tasks + (
                                [entity_relation_task]
                                if entity_relation_task
                                else []
                            )
                            for task in all_tasks:
                                if task and not task.done():
                                    task.cancel()

                            # Persist the LLM cache
                            if self.llm_response_cache:
                                await self.llm_response_cache.index_done_callback()

                            # Update document status to failed
                            await self.doc_status.upsert(
                                {
                                    doc_id: {
                                        "status": DocStatus.FAILED,
                                        "error": str(e),
                                        "content": status_doc.content,
                                        "content_summary": status_doc.content_summary,
                                        "content_length": status_doc.content_length,
                                        "created_at": status_doc.created_at,
                                        "updated_at": datetime.now(
                                            timezone.utc
                                        ).isoformat(),
                                        "file_path": file_path,
                                    }
                                }
                            )

                        # Concurrency is controlled by the graph db lock for individual entities and relationships
                        if file_extraction_stage_ok:
                            try:
                                # Get chunk_results from entity_relation_task
                                chunk_results = await entity_relation_task
                                await merge_nodes_and_edges(
                                    chunk_results=chunk_results,  # result collected from entity_relation_task
                                    knowledge_graph_inst=self.chunk_entity_relation_graph,
                                    entity_vdb=self.entities_vdb,
                                    relationships_vdb=self.relationships_vdb,
                                    global_config=asdict(self),
                                    pipeline_status=pipeline_status,
                                    pipeline_status_lock=pipeline_status_lock,
                                    llm_response_cache=self.llm_response_cache,
                                    current_file_number=current_file_number,
                                    total_files=total_files,
                                    file_path=file_path,
                                )

                                await self.doc_status.upsert(
                                    {
                                        doc_id: {
                                            "status": DocStatus.PROCESSED,
                                            "chunks_count": len(chunks),
                                            "chunks_list": list(
                                                chunks.keys()
                                            ),  # Keep chunks_list
                                            "content": status_doc.content,
                                            "content_summary": status_doc.content_summary,
                                            "content_length": status_doc.content_length,
                                            "created_at": status_doc.created_at,
                                            "updated_at": datetime.now(
                                                timezone.utc
                                            ).isoformat(),
                                            "file_path": file_path,
                                        }
                                    }
                                )

                                # Call _insert_done after processing each file
                                await self._insert_done()

                                async with pipeline_status_lock:
                                    log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}"
                                    logger.info(log_message)
                                    pipeline_status["latest_message"] = log_message
                                    pipeline_status["history_messages"].append(
                                        log_message
                                    )

                            except Exception as e:
                                # Log the error and update pipeline status
                                logger.error(traceback.format_exc())
                                error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
                                logger.error(error_msg)
                                async with pipeline_status_lock:
                                    pipeline_status["latest_message"] = error_msg
                                    pipeline_status["history_messages"].append(
                                        traceback.format_exc()
                                    )
                                    pipeline_status["history_messages"].append(
                                        error_msg
                                    )

                                # Persist the LLM cache
                                if self.llm_response_cache:
                                    await self.llm_response_cache.index_done_callback()

                                # Update document status to failed
                                await self.doc_status.upsert(
                                    {
                                        doc_id: {
                                            "status": DocStatus.FAILED,
                                            "error": str(e),
                                            "content": status_doc.content,
                                            "content_summary": status_doc.content_summary,
                                            "content_length": status_doc.content_length,
                                            "created_at": status_doc.created_at,
                                            "updated_at": datetime.now(
                                                timezone.utc
                                            ).isoformat(),
                                            "file_path": file_path,
                                        }
                                    }
                                )

                # Create processing tasks for all documents
                doc_tasks = []
                for doc_id, status_doc in to_process_docs.items():
                    doc_tasks.append(
                        process_document(
                            doc_id,
                            status_doc,
                            split_by_character,
                            split_by_character_only,
                            pipeline_status,
                            pipeline_status_lock,
                            semaphore,
                        )
                    )

                # Wait for all document processing to complete
                await asyncio.gather(*doc_tasks)

                # Check if there's a pending request to process more documents (with lock)
                has_pending_request = False
                async with pipeline_status_lock:
                    has_pending_request = pipeline_status.get("request_pending", False)
                    if has_pending_request:
                        # Clear the request flag before checking for more documents
                        pipeline_status["request_pending"] = False

                if not has_pending_request:
                    break

                log_message = "Processing additional documents due to pending request"
                logger.info(log_message)
                pipeline_status["latest_message"] = log_message
                pipeline_status["history_messages"].append(log_message)

                # Check for pending documents again
                processing_docs, failed_docs, pending_docs = await asyncio.gather(
                    self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
                    self.doc_status.get_docs_by_status(DocStatus.FAILED),
                    self.doc_status.get_docs_by_status(DocStatus.PENDING),
                )

                to_process_docs = {}
                to_process_docs.update(processing_docs)
                to_process_docs.update(failed_docs)
                to_process_docs.update(pending_docs)

        finally:
            log_message = "Document processing pipeline completed"
            logger.info(log_message)
            # Always reset busy status when done or if an exception occurs (with lock)
            async with pipeline_status_lock:
                pipeline_status["busy"] = False
                pipeline_status["latest_message"] = log_message
                pipeline_status["history_messages"].append(log_message)

    async def _process_entity_relation_graph(
        self, chunk: dict[str, Any], pipeline_status=None, pipeline_status_lock=None
    ) -> list:
        try:
            chunk_results = await extract_entities(
                chunk,
                global_config=asdict(self),
                pipeline_status=pipeline_status,
                pipeline_status_lock=pipeline_status_lock,
                llm_response_cache=self.llm_response_cache,
                text_chunks_storage=self.text_chunks,
            )
            return chunk_results
        except Exception as e:
            error_msg = f"Failed to extract entities and relationships: {str(e)}"
            logger.error(error_msg)
            async with pipeline_status_lock:
                pipeline_status["latest_message"] = error_msg
                pipeline_status["history_messages"].append(error_msg)
            raise e

    async def _insert_done(
        self, pipeline_status=None, pipeline_status_lock=None
    ) -> None:
        tasks = [
            cast(StorageNameSpace, storage_inst).index_done_callback()
            for storage_inst in [  # type: ignore
                self.full_docs,
                self.doc_status,
                self.text_chunks,
                self.llm_response_cache,
                self.entities_vdb,
                self.relationships_vdb,
                self.chunks_vdb,
                self.chunk_entity_relation_graph,
            ]
            if storage_inst is not None
        ]
        await asyncio.gather(*tasks)

        log_message = "In-memory DB persisted to disk"
        logger.info(log_message)

        if pipeline_status is not None and pipeline_status_lock is not None:
            async with pipeline_status_lock:
                pipeline_status["latest_message"] = log_message
                pipeline_status["history_messages"].append(log_message)

    def insert_custom_kg(
        self, custom_kg: dict[str, Any], full_doc_id: str | None = None
    ) -> None:
        loop = always_get_an_event_loop()
        loop.run_until_complete(self.ainsert_custom_kg(custom_kg, full_doc_id))
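    # Expected shape of the custom_kg payload (illustrative values, inferred
    # from the parsing in ainsert_custom_kg below; optional keys fall back to
    # the defaults used there):
    #
    #     custom_kg = {
    #         "chunks": [
    #             {"content": "Alice met Bob.", "source_id": "chunk-1", "file_path": "notes.txt"},
    #         ],
    #         "entities": [
    #             {"entity_name": "Alice", "entity_type": "Person",
    #              "description": "A person named Alice", "source_id": "chunk-1"},
    #         ],
    #         "relationships": [
    #             {"src_id": "Alice", "tgt_id": "Bob", "description": "Alice knows Bob",
    #              "keywords": "knows", "weight": 1.0, "source_id": "chunk-1"},
    #         ],
    #     }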

    async def ainsert_custom_kg(
        self,
        custom_kg: dict[str, Any],
        full_doc_id: str | None = None,
    ) -> None:
        update_storage = False
        try:
            # Insert chunks into vector storage
            all_chunks_data: dict[str, dict[str, str]] = {}
            chunk_to_source_map: dict[str, str] = {}
            for chunk_data in custom_kg.get("chunks", []):
                chunk_content = clean_text(chunk_data["content"])
                source_id = chunk_data["source_id"]
                file_path = chunk_data.get("file_path", "custom_kg")
                tokens = len(self.tokenizer.encode(chunk_content))
                chunk_order_index = (
                    0
                    if "chunk_order_index" not in chunk_data.keys()
                    else chunk_data["chunk_order_index"]
                )
                chunk_id = compute_mdhash_id(chunk_content, prefix="chunk-")

                chunk_entry = {
                    "content": chunk_content,
                    "source_id": source_id,
                    "tokens": tokens,
                    "chunk_order_index": chunk_order_index,
                    "full_doc_id": full_doc_id
                    if full_doc_id is not None
                    else source_id,
                    "file_path": file_path,
                    "status": DocStatus.PROCESSED,
                }
                all_chunks_data[chunk_id] = chunk_entry
                chunk_to_source_map[source_id] = chunk_id
                update_storage = True

            if all_chunks_data:
                await asyncio.gather(
                    self.chunks_vdb.upsert(all_chunks_data),
                    self.text_chunks.upsert(all_chunks_data),
                )

            # Insert entities into the knowledge graph
            all_entities_data: list[dict[str, str]] = []
            for entity_data in custom_kg.get("entities", []):
                entity_name = entity_data["entity_name"]
                entity_type = entity_data.get("entity_type", "UNKNOWN")
                description = entity_data.get("description", "No description provided")
                source_chunk_id = entity_data.get("source_id", "UNKNOWN")
                source_id = chunk_to_source_map.get(source_chunk_id, "UNKNOWN")
                file_path = entity_data.get("file_path", "custom_kg")

                # Log if source_id is UNKNOWN
                if source_id == "UNKNOWN":
                    logger.warning(
                        f"Entity '{entity_name}' has an UNKNOWN source_id. Please check the source mapping."
                    )

                # Prepare node data
                node_data: dict[str, str] = {
                    "entity_id": entity_name,
                    "entity_type": entity_type,
                    "description": description,
                    "source_id": source_id,
                    "file_path": file_path,
                    "created_at": int(time.time()),
                }
                # Insert node data into the knowledge graph
                await self.chunk_entity_relation_graph.upsert_node(
                    entity_name, node_data=node_data
                )
                node_data["entity_name"] = entity_name
                all_entities_data.append(node_data)
                update_storage = True

            # Insert relationships into the knowledge graph
            all_relationships_data: list[dict[str, str]] = []
            for relationship_data in custom_kg.get("relationships", []):
                src_id = relationship_data["src_id"]
                tgt_id = relationship_data["tgt_id"]
                description = relationship_data["description"]
                keywords = relationship_data["keywords"]
                weight = relationship_data.get("weight", 1.0)
                source_chunk_id = relationship_data.get("source_id", "UNKNOWN")
source_id = chunk_to_source_map . get ( source_chunk_id , " UNKNOWN " )
2025-05-27 16:07:04 +08:00
file_path = relationship_data . get ( " file_path " , " custom_kg " )
2024-12-04 19:44:04 +08:00
# Log if source_id is UNKNOWN
if source_id == " UNKNOWN " :
logger . warning (
f " Relationship from ' { src_id } ' to ' { tgt_id } ' has an UNKNOWN source_id. Please check the source mapping. "
)
2024-11-25 18:06:19 +08:00
# Check if nodes exist in the knowledge graph
for need_insert_id in [ src_id , tgt_id ] :
if not (
2025-01-07 16:26:12 +08:00
await self . chunk_entity_relation_graph . has_node ( need_insert_id )
2024-11-25 18:06:19 +08:00
) :
await self . chunk_entity_relation_graph . upsert_node (
need_insert_id ,
node_data = {
2025-03-13 16:52:48 +08:00
" entity_id " : need_insert_id ,
2024-11-25 18:06:19 +08:00
" source_id " : source_id ,
" description " : " UNKNOWN " ,
" entity_type " : " UNKNOWN " ,
2025-05-27 16:07:04 +08:00
" file_path " : file_path ,
" created_at " : int ( time . time ( ) ) ,
2024-11-25 18:06:19 +08:00
} ,
)
# Insert edge into the knowledge graph
await self . chunk_entity_relation_graph . upsert_edge (
src_id ,
tgt_id ,
edge_data = {
" weight " : weight ,
" description " : description ,
" keywords " : keywords ,
" source_id " : source_id ,
2025-05-27 16:07:04 +08:00
" file_path " : file_path ,
" created_at " : int ( time . time ( ) ) ,
2024-11-25 18:06:19 +08:00
} ,
)
2025-05-27 16:07:04 +08:00
2025-02-14 23:31:27 +01:00
edge_data : dict [ str , str ] = {
2024-11-25 18:06:19 +08:00
" src_id " : src_id ,
" tgt_id " : tgt_id ,
" description " : description ,
" keywords " : keywords ,
2025-03-03 21:09:45 +08:00
" source_id " : source_id ,
" weight " : weight ,
2025-05-27 16:07:04 +08:00
" file_path " : file_path ,
" created_at " : int ( time . time ( ) ) ,
2024-11-25 18:06:19 +08:00
}
all_relationships_data . append ( edge_data )
update_storage = True
2025-03-03 21:09:45 +08:00
# Insert entities into vector storage with consistent format
2025-02-14 23:31:27 +01:00
data_for_vdb = {
2025-02-14 23:33:59 +01:00
compute_mdhash_id ( dp [ " entity_name " ] , prefix = " ent- " ) : {
2025-03-03 21:09:45 +08:00
" content " : dp [ " entity_name " ] + " \n " + dp [ " description " ] ,
2025-02-14 23:33:59 +01:00
" entity_name " : dp [ " entity_name " ] ,
2025-03-03 21:09:45 +08:00
" source_id " : dp [ " source_id " ] ,
" description " : dp [ " description " ] ,
" entity_type " : dp [ " entity_type " ] ,
2025-05-27 16:07:04 +08:00
" file_path " : dp . get ( " file_path " , " custom_kg " ) ,
2024-11-25 18:06:19 +08:00
}
2025-02-14 23:33:59 +01:00
for dp in all_entities_data
}
2025-02-14 23:31:27 +01:00
await self . entities_vdb . upsert ( data_for_vdb )
2024-11-25 18:06:19 +08:00
2025-03-03 21:09:45 +08:00
# Insert relationships into vector storage with consistent format
2025-02-14 23:31:27 +01:00
data_for_vdb = {
2025-02-14 23:33:59 +01:00
compute_mdhash_id ( dp [ " src_id " ] + dp [ " tgt_id " ] , prefix = " rel- " ) : {
" src_id " : dp [ " src_id " ] ,
" tgt_id " : dp [ " tgt_id " ] ,
2025-03-03 21:09:45 +08:00
" source_id " : dp [ " source_id " ] ,
" content " : f " { dp [ ' keywords ' ] } \t { dp [ ' src_id ' ] } \n { dp [ ' tgt_id ' ] } \n { dp [ ' description ' ] } " ,
" keywords " : dp [ " keywords " ] ,
" description " : dp [ " description " ] ,
" weight " : dp [ " weight " ] ,
2025-05-27 16:07:04 +08:00
" file_path " : dp . get ( " file_path " , " custom_kg " ) ,
2024-11-25 18:06:19 +08:00
}
2025-02-14 23:33:59 +01:00
for dp in all_relationships_data
}
2025-02-14 23:31:27 +01:00
await self . relationships_vdb . upsert ( data_for_vdb )
2025-02-14 23:33:59 +01:00
2025-03-03 21:09:45 +08:00
except Exception as e :
logger . error ( f " Error in ainsert_custom_kg: { e } " )
raise
2024-11-25 18:06:19 +08:00
finally :
if update_storage :
await self . _insert_done ( )
2025-02-14 23:31:27 +01:00
def query (
2025-02-17 16:45:00 +05:30
self ,
query : str ,
param : QueryParam = QueryParam ( ) ,
system_prompt : str | None = None ,
2025-02-14 23:42:52 +01:00
) - > str | Iterator [ str ] :
2025-02-14 23:31:27 +01:00
"""
Perform a synchronous query .
Args :
query ( str ) : The query to be executed .
param ( QueryParam ) : Configuration parameters for query execution .
system_prompt ( str | None ) : Custom system prompt for fine-tuned control over the system's behavior . Defaults to None , which uses PROMPTS [ " rag_response " ] .
Returns :
str : The result of the query execution .
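Example:
    # Illustrative usage (assumes an initialized LightRAG instance named `rag`;
    # the question text is a placeholder):
    answer = rag.query("What are the key findings?", param=QueryParam(mode="hybrid"))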
2025-02-14 23:33:59 +01:00
"""
2024-10-10 15:02:30 +08:00
loop = always_get_an_event_loop ( )
2025-02-14 23:52:05 +01:00
2025-02-17 16:45:00 +05:30
return loop . run_until_complete ( self . aquery ( query , param , system_prompt ) ) # type: ignore
2024-10-19 09:43:17 +05:30
2025-01-27 10:32:22 +05:30
async def aquery (
2025-02-14 23:31:27 +01:00
self ,
query : str ,
param : QueryParam = QueryParam ( ) ,
2025-02-17 16:45:00 +05:30
system_prompt : str | None = None ,
2025-02-14 23:42:52 +01:00
) - > str | AsyncIterator [ str ] :
2025-02-14 23:31:27 +01:00
"""
Perform an asynchronous query .
Args :
query ( str ) : The query to be executed .
param ( QueryParam ) : Configuration parameters for query execution .
2025-03-23 21:33:49 +05:30
If param . model_func is provided , it will be used instead of the global model .
2025-02-14 23:31:27 +01:00
system_prompt ( str | None ) : Custom system prompt for fine-tuned control over the system's behavior . Defaults to None , which uses PROMPTS [ " rag_response " ] .
Returns :
str : The result of the query execution .
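Example:
    # Illustrative usage (assumes an initialized LightRAG instance named `rag`;
    # the query text is a placeholder):
    answer = await rag.aquery("Summarize the corpus", param=QueryParam(mode="mix"))
    # When param.stream is True, the result may be an async iterator of text
    # chunks rather than a single string (see the return type above).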
"""
2025-03-23 21:33:49 +05:30
# If a custom model is provided in param, temporarily update global config
global_config = asdict ( self )
2025-05-07 17:42:14 +08:00
# Save original query for vector search
param . original_query = query
2025-03-25 15:20:09 +05:30
2025-05-07 17:42:14 +08:00
if param . mode in [ " local " , " global " , " hybrid " , " mix " ] :
2024-11-25 13:29:55 +08:00
response = await kg_query (
2025-03-03 13:53:45 +05:30
query . strip ( ) ,
2024-10-10 15:02:30 +08:00
self . chunk_entity_relation_graph ,
self . entities_vdb ,
self . relationships_vdb ,
self . text_chunks ,
param ,
2025-03-23 21:33:49 +05:30
global_config ,
2025-05-07 17:42:14 +08:00
hashing_kv = self . llm_response_cache ,
2025-02-17 16:45:00 +05:30
system_prompt = system_prompt ,
2025-05-07 17:42:14 +08:00
chunks_vdb = self . chunks_vdb ,
2024-10-10 15:02:30 +08:00
)
elif param . mode == " naive " :
response = await naive_query (
2025-03-03 13:53:45 +05:30
query . strip ( ) ,
2024-10-10 15:02:30 +08:00
self . chunks_vdb ,
param ,
2025-03-23 21:33:49 +05:30
global_config ,
2025-05-07 17:42:14 +08:00
hashing_kv = self . llm_response_cache ,
2025-02-17 16:45:00 +05:30
system_prompt = system_prompt ,
2024-12-28 11:56:28 +08:00
)
2025-04-10 23:17:33 +08:00
elif param . mode == " bypass " :
# Bypass mode: directly use LLM without knowledge retrieval
use_llm_func = param . model_func or global_config [ " llm_model_func " ]
2025-04-28 19:36:21 +08:00
# Apply higher priority (8) to the bypass-mode LLM call
use_llm_func = partial ( use_llm_func , _priority = 8 )
2025-04-11 11:12:01 +08:00
param . stream = True if param . stream is None else param . stream
2025-04-10 23:17:33 +08:00
response = await use_llm_func (
query . strip ( ) ,
system_prompt = system_prompt ,
history_messages = param . conversation_history ,
2025-04-11 11:12:01 +08:00
stream = param . stream ,
2025-04-10 23:17:33 +08:00
)
2024-10-10 15:02:30 +08:00
else :
raise ValueError ( f " Unknown mode { param . mode } " )
await self . _query_done ( )
2025-01-14 22:10:47 +05:30
return response
2025-05-08 09:36:57 +08:00
# TODO: Deprecated, use user_prompt in QueryParam instead
2025-01-14 22:10:47 +05:30
def query_with_separate_keyword_extraction (
2025-02-14 23:52:05 +01:00
self , query : str , prompt : str , param : QueryParam = QueryParam ( )
2025-01-14 22:10:47 +05:30
) :
"""
2025-03-11 15:43:04 +08:00
Query with separate keyword extraction step .
2025-03-11 15:44:01 +08:00
2025-03-11 15:43:04 +08:00
This method extracts keywords from the query first , then uses them for the query .
2025-03-11 15:44:01 +08:00
2025-03-11 15:43:04 +08:00
Args :
query : User query
prompt : Additional prompt for the query
param : Query parameters
2025-03-11 15:44:01 +08:00
2025-03-11 15:43:04 +08:00
Returns :
Query response
2025-01-14 22:10:47 +05:30
"""
loop = always_get_an_event_loop ( )
2025-01-14 22:23:14 +05:30
return loop . run_until_complete (
self . aquery_with_separate_keyword_extraction ( query , prompt , param )
)
2025-05-08 09:36:57 +08:00
# TODO: Deprecated, use user_prompt in QueryParam instead
2025-01-14 22:10:47 +05:30
async def aquery_with_separate_keyword_extraction (
2025-02-14 23:52:05 +01:00
self , query : str , prompt : str , param : QueryParam = QueryParam ( )
2025-02-15 00:01:21 +01:00
) - > str | AsyncIterator [ str ] :
2025-01-14 22:10:47 +05:30
"""
2025-03-11 15:43:04 +08:00
Async version of query_with_separate_keyword_extraction .
2025-03-11 15:44:01 +08:00
2025-03-11 15:43:04 +08:00
Args :
query : User query
prompt : Additional prompt for the query
param : Query parameters
2025-03-11 15:44:01 +08:00
2025-03-11 15:43:04 +08:00
Returns :
Query response or async iterator
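Example:
    # Illustrative usage (assumes an initialized LightRAG instance named `rag`);
    # note this method is deprecated in favor of user_prompt in QueryParam:
    response = await rag.aquery_with_separate_keyword_extraction(
        "Who founded Acme?",
        prompt="Answer in one sentence.",
        param=QueryParam(mode="hybrid"),
    )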
2025-01-14 22:10:47 +05:30
"""
2025-03-11 15:43:04 +08:00
response = await query_with_keywords (
query = query ,
prompt = prompt ,
2025-01-14 22:10:47 +05:30
param = param ,
2025-03-11 15:43:04 +08:00
knowledge_graph_inst = self . chunk_entity_relation_graph ,
entities_vdb = self . entities_vdb ,
relationships_vdb = self . relationships_vdb ,
chunks_vdb = self . chunks_vdb ,
text_chunks_db = self . text_chunks ,
2025-01-14 22:10:47 +05:30
global_config = asdict ( self ) ,
2025-03-11 15:43:04 +08:00
hashing_kv = self . llm_response_cache ,
2025-01-14 22:10:47 +05:30
)
2025-03-11 15:44:01 +08:00
2025-01-14 22:10:47 +05:30
await self . _query_done ( )
2024-10-10 15:02:30 +08:00
return response
async def _query_done ( self ) :
2025-02-15 00:01:21 +01:00
await self . llm_response_cache . index_done_callback ( )
2024-11-11 17:48:40 +08:00
2025-04-14 03:06:23 +08:00
async def aclear_cache ( self , modes : list [ str ] | None = None ) - > None :
""" Clear cache data from the LLM response cache storage.
2025-03-04 15:53:20 +08:00
2025-03-04 15:50:53 +08:00
Args :
2025-04-14 03:06:23 +08:00
modes ( list [ str ] | None ) : Modes of cache to clear . Options : [ " default " , " naive " , " local " , " global " , " hybrid " , " mix " ] .
" default " represents extraction cache .
If None , clears all cache .
2025-03-04 15:50:53 +08:00
2025-04-14 03:06:23 +08:00
Example :
# Clear all cache
await rag . aclear_cache ( )
2025-03-04 15:53:20 +08:00
2025-04-14 03:06:23 +08:00
# Clear local mode cache
await rag . aclear_cache ( modes = [ " local " ] )
# Clear extraction cache
await rag . aclear_cache ( modes = [ " default " ] )
2025-03-04 15:50:53 +08:00
"""
2025-04-14 03:06:23 +08:00
if not self . llm_response_cache :
logger . warning ( " No cache storage configured " )
return
2025-03-04 15:53:20 +08:00
2025-04-14 03:06:23 +08:00
valid_modes = [ " default " , " naive " , " local " , " global " , " hybrid " , " mix " ]
2025-03-04 15:53:20 +08:00
2025-04-14 03:06:23 +08:00
# Validate input
if modes and not all ( mode in valid_modes for mode in modes ) :
raise ValueError ( f " Invalid mode. Valid modes are: { valid_modes } " )
2025-03-04 15:53:20 +08:00
2025-04-14 03:06:23 +08:00
try :
# Reset the cache storage for specified mode
if modes :
success = await self . llm_response_cache . drop_cache_by_modes ( modes )
if success :
logger . info ( f " Cleared cache for modes: { modes } " )
else :
logger . warning ( f " Failed to clear cache for modes: { modes } " )
else :
# Clear all modes
success = await self . llm_response_cache . drop_cache_by_modes ( valid_modes )
if success :
logger . info ( " Cleared all cache " )
else :
logger . warning ( " Failed to clear all cache " )
2025-03-04 15:53:20 +08:00
2025-04-14 03:06:23 +08:00
await self . llm_response_cache . index_done_callback ( )
2025-03-04 15:50:53 +08:00
2025-04-14 03:06:23 +08:00
except Exception as e :
logger . error ( f " Error while clearing cache: { e } " )
def clear_cache ( self , modes : list [ str ] | None = None ) - > None :
""" Synchronous version of aclear_cache. """
return always_get_an_event_loop ( ) . run_until_complete ( self . aclear_cache ( modes ) )
2024-12-28 00:11:25 +08:00
2025-02-17 01:03:05 +08:00
async def get_docs_by_status (
self , status : DocStatus
) - > dict [ str , DocProcessingStatus ] :
""" Get documents by status
Returns :
Dict with document id is keys and document status is values
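Example:
    # Illustrative usage (assumes an initialized LightRAG instance named `rag`):
    processed = await rag.get_docs_by_status(DocStatus.PROCESSED)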
"""
return await self . doc_status . get_docs_by_status ( status )
2025-04-21 13:27:16 +08:00
async def aget_docs_by_ids (
self , ids : str | list [ str ]
) - > dict [ str , DocProcessingStatus ] :
""" Retrieves the processing status for one or more documents by their IDs.
Args :
ids : A single document ID ( string ) or a list of document IDs ( list of strings ) .
Returns :
A dictionary where keys are the document IDs for which a status was found ,
and values are the corresponding DocProcessingStatus objects . IDs that
are not found in the storage will be omitted from the result dictionary .
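Example:
    # Illustrative usage (assumes an initialized LightRAG instance named `rag`;
    # the document IDs are placeholders):
    statuses = await rag.aget_docs_by_ids(["doc-123", "doc-456"])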
"""
if isinstance ( ids , str ) :
# Ensure input is always a list of IDs for uniform processing
id_list = [ ids ]
elif (
ids is None
) : # Handle potential None input gracefully, although type hint suggests str/list
logger . warning (
" aget_docs_by_ids called with None input, returning empty dict. "
)
return { }
else :
# Assume input is already a list if not a string
id_list = ids
# Return early if the final list of IDs is empty
if not id_list :
logger . debug ( " aget_docs_by_ids called with an empty list of IDs. " )
return { }
# Create tasks to fetch document statuses concurrently using the doc_status storage
tasks = [ self . doc_status . get_by_id ( doc_id ) for doc_id in id_list ]
# Execute tasks concurrently and gather the results. Results maintain order.
# Type hint indicates results can be DocProcessingStatus or None if not found.
results_list : list [ Optional [ DocProcessingStatus ] ] = await asyncio . gather ( * tasks )
# Build the result dictionary, mapping found IDs to their statuses
found_statuses : dict [ str , DocProcessingStatus ] = { }
# Keep track of IDs for which no status was found (for logging purposes)
not_found_ids : list [ str ] = [ ]
# Iterate through the results, correlating them back to the original IDs
for i , status_obj in enumerate ( results_list ) :
doc_id = id_list [
i
] # Get the original ID corresponding to this result index
if status_obj :
# If a status object was returned (not None), add it to the result dict
found_statuses [ doc_id ] = status_obj
else :
# If status_obj is None, the document ID was not found in storage
not_found_ids . append ( doc_id )
# Log a warning if any of the requested document IDs were not found
if not_found_ids :
logger . warning (
f " Document statuses not found for the following IDs: { not_found_ids } "
)
# Return the dictionary containing statuses only for the found document IDs
return found_statuses
2025-06-23 17:59:27 +08:00
async def adelete_by_doc_id ( self , doc_id : str ) - > DeletionResult :
2025-06-23 21:08:51 +08:00
""" Delete a document and all its related data, including chunks, graph elements, and cached entries.
This method orchestrates a comprehensive deletion process for a given document ID .
It ensures that not only the document itself but also all its derived and associated
2025-06-25 15:46:58 +08:00
data across different storage layers are removed . If entities or relationships are only partially affected , they are rebuilt from the remaining chunks .
2024-12-31 17:15:57 +08:00
Args :
2025-06-23 21:08:51 +08:00
doc_id ( str ) : The unique identifier of the document to be deleted .
Returns :
DeletionResult : An object containing the outcome of the deletion process .
2025-06-23 21:38:47 +08:00
- ` status ` ( str ) : " success " , " not_found " , or " fail " .
- ` doc_id ` ( str ) : The ID of the document attempted to be deleted .
2025-06-23 21:08:51 +08:00
- ` message ` ( str ) : A summary of the operation ' s result.
2025-06-23 21:38:47 +08:00
- ` status_code ` ( int ) : HTTP status code ( e . g . , 200 , 404 , 500 ) .
2025-06-25 19:02:46 +08:00
- ` file_path ` ( str | None ) : The file path of the deleted document , if available .
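Example:
    # Illustrative usage (assumes an initialized LightRAG instance named `rag`;
    # the document ID is a placeholder):
    result = await rag.adelete_by_doc_id("doc-123")
    if result.status == "success":
        print(result.message)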
2024-12-31 17:15:57 +08:00
"""
2025-06-25 14:58:41 +08:00
deletion_operations_started = False
original_exception = None
2025-06-25 14:59:45 +08:00
2025-06-25 15:46:58 +08:00
# Get pipeline status shared data and lock for status updates
pipeline_status = await get_namespace_data ( " pipeline_status " )
pipeline_status_lock = get_pipeline_status_lock ( )
2025-06-25 14:59:45 +08:00
2025-06-25 15:46:58 +08:00
async with pipeline_status_lock :
log_message = f " Starting deletion process for document { doc_id } "
logger . info ( log_message )
pipeline_status [ " latest_message " ] = log_message
pipeline_status [ " history_messages " ] . append ( log_message )
2024-12-31 17:15:57 +08:00
try :
# 1. Get the document status and related data
2025-06-25 14:58:41 +08:00
doc_status_data = await self . doc_status . get_by_id ( doc_id )
2025-06-26 01:00:54 +08:00
file_path = doc_status_data . get ( " file_path " ) if doc_status_data else None
2025-06-25 14:58:41 +08:00
if not doc_status_data :
2024-12-31 17:15:57 +08:00
logger . warning ( f " Document { doc_id } not found " )
2025-06-23 17:59:27 +08:00
return DeletionResult (
status = " not_found " ,
doc_id = doc_id ,
message = f " Document { doc_id } not found. " ,
status_code = 404 ,
2025-06-26 01:00:54 +08:00
file_path = " " ,
2025-06-23 17:59:27 +08:00
)
2024-12-31 17:32:04 +08:00
2025-07-03 04:18:25 +08:00
# 2. Get chunk IDs from document status
chunk_ids = set ( doc_status_data . get ( " chunks_list " , [ ] ) )
2025-02-27 23:34:57 +07:00
2025-07-03 04:18:25 +08:00
if not chunk_ids :
2025-03-04 13:22:33 +08:00
logger . warning ( f " No chunks found for document { doc_id } " )
2025-06-25 14:58:41 +08:00
# Mark that deletion operations have started
deletion_operations_started = True
try :
# Still need to delete the doc status and full doc
await self . full_docs . delete ( [ doc_id ] )
await self . doc_status . delete ( [ doc_id ] )
logger . info ( f " Deleted document { doc_id } with no associated chunks " )
except Exception as e :
2025-06-25 14:59:45 +08:00
logger . error (
f " Failed to delete document { doc_id } with no chunks: { e } "
2025-03-04 16:36:58 +08:00
)
2025-06-25 14:58:41 +08:00
raise Exception ( f " Failed to delete document entry: { e } " ) from e
2025-06-25 14:59:45 +08:00
2025-06-25 15:46:58 +08:00
async with pipeline_status_lock :
log_message = (
f " Document { doc_id } is deleted without associated chunks. "
2025-03-04 16:36:58 +08:00
)
2025-06-25 15:46:58 +08:00
logger . info ( log_message )
pipeline_status [ " latest_message " ] = log_message
pipeline_status [ " history_messages " ] . append ( log_message )
2024-12-31 17:15:57 +08:00
2025-06-23 17:59:27 +08:00
return DeletionResult (
status = " success " ,
doc_id = doc_id ,
2025-06-25 15:46:58 +08:00
message = log_message ,
2025-06-23 17:59:27 +08:00
status_code = 200 ,
2025-06-26 01:00:54 +08:00
file_path = file_path ,
2024-12-31 17:32:04 +08:00
)
2024-12-31 17:15:57 +08:00
2025-06-25 14:58:41 +08:00
# Mark that deletion operations have started
deletion_operations_started = True
2024-12-31 17:15:57 +08:00
2025-06-09 18:52:34 +08:00
# 4. Analyze entities and relationships that will be affected
2024-12-31 17:15:57 +08:00
entities_to_delete = set ( )
2025-06-09 18:52:34 +08:00
entities_to_rebuild = { } # entity_name -> remaining_chunk_ids
2024-12-31 17:15:57 +08:00
relationships_to_delete = set ( )
2025-06-09 18:52:34 +08:00
relationships_to_rebuild = { } # (src, tgt) -> remaining_chunk_ids
2024-12-31 17:15:57 +08:00
2025-06-23 09:57:56 +08:00
# Use graph database lock to ensure atomic merges and updates
graph_db_lock = get_graph_db_lock ( enable_logging = False )
async with graph_db_lock :
2025-06-25 14:58:41 +08:00
try :
# Get all affected nodes and edges in batch
2025-06-25 15:46:58 +08:00
# logger.info(
# f"Analyzing affected entities and relationships for {len(chunk_ids)} chunks"
# )
2025-06-25 14:58:41 +08:00
affected_nodes = (
await self . chunk_entity_relation_graph . get_nodes_by_chunk_ids (
list ( chunk_ids )
)
2025-03-04 16:36:58 +08:00
)
2025-06-25 15:46:58 +08:00
2025-06-25 14:58:41 +08:00
affected_edges = (
await self . chunk_entity_relation_graph . get_edges_by_chunk_ids (
list ( chunk_ids )
)
2025-03-04 16:36:58 +08:00
)
2024-12-31 17:15:57 +08:00
2025-06-25 14:58:41 +08:00
except Exception as e :
logger . error ( f " Failed to analyze affected graph elements: { e } " )
raise Exception ( f " Failed to analyze graph dependencies: { e } " ) from e
try :
# Process entities
for node_data in affected_nodes :
node_label = node_data . get ( " entity_id " )
if node_label and " source_id " in node_data :
sources = set ( node_data [ " source_id " ] . split ( GRAPH_FIELD_SEP ) )
remaining_sources = sources - chunk_ids
if not remaining_sources :
entities_to_delete . add ( node_label )
elif remaining_sources != sources :
entities_to_rebuild [ node_label ] = remaining_sources
2025-07-03 14:08:19 +08:00
async with pipeline_status_lock :
log_message = (
f " Found { len ( entities_to_rebuild ) } affected entities "
)
logger . info ( log_message )
pipeline_status [ " latest_message " ] = log_message
pipeline_status [ " history_messages " ] . append ( log_message )
2025-06-25 14:58:41 +08:00
# Process relationships
for edge_data in affected_edges :
src = edge_data . get ( " source " )
tgt = edge_data . get ( " target " )
if src and tgt and " source_id " in edge_data :
edge_tuple = tuple ( sorted ( ( src , tgt ) ) )
if (
edge_tuple in relationships_to_delete
or edge_tuple in relationships_to_rebuild
) :
continue
2024-12-31 17:15:57 +08:00
2025-06-25 14:58:41 +08:00
sources = set ( edge_data [ " source_id " ] . split ( GRAPH_FIELD_SEP ) )
remaining_sources = sources - chunk_ids
2024-12-31 17:15:57 +08:00
2025-06-25 14:58:41 +08:00
if not remaining_sources :
relationships_to_delete . add ( edge_tuple )
elif remaining_sources != sources :
relationships_to_rebuild [ edge_tuple ] = remaining_sources
2024-12-31 17:15:57 +08:00
2025-07-03 14:08:19 +08:00
async with pipeline_status_lock :
log_message = (
f " Found { len ( relationships_to_rebuild ) } affected relations "
)
logger . info ( log_message )
pipeline_status [ " latest_message " ] = log_message
pipeline_status [ " history_messages " ] . append ( log_message )
2025-06-25 14:58:41 +08:00
except Exception as e :
logger . error ( f " Failed to process graph analysis results: { e } " )
raise Exception ( f " Failed to process graph dependencies: { e } " ) from e
2025-06-23 09:57:56 +08:00
# 5. Delete chunks from storage
if chunk_ids :
2025-06-25 14:58:41 +08:00
try :
await self . chunks_vdb . delete ( chunk_ids )
await self . text_chunks . delete ( chunk_ids )
2025-06-25 15:46:58 +08:00
async with pipeline_status_lock :
log_message = f " Successfully deleted { len ( chunk_ids ) } chunks from storage "
logger . info ( log_message )
pipeline_status [ " latest_message " ] = log_message
pipeline_status [ " history_messages " ] . append ( log_message )
2025-06-25 14:58:41 +08:00
except Exception as e :
logger . error ( f " Failed to delete chunks: { e } " )
raise Exception ( f " Failed to delete document chunks: { e } " ) from e
2025-06-23 09:57:56 +08:00
# 6. Delete entities that have no remaining sources
if entities_to_delete :
2025-06-25 14:58:41 +08:00
try :
# Delete from vector database
entity_vdb_ids = [
compute_mdhash_id ( entity , prefix = " ent- " )
for entity in entities_to_delete
]
await self . entities_vdb . delete ( entity_vdb_ids )
# Delete from graph
await self . chunk_entity_relation_graph . remove_nodes (
list ( entities_to_delete )
)
2025-02-27 23:34:57 +07:00
2025-06-25 15:46:58 +08:00
async with pipeline_status_lock :
log_message = f " Successfully deleted { len ( entities_to_delete ) } entities "
logger . info ( log_message )
pipeline_status [ " latest_message " ] = log_message
pipeline_status [ " history_messages " ] . append ( log_message )
2025-06-25 14:58:41 +08:00
except Exception as e :
logger . error ( f " Failed to delete entities: { e } " )
raise Exception ( f " Failed to delete entities: { e } " ) from e
2025-06-23 09:57:56 +08:00
# 7. Delete relationships that have no remaining sources
if relationships_to_delete :
2025-06-25 14:58:41 +08:00
try :
# Delete from vector database
rel_ids_to_delete = [ ]
for src , tgt in relationships_to_delete :
rel_ids_to_delete . extend (
[
compute_mdhash_id ( src + tgt , prefix = " rel- " ) ,
compute_mdhash_id ( tgt + src , prefix = " rel- " ) ,
]
2025-02-27 23:34:57 +07:00
)
2025-06-25 14:58:41 +08:00
await self . relationships_vdb . delete ( rel_ids_to_delete )
2025-02-27 23:34:57 +07:00
2025-06-25 14:58:41 +08:00
# Delete from graph
await self . chunk_entity_relation_graph . remove_edges (
list ( relationships_to_delete )
)
2024-12-31 17:32:04 +08:00
2025-06-25 15:46:58 +08:00
async with pipeline_status_lock :
log_message = f " Successfully deleted { len ( relationships_to_delete ) } relations "
logger . info ( log_message )
pipeline_status [ " latest_message " ] = log_message
pipeline_status [ " history_messages " ] . append ( log_message )
2025-06-25 14:58:41 +08:00
except Exception as e :
logger . error ( f " Failed to delete relationships: { e } " )
raise Exception ( f " Failed to delete relationships: { e } " ) from e
2024-12-31 17:15:57 +08:00
2025-06-25 14:58:41 +08:00
# 8. Rebuild entities and relationships from remaining chunks
2025-06-23 09:57:56 +08:00
if entities_to_rebuild or relationships_to_rebuild :
2025-06-25 14:58:41 +08:00
try :
await _rebuild_knowledge_from_chunks (
entities_to_rebuild = entities_to_rebuild ,
relationships_to_rebuild = relationships_to_rebuild ,
knowledge_graph_inst = self . chunk_entity_relation_graph ,
entities_vdb = self . entities_vdb ,
relationships_vdb = self . relationships_vdb ,
2025-07-03 04:18:25 +08:00
text_chunks_storage = self . text_chunks ,
2025-06-25 14:58:41 +08:00
llm_response_cache = self . llm_response_cache ,
global_config = asdict ( self ) ,
2025-06-29 21:27:12 +08:00
pipeline_status = pipeline_status ,
pipeline_status_lock = pipeline_status_lock ,
2025-06-25 14:58:41 +08:00
)
2024-12-31 17:15:57 +08:00
2025-06-25 14:58:41 +08:00
except Exception as e :
logger . error ( f " Failed to rebuild knowledge from chunks: { e } " )
2025-06-25 14:59:45 +08:00
raise Exception (
f " Failed to rebuild knowledge graph: { e } "
) from e
2024-12-31 17:15:57 +08:00
2025-06-09 18:52:34 +08:00
# 9. Delete original document and status
2025-06-25 14:58:41 +08:00
try :
await self . full_docs . delete ( [ doc_id ] )
await self . doc_status . delete ( [ doc_id ] )
except Exception as e :
logger . error ( f " Failed to delete document and status: { e } " )
raise Exception ( f " Failed to delete document and status: { e } " ) from e
2025-06-23 17:59:27 +08:00
return DeletionResult (
status = " success " ,
doc_id = doc_id ,
2025-06-25 15:46:58 +08:00
message = log_message ,
2025-06-23 17:59:27 +08:00
status_code = 200 ,
2025-06-25 19:02:46 +08:00
file_path = file_path ,
2024-12-31 17:15:57 +08:00
)
except Exception as e :
2025-06-25 14:58:41 +08:00
original_exception = e
2025-06-23 17:59:27 +08:00
error_message = f " Error while deleting document { doc_id } : { e } "
logger . error ( error_message )
logger . error ( traceback . format_exc ( ) )
return DeletionResult (
2025-06-23 21:38:47 +08:00
status = " fail " ,
2025-06-23 17:59:27 +08:00
doc_id = doc_id ,
message = error_message ,
status_code = 500 ,
2025-06-26 01:00:54 +08:00
file_path = file_path ,
2025-06-23 17:59:27 +08:00
)
2025-06-25 14:59:45 +08:00
2025-06-25 14:58:41 +08:00
finally :
# ALWAYS ensure persistence if any deletion operations were started
if deletion_operations_started :
try :
await self . _insert_done ( )
except Exception as persistence_error :
persistence_error_msg = f " Failed to persist data after deletion attempt for { doc_id } : { persistence_error } "
logger . error ( persistence_error_msg )
logger . error ( traceback . format_exc ( ) )
2025-06-25 14:59:45 +08:00
2025-06-25 14:58:41 +08:00
# If there was no original exception, this persistence error becomes the main error
if original_exception is None :
return DeletionResult (
status = " fail " ,
doc_id = doc_id ,
message = f " Deletion completed but failed to persist changes: { persistence_error } " ,
status_code = 500 ,
2025-06-26 01:00:54 +08:00
file_path = file_path ,
2025-06-25 14:58:41 +08:00
)
# If there was an original exception, log the persistence error but don't override the original error
# The original error result was already returned in the except block
else :
2025-06-25 14:59:45 +08:00
logger . debug (
f " No deletion operations were started for document { doc_id } , skipping persistence "
)
2024-12-31 17:15:57 +08:00
2025-06-23 21:39:45 +08:00
async def adelete_by_entity ( self , entity_name : str ) - > DeletionResult :
2025-04-14 03:06:23 +08:00
""" Asynchronously delete an entity and all its relationships.
2024-12-31 17:32:04 +08:00
2025-04-14 03:06:23 +08:00
Args :
2025-06-23 21:39:45 +08:00
entity_name : Name of the entity to delete .
Returns :
DeletionResult : An object containing the outcome of the deletion process .
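Example:
    # Illustrative usage (assumes an initialized LightRAG instance named `rag`;
    # the entity name is a placeholder):
    result = await rag.adelete_by_entity("Acme Corp")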
2025-04-14 03:06:23 +08:00
"""
from . utils_graph import adelete_by_entity
2025-04-14 12:08:56 +08:00
2025-04-14 03:06:23 +08:00
return await adelete_by_entity (
self . chunk_entity_relation_graph ,
self . entities_vdb ,
self . relationships_vdb ,
2025-04-14 12:08:56 +08:00
entity_name ,
2024-12-31 17:32:04 +08:00
)
2025-06-23 21:39:45 +08:00
def delete_by_entity ( self , entity_name : str ) - > DeletionResult :
""" Synchronously delete an entity and all its relationships.
Args :
entity_name : Name of the entity to delete .
Returns :
DeletionResult : An object containing the outcome of the deletion process .
"""
2025-04-14 03:06:23 +08:00
loop = always_get_an_event_loop ( )
return loop . run_until_complete ( self . adelete_by_entity ( entity_name ) )
2025-02-20 13:18:17 +01:00
2025-06-23 21:39:45 +08:00
async def adelete_by_relation (
self , source_entity : str , target_entity : str
) - > DeletionResult :
2025-04-14 03:06:23 +08:00
""" Asynchronously delete a relation between two entities.
2025-03-01 18:30:58 +08:00
Args :
2025-06-23 21:39:45 +08:00
source_entity : Name of the source entity .
target_entity : Name of the target entity .
Returns :
DeletionResult : An object containing the outcome of the deletion process .
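Example:
    # Illustrative usage (assumes an initialized LightRAG instance named `rag`;
    # the entity names are placeholders):
    result = await rag.adelete_by_relation("Acme Corp", "Beta Inc")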
2025-03-01 18:30:58 +08:00
"""
2025-04-14 03:06:23 +08:00
from . utils_graph import adelete_by_relation
2025-04-14 12:08:56 +08:00
2025-04-14 03:06:23 +08:00
return await adelete_by_relation (
self . chunk_entity_relation_graph ,
self . relationships_vdb ,
source_entity ,
2025-04-14 12:08:56 +08:00
target_entity ,
2025-04-14 03:06:23 +08:00
)
2025-03-01 18:30:58 +08:00
2025-06-23 21:39:45 +08:00
def delete_by_relation (
self , source_entity : str , target_entity : str
) - > DeletionResult :
""" Synchronously delete a relation between two entities.
Args :
source_entity : Name of the source entity .
target_entity : Name of the target entity .
Returns :
DeletionResult : An object containing the outcome of the deletion process .
"""
2025-04-14 03:06:23 +08:00
loop = always_get_an_event_loop ( )
2025-04-14 12:08:56 +08:00
return loop . run_until_complete (
self . adelete_by_relation ( source_entity , target_entity )
)
2025-03-01 18:30:58 +08:00
2025-04-14 03:06:23 +08:00
async def get_processing_status ( self ) - > dict [ str , int ] :
""" Get current document processing status counts
2025-03-01 18:30:58 +08:00
2025-04-14 03:06:23 +08:00
Returns :
Dict with counts for each status
"""
return await self . doc_status . get_status_counts ( )
2025-03-01 18:30:58 +08:00
2025-04-14 03:06:23 +08:00
async def get_entity_info (
self , entity_name : str , include_vector_data : bool = False
) - > dict [ str , str | None | dict [ str , str ] ] :
""" Get detailed information of an entity """
from . utils_graph import get_entity_info
2025-04-14 12:08:56 +08:00
2025-04-14 03:06:23 +08:00
return await get_entity_info (
self . chunk_entity_relation_graph ,
self . entities_vdb ,
entity_name ,
2025-04-14 12:08:56 +08:00
include_vector_data ,
2025-04-14 03:06:23 +08:00
)
2025-03-01 18:30:58 +08:00
2025-04-14 03:06:23 +08:00
async def get_relation_info (
self , src_entity : str , tgt_entity : str , include_vector_data : bool = False
) - > dict [ str , str | None | dict [ str , str ] ] :
""" Get detailed information of a relationship """
from . utils_graph import get_relation_info
2025-04-14 12:08:56 +08:00
2025-04-14 03:06:23 +08:00
return await get_relation_info (
self . chunk_entity_relation_graph ,
self . relationships_vdb ,
src_entity ,
tgt_entity ,
2025-04-14 12:08:56 +08:00
include_vector_data ,
2025-04-14 03:06:23 +08:00
)
2025-03-03 21:09:45 +08:00
async def aedit_entity (
self , entity_name : str , updated_data : dict [ str , str ] , allow_rename : bool = True
) - > dict [ str , Any ] :
""" Asynchronously edit entity information.
Updates entity information in the knowledge graph and re - embeds the entity in the vector database .
Args :
entity_name : Name of the entity to edit
updated_data : Dictionary containing updated attributes , e . g . { " description " : " new description " , " entity_type " : " new type " }
allow_rename : Whether to allow entity renaming , defaults to True
Returns :
Dictionary containing updated entity information
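Example:
    # Illustrative usage (assumes an initialized LightRAG instance named `rag`;
    # the entity name and values are placeholders):
    updated = await rag.aedit_entity(
        "Acme Corp", {"description": "A multinational company", "entity_type": "ORG"}
    )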
"""
2025-04-14 03:06:23 +08:00
from . utils_graph import aedit_entity
2025-04-14 12:08:56 +08:00
2025-04-14 03:06:23 +08:00
return await aedit_entity (
self . chunk_entity_relation_graph ,
self . entities_vdb ,
self . relationships_vdb ,
entity_name ,
updated_data ,
2025-04-14 12:08:56 +08:00
allow_rename ,
2025-04-14 03:06:23 +08:00
)
2025-03-03 21:09:45 +08:00
def edit_entity (
self , entity_name : str , updated_data : dict [ str , str ] , allow_rename : bool = True
) - > dict [ str , Any ] :
loop = always_get_an_event_loop ( )
return loop . run_until_complete (
self . aedit_entity ( entity_name , updated_data , allow_rename )
)
async def aedit_relation (
self , source_entity : str , target_entity : str , updated_data : dict [ str , Any ]
) - > dict [ str , Any ] :
""" Asynchronously edit relation information.
Updates relation ( edge ) information in the knowledge graph and re - embeds the relation in the vector database .
Args :
source_entity : Name of the source entity
target_entity : Name of the target entity
updated_data : Dictionary containing updated attributes , e . g . { " description " : " new description " , " keywords " : " new keywords " }
Returns :
Dictionary containing updated relation information
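Example:
    # Illustrative usage (assumes an initialized LightRAG instance named `rag`;
    # the entity names and values are placeholders):
    updated = await rag.aedit_relation(
        "Acme Corp", "Beta Inc", {"description": "Acme acquired Beta", "keywords": "acquisition"}
    )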
"""
2025-04-14 03:06:23 +08:00
from . utils_graph import aedit_relation
2025-04-14 12:08:56 +08:00
2025-04-14 03:06:23 +08:00
return await aedit_relation (
self . chunk_entity_relation_graph ,
self . entities_vdb ,
self . relationships_vdb ,
source_entity ,
target_entity ,
2025-04-14 12:08:56 +08:00
updated_data ,
2025-04-14 03:06:23 +08:00
)
2025-03-03 21:09:45 +08:00
def edit_relation (
self , source_entity : str , target_entity : str , updated_data : dict [ str , Any ]
) - > dict [ str , Any ] :
loop = always_get_an_event_loop ( )
return loop . run_until_complete (
self . aedit_relation ( source_entity , target_entity , updated_data )
)
async def acreate_entity (
self , entity_name : str , entity_data : dict [ str , Any ]
) - > dict [ str , Any ] :
""" Asynchronously create a new entity.
Creates a new entity in the knowledge graph and adds it to the vector database .
Args :
entity_name : Name of the new entity
entity_data : Dictionary containing entity attributes , e . g . { " description " : " description " , " entity_type " : " type " }
Returns :
Dictionary containing created entity information
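Example:
    # Illustrative usage (assumes an initialized LightRAG instance named `rag`;
    # the entity name and values are placeholders):
    entity = await rag.acreate_entity(
        "Gamma Labs", {"description": "A research laboratory", "entity_type": "ORG"}
    )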
"""
2025-04-14 03:06:23 +08:00
from . utils_graph import acreate_entity
2025-04-14 12:08:56 +08:00
2025-04-14 03:06:23 +08:00
return await acreate_entity (
self . chunk_entity_relation_graph ,
self . entities_vdb ,
self . relationships_vdb ,
entity_name ,
2025-04-14 12:08:56 +08:00
entity_data ,
2025-04-14 03:06:23 +08:00
)
2025-03-03 21:09:45 +08:00
def create_entity (
self , entity_name : str , entity_data : dict [ str , Any ]
) - > dict [ str , Any ] :
loop = always_get_an_event_loop ( )
return loop . run_until_complete ( self . acreate_entity ( entity_name , entity_data ) )
async def acreate_relation (
self , source_entity : str , target_entity : str , relation_data : dict [ str , Any ]
) - > dict [ str , Any ] :
""" Asynchronously create a new relation between entities.
Creates a new relation ( edge ) in the knowledge graph and adds it to the vector database .
Args :
source_entity : Name of the source entity
target_entity : Name of the target entity
relation_data : Dictionary containing relation attributes , e . g . { " description " : " description " , " keywords " : " keywords " }
Returns :
Dictionary containing created relation information
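Example:
    # Illustrative usage (assumes an initialized LightRAG instance named `rag`;
    # the entity names and values are placeholders):
    relation = await rag.acreate_relation(
        "Acme Corp", "Gamma Labs", {"description": "Acme funds Gamma Labs", "keywords": "funding"}
    )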
"""
2025-04-14 03:06:23 +08:00
from . utils_graph import acreate_relation
2025-04-14 12:08:56 +08:00
2025-04-14 03:06:23 +08:00
return await acreate_relation (
self . chunk_entity_relation_graph ,
self . entities_vdb ,
self . relationships_vdb ,
source_entity ,
target_entity ,
2025-04-14 12:08:56 +08:00
relation_data ,
2025-04-14 03:06:23 +08:00
)
2025-03-03 21:09:45 +08:00
def create_relation (
self , source_entity : str , target_entity : str , relation_data : dict [ str , Any ]
) - > dict [ str , Any ] :
loop = always_get_an_event_loop ( )
return loop . run_until_complete (
self . acreate_relation ( source_entity , target_entity , relation_data )
)
2025-03-06 00:53:23 +08:00
async def amerge_entities (
self ,
source_entities : list [ str ] ,
target_entity : str ,
merge_strategy : dict [ str , str ] | None = None ,
target_entity_data : dict [ str , Any ] | None = None ,
) - > dict [ str , Any ] :
""" Asynchronously merge multiple entities into one entity.
Merges multiple source entities into a target entity , handling all relationships ,
and updating both the knowledge graph and vector database .
Args :
source_entities : List of source entity names to merge
target_entity : Name of the target entity after merging
merge_strategy : Merge strategy configuration , e . g . { " description " : " concatenate " , " entity_type " : " keep_first " }
Supported strategies :
- " concatenate " : Concatenate all values ( for text fields )
- " keep_first " : Keep the first non - empty value
- " keep_last " : Keep the last non - empty value
- " join_unique " : Join all unique values ( for fields separated by delimiter )
target_entity_data : Dictionary of specific values to set for the target entity ,
overriding any merged values , e . g . { " description " : " custom description " , " entity_type " : " PERSON " }
Returns :
Dictionary containing the merged entity information
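Example:
    # Illustrative usage (assumes an initialized LightRAG instance named `rag`;
    # entity names and strategies are placeholders):
    merged = await rag.amerge_entities(
        source_entities=["Acme", "Acme Corp"],
        target_entity="Acme Corporation",
        merge_strategy={"description": "concatenate", "entity_type": "keep_first"},
    )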
"""
2025-04-14 03:06:23 +08:00
from . utils_graph import amerge_entities
2025-04-14 12:08:56 +08:00
2025-04-14 03:06:23 +08:00
return await amerge_entities (
self . chunk_entity_relation_graph ,
self . entities_vdb ,
self . relationships_vdb ,
source_entities ,
target_entity ,
merge_strategy ,
2025-04-14 12:08:56 +08:00
target_entity_data ,
2025-04-14 03:06:23 +08:00
)
2025-03-06 00:53:23 +08:00
2025-04-14 03:06:23 +08:00
def merge_entities (
self ,
source_entities : list [ str ] ,
target_entity : str ,
merge_strategy : dict [ str , str ] | None = None ,
target_entity_data : dict [ str , Any ] | None = None ,
) - > dict [ str , Any ] :
loop = always_get_an_event_loop ( )
return loop . run_until_complete (
self . amerge_entities (
source_entities , target_entity , merge_strategy , target_entity_data
)
)
2025-03-06 00:53:23 +08:00
2025-03-12 00:04:23 +05:30
async def aexport_data (
self ,
output_path : str ,
file_format : Literal [ " csv " , " excel " , " md " , " txt " ] = " csv " ,
include_vector_data : bool = False ,
) - > None :
"""
Asynchronously exports all entities , relations , and relationships to various formats .
Args :
output_path : The path to the output file ( including extension ) .
file_format : Output format - " csv " , " excel " , " md " , " txt " .
- csv : Comma - separated values file
- excel : Microsoft Excel file with multiple sheets
- md : Markdown tables
- txt : Plain text formatted output
- table : Print formatted tables to console
include_vector_data : Whether to include data from the vector database .
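Example:
    # Illustrative usage (assumes an initialized LightRAG instance named `rag`;
    # the output path is a placeholder):
    await rag.aexport_data("./kg_export.xlsx", file_format="excel", include_vector_data=True)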
"""
2025-04-14 03:06:23 +08:00
from . utils import aexport_data as utils_aexport_data
2025-04-14 12:08:56 +08:00
2025-04-14 03:06:23 +08:00
await utils_aexport_data (
self . chunk_entity_relation_graph ,
self . entities_vdb ,
self . relationships_vdb ,
output_path ,
file_format ,
2025-04-14 12:08:56 +08:00
include_vector_data ,
2025-04-14 03:06:23 +08:00
)
2025-03-12 00:04:23 +05:30
def export_data (
self ,
output_path : str ,
file_format : Literal [ " csv " , " excel " , " md " , " txt " ] = " csv " ,
include_vector_data : bool = False ,
) - > None :
"""
Synchronously exports all entities , relations , and relationships to various formats .
Args :
output_path : The path to the output file ( including extension ) .
file_format : Output format - " csv " , " excel " , " md " , " txt " .
- csv : Comma - separated values file
- excel : Microsoft Excel file with multiple sheets
- md : Markdown tables
- txt : Plain text formatted output
- table : Print formatted tables to console
include_vector_data : Whether to include data from the vector database .
"""
try :
loop = asyncio . get_event_loop ( )
except RuntimeError :
loop = asyncio . new_event_loop ( )
asyncio . set_event_loop ( loop )
loop . run_until_complete (
self . aexport_data ( output_path , file_format , include_vector_data )
)