mirror of
https://github.com/OpenSPG/openspg.git
synced 2025-09-25 16:30:21 +00:00
bugfix
This commit is contained in:
parent
b0bf97090f
commit
81c05ccbf4
@ -14,7 +14,7 @@
|
||||
package com.antgroup.openspg.builder.core.physical;
|
||||
|
||||
import com.antgroup.openspg.builder.model.BuilderException;
|
||||
import com.antgroup.openspg.builder.core.runtime.RuntimeContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import java.util.Objects;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
@ -53,9 +53,9 @@ public abstract class BasePhysicalNode implements Comparable<BasePhysicalNode> {
|
||||
* instance ID, task parallelism, and so on. The task parallelism can be used for distributed data
|
||||
* reading and partitioning.
|
||||
*
|
||||
* <p>For detailed runtime parameters, please refer to the {@link RuntimeContext} class.
|
||||
* <p>For detailed runtime parameters, please refer to the {@link BuilderContext} class.
|
||||
*/
|
||||
protected RuntimeContext context;
|
||||
protected BuilderContext context;
|
||||
|
||||
/** Whether the node is initialized. */
|
||||
private volatile boolean isInitialized = false;
|
||||
@ -65,7 +65,7 @@ public abstract class BasePhysicalNode implements Comparable<BasePhysicalNode> {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public void init(RuntimeContext context) throws BuilderException {
|
||||
public void init(BuilderContext context) throws BuilderException {
|
||||
this.context = context;
|
||||
if (!isInitialized) {
|
||||
synchronized (this) {
|
||||
@ -77,7 +77,7 @@ public abstract class BasePhysicalNode implements Comparable<BasePhysicalNode> {
|
||||
}
|
||||
}
|
||||
|
||||
public void doInit(RuntimeContext context) throws BuilderException {}
|
||||
public void doInit(BuilderContext context) throws BuilderException {}
|
||||
|
||||
public abstract void close() throws Exception;
|
||||
|
||||
|
@ -1,13 +1,13 @@
|
||||
package com.antgroup.openspg.builder.core.physical.operator;
|
||||
|
||||
import com.antgroup.openspg.builder.model.BuilderException;
|
||||
import com.antgroup.openspg.builder.core.runtime.RuntimeContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.OperatorConfig;
|
||||
import com.antgroup.openspg.core.schema.model.type.OperatorKey;
|
||||
|
||||
public interface OperatorFactory {
|
||||
|
||||
void init(RuntimeContext context) throws BuilderException;
|
||||
void init(BuilderContext context) throws BuilderException;
|
||||
|
||||
boolean register(OperatorConfig config);
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
package com.antgroup.openspg.builder.core.physical.operator;
|
||||
|
||||
import com.antgroup.openspg.builder.core.runtime.RuntimeContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.OperatorConfig;
|
||||
import com.antgroup.openspg.common.util.Md5Utils;
|
||||
import com.antgroup.openspg.core.schema.model.type.OperatorKey;
|
||||
@ -47,7 +47,7 @@ public class PythonOperatorFactory implements OperatorFactory {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(RuntimeContext context) {
|
||||
public void init(BuilderContext context) {
|
||||
if (pythonInterpreter == null) {
|
||||
synchronized (PythonOperatorFactory.class) {
|
||||
if (pythonInterpreter == null) {
|
||||
|
@ -17,7 +17,7 @@ import com.antgroup.openspg.builder.model.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.record.BuilderRecord;
|
||||
import com.antgroup.openspg.builder.core.physical.invoker.operator.OperatorInvoker;
|
||||
import com.antgroup.openspg.builder.core.physical.invoker.operator.impl.OperatorInvokerImpl;
|
||||
import com.antgroup.openspg.builder.core.runtime.RuntimeContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.ExtractNodeConfig;
|
||||
import com.antgroup.openspg.builder.model.record.BaseRecord;
|
||||
import java.util.ArrayList;
|
||||
@ -53,7 +53,7 @@ public class ExtractProcessor extends BaseProcessor<ExtractNodeConfig> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doInit(RuntimeContext context) throws BuilderException {
|
||||
public void doInit(BuilderContext context) throws BuilderException {
|
||||
this.operatorInvoker = new OperatorInvokerImpl();
|
||||
this.operatorInvoker.init(context);
|
||||
this.operatorInvoker.register(config.getOperatorConfig());
|
||||
|
@ -14,7 +14,7 @@
|
||||
package com.antgroup.openspg.builder.core.physical.process;
|
||||
|
||||
import com.antgroup.openspg.builder.model.BuilderException;
|
||||
import com.antgroup.openspg.builder.core.runtime.RuntimeContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.core.semantic.PropertyMounter;
|
||||
import com.antgroup.openspg.builder.core.semantic.PropertyMounterFactory;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.MappingNodeConfig;
|
||||
@ -58,7 +58,7 @@ public class MappingProcessor extends BaseProcessor<MappingNodeConfig> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doInit(RuntimeContext context) throws BuilderException {
|
||||
public void doInit(BuilderContext context) throws BuilderException {
|
||||
loadPropertyMounter();
|
||||
}
|
||||
|
||||
|
@ -13,21 +13,21 @@
|
||||
|
||||
package com.antgroup.openspg.builder.core.runtime;
|
||||
|
||||
import com.antgroup.openspg.builder.model.record.RecordAlterOperationEnum;
|
||||
import com.antgroup.openspg.core.schema.model.type.ProjectSchema;
|
||||
import java.io.Serializable;
|
||||
import java.util.Map;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
@Setter
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public class RuntimeContext implements Serializable {
|
||||
@Accessors(chain = true)
|
||||
public class BuilderContext implements Serializable {
|
||||
|
||||
private final long projectId;
|
||||
private final String jobName;
|
||||
private final ProjectSchema projectSchema;
|
||||
private final RecordAlterOperationEnum operation;
|
||||
private long projectId;
|
||||
private String jobName;
|
||||
private ProjectSchema projectSchema;
|
||||
|
||||
private final Map<String, Object> params;
|
||||
private String pythonExec;
|
||||
private String pythonPaths;
|
||||
}
|
@ -9,7 +9,7 @@ import java.util.List;
|
||||
public interface BuilderExecutor {
|
||||
|
||||
/** 初始化物理执行计划,如果报错则不再进行构建流程 */
|
||||
void init(PhysicalPlan plan, RuntimeContext context) throws BuilderException;
|
||||
void init(PhysicalPlan plan, BuilderContext context) throws BuilderException;
|
||||
|
||||
/** 输入一批record,执行物理计划并返回计算的结果,如果抛出异常则表示这一批数据构建失败,由runner处理 */
|
||||
List<BaseRecord> eval(List<BaseRecord> inputRecords) throws BuilderRecordException;
|
||||
|
@ -10,7 +10,7 @@ import com.antgroup.openspg.builder.model.pipeline.Pipeline;
|
||||
public interface BuilderRunner {
|
||||
|
||||
/** runner的初始化,当初始化异常时则直接抛出异常不再进行构建流程 */
|
||||
void init(Pipeline pipeline, RuntimeContext context) throws BuilderException;
|
||||
void init(Pipeline pipeline, BuilderContext context) throws BuilderException;
|
||||
|
||||
/** 开始执行runner,即开始知识构建流程,会按照pipeline的定义在具体执行引擎上执行知识构建 */
|
||||
void execute();
|
||||
|
@ -6,7 +6,7 @@ import com.antgroup.openspg.builder.core.physical.PhysicalPlan;
|
||||
import com.antgroup.openspg.builder.core.physical.process.BaseProcessor;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderExecutor;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderRecordException;
|
||||
import com.antgroup.openspg.builder.core.runtime.RuntimeContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.record.BaseRecord;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -18,7 +18,7 @@ public class DefaultBuilderExecutor implements BuilderExecutor {
|
||||
private PhysicalPlan plan;
|
||||
|
||||
@Override
|
||||
public void init(PhysicalPlan plan, RuntimeContext context) throws BuilderException {
|
||||
public void init(PhysicalPlan plan, BuilderContext context) throws BuilderException {
|
||||
this.plan = plan;
|
||||
for (BasePhysicalNode node : plan.nodes()) {
|
||||
node.init(context);
|
||||
|
@ -2,14 +2,14 @@ package com.antgroup.openspg.builder.core.semantic;
|
||||
|
||||
import com.antgroup.openspg.builder.model.BuilderException;
|
||||
import com.antgroup.openspg.builder.core.runtime.PropertyMounterException;
|
||||
import com.antgroup.openspg.builder.core.runtime.RuntimeContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.record.property.SPGPropertyRecord;
|
||||
|
||||
/** 属性挂载将某个非基础类型的属性链接到具体某个实例id上 */
|
||||
public interface PropertyMounter {
|
||||
|
||||
/** 初始化属性挂载策略 */
|
||||
void init(RuntimeContext context) throws BuilderException;
|
||||
void init(BuilderContext context) throws BuilderException;
|
||||
|
||||
/** 输出一条spg记录,当该spg记录的某些属性是非基础类型时,原地执行属性挂载 */
|
||||
void propertyMount(SPGPropertyRecord record) throws PropertyMounterException;
|
||||
|
@ -2,7 +2,7 @@ package com.antgroup.openspg.builder.core.semantic.impl;
|
||||
|
||||
import com.antgroup.openspg.builder.model.BuilderException;
|
||||
import com.antgroup.openspg.builder.core.runtime.PropertyMounterException;
|
||||
import com.antgroup.openspg.builder.core.runtime.RuntimeContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.core.semantic.PropertyMounter;
|
||||
import com.antgroup.openspg.builder.model.record.property.SPGPropertyRecord;
|
||||
|
||||
@ -13,7 +13,7 @@ public class IdEqualsPropertyMounter implements PropertyMounter {
|
||||
private IdEqualsPropertyMounter() {}
|
||||
|
||||
@Override
|
||||
public void init(RuntimeContext context) throws BuilderException {}
|
||||
public void init(BuilderContext context) throws BuilderException {}
|
||||
|
||||
@Override
|
||||
public void propertyMount(SPGPropertyRecord record) throws PropertyMounterException {}
|
||||
|
@ -6,7 +6,7 @@ import com.antgroup.openspg.builder.core.physical.operator.PythonOperatorFactory
|
||||
import com.antgroup.openspg.builder.core.physical.operator.protocol.EvalResult;
|
||||
import com.antgroup.openspg.builder.core.physical.operator.protocol.Vertex;
|
||||
import com.antgroup.openspg.builder.core.runtime.PropertyMounterException;
|
||||
import com.antgroup.openspg.builder.core.runtime.RuntimeContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.core.semantic.PropertyMounter;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.OperatorPropertyMounterConfig;
|
||||
import com.antgroup.openspg.builder.model.record.property.SPGPropertyRecord;
|
||||
@ -30,7 +30,7 @@ public class OperatorPropertyMounter implements PropertyMounter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(RuntimeContext context) throws BuilderException {
|
||||
public void init(BuilderContext context) throws BuilderException {
|
||||
operatorFactory = PythonOperatorFactory.getInstance();
|
||||
operatorFactory.init(context);
|
||||
operatorFactory.register(mounterConfig.getConfig());
|
||||
|
@ -2,7 +2,7 @@ package com.antgroup.openspg.builder.core.semantic.impl;
|
||||
|
||||
import com.antgroup.openspg.builder.model.BuilderException;
|
||||
import com.antgroup.openspg.builder.core.runtime.PropertyMounterException;
|
||||
import com.antgroup.openspg.builder.core.runtime.RuntimeContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.core.semantic.PropertyMounter;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.SearchEnginePropertyMounterConfig;
|
||||
import com.antgroup.openspg.builder.model.record.property.SPGPropertyRecord;
|
||||
@ -16,10 +16,9 @@ public class SearchEnginePropertyMounter implements PropertyMounter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(RuntimeContext context) throws BuilderException {}
|
||||
public void init(BuilderContext context) throws BuilderException {}
|
||||
|
||||
@Override
|
||||
public boolean propertyMount(SPGPropertyRecord record) throws PropertyMounterException {
|
||||
return true;
|
||||
public void propertyMount(SPGPropertyRecord record) throws PropertyMounterException {
|
||||
}
|
||||
}
|
||||
|
@ -35,5 +35,9 @@
|
||||
<groupId>org.jgrapht</groupId>
|
||||
<artifactId>jgrapht-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.danilopianini</groupId>
|
||||
<artifactId>gson-extras</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
@ -17,11 +17,11 @@ import com.antgroup.openspg.server.common.model.exception.OpenSPGException;
|
||||
|
||||
public class BuilderException extends OpenSPGException {
|
||||
|
||||
protected BuilderException(Throwable cause, String messagePattern, Object... args) {
|
||||
public BuilderException(Throwable cause, String messagePattern, Object... args) {
|
||||
super(cause, true, true, messagePattern, args);
|
||||
}
|
||||
|
||||
protected BuilderException(String messagePattern, Object... args) {
|
||||
public BuilderException(String messagePattern, Object... args) {
|
||||
this(null, messagePattern, args);
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,62 @@
|
||||
package com.antgroup.openspg.builder.model;
|
||||
|
||||
import com.antgroup.openspg.builder.model.pipeline.NodeTypeEnum;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.*;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.typeadapters.RuntimeTypeAdapterFactory;
|
||||
import java.lang.reflect.Type;
|
||||
|
||||
public class BuilderJsonUtils {
|
||||
|
||||
public static final String DEFAULT_TYPE_FIELD_NAME = "@type";
|
||||
|
||||
public static Gson gson = null;
|
||||
|
||||
static {
|
||||
gson =
|
||||
new GsonBuilder()
|
||||
.registerTypeAdapterFactory(
|
||||
RuntimeTypeAdapterFactory.of(BaseNodeConfig.class, DEFAULT_TYPE_FIELD_NAME)
|
||||
.registerSubtype(CsvSourceNodeConfig.class, NodeTypeEnum.CSV_SOURCE.name())
|
||||
.registerSubtype(ExtractNodeConfig.class, NodeTypeEnum.EXTRACT.name())
|
||||
.registerSubtype(MappingNodeConfig.class, NodeTypeEnum.MAPPING.name())
|
||||
.registerSubtype(GraphStoreSinkNodeConfig.class, NodeTypeEnum.GRAPH_SINK.name())
|
||||
.recognizeSubtypes())
|
||||
.create();
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialize the given Java object into JSON string.
|
||||
*
|
||||
* @param obj Object
|
||||
* @return String representation of the JSON
|
||||
*/
|
||||
public static String serialize(Object obj) {
|
||||
return gson.toJson(obj);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialize the given JSON string to Java object.
|
||||
*
|
||||
* @param <T> Type
|
||||
* @param body The JSON string
|
||||
* @param type The class to deserialize into
|
||||
* @return The deserialized Java object
|
||||
*/
|
||||
public static <T> T deserialize(String body, Type type) {
|
||||
return gson.fromJson(body, type);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialize the given JSON string to Java object.
|
||||
*
|
||||
* @param <T> Type
|
||||
* @param body The JSON string
|
||||
* @param clazz The class to deserialize into
|
||||
* @return The deserialized Java object
|
||||
*/
|
||||
public static <T> T deserialize(String body, Class<T> clazz) {
|
||||
return gson.fromJson(body, clazz);
|
||||
}
|
||||
}
|
@ -14,7 +14,9 @@
|
||||
package com.antgroup.openspg.builder.model.pipeline.config;
|
||||
|
||||
import com.antgroup.openspg.builder.model.pipeline.NodeTypeEnum;
|
||||
import lombok.Getter;
|
||||
|
||||
@Getter
|
||||
public abstract class BaseNodeConfig {
|
||||
|
||||
/** The type of the node. */
|
||||
@ -23,8 +25,4 @@ public abstract class BaseNodeConfig {
|
||||
public BaseNodeConfig(NodeTypeEnum type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public NodeTypeEnum getType() {
|
||||
return type;
|
||||
}
|
||||
}
|
||||
|
@ -1,31 +1,100 @@
|
||||
package com.antgroup.openspg.builder.runner.local;
|
||||
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.BuilderJsonUtils;
|
||||
import com.antgroup.openspg.builder.model.exception.PipelineConfigException;
|
||||
import com.antgroup.openspg.builder.model.pipeline.Pipeline;
|
||||
import com.antgroup.openspg.core.schema.model.type.ProjectSchema;
|
||||
import com.antgroup.openspg.server.api.facade.ApiResponse;
|
||||
import com.antgroup.openspg.server.api.facade.client.SchemaFacade;
|
||||
import com.antgroup.openspg.server.api.facade.dto.schema.request.ProjectSchemaRequest;
|
||||
import com.antgroup.openspg.server.api.http.client.HttpSchemaFacade;
|
||||
import com.antgroup.openspg.server.api.http.client.util.ConnectionInfo;
|
||||
import com.antgroup.openspg.server.api.http.client.util.HttpClientBootstrap;
|
||||
import org.apache.commons.cli.*;
|
||||
|
||||
public class LocalBuilderMain {
|
||||
|
||||
public static void main(String[] args) {
|
||||
private static final String PROJECT_ID_OPTION = "projectId";
|
||||
private static final String JOB_NAME_OPTION = "jobName";
|
||||
private static final String PIPELINE_OPTION = "pipeline";
|
||||
private static final String PYTHON_EXEC_OPTION = "pythonExec";
|
||||
private static final String PYTHON_PATHS_OPTION = "pythonPaths";
|
||||
private static final String SCHEMA_URL_OPTION = "schemaUrl";
|
||||
private static final String PARALLELISM_OPTION = "parallelism";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
CommandLine commandLine = parseArgs(args);
|
||||
run(commandLine);
|
||||
}
|
||||
|
||||
public static CommandLine parseArgs(String[] args) {
|
||||
CommandLineParser parser = new DefaultParser();
|
||||
Options options = new Options();
|
||||
|
||||
options.addRequiredOption("prj", "project", true, "project");
|
||||
options.addRequiredOption("n", "jobName", true, "job name");
|
||||
options.addRequiredOption("p", "pipeline", true, "pipeline info");
|
||||
options.addRequiredOption("pe", "pythonExec", true, "python exec");
|
||||
options.addRequiredOption("pp", "pythonPaths", true, "python path");
|
||||
options.addOption("s", "schema", true, "schema info");
|
||||
options.addRequiredOption(PROJECT_ID_OPTION, null, true, "project id");
|
||||
options.addRequiredOption(JOB_NAME_OPTION, null, true, "job name");
|
||||
options.addRequiredOption(PIPELINE_OPTION, null, true, "pipeline info");
|
||||
options.addRequiredOption(PYTHON_EXEC_OPTION, null, true, "python exec");
|
||||
options.addRequiredOption(PYTHON_PATHS_OPTION, null, true, "python path");
|
||||
options.addRequiredOption(SCHEMA_URL_OPTION, null, true, "schema url");
|
||||
options.addOption(PARALLELISM_OPTION, null, true, "parallelism");
|
||||
|
||||
CommandLine commandLine = null;
|
||||
HelpFormatter helper = new HelpFormatter();
|
||||
try {
|
||||
commandLine = parser.parse(options, args);
|
||||
String pipelineStr = commandLine.getOptionValue("p");
|
||||
String schemaStr = commandLine.getOptionValue("s");
|
||||
|
||||
} catch (ParseException e) {
|
||||
System.out.println(e.getMessage());
|
||||
helper.printHelp("Usage: ", options);
|
||||
System.exit(0);
|
||||
}
|
||||
return commandLine;
|
||||
}
|
||||
|
||||
private static void run(CommandLine commandLine) throws Exception {
|
||||
long projectId = Long.parseLong(commandLine.getOptionValue(PROJECT_ID_OPTION));
|
||||
String jobName = commandLine.getOptionValue(JOB_NAME_OPTION);
|
||||
|
||||
String pipelineStr = commandLine.getOptionValue(PIPELINE_OPTION);
|
||||
Pipeline pipeline = BuilderJsonUtils.deserialize(pipelineStr, Pipeline.class);
|
||||
|
||||
String pythonExec = commandLine.getOptionValue(PYTHON_EXEC_OPTION);
|
||||
String pythonPaths = commandLine.getOptionValue(PYTHON_PATHS_OPTION);
|
||||
String schemaUrl = commandLine.getOptionValue(SCHEMA_URL_OPTION);
|
||||
|
||||
String parallelismStr = commandLine.getOptionValue(PARALLELISM_OPTION);
|
||||
int parallelism = (parallelismStr == null ? 1 : Integer.parseInt(parallelismStr));
|
||||
|
||||
ProjectSchema projectSchema = getProjectSchema(projectId, schemaUrl);
|
||||
BuilderContext builderContext =
|
||||
new BuilderContext()
|
||||
.setProjectId(projectId)
|
||||
.setJobName(jobName)
|
||||
.setProjectSchema(projectSchema)
|
||||
.setPythonExec(pythonExec)
|
||||
.setPythonPaths(pythonPaths);
|
||||
|
||||
LocalBuilderRunner runner = new LocalBuilderRunner(parallelism);
|
||||
runner.init(pipeline, builderContext);
|
||||
|
||||
try {
|
||||
runner.execute();
|
||||
} finally {
|
||||
runner.close();
|
||||
}
|
||||
}
|
||||
|
||||
private static ProjectSchema getProjectSchema(long projectId, String schemaUrl) {
|
||||
HttpClientBootstrap.init(
|
||||
new ConnectionInfo(schemaUrl).setConnectTimeout(6000).setReadTimeout(600000));
|
||||
|
||||
SchemaFacade schemaFacade = new HttpSchemaFacade();
|
||||
ApiResponse<ProjectSchema> response =
|
||||
schemaFacade.queryProjectSchema(new ProjectSchemaRequest(projectId));
|
||||
if (response.isSuccess()) {
|
||||
return response.getData();
|
||||
}
|
||||
throw new PipelineConfigException();
|
||||
}
|
||||
}
|
||||
|
@ -1,13 +1,13 @@
|
||||
package com.antgroup.openspg.builder.runner.local;
|
||||
|
||||
import com.antgroup.openspg.builder.model.BuilderException;
|
||||
import com.antgroup.openspg.builder.core.logical.LogicalPlan;
|
||||
import com.antgroup.openspg.builder.core.physical.PhysicalPlan;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderExecutor;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderRecordException;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderRunner;
|
||||
import com.antgroup.openspg.builder.core.runtime.RuntimeContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.impl.DefaultBuilderExecutor;
|
||||
import com.antgroup.openspg.builder.model.BuilderException;
|
||||
import com.antgroup.openspg.builder.model.pipeline.Pipeline;
|
||||
import com.antgroup.openspg.builder.model.record.BaseRecord;
|
||||
import com.antgroup.openspg.builder.runner.local.runtime.BuilderMetric;
|
||||
@ -17,10 +17,13 @@ import com.antgroup.openspg.builder.runner.local.sink.BaseSinkWriter;
|
||||
import com.antgroup.openspg.builder.runner.local.sink.SinkWriterFactory;
|
||||
import com.antgroup.openspg.builder.runner.local.source.BaseSourceReader;
|
||||
import com.antgroup.openspg.builder.runner.local.source.SourceReaderFactory;
|
||||
import com.antgroup.openspg.common.util.thread.ThreadUtils;
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Meter;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
|
||||
/** 本地的知识构建runner,在本地执行构建任务 */
|
||||
@ -32,8 +35,17 @@ public class LocalBuilderRunner implements BuilderRunner {
|
||||
private BuilderMetric builderMetric = null;
|
||||
private ErrorRecordCollector errorRecordCollector = null;
|
||||
|
||||
private final int parallelism;
|
||||
private final ThreadPoolExecutor threadPoolExecutor;
|
||||
|
||||
public LocalBuilderRunner(int parallelism) {
|
||||
this.parallelism = parallelism;
|
||||
this.threadPoolExecutor =
|
||||
ThreadUtils.newDaemonFixedThreadPool(parallelism, "localBuilderRunner-");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Pipeline pipeline, RuntimeContext context) throws BuilderException {
|
||||
public void init(Pipeline pipeline, BuilderContext context) throws BuilderException {
|
||||
LogicalPlan logicalPlan = LogicalPlan.parse(pipeline);
|
||||
sourceReader =
|
||||
logicalPlan.startNodes().stream()
|
||||
@ -59,19 +71,32 @@ public class LocalBuilderRunner implements BuilderRunner {
|
||||
public void execute() {
|
||||
Meter totalMeter = builderMetric.getTotalCnt();
|
||||
Counter errorCnt = builderMetric.getErrorCnt();
|
||||
List<BaseRecord> records = Collections.unmodifiableList(sourceReader.read());
|
||||
|
||||
while (CollectionUtils.isNotEmpty(records)) {
|
||||
totalMeter.mark(records.size());
|
||||
List<BaseRecord> results = null;
|
||||
try {
|
||||
results = builderExecutor.eval(records);
|
||||
} catch (BuilderRecordException e) {
|
||||
errorCnt.inc(records.size());
|
||||
// todo
|
||||
}
|
||||
sinkWriter.write(records);
|
||||
records = Collections.unmodifiableList(sourceReader.read());
|
||||
for (int i = 0; i < parallelism; i++) {
|
||||
threadPoolExecutor.execute(
|
||||
new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
List<BaseRecord> records = Collections.unmodifiableList(sourceReader.read());
|
||||
while (CollectionUtils.isNotEmpty(records)) {
|
||||
totalMeter.mark(records.size());
|
||||
List<BaseRecord> results = null;
|
||||
try {
|
||||
results = builderExecutor.eval(records);
|
||||
} catch (BuilderRecordException e) {
|
||||
errorCnt.inc(records.size());
|
||||
// todo
|
||||
}
|
||||
sinkWriter.write(results);
|
||||
records = Collections.unmodifiableList(sourceReader.read());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
try {
|
||||
threadPoolExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
throw new BuilderException("", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -14,7 +14,7 @@
|
||||
package com.antgroup.openspg.builder.runner.local.sink.impl;
|
||||
|
||||
import com.antgroup.openspg.builder.core.physical.process.CheckProcessor;
|
||||
import com.antgroup.openspg.builder.core.runtime.RuntimeContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.GraphStoreSinkNodeConfig;
|
||||
import com.antgroup.openspg.builder.model.record.BaseRecord;
|
||||
import com.antgroup.openspg.builder.model.record.BaseSPGRecord;
|
||||
@ -54,7 +54,7 @@ public class GraphStoreSinkWriter extends BaseSinkWriter<GraphStoreSinkNodeConfi
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doInit(RuntimeContext context) throws Exception {
|
||||
public void doInit(BuilderContext context) throws Exception {
|
||||
if (context.getGraphStoreClient() != null) {
|
||||
graphStoreClient = context.getGraphStoreClient();
|
||||
} else {
|
||||
|
@ -1,7 +1,7 @@
|
||||
package com.antgroup.openspg.builder.runner.local.source.impl;
|
||||
|
||||
import com.antgroup.openspg.builder.model.record.BuilderRecord;
|
||||
import com.antgroup.openspg.builder.core.runtime.RuntimeContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.model.pipeline.config.CsvSourceNodeConfig;
|
||||
import com.antgroup.openspg.builder.model.record.BaseRecord;
|
||||
import com.antgroup.openspg.builder.runner.local.source.BaseSourceReader;
|
||||
@ -27,7 +27,7 @@ public class CsvFileSourceReader extends BaseSourceReader<CsvSourceNodeConfig> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doInit(RuntimeContext context) throws Exception {
|
||||
public void doInit(BuilderContext context) throws Exception {
|
||||
queue = new ArrayBlockingQueue<>(context.getBatchSize() * context.getParallelism());
|
||||
csvReader =
|
||||
new CSVReaderBuilder(new FileReader(config.getUrl()))
|
||||
|
@ -108,13 +108,6 @@ public class JSON {
|
||||
.registerSubtype(
|
||||
OperatorIdentifier.class, SPGIdentifierTypeEnum.OPERATOR.name())
|
||||
.recognizeSubtypes())
|
||||
.registerTypeAdapterFactory(
|
||||
RuntimeTypeAdapterFactory.of(BaseNodeConfig.class, DEFAULT_TYPE_FIELD_NAME)
|
||||
.registerSubtype(CsvSourceNodeConfig.class, NodeTypeEnum.CSV_SOURCE.name())
|
||||
.registerSubtype(ExtractNodeConfig.class, NodeTypeEnum.EXTRACT.name())
|
||||
.registerSubtype(MappingNodeConfig.class, NodeTypeEnum.MAPPING.name())
|
||||
.registerSubtype(GraphStoreSinkNodeConfig.class, NodeTypeEnum.GRAPH_SINK.name())
|
||||
.recognizeSubtypes())
|
||||
.registerTypeAdapterFactory(
|
||||
RuntimeTypeAdapterFactory.of(BaseBuilderResult.class, DEFAULT_TYPE_FIELD_NAME)
|
||||
.registerSubtype(FailureBuilderResult.class, JobInstStatusEnum.FAILURE.name())
|
||||
|
@ -13,16 +13,16 @@
|
||||
|
||||
package com.antgroup.openspg.cloudext.impl.computing.local.impl;
|
||||
|
||||
import static com.antgroup.openspg.builder.core.runtime.RuntimeContext.GRAPH_STORE_CONN_INFO;
|
||||
import static com.antgroup.openspg.builder.core.runtime.RuntimeContext.SEARCH_ENGINE_CONN_INFO;
|
||||
import static com.antgroup.openspg.builder.core.runtime.RuntimeContext.TABLE_STORE_CONN_INFO;
|
||||
import static com.antgroup.openspg.builder.core.runtime.BuilderContext.GRAPH_STORE_CONN_INFO;
|
||||
import static com.antgroup.openspg.builder.core.runtime.BuilderContext.SEARCH_ENGINE_CONN_INFO;
|
||||
import static com.antgroup.openspg.builder.core.runtime.BuilderContext.TABLE_STORE_CONN_INFO;
|
||||
|
||||
import com.antgroup.openspg.builder.core.physical.PhysicalPlan;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderMetric;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderStat;
|
||||
import com.antgroup.openspg.builder.core.runtime.PipelineExecutor;
|
||||
import com.antgroup.openspg.builder.core.runtime.RecordCollector;
|
||||
import com.antgroup.openspg.builder.core.runtime.RuntimeContext;
|
||||
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
|
||||
import com.antgroup.openspg.builder.runner.local.runtime.impl.DefaultRecordCollector;
|
||||
import com.antgroup.openspg.cloudext.impl.computing.local.LocalBuilderExecutor;
|
||||
import com.antgroup.openspg.cloudext.interfaces.computing.cmd.BuilderJobCanSubmitQuery;
|
||||
@ -129,8 +129,8 @@ public class LocalBuilderExecutorImpl implements LocalBuilderExecutor {
|
||||
|
||||
Map<String, Object> params = buildParams(cmd);
|
||||
for (int i = 0; i < parallelism; i++) {
|
||||
RuntimeContext runtimeContext =
|
||||
new RuntimeContext(
|
||||
BuilderContext runtimeContext =
|
||||
new BuilderContext(
|
||||
jobInfo.getProjectId(),
|
||||
cmd.getSchemaUrl(),
|
||||
jobInfo.getJobName(),
|
||||
|
Loading…
x
Reference in New Issue
Block a user