This commit is contained in:
baifuyu 2023-12-25 19:03:50 +08:00
parent 22d9cac97a
commit cdeba0cded
6 changed files with 64 additions and 79 deletions

View File

@ -1,11 +1,66 @@
package com.antgroup.openspg.builder.core.physical.process; package com.antgroup.openspg.builder.core.physical.process;
import com.antgroup.openspg.builder.core.physical.operator.OperatorFactory;
import com.antgroup.openspg.builder.core.physical.operator.PythonOperatorFactory;
import com.antgroup.openspg.builder.core.physical.operator.protocol.InvokeResultWrapper;
import com.antgroup.openspg.builder.core.physical.operator.protocol.PythonRecord;
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
import com.antgroup.openspg.builder.model.exception.BuilderException;
import com.antgroup.openspg.builder.model.pipeline.config.BaseExtractNodeConfig; import com.antgroup.openspg.builder.model.pipeline.config.BaseExtractNodeConfig;
import com.antgroup.openspg.builder.model.record.BaseRecord;
import com.antgroup.openspg.builder.model.record.BuilderRecord;
import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections4.CollectionUtils;
@SuppressWarnings("unchecked")
public abstract class BaseExtractProcessor<T extends BaseExtractNodeConfig> public abstract class BaseExtractProcessor<T extends BaseExtractNodeConfig>
extends BaseProcessor<T> { extends BaseProcessor<T> {
protected static final ObjectMapper mapper = new ObjectMapper();
protected final OperatorFactory operatorFactory;
public BaseExtractProcessor(String id, String name, T config) { public BaseExtractProcessor(String id, String name, T config) {
super(id, name, config); super(id, name, config);
this.operatorFactory = PythonOperatorFactory.getInstance();
} }
@Override
public void doInit(BuilderContext context) throws BuilderException {
super.doInit(context);
this.operatorFactory.init(context);
this.operatorFactory.loadOperator(config.getOperatorConfig());
}
@Override
public List<BaseRecord> process(List<BaseRecord> inputs) {
List<BaseRecord> results = new ArrayList<>();
for (BaseRecord record : inputs) {
BuilderRecord builderRecord = (BuilderRecord) record;
Map<String, Object> result =
(Map<String, Object>)
operatorFactory.invoke(config.getOperatorConfig(), builderRecord.getProps());
InvokeResultWrapper<List<PythonRecord>> invokeResultWrapper =
mapper.convertValue(
result, new TypeReference<InvokeResultWrapper<List<PythonRecord>>>() {});
if (invokeResultWrapper == null || CollectionUtils.isEmpty(invokeResultWrapper.getData())) {
continue;
}
for (PythonRecord data : invokeResultWrapper.getData()) {
results.add(
new BuilderRecord(
null, SPGTypeIdentifier.parse(data.getSpgTypeName()), data.getProperties()));
}
}
return results;
}
@Override
public void close() throws Exception {}
} }

View File

@ -1,27 +1,10 @@
package com.antgroup.openspg.builder.core.physical.process; package com.antgroup.openspg.builder.core.physical.process;
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
import com.antgroup.openspg.builder.model.exception.BuilderException;
import com.antgroup.openspg.builder.model.pipeline.config.LLMBasedExtractNodeConfig; import com.antgroup.openspg.builder.model.pipeline.config.LLMBasedExtractNodeConfig;
import com.antgroup.openspg.builder.model.record.BaseRecord;
import java.util.List;
public class LLMBasedExtractProcessor extends BaseExtractProcessor<LLMBasedExtractNodeConfig> { public class LLMBasedExtractProcessor extends BaseExtractProcessor<LLMBasedExtractNodeConfig> {
public LLMBasedExtractProcessor(String id, String name, LLMBasedExtractNodeConfig config) { public LLMBasedExtractProcessor(String id, String name, LLMBasedExtractNodeConfig config) {
super(id, name, config); super(id, name, config);
} }
@Override
public void doInit(BuilderContext context) throws BuilderException {
super.doInit(context);
}
@Override
public List<BaseRecord> process(List<BaseRecord> inputs) {
return null;
}
@Override
public void close() throws Exception {}
} }

View File

