This commit is contained in:
baifuyu 2023-12-10 15:02:45 +08:00
parent 77d4db04a4
commit d1e38a1c7e
11 changed files with 108 additions and 19 deletions

View File

@ -43,6 +43,10 @@
<groupId>com.antgroup.openspg</groupId>
<artifactId>cloudext-interface-search-engine</artifactId>
</dependency>
<dependency>
<groupId>com.antgroup.openspg</groupId>
<artifactId>cloudext-impl-search-engine-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>com.antgroup.kg.reasoner</groupId>
<artifactId>local</artifactId>

View File

@ -1,5 +1,6 @@
package com.antgroup.openspg.builder.core.physical.process;
import com.antgroup.openspg.builder.core.runtime.BuilderCatalog;
import com.antgroup.openspg.builder.model.pipeline.config.BaseMappingNodeConfig;
import com.antgroup.openspg.builder.model.record.BuilderRecord;
import com.antgroup.openspg.core.schema.model.BaseOntology;
@ -7,7 +8,6 @@ import com.antgroup.openspg.core.schema.model.identifier.BaseSPGIdentifier;
import com.antgroup.openspg.core.schema.model.identifier.RelationIdentifier;
import com.antgroup.openspg.core.schema.model.identifier.SPGIdentifierTypeEnum;
import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier;
import com.antgroup.openspg.core.schema.model.type.ProjectSchema;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -20,13 +20,13 @@ public abstract class BaseMappingProcessor<T extends BaseMappingNodeConfig>
super(id, name, config);
}
protected BaseOntology loadSchema(BaseSPGIdentifier identifier, ProjectSchema projectSchema) {
protected BaseOntology loadSchema(BaseSPGIdentifier identifier, BuilderCatalog catalog) {
SPGIdentifierTypeEnum identifierType = identifier.getIdentifierType();
switch (identifierType) {
case SPG_TYPE:
return projectSchema.getByName((SPGTypeIdentifier) identifier);
return catalog.getSPGType((SPGTypeIdentifier) identifier);
case RELATION:
return projectSchema.getByName((RelationIdentifier) identifier);
return catalog.getRelation((RelationIdentifier) identifier);
default:
throw new IllegalArgumentException("illegal identifier type=" + identifierType);
}

View File

@ -30,7 +30,7 @@ public class RelationMappingProcessor extends BaseMappingProcessor<RelationMappi
super.doInit(context);
RelationIdentifier identifier = RelationIdentifier.parse(config.getRelation());
this.relation = (Relation) loadSchema(identifier, context.getProjectSchema());
this.relation = (Relation) loadSchema(identifier, context.getCatalog());
this.recordNormalizer = new RecordNormalizerImpl(config.getMappingConfigs());
this.recordNormalizer.init(context);
}

View File

@ -45,7 +45,7 @@ public class SPGTypeMappingProcessor extends BaseMappingProcessor<SPGTypeMapping
super.doInit(context);
SPGTypeIdentifier identifier = SPGTypeIdentifier.parse(config.getSpgType());
this.spgType = (BaseSPGType) loadSchema(identifier, context.getProjectSchema());
this.spgType = (BaseSPGType) loadSchema(identifier, context.getCatalog());
this.recordNormalizer = new RecordNormalizerImpl(config.getMappingConfigs());
this.recordNormalizer.init(context);
}

View File

