diff --git a/python/knext/api/__init__.py b/python/knext/api/__init__.py index e69de29b..9291886f 100644 --- a/python/knext/api/__init__.py +++ b/python/knext/api/__init__.py @@ -0,0 +1,12 @@ +from knext.api.operator import ( + BaseOp, + ExtractOp, + KnowledgeExtractOp, + EntityLinkOp, + LinkOp, + EntityFuseOp, + FuseOp, + PropertyNormalizeOp, + NormalizeOp, + PromptOp, +) diff --git a/python/knext/api/chain.py b/python/knext/api/chain.py new file mode 100644 index 00000000..e69de29b diff --git a/python/knext/api/client.py b/python/knext/api/client.py new file mode 100644 index 00000000..e69de29b diff --git a/python/knext/api/component.py b/python/knext/api/component.py new file mode 100644 index 00000000..e69de29b diff --git a/python/knext/api/operator.py b/python/knext/api/operator.py new file mode 100644 index 00000000..5f0939d9 --- /dev/null +++ b/python/knext/api/operator.py @@ -0,0 +1,22 @@ +from knext.core.builder.operator.model.op import BaseOp, ExtractOp, LinkOp, FuseOp, NormalizeOp, PromptOp + + +KnowledgeExtractOp = ExtractOp +EntityLinkOp = LinkOp +EntityFuseOp = FuseOp +PropertyNormalizeOp = NormalizeOp + +__all__ = [ + "BaseOp", + "ExtractOp", + "LinkOp", + "FuseOp", + "NormalizeOp", + "PromptOp", + "LinkOp", +] + [ + "KnowledgeExtractOp", + "EntityLinkOp", + "EntityFuseOp", + "PropertyNormalizeOp", +] diff --git a/python/knext/chain/__init__.py b/python/knext/chain/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/knext/chain/base.py b/python/knext/chain/base.py new file mode 100644 index 00000000..25c82f43 --- /dev/null +++ b/python/knext/chain/base.py @@ -0,0 +1,8 @@ +from abc import ABC + + +class Chain(ABC): + + def dag(self): + pass + diff --git a/python/knext/chain/builder_chain.py b/python/knext/chain/builder_chain.py new file mode 100644 index 00000000..a26c5885 --- /dev/null +++ b/python/knext/chain/builder_chain.py @@ -0,0 +1,26 @@ +from typing import Union + +from knext.chain.base import Chain +from 
knext.component.base import RESTable +from knext.component.builder.extractor import SPGExtractor +from knext.component.builder.mapping import Mapping +from knext.component.builder.sink_writer import SinkWriter +from knext.component.builder.source_reader import SourceReader + + +class BuilderChain(RESTable, Chain): + + source: SourceReader + + process: Union[SPGExtractor, Mapping] + + sink: SinkWriter + + @property + def input_types(self): + return None + + @property + def output_types(self): + return None + diff --git a/python/knext/client/__init__.py b/python/knext/client/__init__.py index 19913efa..e69de29b 100644 --- a/python/knext/client/__init__.py +++ b/python/knext/client/__init__.py @@ -1,10 +0,0 @@ -# Copyright 2023 Ant Group CO., Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. 
diff --git a/python/knext/client/base.py b/python/knext/client/base.py new file mode 100644 index 00000000..d4faf968 --- /dev/null +++ b/python/knext/client/base.py @@ -0,0 +1,5 @@ +from abc import ABC + + +class Client(ABC): + pass \ No newline at end of file diff --git a/python/knext/core/builder/job/builder.py b/python/knext/client/builder.py similarity index 95% rename from python/knext/core/builder/job/builder.py rename to python/knext/client/builder.py index 459f6ae2..ee41a4b5 100644 --- a/python/knext/core/builder/job/builder.py +++ b/python/knext/client/builder.py @@ -13,11 +13,12 @@ import os from knext import rest +from knext.client.base import Client from knext.common.class_register import register_from_package -from knext.core.builder.job.model.builder_job import BuilderJob +from knext.core.builder.job.builder_job import BuilderJob -class Builder: +class BuilderClient(Client): """SPG Builder Client.""" def __init__(self): diff --git a/python/knext/client/marklang/__init__.py b/python/knext/client/marklang/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/knext/core/schema/concept_rule_ml.py b/python/knext/client/marklang/concept_rule_ml.py similarity index 99% rename from python/knext/core/schema/concept_rule_ml.py rename to python/knext/client/marklang/concept_rule_ml.py index 692fee73..811cd58e 100644 --- a/python/knext/core/schema/concept_rule_ml.py +++ b/python/knext/client/marklang/concept_rule_ml.py @@ -14,7 +14,7 @@ import re from knext import rest from knext.core.schema import Schema -from knext.core.schema.model.base import SpgTypeEnum +from knext.client.model.base import SpgTypeEnum class SPGConceptRuleMarkLang: diff --git a/python/knext/core/schema/schema_ml.py b/python/knext/client/marklang/schema_ml.py similarity index 99% rename from python/knext/core/schema/schema_ml.py rename to python/knext/client/marklang/schema_ml.py index 94a3586d..20bf88a7 100644 --- a/python/knext/core/schema/schema_ml.py +++ 
b/python/knext/client/marklang/schema_ml.py @@ -23,7 +23,7 @@ from knext.core.schema.model import ( Property, Relation, ) -from knext.core.schema.model.base import ( +from knext.client.model.base import ( HypernymPredicateEnum, BasicTypeEnum, ConstraintTypeEnum, diff --git a/python/knext/client/model/__init__.py b/python/knext/client/model/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/knext/core/schema/model/base.py b/python/knext/client/model/base.py similarity index 99% rename from python/knext/core/schema/model/base.py rename to python/knext/client/model/base.py index 157adbcd..3741ae24 100644 --- a/python/knext/core/schema/model/base.py +++ b/python/knext/client/model/base.py @@ -793,7 +793,7 @@ class BaseSpgType(ABC): import knext class_obj = getattr( - knext.core.schema.model.spg_type, f"{SpgTypeEnum(type_enum).name}Type" + knext.client.model.spg_type, f"{SpgTypeEnum(type_enum).name}Type" ) return class_obj diff --git a/python/knext/core/builder/job/model/builder_job.py b/python/knext/client/model/builder_job.py similarity index 100% rename from python/knext/core/builder/job/model/builder_job.py rename to python/knext/client/model/builder_job.py diff --git a/python/knext/core/schema/model/property.py b/python/knext/client/model/property.py similarity index 97% rename from python/knext/core/schema/model/property.py rename to python/knext/client/model/property.py index 724ee1f4..6974fbf8 100644 --- a/python/knext/core/schema/model/property.py +++ b/python/knext/client/model/property.py @@ -12,7 +12,7 @@ from typing import List, Type, Union, Dict -from knext.core.schema.model.base import ( +from knext.client.model.base import ( ConstraintTypeEnum, PropertyGroupEnum, BaseProperty, diff --git a/python/knext/core/schema/model/relation.py b/python/knext/client/model/relation.py similarity index 94% rename from python/knext/core/schema/model/relation.py rename to python/knext/client/model/relation.py index 3b4c9430..1cb4c7dc 100644 --- 
a/python/knext/core/schema/model/relation.py +++ b/python/knext/client/model/relation.py @@ -12,8 +12,8 @@ from typing import List, Type, Dict -from knext.core.schema.model.base import BaseProperty -from knext.core.schema.model.property import Property +from knext.client.model.base import BaseProperty +from knext.client.model.property import Property class Relation(BaseProperty): diff --git a/python/knext/core/schema/model/spg_type.py b/python/knext/client/model/spg_type.py similarity index 99% rename from python/knext/core/schema/model/spg_type.py rename to python/knext/client/model/spg_type.py index 908c7c8f..f9262ff6 100644 --- a/python/knext/core/schema/model/spg_type.py +++ b/python/knext/client/model/spg_type.py @@ -19,15 +19,15 @@ from knext.core.builder.operator import ( EntityFuseOp, EntityLinkOp, ) -from knext.core.schema.model.base import ( +from knext.client.model.base import ( BaseSpgType, SpgTypeEnum, ROOT_TYPE_UNIQUE_NAME, HypernymPredicateEnum, ConstraintTypeEnum, ) -from knext.core.schema.model.property import Property -from knext.core.schema.model.relation import Relation +from knext.client.model.property import Property +from knext.client.model.relation import Relation class EntityType(BaseSpgType): diff --git a/python/knext/core/builder/operator/operator.py b/python/knext/client/operator.py similarity index 97% rename from python/knext/core/builder/operator/operator.py rename to python/knext/client/operator.py index 0a18d0a5..c897775d 100644 --- a/python/knext/core/builder/operator/operator.py +++ b/python/knext/client/operator.py @@ -15,6 +15,7 @@ from enum import Enum from typing import Dict from knext import rest +from knext.client.base import Client from knext.common.class_register import register_from_package from knext.core.builder.operator.model.op import BaseOp @@ -26,7 +27,7 @@ class OperatorTypeEnum(str, Enum): KnowledgeExtractOp = "KNOWLEDGE_EXTRACT" -class Operator(object): +class OperatorClient(Client): """SPG Operator Client.""" 
def __init__( @@ -65,7 +66,7 @@ class Operator(object): if op.bind_to is not None: from knext.core.schema import Schema - from knext.core.schema.model.base import SpgTypeEnum + from knext.client.model.base import SpgTypeEnum schema_session = Schema().create_session() spg_type = schema_session.get(op.bind_to) diff --git a/python/knext/core/reasoner/reasoner.py b/python/knext/client/reasoner.py similarity index 98% rename from python/knext/core/reasoner/reasoner.py rename to python/knext/client/reasoner.py index 7e47961d..04036702 100644 --- a/python/knext/core/reasoner/reasoner.py +++ b/python/knext/client/reasoner.py @@ -29,7 +29,7 @@ class LocalClusterModeEnum(str, Enum): Remote = "REMOTE" -class Reasoner: +class ReasonerClient(Client): """SPG Reasoner Client.""" def __init__(self): diff --git a/python/knext/core/schema/schema.py b/python/knext/client/schema.py similarity index 97% rename from python/knext/core/schema/schema.py rename to python/knext/client/schema.py index 272ab941..1df2aea8 100644 --- a/python/knext/core/schema/schema.py +++ b/python/knext/client/schema.py @@ -14,11 +14,12 @@ import os from typing import List from knext import rest +from knext.client.base import Client from knext.core.schema.model import Relation -from knext.core.schema.model.base import BaseSpgType, AlterOperationEnum, SpgTypeEnum +from knext.client.model.base import BaseSpgType, AlterOperationEnum, SpgTypeEnum -class Schema: +class SchemaClient(Client): """ """ def __init__(self): diff --git a/python/knext/core/wrapper/search_client.py b/python/knext/client/search.py similarity index 100% rename from python/knext/core/wrapper/search_client.py rename to python/knext/client/search.py diff --git a/python/knext/client/command/__init__.py b/python/knext/command/__init__.py similarity index 100% rename from python/knext/client/command/__init__.py rename to python/knext/command/__init__.py diff --git a/python/knext/client/exception.py b/python/knext/command/exception.py similarity 
index 100% rename from python/knext/client/exception.py rename to python/knext/command/exception.py diff --git a/python/knext/client/knext_cli.py b/python/knext/command/knext_cli.py similarity index 71% rename from python/knext/client/knext_cli.py rename to python/knext/command/knext_cli.py index 30f820f1..ad9e2261 100644 --- a/python/knext/client/knext_cli.py +++ b/python/knext/command/knext_cli.py @@ -15,24 +15,24 @@ import sys import click -from knext.client.command.builder import get_job -from knext.client.command.builder import submit_job -from knext.client.command.config import GLOBAL_CONFIG, LOCAL_CONFIG, CFG_PREFIX -from knext.client.command.config import edit_config -from knext.client.command.config import get_config -from knext.client.command.config import list_config -from knext.client.command.operator import list_operator -from knext.client.command.operator import publish_operator -from knext.client.command.project import create_project -from knext.client.command.project import list_project -from knext.client.command.reasoner import query_reasoner_job -from knext.client.command.reasoner import run_dsl -from knext.client.command.reasoner import submit_reasoner_job -from knext.client.command.schema import commit_schema -from knext.client.command.schema import diff_schema -from knext.client.command.schema import list_schema -from knext.client.command.schema import reg_concept_rule -from knext.client.exception import _ApiExceptionHandler +from knext.command.sub_command.builder import get_job +from knext.command.sub_command.builder import submit_job +from knext.command.sub_command.config import GLOBAL_CONFIG, LOCAL_CONFIG, CFG_PREFIX +from knext.command.sub_command.config import edit_config +from knext.command.sub_command.config import get_config +from knext.command.sub_command.config import list_config +from knext.command.sub_command.operator import list_operator +from knext.command.sub_command.operator import publish_operator +from 
knext.command.sub_command.project import create_project +from knext.command.sub_command.project import list_project +from knext.command.sub_command.reasoner import query_reasoner_job +from knext.command.sub_command.reasoner import run_dsl +from knext.command.sub_command.reasoner import submit_reasoner_job +from knext.command.sub_command.schema import commit_schema +from knext.command.sub_command.schema import diff_schema +from knext.command.sub_command.schema import list_schema +from knext.command.sub_command.schema import reg_concept_rule +from knext.command.exception import _ApiExceptionHandler from knext import __version__ diff --git a/python/knext/core/__init__.py b/python/knext/command/sub_command/__init__.py similarity index 100% rename from python/knext/core/__init__.py rename to python/knext/command/sub_command/__init__.py diff --git a/python/knext/client/command/builder.py b/python/knext/command/sub_command/builder.py similarity index 100% rename from python/knext/client/command/builder.py rename to python/knext/command/sub_command/builder.py diff --git a/python/knext/client/command/config.py b/python/knext/command/sub_command/config.py similarity index 100% rename from python/knext/client/command/config.py rename to python/knext/command/sub_command/config.py diff --git a/python/knext/client/command/operator.py b/python/knext/command/sub_command/operator.py similarity index 100% rename from python/knext/client/command/operator.py rename to python/knext/command/sub_command/operator.py diff --git a/python/knext/client/command/project.py b/python/knext/command/sub_command/project.py similarity index 100% rename from python/knext/client/command/project.py rename to python/knext/command/sub_command/project.py diff --git a/python/knext/client/command/reasoner.py b/python/knext/command/sub_command/reasoner.py similarity index 100% rename from python/knext/client/command/reasoner.py rename to python/knext/command/sub_command/reasoner.py diff --git 
a/python/knext/client/command/schema.py b/python/knext/command/sub_command/schema.py similarity index 94% rename from python/knext/client/command/schema.py rename to python/knext/command/sub_command/schema.py index c454b08b..0f06aada 100644 --- a/python/knext/client/command/schema.py +++ b/python/knext/command/sub_command/schema.py @@ -15,8 +15,8 @@ from pathlib import Path import click -from knext.core.schema.concept_rule_ml import SPGConceptRuleMarkLang -from knext.core.schema.schema_ml import SPGSchemaMarkLang +from knext.client.marklang.concept_rule_ml import SPGConceptRuleMarkLang +from knext.client.marklang.schema_ml import SPGSchemaMarkLang def list_schema(): diff --git a/python/knext/component/__init__.py b/python/knext/component/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/knext/component/base.py b/python/knext/component/base.py new file mode 100644 index 00000000..2c3a6b7f --- /dev/null +++ b/python/knext/component/base.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +# Copyright 2023 Ant Group CO., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. 
+ + +from abc import ABC, abstractmethod +from enum import Enum +from typing import List, Union, TypeVar, Generic, Any, Dict, Tuple, Type + +from knext import rest + + +Other = TypeVar("Other") + + +class ComponentTypeEnum(str, Enum): + Builder = "BUILDER" + + +class ComponentLabelEnum(str, Enum): + SourceReader = "SOURCE_READER" + Extractor = "EXTRACTOR" + Mapping = "MAPPING" + Evaluator = "EVALUATOR" + SinkWriter = "SINK_WRITER" + + +class MappingTypeEnum(str, Enum): + SPGType = "SPG_TYPE" + Relation = "RELATION" + + +class SPGTypeHelper: + pass + + +class PropertyHelper: + pass + + +class RESTable(ABC): + + @property + def upstream_types(self): + raise NotImplementedError("To be implemented in subclass") + + @property + def downstream_types(self): + raise NotImplementedError("To be implemented in subclass") + + @abstractmethod + def to_rest(self): + raise NotImplementedError("To be implemented in subclass") + + @classmethod + def from_rest(cls, node: rest.Node): + raise NotImplementedError("To be implemented in subclass") + + @abstractmethod + def submit(self): + raise NotImplementedError("To be implemented in subclass") + + +class Runnable(ABC): + + @property + def input_types(self) -> Input: + return + + @property + def output_types(self) -> Output: + return + + @abstractmethod + def invoke(self, input: Input) -> Output: + raise NotImplementedError("To be implemented in subclass") + + def __rshift__( + self, + other: Type['Runnable'] + ) -> Type['Runnable']: + """Compose this runnable with another object to create a RunnableSequence.""" + return Chain(first=self, last=coerce_to_runnable(other)) + + +class Component(ABC): + """ + Base class for all component. 
+ """ + + def id(self): + return str(id(self)) + + @property + def type(self): + return + + @property + def label(self): + return + + @property + def name(self): + return + + def to_dict(self): + return self.__dict__ + diff --git a/python/knext/component/builder/__init__.py b/python/knext/component/builder/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/knext/component/builder/extractor.py b/python/knext/component/builder/extractor.py new file mode 100644 index 00000000..2e343f67 --- /dev/null +++ b/python/knext/component/builder/extractor.py @@ -0,0 +1,124 @@ +from abc import ABC +from typing import Union, Mapping, Dict, List + +from NN4K.invoker.base import ModelInvoker +from knext import rest +from knext.component.base import RESTable, Component, ComponentTypeEnum, ComponentLabelEnum, Runnable, Input, Output +from knext.component.builder.source_reader import SourceReader +from knext.core.builder.operator.model.op import PromptOp + + +class SPGExtractor(RESTable, Component, ABC): + + @property + def upstream_types(self): + return Union[SourceReader, SPGExtractor] + + @property + def downstream_types(self): + return Union[SPGExtractor, Mapping] + + @property + def type(self): + return ComponentTypeEnum.Builder + + @property + def label(self): + return ComponentLabelEnum.Extractor + + +class LLMBasedExtractor(Runnable, SPGExtractor): + """A Process Component that transforming unstructured data into structured data. 
+ + Examples: + extract = UserDefinedExtractor( + output_fields=["id", 'riskMark', 'useCert'] + ).set_operator("DemoExtractOp") + + """ + + """All output column names after knowledge extraction processing.""" + output_fields: List[str] + """Knowledge extract operator of this component.""" + llm: ModelInvoker + + prompt_ops: List[PromptOp] + + spg_type_name: Union[str, SPGTypeHelper] + + property_names: List[Union[str, PropertyHelper]] + + @property + def input_types(self) -> Input: + return Dict[str, str] + + @property + def output_types(self) -> Output: + return SPGRecord + + def to_rest(self): + """Transforms `LLMBasedExtractor` to REST model `ExtractNodeConfig`.""" + # operator_config = client._generate_op_config( + # op_name=self.extract_op.name, params=self.extract_op.params + # ) + operator_config = {} + config = rest.ExtractNodeConfig( + output_fields=self.output_fields, operator_config=operator_config + ) + + return rest.Node(**super().to_dict(), node_config=config) + + def invoke(self, input: Input) -> Output: + pass + + @classmethod + def from_rest(cls, node: rest.Node): + pass + + def submit(self): + pass + + +class UserDefinedExtractor(Runnable[Dict[str, str], Dict[str, str]], SPGExtractor): + """A Process Component that transforming unstructured data into structured data. 
+ + Examples: + extract = UserDefinedExtractor( + output_fields=["id", 'riskMark', 'useCert'] + ).set_operator("DemoExtractOp") + + """ + + """All output column names after knowledge extraction processing.""" + output_fields: List[str] + """Knowledge extract operator of this component.""" + extract_op: ExtractOp + + @property + def input_types(self) -> Input: + return Dict[str, str] + + @property + def output_types(self) -> Output: + return Dict[str, str] + + @property + def name(self): + return self.__class__.__name__ + + def set_operator(self, op_name: str, params: Dict[str, str] = None): + """Sets knowledge extract operator to this component.""" + self.extract_op = ExtractOp.by_name(op_name)(params) + return self + + def to_rest(self): + """Transforms `UserDefinedExtractor` to REST model `ExtractNodeConfig`.""" + # operator_config = client._generate_op_config( + # op_name=self.extract_op.name, params=self.extract_op.params + # ) + operator_config = {} + config = rest.ExtractNodeConfig( + output_fields=self.output_fields, operator_config=operator_config + ) + + return rest.Node(**super().to_dict(), node_config=config) diff --git a/python/knext/component/builder/mapping.py b/python/knext/component/builder/mapping.py new file mode 100644 index 00000000..c35b65d5 --- /dev/null +++ b/python/knext/component/builder/mapping.py @@ -0,0 +1,204 @@ +from abc import ABC +from typing import Union, Dict, List, Tuple + +from knext.component.base import RESTable, Component, ComponentTypeEnum, ComponentLabelEnum, Runnable +from knext.component.builder.extractor import SPGExtractor +from knext.component.builder.sink_writer import SinkWriter +from knext.component.builder.source_reader import SourceReader +from knext.operator.spg_record import SPGRecord + + +class Mapping(RESTable, Component, ABC): + + @property + def upstream_types(self): + return Union[SourceReader, SPGExtractor] + + @property + def downstream_types(self): + return Union[SinkWriter] + + @property + def 
type(self): + return ComponentTypeEnum.Builder + + @property + def label(self): + return ComponentLabelEnum.Mapping + + + +class SPGTypeMapping(Runnable[Dict[str, str], SPGRecord], Mapping): + """A Process Component that mapping data to entity/event/concept type. + + Args: + spg_type_name: The SPG type name import from SPGTypeHelper. + Examples: + mapping = SPGTypeMapping( + spg_type_name=DEFAULT.App + ).add_field("id", DEFAULT.App.id) \ + .add_field("id", DEFAULT.App.name) \ + .add_field("riskMark", DEFAULT.App.riskMark) \ + .add_field("useCert", DEFAULT.App.useCert) + + """ + + spg_type_name: Union[str, SPGTypeHelper] + + mapping: Dict[str, str] = dict() + + filters: List[Tuple[str, str]] = list() + + def add_field(self, source_field: str, target_field: Union[str, PropertyHelper], link_op: LinkOp, + norm_op: NormalizeOp): + """Adds a field mapping from source data to property of spg_type. + + :param source_field: The source field to be mapped. + :param target_field: The target field to map the source field to. + :return: self + """ + self.mapping[target_field] = source_field + return self + + def add_filter(self, column_name: str, column_value: str): + """Adds data filtering rule. + Only the column that meets `column_name=column_value` will execute the mapping. + + :param column_name: The column name to be filtered. + :param column_value: The column value to be filtered. + :return: self + """ + self.filters.append((column_name, column_value)) + return self + + def to_rest(self): + """ + Transforms `EntityMappingComponent` to REST model `MappingNodeConfig`. 
+ """ + assert all( + field in self.mapping.keys() + for field in EntityMappingComponent.ENTITY_BASE_FIELDS + ), f"{self.__class__.__name__} must include mapping to {str(EntityMappingComponent.ENTITY_BASE_FIELDS)}" + mapping = defaultdict(list) + schema = {} + subject_type = self.schema_session.get(self.spg_type_name) + for dst_name, src_name in self.mapping.items(): + prop = subject_type.properties.get(dst_name) + mapping[src_name].append(prop.name) + object_type_name = prop.object_type_name + + object_type = self.schema_session.get(object_type_name) + if ( + hasattr(object_type, "link_operator") + and object_type.link_operator is not None + ): + schema[dst_name] = object_type.link_operator + if ( + hasattr(object_type, "normalize_operator") + and object_type.normalize_operator is not None + ): + schema[dst_name] = object_type.normalize_operator + if os.environ.get("KNEXT_DEBUG"): + for name, operator in self.debug_operators: + schema[name] = operator + + mapping_filters = [ + rest.MappingFilter(column_name=name, column_value=value) + for name, value in self.filters + ] + mapping_configs = [ + rest.MappingConfig(source=src_name, target=tgt_names) + for src_name, tgt_names in mapping.items() + ] + mapping_schemas = [ + rest.MappingSchema(name, operator_config=operator_config) + for name, operator_config in schema.items() + ] + + config = rest.MappingNodeConfig( + spg_name=self.spg_type_name, + mapping_type=self.mapping_type, + mapping_filters=mapping_filters, + mapping_schemas=mapping_schemas, + mapping_configs=mapping_configs, + ) + return rest.Node(**super().to_dict(), node_config=config) + + +class RelationMappingComponent(Component): + """A Process Component that mapping data to relation type. + + Args: + subject_name: The subject name import from SPGTypeHelper. + predicate_name: The predicate name. + object_name: The object name import from SPGTypeHelper. 
+ Examples: + mapping = RelationMappingComponent( + subject_name=DEFAULT.App, + predicate_name=DEFAULT.App.useCert, + object_name=DEFAULT.Cert, + ).add_field("src_id", "srcId") \ + .add_field("dst_id", "dstId") + + """ + + subject_name: Union[str, SPGTypeHelper] + predicate_name: Union[str, PropertyHelper] + object_name: Union[str, SPGTypeHelper] + + mapping: Dict[str, str] = dict() + + filters: List[Tuple[str, str]] = list() + + RELATION_BASE_FIELDS = ["srcId", "dstId"] + + def add_field(self, source_field: str, target_field: str): + """Adds a field mapping from source data to property of spg_type. + + :param source_field: The source field to be mapped. + :param target_field: The target field to map the source field to. + :return: self + """ + self.mapping[target_field] = source_field + return self + + def add_filter(self, column_name: str, column_value: str): + """Adds data filtering rule. + Only the column that meets `column_ame=column_value` will execute the mapping. + + :param column_name: The column name to be filtered. + :param column_value: The column value to be filtered. 
+ :return: self + """ + self.filters.append((column_name, column_value)) + return self + + def _to_rest(self): + """Transforms `RelationMappingComponent` to REST model `MappingNodeConfig`.""" + assert all( + field in self.mapping.keys() + for field in RelationMappingComponent.RELATION_BASE_FIELDS + ), f"{self.__class__.__name__} must include mapping to {str(RelationMappingComponent.RELATION_BASE_FIELDS)}" + + mapping = defaultdict(list) + for dst_name, src_name in self.mapping.items(): + mapping[src_name].append(dst_name) + + mapping_filters = [ + rest.MappingFilter(column_name=name, column_value=value) + for name, value in self.filters + ] + mapping_configs = [ + rest.MappingConfig(source=src_name, target=tgt_names) + for src_name, tgt_names in mapping.items() + ] + mapping_schemas = [] + + config = rest.MappingNodeConfig( + spg_name=f"{self.subject_name}_{self.predicate_name}_{self.object_name}", + mapping_type=MappingTypeEnum.Relation, + mapping_filters=mapping_filters, + mapping_schemas=mapping_schemas, + mapping_configs=mapping_configs, + ) + return rest.Node(**super().to_dict(), node_config=config) diff --git a/python/knext/component/builder/sink_writer.py b/python/knext/component/builder/sink_writer.py new file mode 100644 index 00000000..8a0f2122 --- /dev/null +++ b/python/knext/component/builder/sink_writer.py @@ -0,0 +1,55 @@ +from abc import ABC +from ctypes import Union + +from knext.component.base import RESTable, Component + + +class SinkWriter(RESTable, Component, ABC): + + @property + def upstream_types(self): + return Union[Mapping, Evaluator] + + @property + def downstream_types(self): + return None + + @property + def type(self): + return ComponentTypeEnum.Builder + + @property + def label(self): + return ComponentLabelEnum.SinkWriter + + + +class KGSinkWriter(Runnable[Dict[str, str], None], SinkWriter): + """The Sink Component that writing data to KG storage. 
+ + Args: + None + Examples: + sink = KGSinkWriter() + + """ + + @property + def input_types(self) -> Input: + return Dict[str, str] + + @property + def output_types(self) -> Output: + return None + + def invoke(self, input: Input) -> Output: + pass + + def to_rest(self): + """Transforms `SinkToKgComponent` to REST model `GraphStoreSinkNodeConfig`.""" + return dict( + { + "properties": {}, + }, + **super().to_dict(), + ) diff --git a/python/knext/component/builder/source_reader.py b/python/knext/component/builder/source_reader.py new file mode 100644 index 00000000..88f4fa12 --- /dev/null +++ b/python/knext/component/builder/source_reader.py @@ -0,0 +1,70 @@ +from abc import ABC +from typing import Union, List, Dict + +from knext import rest + +from knext.component.base import RESTable, Component, ComponentTypeEnum, ComponentLabelEnum, Runnable, Input, Output +from knext.component.builder.extractor import SPGExtractor +from knext.component.builder.mapping import Mapping + + +class SourceReader(RESTable, Component, ABC): + + @property + def upstream_types(self): + return None + + @property + def downstream_types(self): + return Union[SPGExtractor, Mapping] + + @property + def type(self): + return ComponentTypeEnum.Builder + + @property + def label(self): + return ComponentLabelEnum.SourceReader + + +class CsvSourceReader(SourceReader): + """A source component that reading data from CSV file. + + Args: + local_path: The local path of CSV file. + columns: The column names that need to be read from the CSV file. + start_row: The starting number of rows read from the CSV file. + If the CSV file includes a header, it needs to be greater than or equal to 2. 
+ Examples: + source = SourceCsvComponent( + local_path="./builder/job/data/App.csv", + columns=["id", 'riskMark', 'useCert'], + start_row=2 + ) + """ + + """The local path of CSV file.""" + local_path: str + """The column names that need to be read from the CSV file.""" + columns: List[str] + """The starting number of rows read from the CSV file. + If the CSV file includes a header, it needs to be greater than or equal to 2.""" + start_row: int + + def invoke(self, input: Input) -> Output: + pass + + def submit(self): + pass + + def to_rest(self): + """Transforms `SourceCsvComponent` to REST model `CsvSourceNodeConfig`.""" + + config = rest.CsvSourceNodeConfig( + start_row=self.start_row, url=self.local_path, columns=self.columns + ) + return rest.Node(**super().to_dict(), node_config=config) + + @classmethod + def from_rest(cls, node: rest.Node): + return cls() \ No newline at end of file diff --git a/python/knext/core/builder/__init__.py b/python/knext/core/builder/__init__.py deleted file mode 100644 index 19913efa..00000000 --- a/python/knext/core/builder/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -# Copyright 2023 Ant Group CO., Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. diff --git a/python/knext/core/builder/job/__init__.py b/python/knext/core/builder/job/__init__.py deleted file mode 100644 index 481fb2f2..00000000 --- a/python/knext/core/builder/job/__init__.py +++ /dev/null @@ -1,29 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2023 Ant Group CO., Ltd. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. - -from knext.core.builder.job.model.builder_job import BuilderJob -from knext.core.builder.job.model.component import ( - SourceCsvComponent, - KnowledgeExtractComponent, - EntityMappingComponent, - RelationMappingComponent, - SinkToKgComponent, -) - -__all__ = [ - "BuilderJob", - "SourceCsvComponent", - "KnowledgeExtractComponent", - "EntityMappingComponent", - "RelationMappingComponent", - "SinkToKgComponent", -] diff --git a/python/knext/core/builder/job/model/__init__.py b/python/knext/core/builder/job/model/__init__.py deleted file mode 100644 index 19913efa..00000000 --- a/python/knext/core/builder/job/model/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -# Copyright 2023 Ant Group CO., Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. diff --git a/python/knext/core/builder/job/model/component.py b/python/knext/core/builder/job/model/component.py deleted file mode 100644 index d9b34655..00000000 --- a/python/knext/core/builder/job/model/component.py +++ /dev/null @@ -1,457 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2023 Ant Group CO., Ltd. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. - - -import os -from abc import ABC -from collections import defaultdict -from enum import Enum -from typing import List, Type, Dict, Union, TypeVar - -from knext import rest -from knext.core.builder.operator.operator import Operator -from knext.core.schema import Schema -from knext.core.schema.model.base import SpgTypeEnum - - -class MappingTypeEnum(str, Enum): - Entity = "SPG_TYPE" - Relation = "RELATION" - - -class ComponentTypeEnum(str, Enum): - SourceCsv = "CSV_SOURCE" - Mapping = "MAPPING" - Extract = "EXTRACT" - SinkToKg = "GRAPH_SINK" - - -T = TypeVar("T", bound="Component") - - -class Component(ABC): - """ - Base class for all component. 
- """ - - def __init__(self, type: ComponentTypeEnum, name: str): - self.id = str(id(self)) - self.type = type - self.name = name - - self.next = [] - self.pre = [] - - def rename(self, name: str): - self.name = name - return self - - def to_dict(self): - return {"id": self.id, "name": self.name} - - def _translate(self): - return self - - def _to_rest(self): - pass - - def __rshift__(self, other: Union[T, List[T]]): - last = [self] - while last and last[0].next: - last = last[0].next - - if isinstance(other, list): - if isinstance(last, list): - for l in last: - l.next = other - for o in other: - o._translate().pre = last - else: - last.next = other - for o in other: - o._translate().pre = [last] - else: - o = other._translate() - if isinstance(last, list): - for l in last: - l.next = [o] - o.pre = last - else: - last.next = [o] - o.pre = [last] - - return self - - -class SourceCsvComponent(Component): - """A source component that reading data from CSV file. - - Args: - local_path: The local path of CSV file. - columns: The column names that need to be read from the CSV file. - start_row: The starting number of rows read from the CSV file. - If the CSV file includes a header, it needs to be greater than or equal to 2. 
- Examples: - source = SourceCsvComponent( - local_path="./builder/job/data/App.csv", - columns=["id", 'riskMark', 'useCert'], - start_row=2 - ) - """ - - def __init__(self, local_path: str, columns: List[str], start_row: int): - super().__init__(type=ComponentTypeEnum.SourceCsv, name="CSV") - self.local_path = local_path - self.columns = columns - self.start_row = start_row - - def _to_rest(self): - """Transforms `SourceCsvComponent` to REST model `CsvSourceNodeConfig`.""" - api_client = rest.ObjectStoreApi() - url = api_client.object_store_post( - name=self.local_path.split("/")[-1], file=self.local_path - ).absolute_path - - config = rest.CsvSourceNodeConfig( - start_row=self.start_row, url=url, columns=self.columns - ) - return rest.Node(**super().to_dict(), node_config=config) - - -class KnowledgeExtractComponent(Component): - """A Process Component that transforming unstructured data into structured data. - - Args: - output_fields: All output column names after knowledge extraction processing. 
- Examples: - extract = KnowledgeExtractComponent( - output_fields=["id", 'riskMark', 'useCert'] - ).set_operator("DemoExtractOp") - - """ - - def __init__( - self, - output_fields: List[str], - ): - super().__init__(type=ComponentTypeEnum.Extract, name="知识抽取") - self.output_fields = output_fields - self.operator = None - self.params = None - - def set_operator(self, op_name: str, params: Dict[str, str] = None): - """Sets knowledge extract operator to this component.""" - self.operator = op_name - self.params = params - return self - - def _to_rest(self): - """Transforms `KnowledgeExtractComponent` to REST model `ExtractNodeConfig`.""" - client = Operator() - operator_config = client._generate_op_config( - op_name=self.operator, params=self.params - ) - config = rest.ExtractNodeConfig( - output_fields=self.output_fields, operator_config=operator_config - ) - - return rest.Node(**super().to_dict(), node_config=config) - - -class RelationMappingComponent(Component): - """A Process Component that mapping data to relation type. - - Args: - subject_name: The subject name import from SchemaHelper. - predicate_name: The predicate name. - object_name: The object name import from SchemaHelper. - Examples: - mapping = RelationMappingComponent( - subject_name=DEFAULT.App, - predicate_name=DEFAULT.App.useCert, - object_name=DEFAULT.Cert, - ).add_field("src_id", "srcId") \ - .add_field("dst_id", "dstId") - - """ - - RELATION_BASE_FIELDS = ["srcId", "dstId"] - - def __init__( - self, - subject_name: Union[Type, str], - predicate_name: str, - object_name: Union[Type, str], - ): - super().__init__(type=ComponentTypeEnum.Mapping, name="关系映射") - - if isinstance(subject_name, str): - self.subject_name = subject_name - else: - assert hasattr( - subject_name, "__typename__" - ), f"Cannot find `__typename__` of `{subject_name}` in schema_helper." 
- self.subject_name = subject_name.__typename__ - if isinstance(object_name, str): - self.object_name = object_name - else: - assert hasattr( - object_name, "__typename__" - ), f"Cannot find `__typename__` of `{object_name}` in schema_helper." - self.object_name = object_name.__typename__ - self.predicate_name = predicate_name - self.mapping = dict() - self.mapping_type = MappingTypeEnum.Relation - - self.filters = list() - - def add_field(self, source_field: str, target_field: str): - """Adds a field mapping from source data to property of spg_type. - - :param source_field: The source field to be mapped. - :param target_field: The target field to map the source field to. - :return: self - """ - self.mapping[target_field] = source_field - return self - - def add_filter(self, column_name: str, column_value: str): - """Adds data filtering rule. - Only the column that meets `column_ame=column_value` will execute the mapping. - - :param column_name: The column name to be filtered. - :param column_value: The column value to be filtered. 
- :return: self - """ - self.filters.append((column_name, column_value)) - return self - - def _to_rest(self): - """Transforms `RelationMappingComponent` to REST model `MappingNodeConfig`.""" - assert all( - field in self.mapping.keys() - for field in RelationMappingComponent.RELATION_BASE_FIELDS - ), f"{self.__class__.__name__} must include mapping to {str(RelationMappingComponent.RELATION_BASE_FIELDS)}" - - mapping = defaultdict(list) - for dst_name, src_name in self.mapping.items(): - mapping[src_name].append(dst_name) - - mapping_filters = [ - rest.MappingFilter(column_name=name, column_value=value) - for name, value in self.filters - ] - mapping_configs = [ - rest.MappingConfig(source=src_name, target=tgt_names) - for src_name, tgt_names in mapping.items() - ] - mapping_schemas = [] - - config = rest.MappingNodeConfig( - spg_name=f"{self.subject_name}_{self.predicate_name}_{self.object_name}", - mapping_type=self.mapping_type, - mapping_filters=mapping_filters, - mapping_schemas=mapping_schemas, - mapping_configs=mapping_configs, - ) - return rest.Node(**super().to_dict(), node_config=config) - - -class EntityMappingComponent(Component): - """A Process Component that mapping data to entity/event/concept type. - - Args: - spg_type_name: The SPG type name import from SchemaHelper. - Examples: - mapping = EntityMappingComponent( - spg_type_name=DEFAULT.App - ).add_field("id", DEFAULT.App.id) \ - .add_field("id", DEFAULT.App.name) \ - .add_field("riskMark", DEFAULT.App.riskMark) \ - .add_field("useCert", DEFAULT.App.useCert) - - """ - - ENTITY_BASE_FIELDS = ["id"] - - def __init__(self, spg_type_name: Union[Type, str]): - super().__init__(type=ComponentTypeEnum.Mapping, name="实体映射") - - if isinstance(spg_type_name, str): - self.spg_type_name = spg_type_name - else: - assert hasattr( - spg_type_name, "__typename__" - ), f"Cannot find `__typename__` of `{spg_type_name}` in schema_helper." 
- self.spg_type_name = spg_type_name.__typename__ - self.mapping = dict() - self.mapping_type = MappingTypeEnum.Entity - self.schema_session = Schema().create_session() - self.op_client = Operator() - - self.debug_operators = dict() - self.filters = list() - - def add_field(self, source_field: str, target_field: str): - """Adds a field mapping from source data to property of spg_type. - - :param source_field: The source field to be mapped. - :param target_field: The target field to map the source field to. - :return: self - """ - self.mapping[target_field] = source_field - return self - - def add_filter(self, column_name: str, column_value: str): - """Adds data filtering rule. - Only the column that meets `column_name=column_value` will execute the mapping. - - :param column_name: The column name to be filtered. - :param column_value: The column value to be filtered. - :return: self - """ - self.filters.append((column_name, column_value)) - return self - - def _to_rest(self): - """ - Transforms `EntityMappingComponent` to REST model `MappingNodeConfig`. 
- """ - assert all( - field in self.mapping.keys() - for field in EntityMappingComponent.ENTITY_BASE_FIELDS - ), f"{self.__class__.__name__} must include mapping to {str(EntityMappingComponent.ENTITY_BASE_FIELDS)}" - mapping = defaultdict(list) - schema = {} - subject_type = self.schema_session.get(self.spg_type_name) - for dst_name, src_name in self.mapping.items(): - prop = subject_type.properties.get(dst_name) - mapping[src_name].append(prop.name) - object_type_name = prop.object_type_name - - object_type = self.schema_session.get(object_type_name) - if ( - hasattr(object_type, "link_operator") - and object_type.link_operator is not None - ): - schema[dst_name] = object_type.link_operator - if ( - hasattr(object_type, "normalize_operator") - and object_type.normalize_operator is not None - ): - schema[dst_name] = object_type.normalize_operator - if os.environ.get("KNEXT_DEBUG"): - for name, operator in self.debug_operators: - schema[name] = operator - - mapping_filters = [ - rest.MappingFilter(column_name=name, column_value=value) - for name, value in self.filters - ] - mapping_configs = [ - rest.MappingConfig(source=src_name, target=tgt_names) - for src_name, tgt_names in mapping.items() - ] - mapping_schemas = [ - rest.MappingSchema(name, operator_config=operator_config) - for name, operator_config in schema.items() - ] - - config = rest.MappingNodeConfig( - spg_name=self.spg_type_name, - mapping_type=self.mapping_type, - mapping_filters=mapping_filters, - mapping_schemas=mapping_schemas, - mapping_configs=mapping_configs, - ) - return rest.Node(**super().to_dict(), node_config=config) - - -class SPGMappingComponent(Component): - """A Process Component that extract SPO triples from long texts, - and mapping data to entity/event/concept type based on the schema definition. - - Args: - spg_type_name: The SPG type name import from SchemaHelper. 
- Examples: - mapping = SPGMappingComponent( - spg_type_name=DEFAULT.Disease - ).set_operator("DiseaseExtractor") - - """ - - def __init__(self, spg_type_name: Union[Type, str]): - super().__init__(type=ComponentTypeEnum.Mapping, name="SPG抽取映射") - - if isinstance(spg_type_name, str): - self.spg_type_name = spg_type_name - else: - assert hasattr( - spg_type_name, "__typename__" - ), f"Cannot find `__typename__` of `{spg_type_name}` in schema_helper." - self.spg_type_name = spg_type_name.__typename__ - self.schema_session = Schema().create_session() - self.operator = None - self.params = None - - def set_operator(self, op_name: str, params: Dict[str, str] = None): - """Sets knowledge extract operator to this component.""" - self.operator = op_name - self.params = params - return self - - def _translate(self): - """Transforms `SPGMappingComponent` to REST model `ExtractNodeConfig` and `MappingNodeConfig`.""" - extract = KnowledgeExtractComponent([]).set_operator(self.operator, self.params) - - subject_mapping = EntityMappingComponent(self.spg_type_name).add_filter( - "__vertex_type__", self.spg_type_name - ) - subject_type = self.schema_session.get(self.spg_type_name) - - object_mappings = [] - for prop in subject_type.properties: - subject_mapping.add_field(prop, prop) - object_type_name = subject_type.properties[prop].object_type_name - object_type = self.schema_session.get(object_type_name) - - if object_type.spg_type_enum == SpgTypeEnum.Basic: - continue - - object_mapping = ( - EntityMappingComponent(object_type_name) - .add_filter("__vertex_type__", object_type_name) - .add_field("id", "id") - .add_field("name", "name") - ) - object_mappings.append(object_mapping) - - return extract >> [subject_mapping] + object_mappings - - -class SinkToKgComponent(Component): - """The Sink Component that writing data to KG storage. 
- - Args: - None - Examples: - sink = SinkToKgComponent() - - """ - - def __init__(self): - super().__init__(type=ComponentTypeEnum.SinkToKg, name="图谱") - - def _to_rest(self): - """Transforms `SinkToKgComponent` to REST model `GraphStoreSinkNodeConfig`.""" - config = rest.GraphStoreSinkNodeConfig() - return rest.Node(**super().to_dict(), node_config=config) diff --git a/python/knext/core/builder/operator/__init__.py b/python/knext/core/builder/operator/__init__.py deleted file mode 100644 index 4afb09fb..00000000 --- a/python/knext/core/builder/operator/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2023 Ant Group CO., Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. - -from knext.core.builder.operator.model.eval_result import EvalResult -from knext.core.builder.operator.model.op import BaseOp -from knext.core.builder.operator.model.op import EntityFuseOp -from knext.core.builder.operator.model.op import EntityLinkOp -from knext.core.builder.operator.model.op import KnowledgeExtractOp -from knext.core.builder.operator.model.op import OperatorTypeEnum -from knext.core.builder.operator.model.op import PropertyNormalizeOp -from knext.core.builder.operator.model.vertex import Vertex -from knext.core.builder.operator.operator import Operator diff --git a/python/knext/core/builder/operator/model/__init__.py b/python/knext/core/builder/operator/model/__init__.py deleted file mode 100644 index 19913efa..00000000 --- a/python/knext/core/builder/operator/model/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -# Copyright 2023 Ant Group CO., Ltd. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. diff --git a/python/knext/core/builder/operator/model/op.py b/python/knext/core/builder/operator/model/op.py deleted file mode 100644 index 799024ce..00000000 --- a/python/knext/core/builder/operator/model/op.py +++ /dev/null @@ -1,198 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2023 Ant Group CO., Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. - -from abc import ABC -from enum import Enum -from typing import List, Dict, Any, Type - -from knext.core.builder.operator.model.eval_result import EvalResult -from knext.core.builder.operator.model.vertex import Vertex - - -class OperatorTypeEnum(str, Enum): - EntityLinkOp = "ENTITY_LINK" - EntityFuseOp = "ENTITY_FUSE" - PropertyNormalizeOp = "PROPERTY_NORMALIZE" - KnowledgeExtractOp = "KNOWLEDGE_EXTRACT" - - -class BaseOp(ABC): - """Base class for all user-defined operator functions. - - The execution logic of the operator needs to be implemented in the `eval` method. 
- """ - - name: str - desc: str = "" - bind_to: Type = None - - _registry = {} - _local_path: str - _type: str - _version: int - - def __init__(self, params: Dict[str, str] = None): - self.params = params - - def eval(self, *args): - """Used to implement operator execution logic.""" - raise NotImplementedError( - f"{self.__class__.__name__} need to implement `eval` method." - ) - - def handle(self, *inputs) -> Dict[str, Any]: - """Only available for Builder in OpenKgEngine to call through the pemja tool.""" - pre_input = self._pre_process(*inputs) - output = self.eval(*pre_input) - post_output = self._post_process(output) - return post_output - - @staticmethod - def _pre_process(*inputs): - """Convert data structures in building job into structures in operator before `eval` method.""" - pass - - @staticmethod - def _post_process(output: EvalResult) -> Dict[str, Any]: - """Convert result structures in operator into structures in building job after `eval` method.""" - pass - - @classmethod - def register(cls, name: str, local_path: str): - """ - Register a class as subclass of BaseOp with name and local_path. - After registration, the subclass object can be inspected by `BaseOp.by_name(op_name)`. - """ - - def add_subclass_to_registry(subclass: Type["BaseOp"]): - subclass.name = name - subclass._local_path = local_path - subclass._type = OperatorTypeEnum[subclass.__base__.__name__] - if name in cls._registry: - raise ValueError( - f"Operator [{name}] conflict in {subclass._local_path} and {cls.by_name(name)._local_path}." - ) - cls._registry[name] = subclass - return subclass - - return add_subclass_to_registry - - @classmethod - def by_name(cls, name: str): - """Reflection from op name to subclass object of BaseOp.""" - if name in cls._registry: - subclass = cls._registry[name] - return subclass - else: - raise ValueError(f"{name} is not a registered name for {cls.__name__}. 
") - - -class KnowledgeExtractOp(BaseOp, ABC): - """Base class for all knowledge extract operators.""" - - def __init__(self, params: Dict[str, str] = None): - super().__init__(params) - - def eval(self, record: Dict[str, str]) -> List[Vertex]: - raise NotImplementedError( - f"{self.__class__.__name__} need to implement `eval` method." - ) - - @staticmethod - def _pre_process(*inputs): - return (Vertex.from_dict(inputs[0]).properties,) - - @staticmethod - def _post_process(output) -> Dict[str, Any]: - if isinstance(output, tuple): - result = EvalResult[List[Vertex]](*output[:3]).to_dict() - else: - result = EvalResult[List[Vertex]](output).to_dict() - for data in result["data"]: - if data.get("bizId"): - data["props"]["id"] = data["bizId"] - if data.get("vertexType"): - data["props"]["__vertex_type__"] = data["vertexType"] - return result - - -class PropertyNormalizeOp(BaseOp, ABC): - """Base class for all property normalize operators.""" - - def __init__(self, params: Dict[str, str] = None): - super().__init__(params) - - def eval(self, property: str, record: Vertex) -> str: - raise NotImplementedError( - f"{self.__class__.__name__} need to implement `eval` method." - ) - - @staticmethod - def _pre_process(*inputs): - return inputs[0], Vertex.from_dict(inputs[1]) - - @staticmethod - def _post_process(output) -> Dict[str, Any]: - if isinstance(output, tuple): - return EvalResult[str](*output[:3]).to_dict() - else: - return EvalResult[str](output).to_dict() - - -class EntityLinkOp(BaseOp, ABC): - """Base class for all entity link operators.""" - - def __init__(self, params: Dict[str, str] = None): - super().__init__(params) - - def eval(self, property: str, record: Vertex) -> List[Vertex]: - raise NotImplementedError( - f"{self.__class__.__name__} need to implement `eval` method." 
- ) - - @staticmethod - def _pre_process(*inputs): - return inputs[0], Vertex.from_dict(inputs[1]) - - @staticmethod - def _post_process(output) -> Dict[str, Any]: - if isinstance(output, tuple): - return EvalResult[List[Vertex]](*output[:3]).to_dict() - else: - return EvalResult[List[Vertex]](output).to_dict() - - -class EntityFuseOp(BaseOp, ABC): - """Base class for all entity fuse operators.""" - - def __init__(self, params: Dict[str, str] = None): - super().__init__(params) - - def eval( - self, source_vertex: Vertex, target_vertexes: List[Vertex] - ) -> List[Vertex]: - raise NotImplementedError( - f"{self.__class__.__name__} need to implement `eval` method." - ) - - @staticmethod - def _pre_process(*inputs): - return Vertex.from_dict(inputs[0]), [ - Vertex.from_dict(input) for input in inputs[1] - ] - - @staticmethod - def _post_process(output) -> Dict[str, Any]: - if isinstance(output, tuple): - return EvalResult[List[Vertex]](*output[:3]).to_dict() - else: - return EvalResult[List[Vertex]](output).to_dict() diff --git a/python/knext/core/reasoner/__init__.py b/python/knext/core/reasoner/__init__.py deleted file mode 100644 index fdb957a5..00000000 --- a/python/knext/core/reasoner/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -# Copyright 2023 Ant Group CO., Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. 
- -from knext.core.reasoner.reasoner import Reasoner diff --git a/python/knext/core/reasoner/model/__init__.py b/python/knext/core/reasoner/model/__init__.py deleted file mode 100644 index 19913efa..00000000 --- a/python/knext/core/reasoner/model/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -# Copyright 2023 Ant Group CO., Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. diff --git a/python/knext/core/schema/__init__.py b/python/knext/core/schema/__init__.py deleted file mode 100644 index 8fbce26e..00000000 --- a/python/knext/core/schema/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2023 Ant Group CO., Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. - -from knext.core.schema.schema import Schema diff --git a/python/knext/core/schema/model/__init__.py b/python/knext/core/schema/model/__init__.py deleted file mode 100644 index b108a56a..00000000 --- a/python/knext/core/schema/model/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright 2023 Ant Group CO., Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. - -from knext.core.schema.model.property import Property -from knext.core.schema.model.relation import Relation -from knext.core.schema.model.spg_type import BasicType -from knext.core.schema.model.spg_type import ConceptType -from knext.core.schema.model.spg_type import EntityType -from knext.core.schema.model.spg_type import EventType -from knext.core.schema.model.spg_type import StandardType diff --git a/python/knext/core/wrapper/__init__.py b/python/knext/core/wrapper/__init__.py deleted file mode 100644 index 19913efa..00000000 --- a/python/knext/core/wrapper/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -# Copyright 2023 Ant Group CO., Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. diff --git a/python/knext/operator/__init__.py b/python/knext/operator/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/knext/operator/base.py b/python/knext/operator/base.py new file mode 100644 index 00000000..ed478a84 --- /dev/null +++ b/python/knext/operator/base.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# Copyright 2023 Ant Group CO., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. + +from abc import ABC +from enum import Enum +from typing import List, Dict, Any, Type + +from knext.operator.eval_result import EvalResult +from knext.operator.spg_record import Vertex + + +class OperatorTypeEnum(str, Enum): + EntityLinkOp = "ENTITY_LINK" + EntityFuseOp = "ENTITY_FUSE" + PropertyNormalizeOp = "PROPERTY_NORMALIZE" + KnowledgeExtractOp = "KNOWLEDGE_EXTRACT" + + +class BaseOp(ABC): + """Base class for all user-defined operator functions. + + The execution logic of the operator needs to be implemented in the `eval` method. + """ + + name: str + desc: str = "" + bind_to: Type = None + + _registry = {} + _local_path: str + _type: str + _version: int + + def __init__(self, params: Dict[str, str] = None): + self.params = params + + def eval(self, *args): + """Used to implement operator execution logic.""" + raise NotImplementedError( + f"{self.__class__.__name__} need to implement `eval` method." + ) + + def handle(self, *inputs) -> Dict[str, Any]: + """Only available for Builder in OpenKgEngine to call through the pemja tool.""" + pre_input = self._pre_process(*inputs) + output = self.eval(*pre_input) + post_output = self._post_process(output) + return post_output + + @staticmethod + def _pre_process(*inputs): + """Convert data structures in building job into structures in operator before `eval` method.""" + pass + + @staticmethod + def _post_process(output: EvalResult) -> Dict[str, Any]: + """Convert result structures in operator into structures in building job after `eval` method.""" + pass + + @classmethod + def register(cls, name: str, local_path: str): + """ + Register a class as subclass of BaseOp with name and local_path. 
+ After registration, the subclass object can be inspected by `BaseOp.by_name(op_name)`. + """ + + def add_subclass_to_registry(subclass: Type["BaseOp"]): + subclass.name = name + subclass._local_path = local_path + subclass._type = OperatorTypeEnum[subclass.__base__.__name__] + if name in cls._registry: + raise ValueError( + f"Operator [{name}] conflict in {subclass._local_path} and {cls.by_name(name)._local_path}." + ) + cls._registry[name] = subclass + return subclass + + return add_subclass_to_registry + + @classmethod + def by_name(cls, name: str): + """Reflection from op name to subclass object of BaseOp.""" + if name in cls._registry: + subclass = cls._registry[name] + return subclass + else: + raise ValueError(f"{name} is not a registered name for {cls.__name__}. ") diff --git a/python/knext/operator/builtin/__init__.py b/python/knext/operator/builtin/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/knext/operator/builtin/online_runner.py b/python/knext/operator/builtin/online_runner.py new file mode 100644 index 00000000..33e55f5d --- /dev/null +++ b/python/knext/operator/builtin/online_runner.py @@ -0,0 +1,48 @@ +from typing import Dict, List + +from knext.api.operator import ExtractOp +from knext.models.runtime.vertex import Vertex +from NN4K.invoker.base import ModelInvoker + + +class BuiltInOnlineLLMBasedExtractOp(ExtractOp): + + def __init__(self, params: Dict[str, str] = None): + """ + + Args: + params: {"model_name": "openai", "token": "**"} + """ + super().__init__(params) + self.model = ModelInvoker.from_config(params) + self.prompt_ops = [] + + def eval(self, record: Dict[str, str]) -> List[Vertex]: + + # 对于单条数据【record】执行多层抽取 + # 每次抽取都需要执行op.build_prompt()->model.predict()->op.parse_response()流程 + # 且每次抽取后可能得到多条结果,下次抽取需要对多条结果分别进行抽取。 + record_list = [record] + # 循环所有prompt算子,算子数量决定对单条数据执行几层抽取 + for index, op in enumerate(self.prompt_ops): + extract_result_list = [] + # record_list可能有多条数据,对多条数据都要进行抽取 + while 
record_list: + _record = record_list.pop() + # 生成完整query + query = op.build_prompt(_record) + # 模型预测,生成模型输出结果 + response = self.model.inference(query) + # response = self.model[op.name] + # 模型结果的后置处理,可能会拆分成多条数据 List[dict[str, str]] + result_list = op.parse_response(response) + # 把输入的record和模型输出的result拼成一个新的dict,作为这次抽取最终结果 + for result in result_list: + _ = _record.copy() + _.update(result) + extract_result_list.append(_) + # record_list为空时,执行下一层抽取 + if index == len(self.prompt_ops) - 1: + return extract_result_list + else: + record_list.extend(extract_result_list) diff --git a/python/knext/core/builder/operator/model/eval_result.py b/python/knext/operator/eval_result.py similarity index 100% rename from python/knext/core/builder/operator/model/eval_result.py rename to python/knext/operator/eval_result.py diff --git a/python/knext/operator/op.py b/python/knext/operator/op.py new file mode 100644 index 00000000..a03c5986 --- /dev/null +++ b/python/knext/operator/op.py @@ -0,0 +1,156 @@ +from abc import ABC +from typing import List, Dict, Any + +from knext.operator.base import BaseOp +from knext.operator.eval_result import EvalResult +from knext.operator.spg_record import SPGRecord + + +class ExtractOp(BaseOp, ABC): + """Base class for all knowledge extract operators.""" + + def __init__(self, params: Dict[str, str] = None): + super().__init__(params) + + def eval(self, record: Dict[str, str]) -> List[SPGRecord]: + raise NotImplementedError( + f"{self.__class__.__name__} need to implement `eval` method." 
+ ) + + @staticmethod + def _pre_process(*inputs): + return (SPGRecord.from_dict(inputs[0]).properties,) + + @staticmethod + def _post_process(output) -> Dict[str, Any]: + if isinstance(output, EvalResult): + return output.to_dict() + if isinstance(output, tuple): + return EvalResult[List[SPGRecord]](*output[:3]).to_dict() + else: + return EvalResult[List[SPGRecord]](output).to_dict() + + +class NormalizeOp(BaseOp, ABC): + """Base class for all property normalize operators.""" + + def __init__(self, params: Dict[str, str] = None): + super().__init__(params) + + def eval(self, property: str, record: SPGRecord) -> str: + raise NotImplementedError( + f"{self.__class__.__name__} need to implement `eval` method." + ) + + @staticmethod + def _pre_process(*inputs): + return inputs[0], SPGRecord.from_dict(inputs[1]) + + @staticmethod + def _post_process(output) -> Dict[str, Any]: + if isinstance(output, EvalResult): + return output.to_dict() + if isinstance(output, tuple): + return EvalResult[str](*output[:3]).to_dict() + else: + return EvalResult[str](output).to_dict() + + +class LinkOp(BaseOp, ABC): + """Base class for all entity link operators.""" + + def __init__(self, params: Dict[str, str] = None): + super().__init__(params) + + def eval(self, property: str, record: SPGRecord) -> List[SPGRecord]: + raise NotImplementedError( + f"{self.__class__.__name__} need to implement `eval` method." 
+ ) + + @staticmethod + def _pre_process(*inputs): + return inputs[0], SPGRecord.from_dict(inputs[1]) + + @staticmethod + def _post_process(output) -> Dict[str, Any]: + if isinstance(output, EvalResult): + return output.to_dict() + if isinstance(output, tuple): + return EvalResult[List[SPGRecord]](*output[:3]).to_dict() + else: + return EvalResult[List[SPGRecord]](output).to_dict() + + +class FuseOp(BaseOp, ABC): + """Base class for all entity fuse operators.""" + + def __init__(self, params: Dict[str, str] = None): + super().__init__(params) + + def eval( + self, source_SPGRecord: SPGRecord, target_SPGRecordes: List[SPGRecord] + ) -> List[SPGRecord]: + raise NotImplementedError( + f"{self.__class__.__name__} need to implement `eval` method." + ) + + @staticmethod + def _pre_process(*inputs): + return SPGRecord.from_dict(inputs[0]), [ + SPGRecord.from_dict(input) for input in inputs[1] + ] + + @staticmethod + def _post_process(output) -> Dict[str, Any]: + if isinstance(output, EvalResult): + return output.to_dict() + if isinstance(output, tuple): + return EvalResult[List[SPGRecord]](*output[:3]).to_dict() + else: + return EvalResult[List[SPGRecord]](output).to_dict() + + +class PromptOp(ExtractOp, ABC): + """Base class for all prompt operators.""" + + template: str + + def __init__(self, params: Dict[str, str] = None): + super().__init__(params) + + def build_prompt(self, record: Dict[str, str]) -> str: + raise NotImplementedError( + f"{self.__class__.__name__} need to implement `build_prompt` method." + ) + + def parse_response(self, response: str) -> List[Dict[str, str]]: + raise NotImplementedError( + f"{self.__class__.__name__} need to implement `parse_response` method." + ) + + def eval(self, *args): + """Used to implement operator execution logic.""" + raise NotImplementedError( + f"{self.__class__.__name__} need to implement `eval` method." 
+        )
+
+    def handle(self, *inputs) -> Dict[str, Any]:
+        """Only available for Builder in OpenKgEngine to call through the pemja tool."""
+        pre_input = self._pre_process(*inputs)
+        output = self.eval(*pre_input)
+        post_output = self._post_process(output)
+        return post_output
+
+    @staticmethod
+    def _pre_process(*inputs):
+        """Convert data structures in building job into structures in operator before `eval` method."""
+        return inputs
+
+    @staticmethod
+    def _post_process(output) -> Dict[str, Any]:
+        if isinstance(output, EvalResult):
+            return output.to_dict()
+        if isinstance(output, tuple):
+            return EvalResult[List[SPGRecord]](*output[:3]).to_dict()
+        else:
+            return EvalResult[List[SPGRecord]](output).to_dict()
diff --git a/python/knext/core/builder/operator/model/vertex.py b/python/knext/operator/spg_record.py
similarity index 96%
rename from python/knext/core/builder/operator/model/vertex.py
rename to python/knext/operator/spg_record.py
index 1b05178e..e96ab00b 100644
--- a/python/knext/core/builder/operator/model/vertex.py
+++ b/python/knext/operator/spg_record.py
@@ -14,7 +14,7 @@ import pprint
 from typing import Dict, Any, List
 
 
-class Vertex:
+class SPGRecord:
     """Data structure in operator, used to store entity information."""
 
     def __init__(
@@ -151,10 +151,10 @@ class Vertex:
             "props": self.properties,
         }
 
-    @staticmethod
-    def from_dict(input: Dict[str, Any]):
+    @classmethod
+    def from_dict(cls, input: Dict[str, Any]):
         """Returns the model from a dict"""
-        return Vertex(input.get("bizId"), input.get("vertexType"), input.get("props"))
+        return cls(input.get("bizId"), input.get("vertexType"), input.get("props"))
 
     def __repr__(self):
         """For `print` and `pprint`"""