From fe18880fcf73e9672759dc54cc1adf055ea12edd Mon Sep 17 00:00:00 2001 From: Qu Date: Fri, 8 Dec 2023 11:25:26 +0800 Subject: [PATCH] overwrite __rshift__ --- python/knext/api/__init__.py | 12 --- python/knext/api/client.py | 14 +++ python/knext/api/component.py | 17 +++ python/knext/api/operator.py | 22 ---- python/knext/chain/base.py | 50 ++++++++- python/knext/chain/builder_chain.py | 5 +- .../knext/client/marklang/concept_rule_ml.py | 2 +- python/knext/common/restable.py | 24 +++++ python/knext/common/runnable.py | 24 +++++ python/knext/component/base.py | 102 +++++++++--------- python/knext/component/builder/__init__.py | 19 ++++ python/knext/component/builder/base.py | 80 ++++++++++++++ python/knext/component/builder/extractor.py | 47 +++----- python/knext/component/builder/mapping.py | 50 +++------ python/knext/component/builder/sink_writer.py | 35 ++---- .../knext/component/builder/source_reader.py | 26 +---- python/knext/operator/base.py | 2 +- python/knext/operator/op.py | 25 ----- .../${project}_schema_helper.py.tmpl | 13 +++ python/tests/__init__.py | 0 python/tests/chain_test.py | 33 ++++++ 21 files changed, 373 insertions(+), 229 deletions(-) create mode 100644 python/knext/common/restable.py create mode 100644 python/knext/common/runnable.py create mode 100644 python/knext/component/builder/base.py create mode 100644 python/knext/templates/schema_helper/${project}_schema_helper.py.tmpl create mode 100644 python/tests/__init__.py create mode 100644 python/tests/chain_test.py diff --git a/python/knext/api/__init__.py b/python/knext/api/__init__.py index 9291886f..e69de29b 100644 --- a/python/knext/api/__init__.py +++ b/python/knext/api/__init__.py @@ -1,12 +0,0 @@ -from knext.api.operator import ( - BaseOp, - ExtractOp, - KnowledgeExtractOp, - EntityLinkOp, - LinkOp, - EntityFuseOp, - FuseOp, - PropertyNormalizeOp, - NormalizeOp, - PromptOp, -) diff --git a/python/knext/api/client.py b/python/knext/api/client.py index e69de29b..53137d57 100644 --- a/python/knext/api/client.py +++ b/python/knext/api/client.py @@ -0,0 +1,14 @@ + +from knext.client.builder import BuilderClient +from knext.client.schema import SchemaClient +from knext.client.reasoner import ReasonerClient +from knext.client.operator import OperatorClient +from knext.client.search import SearchClient + +__all__ = [ + "BuilderClient", + "SchemaClient", + "ReasonerClient", + "OperatorClient", + "SearchClient" +] diff --git a/python/knext/api/component.py b/python/knext/api/component.py index e69de29b..1c3a22d7 100644 --- a/python/knext/api/component.py +++ b/python/knext/api/component.py @@ -0,0 +1,17 @@ + +from knext.component.builder import UserDefinedExtractor, LLMBasedExtractor +from knext.component.builder import SPGTypeMapping, RelationMapping +from knext.component.builder import CsvSourceReader +from knext.component.builder import KGSinkWriter +from knext.component.base import Component + + +__all__ = [ + "UserDefinedExtractor", + "LLMBasedExtractor", + "CsvSourceReader", + "SPGTypeMapping", + "RelationMapping", + "KGSinkWriter", + "Component" +] diff --git a/python/knext/api/operator.py b/python/knext/api/operator.py index 5f0939d9..e69de29b 100644 --- a/python/knext/api/operator.py +++ b/python/knext/api/operator.py @@ -1,22 +0,0 @@ -from knext.core.builder.operator.model.op import BaseOp, ExtractOp, LinkOp, FuseOp, NormalizeOp, PromptOp - - -KnowledgeExtractOp = ExtractOp -EntityLinkOp = LinkOp -EntityFuseOp = FuseOp -PropertyNormalizeOp = NormalizeOp - -__all__ = [ - "BaseOp", - "ExtractOp", - "LinkOp", - "FuseOp", - "NormalizeOp", - "PromptOp", - "LinkOp", -] + [ - "KnowledgeExtractOp", - "EntityLinkOp", - "EntityFuseOp", - "PropertyNormalizeOp", -] diff --git a/python/knext/chain/base.py b/python/knext/chain/base.py index 25c82f43..9f13a2f3 100644 --- a/python/knext/chain/base.py +++ b/python/knext/chain/base.py @@ -1,8 +1,54 @@ from abc import ABC +from typing import Union, Type, List + +import networkx as nx + +from knext.common.restable import RESTable +from knext.common.runnable import Runnable -class Chain(ABC): +class Chain(Runnable, RESTable): - def dag(self): + dag: nx.DiGraph + + def submit(self): pass + def to_rest(self): + pass + + def __rshift__(self, other: Union[ + Type['Chain'], + List[Type['Chain']], + Type['Component'], + List[Type['Component']], + None + ]): + from knext.component.base import Component + if not other: + return self + if not isinstance(other, list): + other = [other] + dag_list = [] + for o in other: + if not o: + dag_list.append(o.dag) + if isinstance(o, Component): + end_nodes = [node for node, out_degree in self.dag.out_degree() if out_degree == 0 or node.last] + dag = nx.DiGraph(self.dag) + if len(end_nodes) > 0: + for end_node in end_nodes: + dag.add_edge(end_node, o) + dag.add_node(o) + dag_list.append(dag) + elif isinstance(o, Chain): + combined_dag = nx.compose(self.dag, o.dag) + end_nodes = [node for node, out_degree in self.dag.out_degree() if out_degree == 0 or node.last] + start_nodes = [node for node, in_degree in o.dag.in_degree() if in_degree == 0] + + if len(end_nodes) > 0 and len(start_nodes) > 0: + for end_node in end_nodes: + for start_node in start_nodes: + combined_dag.add_edge(end_node, start_node) + final_dag = nx.compose_all(dag_list) + return Chain(dag=final_dag) diff --git a/python/knext/chain/builder_chain.py b/python/knext/chain/builder_chain.py index a26c5885..03fb9c15 100644 --- a/python/knext/chain/builder_chain.py +++ b/python/knext/chain/builder_chain.py @@ -12,7 +12,7 @@ class BuilderChain(RESTable, Chain): source: SourceReader - process: Union[SPGExtractor, Mapping] + process: Union[SPGExtractor, Mapping, ] sink: SinkWriter @@ -24,3 +24,6 @@ class BuilderChain(RESTable, Chain): def output_types(self): return None + @classmethod + def from_config(cls): + return cls() diff --git a/python/knext/client/marklang/concept_rule_ml.py b/python/knext/client/marklang/concept_rule_ml.py index 811cd58e..a23576e5 100644 --- a/python/knext/client/marklang/concept_rule_ml.py +++ b/python/knext/client/marklang/concept_rule_ml.py @@ -233,7 +233,7 @@ class SPGConceptRuleMarkLang: Load and then parse the script file """ - file = open(filename, "r") + file = open(filename, "r", encoding="utf-8") lines = file.read().splitlines() last_indent_level = 0 diff --git a/python/knext/common/restable.py b/python/knext/common/restable.py new file mode 100644 index 00000000..386b51d2 --- /dev/null +++ b/python/knext/common/restable.py @@ -0,0 +1,24 @@ +from abc import ABC + +from knext import rest + + +class RESTable(ABC): + + @property + def upstream_types(self): + raise NotImplementedError("To be implemented in subclass") + + @property + def downstream_types(self): + raise NotImplementedError("To be implemented in subclass") + + def to_rest(self): + raise NotImplementedError("To be implemented in subclass") + + @classmethod + def from_rest(cls, node: rest.Node): + raise NotImplementedError("To be implemented in subclass") + + def submit(self): + raise NotImplementedError("To be implemented in subclass") diff --git a/python/knext/common/runnable.py b/python/knext/common/runnable.py new file mode 100644 index 00000000..0a165b19 --- /dev/null +++ b/python/knext/common/runnable.py @@ -0,0 +1,24 @@ +from pydantic import BaseConfig, BaseModel + + +class Runnable(BaseModel): + + last: bool = False + + @property + def input_types(self): + return + + @property + def output_types(self): + return + + def invoke(self, input): + raise NotImplementedError("To be implemented in subclass") + + def __rshift__(self, other): + raise NotImplementedError("To be implemented in subclass") + + class Config(BaseConfig): + + arbitrary_types_allowed = True \ No newline at end of file diff --git a/python/knext/component/base.py b/python/knext/component/base.py index 2c3a6b7f..f9e4fe72 100644 --- a/python/knext/component/base.py +++ b/python/knext/component/base.py @@ -11,12 +11,14 @@ # or implied. -from abc import ABC, abstractmethod +from abc import ABC from enum import Enum -from typing import List, Union, TypeVar, Generic, Any, Dict, Tuple, Type +from typing import List, Union, TypeVar, Type -from knext import rest +import networkx as nx +from knext.common.restable import RESTable +from knext.common.runnable import Runnable Other = TypeVar("Other") @@ -46,52 +48,7 @@ class PropertyHelper: pass -class RESTable(ABC): - - @property - def upstream_types(self): - raise NotImplementedError("To be implemented in subclass") - - @property - def downstream_types(self): - raise NotImplementedError("To be implemented in subclass") - - @abstractmethod - def to_rest(self): - raise NotImplementedError("To be implemented in subclass") - - @classmethod - def from_rest(cls, node: rest.Node): - raise NotImplementedError("To be implemented in subclass") - - @abstractmethod - def submit(self): - raise NotImplementedError("To be implemented in subclass") - - -class Runnable(ABC): - - @property - def input_types(self) -> Input: - return - - @property - def output_types(self) -> Output: - return - - @abstractmethod - def invoke(self, input: Input) -> Output: - raise NotImplementedError("To be implemented in subclass") - - def __rshift__( - self, - other: Type['Runnable'] - ) -> Type['Runnable']: - """Compose this runnable with another object to create a RunnableSequence.""" - return Chain(first=self, last=coerce_to_runnable(other)) - - -class Component(ABC): +class Component(Runnable, RESTable, ABC): """ Base class for all component. """ @@ -114,3 +71,50 @@ class Component(ABC): def to_dict(self): return self.__dict__ + def __hash__(self): + return id(self) + + def __eq__(self, other): + return hash(self) == hash(other) + + def __rshift__(self, other: Union[ + Type['Chain'], + List[Type['Chain']], + Type['Component'], + List[Type['Component']], + None + ]): + from knext.chain.base import Chain + if not other: + return self + if not isinstance(other, list): + other = [other] + dag_list = [] + for o in other: + if not o: + dag = nx.DiGraph() + self.last = True + dag.add_node(self) + print(dag.nodes) + dag_list.append(dag) + if isinstance(o, Component): + dag = nx.DiGraph() + dag.add_node(self) + dag.add_node(o) + dag.add_edge(self, o) + dag_list.append(dag) + elif isinstance(o, Chain): + dag = nx.DiGraph() + dag.add_node(self) + end_nodes = [node for node, out_degree in dag.out_degree() if out_degree == 0 or node.last] + start_nodes = [node for node, in_degree in o.dag.in_degree() if in_degree == 0] + + if len(end_nodes) > 0 and len(start_nodes) > 0: + for end_node in end_nodes: + for start_node in start_nodes: + combined_dag.add_edge(end_node, start_node) + combined_dag = nx.compose(dag, o.dag) + dag_list.append(combined_dag) + final_dag = nx.compose_all(dag_list) + + return Chain(dag=final_dag) diff --git a/python/knext/component/builder/__init__.py b/python/knext/component/builder/__init__.py index e69de29b..4bcc4d6d 100644 --- a/python/knext/component/builder/__init__.py +++ b/python/knext/component/builder/__init__.py @@ -0,0 +1,19 @@ + +from knext.component.builder.extractor import UserDefinedExtractor, LLMBasedExtractor, SPGExtractor +from knext.component.builder.mapping import SPGTypeMapping, RelationMapping, Mapping +from knext.component.builder.source_reader import CsvSourceReader, SourceReader +from knext.component.builder.sink_writer import KGSinkWriter, SinkWriter + + +__all__ = [ + "UserDefinedExtractor", + "LLMBasedExtractor", + "CsvSourceReader", + "SPGTypeMapping", + "RelationMapping", + "KGSinkWriter", + "SPGExtractor", + "Mapping", + "SourceReader", + "SinkWriter", +] diff --git a/python/knext/component/builder/base.py b/python/knext/component/builder/base.py new file mode 100644 index 00000000..c514d231 --- /dev/null +++ b/python/knext/component/builder/base.py @@ -0,0 +1,80 @@ +from abc import ABC +from typing import Union + +from knext.component.base import Component, ComponentTypeEnum, ComponentLabelEnum + + +class SPGExtractor(Component, ABC): + + @property + def upstream_types(self): + return Union[SourceReader, SPGExtractor] + + @property + def downstream_types(self): + return Union[SPGExtractor, Mapping] + + @property + def type(self): + return ComponentTypeEnum.Builder + + @property + def label(self): + return ComponentLabelEnum.Extractor + + +class Mapping(Component, ABC): + + @property + def upstream_types(self): + return Union[SourceReader, SPGExtractor] + + @property + def downstream_types(self): + return Union[SinkWriter] + + @property + def type(self): + return ComponentTypeEnum.Builder + + @property + def label(self): + return ComponentLabelEnum.Mapping + + +class SinkWriter(Component, ABC): + + @property + def upstream_types(self): + return Union[Mapping] + + @property + def downstream_types(self): + return None + + @property + def type(self): + return ComponentTypeEnum.Builder + + @property + def label(self): + return ComponentLabelEnum.SinkWriter + + +class SourceReader(Component, ABC): + + @property + def upstream_types(self): + return None + + @property + def downstream_types(self): + return Union[SPGExtractor, Mapping] + + @property + def type(self): + return ComponentTypeEnum.Builder + + @property + def label(self): + return ComponentLabelEnum.SourceReader diff --git a/python/knext/component/builder/extractor.py b/python/knext/component/builder/extractor.py index 2e343f67..0f972a50 100644 --- a/python/knext/component/builder/extractor.py +++ b/python/knext/component/builder/extractor.py @@ -1,33 +1,14 @@ -from abc import ABC -from typing import Union, Mapping, Dict, List +from typing import Union, Dict, List -from NN4K.invoker.base import ModelInvoker +from knext.component.builder.base import SPGExtractor +from knext.operator.spg_record import SPGRecord +from nn4k.invoker.base import NNInvoker from knext import rest -from knext.component.base import RESTable, Component, ComponentTypeEnum, ComponentLabelEnum, Runnable, Input, Output -from knext.component.builder.source_reader import SourceReader -from knext.core.builder.operator.model.op import PromptOp +from knext.component.base import SPGTypeHelper, PropertyHelper +from knext.operator.op import PromptOp, ExtractOp -class SPGExtractor(RESTable, Component, ABC): - - @property - def upstream_types(self): - return Union[SourceReader, SPGExtractor] - - @property - def downstream_types(self): - return Union[SPGExtractor, Mapping] - - @property - def type(self): - return ComponentTypeEnum.Builder - - @property - def label(self): - return ComponentLabelEnum.Extractor - - -class LLMBasedExtractor(Runnable, SPGExtractor): +class LLMBasedExtractor(SPGExtractor): """A Process Component that transforming unstructured data into structured data. Examples: @@ -40,7 +21,7 @@ class LLMBasedExtractor(Runnable, SPGExtractor): """All output column names after knowledge extraction processing.""" output_fields: List[str] """Knowledge extract operator of this component.""" - llm: ModelInvoker + llm: NNInvoker prompt_ops: List[PromptOp] @@ -49,11 +30,11 @@ class LLMBasedExtractor(Runnable, SPGExtractor): property_names: List[Union[str, PropertyHelper]] @property - def input_types(self) -> Input: + def input_types(self): return Dict[str, str] @property - def output_types(self) -> Output: + def output_types(self): return SPGRecord def to_rest(self): @@ -68,7 +49,7 @@ class LLMBasedExtractor(Runnable, SPGExtractor): return rest.Node(**super().to_dict(), node_config=config) - def invoke(self, input: Input) -> Output: + def invoke(self, input): pass @classmethod @@ -79,7 +60,7 @@ class LLMBasedExtractor(Runnable, SPGExtractor): pass -class UserDefinedExtractor(Runnable[Dict[str, str], Dict[str, str]], SPGExtractor): +class UserDefinedExtractor(SPGExtractor): """A Process Component that transforming unstructured data into structured data. Examples: @@ -95,11 +76,11 @@ class UserDefinedExtractor(Runnable[Dict[str, str], Dict[str, str]], SPGExtracto extract_op: ExtractOp @property - def input_types(self) -> Input: + def input_types(self): return Dict[str, str] @property - def output_types(self) -> Output: + def output_types(self): return Dict[str, str] @property diff --git a/python/knext/component/builder/mapping.py b/python/knext/component/builder/mapping.py index c35b65d5..ab188f5c 100644 --- a/python/knext/component/builder/mapping.py +++ b/python/knext/component/builder/mapping.py @@ -1,34 +1,20 @@ from abc import ABC +from collections import defaultdict from typing import Union, Dict, List, Tuple -from knext.component.base import RESTable, Component, ComponentTypeEnum, ComponentLabelEnum, Runnable -from knext.component.builder.extractor import SPGExtractor -from knext.component.builder.sink_writer import SinkWriter -from knext.component.builder.source_reader import SourceReader +from knext import rest + +from knext.component.base import SPGTypeHelper, PropertyHelper, MappingTypeEnum +from knext.component.builder.base import Mapping +from knext.operator.op import LinkOp from knext.operator.spg_record import SPGRecord -class Mapping(RESTable, Component, ABC): - - @property - def upstream_types(self): - return Union[SourceReader, SPGExtractor] - - @property - def downstream_types(self): - return Union[SinkWriter] - - @property - def type(self): - return ComponentTypeEnum.Builder - - @property - def label(self): - return ComponentLabelEnum.Mapping +class NormalizeOp: + pass - -class SPGTypeMapping(Runnable[Dict[str, str], SPGRecord], Mapping): +class SPGTypeMapping(Mapping): """A Process Component that mapping data to entity/event/concept type. Args: @@ -49,8 +35,8 @@ class SPGTypeMapping(Runnable[Dict[str, str], SPGRecord], Mapping): filters: List[Tuple[str, str]] = list() - def add_field(self, source_field: str, target_field: Union[str, PropertyHelper], link_op: LinkOp, - norm_op: NormalizeOp): + def add_field(self, source_field: str, target_field: Union[str, PropertyHelper], link_op: LinkOp = None, + norm_op: NormalizeOp = None): """Adds a field mapping from source data to property of spg_type. :param source_field: The source field to be mapped. @@ -124,8 +110,11 @@ class SPGTypeMapping(Runnable[Dict[str, str], SPGRecord], Mapping): ) return rest.Node(**super().to_dict(), node_config=config) + def submit(self): + pass -class RelationMappingComponent(Component): + +class RelationMapping(Mapping): """A Process Component that mapping data to relation type. Args: @@ -150,8 +139,6 @@ class RelationMappingComponent(Component): filters: List[Tuple[str, str]] = list() - RELATION_BASE_FIELDS = ["srcId", "dstId"] - def add_field(self, source_field: str, target_field: str): """Adds a field mapping from source data to property of spg_type. @@ -173,13 +160,8 @@ class RelationMappingComponent(Component): self.filters.append((column_name, column_value)) return self - def _to_rest(self): + def to_rest(self): """Transforms `RelationMappingComponent` to REST model `MappingNodeConfig`.""" - assert all( - field in self.mapping.keys() - for field in RelationMappingComponent.RELATION_BASE_FIELDS - ), f"{self.__class__.__name__} must include mapping to {str(RelationMappingComponent.RELATION_BASE_FIELDS)}" - mapping = defaultdict(list) for dst_name, src_name in self.mapping.items(): mapping[src_name].append(dst_name) diff --git a/python/knext/component/builder/sink_writer.py b/python/knext/component/builder/sink_writer.py index 8a0f2122..393753ed 100644 --- a/python/knext/component/builder/sink_writer.py +++ b/python/knext/component/builder/sink_writer.py @@ -1,30 +1,12 @@ from abc import ABC from ctypes import Union +from typing import Dict -from knext.component.base import RESTable, Component +from knext.component.base import Runnable +from knext.component.builder.base import SinkWriter -class SinkWriter(RESTable, Component, ABC): - - @property - def upstream_types(self): - return Union[Mapping, Evaluator] - - @property - def downstream_types(self): - return None - - @property - def type(self): - return ComponentTypeEnum.Builder - - @property - def label(self): - return ComponentLabelEnum.SinkWriter - - - -class KGSinkWriter(Runnable[Dict[str, str], None], SinkWriter): +class KGSinkWriter(SinkWriter): """The Sink Component that writing data to KG storage. Args: @@ -35,14 +17,14 @@ class KGSinkWriter(Runnable[Dict[str, str], None], SinkWriter): """ @property - def input_types(self) -> Input: + def input_types(self): return Dict[str, str] @property - def output_types(self) -> Output: + def output_types(self): return None - def invoke(self, input: Input) -> Output: + def invoke(self, input): pass def to_rest(self): @@ -53,3 +35,6 @@ class KGSinkWriter(Runnable[Dict[str, str], None], SinkWriter): }, **super().to_dict(), ) + + def submit(self): + pass \ No newline at end of file diff --git a/python/knext/component/builder/source_reader.py b/python/knext/component/builder/source_reader.py index 88f4fa12..21600e25 100644 --- a/python/knext/component/builder/source_reader.py +++ b/python/knext/component/builder/source_reader.py @@ -2,29 +2,7 @@ from abc import ABC from typing import Union, List, Dict from knext import rest - -from knext.component.base import RESTable, Component, ComponentTypeEnum, ComponentLabelEnum, Runnable, Input, Output -from knext.component.builder.extractor import SPGExtractor -from knext.component.builder.mapping import Mapping - - -class SourceReader(RESTable, Component, ABC): - - @property - def upstream_types(self): - return None - - @property - def downstream_types(self): - return Union[SPGExtractor, Mapping] - - @property - def type(self): - return ComponentTypeEnum.Builder - - @property - def label(self): - return ComponentLabelEnum.SourceReader +from knext.component.builder.base import SourceReader class CsvSourceReader(SourceReader): @@ -51,7 +29,7 @@ class CsvSourceReader(SourceReader): If the CSV file includes a header, it needs to be greater than or equal to 2.""" start_row: int - def invoke(self, input: Input) -> Output: + def invoke(self, input): pass def submit(self): diff --git a/python/knext/operator/base.py b/python/knext/operator/base.py index ed478a84..fbf5e0dd 100644 --- a/python/knext/operator/base.py +++ b/python/knext/operator/base.py @@ -15,7 +15,7 @@ from enum import Enum from typing import List, Dict, Any, Type from knext.operator.eval_result import EvalResult -from knext.operator.spg_record import Vertex +from knext.operator.spg_record import SPGRecord class OperatorTypeEnum(str, Enum): diff --git a/python/knext/operator/op.py b/python/knext/operator/op.py index a03c5986..43f96997 100644 --- a/python/knext/operator/op.py +++ b/python/knext/operator/op.py @@ -31,31 +31,6 @@ class ExtractOp(BaseOp, ABC): return EvalResult[List[SPGRecord]](output).to_dict() -class NormalizeOp(BaseOp, ABC): - """Base class for all property normalize operators.""" - - def __init__(self, params: Dict[str, str] = None): - super().__init__(params) - - def eval(self, property: str, record: SPGRecord) -> str: - raise NotImplementedError( - f"{self.__class__.__name__} need to implement `eval` method." - ) - - @staticmethod - def _pre_process(*inputs): - return inputs[0], SPGRecord.from_dict(inputs[1]) - - @staticmethod - def _post_process(output) -> Dict[str, Any]: - if isinstance(output, EvalResult): - return output.to_dict() - if isinstance(output, tuple): - return EvalResult[str](*output[:3]).to_dict() - else: - return EvalResult[str](output).to_dict() - - class LinkOp(BaseOp, ABC): """Base class for all entity link operators.""" diff --git a/python/knext/templates/schema_helper/${project}_schema_helper.py.tmpl b/python/knext/templates/schema_helper/${project}_schema_helper.py.tmpl new file mode 100644 index 00000000..2b21b57d --- /dev/null +++ b/python/knext/templates/schema_helper/${project}_schema_helper.py.tmpl @@ -0,0 +1,13 @@ + +# ATTENTION! +# This file is generated by Schema automatically, it will be refreshed after schema has been committed +# PLEASE DO NOT MODIFY THIS FILE!!! + +class ${namespace}: + def __init__(self): + self.BodyPart = self.BodyPart() + self.Disease = self.Disease() + self.Symptom = self.Symptom() + self.Drug = self.Drug() + self.HospitalDepartment = self.HospitalDepartment() + self.Indicator = self.Indicator() \ No newline at end of file diff --git a/python/tests/__init__.py b/python/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/tests/chain_test.py b/python/tests/chain_test.py new file mode 100644 index 00000000..07c9e086 --- /dev/null +++ b/python/tests/chain_test.py @@ -0,0 +1,33 @@ +import networkx as nx +import matplotlib.pyplot as plt + +from knext.api.component import SPGTypeMapping +from knext.api.component import KGSinkWriter +from knext.api.component import CsvSourceReader + +if __name__ == '__main__': + source = CsvSourceReader( + local_path="./builder/job/data/BodyPart.csv", columns=["id"], start_row=1 + ) + + mapping1 = SPGTypeMapping(spg_type_name="Medical.BodyPart").add_field( + "id", "Medical.BodyPart.id" + ) + + mapping2 = SPGTypeMapping(spg_type_name="Medical.BodyPart").add_field( + "id", "Medical.BodyPart.id1" + ) + + sink = KGSinkWriter() + sink2 = KGSinkWriter() + + builder_chain = source >> [mapping1, None] >> sink2 + + print(builder_chain.dag.edges) + + G = builder_chain.dag + # 绘制图形 + nx.draw(G, with_labels=True, arrows=True) + + # 显示图形 + plt.show()