286 lines
8.5 KiB
Python
Raw Normal View History

2023-12-06 17:26:39 +08:00
from abc import ABC
2023-12-08 11:25:26 +08:00
from collections import defaultdict
2023-12-11 10:44:37 +08:00
from enum import Enum
from typing import Union, Dict, List, Tuple, Sequence, Any
2023-12-06 17:26:39 +08:00
2023-12-08 11:25:26 +08:00
from knext import rest
2023-12-11 10:44:37 +08:00
from knext.common.runnable import Input, Output
2023-12-06 17:26:39 +08:00
2023-12-11 10:44:37 +08:00
from knext.common.schema_helper import SPGTypeHelper, PropertyHelper
2023-12-08 11:25:26 +08:00
from knext.component.builder.base import Mapping
from knext.operator.op import LinkOp
from knext.operator.spg_record import SPGRecord
2023-12-06 17:26:39 +08:00
2023-12-11 10:44:37 +08:00
class MappingTypeEnum(str, Enum):
    """String-valued enum naming the kind of target a mapping node writes to.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # Mapping onto an SPG type.
    SPGType = "SPG_TYPE"

    # Mapping onto a relation.
    Relation = "RELATION"
class LinkStrategyEnum(str, Enum):
    """String-valued enum of the built-in link strategies.

    A member can be passed as the ``link_strategy`` of a field mapping in
    place of a custom link operator.
    """

    # NOTE(review): presumably links by matching ids — confirm with the consumer.
    IDEqual = "ID_EQUAL"

    # NOTE(review): presumably links through a search lookup — confirm.
    Search = "SEARCH"
# Base field names for SPG type mappings.
# NOTE(review): presumably the fields every SPG type mapping must provide — confirm.
SPG_TYPE_BASE_FIELDS = [
    "id",
]

# Base field names for relation mappings (source and destination ids).
RELATION_BASE_FIELDS = [
    "src_id",
    "dst_id",
]
2023-12-06 17:26:39 +08:00
2023-12-08 11:25:26 +08:00
class SPGTypeMapping(Mapping):
    """A Process Component that maps data to an entity/event/concept type.

    Args:
        spg_type_name: The SPG type name imported from SPGTypeHelper.
        mapping: Source field name -> list of target properties fed by it.
        filters: ``(column_name, column_value)`` pairs restricting which rows
            are mapped.
        link_strategies: Target property -> link strategy (or link operator)
            registered through `add_field`.

    Examples:
        A single source field may be mapped to several target properties:

        mapping = (
            SPGTypeMapping(spg_type_name=DEFAULT.App)
            .add_field("id", DEFAULT.App.id)
            .add_field("id", DEFAULT.App.name)
            .add_field("riskMark", DEFAULT.App.riskMark)
            .add_field("useCert", DEFAULT.App.useCert)
        )
    """

    spg_type_name: Union[str, SPGTypeHelper]

    # NOTE(review): the three defaults below are mutable objects created once at
    # class-definition time. This is only safe if the base class copies field
    # defaults per instance (as pydantic models do) — confirm against `Mapping`.
    mapping: Dict[str, List[str]] = defaultdict(list)

    filters: List[Tuple[str, str]] = list()

    link_strategies: Dict[str, Union[LinkStrategyEnum, LinkOp]] = dict()

    @property
    def input_types(self) -> Input:
        """Returns the typing alias ``Dict[str, str]`` as the accepted input type."""
        return Dict[str, str]

    @property
    def output_types(self) -> Output:
        """Returns the `SPGRecord` class as the produced output type."""
        return SPGRecord

    @property
    def input_keys(self):
        """Returns None; no input keys are declared."""
        return None

    @property
    def output_keys(self):
        """Returns `self.output_fields`.

        NOTE(review): `output_fields` is not defined in this class; presumably
        it is inherited from `Mapping` — confirm.
        """
        return self.output_fields

    def add_field(
        self,
        source_field: str,
        target_field: Union[str, PropertyHelper],
        link_strategy: Union[LinkStrategyEnum, LinkOp, None] = None,
    ):
        """Adds a field mapping from source data to property of spg_type.

        :param source_field: The source field to be mapped.
        :param target_field: The target field to map the source field to.
        :param link_strategy: Optional link strategy or link operator recorded
            for `target_field`; stored as None when omitted.
        :return: self
        """
        # Targets accumulate, so the same source field can feed several properties.
        self.mapping[source_field].append(target_field)
        # The strategy is keyed by target; a later call for the same target overwrites it.
        self.link_strategies[target_field] = link_strategy
        return self

    def add_filter(self, column_name: str, column_value: str):
        """Adds data filtering rule.

        Only the column that meets `column_name=column_value` will execute the mapping.

        :param column_name: The column name to be filtered.
        :param column_value: The column value to be filtered.
        :return: self
        """
        self.filters.append((column_name, column_value))
        return self

    def to_rest(self):
        """Transforms `SPGTypeMapping` to a REST `Node` wrapping a `MappingNodeConfig`.

        :return: A `rest.Node` built from `super().to_dict()` plus the mapping
            node config assembled from this component's filters and mappings.
        """
        # TODO generate schema with link_strategy
        # Until then the schema is empty, so `mapping_schemas` below is always [].
        schema = {}

        mapping_filters = [
            rest.MappingFilter(column_name=name, column_value=value)
            for name, value in self.filters
        ]
        mapping_configs = [
            rest.MappingConfig(source=src_name, target=tgt_names)
            for src_name, tgt_names in self.mapping.items()
        ]
        mapping_schemas = [
            rest.MappingSchema(name, operator_config=operator_config)
            for name, operator_config in schema.items()
        ]

        # `mapping_type` is not declared on this class. Keep any value supplied
        # by a base class or the instance, otherwise fall back to the SPG type
        # kind, mirroring how RelationMapping passes MappingTypeEnum.Relation.
        mapping_type = getattr(self, "mapping_type", MappingTypeEnum.SPGType)

        config = rest.MappingNodeConfig(
            spg_name=self.spg_type_name,
            mapping_type=mapping_type,
            mapping_filters=mapping_filters,
            mapping_schemas=mapping_schemas,
            mapping_configs=mapping_configs,
        )
        return rest.Node(**super().to_dict(), node_config=config)

    def invoke(self, input: Input) -> Sequence[Output]:
        """Not implemented yet; currently does nothing and returns None."""

    @classmethod
    def from_rest(cls, node: rest.Node):
        """Not implemented yet; currently does nothing and returns None."""

    def submit(self):
        """Not implemented yet; currently does nothing and returns None."""
2023-12-06 17:26:39 +08:00
2023-12-08 11:25:26 +08:00
class RelationMapping(Mapping):
    """A Process Component that maps data to a relation type.

    Args:
        subject_name: The subject name imported from SPGTypeHelper.
        predicate_name: The predicate name.
        object_name: The object name imported from SPGTypeHelper.
        mapping: Source field name -> list of target fields fed by it.
        filters: ``(column_name, column_value)`` pairs restricting which rows
            are mapped.

    Examples:
        mapping = (
            RelationMapping(
                subject_name=DEFAULT.App,
                predicate_name=DEFAULT.App.useCert,
                object_name=DEFAULT.Cert,
            )
            .add_field("src_id", "srcId")
            .add_field("dst_id", "dstId")
        )
    """

    subject_name: Union[str, SPGTypeHelper]

    predicate_name: Union[str, PropertyHelper]

    object_name: Union[str, SPGTypeHelper]

    # NOTE(review): mutable class-level defaults; safe only if the base class
    # copies defaults per instance — confirm against `Mapping`.
    mapping: Dict[str, List[str]] = defaultdict(list)

    filters: List[Tuple[str, str]] = list()

    def add_field(self, source_field: str, target_field: str):
        """Registers `target_field` as a destination of `source_field`.

        :param source_field: The source field to be mapped.
        :param target_field: The target field to map the source field to.
        :return: self
        """
        targets = self.mapping[source_field]
        targets.append(target_field)
        return self

    def add_filter(self, column_name: str, column_value: str):
        """Adds data filtering rule.

        Only the column that meets `column_name=column_value` will execute the mapping.

        :param column_name: The column name to be filtered.
        :param column_value: The column value to be filtered.
        :return: self
        """
        rule = (column_name, column_value)
        self.filters.append(rule)
        return self

    def to_rest(self):
        """Transforms `RelationMapping` to a REST `Node` wrapping a `MappingNodeConfig`.

        The config name is the subject, predicate and object names joined
        with underscores; no mapping schemas are emitted for relations.
        """
        filter_models = []
        for name, value in self.filters:
            filter_models.append(
                rest.MappingFilter(column_name=name, column_value=value)
            )

        config_models = []
        for src_name, tgt_names in self.mapping.items():
            config_models.append(
                rest.MappingConfig(source=src_name, target=tgt_names)
            )

        relation_name = f"{self.subject_name}_{self.predicate_name}_{self.object_name}"
        node_config = rest.MappingNodeConfig(
            spg_name=relation_name,
            mapping_type=MappingTypeEnum.Relation,
            mapping_filters=filter_models,
            mapping_schemas=[],
            mapping_configs=config_models,
        )

        base_kwargs = super().to_dict()
        return rest.Node(**base_kwargs, node_config=node_config)

    def invoke(self, input: Input) -> Sequence[Output]:
        """Placeholder: no behavior yet, returns None."""
        return None

    @classmethod
    def from_rest(cls, node: rest.Node):
        """Placeholder: no behavior yet, returns None."""
        return None

    def submit(self):
        """Placeholder: no behavior yet, returns None."""
        return None
class SubGraphMapping(Mapping):
    """A mapping component that records field mappings, filters and link strategies.

    NOTE(review): presumably this maps source data to a sub-graph rooted at
    `spg_type_name` — confirm the intended semantics; the visible code only
    collects the configuration and serializes it in `to_rest`.

    Args:
        spg_type_name: The SPG type name (string or SPGTypeHelper).
        mapping: Source field name -> list of target properties fed by it.
        filters: ``(column_name, column_value)`` pairs restricting which rows
            are mapped.
        link_strategies: Target property -> link strategy (or link operator)
            registered through `add_field`.
    """

    spg_type_name: Union[str, SPGTypeHelper]

    # NOTE(review): the three defaults below are mutable objects created once at
    # class-definition time. This is only safe if the base class copies field
    # defaults per instance (as pydantic models do) — confirm against `Mapping`.
    mapping: Dict[str, List[str]] = defaultdict(list)

    filters: List[Tuple[str, str]] = list()

    link_strategies: Dict[str, Union[LinkStrategyEnum, LinkOp]] = dict()

    @property
    def input_types(self) -> Input:
        """Returns the typing alias ``Dict[str, str]`` as the accepted input type."""
        return Dict[str, str]

    @property
    def output_types(self) -> Output:
        """Returns the `SPGRecord` class as the produced output type."""
        return SPGRecord

    @property
    def input_keys(self):
        """Returns None; no input keys are declared."""
        return None

    @property
    def output_keys(self):
        """Returns `self.output_fields`.

        NOTE(review): `output_fields` is not defined in this class; presumably
        it is inherited from `Mapping` — confirm.
        """
        return self.output_fields

    def add_field(
        self,
        source_field: str,
        target_field: Union[str, PropertyHelper],
        link_strategy: Union[LinkStrategyEnum, LinkOp, None] = None,
    ):
        """Adds a field mapping from source data to property of spg_type.

        :param source_field: The source field to be mapped.
        :param target_field: The target field to map the source field to.
        :param link_strategy: Optional link strategy or link operator recorded
            for `target_field`; stored as None when omitted.
        :return: self
        """
        # Targets accumulate, so the same source field can feed several properties.
        self.mapping[source_field].append(target_field)
        # The strategy is keyed by target; a later call for the same target overwrites it.
        self.link_strategies[target_field] = link_strategy
        return self

    def add_filter(self, column_name: str, column_value: str):
        """Adds data filtering rule.

        Only the column that meets `column_name=column_value` will execute the mapping.
        Provided for parity with the other mapping components in this module,
        since `filters` is serialized by `to_rest`.

        :param column_name: The column name to be filtered.
        :param column_value: The column value to be filtered.
        :return: self
        """
        self.filters.append((column_name, column_value))
        return self

    def invoke(self, input: Input) -> Sequence[Output]:
        """Not implemented yet; currently does nothing and returns None."""

    def to_rest(self) -> rest.Node:
        """Transforms `SubGraphMapping` to a REST `Node` wrapping a `MappingNodeConfig`.

        :return: A `rest.Node` built from `super().to_dict()` plus the mapping
            node config assembled from this component's filters and mappings.
        """
        # TODO generate schema with link_strategy
        # Until then the schema is empty, so `mapping_schemas` below is always [].
        schema = {}

        mapping_filters = [
            rest.MappingFilter(column_name=name, column_value=value)
            for name, value in self.filters
        ]
        mapping_configs = [
            rest.MappingConfig(source=src_name, target=tgt_names)
            for src_name, tgt_names in self.mapping.items()
        ]
        mapping_schemas = [
            rest.MappingSchema(name, operator_config=operator_config)
            for name, operator_config in schema.items()
        ]

        config = rest.MappingNodeConfig(
            spg_name=self.spg_type_name,
            # NOTE(review): `mapping_type` is not declared on this class;
            # presumably supplied by `Mapping` or set on the instance — confirm,
            # otherwise this raises AttributeError.
            mapping_type=self.mapping_type,
            mapping_filters=mapping_filters,
            mapping_schemas=mapping_schemas,
            mapping_configs=mapping_configs,
        )
        return rest.Node(**super().to_dict(), node_config=config)

    @classmethod
    def from_rest(cls, node: rest.Node):
        """Not implemented yet; currently does nothing and returns None."""

    def submit(self):
        """Not implemented yet; currently does nothing and returns None."""