From 84a2266e520b69bbae2e6bb85786cc16e0e19e81 Mon Sep 17 00:00:00 2001 From: Qu Date: Sun, 24 Dec 2023 17:54:27 +0800 Subject: [PATCH] fix --- .../knext/knext/component/builder/mapping.py | 7 ++++--- .../builder/job/state_and_indicator.py | 18 ++++++++---------- python/knext/knext/operator/op.py | 6 ++---- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/python/knext/knext/component/builder/mapping.py b/python/knext/knext/component/builder/mapping.py index 985db23b..8a09b48a 100644 --- a/python/knext/knext/component/builder/mapping.py +++ b/python/knext/knext/component/builder/mapping.py @@ -422,8 +422,9 @@ class SubGraphMapping(Mapping): operator_config=predicting_strategy.to_rest() ) elif not predicting_strategy: - if (self.spg_type_name, predicate_name) in PredictOp.bind_schemas: - op_name = PredictOp.bind_schemas[(self.spg_type_name, predicate_name)] + object_type_name = spg_type.properties[predicate_name].object_type_name + if (self.spg_type_name, predicate_name, object_type_name) in PredictOp.bind_schemas: + op_name = PredictOp.bind_schemas[(self.spg_type_name, predicate_name, object_type_name)] op = PredictOp.by_name(op_name)() strategy_config = rest.OperatorPredictingConfig( operator_config=op.to_rest() @@ -434,7 +435,7 @@ class SubGraphMapping(Mapping): raise ValueError(f"Invalid predicting_strategy [{predicting_strategy}].") if strategy_config: predicting_configs.append( - strategy_config + rest.PredictingConfig(target=predicate_name,predicting_config=strategy_config) ) if isinstance(self.subject_fusing_strategy, FuseOp): diff --git a/python/knext/knext/examples/financial/builder/job/state_and_indicator.py b/python/knext/knext/examples/financial/builder/job/state_and_indicator.py index b5711fea..c0133bed 100644 --- a/python/knext/knext/examples/financial/builder/job/state_and_indicator.py +++ b/python/knext/knext/examples/financial/builder/job/state_and_indicator.py @@ -1,21 +1,20 @@ # -*- coding: utf-8 -*- from knext.client.model.builder_job import BuilderJob -from knext.api.component import CSVReader, SPGTypeMapping, KGWriter -from knext.component.builder import LLMBasedExtractor, SubGraphMapping +from knext.api.component import ( + CSVReader, + LLMBasedExtractor, + KGWriter, + SubGraphMapping +) from nn4k.invoker import LLMInvoker -try: - from schema.financial_schema_helper import Financial -except: - pass - class StateAndIndicator(BuilderJob): def build(self): source = CSVReader( - local_path="/Users/jier/openspg/python/knext/knext/examples/financial/builder/job/data/document.csv", + local_path="builder/job/data/document.csv", columns=["input"], start_row=2 ) @@ -23,7 +22,7 @@ class StateAndIndicator(BuilderJob): from knext.examples.financial.builder.operator.IndicatorNER import IndicatorNER from knext.examples.financial.builder.operator.IndicatorREL import IndicatorREL from knext.examples.financial.builder.operator.IndicatorLOGIC import IndicatorLOGIC - extract = LLMBasedExtractor(llm=LLMInvoker.from_config("/Users/jier/openspg/python/knext/knext/examples/financial/builder/model/openai_infer.json"), + extract = LLMBasedExtractor(llm=LLMInvoker.from_config("builder/model/openai_infer.json"), prompt_ops=[IndicatorNER(), IndicatorREL(), IndicatorLOGIC()] ) @@ -36,7 +35,6 @@ class StateAndIndicator(BuilderJob): indicator_mapping = SubGraphMapping(spg_type_name="Financial.Indicator")\ .add_mapping_field("id", "id") \ .add_mapping_field("name", "name") - # .add_predicting_field("isA") sink = KGWriter() diff --git a/python/knext/knext/operator/op.py b/python/knext/knext/operator/op.py index f8fe6672..249fefc4 100644 --- a/python/knext/knext/operator/op.py +++ b/python/knext/knext/operator/op.py @@ -134,7 +134,7 @@ class PredictOp(BaseOp, ABC): bind_to: Tuple[SPGTypeName, PropertyName, SPGTypeName] - bind_schemas: Dict[Tuple[SPGTypeName, PropertyName], str] = {} + bind_schemas: Dict[Tuple[SPGTypeName, PropertyName, SPGTypeName], str] = {} def invoke(self, subject_record: SPGRecord) -> List[SPGRecord]: raise NotImplementedError( @@ -143,9 +143,7 @@ class PredictOp(BaseOp, ABC): @staticmethod def _pre_process(*inputs): - return [ - SPGRecord.from_dict(input) for input in inputs[0] - ], + return SPGRecord.from_dict(inputs[0]), @staticmethod def _post_process(output) -> Dict[str, Any]: