KAG/kag/builder/component/vectorizer/batch_vectorizer.py
zhuzhongshu123 f5ad5f1101
refactor(all): kag v0.6 (#174)
* add path find

* fix find path

* spg guided relation extraction

* fix dict parse with same key

* rename graphalgoclient to graphclient

* file reader supports http url

* add checkpointer class

* parser supports checkpoint

* add build

* remove incorrect logs

* remove logs

* update examples

* update chain checkpointer

* vectorizer batch size set to 32

* add a ZODB-backed checkpointer

* fix zodb based checkpointer

* add thread for zodb IO

* fix(common): resolve multithread conflict in zodb IO

* fix(common): load existing zodb checkpoints

* update examples

* fix zodb writer

* add docstring

* fix jieba version mismatch

* commit kag_config-tc.yaml

1. rename type to register_name
2. give register_name a unique & specific value
3. rename reader to scanner
4. rename parser to reader
5. rename num_parallel to num_parallel_file, rename chain_level_num_paralle to num_parallel_chain_of_file
6. rename kag_extractor to schema_free_extractor, schema_base_extractor to schema_constraint_extractor
7. pre-define llm & vectorize_model and refer to them in the yaml file

Issues to be resolved:
1. examples of event extract & spg extract
2. statistics for the indexer, such as the number of nodes & edges extracted and the ratio of llm invocations
3. exceptions such as "Debt account does not exist" should be thrown during llm invocation
4. the solver conf needs to be re-examined

* fix bug in base_table_splitter

* fix bug in default_chain

* add solver

* add kag

* update outline splitter

* add main test

* add op

* code refactor

* add tools

* fix outline splitter

* fix outline prompt

* graph api pass

* commit with page rank

* add search api and graph api

* add markdown report

* fix vectorizer num batch compute

* add retry for vectorize model call

* update markdown reader

* update markdown reader

* update pdf reader

* raise extractor failure

* add default expr

* add log

* merge jc reader features

* rm import

* rm parser

* run pipeline

* add config option controlling whether to perform the llm config check, defaulting to false

* fix

* recover pdf reader

* allow several components of the default chain to be null

* support running the full QA pipeline

* add if

* remove unused code

* use chunks as a fallback

* exclude source relations from selection

* add generate

* default recall 10

* add local memory

* exclude similar edges

* add safeguards

* fix concurrency issue

* add debug logger

* support parameterized topk

* support chunk truncation and adjust the spo select prompt

* add query request protection

* add force_chunk config

* fix entity linker algorithm

* add sub query rewriting

* fix md reader dup in test

* fix

* merge knext to kag parallel

* fix package

* fix metrics regression

* scanner update

* add doc and update example scripts

* fix

* add bridge to spg server

* add format

* fix bridge

* update conf for baike

* disable ckpt for spg server runner

* llm invoke errors raise exceptions by default

* chore(version): bump version to X.Y.Z

* update default response generation prompt

* add method getSummarizationMetrics

* fix(common): fix project conf empty error

* fix typo

* add reporting info

* update main solver

* postprocessor supports spg server

* update solver supported names

* fix language

* update chunker interface, add openapi

* rename vectorizer to vectorize_model in spg server config

* generate_random_string starts with "gen"

* add knext llm vector checker

* remove default values from solver

* update yaml and register_name for baike

* remove config key check

* fix llm module

* fix knext project

* update yaml and register_name for examples

* Revert "udpate yaml and register_name for examples"

This reverts commit b3fa5ca9ba749e501133ac67bd8746027ab839d9.

* update register name

* fix

* support multiple register names

* update component

* update reader register names (#183)

* fix markdown reader

* fix llm client for retry

* feat(common): add processed chunk id checkpoint (#185)

* feat(example): add example config (#186)

* add max_workers parameter for getSummarizationMetrics to make it faster

* add csqa data generation script generate_data.py

* commit generated csqa builder and solver data

* add csqa basic project files

* adjust split_length and num_threads_per_chain to match lightrag settings

* ignore ckpt dirs

* add csqa evaluation script eval.py

* save evaluation scripts summarization_metrics.py and factual_correctness.py

* save LightRAG output csqa_lightrag_answers.json

* ignore KAG output csqa_kag_answers.json

* add README.md for CSQA

* fix(solver): fix solver pipeline conf (#191)

* fix project create

* update links and file paths

* reformat csqa kag_config.yaml

* reformat csqa python files

* reformat getSummarizationMetrics and compare_summarization_answers

* fix(solver): fix solver config (#192)

* fix main solver conf

* add except

* fix typo in csqa README.md

* feat(conf): support reinitialize config for call from java side (#199)

* support reinitialize config for java call

* revert default response generation prompt

* update project list

* add README.md for the hotpotqa, 2wiki and musique examples

* add spo retrieval

* turn off kag config dump by default

* turn off knext schema dump by default

* add .gitignore and fix kag_config.yaml

* add README.md for the medicine example

* add README.md for the supplychain example

* bugfix for risk mining

* use exact out

* refactor(solver): format solver code (#205)

* black format

---------

Co-authored-by: peilong <peilong.zpl@antgroup.com>
Co-authored-by: 锦呈 <zhangxinhong.zxh@antgroup.com>
Co-authored-by: zhengke.gzk <zhengke.gzk@antgroup.com>
Co-authored-by: huaidong.xhd <huaidong.xhd@antgroup.com>
2025-01-03 17:10:51 +08:00

227 lines
9.0 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
from collections import defaultdict
from typing import List
from tenacity import stop_after_attempt, retry
from kag.builder.model.sub_graph import SubGraph
from kag.common.conf import KAG_PROJECT_CONF
from kag.common.utils import get_vector_field_name
from kag.interface import VectorizerABC, VectorizeModelABC
from knext.schema.client import SchemaClient
from knext.schema.model.base import IndexTypeEnum
from knext.common.base.runnable import Input, Output


class EmbeddingVectorPlaceholder(object):
    def __init__(self, number, properties, vector_field, property_key, property_value):
        self._number = number
        self._properties = properties
        self._vector_field = vector_field
        self._property_key = property_key
        self._property_value = property_value
        self._embedding_vector = None

    def replace(self):
        if self._embedding_vector is not None:
            self._properties[self._vector_field] = self._embedding_vector

    def __repr__(self):
        return repr(self._number)
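
# Note: a placeholder temporarily occupies a node's vector field slot; after
# EmbeddingVectorManager computes embeddings in batch, patch() swaps each
# placeholder for its actual embedding vector.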


class EmbeddingVectorManager(object):
    def __init__(self):
        self._placeholders = []

    def get_placeholder(self, properties, vector_field):
        for property_key, property_value in properties.items():
            field_name = get_vector_field_name(property_key)
            if field_name != vector_field:
                continue
            if not property_value:
                return None
            if not isinstance(property_value, str):
                message = f"property {property_key!r} must be string to generate embedding vector, got {property_value} with type {type(property_value)}"
                raise RuntimeError(message)
            num = len(self._placeholders)
            placeholder = EmbeddingVectorPlaceholder(
                num, properties, vector_field, property_key, property_value
            )
            self._placeholders.append(placeholder)
            return placeholder
        return None

    def _get_text_batch(self):
        # Group placeholders by their text so each unique string is embedded once.
        text_batch = dict()
        for placeholder in self._placeholders:
            property_value = placeholder._property_value
            if property_value not in text_batch:
                text_batch[property_value] = list()
            text_batch[property_value].append(placeholder)
        return text_batch
    def _generate_vectors(self, vectorizer, text_batch, batch_size=32):
        texts = list(text_batch)
        if not texts:
            return []
        # Ceiling division: one extra batch for the remainder, if any.
        if len(texts) % batch_size == 0:
            n_batches = len(texts) // batch_size
        else:
            n_batches = len(texts) // batch_size + 1
        embeddings = []
        for idx in range(n_batches):
            start = idx * batch_size
            end = min(start + batch_size, len(texts))
            embeddings.extend(vectorizer.vectorize(texts[start:end]))
        return embeddings
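
    # For example, 70 unique texts with batch_size=32 produce 3 batches of
    # sizes 32, 32 and 6; embeddings come back in the same order as `texts`.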
    def _fill_vectors(self, vectors, text_batch):
        # text_batch preserves insertion order, so vectors line up with texts.
        for vector, (_text, placeholders) in zip(vectors, text_batch.items()):
            for placeholder in placeholders:
                placeholder._embedding_vector = vector

    def batch_generate(self, vectorizer, batch_size=32):
        text_batch = self._get_text_batch()
        vectors = self._generate_vectors(vectorizer, text_batch, batch_size)
        self._fill_vectors(vectors, text_batch)

    def patch(self):
        for placeholder in self._placeholders:
            placeholder.replace()
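
# A minimal usage sketch of the manager (hypothetical, for illustration only;
# the real entry point is EmbeddingVectorGenerator.batch_generate below):
#
#   manager = EmbeddingVectorManager()
#   properties = {"name": "Alice"}
#   vector_field = get_vector_field_name("name")
#   placeholder = manager.get_placeholder(properties, vector_field)
#   if placeholder is not None:
#       properties[vector_field] = placeholder
#   manager.batch_generate(vectorize_model)  # vectorize_model: assumed VectorizeModelABC
#   manager.patch()  # writes the embedding back into `properties`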


class EmbeddingVectorGenerator(object):
    def __init__(self, vectorizer, vector_index_meta=None, extra_labels=("Entity",)):
        self._vectorizer = vectorizer
        self._extra_labels = extra_labels
        self._vector_index_meta = vector_index_meta or {}

    def batch_generate(self, node_batch, batch_size=32):
        manager = EmbeddingVectorManager()
        vector_index_meta = self._vector_index_meta
        for node_item in node_batch:
            label, properties = node_item
            labels = [label]
            if self._extra_labels:
                labels.extend(self._extra_labels)
            for label in labels:
                if label not in vector_index_meta:
                    continue
                for vector_field in vector_index_meta[label]:
                    if vector_field in properties:
                        continue
                    placeholder = manager.get_placeholder(properties, vector_field)
                    if placeholder is not None:
                        properties[vector_field] = placeholder
        manager.batch_generate(self._vectorizer, batch_size)
        manager.patch()
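
# Each node is looked up under its own label plus the extra labels ("Entity" by
# default), so vector fields declared on either get placeholders; fields already
# present on the node are left untouched.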
@VectorizerABC.register("batch")
@VectorizerABC.register("batch_vectorizer")
class BatchVectorizer(VectorizerABC):
"""
A class for generating embedding vectors for node attributes in a SubGraph in batches.
This class inherits from VectorizerABC and provides the functionality to generate embedding vectors
for node attributes in a SubGraph in batches. It uses a specified vectorization model and processes
the nodes of a specified batch size.
Attributes:
project_id (int): The ID of the project associated with the SubGraph.
vec_meta (defaultdict): Metadata for vector fields in the SubGraph.
vectorize_model (VectorizeModelABC): The model used for generating embedding vectors.
batch_size (int): The size of the batches in which to process the nodes.
"""
def __init__(self, vectorize_model: VectorizeModelABC, batch_size: int = 32):
"""
Initializes the BatchVectorizer with the specified vectorization model and batch size.
Args:
vectorize_model (VectorizeModelABC): The model used for generating embedding vectors.
batch_size (int): The size of the batches in which to process the nodes. Defaults to 32.
"""
super().__init__()
self.project_id = KAG_PROJECT_CONF.project_id
# self._init_graph_store()
self.vec_meta = self._init_vec_meta()
self.vectorize_model = vectorize_model
self.batch_size = batch_size
    def _init_vec_meta(self):
        """
        Initializes the vector metadata for the SubGraph.

        Returns:
            defaultdict: Metadata for vector fields in the SubGraph.
        """
        vec_meta = defaultdict(list)
        schema_client = SchemaClient(project_id=self.project_id)
        spg_types = schema_client.load()
        for type_name, spg_type in spg_types.items():
            for prop_name, prop in spg_type.properties.items():
                # The name property is always vectorized; other properties only
                # when the schema declares a vector index on them.
                if prop_name == "name" or prop.index_type in [
                    IndexTypeEnum.Vector,
                    IndexTypeEnum.TextAndVector,
                ]:
                    vec_meta[type_name].append(get_vector_field_name(prop_name))
        return vec_meta
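
    # Illustrative shape of vec_meta (field names depend on get_vector_field_name
    # and on the project schema; this example is hypothetical):
    #   {"Person": ["_name_vector", "_desc_vector"], "Chunk": ["_content_vector"]}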
    @retry(stop=stop_after_attempt(3))
    def _generate_embedding_vectors(self, input_subgraph: SubGraph) -> SubGraph:
        """
        Generates embedding vectors for the nodes in the input SubGraph.

        Args:
            input_subgraph (SubGraph): The SubGraph for which to generate embedding vectors.

        Returns:
            SubGraph: The modified SubGraph with generated embedding vectors.
        """
        node_list = []
        node_batch = []
        for node in input_subgraph.nodes:
            if not node.id or not node.name:
                continue
            properties = {"id": node.id, "name": node.name}
            properties.update(node.properties)
            node_list.append((node, properties))
            node_batch.append((node.label, properties.copy()))
        generator = EmbeddingVectorGenerator(self.vectorize_model, self.vec_meta)
        generator.batch_generate(node_batch, self.batch_size)
        for (node, properties), (_node_label, new_properties) in zip(
            node_list, node_batch
        ):
            # Drop entries the generator left unchanged, so only the newly
            # generated vector fields are merged back onto the node.
            for key, value in properties.items():
                if key in new_properties and new_properties[key] == value:
                    del new_properties[key]
            node.properties.update(new_properties)
        return input_subgraph
    def _invoke(self, input_subgraph: Input, **kwargs) -> List[Output]:
        """
        Invokes the generation of embedding vectors for the input SubGraph.

        Args:
            input_subgraph (Input): The SubGraph for which to generate embedding vectors.
            **kwargs: Additional keyword arguments, currently unused but kept for potential future expansion.

        Returns:
            List[Output]: A list containing the modified SubGraph with generated embedding vectors.
        """
        modified_input = self._generate_embedding_vectors(input_subgraph)
        return [modified_input]
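
# A minimal wiring sketch (hypothetical; assumes a configured VectorizeModelABC
# instance and that the VectorizerABC base exposes invoke() wrapping _invoke()):
#
#   vectorizer = BatchVectorizer(vectorize_model=my_vectorize_model, batch_size=32)
#   [subgraph_with_vectors] = vectorizer.invoke(subgraph)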