@ -1,66 +1,11 @@
package com.antgroup.openspg.builder.core.physical.process; package com.antgroup.openspg.builder.core.physical.process;
import com.antgroup.openspg.builder.core.physical.operator.OperatorFactory;
import com.antgroup.openspg.builder.core.physical.operator.PythonOperatorFactory;
import com.antgroup.openspg.builder.core.physical.operator.protocol.InvokeResultWrapper;
import com.antgroup.openspg.builder.core.physical.operator.protocol.PythonRecord;
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
import com.antgroup.openspg.builder.model.exception.BuilderException;
import com.antgroup.openspg.builder.model.pipeline.config.UserDefinedExtractNodeConfig; import com.antgroup.openspg.builder.model.pipeline.config.UserDefinedExtractNodeConfig;
import com.antgroup.openspg.builder.model.record.BaseRecord;
import com.antgroup.openspg.builder.model.record.BuilderRecord;
import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections4.CollectionUtils;
@SuppressWarnings("unchecked")
public class UserDefinedExtractProcessor public class UserDefinedExtractProcessor
extends BaseExtractProcessor<UserDefinedExtractNodeConfig> { extends BaseExtractProcessor<UserDefinedExtractNodeConfig> {
private static final ObjectMapper mapper = new ObjectMapper();
private final OperatorFactory operatorFactory;
public UserDefinedExtractProcessor(String id, String name, UserDefinedExtractNodeConfig config) { public UserDefinedExtractProcessor(String id, String name, UserDefinedExtractNodeConfig config) {
super(id, name, config); super(id, name, config);
this.operatorFactory = PythonOperatorFactory.getInstance();
} }
@Override
public void doInit(BuilderContext context) throws BuilderException {
super.doInit(context);
this.operatorFactory.init(context);
this.operatorFactory.loadOperator(config.getOperatorConfig());
}
@Override
public List<BaseRecord> process(List<BaseRecord> inputs) {
List<BaseRecord> results = new ArrayList<>();
for (BaseRecord record : inputs) {
BuilderRecord builderRecord = (BuilderRecord) record;
Map<String, Object> result =
(Map<String, Object>)
operatorFactory.invoke(config.getOperatorConfig(), builderRecord.getProps());
InvokeResultWrapper<List<PythonRecord>> invokeResultWrapper =
mapper.convertValue(
result, new TypeReference<InvokeResultWrapper<List<PythonRecord>>>() {});
if (invokeResultWrapper == null || CollectionUtils.isEmpty(invokeResultWrapper.getData())) {
continue;
}
for (PythonRecord data : invokeResultWrapper.getData()) {
results.add(
new BuilderRecord(
null, SPGTypeIdentifier.parse(data.getSpgTypeName()), data.getProperties()));
}
}
return results;
}
@Override
public void close() throws Exception {}
} }

View File

@ -19,7 +19,10 @@ import lombok.Getter;
@Getter @Getter
public abstract class BaseExtractNodeConfig extends BaseNodeConfig { public abstract class BaseExtractNodeConfig extends BaseNodeConfig {
public BaseExtractNodeConfig(NodeTypeEnum type) { private final OperatorConfig operatorConfig;
public BaseExtractNodeConfig(NodeTypeEnum type, OperatorConfig operatorConfig) {
super(type); super(type);
this.operatorConfig = operatorConfig;
} }
} }

View File

@ -1,10 +1,12 @@
package com.antgroup.openspg.builder.model.pipeline.config; package com.antgroup.openspg.builder.model.pipeline.config;
import com.antgroup.openspg.builder.model.pipeline.enums.NodeTypeEnum; import com.antgroup.openspg.builder.model.pipeline.enums.NodeTypeEnum;
import lombok.Getter;
@Getter
public class LLMBasedExtractNodeConfig extends BaseExtractNodeConfig { public class LLMBasedExtractNodeConfig extends BaseExtractNodeConfig {
public LLMBasedExtractNodeConfig() { public LLMBasedExtractNodeConfig(OperatorConfig operatorConfig) {
super(NodeTypeEnum.LLM_BASED_EXTRACT); super(NodeTypeEnum.LLM_BASED_EXTRACT, operatorConfig);
} }
} }

View File

@ -6,10 +6,7 @@ import lombok.Getter;
@Getter @Getter
public class UserDefinedExtractNodeConfig extends BaseExtractNodeConfig { public class UserDefinedExtractNodeConfig extends BaseExtractNodeConfig {
private final OperatorConfig operatorConfig;
public UserDefinedExtractNodeConfig(OperatorConfig operatorConfig) { public UserDefinedExtractNodeConfig(OperatorConfig operatorConfig) {
super(NodeTypeEnum.USER_DEFINED_EXTRACT); super(NodeTypeEnum.USER_DEFINED_EXTRACT, operatorConfig);
this.operatorConfig = operatorConfig;
} }
} }