diff --git a/builder/core/pom.xml b/builder/core/pom.xml index f4f1eea5..dd6fb040 100644 --- a/builder/core/pom.xml +++ b/builder/core/pom.xml @@ -43,6 +43,10 @@ com.antgroup.openspg cloudext-interface-search-engine + + com.antgroup.openspg + cloudext-impl-search-engine-elasticsearch + com.antgroup.kg.reasoner local diff --git a/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/BaseMappingProcessor.java b/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/BaseMappingProcessor.java index 996495e6..3c0dc54e 100644 --- a/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/BaseMappingProcessor.java +++ b/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/BaseMappingProcessor.java @@ -1,5 +1,6 @@ package com.antgroup.openspg.builder.core.physical.process; +import com.antgroup.openspg.builder.core.runtime.BuilderCatalog; import com.antgroup.openspg.builder.model.pipeline.config.BaseMappingNodeConfig; import com.antgroup.openspg.builder.model.record.BuilderRecord; import com.antgroup.openspg.core.schema.model.BaseOntology; @@ -7,7 +8,6 @@ import com.antgroup.openspg.core.schema.model.identifier.BaseSPGIdentifier; import com.antgroup.openspg.core.schema.model.identifier.RelationIdentifier; import com.antgroup.openspg.core.schema.model.identifier.SPGIdentifierTypeEnum; import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier; -import com.antgroup.openspg.core.schema.model.type.ProjectSchema; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -20,13 +20,13 @@ public abstract class BaseMappingProcessor super(id, name, config); } - protected BaseOntology loadSchema(BaseSPGIdentifier identifier, ProjectSchema projectSchema) { + protected BaseOntology loadSchema(BaseSPGIdentifier identifier, BuilderCatalog catalog) { SPGIdentifierTypeEnum identifierType = identifier.getIdentifierType(); switch (identifierType) { case SPG_TYPE: - return projectSchema.getByName((SPGTypeIdentifier) identifier); + return catalog.getSPGType((SPGTypeIdentifier) identifier); case RELATION: - return projectSchema.getByName((RelationIdentifier) identifier); + return catalog.getRelation((RelationIdentifier) identifier); default: throw new IllegalArgumentException("illegal identifier type=" + identifierType); } diff --git a/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/RelationMappingProcessor.java b/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/RelationMappingProcessor.java index 3af28cb5..bc71daa6 100644 --- a/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/RelationMappingProcessor.java +++ b/builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/RelationMappingProcessor.java @@ -30,7 +30,7 @@ public class RelationMappingProcessor extends BaseMappingProcessor params = new HashMap<>(); + params.put("graphName", "default"); + params.put("timeout", "60000"); + params.put("accessId", "admin"); + params.put("accessKey", "73@TuGraph"); + params.put("host", "127.0.0.1:9090"); + + return new GraphStoreConnectionInfo().setScheme("tugraph").setParams(params); + } + + @Override + public SearchEngineConnectionInfo getSearchEngineConnInfo() { + Map params = new HashMap<>(); + params.put("host", "127.0.0.1"); + params.put("port", "9200"); + params.put("scheme", "http"); + + return new SearchEngineConnectionInfo().setScheme("elasticsearch").setParams(params); + } +} diff --git a/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/GraphStoreSinkNodeConfig.java b/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/GraphStoreSinkNodeConfig.java index 54dbefa1..7ad8b12d 100644 --- a/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/GraphStoreSinkNodeConfig.java +++ b/builder/model/src/main/java/com/antgroup/openspg/builder/model/pipeline/config/GraphStoreSinkNodeConfig.java @@ -14,17 +14,12 @@ package com.antgroup.openspg.builder.model.pipeline.config; import com.antgroup.openspg.builder.model.pipeline.enums.NodeTypeEnum; -import com.antgroup.openspg.server.common.model.datasource.connection.GraphStoreConnectionInfo; import lombok.Getter; @Getter public class GraphStoreSinkNodeConfig extends BaseNodeConfig { - /** The configuration information for graph storage. */ - private final GraphStoreConnectionInfo graphStoreConnectionInfo; - - public GraphStoreSinkNodeConfig(GraphStoreConnectionInfo graphStoreConnectionInfo) { + public GraphStoreSinkNodeConfig() { super(NodeTypeEnum.GRAPH_SINK); - this.graphStoreConnectionInfo = graphStoreConnectionInfo; } } diff --git a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/LocalBuilderMain.java b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/LocalBuilderMain.java index ccdf63f7..7dfb09fa 100644 --- a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/LocalBuilderMain.java +++ b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/LocalBuilderMain.java @@ -1,6 +1,7 @@ package com.antgroup.openspg.builder.runner.local; import com.antgroup.openspg.builder.core.runtime.BuilderContext; +import com.antgroup.openspg.builder.core.runtime.impl.DefaultBuilderCatalog; import com.antgroup.openspg.builder.model.BuilderJsonUtils; import com.antgroup.openspg.builder.model.exception.PipelineConfigException; import com.antgroup.openspg.builder.model.pipeline.Pipeline; @@ -23,6 +24,7 @@ public class LocalBuilderMain { private static final String PYTHON_PATHS_OPTION = "pythonPaths"; private static final String SCHEMA_URL_OPTION = "schemaUrl"; private static final String PARALLELISM_OPTION = "parallelism"; + private static final String ALTER_OPERATION_OPTION = "alterOperation"; public static void main(String[] args) throws Exception { CommandLine commandLine = parseArgs(args); @@ -40,6 +42,8 @@ public class LocalBuilderMain { options.addRequiredOption(PYTHON_PATHS_OPTION, PYTHON_PATHS_OPTION, true, "python path"); options.addRequiredOption(SCHEMA_URL_OPTION, SCHEMA_URL_OPTION, true, "schema url"); options.addOption(PARALLELISM_OPTION, PARALLELISM_OPTION, true, "parallelism"); + options.addOption( + ALTER_OPERATION_OPTION, ALTER_OPERATION_OPTION, true, "alter operation, upsert or delete"); CommandLine commandLine = null; HelpFormatter helper = new HelpFormatter(); @@ -67,15 +71,18 @@ public class LocalBuilderMain { String parallelismStr = commandLine.getOptionValue(PARALLELISM_OPTION); int parallelism = (parallelismStr == null ? 1 : Integer.parseInt(parallelismStr)); + String alterOperation = commandLine.getOptionValue(ALTER_OPERATION_OPTION); + RecordAlterOperationEnum alterOperationEnum = RecordAlterOperationEnum.valueOf(alterOperation); + ProjectSchema projectSchema = getProjectSchema(projectId, schemaUrl); BuilderContext builderContext = new BuilderContext() .setProjectId(projectId) .setJobName(jobName) - .setProjectSchema(projectSchema) + .setCatalog(new DefaultBuilderCatalog(projectSchema)) .setPythonExec(pythonExec) .setPythonPaths(pythonPaths) - .setOperation(RecordAlterOperationEnum.UPSERT); + .setOperation(alterOperationEnum); LocalBuilderRunner runner = new LocalBuilderRunner(parallelism); runner.init(pipeline, builderContext); diff --git a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/GraphStoreSinkWriter.java b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/GraphStoreSinkWriter.java index 1bfaefd2..5a4d11cc 100644 --- a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/GraphStoreSinkWriter.java +++ b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/GraphStoreSinkWriter.java @@ -28,6 +28,7 @@ import com.antgroup.openspg.builder.runner.local.physical.sink.BaseSinkWriter; import com.antgroup.openspg.cloudext.interfaces.graphstore.GraphStoreClient; import com.antgroup.openspg.cloudext.interfaces.graphstore.GraphStoreClientDriverManager; import com.antgroup.openspg.cloudext.interfaces.searchengine.SearchEngineClient; +import com.antgroup.openspg.cloudext.interfaces.searchengine.SearchEngineClientDriverManager; import com.antgroup.openspg.core.schema.model.BasicInfo; import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier; import com.antgroup.openspg.core.schema.model.predicate.Property; @@ -56,7 +57,9 @@ public class GraphStoreSinkWriter extends BaseSinkWriter