diff --git a/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/BaseExtractProcessor.java b/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/BaseExtractProcessor.java index 79757ade..645b3c6d 100644 --- a/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/BaseExtractProcessor.java +++ b/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/BaseExtractProcessor.java @@ -1,11 +1,66 @@ package com.antgroup.openspg.builder.core.physical.process; +import com.antgroup.openspg.builder.core.physical.operator.OperatorFactory; +import com.antgroup.openspg.builder.core.physical.operator.PythonOperatorFactory; +import com.antgroup.openspg.builder.core.physical.operator.protocol.InvokeResultWrapper; +import com.antgroup.openspg.builder.core.physical.operator.protocol.PythonRecord; +import com.antgroup.openspg.builder.core.runtime.BuilderContext; +import com.antgroup.openspg.builder.model.exception.BuilderException; import com.antgroup.openspg.builder.model.pipeline.config.BaseExtractNodeConfig; +import com.antgroup.openspg.builder.model.record.BaseRecord; +import com.antgroup.openspg.builder.model.record.BuilderRecord; +import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.commons.collections4.CollectionUtils; +@SuppressWarnings("unchecked") public abstract class BaseExtractProcessor extends BaseProcessor { + protected static final ObjectMapper mapper = new ObjectMapper(); + protected final OperatorFactory operatorFactory; + public BaseExtractProcessor(String id, String name, T config) { super(id, name, config); + this.operatorFactory = PythonOperatorFactory.getInstance(); } + + @Override + public void doInit(BuilderContext context) throws BuilderException { + super.doInit(context); + this.operatorFactory.init(context); + this.operatorFactory.loadOperator(config.getOperatorConfig()); + } + + @Override + public List process(List inputs) { + List results = new ArrayList<>(); + for (BaseRecord record : inputs) { + BuilderRecord builderRecord = (BuilderRecord) record; + Map result = + (Map) + operatorFactory.invoke(config.getOperatorConfig(), builderRecord.getProps()); + + InvokeResultWrapper> invokeResultWrapper = + mapper.convertValue( + result, new TypeReference>>() {}); + if (invokeResultWrapper == null || CollectionUtils.isEmpty(invokeResultWrapper.getData())) { + continue; + } + + for (PythonRecord data : invokeResultWrapper.getData()) { + results.add( + new BuilderRecord( + null, SPGTypeIdentifier.parse(data.getSpgTypeName()), data.getProperties())); + } + } + return results; + } + + @Override + public void close() throws Exception {} } diff --git a/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/LLMBasedExtractProcessor.java b/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/LLMBasedExtractProcessor.java index b2e50e80..d84a0ceb 100644 --- a/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/LLMBasedExtractProcessor.java +++ b/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/LLMBasedExtractProcessor.java @@ -1,27 +1,10 @@ package com.antgroup.openspg.builder.core.physical.process; -import com.antgroup.openspg.builder.core.runtime.BuilderContext; -import com.antgroup.openspg.builder.model.exception.BuilderException; import com.antgroup.openspg.builder.model.pipeline.config.LLMBasedExtractNodeConfig; -import com.antgroup.openspg.builder.model.record.BaseRecord; -import java.util.List; public class LLMBasedExtractProcessor extends BaseExtractProcessor { public LLMBasedExtractProcessor(String id, String name, LLMBasedExtractNodeConfig config) { super(id, name, config); } - - @Override - public void doInit(BuilderContext context) throws BuilderException { - super.doInit(context); - } - - @Override - public List process(List inputs) { - return null; - } - - @Override - public void close() throws Exception {} } diff --git a/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/UserDefinedExtractProcessor.java b/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/UserDefinedExtractProcessor.java index 9c035595..96e09432 100644 --- a/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/UserDefinedExtractProcessor.java +++ b/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/UserDefinedExtractProcessor.java @@ -1,66 +1,11 @@ package com.antgroup.openspg.builder.core.physical.process; -import com.antgroup.openspg.builder.core.physical.operator.OperatorFactory; -import com.antgroup.openspg.builder.core.physical.operator.PythonOperatorFactory; -import com.antgroup.openspg.builder.core.physical.operator.protocol.InvokeResultWrapper; -import com.antgroup.openspg.builder.core.physical.operator.protocol.PythonRecord; -import com.antgroup.openspg.builder.core.runtime.BuilderContext; -import com.antgroup.openspg.builder.model.exception.BuilderException; import com.antgroup.openspg.builder.model.pipeline.config.UserDefinedExtractNodeConfig; -import com.antgroup.openspg.builder.model.record.BaseRecord; -import com.antgroup.openspg.builder.model.record.BuilderRecord; -import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import org.apache.commons.collections4.CollectionUtils; -@SuppressWarnings("unchecked") public class UserDefinedExtractProcessor extends BaseExtractProcessor { - private static final ObjectMapper mapper = new ObjectMapper(); - private final OperatorFactory operatorFactory; - public UserDefinedExtractProcessor(String id, String name, UserDefinedExtractNodeConfig config) { super(id, name, config); - this.operatorFactory = PythonOperatorFactory.getInstance(); } - - @Override - public void doInit(BuilderContext context) throws BuilderException { - super.doInit(context); - this.operatorFactory.init(context); - this.operatorFactory.loadOperator(config.getOperatorConfig()); - } - - @Override - public List process(List inputs) { - List results = new ArrayList<>(); - for (BaseRecord record : inputs) { - BuilderRecord builderRecord = (BuilderRecord) record; - Map result = - (Map) - operatorFactory.invoke(config.getOperatorConfig(), builderRecord.getProps()); - - InvokeResultWrapper> invokeResultWrapper = - mapper.convertValue( - result, new TypeReference>>() {}); - if (invokeResultWrapper == null || CollectionUtils.isEmpty(invokeResultWrapper.getData())) { - continue; - } - - for (PythonRecord data : invokeResultWrapper.getData()) { - results.add( - new BuilderRecord( - null, SPGTypeIdentifier.parse(data.getSpgTypeName()), data.getProperties())); - } - } - return results; - } - - @Override - public void close() throws Exception {} } diff --git a/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/BaseExtractNodeConfig.java b/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/BaseExtractNodeConfig.java index 621faccb..4522347e 100644 --- a/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/BaseExtractNodeConfig.java +++ b/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/BaseExtractNodeConfig.java @@ -19,7 +19,10 @@ import lombok.Getter; @Getter public abstract class BaseExtractNodeConfig extends BaseNodeConfig { - public BaseExtractNodeConfig(NodeTypeEnum type) { + private final OperatorConfig operatorConfig; + + public BaseExtractNodeConfig(NodeTypeEnum type, OperatorConfig operatorConfig) { super(type); + this.operatorConfig = operatorConfig; } } diff --git a/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/LLMBasedExtractNodeConfig.java b/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/LLMBasedExtractNodeConfig.java index ae88b626..68a03ced 100644 --- a/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/LLMBasedExtractNodeConfig.java +++ b/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/LLMBasedExtractNodeConfig.java @@ -1,10 +1,12 @@ package com.antgroup.openspg.builder.model.pipeline.config; import com.antgroup.openspg.builder.model.pipeline.enums.NodeTypeEnum; +import lombok.Getter; +@Getter public class LLMBasedExtractNodeConfig extends BaseExtractNodeConfig { - public LLMBasedExtractNodeConfig() { - super(NodeTypeEnum.LLM_BASED_EXTRACT); + public LLMBasedExtractNodeConfig(OperatorConfig operatorConfig) { + super(NodeTypeEnum.LLM_BASED_EXTRACT, operatorConfig); } } diff --git a/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/UserDefinedExtractNodeConfig.java b/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/UserDefinedExtractNodeConfig.java index 01e83ba0..68132f06 100644 --- a/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/UserDefinedExtractNodeConfig.java +++ b/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/UserDefinedExtractNodeConfig.java @@ -6,10 +6,7 @@ import lombok.Getter; @Getter public class UserDefinedExtractNodeConfig extends BaseExtractNodeConfig { - private final OperatorConfig operatorConfig; - public UserDefinedExtractNodeConfig(OperatorConfig operatorConfig) { - super(NodeTypeEnum.USER_DEFINED_EXTRACT); - this.operatorConfig = operatorConfig; + super(NodeTypeEnum.USER_DEFINED_EXTRACT, operatorConfig); } }