89 lines
1.7 KiB
Python
Raw Normal View History

2023-12-08 11:25:26 +08:00
from abc import ABC
2023-12-11 10:44:37 +08:00
from enum import Enum
2023-12-08 11:25:26 +08:00
from typing import Union
2023-12-11 10:44:37 +08:00
from knext.component.base import Component, ComponentTypeEnum
2023-12-08 11:25:26 +08:00
2023-12-11 10:44:37 +08:00
class ComponentLabelEnum(str, Enum):
SourceReader = "SOURCE_READER"
Extractor = "EXTRACTOR"
Mapping = "MAPPING"
Evaluator = "EVALUATOR"
SinkWriter = "SINK_WRITER"
class BuilderComponent(Component, ABC):
@property
def type(self):
return ComponentTypeEnum.Builder
@property
def input_keys(self):
return
@property
def output_keys(self):
return
class SourceReader(BuilderComponent, ABC):
2023-12-08 11:25:26 +08:00
@property
def upstream_types(self):
2023-12-11 10:44:37 +08:00
return None
2023-12-08 11:25:26 +08:00
@property
def downstream_types(self):
return Union[SPGExtractor, Mapping]
@property
def label(self):
2023-12-11 10:44:37 +08:00
return ComponentLabelEnum.SourceReader
2023-12-08 11:25:26 +08:00
2023-12-11 10:44:37 +08:00
class SPGExtractor(BuilderComponent, ABC):
2023-12-08 11:25:26 +08:00
@property
def upstream_types(self):
return Union[SourceReader, SPGExtractor]
@property
def downstream_types(self):
2023-12-11 10:44:37 +08:00
return Union[SPGExtractor, Mapping]
2023-12-08 11:25:26 +08:00
@property
def label(self):
2023-12-11 10:44:37 +08:00
return ComponentLabelEnum.Extractor
2023-12-08 11:25:26 +08:00
2023-12-11 10:44:37 +08:00
class Mapping(BuilderComponent, ABC):
2023-12-08 11:25:26 +08:00
@property
def upstream_types(self):
2023-12-11 10:44:37 +08:00
return Union[SourceReader, SPGExtractor]
2023-12-08 11:25:26 +08:00
@property
def downstream_types(self):
2023-12-11 10:44:37 +08:00
return Union[SinkWriter]
2023-12-08 11:25:26 +08:00
@property
def label(self):
2023-12-11 10:44:37 +08:00
return ComponentLabelEnum.Mapping
2023-12-08 11:25:26 +08:00
2023-12-11 10:44:37 +08:00
class SinkWriter(BuilderComponent, ABC):
2023-12-08 11:25:26 +08:00
@property
def upstream_types(self):
2023-12-11 10:44:37 +08:00
return Union[Mapping]
2023-12-08 11:25:26 +08:00
@property
def downstream_types(self):
2023-12-11 10:44:37 +08:00
return None
2023-12-08 11:25:26 +08:00
@property
def label(self):
2023-12-11 10:44:37 +08:00
return ComponentLabelEnum.SinkWriter