import array
import asyncio
import os
from dataclasses import dataclass
from typing import Any, Union

import numpy as np
import pipmaster as pm

if not pm.is_installed("oracledb"):
    pm.install("oracledb")

import oracledb

from ..base import (
    BaseGraphStorage,
    BaseKVStorage,
    BaseVectorStorage,
)
from ..namespace import NameSpace, is_namespace
from ..utils import logger


class OracleDB:
    def __init__(self, config, **kwargs):
        self.host = config.get("host", None)
        self.port = config.get("port", None)
        self.user = config.get("user", None)
        self.password = config.get("password", None)
        self.dsn = config.get("dsn", None)
        self.config_dir = config.get("config_dir", None)
        self.wallet_location = config.get("wallet_location", None)
        self.wallet_password = config.get("wallet_password", None)
        self.workspace = config.get("workspace", None)
        self.max = 12
        self.increment = 1
        logger.info(f"Using the label {self.workspace} for Oracle Graph as identifier")
        if self.user is None or self.password is None:
            raise ValueError("Missing database user or password in addon_params")

        try:
            oracledb.defaults.fetch_lobs = False
            self.pool = oracledb.create_pool_async(
                user=self.user,
                password=self.password,
                dsn=self.dsn,
                config_dir=self.config_dir,
                wallet_location=self.wallet_location,
                wallet_password=self.wallet_password,
                min=1,
                max=self.max,
                increment=self.increment,
            )
            logger.info(f"Connected to Oracle database at {self.dsn}")
        except Exception as e:
            logger.error(f"Failed to connect to Oracle database at {self.dsn}")
            logger.error(f"Oracle database error: {e}")
            raise
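
    # A minimal usage sketch (hypothetical values; assumes an Oracle instance
    # with VECTOR support, i.e. 23ai, and valid credentials):
    #
    #   config = {
    #       "user": "scott",
    #       "password": "tiger",
    #       "dsn": "localhost:1521/FREEPDB1",
    #       "workspace": "default",
    #   }
    #   db = OracleDB(config)
    #   await db.check_tables()  # create any missing LightRAG tables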

    def numpy_converter_in(self, value):
        """Convert a numpy array to array.array for binding."""
        if value.dtype == np.float64:
            dtype = "d"
        elif value.dtype == np.float32:
            dtype = "f"
        else:
            dtype = "b"
        return array.array(dtype, value)

    def input_type_handler(self, cursor, value, arraysize):
        """Set the type handler for the input data."""
        if isinstance(value, np.ndarray):
            return cursor.var(
                oracledb.DB_TYPE_VECTOR,
                arraysize=arraysize,
                inconverter=self.numpy_converter_in,
            )

    def numpy_converter_out(self, value):
        """Convert array.array to a numpy array."""
        if value.typecode == "b":
            dtype = np.int8
        elif value.typecode == "f":
            dtype = np.float32
        else:
            dtype = np.float64
        return np.array(value, copy=False, dtype=dtype)

    def output_type_handler(self, cursor, metadata):
        """Set the type handler for the output data."""
        if metadata.type_code is oracledb.DB_TYPE_VECTOR:
            return cursor.var(
                metadata.type_code,
                arraysize=cursor.arraysize,
                outconverter=self.numpy_converter_out,
            )
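
    # A small sketch of the round trip the two handlers above implement
    # (illustrative values): numpy arrays are narrowed to array.array on the
    # way in, and VECTOR columns come back out as numpy arrays.
    #
    #   vec = np.array([0.1, 0.2, 0.3], dtype=np.float32)
    #   db.numpy_converter_in(vec)   # -> array('f', [...])
    #   db.numpy_converter_out(array.array('f', [0.1, 0.2, 0.3]))  # -> float32 ndarray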

    async def check_tables(self):
        for k, v in TABLES.items():
            try:
                if k.lower() == "lightrag_graph":
                    await self.query(
                        "SELECT id FROM GRAPH_TABLE (lightrag_graph MATCH (a) COLUMNS (a.id)) fetch first row only"
                    )
                else:
                    await self.query("SELECT 1 FROM {k}".format(k=k))
            except Exception as e:
                logger.error(f"Failed to check table {k} in Oracle database")
                logger.error(f"Oracle database error: {e}")
                try:
                    # print(v["ddl"])
                    await self.execute(v["ddl"])
                    logger.info(f"Created table {k} in Oracle database")
                except Exception as e:
                    logger.error(f"Failed to create table {k} in Oracle database")
                    logger.error(f"Oracle database error: {e}")

        logger.info("Finished checking all tables in Oracle database")

    async def query(
        self, sql: str, params: dict = None, multirows: bool = False
    ) -> Union[dict, None]:
        async with self.pool.acquire() as connection:
            connection.inputtypehandler = self.input_type_handler
            connection.outputtypehandler = self.output_type_handler
            with connection.cursor() as cursor:
                try:
                    await cursor.execute(sql, params)
                except Exception as e:
                    logger.error(f"Oracle database error: {e}")
                    print(sql)
                    print(params)
                    raise
                columns = [column[0].lower() for column in cursor.description]
                if multirows:
                    rows = await cursor.fetchall()
                    if rows:
                        data = [dict(zip(columns, row)) for row in rows]
                    else:
                        data = []
                else:
                    row = await cursor.fetchone()
                    if row:
                        data = dict(zip(columns, row))
                    else:
                        data = None
                return data
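
    # Example (assumes the LightRAG tables exist; binds are passed as a dict
    # of named parameters):
    #
    #   row = await db.query(
    #       "SELECT 1 AS ok FROM DUAL WHERE :workspace IS NOT NULL",
    #       params={"workspace": "default"},
    #   )  # -> {"ok": 1}
    #   rows = await db.query(
    #       "SELECT id FROM LIGHTRAG_DOC_FULL", multirows=True
    #   )  # -> list of dicts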

    async def execute(self, sql: str, data: Union[list, dict] = None):
        # logger.info("go into OracleDB execute method")
        try:
            async with self.pool.acquire() as connection:
                connection.inputtypehandler = self.input_type_handler
                connection.outputtypehandler = self.output_type_handler
                with connection.cursor() as cursor:
                    if data is None:
                        await cursor.execute(sql)
                    else:
                        await cursor.execute(sql, data)
                await connection.commit()
        except Exception as e:
            logger.error(f"Oracle database error: {e}")
            print(sql)
            print(data)
            raise
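
    # Example: thanks to input_type_handler, numpy vectors can be bound
    # directly (a sketch with hypothetical ids; assumes the tables exist):
    #
    #   await db.execute(
    #       "UPDATE LIGHTRAG_DOC_CHUNKS SET content_vector = :v WHERE id = :id",
    #       {"v": np.random.rand(1024).astype(np.float32), "id": "chunk-1"},
    #   )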


@dataclass
class OracleKVStorage(BaseKVStorage):
    # should pass db object to self.db
    db: OracleDB = None
    meta_fields = None

    def __post_init__(self):
        self._data = {}
        self._max_batch_size = self.global_config.get("embedding_batch_num", 10)

    ################ QUERY METHODS ################

    async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
        """Get doc_full data by id."""
        SQL = SQL_TEMPLATES["get_by_id_" + self.namespace]
        params = {"workspace": self.db.workspace, "id": id}
        # print("get_by_id:"+SQL)
        if is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
            array_res = await self.db.query(SQL, params, multirows=True)
            res = {}
            for row in array_res:
                res[row["id"]] = row
        else:
            res = await self.db.query(SQL, params)
        if res:
            return res
        else:
            return None

    async def get_by_mode_and_id(self, mode: str, id: str) -> Union[dict, None]:
        """Specifically for llm_response_cache."""
        SQL = SQL_TEMPLATES["get_by_mode_id_" + self.namespace]
        params = {"workspace": self.db.workspace, "cache_mode": mode, "id": id}
        if is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
            array_res = await self.db.query(SQL, params, multirows=True)
            res = {}
            for row in array_res:
                res[row["id"]] = row
            return res
        else:
            return None

    async def get_by_ids(self, ids: list[str]) -> list[Union[dict[str, Any], None]]:
        """Get doc_chunks data by ids."""
        SQL = SQL_TEMPLATES["get_by_ids_" + self.namespace].format(
            ids=",".join([f"'{id}'" for id in ids])
        )
        params = {"workspace": self.db.workspace}
        # print("get_by_ids:"+SQL)
        res = await self.db.query(SQL, params, multirows=True)
        if is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
            modes = set()
            dict_res: dict[str, dict] = {}
            for row in res:
                modes.add(row["mode"])
            for mode in modes:
                if mode not in dict_res:
                    dict_res[mode] = {}
            for row in res:
                dict_res[row["mode"]][row["id"]] = row
            res = [{k: v} for k, v in dict_res.items()]
        if res:
            data = res  # [{"data":i} for i in res]
            # print(data)
            return data
        else:
            return None

    async def get_by_status_and_ids(
        self, status: str
    ) -> Union[list[dict[str, Any]], None]:
        """Get rows with the given status (no id filter is applied, despite the name)."""
        SQL = SQL_TEMPLATES["get_by_status_" + self.namespace]
        params = {"workspace": self.db.workspace, "status": status}
        return await self.db.query(SQL, params, multirows=True)

    async def filter_keys(self, keys: list[str]) -> set[str]:
        """Return keys that don't exist in storage."""
        SQL = SQL_TEMPLATES["filter_keys"].format(
            table_name=namespace_to_table_name(self.namespace),
            ids=",".join([f"'{id}'" for id in keys]),
        )
        params = {"workspace": self.db.workspace}
        res = await self.db.query(SQL, params, multirows=True)
        if res:
            exist_keys = [key["id"] for key in res]
            data = set([s for s in keys if s not in exist_keys])
            return data
        else:
            return set(keys)
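
    # Example semantics (hypothetical ids): if "doc-1" is already stored and
    # "doc-2" is not, then
    #
    #   await kv.filter_keys(["doc-1", "doc-2"])  # -> {"doc-2"}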

    ################ INSERT METHODS ################

    async def upsert(self, data: dict[str, Any]) -> None:
        if is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
            list_data = [
                {
                    "id": k,
                    **{k1: v1 for k1, v1 in v.items()},
                }
                for k, v in data.items()
            ]
            contents = [v["content"] for v in data.values()]
            batches = [
                contents[i : i + self._max_batch_size]
                for i in range(0, len(contents), self._max_batch_size)
            ]
            embeddings_list = await asyncio.gather(
                *[self.embedding_func(batch) for batch in batches]
            )
            embeddings = np.concatenate(embeddings_list)
            for i, d in enumerate(list_data):
                d["__vector__"] = embeddings[i]

            merge_sql = SQL_TEMPLATES["merge_chunk"]
            for item in list_data:
                _data = {
                    "id": item["id"],
                    "content": item["content"],
                    "workspace": self.db.workspace,
                    "tokens": item["tokens"],
                    "chunk_order_index": item["chunk_order_index"],
                    "full_doc_id": item["full_doc_id"],
                    "content_vector": item["__vector__"],
                    "status": item["status"],
                }
                await self.db.execute(merge_sql, _data)

        if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_DOCS):
            for k, v in data.items():
                # values.clear()
                merge_sql = SQL_TEMPLATES["merge_doc_full"]
                _data = {
                    "id": k,
                    "content": v["content"],
                    "workspace": self.db.workspace,
                }
                await self.db.execute(merge_sql, _data)

        if is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
            for mode, items in data.items():
                for k, v in items.items():
                    upsert_sql = SQL_TEMPLATES["upsert_llm_response_cache"]
                    _data = {
                        "workspace": self.db.workspace,
                        "id": k,
                        "original_prompt": v["original_prompt"],
                        "return_value": v["return"],
                        "cache_mode": mode,
                    }
                    await self.db.execute(upsert_sql, _data)
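
    # The text-chunks branch above embeds contents in batches of
    # _max_batch_size and merges one row per chunk. A condensed sketch of the
    # same batching pattern (illustrative, not a second implementation):
    #
    #   batches = [contents[i : i + n] for i in range(0, len(contents), n)]
    #   embeddings = np.concatenate(
    #       await asyncio.gather(*[self.embedding_func(b) for b in batches])
    #   )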

    async def index_done_callback(self):
        if is_namespace(
            self.namespace,
            (NameSpace.KV_STORE_FULL_DOCS, NameSpace.KV_STORE_TEXT_CHUNKS),
        ):
            logger.info("Full doc and chunk data have been saved into Oracle db!")


@dataclass
class OracleVectorDBStorage(BaseVectorStorage):
    # should pass db object to self.db
    db: OracleDB = None
    cosine_better_than_threshold: float = float(os.getenv("COSINE_THRESHOLD", "0.2"))

    def __post_init__(self):
        # Use global config value if specified, otherwise use default
        config = self.global_config.get("vector_db_storage_cls_kwargs", {})
        self.cosine_better_than_threshold = config.get(
            "cosine_better_than_threshold", self.cosine_better_than_threshold
        )
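
    # The threshold can be overridden per deployment via the global config,
    # e.g. (hypothetical values):
    #
    #   global_config = {
    #       "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.3}
    #   }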

    async def upsert(self, data: dict[str, dict]):
        """Insert data into the vector database."""
        pass

    async def index_done_callback(self):
        pass

    #################### query method ###############
    async def query(self, query: str, top_k=5) -> Union[dict, list[dict]]:
        """Query data from the vector database."""
        embeddings = await self.embedding_func([query])
        embedding = embeddings[0]
        # Match the embedding's dtype and dimension to the SQL template
        dtype = str(embedding.dtype).upper()
        dimension = embedding.shape[0]
        embedding_string = "[" + ", ".join(map(str, embedding.tolist())) + "]"

        SQL = SQL_TEMPLATES[self.namespace].format(dimension=dimension, dtype=dtype)
        params = {
            "embedding_string": embedding_string,
            "workspace": self.db.workspace,
            "top_k": top_k,
            "better_than_threshold": self.cosine_better_than_threshold,
        }
        # print(SQL)
        results = await self.db.query(SQL, params=params, multirows=True)
        # print("vector search result:",results)
        return results
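
    # For a float32 query embedding, str(embedding.dtype).upper() yields
    # "FLOAT32", so the template renders to vector(:embedding_string, 1024,
    # FLOAT32) for a 1024-d model (illustrative binds):
    #
    #   params = {"embedding_string": "[0.1, 0.2, ...]", "workspace": "default",
    #             "top_k": 5, "better_than_threshold": 0.2}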


@dataclass
class OracleGraphStorage(BaseGraphStorage):
    """Oracle-based graph storage module."""

    def __post_init__(self):
        """Set the maximum embedding batch size from the global config."""
        self._max_batch_size = self.global_config.get("embedding_batch_num", 10)

    #################### insert method ################

    async def upsert_node(self, node_id: str, node_data: dict[str, str]):
        """Insert or update a node."""
        # print("go into upsert node method")
        entity_name = node_id
        entity_type = node_data["entity_type"]
        description = node_data["description"]
        source_id = node_data["source_id"]
        logger.debug(f"entity_name:{entity_name}, entity_type:{entity_type}")

        content = entity_name + description
        contents = [content]
        batches = [
            contents[i : i + self._max_batch_size]
            for i in range(0, len(contents), self._max_batch_size)
        ]
        embeddings_list = await asyncio.gather(
            *[self.embedding_func(batch) for batch in batches]
        )
        embeddings = np.concatenate(embeddings_list)
        content_vector = embeddings[0]
        merge_sql = SQL_TEMPLATES["merge_node"]
        data = {
            "workspace": self.db.workspace,
            "name": entity_name,
            "entity_type": entity_type,
            "description": description,
            "source_chunk_id": source_id,
            "content": content,
            "content_vector": content_vector,
        }
        await self.db.execute(merge_sql, data)
        # self._graph.add_node(node_id, **node_data)

    async def upsert_edge(
        self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
    ):
        """Insert or update an edge."""
        # print("go into upsert edge method")
        source_name = source_node_id
        target_name = target_node_id
        weight = edge_data["weight"]
        keywords = edge_data["keywords"]
        description = edge_data["description"]
        source_chunk_id = edge_data["source_id"]
        logger.debug(
            f"source_name:{source_name}, target_name:{target_name}, keywords: {keywords}"
        )

        content = keywords + source_name + target_name + description
        contents = [content]
        batches = [
            contents[i : i + self._max_batch_size]
            for i in range(0, len(contents), self._max_batch_size)
        ]
        embeddings_list = await asyncio.gather(
            *[self.embedding_func(batch) for batch in batches]
        )
        embeddings = np.concatenate(embeddings_list)
        content_vector = embeddings[0]
        merge_sql = SQL_TEMPLATES["merge_edge"]
        data = {
            "workspace": self.db.workspace,
            "source_name": source_name,
            "target_name": target_name,
            "weight": weight,
            "keywords": keywords,
            "description": description,
            "source_chunk_id": source_chunk_id,
            "content": content,
            "content_vector": content_vector,
        }
        # print(merge_sql)
        await self.db.execute(merge_sql, data)
        # self._graph.add_edge(source_node_id, target_node_id, **edge_data)

    async def embed_nodes(self, algorithm: str) -> tuple[np.ndarray, list[str]]:
        """Generate embeddings for nodes."""
        if algorithm not in self._node_embed_algorithms:
            raise ValueError(f"Node embedding algorithm {algorithm} not supported")
        return await self._node_embed_algorithms[algorithm]()

    async def _node2vec_embed(self):
        """Generate node embeddings with node2vec."""
        from graspologic import embed

        embeddings, nodes = embed.node2vec_embed(
            self._graph,
            **self.config["node2vec_params"],
        )
        nodes_ids = [self._graph.nodes[node_id]["id"] for node_id in nodes]
        return embeddings, nodes_ids

    async def index_done_callback(self):
        """Nothing to write back: node and edge data are persisted in Oracle."""
        logger.info(
            "Node and edge data have been saved into Oracle db already, so nothing to do here!"
        )

    #################### query method #################

    async def has_node(self, node_id: str) -> bool:
        """Check whether a node exists by its id."""
        SQL = SQL_TEMPLATES["has_node"]
        params = {"workspace": self.db.workspace, "node_id": node_id}
        # print(SQL)
        # print(self.db.workspace, node_id)
        res = await self.db.query(SQL, params)
        if res:
            # print("Node exist!",res)
            return True
        else:
            # print("Node not exist!")
            return False

    async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
        """Check whether an edge exists by its source and target node ids."""
        SQL = SQL_TEMPLATES["has_edge"]
        params = {
            "workspace": self.db.workspace,
            "source_node_id": source_node_id,
            "target_node_id": target_node_id,
        }
        # print(SQL)
        res = await self.db.query(SQL, params)
        if res:
            # print("Edge exist!",res)
            return True
        else:
            # print("Edge not exist!")
            return False

    async def node_degree(self, node_id: str) -> int:
        """Get a node's degree by its id."""
        SQL = SQL_TEMPLATES["node_degree"]
        params = {"workspace": self.db.workspace, "node_id": node_id}
        # print(SQL)
        res = await self.db.query(SQL, params)
        if res:
            # print("Node degree",res["degree"])
            return res["degree"]
        else:
            # print("Edge not exist!")
            return 0

    async def edge_degree(self, src_id: str, tgt_id: str) -> int:
        """Get an edge's degree as the sum of its endpoints' node degrees."""
        degree = await self.node_degree(src_id) + await self.node_degree(tgt_id)
        # print("Edge degree",degree)
        return degree

    async def get_node(self, node_id: str) -> Union[dict, None]:
        """Get node data by node id."""
        SQL = SQL_TEMPLATES["get_node"]
        params = {"workspace": self.db.workspace, "node_id": node_id}
        # print(self.db.workspace, node_id)
        # print(SQL)
        res = await self.db.query(SQL, params)
        if res:
            # print("Get node!",self.db.workspace, node_id,res)
            return res
        else:
            # print("Can't get node!",self.db.workspace, node_id)
            return None

    async def get_edge(
        self, source_node_id: str, target_node_id: str
    ) -> Union[dict, None]:
        """Get an edge by its source and target node ids."""
        SQL = SQL_TEMPLATES["get_edge"]
        params = {
            "workspace": self.db.workspace,
            "source_node_id": source_node_id,
            "target_node_id": target_node_id,
        }
        res = await self.db.query(SQL, params)
        if res:
            # print("Get edge!",self.db.workspace, source_node_id, target_node_id,res[0])
            return res
        else:
            # print("Edge not exist!",self.db.workspace, source_node_id, target_node_id)
            return None

    async def get_node_edges(self, source_node_id: str):
        """Get all edges of a node by its id."""
        if await self.has_node(source_node_id):
            SQL = SQL_TEMPLATES["get_node_edges"]
            params = {"workspace": self.db.workspace, "source_node_id": source_node_id}
            res = await self.db.query(sql=SQL, params=params, multirows=True)
            if res:
                data = [(i["source_name"], i["target_name"]) for i in res]
                # print("Get node edge!",self.db.workspace, source_node_id,data)
                return data
            else:
                # print("Node Edge not exist!",self.db.workspace, source_node_id)
                return []

    async def get_all_nodes(self, limit: int):
        """Query all nodes, up to the given limit."""
        SQL = SQL_TEMPLATES["get_all_nodes"]
        params = {"workspace": self.db.workspace, "limit": str(limit)}
        res = await self.db.query(sql=SQL, params=params, multirows=True)
        if res:
            return res

    async def get_all_edges(self, limit: int):
        """Query all edges, up to the given limit."""
        SQL = SQL_TEMPLATES["get_all_edges"]
        params = {"workspace": self.db.workspace, "limit": str(limit)}
        res = await self.db.query(sql=SQL, params=params, multirows=True)
        if res:
            return res

    async def get_statistics(self):
        SQL = SQL_TEMPLATES["get_statistics"]
        params = {"workspace": self.db.workspace}
        res = await self.db.query(sql=SQL, params=params, multirows=True)
        if res:
            return res


N_T = {
    NameSpace.KV_STORE_FULL_DOCS: "LIGHTRAG_DOC_FULL",
    NameSpace.KV_STORE_TEXT_CHUNKS: "LIGHTRAG_DOC_CHUNKS",
    NameSpace.VECTOR_STORE_CHUNKS: "LIGHTRAG_DOC_CHUNKS",
    NameSpace.VECTOR_STORE_ENTITIES: "LIGHTRAG_GRAPH_NODES",
    NameSpace.VECTOR_STORE_RELATIONSHIPS: "LIGHTRAG_GRAPH_EDGES",
}


def namespace_to_table_name(namespace: str) -> str:
    for k, v in N_T.items():
        if is_namespace(namespace, k):
            return v
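
# Example mapping (assuming the NameSpace constants resolve to strings such
# as "full_docs" and "chunks"; hypothetical calls):
#
#   namespace_to_table_name("full_docs")  # -> "LIGHTRAG_DOC_FULL"
#   namespace_to_table_name("chunks")     # -> "LIGHTRAG_DOC_CHUNKS"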


TABLES = {
    "LIGHTRAG_DOC_FULL": {
        "ddl": """CREATE TABLE LIGHTRAG_DOC_FULL (
                    id varchar(256),
                    workspace varchar(1024),
                    doc_name varchar(1024),
                    content CLOB,
                    meta JSON,
                    content_summary varchar(1024),
                    content_length NUMBER,
                    status varchar(256),
                    chunks_count NUMBER,
                    createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updatetime TIMESTAMP DEFAULT NULL,
                    error varchar(4096)
                    )"""
    },
    "LIGHTRAG_DOC_CHUNKS": {
        "ddl": """CREATE TABLE LIGHTRAG_DOC_CHUNKS (
                    id varchar(256),
                    workspace varchar(1024),
                    full_doc_id varchar(256),
                    status varchar(256),
                    chunk_order_index NUMBER,
                    tokens NUMBER,
                    content CLOB,
                    content_vector VECTOR,
                    createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updatetime TIMESTAMP DEFAULT NULL
                    )"""
    },
    "LIGHTRAG_GRAPH_NODES": {
        "ddl": """CREATE TABLE LIGHTRAG_GRAPH_NODES (
                    id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
                    workspace varchar(1024),
                    name varchar(2048),
                    entity_type varchar(1024),
                    description CLOB,
                    source_chunk_id varchar(256),
                    content CLOB,
                    content_vector VECTOR,
                    createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updatetime TIMESTAMP DEFAULT NULL
                    )"""
    },
    "LIGHTRAG_GRAPH_EDGES": {
        "ddl": """CREATE TABLE LIGHTRAG_GRAPH_EDGES (
                    id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
                    workspace varchar(1024),
                    source_name varchar(2048),
                    target_name varchar(2048),
                    weight NUMBER,
                    keywords CLOB,
                    description CLOB,
                    source_chunk_id varchar(256),
                    content CLOB,
                    content_vector VECTOR,
                    createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updatetime TIMESTAMP DEFAULT NULL
                    )"""
    },
    "LIGHTRAG_LLM_CACHE": {
        "ddl": """CREATE TABLE LIGHTRAG_LLM_CACHE (
                    id varchar(256) PRIMARY KEY,
                    workspace varchar(1024),
                    cache_mode varchar(256),
                    model_name varchar(256),
                    original_prompt clob,
                    return_value clob,
                    embedding CLOB,
                    embedding_shape NUMBER,
                    embedding_min NUMBER,
                    embedding_max NUMBER,
                    createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updatetime TIMESTAMP DEFAULT NULL
                    )"""
    },
    "LIGHTRAG_GRAPH": {
        "ddl": """CREATE OR REPLACE PROPERTY GRAPH lightrag_graph
                VERTEX TABLES (
                    lightrag_graph_nodes KEY (id)
                    LABEL entity
                    PROPERTIES (id, workspace, name) -- ,entity_type,description,source_chunk_id)
                )
                EDGE TABLES (
                    lightrag_graph_edges KEY (id)
                    SOURCE KEY (source_name) REFERENCES lightrag_graph_nodes(name)
                    DESTINATION KEY (target_name) REFERENCES lightrag_graph_nodes(name)
                    LABEL has_relation
                    PROPERTIES (id, workspace, source_name, target_name) -- ,weight,keywords,description,source_chunk_id)
                ) OPTIONS(ALLOW MIXED PROPERTY TYPES)"""
    },
}


SQL_TEMPLATES = {
    # SQL for KVStorage
    "get_by_id_full_docs": "select ID,content,status from LIGHTRAG_DOC_FULL where workspace=:workspace and ID=:id",
    "get_by_id_text_chunks": "select ID,TOKENS,content,CHUNK_ORDER_INDEX,FULL_DOC_ID,status from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and ID=:id",
    "get_by_id_llm_response_cache": """SELECT id, original_prompt, NVL(return_value, '') as "return", cache_mode as "mode"
        FROM LIGHTRAG_LLM_CACHE WHERE workspace=:workspace AND id=:id""",
    "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, NVL(return_value, '') as "return", cache_mode as "mode"
        FROM LIGHTRAG_LLM_CACHE WHERE workspace=:workspace AND cache_mode=:cache_mode AND id=:id""",
    "get_by_ids_llm_response_cache": """SELECT id, original_prompt, NVL(return_value, '') as "return", cache_mode as "mode"
        FROM LIGHTRAG_LLM_CACHE WHERE workspace=:workspace AND id IN ({ids})""",
    "get_by_ids_full_docs": "select t.*,createtime as created_at from LIGHTRAG_DOC_FULL t where workspace=:workspace and ID in ({ids})",
    "get_by_ids_text_chunks": "select ID,TOKENS,content,CHUNK_ORDER_INDEX,FULL_DOC_ID from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and ID in ({ids})",
    "get_by_status_ids_full_docs": "select id,status from LIGHTRAG_DOC_FULL t where workspace=:workspace AND status=:status and ID in ({ids})",
    "get_by_status_ids_text_chunks": "select id,status from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and status=:status and ID in ({ids})",
    "get_by_status_full_docs": "select id,status from LIGHTRAG_DOC_FULL t where workspace=:workspace AND status=:status",
    "get_by_status_text_chunks": "select id,status from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and status=:status",
    "filter_keys": "select id from {table_name} where workspace=:workspace and id in ({ids})",
    "merge_doc_full": """MERGE INTO LIGHTRAG_DOC_FULL a
        USING DUAL
        ON (a.id = :id and a.workspace = :workspace)
        WHEN NOT MATCHED THEN
        INSERT (id, content, workspace) values (:id, :content, :workspace)""",
    "merge_chunk": """MERGE INTO LIGHTRAG_DOC_CHUNKS
        USING DUAL
        ON (id = :id and workspace = :workspace)
        WHEN NOT MATCHED THEN INSERT
        (id, content, workspace, tokens, chunk_order_index, full_doc_id, content_vector, status)
        values (:id, :content, :workspace, :tokens, :chunk_order_index, :full_doc_id, :content_vector, :status)""",
    "upsert_llm_response_cache": """MERGE INTO LIGHTRAG_LLM_CACHE a
        USING DUAL
        ON (a.id = :id)
        WHEN NOT MATCHED THEN
        INSERT (workspace, id, original_prompt, return_value, cache_mode)
        VALUES (:workspace, :id, :original_prompt, :return_value, :cache_mode)
        WHEN MATCHED THEN UPDATE
        SET original_prompt = :original_prompt,
        return_value = :return_value,
        cache_mode = :cache_mode,
        updatetime = SYSDATE""",
    # SQL for VectorStorage
    "entities": """SELECT name as entity_name FROM
        (SELECT id, name, VECTOR_DISTANCE(content_vector, vector(:embedding_string, {dimension}, {dtype}), COSINE) as distance
        FROM LIGHTRAG_GRAPH_NODES WHERE workspace=:workspace)
        WHERE distance>:better_than_threshold ORDER BY distance ASC FETCH FIRST :top_k ROWS ONLY""",
    "relationships": """SELECT source_name as src_id, target_name as tgt_id FROM
        (SELECT id, source_name, target_name, VECTOR_DISTANCE(content_vector, vector(:embedding_string, {dimension}, {dtype}), COSINE) as distance
        FROM LIGHTRAG_GRAPH_EDGES WHERE workspace=:workspace)
        WHERE distance>:better_than_threshold ORDER BY distance ASC FETCH FIRST :top_k ROWS ONLY""",
    "chunks": """SELECT id FROM
        (SELECT id, VECTOR_DISTANCE(content_vector, vector(:embedding_string, {dimension}, {dtype}), COSINE) as distance
        FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=:workspace)
        WHERE distance>:better_than_threshold ORDER BY distance ASC FETCH FIRST :top_k ROWS ONLY""",
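    # Note: the three vector-search templates above are filled in two phases:
    # str.format() substitutes {dimension}/{dtype} before execution, while
    # :embedding_string, :workspace, :top_k and :better_than_threshold remain
    # Oracle bind variables. Illustrative rendering for a 1024-d float32 query:
    #
    #   SQL_TEMPLATES["chunks"].format(dimension=1024, dtype="FLOAT32")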
    # SQL for GraphStorage
    "has_node": """SELECT * FROM GRAPH_TABLE (lightrag_graph
        MATCH (a)
        WHERE a.workspace=:workspace AND a.name=:node_id
        COLUMNS (a.name))""",
    "has_edge": """SELECT * FROM GRAPH_TABLE (lightrag_graph
        MATCH (a) -[e]-> (b)
        WHERE e.workspace=:workspace and a.workspace=:workspace and b.workspace=:workspace
        AND a.name=:source_node_id AND b.name=:target_node_id
        COLUMNS (e.source_name, e.target_name))""",
    "node_degree": """SELECT count(1) as degree FROM GRAPH_TABLE (lightrag_graph
        MATCH (a) -[e]-> (b)
        WHERE e.workspace=:workspace and a.workspace=:workspace and b.workspace=:workspace
        AND (a.name=:node_id or b.name=:node_id)
        COLUMNS (a.name))""",
    "get_node": """SELECT t1.name, t2.entity_type, t2.source_chunk_id as source_id, NVL(t2.description, '') AS description
        FROM GRAPH_TABLE (lightrag_graph
        MATCH (a)
        WHERE a.workspace=:workspace AND a.name=:node_id
        COLUMNS (a.name)
        ) t1 JOIN LIGHTRAG_GRAPH_NODES t2 on t1.name=t2.name
        WHERE t2.workspace=:workspace""",
    "get_edge": """SELECT t1.source_id, t2.weight, t2.source_chunk_id as source_id,
        NVL(t2.description, '') AS description, NVL(t2.KEYWORDS, '') AS keywords
        FROM GRAPH_TABLE (lightrag_graph
        MATCH (a) -[e]-> (b)
        WHERE e.workspace=:workspace and a.workspace=:workspace and b.workspace=:workspace
        AND a.name=:source_node_id and b.name=:target_node_id
        COLUMNS (e.id, a.name as source_id)
        ) t1 JOIN LIGHTRAG_GRAPH_EDGES t2 on t1.id=t2.id""",
    "get_node_edges": """SELECT source_name, target_name
        FROM GRAPH_TABLE (lightrag_graph
        MATCH (a) -[e]-> (b)
        WHERE e.workspace=:workspace and a.workspace=:workspace and b.workspace=:workspace
        AND a.name=:source_node_id
        COLUMNS (a.name as source_name, b.name as target_name))""",
" merge_node " : """ MERGE INTO LIGHTRAG_GRAPH_NODES a
USING DUAL
2025-01-10 11:36:28 +08:00
ON ( a . workspace = : workspace and a . name = : name )
2024-11-08 14:58:41 +08:00
WHEN NOT MATCHED THEN
INSERT ( workspace , name , entity_type , description , source_chunk_id , content , content_vector )
2025-01-10 11:36:28 +08:00
values ( : workspace , : name , : entity_type , : description , : source_chunk_id , : content , : content_vector )
WHEN MATCHED THEN
2025-01-16 12:58:15 +08:00
UPDATE SET
2025-01-10 11:36:28 +08:00
entity_type = : entity_type , description = : description , source_chunk_id = : source_chunk_id , content = : content , content_vector = : content_vector , updatetime = SYSDATE """ ,
2024-11-08 14:58:41 +08:00
" merge_edge " : """ MERGE INTO LIGHTRAG_GRAPH_EDGES a
USING DUAL
2025-01-10 11:36:28 +08:00
ON ( a . workspace = : workspace and a . source_name = : source_name and a . target_name = : target_name )
2024-11-08 14:58:41 +08:00
WHEN NOT MATCHED THEN
INSERT ( workspace , source_name , target_name , weight , keywords , description , source_chunk_id , content , content_vector )
2025-01-10 11:36:28 +08:00
values ( : workspace , : source_name , : target_name , : weight , : keywords , : description , : source_chunk_id , : content , : content_vector )
WHEN MATCHED THEN
2025-01-16 12:58:15 +08:00
UPDATE SET
2025-01-10 11:36:28 +08:00
weight = : weight , keywords = : keywords , description = : description , source_chunk_id = : source_chunk_id , content = : content , content_vector = : content_vector , updatetime = SYSDATE """ ,
    "get_all_nodes": """WITH t0 AS (
        SELECT name AS id, entity_type AS label, entity_type, description,
        '["' || replace(source_chunk_id, '<SEP>', '","') || '"]' source_chunk_ids
        FROM lightrag_graph_nodes
        WHERE workspace=:workspace
        ORDER BY createtime DESC fetch first :limit rows only
        ), t1 AS (
        SELECT t0.id, source_chunk_id
        FROM t0, JSON_TABLE(source_chunk_ids, '$[*]' COLUMNS (source_chunk_id PATH '$'))
        ), t2 AS (
        SELECT t1.id, LISTAGG(t2.content, '\n') content
        FROM t1 LEFT JOIN lightrag_doc_chunks t2 ON t1.source_chunk_id = t2.id
        GROUP BY t1.id
        )
        SELECT t0.id, label, entity_type, description, t2.content
        FROM t0 LEFT JOIN t2 ON t0.id = t2.id""",
    "get_all_edges": """SELECT t1.id, t1.keywords as label, t1.keywords,
        t1.source_name as source, t1.target_name as target,
        t1.weight, t1.DESCRIPTION, t2.content
        FROM LIGHTRAG_GRAPH_EDGES t1
        LEFT JOIN LIGHTRAG_DOC_CHUNKS t2 on t1.source_chunk_id = t2.id
        WHERE t1.workspace = :workspace
        order by t1.CREATETIME DESC
        fetch first :limit rows only""",
    "get_statistics": """select count(distinct CASE WHEN type='node' THEN id END) as nodes_count,
        count(distinct CASE WHEN type='edge' THEN id END) as edges_count
        FROM (
        select 'node' as type, id FROM GRAPH_TABLE (lightrag_graph
        MATCH (a) WHERE a.workspace=:workspace columns(a.name as id))
        UNION
        select 'edge' as type, TO_CHAR(id) id FROM GRAPH_TABLE (lightrag_graph
        MATCH (a)-[e]->(b) WHERE e.workspace=:workspace columns(e.id))
        )""",
}