From b3d00a2a075cb8eb67e9545657af0bae038d926f Mon Sep 17 00:00:00 2001 From: Donghai Date: Wed, 24 Apr 2024 10:10:23 +0800 Subject: [PATCH] feat(reasoner): udf load from multiple package path (#200) Co-authored-by: peilong Co-authored-by: wenchengyao Co-authored-by: FishJoy --- .../catalog/impl/struct/PropertyMeta.java | 3 + .../impl/struct/RelationTypeDetail.java | 3 + .../catalog/impl/struct/VertexMeta.java | 3 + .../openspg/reasoner/common/Utils.java | 1 + .../graph/property/impl/EdgeProperty.java | 2 +- .../graph/property/impl/VertexProperty.java | 2 +- .../property/impl/VertexVersionProperty.java | 2 +- .../runner/local/impl/LocalPropertyGraph.java | 16 +++- .../reasoner/runner/local/rdg/LocalRDG.java | 39 +++++++- .../reasoner/rdg/common/LinkEdgeImpl.java | 14 ++- .../openspg/reasoner/runner/ConfigKey.java | 3 + .../reasoner/session/KGReasonerSession.scala | 34 ++++--- .../openspg/reasoner/udf/impl/UdfMngImpl.java | 88 ++++++++++++++++--- .../common/config/GraphLoaderConfig.java | 11 +++ 14 files changed, 185 insertions(+), 36 deletions(-) diff --git a/reasoner/catalog/openspg-catalog/src/main/java/com/antgroup/openspg/reasoner/catalog/impl/struct/PropertyMeta.java b/reasoner/catalog/openspg-catalog/src/main/java/com/antgroup/openspg/reasoner/catalog/impl/struct/PropertyMeta.java index 10ec9fab..d6e91169 100644 --- a/reasoner/catalog/openspg-catalog/src/main/java/com/antgroup/openspg/reasoner/catalog/impl/struct/PropertyMeta.java +++ b/reasoner/catalog/openspg-catalog/src/main/java/com/antgroup/openspg/reasoner/catalog/impl/struct/PropertyMeta.java @@ -24,6 +24,9 @@ public class PropertyMeta { @JSONField(name = "name") private String name; + @JSONField(name = "nameZh") + private String nameZh; + @JSONField(name = "attrRangeDetail") private PropertyRangeDetail propRange; diff --git a/reasoner/catalog/openspg-catalog/src/main/java/com/antgroup/openspg/reasoner/catalog/impl/struct/RelationTypeDetail.java b/reasoner/catalog/openspg-catalog/src/main/java/com/antgroup/openspg/reasoner/catalog/impl/struct/RelationTypeDetail.java index eb41c2d4..9414d065 100644 --- a/reasoner/catalog/openspg-catalog/src/main/java/com/antgroup/openspg/reasoner/catalog/impl/struct/RelationTypeDetail.java +++ b/reasoner/catalog/openspg-catalog/src/main/java/com/antgroup/openspg/reasoner/catalog/impl/struct/RelationTypeDetail.java @@ -25,6 +25,9 @@ public class RelationTypeDetail { @JSONField(name = "name") private String name; + @JSONField(name = "nameZh") + private String nameZh; + @JSONField(name = "startEntityTypeDetail") private VertexMeta startEntityType; diff --git a/reasoner/catalog/openspg-catalog/src/main/java/com/antgroup/openspg/reasoner/catalog/impl/struct/VertexMeta.java b/reasoner/catalog/openspg-catalog/src/main/java/com/antgroup/openspg/reasoner/catalog/impl/struct/VertexMeta.java index 0a602284..916e0ae9 100644 --- a/reasoner/catalog/openspg-catalog/src/main/java/com/antgroup/openspg/reasoner/catalog/impl/struct/VertexMeta.java +++ b/reasoner/catalog/openspg-catalog/src/main/java/com/antgroup/openspg/reasoner/catalog/impl/struct/VertexMeta.java @@ -25,6 +25,9 @@ public class VertexMeta { @JSONField(name = "name") private String name; + @JSONField(name = "nameZh") + private String nameZh; + @JSONField(name = "attributeTypeDetailList") private List attributeTypeDetailList; diff --git a/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/Utils.java b/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/Utils.java index 15685b34..083cdf59 100644 --- a/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/Utils.java +++ b/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/Utils.java @@ -84,6 +84,7 @@ public class Utils { case "java.util.Date": return KTDate$.MODULE$; case "java.util.List": + case "java.util.List": return new KTList(KTObject$.MODULE$); case "java.util.List": return new KTList(null); diff --git a/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/graph/property/impl/EdgeProperty.java b/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/graph/property/impl/EdgeProperty.java index d3045a48..6341eb04 100644 --- a/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/graph/property/impl/EdgeProperty.java +++ b/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/graph/property/impl/EdgeProperty.java @@ -21,7 +21,7 @@ import java.util.HashMap; import java.util.Map; public class EdgeProperty implements IProperty { - private final Map props; + protected final Map props; /** * new edge property with property data diff --git a/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/graph/property/impl/VertexProperty.java b/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/graph/property/impl/VertexProperty.java index 59435f2f..077442b8 100644 --- a/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/graph/property/impl/VertexProperty.java +++ b/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/graph/property/impl/VertexProperty.java @@ -20,7 +20,7 @@ import java.util.HashMap; import java.util.Map; public class VertexProperty implements IProperty { - private final Map props; + protected final Map props; public VertexProperty() { this.props = new HashMap<>(); diff --git a/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/graph/property/impl/VertexVersionProperty.java b/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/graph/property/impl/VertexVersionProperty.java index eaa19093..3c557f07 100644 --- a/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/graph/property/impl/VertexVersionProperty.java +++ b/reasoner/common/src/main/java/com/antgroup/openspg/reasoner/common/graph/property/impl/VertexVersionProperty.java @@ -27,7 +27,7 @@ import java.util.stream.Collectors; import scala.Tuple2; public class VertexVersionProperty implements IVersionProperty { - private final Map> props; + protected final Map> props; /** default constructor */ public VertexVersionProperty() { diff --git a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/impl/LocalPropertyGraph.java b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/impl/LocalPropertyGraph.java index 9065cbb6..07b9eccc 100644 --- a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/impl/LocalPropertyGraph.java +++ b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/impl/LocalPropertyGraph.java @@ -18,6 +18,7 @@ import com.antgroup.openspg.reasoner.common.graph.vertex.IVertex; import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId; import com.antgroup.openspg.reasoner.common.graph.vertex.impl.MirrorVertex; import com.antgroup.openspg.reasoner.common.graph.vertex.impl.NoneVertex; +import com.antgroup.openspg.reasoner.common.graph.vertex.impl.VertexBizId; import com.antgroup.openspg.reasoner.graphstate.GraphState; import com.antgroup.openspg.reasoner.kggraph.KgGraph; import com.antgroup.openspg.reasoner.lube.common.expr.Expr; @@ -83,6 +84,7 @@ public class LocalPropertyGraph implements PropertyGraph { isCarryTraversalGraph); result.setMaxPathLimit(getMaxPathLimit()); result.setStrictMaxPathLimit(getStrictMaxPathLimit()); + result.setDisableDropOp(getDisableDropOp()); return result; } @@ -114,6 +116,7 @@ public class LocalPropertyGraph implements PropertyGraph { false); result.setMaxPathLimit(getMaxPathLimit()); result.setStrictMaxPathLimit(getStrictMaxPathLimit()); + result.setDisableDropOp(getDisableDropOp()); return result; } @@ -144,7 +147,7 @@ public class LocalPropertyGraph implements PropertyGraph { } for (String type : JavaConversions.asJavaCollection(types)) { for (String idStr : idStrList) { - startIdSet.add(IVertexId.from(idStr, type)); + startIdSet.add(new VertexBizId(idStr, type)); } } if (startIdSet.isEmpty()) { @@ -160,9 +163,10 @@ public class LocalPropertyGraph implements PropertyGraph { getTaskId(), // subquery can not carry all graph getExecutionRecorder(), - false); + isCarryTraversalGraph); result.setMaxPathLimit(getMaxPathLimit()); result.setStrictMaxPathLimit(getStrictMaxPathLimit()); + result.setDisableDropOp(getDisableDropOp()); return result; } @@ -260,6 +264,14 @@ public class LocalPropertyGraph implements PropertyGraph { return Long.parseLong(String.valueOf(maxPathLimitObj)); } + private boolean getDisableDropOp() { + Object disableDropOpObj = null; + if (null != task && null != this.task.getParams()) { + disableDropOpObj = this.task.getParams().get(ConfigKey.REASONER_DISABLE_DROP_OP); + } + return "true".equals(String.valueOf(disableDropOpObj)); + } + private IExecutionRecorder getExecutionRecorder() { if (null == task) { return new EmptyRecorder(); diff --git a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/rdg/LocalRDG.java b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/rdg/LocalRDG.java index 919f6692..4556447d 100644 --- a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/rdg/LocalRDG.java +++ b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/rdg/LocalRDG.java @@ -102,6 +102,7 @@ import java.util.Iterator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Predicate; @@ -153,6 +154,9 @@ public class LocalRDG extends RDG { /** carry all tranversal graph data */ protected boolean isCarryTraversalGraph = false; + /** disable drop op */ + protected boolean disableDropOp = false; + private java.util.Set getStartId(java.util.List> kgGraphList) { java.util.Set startIdSet = new HashSet<>(); for (KgGraph kgGraph : kgGraphList) { @@ -317,6 +321,7 @@ public class LocalRDG extends RDG { @Override public LocalRDG linkedExpand(EdgePattern pattern) { + long startTime = System.currentTimeMillis(); java.util.List> newKgGraphList = new ArrayList<>(); UdtfMeta udtfMeta = RunnerUtil.chooseUdtfMeta(pattern); @@ -328,8 +333,29 @@ public class LocalRDG extends RDG { LinkEdgeImpl linkEdge = new LinkEdgeImpl( this.taskId, this.kgGraphSchema, staticParameters, pattern, udtfMeta, null, graphState); + java.util.List>>> futureList = + new ArrayList<>(); for (KgGraph kgGraph : this.kgGraphList) { - java.util.List> splitedKgGraphList = linkEdge.link(kgGraph); + CompletableFuture>> future = + CompletableFuture.supplyAsync( + new Supplier>>() { + @Override + public java.util.List> get() { + return linkEdge.link(kgGraph); + } + }); + futureList.add(future); + } + for (CompletableFuture>> future : futureList) { + java.util.List> splitedKgGraphList; + try { + splitedKgGraphList = future.get(this.executorTimeoutMs, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + log.warn("linkedExpandTimeout,funcName=" + pattern.edge().funcName(), e); + continue; + } catch (Exception e) { + throw new RuntimeException("patternScan error " + e.getMessage(), e); + } if (CollectionUtils.isNotEmpty(splitedKgGraphList)) { KgGraph result = new KgGraphImpl(); result.merge(splitedKgGraphList, null); @@ -351,7 +377,9 @@ public class LocalRDG extends RDG { + ",matchCount=" + count + ", linkedTargetVertexSize=" - + targetVertexSize); + + targetVertexSize + + " cost time=" + + (System.currentTimeMillis() - startTime)); this.executionRecorder.stageResultWithDetail( "linkedExpand(" + RunnerUtil.getReadablePattern(pattern) + ")", this.kgGraphList.size(), @@ -903,6 +931,9 @@ public class LocalRDG extends RDG { @Override public LocalRDG dropFields(Set fields) { + if (disableDropOp) { + return this; + } java.util.Set dropFieldSet = new HashSet<>(JavaConversions.asJavaCollection(fields)); if (CollectionUtils.isEmpty(dropFieldSet)) { return this; @@ -1372,4 +1403,8 @@ public class LocalRDG extends RDG { public void setStrictMaxPathLimit(Long strictMaxPathLimit) { this.strictMaxPathLimit = strictMaxPathLimit; } + + public void setDisableDropOp(boolean disableDropOp) { + this.disableDropOp = disableDropOp; + } } diff --git a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/rdg/common/LinkEdgeImpl.java b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/rdg/common/LinkEdgeImpl.java index e9f19959..2d083827 100644 --- a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/rdg/common/LinkEdgeImpl.java +++ b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/rdg/common/LinkEdgeImpl.java @@ -18,6 +18,7 @@ import com.antgroup.openspg.reasoner.common.graph.edge.IEdge; import com.antgroup.openspg.reasoner.common.graph.edge.impl.Edge; import com.antgroup.openspg.reasoner.common.graph.property.IProperty; import com.antgroup.openspg.reasoner.common.graph.property.impl.EdgeProperty; +import com.antgroup.openspg.reasoner.common.graph.property.impl.VertexProperty; import com.antgroup.openspg.reasoner.common.graph.vertex.IVertex; import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId; import com.antgroup.openspg.reasoner.common.graph.vertex.impl.Vertex; @@ -104,8 +105,10 @@ public class LinkEdgeImpl implements Serializable { paramList.add(parameter); } + String sourceAlias = linkedEdgePattern.src().alias(); + Set targetTypeSet = JavaConversions.setAsJavaSet(linkedEdgePattern.dst().typeNames()); BaseUdtf tableFunction = udtfMeta.createTableFunction(); - tableFunction.initialize(graphState); + tableFunction.initialize(graphState, context, sourceAlias, targetTypeSet); tableFunction.process(paramList); List> udtfResult = tableFunction.getCollector(); List linkedUdtfResultList = @@ -123,7 +126,6 @@ public class LinkEdgeImpl implements Serializable { if (CollectionUtils.isEmpty(linkedUdtfResultList)) { continue; } - String sourceAlias = linkedEdgePattern.src().alias(); List> sourceList = path.getVertex(sourceAlias); if (null == sourceList || sourceList.size() != 1) { throw new RuntimeException("There is more than one start vertex in kgGraph path"); @@ -137,8 +139,8 @@ public class LinkEdgeImpl implements Serializable { for (LinkedUdtfResult linkedUdtfResult : linkedUdtfResultList) { for (String targetIdStr : linkedUdtfResult.getTargetVertexIdList()) { // add target vertex - String targetAlias = pc.target(); PatternElement targetVertexMeta = linkedEdgePattern.dst(); + String targetAlias = targetVertexMeta.alias(); List targetVertexTypes = new ArrayList<>(JavaConversions.setAsJavaSet(targetVertexMeta.typeNames())); if (targetVertexTypes.size() == 0) { @@ -147,13 +149,17 @@ public class LinkEdgeImpl implements Serializable { } for (String targetVertexType : targetVertexTypes) { IVertexId targetId = new VertexId(targetIdStr, targetVertexType); + Map propertyMap = new HashMap<>(); + VertexProperty vertexProperty = new VertexProperty(propertyMap); + vertexProperty.put(Constants.NODE_ID_KEY, targetIdStr); + vertexProperty.put(Constants.CONTEXT_LABEL, targetVertexType); if (partitioner != null && !partitioner.canPartition(targetId)) { continue; } // need add property with id Set> newVertexSet = newAliasVertexMap.computeIfAbsent(targetAlias, k -> new HashSet<>()); - newVertexSet.add(new Vertex<>(targetId)); + newVertexSet.add(new Vertex<>(targetId, vertexProperty)); Map props = new HashMap<>(linkedUdtfResult.getEdgePropertyMap()); props.put(Constants.EDGE_TO_ID_KEY, targetIdStr); diff --git a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/runner/ConfigKey.java b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/runner/ConfigKey.java index 6bef59f8..a70f10e8 100644 --- a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/runner/ConfigKey.java +++ b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/runner/ConfigKey.java @@ -162,4 +162,7 @@ public class ConfigKey { /** the devId of akg task */ public static final String DEV_ID = "devId"; + + /** disable drop */ + public static final String REASONER_DISABLE_DROP_OP = "kg.reasoner.disable.drop.op"; } diff --git a/reasoner/runner/runner-common/src/main/scala/com/antgroup/openspg/reasoner/session/KGReasonerSession.scala b/reasoner/runner/runner-common/src/main/scala/com/antgroup/openspg/reasoner/session/KGReasonerSession.scala index 91f35fdc..9ba4d5b1 100644 --- a/reasoner/runner/runner-common/src/main/scala/com/antgroup/openspg/reasoner/session/KGReasonerSession.scala +++ b/reasoner/runner/runner-common/src/main/scala/com/antgroup/openspg/reasoner/session/KGReasonerSession.scala @@ -152,7 +152,24 @@ abstract class KGReasonerSession[T <: RDG[T]: TypeTag]( * @return */ def plan(query: String, params: Map[String, Object]): List[PhysicalOperator[T]] = { - optimizedLogicalPlan = plan2OptimizedLogicalPlan(query, params) + val start = System.currentTimeMillis() + val blocks = plan2UnresolvedLogicalPlan(query, params) + if (ParameterUtils.isEnableSPGPlanPrettyPrint(params)) { + for (block: Block <- blocks) { + logger.info(block.pretty) + } + } + logger.info( + "benchmark main plan plan2UnresolvedLogicalPlan cost = " + + (System.currentTimeMillis() - start)) + planBlock(blocks, params) + } + + /** + * Generate the optimization physical plan from Blocks. + */ + def planBlock(blocks: List[Block], params: Map[String, Object]): List[PhysicalOperator[T]] = { + optimizedLogicalPlan = plan2OptimizedLogicalPlan(blocks, params) planLogicalPlan2PhysicalPlan(optimizedLogicalPlan, params) } @@ -166,20 +183,9 @@ abstract class KGReasonerSession[T <: RDG[T]: TypeTag]( * @return */ def plan2OptimizedLogicalPlan( - query: String, + blocks: List[Block], params: Map[String, Object]): List[LogicalOperator] = { - var start = System.currentTimeMillis() - val blocks = plan2UnresolvedLogicalPlan(query, params) - if (ParameterUtils.isEnableSPGPlanPrettyPrint(params)) { - for (block: Block <- blocks) { - logger.info(block.pretty) - } - } - - logger.info( - "benchmark main plan plan2UnresolvedLogicalPlan cost = " - + (System.currentTimeMillis() - start)) - start = System.currentTimeMillis() + val start = System.currentTimeMillis() val optimizedLogicalPlan = plan2LogicalPlan(blocks, params) logger.info( "benchmark main plan plan2LogicalPlan cost = " diff --git a/reasoner/udf/src/main/java/com/antgroup/openspg/reasoner/udf/impl/UdfMngImpl.java b/reasoner/udf/src/main/java/com/antgroup/openspg/reasoner/udf/impl/UdfMngImpl.java index 37b47d5b..90a75699 100644 --- a/reasoner/udf/src/main/java/com/antgroup/openspg/reasoner/udf/impl/UdfMngImpl.java +++ b/reasoner/udf/src/main/java/com/antgroup/openspg/reasoner/udf/impl/UdfMngImpl.java @@ -40,6 +40,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import scala.Tuple2; @Slf4j(topic = "userlogger") @@ -56,18 +57,83 @@ public class UdfMngImpl implements UdfMng { if (null == instance) { synchronized (UdfMngImpl.class) { if (null == instance) { - instance = createInstance(); + instance = + createInstance( + Lists.newArrayList(KGDSL_UDF_PACKAGE_PATH), + Lists.newArrayList(KGDSL_UDAF_PACKAGE_PATH), + Lists.newArrayList(KGDSL_UDTF_PACKAGE_PATH), + null, + null, + null); } } } return instance; } - private static UdfMngImpl createInstance() { + /** 多路径 */ + public static UdfMngImpl getInstance( + List udfPackagePaths, + List udafPackagePaths, + List udtfPackagePaths, + List udfMetaList, + List udafMetaList, + List udtfMetaList) { + if (null == instance) { + synchronized (UdfMngImpl.class) { + if (null == instance) { + List _udfPackagePaths = Lists.newArrayList(KGDSL_UDF_PACKAGE_PATH); + if (CollectionUtils.isNotEmpty(udfPackagePaths)) { + _udfPackagePaths.addAll(udfPackagePaths); + } + List _udafPackagePaths = Lists.newArrayList(KGDSL_UDAF_PACKAGE_PATH); + if (CollectionUtils.isNotEmpty(udafPackagePaths)) { + _udafPackagePaths.addAll(udafPackagePaths); + } + List _udtfPackagePaths = Lists.newArrayList(KGDSL_UDTF_PACKAGE_PATH); + if (CollectionUtils.isNotEmpty(udtfPackagePaths)) { + _udtfPackagePaths.addAll(udtfPackagePaths); + } + instance = + createInstance( + _udfPackagePaths, + _udafPackagePaths, + _udtfPackagePaths, + udfMetaList, + udafMetaList, + udtfMetaList); + } + } + } + return instance; + } + + private static UdfMngImpl createInstance( + List udfPackagePaths, + List udafPackagePaths, + List udtfPackagePaths, + List udfMetaList, + List udafMetaList, + List udtfMetaList) { UdfMngImpl udfMng = new UdfMngImpl(); - udfMng.getAllUdf(); - udfMng.getAllUdaf(); - udfMng.getAllUdtf(); + for (String packagePath : udfPackagePaths) { + udfMng.getUdfInPath(packagePath); + } + for (String packagePath : udafPackagePaths) { + udfMng.getUdafInPath(packagePath); + } + for (String packagePath : udtfPackagePaths) { + udfMng.getUdtfInPath(packagePath); + } + if (CollectionUtils.isNotEmpty(udfMetaList)) { + udfMetaList.forEach(udfMng::addUdfMeta); + } + if (CollectionUtils.isNotEmpty(udafMetaList)) { + udafMetaList.forEach(udfMng::addUdafMeta); + } + if (CollectionUtils.isNotEmpty(udtfMetaList)) { + udtfMetaList.forEach(udfMng::addUdtfMeta); + } udfMng.udfCheck(); return udfMng; } @@ -77,8 +143,8 @@ public class UdfMngImpl implements UdfMng { private static final String KGDSL_UDF_PACKAGE_PATH = "com.antgroup.openspg.reasoner.udf.builtin.udf"; - private void getAllUdf() { - FastClasspathScanner classpathScanner = new FastClasspathScanner(KGDSL_UDF_PACKAGE_PATH); + private void getUdfInPath(String packagePath) { + FastClasspathScanner classpathScanner = new FastClasspathScanner(packagePath); classpathScanner.addClassLoader(getClass().getClassLoader()); classpathScanner .matchAllStandardClasses( @@ -130,8 +196,8 @@ public class UdfMngImpl implements UdfMng { private static final String KGDSL_UDAF_PACKAGE_PATH = "com.antgroup.openspg.reasoner.udf.builtin.udaf"; - private void getAllUdaf() { - FastClasspathScanner classpathScanner = new FastClasspathScanner(KGDSL_UDAF_PACKAGE_PATH); + private void getUdafInPath(String packagePath) { + FastClasspathScanner classpathScanner = new FastClasspathScanner(packagePath); classpathScanner.addClassLoader(getClass().getClassLoader()); classpathScanner .matchClassesImplementing( @@ -158,8 +224,8 @@ public class UdfMngImpl implements UdfMng { private static final String KGDSL_UDTF_PACKAGE_PATH = "com.antgroup.openspg.reasoner.udf.builtin.udtf"; - private void getAllUdtf() { - FastClasspathScanner classpathScanner = new FastClasspathScanner(KGDSL_UDTF_PACKAGE_PATH); + private void getUdtfInPath(String packagePath) { + FastClasspathScanner classpathScanner = new FastClasspathScanner(packagePath); classpathScanner.addClassLoader(getClass().getClassLoader()); classpathScanner .matchClassesWithAnnotation( diff --git a/reasoner/warehouse/warehouse-common/src/main/java/com/antgroup/openspg/reasoner/warehouse/common/config/GraphLoaderConfig.java b/reasoner/warehouse/warehouse-common/src/main/java/com/antgroup/openspg/reasoner/warehouse/common/config/GraphLoaderConfig.java index 1bf632b9..48396ddb 100644 --- a/reasoner/warehouse/warehouse-common/src/main/java/com/antgroup/openspg/reasoner/warehouse/common/config/GraphLoaderConfig.java +++ b/reasoner/warehouse/warehouse-common/src/main/java/com/antgroup/openspg/reasoner/warehouse/common/config/GraphLoaderConfig.java @@ -68,6 +68,9 @@ public class GraphLoaderConfig implements Serializable { /** kgstate schema url */ protected String schemaUrl = null; + /** kgstate schema retry times */ + protected int schemaRetryTimes = 3; + /** enable binary property or not */ protected Boolean binary = false; @@ -318,6 +321,14 @@ public class GraphLoaderConfig implements Serializable { this.binary = binary; } + public int getSchemaRetryTimes() { + return schemaRetryTimes; + } + + public void setSchemaRetryTimes(int schemaRetryTimes) { + this.schemaRetryTimes = schemaRetryTimes; + } + /** verify config */ public GraphLoaderConfig verify() { if (CollectionUtils.isEmpty(this.edgeLoaderConfigs)) {