This commit is contained in:
baifuyu 2023-12-25 19:34:37 +08:00
parent cdeba0cded
commit 3047c75288
4 changed files with 22 additions and 25 deletions

View File

@ -41,7 +41,12 @@ public abstract class BaseMappingProcessor<T extends BaseMappingNodeConfig>
}
protected static boolean isFiltered(
BuilderRecord record, List<BaseMappingNodeConfig.MappingFilter> mappingFilters) {
BuilderRecord record,
List<BaseMappingNodeConfig.MappingFilter> mappingFilters,
BaseSPGIdentifier identifier) {
if (record.getIdentifier() != null && !record.getIdentifier().equals(identifier)) {
return true;
}
if (CollectionUtils.isEmpty(mappingFilters)) {
return false;
}

View File

@ -16,19 +16,21 @@ import java.util.List;
public class RelationMappingProcessor extends BaseMappingProcessor<RelationMappingNodeConfig> {
private final RelationIdentifier identifier;
private Relation relation;
private RecordLinking recordLinking;
public RelationMappingProcessor(String id, String name, RelationMappingNodeConfig config) {
super(id, name, config);
this.identifier = RelationIdentifier.parse(config.getRelation());
}
@Override
public void doInit(BuilderContext context) throws BuilderException {
super.doInit(context);
RelationIdentifier identifier = RelationIdentifier.parse(config.getRelation());
this.relation = (Relation) loadSchema(identifier, context.getCatalog());
this.recordLinking = new RecordLinkingImpl(config.getMappingConfigs());
this.recordLinking.setDefaultPropertyLinking(new SearchBasedLinking());
this.recordLinking.init(context);
@ -39,30 +41,18 @@ public class RelationMappingProcessor extends BaseMappingProcessor<RelationMappi
List<BaseRecord> spgRecords = new ArrayList<>(inputs.size());
for (BaseRecord baseRecord : inputs) {
BuilderRecord record = (BuilderRecord) baseRecord;
RelationRecord relationRecord =
relationRecordMapping(record, relation, config, recordLinking);
if (relationRecord != null) {
spgRecords.add(relationRecord);
if (isFiltered(record, config.getMappingFilters(), identifier)) {
continue;
}
BuilderRecord mappedRecord = mapping(record, config.getMappingConfigs());
RelationRecord relationRecord = toSPGRecord(mappedRecord, relation);
recordLinking.linking(relationRecord);
spgRecords.add(relationRecord);
}
return spgRecords;
}
private static RelationRecord relationRecordMapping(
BuilderRecord record,
Relation relation,
RelationMappingNodeConfig mappingConfig,
RecordLinking recordLinking) {
if (isFiltered(record, mappingConfig.getMappingFilters())) {
return null;
}
BuilderRecord mappedRecord = mapping(record, mappingConfig.getMappingConfigs());
RelationRecord relationRecord = toSPGRecord(mappedRecord, relation);
recordLinking.linking(relationRecord);
return relationRecord;
}
@Override
public void close() throws Exception {}
}

View File

@ -36,6 +36,7 @@ import lombok.extern.slf4j.Slf4j;
@SuppressWarnings({"unchecked", "rawtypes"})
public class SPGTypeMappingProcessor extends BaseMappingProcessor<SPGTypeMappingNodeConfig> {
private final SPGTypeIdentifier identifier;
private BaseSPGType spgType;
private RecordLinking recordLinking;
private RecordPredicting recordPredicting;
@ -43,13 +44,13 @@ public class SPGTypeMappingProcessor extends BaseMappingProcessor<SPGTypeMapping
public SPGTypeMappingProcessor(String id, String name, SPGTypeMappingNodeConfig config) {
super(id, name, config);
this.identifier = SPGTypeIdentifier.parse(config.getSpgType());
}
@Override
public void doInit(BuilderContext context) throws BuilderException {
super.doInit(context);
SPGTypeIdentifier identifier = SPGTypeIdentifier.parse(config.getSpgType());
this.spgType = (BaseSPGType) loadSchema(identifier, context.getCatalog());
this.recordLinking = new RecordLinkingImpl(config.getMappingConfigs());
@ -68,7 +69,7 @@ public class SPGTypeMappingProcessor extends BaseMappingProcessor<SPGTypeMapping
List<BaseAdvancedRecord> advancedRecords = new ArrayList<>(inputs.size());
for (BaseRecord baseRecord : inputs) {
BuilderRecord record = (BuilderRecord) baseRecord;
if (isFiltered(record, config.getMappingFilters())) {
if (isFiltered(record, config.getMappingFilters(), identifier)) {
continue;
}

View File

@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
@SuppressWarnings({"unchecked", "rawtypes"})
public class SubGraphMappingProcessor extends BaseMappingProcessor<SubGraphMappingNodeConfig> {
private final SPGTypeIdentifier identifier;
private BaseSPGType spgType;
private SubGraphFusing subGraphFusing;
private RecordPredicting recordPredicating;
@ -33,13 +34,13 @@ public class SubGraphMappingProcessor extends BaseMappingProcessor<SubGraphMappi
public SubGraphMappingProcessor(String id, String name, SubGraphMappingNodeConfig config) {
super(id, name, config);
this.identifier = SPGTypeIdentifier.parse(config.getSpgType());
}
@Override
public void doInit(BuilderContext context) throws BuilderException {
super.doInit(context);
SPGTypeIdentifier identifier = SPGTypeIdentifier.parse(config.getSpgType());
this.spgType = (BaseSPGType) loadSchema(identifier, context.getCatalog());
this.recordLinking = new RecordLinkingImpl();
@ -61,7 +62,7 @@ public class SubGraphMappingProcessor extends BaseMappingProcessor<SubGraphMappi
List<BaseAdvancedRecord> advancedRecords = new ArrayList<>(inputs.size());
for (BaseRecord baseRecord : inputs) {
BuilderRecord record = (BuilderRecord) baseRecord;
if (isFiltered(record, config.getMappingFilters())) {
if (isFiltered(record, config.getMappingFilters(), identifier)) {
continue;
}