diff --git a/.gitignore b/.gitignore index 8273b179..67b1d1ce 100644 --- a/.gitignore +++ b/.gitignore @@ -458,3 +458,4 @@ hs_err_pid* !/server/lib/spgreasoner-local-0.0.1.jar /logs/ **/spotless-index-file +**/pom.xml.versionsBackup diff --git a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/callable/CallableWrapper.java b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/callable/CallableWrapper.java new file mode 100644 index 00000000..7b3ff103 --- /dev/null +++ b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/callable/CallableWrapper.java @@ -0,0 +1,20 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ + +package com.antgroup.openspg.reasoner.runner.local.callable; + +import java.util.concurrent.Callable; + +public interface CallableWrapper { + Callable wrap(Callable wrappedCallable); +} diff --git a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/impl/LocalPropertyGraph.java b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/impl/LocalPropertyGraph.java index 07b9eccc..8432e656 100644 --- a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/impl/LocalPropertyGraph.java +++ b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/impl/LocalPropertyGraph.java @@ -28,6 +28,7 @@ import com.antgroup.openspg.reasoner.lube.utils.transformer.impl.Expr2QlexpressT import com.antgroup.openspg.reasoner.recorder.EmptyRecorder; import com.antgroup.openspg.reasoner.recorder.IExecutionRecorder; import com.antgroup.openspg.reasoner.runner.ConfigKey; +import com.antgroup.openspg.reasoner.runner.local.callable.CallableWrapper; import com.antgroup.openspg.reasoner.runner.local.model.LocalReasonerTask; import com.antgroup.openspg.reasoner.runner.local.rdg.LocalRDG; import com.antgroup.openspg.reasoner.udf.rule.RuleRunner; @@ -85,6 +86,7 @@ public class LocalPropertyGraph implements PropertyGraph { result.setMaxPathLimit(getMaxPathLimit()); result.setStrictMaxPathLimit(getStrictMaxPathLimit()); result.setDisableDropOp(getDisableDropOp()); + result.setCallableWrapper(getCallableWrapper()); return result; } @@ -117,6 +119,7 @@ public class LocalPropertyGraph implements PropertyGraph { result.setMaxPathLimit(getMaxPathLimit()); result.setStrictMaxPathLimit(getStrictMaxPathLimit()); result.setDisableDropOp(getDisableDropOp()); + result.setCallableWrapper(getCallableWrapper()); return result; } @@ -167,6 +170,7 @@ public class LocalPropertyGraph implements PropertyGraph { result.setMaxPathLimit(getMaxPathLimit()); result.setStrictMaxPathLimit(getStrictMaxPathLimit()); result.setDisableDropOp(getDisableDropOp()); + result.setCallableWrapper(getCallableWrapper()); return result; } @@ -272,6 +276,17 @@ public class LocalPropertyGraph implements PropertyGraph { return "true".equals(String.valueOf(disableDropOpObj)); } + private CallableWrapper getCallableWrapper() { + CallableWrapper callableWrapper = null; + if (null != task && null != this.task.getParams()) { + Object obj = this.task.getParams().get(ConfigKey.REASONER_CALLABLE_WRAPPER); + if (obj instanceof CallableWrapper) { + callableWrapper = (CallableWrapper) obj; + } + } + return callableWrapper; + } + private IExecutionRecorder getExecutionRecorder() { if (null == task) { return new EmptyRecorder(); diff --git a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/rdg/LocalRDG.java b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/rdg/LocalRDG.java index fa8af6af..b83ad9a7 100644 --- a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/rdg/LocalRDG.java +++ b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/rdg/LocalRDG.java @@ -84,6 +84,7 @@ import com.antgroup.openspg.reasoner.recorder.EmptyRecorder; import com.antgroup.openspg.reasoner.recorder.IExecutionRecorder; import com.antgroup.openspg.reasoner.recorder.action.DebugInfoWithStartId; import com.antgroup.openspg.reasoner.recorder.action.SampleAction; +import com.antgroup.openspg.reasoner.runner.local.callable.CallableWrapper; import com.antgroup.openspg.reasoner.runner.local.model.LocalReasonerResult; import com.antgroup.openspg.reasoner.udf.model.UdtfMeta; import com.antgroup.openspg.reasoner.util.Convert2ScalaUtil; @@ -99,14 +100,14 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; @@ -157,6 +158,9 @@ public class LocalRDG extends RDG { /** disable drop op */ protected boolean disableDropOp = false; + /** callable wrapper */ + protected CallableWrapper callableWrapper = null; + private java.util.Set getStartId(java.util.List> kgGraphList) { java.util.Set startIdSet = new HashSet<>(); for (KgGraph kgGraph : kgGraphList) { @@ -245,35 +249,36 @@ public class LocalRDG extends RDG { long count = 0; java.util.List> newKgGraphList = new ArrayList<>(); - java.util.List>> futureList = new ArrayList<>(); + java.util.List>> futureList = new ArrayList<>(); patternMatcher.resetInitTime(); for (KgGraph kgGraphId : this.kgGraphList) { IVertexId id = kgGraphId.getVertex(this.startVertexAlias).get(0).getId(); - CompletableFuture> future = - CompletableFuture.supplyAsync( - new Supplier>() { - @Override - public KgGraph get() { - return patternMatcher.patternMatch( - id, - null, - null, - pattern, - rootVertexRuleList, - dstVertexRuleMap, - edgeRuleMap, - new HashMap<>(), - pattern.root().rule(), - edgeTypeRuleMap, - maxPathLimit, - true, - 60 * 1000); - } - }, - threadPoolExecutor); - futureList.add(future); + Callable> patternScanCallable = + new Callable>() { + @Override + public KgGraph call() throws Exception { + return patternMatcher.patternMatch( + id, + null, + null, + pattern, + rootVertexRuleList, + dstVertexRuleMap, + edgeRuleMap, + new HashMap<>(), + pattern.root().rule(), + edgeTypeRuleMap, + maxPathLimit, + true, + 60 * 1000); + } + }; + if (null != callableWrapper) { + patternScanCallable = callableWrapper.wrap(patternScanCallable); + } + futureList.add(threadPoolExecutor.submit(patternScanCallable)); } - for (CompletableFuture> future : futureList) { + for (Future> future : futureList) { KgGraph kgGraph; try { kgGraph = future.get(this.executorTimeoutMs, TimeUnit.MILLISECONDS); @@ -333,20 +338,21 @@ public class LocalRDG extends RDG { LinkEdgeImpl linkEdge = new LinkEdgeImpl( this.taskId, this.kgGraphSchema, staticParameters, pattern, udtfMeta, null, graphState); - java.util.List>>> futureList = - new ArrayList<>(); + java.util.List>>> futureList = new ArrayList<>(); for (KgGraph kgGraph : this.kgGraphList) { - CompletableFuture>> future = - CompletableFuture.supplyAsync( - new Supplier>>() { - @Override - public java.util.List> get() { - return linkEdge.link(kgGraph); - } - }); - futureList.add(future); + Callable>> linkExpandCallable = + new Callable>>() { + @Override + public java.util.List> call() throws Exception { + return linkEdge.link(kgGraph); + } + }; + if (null != callableWrapper) { + linkExpandCallable = callableWrapper.wrap(linkExpandCallable); + } + futureList.add(threadPoolExecutor.submit(linkExpandCallable)); } - for (CompletableFuture>> future : futureList) { + for (Future>> future : futureList) { java.util.List> splitedKgGraphList; try { splitedKgGraphList = future.get(this.executorTimeoutMs, TimeUnit.MILLISECONDS); @@ -354,7 +360,7 @@ public class LocalRDG extends RDG { log.warn("linkedExpandTimeout,funcName=" + pattern.edge().funcName(), e); continue; } catch (Exception e) { - throw new RuntimeException("patternScan error " + e.getMessage(), e); + throw new RuntimeException("linkedExpand error " + e.getMessage(), e); } if (CollectionUtils.isNotEmpty(splitedKgGraphList)) { KgGraph result = new KgGraphImpl(); @@ -388,7 +394,7 @@ public class LocalRDG extends RDG { return this; } - private CompletableFuture>> processKgGraphWithSameRoot( + private Future>> processKgGraphWithSameRoot( IVertexId rootId, java.util.List> sameRootKgGraphList, java.util.Set intersectionAliasSet, @@ -401,10 +407,10 @@ public class LocalRDG extends RDG { ThreadPoolExecutor threadPoolExecutor) { PartialGraphPattern beforeKgGraphSchema = this.kgGraphSchema; - return CompletableFuture.supplyAsync( - new Supplier>>() { + Callable>> expandIntoCallable = + new Callable>>() { @Override - public java.util.List> get() { + public java.util.List> call() throws Exception { if (null == rootId) { return null; } @@ -474,8 +480,11 @@ public class LocalRDG extends RDG { } return result; } - }, - threadPoolExecutor); + }; + if (null != callableWrapper) { + expandIntoCallable = callableWrapper.wrap(expandIntoCallable); + } + return threadPoolExecutor.submit(expandIntoCallable); } @Override @@ -511,8 +520,7 @@ public class LocalRDG extends RDG { java.util.Set intersectionAliasSet = RunnerUtil.getIntersectionAliasSet(this.kgGraphSchema, matchPattern); - java.util.List>>> futureList = - new ArrayList<>(); + java.util.List>>> futureList = new ArrayList<>(); java.util.List> sameRootKgGraphList = new ArrayList<>(); IVertexId lastVertexId = null; @@ -556,7 +564,7 @@ public class LocalRDG extends RDG { long count = 0; java.util.List> newKgGraphList = new ArrayList<>(); - for (CompletableFuture>> future : futureList) { + for (Future>> future : futureList) { java.util.List> resultKgGraph; try { resultKgGraph = future.get(this.executorTimeoutMs, TimeUnit.MILLISECONDS); @@ -1414,4 +1422,8 @@ public class LocalRDG extends RDG { public void setDisableDropOp(boolean disableDropOp) { this.disableDropOp = disableDropOp; } + + public void setCallableWrapper(CallableWrapper callableWrapper) { + this.callableWrapper = callableWrapper; + } } diff --git a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/graphstate/impl/MemGraphState.java b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/graphstate/impl/MemGraphState.java index bc2dcda8..b4f3c4b7 100644 --- a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/graphstate/impl/MemGraphState.java +++ b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/graphstate/impl/MemGraphState.java @@ -88,7 +88,7 @@ public class MemGraphState implements GraphState, IConceptTree { IVertexId id, Map property, MergeTypeEnum mergeType, Long version) { IVersionProperty iProperty = (IVersionProperty) vertexMap.get(id); if (iProperty == null) { - log.info("MemGraphState get vertex is null " + id.getInternalId() + " " + id.getType()); + log.debug("MemGraphState get vertex is null " + id.getInternalId() + " " + id.getType()); return; } for (String key : property.keySet()) { @@ -109,7 +109,7 @@ public class MemGraphState implements GraphState, IConceptTree { public IVertex getVertex(IVertexId id, Long version) { IVersionProperty iProperty = (IVersionProperty) vertexMap.get(id); if (iProperty == null) { - log.info("MemGraphState get vertex is null " + id.getInternalId() + " " + id.getType()); + log.debug("MemGraphState get vertex is null " + id.getInternalId() + " " + id.getType()); return null; } IVersionProperty resultValue = PropertyUtil.buildVertexProperty(id, null); diff --git a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/runner/ConfigKey.java b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/runner/ConfigKey.java index a70f10e8..ec353c7f 100644 --- a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/runner/ConfigKey.java +++ b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/runner/ConfigKey.java @@ -165,4 +165,7 @@ public class ConfigKey { /** disable drop */ public static final String REASONER_DISABLE_DROP_OP = "kg.reasoner.disable.drop.op"; + + /** set callable wrapper */ + public static final String REASONER_CALLABLE_WRAPPER = "kg.reasoner.callable.wrapper"; }