mirror of
https://github.com/OpenSPG/openspg.git
synced 2025-11-11 16:08:49 +00:00
bugfix
This commit is contained in:
parent
a008ae9a16
commit
4f5cfa743a
@ -31,6 +31,10 @@
|
||||
<groupId>com.antgroup.openspg</groupId>
|
||||
<artifactId>cloudext-interface-graph-store</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.antgroup.openspg</groupId>
|
||||
<artifactId>cloudext-impl-graph-store-tugraph</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.antgroup.openspg</groupId>
|
||||
<artifactId>cloudext-interface-table-store</artifactId>
|
||||
|
||||
@ -0,0 +1,26 @@
|
||||
package com.antgroup.openspg.builder.core.normalize;
|
||||
|
||||
import com.antgroup.openspg.builder.core.normalize.impl.IdEqualsPropertyNormalizer;
|
||||
import com.antgroup.openspg.builder.core.normalize.impl.OperatorPropertyNormalizer;
|
||||
import com.antgroup.openspg.builder.core.normalize.impl.SearchPropertyNormalizer;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.OperatorPropertyNormalizerConfig;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.PropertyNormalizerConfig;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.SearchPropertyNormalizerConfig;
|
||||
import com.antgroup.openspg.builder.model.pipeline.enums.PropertyNormalizerTypeEnum;
|
||||
|
||||
public abstract class AdvancedPropertyNormalizer implements PropertyNormalizer {
|
||||
|
||||
public static AdvancedPropertyNormalizer getPropertyNormalizer(PropertyNormalizerConfig config) {
|
||||
PropertyNormalizerTypeEnum normalizerType = config.getNormalizerType();
|
||||
switch (normalizerType) {
|
||||
case OPERATOR:
|
||||
return new OperatorPropertyNormalizer((OperatorPropertyNormalizerConfig) config);
|
||||
case SEARCH:
|
||||
return new SearchPropertyNormalizer((SearchPropertyNormalizerConfig) config);
|
||||
case ID_EQUALS:
|
||||
return IdEqualsPropertyNormalizer.INSTANCE;
|
||||
default:
|
||||
throw new IllegalArgumentException("illegal property mounter type=" + normalizerType);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,22 @@
|
||||
package com.antgroup.openspg.builder.core.normalize;
|
||||
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.exception.PropertyNormalizeException;
|
||||
import com.antgroup.openspg.builder.model.record.property.SPGPropertyRecord;
|
||||
|
||||
/**
|
||||
* 属性标准化,针对以下情况会执行对应的标准化操作:
|
||||
*
|
||||
* <ul>
|
||||
* <li>1. 当属性是基础类型时,则对属性进行类型校验及转化到正确类型
|
||||
* <li>2. 当属性是非基础类型时,则对属性进行属性挂载
|
||||
* </ul>
|
||||
*/
|
||||
public interface PropertyNormalizer {
|
||||
/** 初始化属性标准策略 */
|
||||
void init(BuilderContext context) throws BuilderException;
|
||||
|
||||
/** 输入一条spg属性记录,对该属性进行标准化 */
|
||||
void propertyNormalize(SPGPropertyRecord record) throws PropertyNormalizeException;
|
||||
}
|
||||
@ -0,0 +1,13 @@
|
||||
package com.antgroup.openspg.builder.core.normalize;
|
||||
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.exception.PropertyNormalizeException;
|
||||
import com.antgroup.openspg.builder.model.record.BaseSPGRecord;
|
||||
|
||||
public interface RecordNormalizer {
|
||||
|
||||
void init(BuilderContext context) throws BuilderException;
|
||||
|
||||
void propertyNormalize(BaseSPGRecord spgRecord) throws PropertyNormalizeException;
|
||||
}
|
||||
@ -0,0 +1,40 @@
|
||||
package com.antgroup.openspg.builder.core.normalize;
|
||||
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.exception.PropertyNormalizeException;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.BaseMappingNodeConfig;
|
||||
import com.antgroup.openspg.builder.model.record.BaseSPGRecord;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
|
||||
public class RecordNormalizerImpl implements RecordNormalizer {
|
||||
|
||||
private final List<BaseMappingNodeConfig.MappingConfig> mappingConfigs;
|
||||
private final Map<String, List<AdvancedPropertyNormalizer>> propertyNormalizers;
|
||||
|
||||
public RecordNormalizerImpl(List<BaseMappingNodeConfig.MappingConfig> mappingConfigs) {
|
||||
this.mappingConfigs = mappingConfigs;
|
||||
this.propertyNormalizers = new HashMap<>(mappingConfigs.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(BuilderContext context) throws BuilderException {
|
||||
if (CollectionUtils.isEmpty(mappingConfigs)) {
|
||||
return;
|
||||
}
|
||||
for (BaseMappingNodeConfig.MappingConfig mappingConfig : mappingConfigs) {
|
||||
propertyNormalizers.put(
|
||||
mappingConfig.getTarget(),
|
||||
mappingConfig.getMounterConfigs().stream()
|
||||
.map(AdvancedPropertyNormalizer::getPropertyNormalizer)
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void propertyNormalize(BaseSPGRecord spgRecord) throws PropertyNormalizeException {}
|
||||
}
|
||||
@ -0,0 +1,42 @@
|
||||
package com.antgroup.openspg.builder.core.normalize.impl;
|
||||
|
||||
import com.antgroup.openspg.builder.core.normalize.PropertyNormalizer;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.exception.PropertyNormalizeException;
|
||||
import com.antgroup.openspg.builder.model.record.property.SPGPropertyRecord;
|
||||
import com.antgroup.openspg.core.schema.model.type.BasicTypeEnum;
|
||||
import com.antgroup.openspg.core.schema.model.type.SPGTypeRef;
|
||||
|
||||
public class BasicPropertyNormalizer implements PropertyNormalizer {
|
||||
@Override
|
||||
public void init(BuilderContext context) throws BuilderException {}
|
||||
|
||||
@Override
|
||||
public void propertyNormalize(SPGPropertyRecord record) throws PropertyNormalizeException {
|
||||
SPGTypeRef objectTypeRef = record.getObjectTypeRef();
|
||||
if (!objectTypeRef.isBasicType()) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
BasicTypeEnum basicType = BasicTypeEnum.from(objectTypeRef.getName());
|
||||
Object stdValue = null;
|
||||
String rawValue = record.getValue().getRaw();
|
||||
try {
|
||||
switch (basicType) {
|
||||
case LONG:
|
||||
stdValue = Long.valueOf(rawValue);
|
||||
break;
|
||||
case DOUBLE:
|
||||
stdValue = Double.valueOf(rawValue);
|
||||
break;
|
||||
default:
|
||||
stdValue = rawValue;
|
||||
break;
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
throw new PropertyNormalizeException(e, "");
|
||||
}
|
||||
record.getValue().setSingleStd(stdValue);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,20 @@
|
||||
package com.antgroup.openspg.builder.core.normalize.impl;
|
||||
|
||||
import com.antgroup.openspg.builder.core.normalize.AdvancedPropertyNormalizer;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.exception.PropertyNormalizeException;
|
||||
import com.antgroup.openspg.builder.model.record.property.SPGPropertyRecord;
|
||||
|
||||
public class IdEqualsPropertyNormalizer extends AdvancedPropertyNormalizer {
|
||||
|
||||
public static final IdEqualsPropertyNormalizer INSTANCE = new IdEqualsPropertyNormalizer();
|
||||
|
||||
private IdEqualsPropertyNormalizer() {}
|
||||
|
||||
@Override
|
||||
public void init(BuilderContext context) throws BuilderException {}
|
||||
|
||||
@Override
|
||||
public void propertyNormalize(SPGPropertyRecord record) throws PropertyNormalizeException {}
|
||||
}
|
||||
@ -1,14 +1,14 @@
|
||||
package com.antgroup.openspg.builder.core.semantic.impl;
|
||||
package com.antgroup.openspg.builder.core.normalize.impl;
|
||||
|
||||
import com.antgroup.openspg.builder.core.physical.operator.OperatorFactory;
|
||||
import com.antgroup.openspg.builder.core.physical.operator.PythonOperatorFactory;
|
||||
import com.antgroup.openspg.builder.core.physical.operator.protocol.EvalResult;
|
||||
import com.antgroup.openspg.builder.core.physical.operator.protocol.Vertex;
|
||||
import com.antgroup.openspg.builder.core.normalize.AdvancedPropertyNormalizer;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.core.semantic.PropertyMounter;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.exception.PropertyMounterException;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.OperatorPropertyMounterConfig;
|
||||
import com.antgroup.openspg.builder.model.exception.PropertyNormalizeException;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.OperatorPropertyNormalizerConfig;
|
||||
import com.antgroup.openspg.builder.model.record.property.SPGPropertyRecord;
|
||||
import com.antgroup.openspg.core.schema.model.type.OperatorKey;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
@ -17,14 +17,14 @@ import java.util.*;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public class OperatorPropertyMounter implements PropertyMounter {
|
||||
public class OperatorPropertyNormalizer extends AdvancedPropertyNormalizer {
|
||||
|
||||
private static final ObjectMapper mapper = new ObjectMapper();
|
||||
private final OperatorKey operatorKey;
|
||||
private final OperatorPropertyMounterConfig mounterConfig;
|
||||
private final OperatorPropertyNormalizerConfig mounterConfig;
|
||||
private OperatorFactory operatorFactory;
|
||||
|
||||
public OperatorPropertyMounter(OperatorPropertyMounterConfig config) {
|
||||
public OperatorPropertyNormalizer(OperatorPropertyNormalizerConfig config) {
|
||||
this.mounterConfig = config;
|
||||
this.operatorKey = config.getConfig().toKey();
|
||||
}
|
||||
@ -37,7 +37,7 @@ public class OperatorPropertyMounter implements PropertyMounter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void propertyMount(SPGPropertyRecord record) throws PropertyMounterException {
|
||||
public void propertyNormalize(SPGPropertyRecord record) throws PropertyNormalizeException {
|
||||
List<String> rawValues = record.getRawValues();
|
||||
|
||||
// todo
|
||||
@ -0,0 +1,25 @@
|
||||
package com.antgroup.openspg.builder.core.normalize.impl;
|
||||
|
||||
import com.antgroup.openspg.builder.core.normalize.AdvancedPropertyNormalizer;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.exception.PropertyNormalizeException;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.SearchPropertyNormalizerConfig;
|
||||
import com.antgroup.openspg.builder.model.record.property.SPGPropertyRecord;
|
||||
|
||||
public class SearchPropertyNormalizer extends AdvancedPropertyNormalizer {
|
||||
|
||||
private final SearchPropertyNormalizerConfig config;
|
||||
|
||||
public SearchPropertyNormalizer(SearchPropertyNormalizerConfig config) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(BuilderContext context) throws BuilderException {}
|
||||
|
||||
@Override
|
||||
public void propertyNormalize(SPGPropertyRecord record) throws PropertyNormalizeException {
|
||||
|
||||
}
|
||||
}
|
||||
@ -1,16 +1,16 @@
|
||||
package com.antgroup.openspg.builder.core.physical.process;
|
||||
|
||||
import com.antgroup.openspg.builder.core.semantic.PropertyMounter;
|
||||
import com.antgroup.openspg.builder.core.semantic.PropertyMounterFactory;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.BaseMappingNodeConfig;
|
||||
import com.antgroup.openspg.builder.model.record.BuilderRecord;
|
||||
import com.antgroup.openspg.core.schema.model.BaseOntology;
|
||||
import com.antgroup.openspg.core.schema.model.identifier.BaseSPGIdentifier;
|
||||
import com.antgroup.openspg.core.schema.model.identifier.RelationIdentifier;
|
||||
import com.antgroup.openspg.core.schema.model.identifier.SPGIdentifierTypeEnum;
|
||||
import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier;
|
||||
import com.antgroup.openspg.core.schema.model.type.ProjectSchema;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
|
||||
public abstract class BaseMappingProcessor<T extends BaseMappingNodeConfig>
|
||||
@ -21,24 +21,15 @@ public abstract class BaseMappingProcessor<T extends BaseMappingNodeConfig>
|
||||
}
|
||||
|
||||
protected BaseOntology loadSchema(BaseSPGIdentifier identifier, ProjectSchema projectSchema) {
|
||||
// todo
|
||||
return null;
|
||||
SPGIdentifierTypeEnum identifierType = identifier.getIdentifierType();
|
||||
switch (identifierType) {
|
||||
case SPG_TYPE:
|
||||
return projectSchema.getByName((SPGTypeIdentifier) identifier);
|
||||
case RELATION:
|
||||
return projectSchema.getByName((RelationIdentifier) identifier);
|
||||
default:
|
||||
throw new IllegalArgumentException("illegal identifier type=" + identifierType);
|
||||
}
|
||||
|
||||
protected Map<String, List<PropertyMounter>> loadPropertyMounters(
|
||||
List<BaseMappingNodeConfig.MappingConfig> mappingConfigs) {
|
||||
if (CollectionUtils.isEmpty(mappingConfigs)) {
|
||||
return new HashMap<>(0);
|
||||
}
|
||||
Map<String, List<PropertyMounter>> results = new HashMap<>(mappingConfigs.size());
|
||||
for (BaseMappingNodeConfig.MappingConfig mappingConfig : mappingConfigs) {
|
||||
List<PropertyMounter> propertyMounters =
|
||||
mappingConfig.getMounterConfigs().stream()
|
||||
.map(PropertyMounterFactory::getPropertyMounter)
|
||||
.collect(Collectors.toList());
|
||||
results.put(mappingConfig.getTarget(), propertyMounters);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
protected static boolean isFiltered(
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
package com.antgroup.openspg.builder.core.physical.process;
|
||||
|
||||
import com.antgroup.openspg.builder.core.normalize.RecordNormalizer;
|
||||
import com.antgroup.openspg.builder.core.normalize.RecordNormalizerImpl;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderRecordException;
|
||||
@ -17,6 +19,7 @@ import java.util.List;
|
||||
public class RelationMappingProcessor extends BaseMappingProcessor<RelationMappingNodeConfig> {
|
||||
|
||||
private Relation relation;
|
||||
private RecordNormalizer recordNormalizer;
|
||||
|
||||
public RelationMappingProcessor(String id, String name, RelationMappingNodeConfig config) {
|
||||
super(id, name, config);
|
||||
@ -28,6 +31,8 @@ public class RelationMappingProcessor extends BaseMappingProcessor<RelationMappi
|
||||
|
||||
RelationIdentifier identifier = RelationIdentifier.parse(config.getRelation());
|
||||
this.relation = (Relation) loadSchema(identifier, context.getProjectSchema());
|
||||
this.recordNormalizer = new RecordNormalizerImpl(config.getMappingConfigs());
|
||||
this.recordNormalizer.init(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -35,7 +40,8 @@ public class RelationMappingProcessor extends BaseMappingProcessor<RelationMappi
|
||||
List<BaseRecord> spgRecords = new ArrayList<>(inputs.size());
|
||||
for (BaseRecord baseRecord : inputs) {
|
||||
BuilderRecord record = (BuilderRecord) baseRecord;
|
||||
RelationRecord relationRecord = relationRecordMapping(record, relation, config);
|
||||
RelationRecord relationRecord =
|
||||
relationRecordMapping(record, relation, config, recordNormalizer);
|
||||
if (relationRecord != null) {
|
||||
spgRecords.add(relationRecord);
|
||||
}
|
||||
@ -44,13 +50,18 @@ public class RelationMappingProcessor extends BaseMappingProcessor<RelationMappi
|
||||
}
|
||||
|
||||
public static RelationRecord relationRecordMapping(
|
||||
BuilderRecord record, Relation relation, RelationMappingNodeConfig mappingConfig) {
|
||||
BuilderRecord record,
|
||||
Relation relation,
|
||||
RelationMappingNodeConfig mappingConfig,
|
||||
RecordNormalizer propertyNormalizerFactory) {
|
||||
if (isFiltered(record, mappingConfig.getMappingFilters())) {
|
||||
return null;
|
||||
}
|
||||
|
||||
BuilderRecord mappedRecord = mapping(record, mappingConfig.getMappingConfigs());
|
||||
return toSPGRecord(mappedRecord, relation);
|
||||
RelationRecord relationRecord = toSPGRecord(mappedRecord, relation);
|
||||
propertyNormalizerFactory.propertyNormalize(relationRecord);
|
||||
return relationRecord;
|
||||
}
|
||||
|
||||
private static RelationRecord toSPGRecord(BuilderRecord record, Relation relation) {
|
||||
|
||||
@ -13,31 +13,28 @@
|
||||
|
||||
package com.antgroup.openspg.builder.core.physical.process;
|
||||
|
||||
import com.antgroup.openspg.builder.core.normalize.RecordNormalizer;
|
||||
import com.antgroup.openspg.builder.core.normalize.RecordNormalizerImpl;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.core.semantic.PropertyMounter;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderRecordException;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.SPGTypeMappingNodeConfig;
|
||||
import com.antgroup.openspg.builder.model.record.BaseAdvancedRecord;
|
||||
import com.antgroup.openspg.builder.model.record.BaseRecord;
|
||||
import com.antgroup.openspg.builder.model.record.BuilderRecord;
|
||||
import com.antgroup.openspg.builder.model.record.property.SPGPropertyRecord;
|
||||
import com.antgroup.openspg.cloudext.interfaces.graphstore.adapter.record.impl.convertor.VertexRecordConvertor;
|
||||
import com.antgroup.openspg.common.util.StringUtils;
|
||||
import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier;
|
||||
import com.antgroup.openspg.core.schema.model.type.BaseSPGType;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.apache.commons.collections4.MapUtils;
|
||||
|
||||
@Slf4j
|
||||
public class SPGTypeMappingProcessor extends BaseMappingProcessor<SPGTypeMappingNodeConfig> {
|
||||
|
||||
private BaseSPGType spgType;
|
||||
private Map<String, List<PropertyMounter>> propertyMounters;
|
||||
private RecordNormalizer recordNormalizer;
|
||||
|
||||
public SPGTypeMappingProcessor(String id, String name, SPGTypeMappingNodeConfig config) {
|
||||
super(id, name, config);
|
||||
@ -49,7 +46,8 @@ public class SPGTypeMappingProcessor extends BaseMappingProcessor<SPGTypeMapping
|
||||
|
||||
SPGTypeIdentifier identifier = SPGTypeIdentifier.parse(config.getSpgType());
|
||||
this.spgType = (BaseSPGType) loadSchema(identifier, context.getProjectSchema());
|
||||
this.propertyMounters = loadPropertyMounters(config.getMappingConfigs());
|
||||
this.recordNormalizer = new RecordNormalizerImpl(config.getMappingConfigs());
|
||||
this.recordNormalizer.init(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -58,7 +56,7 @@ public class SPGTypeMappingProcessor extends BaseMappingProcessor<SPGTypeMapping
|
||||
for (BaseRecord baseRecord : inputs) {
|
||||
BuilderRecord record = (BuilderRecord) baseRecord;
|
||||
BaseAdvancedRecord advancedRecord =
|
||||
spgTypeRecordMapping(record, spgType, config, propertyMounters);
|
||||
spgTypeRecordMapping(record, spgType, config, recordNormalizer);
|
||||
if (advancedRecord != null) {
|
||||
spgRecords.add(advancedRecord);
|
||||
}
|
||||
@ -70,14 +68,14 @@ public class SPGTypeMappingProcessor extends BaseMappingProcessor<SPGTypeMapping
|
||||
BuilderRecord record,
|
||||
BaseSPGType spgType,
|
||||
SPGTypeMappingNodeConfig mappingConfig,
|
||||
Map<String, List<PropertyMounter>> propertyMounters) {
|
||||
RecordNormalizer propertyNormalizerFactory) {
|
||||
if (isFiltered(record, mappingConfig.getMappingFilters())) {
|
||||
return null;
|
||||
}
|
||||
|
||||
BuilderRecord mappedRecord = mapping(record, mappingConfig.getMappingConfigs());
|
||||
BaseAdvancedRecord advancedRecord = toSPGRecord(mappedRecord, spgType);
|
||||
propertyMount(advancedRecord, propertyMounters);
|
||||
propertyNormalizerFactory.propertyNormalize(advancedRecord);
|
||||
return advancedRecord;
|
||||
}
|
||||
|
||||
@ -89,24 +87,6 @@ public class SPGTypeMappingProcessor extends BaseMappingProcessor<SPGTypeMapping
|
||||
return VertexRecordConvertor.toAdvancedRecord(spgType, bizId, record.getProps());
|
||||
}
|
||||
|
||||
private static void propertyMount(
|
||||
BaseAdvancedRecord advancedRecord, Map<String, List<PropertyMounter>> propertyMounters) {
|
||||
if (MapUtils.isEmpty(propertyMounters)) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (SPGPropertyRecord propertyRecord : advancedRecord.getSpgProperties()) {
|
||||
if (!propertyRecord.getProperty().getObjectTypeRef().isAdvancedType()) {
|
||||
continue;
|
||||
}
|
||||
List<PropertyMounter> mounters = propertyMounters.get(propertyRecord.getName());
|
||||
if (CollectionUtils.isEmpty(mounters)) {
|
||||
continue;
|
||||
}
|
||||
mounters.forEach(mounter -> mounter.propertyMount(propertyRecord));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {}
|
||||
}
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
package com.antgroup.openspg.builder.core.physical.process;
|
||||
|
||||
import com.antgroup.openspg.builder.core.normalize.RecordNormalizer;
|
||||
import com.antgroup.openspg.builder.core.normalize.RecordNormalizerImpl;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.core.semantic.PropertyMounter;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.exception.PipelineConfigException;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.BaseMappingNodeConfig;
|
||||
@ -25,8 +26,7 @@ public class SubgraphMappingProcessor extends BaseMappingProcessor<SubGraphMappi
|
||||
|
||||
private final Map<BaseSPGIdentifier, BaseMappingNodeConfig> mappingNodeConfigs = new HashMap<>();
|
||||
private final Map<BaseSPGIdentifier, BaseOntology> ontologies = new HashMap<>();
|
||||
private final Map<BaseSPGIdentifier, Map<String, List<PropertyMounter>>> propertyMounters =
|
||||
new HashMap<>();
|
||||
private final Map<BaseSPGIdentifier, RecordNormalizer> recordNormalizers = new HashMap<>();
|
||||
|
||||
public SubgraphMappingProcessor(String id, String name, SubGraphMappingNodeConfig config) {
|
||||
super(id, name, config);
|
||||
@ -41,22 +41,28 @@ public class SubgraphMappingProcessor extends BaseMappingProcessor<SubGraphMappi
|
||||
case SPG_TYPE_MAPPING:
|
||||
SPGTypeMappingNodeConfig mappingConfig1 = (SPGTypeMappingNodeConfig) mappingConfig;
|
||||
SPGTypeIdentifier identifier1 = SPGTypeIdentifier.parse(mappingConfig1.getSpgType());
|
||||
this.ontologies.put(identifier1, loadSchema(identifier1, context.getProjectSchema()));
|
||||
this.propertyMounters.put(
|
||||
identifier1, loadPropertyMounters(mappingConfig1.getMappingConfigs()));
|
||||
mappingNodeConfigs.put(identifier1, mappingConfig1);
|
||||
setUpVariables(mappingConfig1, identifier1);
|
||||
break;
|
||||
case RELATION_MAPPING:
|
||||
RelationMappingNodeConfig mappingConfig2 = (RelationMappingNodeConfig) mappingConfig;
|
||||
RelationIdentifier identifier2 = RelationIdentifier.parse(mappingConfig2.getRelation());
|
||||
this.ontologies.put(identifier2, loadSchema(identifier2, context.getProjectSchema()));
|
||||
mappingNodeConfigs.put(identifier2, mappingConfig2);
|
||||
setUpVariables(mappingConfig2, identifier2);
|
||||
default:
|
||||
throw new PipelineConfigException("illegal mapping config for SubgraphMappingProcessor");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setUpVariables(
|
||||
BaseMappingNodeConfig mappingNodeConfig, BaseSPGIdentifier identifier) {
|
||||
this.ontologies.put(identifier, loadSchema(identifier, context.getProjectSchema()));
|
||||
RecordNormalizerImpl propertyNormalizerFactory =
|
||||
new RecordNormalizerImpl(mappingNodeConfig.getMappingConfigs());
|
||||
propertyNormalizerFactory.init(context);
|
||||
this.recordNormalizers.put(identifier, propertyNormalizerFactory);
|
||||
mappingNodeConfigs.put(identifier, mappingNodeConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<BaseRecord> process(List<BaseRecord> inputs) {
|
||||
List<BaseRecord> spgRecords = new ArrayList<>(inputs.size());
|
||||
@ -80,22 +86,25 @@ public class SubgraphMappingProcessor extends BaseMappingProcessor<SubGraphMappi
|
||||
BuilderRecord record, BaseSPGIdentifier identifier) {
|
||||
BaseMappingNodeConfig mappingNodeConfig = mappingNodeConfigs.get(identifier);
|
||||
BaseOntology baseOntology = ontologies.get(identifier);
|
||||
RecordNormalizer normalizerFactory = recordNormalizers.get(identifier);
|
||||
|
||||
BaseRecord result = null;
|
||||
switch (mappingNodeConfig.getType()) {
|
||||
case SPG_TYPE_MAPPING:
|
||||
Map<String, List<PropertyMounter>> mounters = propertyMounters.get(identifier);
|
||||
result =
|
||||
SPGTypeMappingProcessor.spgTypeRecordMapping(
|
||||
record,
|
||||
(BaseSPGType) baseOntology,
|
||||
(SPGTypeMappingNodeConfig) mappingNodeConfig,
|
||||
mounters);
|
||||
normalizerFactory);
|
||||
break;
|
||||
case RELATION_MAPPING:
|
||||
result =
|
||||
RelationMappingProcessor.relationRecordMapping(
|
||||
record, (Relation) baseOntology, (RelationMappingNodeConfig) mappingNodeConfig);
|
||||
record,
|
||||
(Relation) baseOntology,
|
||||
(RelationMappingNodeConfig) mappingNodeConfig,
|
||||
normalizerFactory);
|
||||
break;
|
||||
default:
|
||||
throw new PipelineConfigException("illegal mapping config for SubgraphMappingProcessor");
|
||||
|
||||
@ -1,16 +0,0 @@
|
||||
package com.antgroup.openspg.builder.core.semantic;
|
||||
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.exception.PropertyMounterException;
|
||||
import com.antgroup.openspg.builder.model.record.property.SPGPropertyRecord;
|
||||
|
||||
/** 属性挂载将某个非基础类型的属性链接到具体某个实例id上 */
|
||||
public interface PropertyMounter {
|
||||
|
||||
/** 初始化属性挂载策略 */
|
||||
void init(BuilderContext context) throws BuilderException;
|
||||
|
||||
/** 输出一条spg记录,当该spg记录的某些属性是非基础类型时,原地执行属性挂载 */
|
||||
void propertyMount(SPGPropertyRecord record) throws PropertyMounterException;
|
||||
}
|
||||
@ -1,25 +0,0 @@
|
||||
package com.antgroup.openspg.builder.core.semantic;
|
||||
|
||||
import com.antgroup.openspg.builder.core.semantic.impl.IdEqualsPropertyMounter;
|
||||
import com.antgroup.openspg.builder.core.semantic.impl.OperatorPropertyMounter;
|
||||
import com.antgroup.openspg.builder.core.semantic.impl.SearchEnginePropertyMounter;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.OperatorPropertyMounterConfig;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.PropertyMounterConfig;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.SearchEnginePropertyMounterConfig;
|
||||
|
||||
public class PropertyMounterFactory {
|
||||
|
||||
public static PropertyMounter getPropertyMounter(PropertyMounterConfig config) {
|
||||
switch (config.getMounterType()) {
|
||||
case OPERATOR:
|
||||
return new OperatorPropertyMounter((OperatorPropertyMounterConfig) config);
|
||||
case SEARCH_ENGINE:
|
||||
return new SearchEnginePropertyMounter((SearchEnginePropertyMounterConfig) config);
|
||||
case ID_EQUALS:
|
||||
return IdEqualsPropertyMounter.INSTANCE;
|
||||
default:
|
||||
throw new IllegalArgumentException(
|
||||
"illegal property mounter type=" + config.getMounterType());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,20 +0,0 @@
|
||||
package com.antgroup.openspg.builder.core.semantic.impl;
|
||||
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.core.semantic.PropertyMounter;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.exception.PropertyMounterException;
|
||||
import com.antgroup.openspg.builder.model.record.property.SPGPropertyRecord;
|
||||
|
||||
public class IdEqualsPropertyMounter implements PropertyMounter {
|
||||
|
||||
public static final IdEqualsPropertyMounter INSTANCE = new IdEqualsPropertyMounter();
|
||||
|
||||
private IdEqualsPropertyMounter() {}
|
||||
|
||||
@Override
|
||||
public void init(BuilderContext context) throws BuilderException {}
|
||||
|
||||
@Override
|
||||
public void propertyMount(SPGPropertyRecord record) throws PropertyMounterException {}
|
||||
}
|
||||
@ -1,23 +0,0 @@
|
||||
package com.antgroup.openspg.builder.core.semantic.impl;
|
||||
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.core.semantic.PropertyMounter;
|
||||
import com.antgroup.openspg.builder.model.exception.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.exception.PropertyMounterException;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.SearchEnginePropertyMounterConfig;
|
||||
import com.antgroup.openspg.builder.model.record.property.SPGPropertyRecord;
|
||||
|
||||
public class SearchEnginePropertyMounter implements PropertyMounter {
|
||||
|
||||
private final SearchEnginePropertyMounterConfig mounterConfig;
|
||||
|
||||
public SearchEnginePropertyMounter(SearchEnginePropertyMounterConfig config) {
|
||||
this.mounterConfig = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(BuilderContext context) throws BuilderException {}
|
||||
|
||||
@Override
|
||||
public void propertyMount(SPGPropertyRecord record) throws PropertyMounterException {}
|
||||
}
|
||||
@ -1,12 +0,0 @@
|
||||
package com.antgroup.openspg.builder.model.exception;
|
||||
|
||||
public class PropertyMounterException extends BuilderRecordException {
|
||||
|
||||
public PropertyMounterException(Throwable cause, String messagePattern, Object... args) {
|
||||
super(cause, messagePattern, args);
|
||||
}
|
||||
|
||||
public PropertyMounterException(String messagePattern, Object... args) {
|
||||
this(null, messagePattern, args);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,12 @@
|
||||
package com.antgroup.openspg.builder.model.exception;
|
||||
|
||||
public class PropertyNormalizeException extends BuilderRecordException {
|
||||
|
||||
public PropertyNormalizeException(Throwable cause, String messagePattern, Object... args) {
|
||||
super(cause, messagePattern, args);
|
||||
}
|
||||
|
||||
public PropertyNormalizeException(String messagePattern, Object... args) {
|
||||
this(null, messagePattern, args);
|
||||
}
|
||||
}
|
||||
@ -23,6 +23,10 @@ public abstract class BaseMappingNodeConfig extends BaseNodeConfig {
|
||||
public static class MappingConfig {
|
||||
private final String source;
|
||||
private final String target;
|
||||
private final List<PropertyMounterConfig> mounterConfigs;
|
||||
private final List<PropertyNormalizerConfig> mounterConfigs;
|
||||
}
|
||||
|
||||
public List<MappingConfig> getMappingConfigs() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,15 +1,15 @@
|
||||
package com.antgroup.openspg.builder.model.pipeline.config;
|
||||
|
||||
import com.antgroup.openspg.builder.model.pipeline.enums.PropertyMounterTypeEnum;
|
||||
import com.antgroup.openspg.builder.model.pipeline.enums.PropertyNormalizerTypeEnum;
|
||||
import lombok.Getter;
|
||||
|
||||
@Getter
|
||||
public abstract class BasePropertyMounterConfig {
|
||||
|
||||
/** the type of property mounter */
|
||||
private final PropertyMounterTypeEnum type;
|
||||
private final PropertyNormalizerTypeEnum type;
|
||||
|
||||
protected BasePropertyMounterConfig(PropertyMounterTypeEnum type) {
|
||||
protected BasePropertyMounterConfig(PropertyNormalizerTypeEnum type) {
|
||||
this.type = type;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,18 +1,18 @@
|
||||
package com.antgroup.openspg.builder.model.pipeline.config;
|
||||
|
||||
import com.antgroup.openspg.builder.model.pipeline.enums.PropertyMounterTypeEnum;
|
||||
import com.antgroup.openspg.builder.model.pipeline.enums.PropertyNormalizerTypeEnum;
|
||||
import java.util.Map;
|
||||
import lombok.Getter;
|
||||
|
||||
@Getter
|
||||
public class OperatorPropertyMounterConfig extends PropertyMounterConfig {
|
||||
public class OperatorPropertyNormalizerConfig extends PropertyNormalizerConfig {
|
||||
|
||||
private final OperatorConfig config;
|
||||
|
||||
private final Map<String, String> params;
|
||||
|
||||
public OperatorPropertyMounterConfig(OperatorConfig config, Map<String, String> params) {
|
||||
super(PropertyMounterTypeEnum.OPERATOR);
|
||||
public OperatorPropertyNormalizerConfig(OperatorConfig config, Map<String, String> params) {
|
||||
super(PropertyNormalizerTypeEnum.OPERATOR);
|
||||
this.config = config;
|
||||
this.params = params;
|
||||
}
|
||||
@ -1,15 +0,0 @@
|
||||
package com.antgroup.openspg.builder.model.pipeline.config;
|
||||
|
||||
import com.antgroup.openspg.builder.model.pipeline.enums.PropertyMounterTypeEnum;
|
||||
import com.antgroup.openspg.server.common.model.base.BaseValObj;
|
||||
import lombok.Getter;
|
||||
|
||||
@Getter
|
||||
public abstract class PropertyMounterConfig extends BaseValObj {
|
||||
|
||||
private final PropertyMounterTypeEnum mounterType;
|
||||
|
||||
public PropertyMounterConfig(PropertyMounterTypeEnum mounterType) {
|
||||
this.mounterType = mounterType;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
package com.antgroup.openspg.builder.model.pipeline.config;
|
||||
|
||||
import com.antgroup.openspg.builder.model.pipeline.enums.PropertyNormalizerTypeEnum;
|
||||
import com.antgroup.openspg.server.common.model.base.BaseValObj;
|
||||
import lombok.Getter;
|
||||
|
||||
@Getter
|
||||
public abstract class PropertyNormalizerConfig extends BaseValObj {
|
||||
|
||||
private final PropertyNormalizerTypeEnum normalizerType;
|
||||
|
||||
public PropertyNormalizerConfig(PropertyNormalizerTypeEnum normalizerType) {
|
||||
this.normalizerType = normalizerType;
|
||||
}
|
||||
}
|
||||
@ -1,10 +0,0 @@
|
||||
package com.antgroup.openspg.builder.model.pipeline.config;
|
||||
|
||||
import com.antgroup.openspg.builder.model.pipeline.enums.PropertyMounterTypeEnum;
|
||||
|
||||
public class SearchEnginePropertyMounterConfig extends PropertyMounterConfig {
|
||||
|
||||
public SearchEnginePropertyMounterConfig() {
|
||||
super(PropertyMounterTypeEnum.SEARCH_ENGINE);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,10 @@
|
||||
package com.antgroup.openspg.builder.model.pipeline.config;
|
||||
|
||||
import com.antgroup.openspg.builder.model.pipeline.enums.PropertyNormalizerTypeEnum;
|
||||
|
||||
public class SearchPropertyNormalizerConfig extends PropertyNormalizerConfig {
|
||||
|
||||
public SearchPropertyNormalizerConfig() {
|
||||
super(PropertyNormalizerTypeEnum.SEARCH);
|
||||
}
|
||||
}
|
||||
@ -1,8 +1,8 @@
|
||||
package com.antgroup.openspg.builder.model.pipeline.enums;
|
||||
|
||||
public enum PropertyMounterTypeEnum {
|
||||
public enum PropertyNormalizerTypeEnum {
|
||||
OPERATOR,
|
||||
SEARCH_ENGINE,
|
||||
SEARCH,
|
||||
ID_EQUALS,
|
||||
;
|
||||
}
|
||||
@ -14,6 +14,7 @@
|
||||
package com.antgroup.openspg.builder.model.record.property;
|
||||
|
||||
import com.antgroup.openspg.server.common.model.base.BaseValObj;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.Getter;
|
||||
@ -41,4 +42,9 @@ public class SPGPropertyValue extends BaseValObj {
|
||||
public String getStdValue() {
|
||||
return stds.stream().map(Object::toString).collect(Collectors.joining(SEPARATOR));
|
||||
}
|
||||
|
||||
public void setSingleStd(Object std) {
|
||||
stds = new ArrayList<>(1);
|
||||
stds.add(std);
|
||||
}
|
||||
}
|
||||
|
||||
19
builder/runner/local/src/main/resources/logback.xml
Normal file
19
builder/runner/local/src/main/resources/logback.xml
Normal file
@ -0,0 +1,19 @@
|
||||
<Configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%-4relative [%thread] %-5level %logger{35} - %msg %n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<logger name="com.baidu.brpc" level="ERROR" additivity="false">
|
||||
<appender-ref ref="STDOUT"/>
|
||||
</logger>
|
||||
|
||||
<logger name="com.dtflys.forest" level="ERROR" additivity="false">
|
||||
<appender-ref ref="STDOUT"/>
|
||||
</logger>
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT"/>
|
||||
</root>
|
||||
</Configuration>
|
||||
@ -13,7 +13,10 @@
|
||||
|
||||
package com.antgroup.openspg.core.schema.model.type;
|
||||
|
||||
import com.antgroup.openspg.core.schema.model.identifier.RelationIdentifier;
|
||||
import com.antgroup.openspg.core.schema.model.identifier.SPGTripleIdentifier;
|
||||
import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier;
|
||||
import com.antgroup.openspg.core.schema.model.predicate.Relation;
|
||||
import com.antgroup.openspg.server.common.model.base.BaseToString;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -52,6 +55,13 @@ public class ProjectSchema extends BaseToString {
|
||||
return spgTypeMap.get(name);
|
||||
}
|
||||
|
||||
public Relation getByName(RelationIdentifier identifier) {
|
||||
BaseSPGType spgType = getByName(identifier.getStart());
|
||||
return spgType.getRelationByName(
|
||||
new SPGTripleIdentifier(
|
||||
identifier.getStart(), identifier.getPredicate(), identifier.getEnd()));
|
||||
}
|
||||
|
||||
public boolean getSpreadable(SPGTypeIdentifier identifier) {
|
||||
BaseSPGType spgType = getByName(identifier);
|
||||
if (!(spgType instanceof StandardType)) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user