KAG/knext/reasoner/client.py
zhuzhongshu123 f5ad5f1101
refactor(all): kag v0.6 (#174)
* add path find

* fix find path

* spg guided relation extraction

* fix dict parse with same key

* rename graphalgoclient to graphclient

* file reader supports http url

* add checkpointer class

* parser supports checkpoint

* add build

* remove incorrect logs

* remove logs

* update examples

* update chain checkpointer

* vectorizer batch size set to 32

* add a zodb-backed checkpointer

* fix zodb based checkpointer

* add thread for zodb IO

* fix(common): resolve multithread conflict in zodb IO

* fix(common): load existing zodb checkpoints

* update examples

* fix zodb writer

* add docstring

* fix jieba version mismatch

* commit kag_config-tc.yaml

1、rename type to register_name
2、give register_name a unique & specific name
3、rename reader to scanner
4、rename parser to reader
5、rename num_parallel to num_parallel_file, rename chain_level_num_paralle to num_parallel_chain_of_file
6、rename kag_extractor to schema_free_extractor, schema_base_extractor to schema_constraint_extractor
7、pre-define llm & vectorize_model and refer to them in the yaml file

Issues to be resolved:
1、examples of event extract & spg extract
2、statistics of the indexer, such as the number of nodes & edges extracted and the ratio of llm invocations
3、exceptions such as "Debt, account does not exist" should be thrown during llm invocation
4、the solver conf needs to be re-examined
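The renames above amount to a key mapping over existing configs. The sketch below is a hypothetical migration helper (not part of this PR or the KAG codebase) that rewrites old top-level keys in a config dict, assuming a flat key space:

```python
# Hypothetical helper illustrating the v0.6 renames listed above;
# KAG itself does not ship this function.
RENAMES = {
    "type": "register_name",
    "reader": "scanner",
    "parser": "reader",
    "num_parallel": "num_parallel_file",
    "chain_level_num_paralle": "num_parallel_chain_of_file",
    "kag_extractor": "schema_free_extractor",
    "schema_base_extractor": "schema_constraint_extractor",
}


def migrate_keys(conf: dict) -> dict:
    """Rename top-level keys of a config dict according to RENAMES.

    A single pass over the items avoids chained renames, so
    "reader" -> "scanner" and "parser" -> "reader" do not collide.
    """
    return {RENAMES.get(k, k): v for k, v in conf.items()}


old_conf = {"type": "pdf", "num_parallel": 4, "splitter": "length"}
print(migrate_keys(old_conf))
# {'register_name': 'pdf', 'num_parallel_file': 4, 'splitter': 'length'}
```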

* 1、fix bug in base_table_splitter

* 1、fix bug in default_chain

* add solver

* add kag

* update outline splitter

* add main test

* add op

* code refactor

* add tools

* fix outline splitter

* fix outline prompt

* graph api pass

* commit with page rank

* add search api and graph api

* add markdown report

* fix vectorizer num batch compute

* add retry for vectorize model call

* update markdown reader

* update markdown reader

* update pdf reader

* raise extractor failure

* add default expr

* add log

* merge jc reader features

* rm import

* rm parser

* run pipeline

* add config option for whether to perform llm config check, defaulting to false

* fix

* recover pdf reader

* several components can be null for default chain

* support full qa run

* add if

* remove unused code

* use chunk as a fallback

* excluded source relation to choose

* add generate

* default recall 10

* add local memory

* exclude similar edges

* add protection

* fix concurrency issue

* add debug logger

* support parameterized topk

* support chunk truncation and adjust the spo select prompt

* add query request protection

* add force_chunk config

* fix entity linker algorithm

* add sub query rewriting

* fix md reader dup in test

* fix

* merge knext to kag parallel

* fix package

* fix metric drop issue

* scanner update

* add doc and update example scripts

* fix

* add bridge to spg server

* add format

* fix bridge

* update conf for baike

* disable ckpt for spg server runner

* llm invoke errors raise exceptions by default

* chore(version): bump version to X.Y.Z

* update default response generation prompt

* add method getSummarizationMetrics

* fix(common): fix project conf empty error

* fix typo

* add reporting info

* update main solver

* postprocessor support spg server

* update solver supported names

* fix language

* update the chunker interface and add openapi

* rename vectorizer to vectorize_model in spg server config

* make generate_random_string start with gen

* add knext llm vector checker

* remove default values from solver

* update yaml and register_name for baike

* remove config key check

* fix llm module

* fix knext project

* update yaml and register_name for examples

* Revert "udpate yaml and register_name for examples"

This reverts commit b3fa5ca9ba749e501133ac67bd8746027ab839d9.

* update register name

* fix

* fix

* support multiple register names

* update component

* update reader register names (#183)

* fix markdown reader

* fix llm client for retry

* feat(common): add processed chunk id checkpoint (#185)

* update reader register names

* add processed chunk id checkpoint

* feat(example): add example config (#186)

* update reader register names

* add processed chunk id checkpoint

* add example config file

* add max_workers parameter for getSummarizationMetrics to make it faster

* add csqa data generation script generate_data.py

* commit generated csqa builder and solver data

* add csqa basic project files

* adjust split_length and num_threads_per_chain to match lightrag settings

* ignore ckpt dirs

* add csqa evaluation script eval.py

* save evaluation scripts summarization_metrics.py and factual_correctness.py

* save LightRAG output csqa_lightrag_answers.json

* ignore KAG output csqa_kag_answers.json

* add README.md for CSQA

* fix(solver): fix solver pipeline conf (#191)

* update reader register names

* add processed chunk id checkpoint

* add example config file

* update solver pipeline config

* fix project create

* update links and file paths

* reformat csqa kag_config.yaml

* reformat csqa python files

* reformat getSummarizationMetrics and compare_summarization_answers

* fix(solver): fix solver config (#192)

* update reader register names

* add processed chunk id checkpoint

* add example config file

* update solver pipeline config

* fix project create

* fix main solver conf

* add except

* fix typo in csqa README.md

* feat(conf): support reinitialize config for call from java side (#199)

* update reader register names

* add processed chunk id checkpoint

* add example config file

* update solver pipeline config

* fix project create

* fix main solver conf

* support reinitialize config for java call

* revert default response generation prompt

* update project list

* add README.md for the hotpotqa, 2wiki and musique examples

* add spo retrieval

* turn off kag config dump by default

* turn off knext schema dump by default

* add .gitignore and fix kag_config.yaml

* add README.md for the medicine example

* add README.md for the supplychain example

* bugfix for risk mining

* use exact out

* refactor(solver): format solver code (#205)

* update reader register names

* add processed chunk id checkpoint

* add example config file

* update solver pipeline config

* fix project create

* fix main solver conf

* support reinitialize config for java call

* black format

---------

Co-authored-by: peilong <peilong.zpl@antgroup.com>
Co-authored-by: 锦呈 <zhangxinhong.zxh@antgroup.com>
Co-authored-by: zhengke.gzk <zhengke.gzk@antgroup.com>
Co-authored-by: huaidong.xhd <huaidong.xhd@antgroup.com>
2025-01-03 17:10:51 +08:00

# -*- coding: utf-8 -*-
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
import os
import datetime
from knext.reasoner.rest.models.reason_task_response import ReasonTaskResponse
import knext.common.cache
from knext.common.base.client import Client
from knext.common.rest import ApiClient, Configuration
from knext.reasoner import ReasonTask
from knext.reasoner import rest
from knext.reasoner.rest import SpgTypeQueryRequest
from knext.schema.client import SchemaSession
from knext.schema.model.base import SpgTypeEnum

reason_cache = knext.common.cache.SchemaCache()


class ReasonerClient(Client):
    """SPG Reasoner Client."""

    def __init__(self, host_addr: str = None, project_id: int = None, namespace=None):
        super().__init__(host_addr, str(project_id))
        self._rest_client: rest.ReasonerApi = rest.ReasonerApi(
            api_client=ApiClient(configuration=Configuration(host=host_addr))
        )
        self._namespace = namespace or os.environ.get("KAG_PROJECT_NAMESPACE")
        self._session = None
        # load schema cache
        self.get_reason_schema()

    def create_session(self):
        """Create session for altering schema."""
        schema_session = reason_cache.get(self._project_id)
        if not schema_session:
            schema_session = SchemaSession(self._rest_client, self._project_id)
            reason_cache.put(self._project_id, schema_session)
        return schema_session

    def get_reason_schema(self):
        """
        Create a new session and load schema information.

        - Create a session object `schema_session`.
        - Iterate through all types in the session and keep the types that are
          Concepts, Entities, or Events.
        - Construct a dictionary where keys are type names and values are the
          type objects themselves.
        - Return the constructed dictionary `schema`.
        """
        schema_session = self.create_session()
        schema = {
            k: v
            for k, v in schema_session.spg_types.items()
            if v.spg_type_enum
            in [SpgTypeEnum.Concept, SpgTypeEnum.Entity, SpgTypeEnum.Event]
        }
        return schema

    def generate_graph_connect_config(self, lib):
        """
        Generate the graph connection configuration from environment variables.

        This function first attempts to retrieve the graph store URI from
        environment variables. If the URI is not set, it returns the local graph
        store URL and the local graph state class. If the URI is set, it
        retrieves the username, password, and database information from
        environment variables and constructs a graph store URL with this
        information.

        Parameters:
            lib (reasoner constants): Contains constants and classes related to
                graph connections.

        Returns:
            tuple: A tuple containing the graph store URL and the graph state
            class. If the URI comes from environment variables, the URL is a
            remote address; otherwise, it is a local address.
        """
        # Attempt to get the graph store URI; if not set, default to an empty string
        uri = os.environ.get("KAG_GRAPH_STORE_URI", "")
        # If URI is empty, return the local graph store URL and the local graph state class
        if uri == "":
            return lib.LOCAL_GRAPH_STORE_URL, lib.LOCAL_GRAPH_STATE_CLASS
        # Retrieve username, password, and database information from environment variables
        user = os.getenv("KAG_GRAPH_STORE_USER")
        password = os.getenv("KAG_GRAPH_STORE_PASSWORD")
        database = os.getenv("KAG_GRAPH_STORE_DATABASE")
        namespace = self._namespace or os.environ.get("KAG_PROJECT_NAMESPACE")
        # Construct a graph store URL with authentication information
        graph_store_url = f"{uri}?user={user}&password={password}&database={database}&namespace={namespace}"
        # Return the constructed graph store URL and the local graph state class
        return graph_store_url, lib.LOCAL_GRAPH_STATE_CLASS

    def execute(self, dsl_content: str, output_file: str = None):
        """
        Execute a synchronous reasoner job in the local runner.
        """
        task_response: ReasonTaskResponse = self.syn_execute(dsl_content)
        task: ReasonTask = task_response.task
        if task.status != "FINISH":
            print(f"RUN {task.status} {dsl_content}")
        else:
            default_output_file = output_file or (
                f"./{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv"
            )
            show_data = [
                task.result_table_result.header
            ] + task.result_table_result.rows
            import pandas as pd

            df = pd.DataFrame(show_data)
            print(df)
            df.to_csv(default_output_file, index=False)

    def query_node(self, label, id_value):
        req = SpgTypeQueryRequest(
            project_id=self._project_id, spg_type=label, ids=[id_value]
        )
        resp = self._rest_client.query_spg_type_post(spg_type_query_request=req)
        if len(resp) == 0:
            return {}
        return resp[0].properties

    def syn_execute(self, dsl_content: str, **kwargs):
        task = ReasonTask(project_id=self._project_id, dsl=dsl_content, params=kwargs)
        return self._rest_client.reason_run_post(reason_task=task)


if __name__ == "__main__":
    sc = ReasonerClient("http://127.0.0.1:8887", 4)
    reason_schema = sc.get_reason_schema()
    print(reason_schema)
    prop_set = sc.query_node("KQA.Others", "Panic_disorder")
    import time

    start_time = time.time()
    ret = sc.syn_execute(
        "MATCH (n:KQA.Others)-[p:rdf_expand()]-(o:Entity) WHERE n.id in $nid and o.id in $oid RETURN p",
        start_alias="n",
        nid='["Panic_disorder"]',
        oid='["Anxiety_and_nervousness"]',
    )
    print(ret)
    print(f"cost={time.time() - start_time}")
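The env-var logic in `generate_graph_connect_config` can be exercised without a running SPG server. The sketch below mirrors that logic standalone, using a `SimpleNamespace` stand-in for the `lib` constants and example values for the environment variables (the URI scheme and credentials are illustrative, not KAG defaults):

```python
import os
from types import SimpleNamespace

# Stand-in for the reasoner constants module passed as `lib` (hypothetical values).
lib = SimpleNamespace(
    LOCAL_GRAPH_STORE_URL="local://graph", LOCAL_GRAPH_STATE_CLASS="LocalGraphState"
)


def connect_config(lib, namespace=None):
    # Mirrors ReasonerClient.generate_graph_connect_config: fall back to the
    # local store when KAG_GRAPH_STORE_URI is unset, otherwise build a URL
    # carrying user/password/database/namespace as query parameters.
    uri = os.environ.get("KAG_GRAPH_STORE_URI", "")
    if uri == "":
        return lib.LOCAL_GRAPH_STORE_URL, lib.LOCAL_GRAPH_STATE_CLASS
    user = os.getenv("KAG_GRAPH_STORE_USER")
    password = os.getenv("KAG_GRAPH_STORE_PASSWORD")
    database = os.getenv("KAG_GRAPH_STORE_DATABASE")
    namespace = namespace or os.environ.get("KAG_PROJECT_NAMESPACE")
    url = f"{uri}?user={user}&password={password}&database={database}&namespace={namespace}"
    return url, lib.LOCAL_GRAPH_STATE_CLASS


# Example values only; a real deployment would set these outside the process.
os.environ["KAG_GRAPH_STORE_URI"] = "tugraph://127.0.0.1:7687"
os.environ["KAG_GRAPH_STORE_USER"] = "admin"
os.environ["KAG_GRAPH_STORE_PASSWORD"] = "secret"
os.environ["KAG_GRAPH_STORE_DATABASE"] = "default"
url, state_cls = connect_config(lib, namespace="KQA")
print(url)
# tugraph://127.0.0.1:7687?user=admin&password=secret&database=default&namespace=KQA
```

Note the two-branch design: the caller always receives a `(url, state_class)` pair, so downstream code does not need to distinguish local from remote stores.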