Merge branch 'master' into add_multi_edge_instance

This commit is contained in:
wenchengyao 2024-04-29 14:07:39 +08:00
commit a6a0160f65
6 changed files with 102 additions and 51 deletions

1
.gitignore vendored
View File

@ -458,3 +458,4 @@ hs_err_pid*
!/server/lib/spgreasoner-local-0.0.1.jar
/logs/
**/spotless-index-file
**/pom.xml.versionsBackup

View File

@ -0,0 +1,20 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.runner.local.callable;
import java.util.concurrent.Callable;
public interface CallableWrapper {
<T> Callable<T> wrap(Callable<T> wrappedCallable);
}

View File

@ -28,6 +28,7 @@ import com.antgroup.openspg.reasoner.lube.utils.transformer.impl.Expr2QlexpressT
import com.antgroup.openspg.reasoner.recorder.EmptyRecorder;
import com.antgroup.openspg.reasoner.recorder.IExecutionRecorder;
import com.antgroup.openspg.reasoner.runner.ConfigKey;
import com.antgroup.openspg.reasoner.runner.local.callable.CallableWrapper;
import com.antgroup.openspg.reasoner.runner.local.model.LocalReasonerTask;
import com.antgroup.openspg.reasoner.runner.local.rdg.LocalRDG;
import com.antgroup.openspg.reasoner.udf.rule.RuleRunner;
@ -85,6 +86,7 @@ public class LocalPropertyGraph implements PropertyGraph<LocalRDG> {
result.setMaxPathLimit(getMaxPathLimit());
result.setStrictMaxPathLimit(getStrictMaxPathLimit());
result.setDisableDropOp(getDisableDropOp());
result.setCallableWrapper(getCallableWrapper());
return result;
}
@ -117,6 +119,7 @@ public class LocalPropertyGraph implements PropertyGraph<LocalRDG> {
result.setMaxPathLimit(getMaxPathLimit());
result.setStrictMaxPathLimit(getStrictMaxPathLimit());
result.setDisableDropOp(getDisableDropOp());
result.setCallableWrapper(getCallableWrapper());
return result;
}
@ -167,6 +170,7 @@ public class LocalPropertyGraph implements PropertyGraph<LocalRDG> {
result.setMaxPathLimit(getMaxPathLimit());
result.setStrictMaxPathLimit(getStrictMaxPathLimit());
result.setDisableDropOp(getDisableDropOp());
result.setCallableWrapper(getCallableWrapper());
return result;
}
@ -272,6 +276,17 @@ public class LocalPropertyGraph implements PropertyGraph<LocalRDG> {
return "true".equals(String.valueOf(disableDropOpObj));
}
private CallableWrapper getCallableWrapper() {
CallableWrapper callableWrapper = null;
if (null != task && null != this.task.getParams()) {
Object obj = this.task.getParams().get(ConfigKey.REASONER_CALLABLE_WRAPPER);
if (obj instanceof CallableWrapper) {
callableWrapper = (CallableWrapper) obj;
}
}
return callableWrapper;
}
private IExecutionRecorder getExecutionRecorder() {
if (null == task) {
return new EmptyRecorder();

View File

@ -84,6 +84,7 @@ import com.antgroup.openspg.reasoner.recorder.EmptyRecorder;
import com.antgroup.openspg.reasoner.recorder.IExecutionRecorder;
import com.antgroup.openspg.reasoner.recorder.action.DebugInfoWithStartId;
import com.antgroup.openspg.reasoner.recorder.action.SampleAction;
import com.antgroup.openspg.reasoner.runner.local.callable.CallableWrapper;
import com.antgroup.openspg.reasoner.runner.local.model.LocalReasonerResult;
import com.antgroup.openspg.reasoner.udf.model.UdtfMeta;
import com.antgroup.openspg.reasoner.util.Convert2ScalaUtil;
@ -99,14 +100,14 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
@ -157,6 +158,9 @@ public class LocalRDG extends RDG<LocalRDG> {
/** disable drop op */
protected boolean disableDropOp = false;
/** callable wrapper */
protected CallableWrapper callableWrapper = null;
private java.util.Set<IVertexId> getStartId(java.util.List<KgGraph<IVertexId>> kgGraphList) {
java.util.Set<IVertexId> startIdSet = new HashSet<>();
for (KgGraph<IVertexId> kgGraph : kgGraphList) {
@ -245,15 +249,14 @@ public class LocalRDG extends RDG<LocalRDG> {
long count = 0;
java.util.List<KgGraph<IVertexId>> newKgGraphList = new ArrayList<>();
java.util.List<CompletableFuture<KgGraph<IVertexId>>> futureList = new ArrayList<>();
java.util.List<Future<KgGraph<IVertexId>>> futureList = new ArrayList<>();
patternMatcher.resetInitTime();
for (KgGraph<IVertexId> kgGraphId : this.kgGraphList) {
IVertexId id = kgGraphId.getVertex(this.startVertexAlias).get(0).getId();
CompletableFuture<KgGraph<IVertexId>> future =
CompletableFuture.supplyAsync(
new Supplier<KgGraph<IVertexId>>() {
Callable<KgGraph<IVertexId>> patternScanCallable =
new Callable<KgGraph<IVertexId>>() {
@Override
public KgGraph<IVertexId> get() {
public KgGraph<IVertexId> call() throws Exception {
return patternMatcher.patternMatch(
id,
null,
@ -269,11 +272,13 @@ public class LocalRDG extends RDG<LocalRDG> {
true,
60 * 1000);
}
},
threadPoolExecutor);
futureList.add(future);
};
if (null != callableWrapper) {
patternScanCallable = callableWrapper.wrap(patternScanCallable);
}
for (CompletableFuture<KgGraph<IVertexId>> future : futureList) {
futureList.add(threadPoolExecutor.submit(patternScanCallable));
}
for (Future<KgGraph<IVertexId>> future : futureList) {
KgGraph<IVertexId> kgGraph;
try {
kgGraph = future.get(this.executorTimeoutMs, TimeUnit.MILLISECONDS);
@ -333,20 +338,21 @@ public class LocalRDG extends RDG<LocalRDG> {
LinkEdgeImpl linkEdge =
new LinkEdgeImpl(
this.taskId, this.kgGraphSchema, staticParameters, pattern, udtfMeta, null, graphState);
java.util.List<CompletableFuture<java.util.List<KgGraph<IVertexId>>>> futureList =
new ArrayList<>();
java.util.List<Future<java.util.List<KgGraph<IVertexId>>>> futureList = new ArrayList<>();
for (KgGraph<IVertexId> kgGraph : this.kgGraphList) {
CompletableFuture<java.util.List<KgGraph<IVertexId>>> future =
CompletableFuture.supplyAsync(
new Supplier<java.util.List<KgGraph<IVertexId>>>() {
Callable<java.util.List<KgGraph<IVertexId>>> linkExpandCallable =
new Callable<java.util.List<KgGraph<IVertexId>>>() {
@Override
public java.util.List<KgGraph<IVertexId>> get() {
public java.util.List<KgGraph<IVertexId>> call() throws Exception {
return linkEdge.link(kgGraph);
}
});
futureList.add(future);
};
if (null != callableWrapper) {
linkExpandCallable = callableWrapper.wrap(linkExpandCallable);
}
for (CompletableFuture<java.util.List<KgGraph<IVertexId>>> future : futureList) {
futureList.add(threadPoolExecutor.submit(linkExpandCallable));
}
for (Future<java.util.List<KgGraph<IVertexId>>> future : futureList) {
java.util.List<KgGraph<IVertexId>> splitedKgGraphList;
try {
splitedKgGraphList = future.get(this.executorTimeoutMs, TimeUnit.MILLISECONDS);
@ -354,7 +360,7 @@ public class LocalRDG extends RDG<LocalRDG> {
log.warn("linkedExpandTimeout,funcName=" + pattern.edge().funcName(), e);
continue;
} catch (Exception e) {
throw new RuntimeException("patternScan error " + e.getMessage(), e);
throw new RuntimeException("linkedExpand error " + e.getMessage(), e);
}
if (CollectionUtils.isNotEmpty(splitedKgGraphList)) {
KgGraph<IVertexId> result = new KgGraphImpl();
@ -388,7 +394,7 @@ public class LocalRDG extends RDG<LocalRDG> {
return this;
}
private CompletableFuture<java.util.List<KgGraph<IVertexId>>> processKgGraphWithSameRoot(
private Future<java.util.List<KgGraph<IVertexId>>> processKgGraphWithSameRoot(
IVertexId rootId,
java.util.List<KgGraph<IVertexId>> sameRootKgGraphList,
java.util.Set<String> intersectionAliasSet,
@ -401,10 +407,10 @@ public class LocalRDG extends RDG<LocalRDG> {
ThreadPoolExecutor threadPoolExecutor) {
PartialGraphPattern beforeKgGraphSchema = this.kgGraphSchema;
return CompletableFuture.supplyAsync(
new Supplier<java.util.List<KgGraph<IVertexId>>>() {
Callable<java.util.List<KgGraph<IVertexId>>> expandIntoCallable =
new Callable<java.util.List<KgGraph<IVertexId>>>() {
@Override
public java.util.List<KgGraph<IVertexId>> get() {
public java.util.List<KgGraph<IVertexId>> call() throws Exception {
if (null == rootId) {
return null;
}
@ -474,8 +480,11 @@ public class LocalRDG extends RDG<LocalRDG> {
}
return result;
}
},
threadPoolExecutor);
};
if (null != callableWrapper) {
expandIntoCallable = callableWrapper.wrap(expandIntoCallable);
}
return threadPoolExecutor.submit(expandIntoCallable);
}
@Override
@ -511,8 +520,7 @@ public class LocalRDG extends RDG<LocalRDG> {
java.util.Set<String> intersectionAliasSet =
RunnerUtil.getIntersectionAliasSet(this.kgGraphSchema, matchPattern);
java.util.List<CompletableFuture<java.util.List<KgGraph<IVertexId>>>> futureList =
new ArrayList<>();
java.util.List<Future<java.util.List<KgGraph<IVertexId>>>> futureList = new ArrayList<>();
java.util.List<KgGraph<IVertexId>> sameRootKgGraphList = new ArrayList<>();
IVertexId lastVertexId = null;
@ -556,7 +564,7 @@ public class LocalRDG extends RDG<LocalRDG> {
long count = 0;
java.util.List<KgGraph<IVertexId>> newKgGraphList = new ArrayList<>();
for (CompletableFuture<java.util.List<KgGraph<IVertexId>>> future : futureList) {
for (Future<java.util.List<KgGraph<IVertexId>>> future : futureList) {
java.util.List<KgGraph<IVertexId>> resultKgGraph;
try {
resultKgGraph = future.get(this.executorTimeoutMs, TimeUnit.MILLISECONDS);
@ -1414,4 +1422,8 @@ public class LocalRDG extends RDG<LocalRDG> {
public void setDisableDropOp(boolean disableDropOp) {
this.disableDropOp = disableDropOp;
}
public void setCallableWrapper(CallableWrapper callableWrapper) {
this.callableWrapper = callableWrapper;
}
}

View File

@ -88,7 +88,7 @@ public class MemGraphState implements GraphState<IVertexId>, IConceptTree {
IVertexId id, Map<String, Object> property, MergeTypeEnum mergeType, Long version) {
IVersionProperty iProperty = (IVersionProperty) vertexMap.get(id);
if (iProperty == null) {
log.info("MemGraphState get vertex is null " + id.getInternalId() + " " + id.getType());
log.debug("MemGraphState get vertex is null " + id.getInternalId() + " " + id.getType());
return;
}
for (String key : property.keySet()) {
@ -109,7 +109,7 @@ public class MemGraphState implements GraphState<IVertexId>, IConceptTree {
public IVertex<IVertexId, IProperty> getVertex(IVertexId id, Long version) {
IVersionProperty iProperty = (IVersionProperty) vertexMap.get(id);
if (iProperty == null) {
log.info("MemGraphState get vertex is null " + id.getInternalId() + " " + id.getType());
log.debug("MemGraphState get vertex is null " + id.getInternalId() + " " + id.getType());
return null;
}
IVersionProperty resultValue = PropertyUtil.buildVertexProperty(id, null);

View File

@ -165,4 +165,7 @@ public class ConfigKey {
/** disable drop */
public static final String REASONER_DISABLE_DROP_OP = "kg.reasoner.disable.drop.op";
/** set callable wrapper */
public static final String REASONER_CALLABLE_WRAPPER = "kg.reasoner.callable.wrapper";
}