@ -55,7 +55,7 @@ public class SubgraphMappingProcessor extends BaseMappingProcessor<SubGraphMappi
private void setUpVariables(
BaseMappingNodeConfig mappingNodeConfig, BaseSPGIdentifier identifier) {
this.ontologies.put(identifier, loadSchema(identifier, context.getProjectSchema()));
this.ontologies.put(identifier, loadSchema(identifier, context.getCatalog()));
RecordNormalizerImpl propertyNormalizerFactory =
new RecordNormalizerImpl(mappingNodeConfig.getMappingConfigs());
propertyNormalizerFactory.init(context);

View File

@ -0,0 +1,22 @@
package com.antgroup.openspg.builder.core.runtime;
import com.antgroup.openspg.core.schema.model.identifier.RelationIdentifier;
import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier;
import com.antgroup.openspg.core.schema.model.predicate.Relation;
import com.antgroup.openspg.core.schema.model.type.BaseSPGType;
import com.antgroup.openspg.server.common.model.datasource.connection.GraphStoreConnectionInfo;
import com.antgroup.openspg.server.common.model.datasource.connection.SearchEngineConnectionInfo;
import java.io.Serializable;
public interface BuilderCatalog extends Serializable {
boolean isSpreadable(SPGTypeIdentifier identifier);
BaseSPGType getSPGType(SPGTypeIdentifier identifier);
Relation getRelation(RelationIdentifier identifier);
SearchEngineConnectionInfo getSearchEngineConnInfo();
GraphStoreConnectionInfo getGraphStoreConnInfo();
}

View File

@ -14,7 +14,6 @@
package com.antgroup.openspg.builder.core.runtime;
import com.antgroup.openspg.builder.model.record.RecordAlterOperationEnum;
import com.antgroup.openspg.core.schema.model.type.ProjectSchema;
import java.io.Serializable;
import lombok.Getter;
import lombok.Setter;
@ -27,8 +26,8 @@ public class BuilderContext implements Serializable {
private long projectId;
private String jobName;
private ProjectSchema projectSchema;
private RecordAlterOperationEnum operation;
private BuilderCatalog catalog;
private String pythonExec;
private String pythonPaths;

View File

@ -0,0 +1,59 @@
package com.antgroup.openspg.builder.core.runtime.impl;
import com.antgroup.openspg.builder.core.runtime.BuilderCatalog;
import com.antgroup.openspg.core.schema.model.identifier.RelationIdentifier;
import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier;
import com.antgroup.openspg.core.schema.model.predicate.Relation;
import com.antgroup.openspg.core.schema.model.type.BaseSPGType;
import com.antgroup.openspg.core.schema.model.type.ProjectSchema;
import com.antgroup.openspg.server.common.model.datasource.connection.GraphStoreConnectionInfo;
import com.antgroup.openspg.server.common.model.datasource.connection.SearchEngineConnectionInfo;
import java.util.HashMap;
import java.util.Map;
public class DefaultBuilderCatalog implements BuilderCatalog {
private final ProjectSchema projectSchema;
public DefaultBuilderCatalog(ProjectSchema projectSchema) {
this.projectSchema = projectSchema;
}
@Override
public boolean isSpreadable(SPGTypeIdentifier identifier) {
return projectSchema.getSpreadable(identifier);
}
@Override
public BaseSPGType getSPGType(SPGTypeIdentifier identifier) {
return projectSchema.getByName(identifier);
}
@Override
public Relation getRelation(RelationIdentifier identifier) {
return projectSchema.getByName(identifier);
}
@Override
public GraphStoreConnectionInfo getGraphStoreConnInfo() {
// "graphstore:tugraph://127.0.0.1:9090/default?timeout=60000&accessId=admin&accessKey=73@TuGraph";
Map<String, Object> params = new HashMap<>();
params.put("graphName", "default");
params.put("timeout", "60000");
params.put("accessId", "admin");
params.put("accessKey", "73@TuGraph");
params.put("host", "127.0.0.1:9090");
return new GraphStoreConnectionInfo().setScheme("tugraph").setParams(params);
}
@Override
public SearchEngineConnectionInfo getSearchEngineConnInfo() {
Map<String, Object> params = new HashMap<>();
params.put("host", "127.0.0.1");
params.put("port", "9200");
params.put("scheme", "http");
return new SearchEngineConnectionInfo().setScheme("elasticsearch").setParams(params);
}
}

View File

@ -14,17 +14,12 @@
package com.antgroup.openspg.builder.model.pipeline.config;
import com.antgroup.openspg.builder.model.pipeline.enums.NodeTypeEnum;
import com.antgroup.openspg.server.common.model.datasource.connection.GraphStoreConnectionInfo;
import lombok.Getter;
@Getter
public class GraphStoreSinkNodeConfig extends BaseNodeConfig {
/** The configuration information for graph storage. */
private final GraphStoreConnectionInfo graphStoreConnectionInfo;
public GraphStoreSinkNodeConfig(GraphStoreConnectionInfo graphStoreConnectionInfo) {
public GraphStoreSinkNodeConfig() {
super(NodeTypeEnum.GRAPH_SINK);
this.graphStoreConnectionInfo = graphStoreConnectionInfo;
}
}

View File

@ -1,6 +1,7 @@
package com.antgroup.openspg.builder.runner.local;
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
import com.antgroup.openspg.builder.core.runtime.impl.DefaultBuilderCatalog;
import com.antgroup.openspg.builder.model.BuilderJsonUtils;
import com.antgroup.openspg.builder.model.exception.PipelineConfigException;
import com.antgroup.openspg.builder.model.pipeline.Pipeline;
@ -23,6 +24,7 @@ public class LocalBuilderMain {
private static final String PYTHON_PATHS_OPTION = "pythonPaths";
private static final String SCHEMA_URL_OPTION = "schemaUrl";
private static final String PARALLELISM_OPTION = "parallelism";
private static final String ALTER_OPERATION_OPTION = "alterOperation";
public static void main(String[] args) throws Exception {
CommandLine commandLine = parseArgs(args);
@ -40,6 +42,8 @@ public class LocalBuilderMain {
options.addRequiredOption(PYTHON_PATHS_OPTION, PYTHON_PATHS_OPTION, true, "python path");
options.addRequiredOption(SCHEMA_URL_OPTION, SCHEMA_URL_OPTION, true, "schema url");
options.addOption(PARALLELISM_OPTION, PARALLELISM_OPTION, true, "parallelism");
options.addOption(
ALTER_OPERATION_OPTION, ALTER_OPERATION_OPTION, true, "alter operation, upsert or delete");
CommandLine commandLine = null;
HelpFormatter helper = new HelpFormatter();
@ -67,15 +71,18 @@ public class LocalBuilderMain {
String parallelismStr = commandLine.getOptionValue(PARALLELISM_OPTION);
int parallelism = (parallelismStr == null ? 1 : Integer.parseInt(parallelismStr));
String alterOperation = commandLine.getOptionValue(ALTER_OPERATION_OPTION);
RecordAlterOperationEnum alterOperationEnum = RecordAlterOperationEnum.valueOf(alterOperation);
ProjectSchema projectSchema = getProjectSchema(projectId, schemaUrl);
BuilderContext builderContext =
new BuilderContext()
.setProjectId(projectId)
.setJobName(jobName)
.setProjectSchema(projectSchema)
.setCatalog(new DefaultBuilderCatalog(projectSchema))
.setPythonExec(pythonExec)
.setPythonPaths(pythonPaths)
.setOperation(RecordAlterOperationEnum.UPSERT);
.setOperation(alterOperationEnum);
LocalBuilderRunner runner = new LocalBuilderRunner(parallelism);
runner.init(pipeline, builderContext);

View File

@ -28,6 +28,7 @@ import com.antgroup.openspg.builder.runner.local.physical.sink.BaseSinkWriter;
import com.antgroup.openspg.cloudext.interfaces.graphstore.GraphStoreClient;
import com.antgroup.openspg.cloudext.interfaces.graphstore.GraphStoreClientDriverManager;
import com.antgroup.openspg.cloudext.interfaces.searchengine.SearchEngineClient;
import com.antgroup.openspg.cloudext.interfaces.searchengine.SearchEngineClientDriverManager;
import com.antgroup.openspg.core.schema.model.BasicInfo;
import com.antgroup.openspg.core.schema.model.identifier.SPGTypeIdentifier;
import com.antgroup.openspg.core.schema.model.predicate.Property;
@ -56,7 +57,9 @@ public class GraphStoreSinkWriter extends BaseSinkWriter<GraphStoreSinkNodeConfi
@Override
public void doInit(BuilderContext context) throws BuilderException {
graphStoreClient =
GraphStoreClientDriverManager.getClient(config.getGraphStoreConnectionInfo());
GraphStoreClientDriverManager.getClient(context.getCatalog().getGraphStoreConnInfo());
searchEngineClient =
SearchEngineClientDriverManager.getClient(context.getCatalog().getSearchEngineConnInfo());
checkProcessor = new CheckProcessor();
checkProcessor.init(context);
}
@ -106,7 +109,7 @@ public class GraphStoreSinkWriter extends BaseSinkWriter<GraphStoreSinkNodeConfi
}
SPGTypeIdentifier spgTypeIdentifier =
property.getObjectTypeRef().getBaseSpgIdentifier();
if (!context.getProjectSchema().getSpreadable(spgTypeIdentifier)) {
if (!context.getCatalog().isSpreadable(spgTypeIdentifier)) {
Property propertyType = ((SPGPropertyRecord) property).getProperty();
propertyType.setObjectTypeRef(TEXT_REF);
}