From b0911bd345d33a33c59a43473994fbb1083eb9ab Mon Sep 17 00:00:00 2001 From: FishJoy Date: Tue, 30 Jan 2024 14:08:32 +0800 Subject: [PATCH] feat(reasoner): supports full node id pushdown (#101) Co-authored-by: youdonghai --- .../common/trees/TreeTransformer.scala | 35 +++ .../common/trees/AbstractTreeNodeTest.scala | 19 +- .../reasoner/parser/OpenSPGDslParser.scala | 30 ++- .../parser/OpenSPGDslParserTest.scala | 6 +- .../lube/catalog/PropertyGraphSchema.scala | 13 +- .../reasoner/lube/utils/ExprUtils.scala | 19 +- .../reasoner/lube/utils/RuleUtils.scala | 2 +- .../lube/logical/operators/LinkedExpand.scala | 6 +- .../lube/logical/operators/Project.scala | 25 +- .../logical/optimizer/LogicalOptimizer.scala | 14 +- .../lube/logical/optimizer/Rule.scala | 20 +- .../optimizer/rules/AggregatePushDown.scala | 4 +- .../logical/optimizer/rules/DistinctGet.scala | 4 +- .../optimizer/rules/EdgeToProperty.scala | 4 +- .../optimizer/rules/ExpandIntoPure.scala | 68 +++++- .../logical/optimizer/rules/FilterMerge.scala | 4 +- .../optimizer/rules/FilterPushDown.scala | 11 +- .../logical/optimizer/rules/GroupNode.scala | 7 +- .../optimizer/rules/IdFilterPushDown.scala | 218 ----------------- .../rules/NodeIdToEdgeProperty.scala | 223 ++++++++++++++++++ .../optimizer/rules/PatternJoinPure.scala | 38 +++ .../optimizer/rules/ProjectMerge.scala | 28 +-- .../lube/logical/optimizer/rules/Pure.scala | 4 +- .../optimizer/rules/SolvedModelPure.scala | 13 +- .../logical/planning/LogicalPlanner.scala | 17 +- .../planning/PatternMatchPlanner.scala | 17 +- .../lube/logical/validate/Validator.scala | 4 +- .../validate/semantic/SemanticExplainer.scala | 5 +- .../semantic/rules/NodeIdTransform.scala | 135 +++++++++++ .../lube/logical/LogicalPlannerTests.scala | 7 +- .../optimizer/NodeIdToEdgePropertyTests.scala | 187 +++++++++++++++ .../transitive/TransitiveOptionalTest.java | 2 +- .../openspg/reasoner/utils/RunnerUtil.java | 14 ++ .../openspg/reasoner/util/LoaderUtil.scala | 35 +-- .../session/ReasonerSessionTests.scala | 18 +- 35 files changed, 898 insertions(+), 358 deletions(-) delete mode 100644 reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/IdFilterPushDown.scala create mode 100644 reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/NodeIdToEdgeProperty.scala create mode 100644 reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/PatternJoinPure.scala create mode 100644 reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/validate/semantic/rules/NodeIdTransform.scala create mode 100644 reasoner/lube-logical/src/test/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/NodeIdToEdgePropertyTests.scala diff --git a/reasoner/common/src/main/scala/com/antgroup/openspg/reasoner/common/trees/TreeTransformer.scala b/reasoner/common/src/main/scala/com/antgroup/openspg/reasoner/common/trees/TreeTransformer.scala index 140bed54..34b24a47 100644 --- a/reasoner/common/src/main/scala/com/antgroup/openspg/reasoner/common/trees/TreeTransformer.scala +++ b/reasoner/common/src/main/scala/com/antgroup/openspg/reasoner/common/trees/TreeTransformer.scala @@ -120,6 +120,41 @@ case class TopDown[T <: AbstractTreeNode[T]: ClassTag](rule: PartialFunction[T, } +/** + * Applies the given partial function starting from the root of this tree. + * An additional context is being recursively passed + * from the leftmost child to its siblings and eventually to its parent. + */ +case class TopDownWithContext[T <: AbstractTreeNode[T]: ClassTag, C]( + rule: PartialFunction[(T, C), (T, C)]) + extends TreeRewriterWithContext[T, C] { + + def transform(tree: T, context: C): (T, C) = { + var (afterSelf, updatedContext) = if (rule.isDefinedAt(tree -> context)) { + rule(tree -> context) + } else { + tree -> context + } + + val childrenLength = afterSelf.children.length + val afterChildren = if (childrenLength == 0) { + afterSelf + } else { + val updatedChildren = new Array[T](childrenLength) + var i = 0 + while (i < childrenLength) { + val pair = transform(afterSelf.children(i), updatedContext) + updatedChildren(i) = pair._1 + updatedContext = pair._2 + i += 1 + } + afterSelf.withNewChildren(updatedChildren) + } + afterChildren -> updatedContext + } + +} + /** * Applies the given transformation starting from the leaves of this tree. */ diff --git a/reasoner/common/src/test/scala/com/antgroup/opensog/reasoner/common/trees/AbstractTreeNodeTest.scala b/reasoner/common/src/test/scala/com/antgroup/opensog/reasoner/common/trees/AbstractTreeNodeTest.scala index 26165e67..9ab741a1 100644 --- a/reasoner/common/src/test/scala/com/antgroup/opensog/reasoner/common/trees/AbstractTreeNodeTest.scala +++ b/reasoner/common/src/test/scala/com/antgroup/opensog/reasoner/common/trees/AbstractTreeNodeTest.scala @@ -13,7 +13,7 @@ package com.antgroup.opensog.reasoner.common.trees -import com.antgroup.openspg.reasoner.common.trees.AbstractTreeNode +import com.antgroup.openspg.reasoner.common.trees.{AbstractTreeNode, BottomUpWithContext, TopDownWithContext} import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, equal} @@ -28,6 +28,22 @@ class AbstractTreeNodeTest extends AnyFunSpec { | ├─Number(v=4) | └─Number(v=3)""".stripMargin) } + + it("rewrites with context up") { + val calculation = Add(Number(5), Add(Number(4), Number(3))) + val sumOnce: PartialFunction[(CalcExpr, Boolean), (CalcExpr, Boolean)] = { + case (Add(n1: Number, n2: Number), false) => Number(n1.v + n2.v) -> true + } + + val expected = Add(Number(5), Number(7)) -> true + + val up = BottomUpWithContext(sumOnce).transform(calculation, false) + up should equal(expected) + + val down = TopDownWithContext(sumOnce).transform(calculation, false) + down should equal(expected) + } + } case class Number(v: Int) extends CalcExpr { @@ -41,3 +57,4 @@ abstract class CalcExpr extends AbstractTreeNode[CalcExpr] { case class Add(left: CalcExpr, right: CalcExpr) extends CalcExpr { def eval: Int = left.eval + right.eval } + diff --git a/reasoner/kgdsl-parser/src/main/scala/com/antgroup/openspg/reasoner/parser/OpenSPGDslParser.scala b/reasoner/kgdsl-parser/src/main/scala/com/antgroup/openspg/reasoner/parser/OpenSPGDslParser.scala index 77fe77ea..c87a2001 100644 --- a/reasoner/kgdsl-parser/src/main/scala/com/antgroup/openspg/reasoner/parser/OpenSPGDslParser.scala +++ b/reasoner/kgdsl-parser/src/main/scala/com/antgroup/openspg/reasoner/parser/OpenSPGDslParser.scala @@ -25,7 +25,7 @@ import com.antgroup.openspg.reasoner.common.types._ import com.antgroup.openspg.reasoner.lube.block._ import com.antgroup.openspg.reasoner.lube.common.expr._ import com.antgroup.openspg.reasoner.lube.common.graph._ -import com.antgroup.openspg.reasoner.lube.common.pattern.{Element, GraphPath, PatternElement, PredicateElement} +import com.antgroup.openspg.reasoner.lube.common.pattern.{Element, EntityElement, GraphPath, PatternElement, PredicateElement} import com.antgroup.openspg.reasoner.lube.common.rule.{LogicRule, ProjectRule, Rule} import com.antgroup.openspg.reasoner.lube.parser.ParserInterface import com.antgroup.openspg.reasoner.lube.utils.{ExprUtils, RuleUtils} @@ -162,11 +162,31 @@ class OpenSPGDslParser extends ParserInterface { Ref(ddlBlockWithNodes._3.target.alias))))) DDLBlock(Set.apply(ddlBlockOp), List.apply(prjBlk)) case AddPredicate(predicate) => - var attrFields: Map[String, Ref] = Map.empty + val attrFields = new mutable.HashMap[String, Expr]() addPropertiesMap.foreach(x => if (x.name == predicate.alias) { attrFields += (x.field -> Ref(generateIRPropertyTmpVariable(x).name)) }) + attrFields.put( + Constants.EDGE_FROM_ID_KEY, + UnaryOpExpr(GetField(Constants.NODE_ID_KEY), Ref(predicate.source.alias))) + attrFields.put( + Constants.EDGE_FROM_ID_TYPE_KEY, + VString(predicate.source.typeNames.head)) + if (predicate.target.isInstanceOf[EntityElement]) { + attrFields.put( + Constants.EDGE_TO_ID_KEY, + VString(predicate.target.asInstanceOf[EntityElement].id)) + } else { + attrFields.put( + Constants.EDGE_TO_ID_KEY, + UnaryOpExpr(GetField(Constants.NODE_ID_KEY), Ref(predicate.target.alias))) + } + attrFields.put( + Constants.EDGE_TO_ID_TYPE_KEY, + VString(predicate.target.typeNames.head)) + + val depBlk = ruleBlock DDLBlock( @@ -177,7 +197,7 @@ class OpenSPGDslParser extends ParserInterface { predicate.alias, predicate.source, predicate.target, - attrFields, + attrFields.toMap, predicate.direction))), List.apply(depBlk)) case _ => DDLBlock(Set.apply(ddlBlockOp), List.apply(ruleBlock)) @@ -337,8 +357,8 @@ class OpenSPGDslParser extends ParserInterface { graphAggExpr.by.map { case Ref(refName) => refName case UnaryOpExpr(GetField(fieldName), Ref(refName)) => refName + "." + fieldName - case x => throw - new KGDSLGrammarException("OrderAndSliceBlock can not group " + x.pretty) + case x => + throw new KGDSLGrammarException("OrderAndSliceBlock can not group " + x.pretty) }) case AggIfOpExpr(_, _) | AggOpExpr(_, _) => if (realLValue == null) { diff --git a/reasoner/kgdsl-parser/src/test/scala/com/antgroup/openspg/reasoner/parser/OpenSPGDslParserTest.scala b/reasoner/kgdsl-parser/src/test/scala/com/antgroup/openspg/reasoner/parser/OpenSPGDslParserTest.scala index 1dff2121..3a84b0c4 100644 --- a/reasoner/kgdsl-parser/src/test/scala/com/antgroup/openspg/reasoner/parser/OpenSPGDslParserTest.scala +++ b/reasoner/kgdsl-parser/src/test/scala/com/antgroup/openspg/reasoner/parser/OpenSPGDslParserTest.scala @@ -22,6 +22,7 @@ import com.antgroup.openspg.reasoner.lube.common.graph._ import com.antgroup.openspg.reasoner.lube.common.pattern.{EntityElement, LinkedPatternConnection} import com.antgroup.openspg.reasoner.lube.common.rule.ProjectRule import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.must.Matchers.contain import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, equal} class OpenSPGDslParserTest extends AnyFunSpec { @@ -660,8 +661,7 @@ class OpenSPGDslParserTest extends AnyFunSpec { .asInstanceOf[AddPredicate] .predicate .fields - .head - ._1 should equal("same_domain_num") + .keySet should contain ("same_domain_num") blocks(1).asInstanceOf[DDLBlock].ddlOp.head.isInstanceOf[AddProperty] should equal(true) blocks(1) @@ -1015,7 +1015,7 @@ class OpenSPGDslParserTest extends AnyFunSpec { .predicate .label equals ("belongTo") val text = - """└─DDLBlock(ddlOp=Set(AddPredicate(PredicateElement(belongTo,p,(s:User,BinaryOpExpr(name=BEqual)),EntityElement(cardUser,accountQueryCrowd,o),Map(),OUT)))) + """└─DDLBlock(ddlOp=Set(AddPredicate(PredicateElement(belongTo,p,(s:User,BinaryOpExpr(name=BEqual)),EntityElement(cardUser,accountQueryCrowd,o),Map(__to_id_type__->VString(value=accountQueryCrowd/cardUser),__to_id__->VString(value=cardUser),__from_id__->UnaryOpExpr(name=GetField(id)),__from_id_type__->VString(value=User)),OUT)))) * └─FilterBlock(rules=LogicRule(R5,智信确权,BinaryOpExpr(name=BEqual))) * └─FilterBlock(rules=LogicRule(R4,绑定数目,BinaryOpExpr(name=BGreaterThan))) * └─AggregationBlock(aggregations=Aggregations(Map(IRVariable(BindNum) -> AggOpExpr(name=Sum))), group=List(IRNode(s,Set(zhixin)))) diff --git a/reasoner/lube-api/src/main/scala/com/antgroup/openspg/reasoner/lube/catalog/PropertyGraphSchema.scala b/reasoner/lube-api/src/main/scala/com/antgroup/openspg/reasoner/lube/catalog/PropertyGraphSchema.scala index c5f98f8b..cd8ef294 100644 --- a/reasoner/lube-api/src/main/scala/com/antgroup/openspg/reasoner/lube/catalog/PropertyGraphSchema.scala +++ b/reasoner/lube-api/src/main/scala/com/antgroup/openspg/reasoner/lube/catalog/PropertyGraphSchema.scala @@ -15,9 +15,10 @@ package com.antgroup.openspg.reasoner.lube.catalog import scala.collection.mutable +import com.antgroup.openspg.reasoner.common.constants.Constants import com.antgroup.openspg.reasoner.common.exception.SchemaException import com.antgroup.openspg.reasoner.common.graph.edge.{Direction, SPO} -import com.antgroup.openspg.reasoner.common.types.KgType +import com.antgroup.openspg.reasoner.common.types.{KgType, KTString} import com.antgroup.openspg.reasoner.lube.catalog.struct.{Edge, Field, Node, NodeType} import org.json4s._ import org.json4s.ext.EnumNameSerializer @@ -70,6 +71,11 @@ class PropertyGraphSchema(val nodes: mutable.Map[String, Node], val edges: mutab } def getEdgeField(spoStr: Set[String], fieldName: String): Field = { + if (fieldName.equals(Constants.EDGE_FROM_ID_KEY) || fieldName.equals( + Constants.EDGE_TO_ID_KEY) || fieldName.equals( + Constants.EDGE_FROM_ID_TYPE_KEY) || fieldName.equals(Constants.EDGE_TO_ID_TYPE_KEY)) { + return new Field(fieldName, KTString, true) + } for (spo <- spoStr) { val field = getEdgeField(spo, fieldName) if (field != null) { @@ -105,6 +111,11 @@ class PropertyGraphSchema(val nodes: mutable.Map[String, Node], val edges: mutab typeName: String, endNode: String, fieldName: String): Field = { + if (fieldName.equals(Constants.EDGE_FROM_ID_KEY) || fieldName.equals( + Constants.EDGE_TO_ID_KEY) || fieldName.equals( + Constants.EDGE_FROM_ID_TYPE_KEY) || fieldName.equals(Constants.EDGE_TO_ID_TYPE_KEY)) { + return new Field(fieldName, KTString, true) + } val spo = new SPO(startNode, typeName, endNode) val edge = edges.get(spo) if (edge.isEmpty) { diff --git a/reasoner/lube-api/src/main/scala/com/antgroup/openspg/reasoner/lube/utils/ExprUtils.scala b/reasoner/lube-api/src/main/scala/com/antgroup/openspg/reasoner/lube/utils/ExprUtils.scala index 4bf89dff..54bdab80 100644 --- a/reasoner/lube-api/src/main/scala/com/antgroup/openspg/reasoner/lube/utils/ExprUtils.scala +++ b/reasoner/lube-api/src/main/scala/com/antgroup/openspg/reasoner/lube/utils/ExprUtils.scala @@ -15,7 +15,8 @@ package com.antgroup.openspg.reasoner.lube.utils import scala.collection.mutable -import com.antgroup.openspg.reasoner.common.trees.{BottomUp, Transform} +import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException +import com.antgroup.openspg.reasoner.common.trees.{BottomUp, TopDown, Transform} import com.antgroup.openspg.reasoner.lube.common.expr.{Expr, GetField, Ref, UnaryOpExpr} import com.antgroup.openspg.reasoner.lube.common.graph._ @@ -104,11 +105,11 @@ object ExprUtils { * @param renameFunc * @return */ - def renameVariableInExpr(expr: Expr, replaceVar: Map[IRField, IRProperty]): Expr = { + def renameVariableInExpr(expr: Expr, replaceVar: Map[IRField, IRField]): Expr = { val trans: PartialFunction[Expr, Expr] = { case expr @ UnaryOpExpr(GetField(name), Ref(alis)) => if (replaceVar.contains(IRProperty(alis, name))) { - val newProp = replaceVar.get(IRProperty(alis, name)).get + val newProp = replaceVar.get(IRProperty(alis, name)).get.asInstanceOf[IRProperty] UnaryOpExpr(GetField(newProp.field), Ref(newProp.name)) } else { expr @@ -116,13 +117,20 @@ object ExprUtils { case expr @ Ref(name) => if (replaceVar.contains(IRVariable(name))) { val newProp = replaceVar.get(IRVariable(name)).get - UnaryOpExpr(GetField(newProp.field), Ref(newProp.name)) + newProp match { + case IRVariable(name) => Ref(name) + case IRProperty(name, field) => UnaryOpExpr(GetField(field), Ref(name)) + case _ => + throw UnsupportedOperationException( + s"rename unsupported expr=${expr}, replaceVar=${replaceVar}") + } + } else { expr } case x => x } - BottomUp(trans).transform(expr) + TopDown(trans).transform(expr) } def renameAliasInExpr(expr: Expr, replaceVar: Map[String, String]): Expr = { @@ -183,4 +191,5 @@ object ExprUtils { } variable } + } diff --git a/reasoner/lube-api/src/main/scala/com/antgroup/openspg/reasoner/lube/utils/RuleUtils.scala b/reasoner/lube-api/src/main/scala/com/antgroup/openspg/reasoner/lube/utils/RuleUtils.scala index 4bb3be84..84b9b0db 100644 --- a/reasoner/lube-api/src/main/scala/com/antgroup/openspg/reasoner/lube/utils/RuleUtils.scala +++ b/reasoner/lube-api/src/main/scala/com/antgroup/openspg/reasoner/lube/utils/RuleUtils.scala @@ -49,7 +49,7 @@ object RuleUtils { * @param renameFunc * @return */ - def renameVariableInRule(rule: Rule, replaceVar: Map[IRField, IRProperty]): Rule = { + def renameVariableInRule(rule: Rule, replaceVar: Map[IRField, IRField]): Rule = { if (null == rule) { return null } diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/operators/LinkedExpand.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/operators/LinkedExpand.scala index 785e680e..3eea3dc1 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/operators/LinkedExpand.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/operators/LinkedExpand.scala @@ -14,7 +14,7 @@ package com.antgroup.openspg.reasoner.lube.logical.operators import com.antgroup.openspg.reasoner.lube.common.pattern.{EdgePattern, LinkedPatternConnection} -import com.antgroup.openspg.reasoner.lube.logical.{SolvedModel, Var} +import com.antgroup.openspg.reasoner.lube.logical.{EdgeVar, SolvedModel, Var} final case class LinkedExpand( in: LogicalOperator, @@ -33,12 +33,12 @@ final case class LinkedExpand( * * @return */ - override def refFields: List[Var] = List.empty + override def refFields: List[Var] = List.apply(solved.getVar(edgePattern.edge.alias)) /** * the output fields of current operator * * @return */ - override def fields: List[Var] = in.fields + override def fields: List[Var] = in.fields ++ refFields } diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/operators/Project.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/operators/Project.scala index b492d596..082560d8 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/operators/Project.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/operators/Project.scala @@ -15,12 +15,13 @@ package com.antgroup.openspg.reasoner.lube.logical.operators import scala.collection.mutable +import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException import com.antgroup.openspg.reasoner.common.types.KTObject import com.antgroup.openspg.reasoner.lube.catalog.struct.Field -import com.antgroup.openspg.reasoner.lube.common._ import com.antgroup.openspg.reasoner.lube.common.expr.{Directly, Expr} -import com.antgroup.openspg.reasoner.lube.common.graph.IRVariable -import com.antgroup.openspg.reasoner.lube.logical.{EdgeVar, ExprUtil, NodeVar, PropertyVar, SolvedModel, Var} +import com.antgroup.openspg.reasoner.lube.common.graph.{IREdge, IRNode} +import com.antgroup.openspg.reasoner.lube.logical._ +import com.antgroup.openspg.reasoner.lube.utils.ExprUtils final case class Project(in: LogicalOperator, expr: Map[Var, Expr], solved: SolvedModel) extends StackingLogicalOperator { @@ -31,17 +32,17 @@ final case class Project(in: LogicalOperator, expr: Map[Var, Expr], solved: Solv pair._2 match { case Directly => fieldsMap.put(pair._1.name, pair._1) case e: Expr => - val refPair = ExprUtil.getReferProperties(e) - for (ref <- refPair) { - val alias = if (ref._1 != null) ref._1 else solved.getField(IRVariable(ref._2)).name - solved.fields.get(alias).get match { - case NodeVar(name, _) => - val node = NodeVar(name, Set.apply(new Field(ref._2, KTObject, true))) + val fields = + ExprUtils.getAllInputFieldInRule(e, solved.getNodeAliasSet, solved.getEdgeAliasSet) + for (ref <- fields) { + ref match { + case IRNode(name, fields) => + val node = NodeVar(name, fields.map(new Field(_, KTObject, true))) fieldsMap.put(name, node.merge(fieldsMap.get(name))) - case EdgeVar(name, _) => - val edge = EdgeVar(name, Set.apply(new Field(ref._2, KTObject, true))) + case IREdge(name, fields) => + val edge = EdgeVar(name, fields.map(new Field(_, KTObject, true))) fieldsMap.put(name, edge.merge(fieldsMap.get(name))) - case _ => + case _ => throw UnsupportedOperationException(s"unsupported $expr") } } } diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/LogicalOptimizer.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/LogicalOptimizer.scala index e31994b6..560d5df1 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/LogicalOptimizer.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/LogicalOptimizer.scala @@ -13,6 +13,7 @@ package com.antgroup.openspg.reasoner.lube.logical.optimizer +import com.antgroup.openspg.reasoner.common.trees.{BottomUpWithContext, TopDownWithContext} import com.antgroup.openspg.reasoner.lube.logical.operators.LogicalOperator import com.antgroup.openspg.reasoner.lube.logical.optimizer.rules._ import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext @@ -24,16 +25,17 @@ object LogicalOptimizer { var LOGICAL_OPT_RULES: Seq[Rule] = Seq( + PatternJoinPure, GroupNode, - IdFilterPushDown, + DistinctGet, + NodeIdToEdgeProperty, FilterPushDown, ExpandIntoPure, FilterMerge, - SolvedModelPure, - DistinctGet, AggregatePushDown, Pure, - ProjectMerge) + ProjectMerge, + SolvedModelPure) def optimize(input: LogicalOperator, optRuleList: Seq[Rule])(implicit context: LogicalPlannerContext): LogicalOperator = { @@ -41,9 +43,9 @@ object LogicalOptimizer { for (rule <- optRuleList) { for (i <- 0 until (rule.maxIterations)) { if (rule.direction.equals(Up)) { - root = root.rewrite(rule.rule) + root = BottomUpWithContext(rule.ruleWithContext).transform(root, Map.empty)._1 } else { - root = root.rewriteTopDown(rule.rule) + root = TopDownWithContext(rule.ruleWithContext).transform(root, Map.empty)._1 } } } diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/Rule.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/Rule.scala index bd6cc3b2..832550bb 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/Rule.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/Rule.scala @@ -23,10 +23,26 @@ case object Down extends Direction trait Rule { - def rule(implicit - context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] + def ruleWithContext(implicit context: LogicalPlannerContext): PartialFunction[ + (LogicalOperator, Map[String, Object]), + (LogicalOperator, Map[String, Object])] def direction: Direction def maxIterations: Int } + +abstract class SimpleRule extends Rule { + + override def ruleWithContext(implicit context: LogicalPlannerContext): PartialFunction[ + (LogicalOperator, Map[String, Object]), + (LogicalOperator, Map[String, Object])] = { + case (operator, c) if rule.isDefinedAt(operator) => + val transformedOperator = rule(context)(operator) + (transformedOperator, c) + } + + def rule(implicit + context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] + +} diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/AggregatePushDown.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/AggregatePushDown.scala index e1d70574..7520d836 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/AggregatePushDown.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/AggregatePushDown.scala @@ -21,14 +21,14 @@ import com.antgroup.openspg.reasoner.lube.common.expr._ import com.antgroup.openspg.reasoner.lube.common.pattern.Pattern import com.antgroup.openspg.reasoner.lube.logical._ import com.antgroup.openspg.reasoner.lube.logical.operators.{Filter, _} -import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up} +import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, SimpleRule, Up} import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext import com.antgroup.openspg.reasoner.lube.utils.{ExprUtils, RuleUtils} /** * Aggregation push down */ -object AggregatePushDown extends Rule { +object AggregatePushDown extends SimpleRule { override def rule(implicit context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = { diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/DistinctGet.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/DistinctGet.scala index c050c5da..3e928923 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/DistinctGet.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/DistinctGet.scala @@ -15,10 +15,10 @@ package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules import com.antgroup.openspg.reasoner.lube.logical.NodeVar import com.antgroup.openspg.reasoner.lube.logical.operators.{Aggregate, LogicalOperator, Select} -import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up} +import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, SimpleRule, Up} import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext -case object DistinctGet extends Rule { +case object DistinctGet extends SimpleRule { override def rule(implicit context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = { diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/EdgeToProperty.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/EdgeToProperty.scala index 83975086..8cf10f3f 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/EdgeToProperty.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/EdgeToProperty.scala @@ -25,7 +25,7 @@ import com.antgroup.openspg.reasoner.lube.common.rule.{Rule => LogicalRule} import com.antgroup.openspg.reasoner.lube.logical.NodeVar import com.antgroup.openspg.reasoner.lube.logical.PatternOps.PatternOps import com.antgroup.openspg.reasoner.lube.logical.operators.{ExpandInto, LogicalOperator, PatternScan} -import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Down, Rule} +import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Down, SimpleRule} import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext import com.antgroup.openspg.reasoner.lube.utils.RuleUtils @@ -37,7 +37,7 @@ import com.antgroup.openspg.reasoner.lube.utils.RuleUtils * 3. cur node is advanced, then travel down to find cur node is transferred from a attribute. * All possibilities require a downward traversal, and the downward traversal can cover all cases. */ -object EdgeToProperty extends Rule { +object EdgeToProperty extends SimpleRule { override def rule(implicit context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = { diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/ExpandIntoPure.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/ExpandIntoPure.scala index 82f2708f..ef05007f 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/ExpandIntoPure.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/ExpandIntoPure.scala @@ -13,9 +13,12 @@ package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules +import scala.collection.mutable + import com.antgroup.openspg.reasoner.lube.common.pattern.NodePattern -import com.antgroup.openspg.reasoner.lube.logical.operators.{ExpandInto, LogicalOperator} -import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up} +import com.antgroup.openspg.reasoner.lube.logical.{EdgeVar, NodeVar, PropertyVar, Var} +import com.antgroup.openspg.reasoner.lube.logical.operators._ +import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Down, Rule} import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext /** @@ -23,23 +26,62 @@ import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext */ object ExpandIntoPure extends Rule { - override def rule(implicit - context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = { - case expandInto @ ExpandInto(in, target, _) => - val needPure = canPure(expandInto) + def ruleWithContext(implicit context: LogicalPlannerContext): PartialFunction[ + (LogicalOperator, Map[String, Object]), + (LogicalOperator, Map[String, Object])] = { + case (select: Select, map) => + select -> merge(map, select.refFields, select.solved.getNodeAliasSet) + case (ddl: DDL, map) => ddl -> merge(map, ddl.refFields, ddl.solved.getNodeAliasSet) + case (filter: Filter, map) => + filter -> merge(map, filter.refFields, filter.solved.getNodeAliasSet) + case (project: Project, map) => + project -> merge(map, project.refFields, project.solved.getNodeAliasSet) + case (aggregate: Aggregate, map) => + aggregate -> merge(map, aggregate.refFields, aggregate.solved.getNodeAliasSet) + case (order: OrderAndLimit, map) => + order -> merge(map, order.refFields, order.solved.getNodeAliasSet) + case (expandInto @ ExpandInto(in, _, _), map) => + val needPure = canPure(expandInto, map.asInstanceOf[Map[String, Var]]) if (needPure) { - in + in -> map } else { - expandInto + expandInto -> map } + } - private def canPure(expandInto: ExpandInto): Boolean = { + private def merge( + map: Map[String, Object], + fields: List[Var], + nodes: Set[String]): Map[String, Object] = { + val varMap = new mutable.HashMap[String, Var]() + varMap.++=(map.asInstanceOf[Map[String, Var]]) + for (field <- fields) { + if (varMap.contains(field.name)) { + varMap.put(field.name, varMap(field.name).merge(Option.apply(field))) + } else if (field.isInstanceOf[PropertyVar]) { + if (nodes.contains(field.name)) { + varMap.put( + field.name, + NodeVar(field.name, Set.apply(field.asInstanceOf[PropertyVar].field))) + } else { + varMap.put( + field.name, + EdgeVar(field.name, Set.apply(field.asInstanceOf[PropertyVar].field))) + } + } else { + varMap.put(field.name, field) + } + } + varMap.toMap + } + + private def canPure(expandInto: ExpandInto, map: Map[String, Var]): Boolean = { if (!expandInto.pattern.isInstanceOf[NodePattern]) { false } else { - val refFields = expandInto.refFields - if (refFields.head.isEmpty) { + val alias = expandInto.pattern.root.alias + if (!map.contains(alias) || map(alias).isEmpty) { true } else { false @@ -47,7 +89,7 @@ object ExpandIntoPure extends Rule { } } - override def direction: Direction = Up + override def direction: Direction = Down - override def maxIterations: Int = 1 + override def maxIterations: Int = 10 } diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/FilterMerge.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/FilterMerge.scala index 205eae5a..2a5688e1 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/FilterMerge.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/FilterMerge.scala @@ -14,10 +14,10 @@ package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules import com.antgroup.openspg.reasoner.lube.logical.operators.{Filter, LogicalOperator} -import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Down, Rule} +import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Down, Rule, SimpleRule} import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext -object FilterMerge extends Rule { +object FilterMerge extends SimpleRule { override def rule(implicit context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = { diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/FilterPushDown.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/FilterPushDown.scala index d2c92a60..8752e5c6 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/FilterPushDown.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/FilterPushDown.scala @@ -17,16 +17,11 @@ import scala.collection.mutable import com.antgroup.openspg.reasoner.common.trees.BottomUp import com.antgroup.openspg.reasoner.lube.common.graph.{IREdge, IRNode} -import com.antgroup.openspg.reasoner.lube.common.pattern.{ - EdgePattern, - NodePattern, - PartialGraphPattern, - Pattern -} +import com.antgroup.openspg.reasoner.lube.common.pattern.{EdgePattern, NodePattern, PartialGraphPattern, Pattern} import com.antgroup.openspg.reasoner.lube.logical.{EdgeVar, NodeVar, Var} import com.antgroup.openspg.reasoner.lube.logical.PatternOps.PatternOps import com.antgroup.openspg.reasoner.lube.logical.operators._ -import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up} +import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, SimpleRule, Up} import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext import com.antgroup.openspg.reasoner.lube.utils.RuleUtils import org.apache.commons.lang3.StringUtils @@ -34,7 +29,7 @@ import org.apache.commons.lang3.StringUtils /** * Predicate push down */ -object FilterPushDown extends Rule { +object FilterPushDown extends SimpleRule { override def rule(implicit context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = { diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/GroupNode.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/GroupNode.scala index 4fb5186e..6a2de618 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/GroupNode.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/GroupNode.scala @@ -15,18 +15,17 @@ package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules import scala.collection.mutable import scala.collection.mutable.ListBuffer -import scala.util.control.Breaks.break import com.antgroup.openspg.reasoner.common.constants.Constants import com.antgroup.openspg.reasoner.lube.logical.{NodeVar, PropertyVar, Var} import com.antgroup.openspg.reasoner.lube.logical.operators._ -import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up} +import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, SimpleRule, Up} import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext /** * Convert group node property to group node, when properties contains node id. */ -object GroupNode extends Rule { +object GroupNode extends SimpleRule { override def rule(implicit context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = { @@ -56,6 +55,8 @@ object GroupNode extends Rule { groups.++=(kv._2) } } + } else { + groups.++=(kv._2) } } aggregate.copy(group = groups.toList) diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/IdFilterPushDown.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/IdFilterPushDown.scala deleted file mode 100644 index 70081e2c..00000000 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/IdFilterPushDown.scala +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright 2023 OpenSPG Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. - */ - -package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules - -import com.antgroup.openspg.reasoner.common.constants.Constants -import com.antgroup.openspg.reasoner.common.trees.TopDown -import com.antgroup.openspg.reasoner.lube.common.expr.{Filter => _, _} -import com.antgroup.openspg.reasoner.lube.common.pattern._ -import com.antgroup.openspg.reasoner.lube.common.rule.LogicRule -import com.antgroup.openspg.reasoner.lube.logical.PatternOps.PatternOps -import com.antgroup.openspg.reasoner.lube.logical.operators._ -import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up} -import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext -import org.apache.commons.lang3.StringUtils - - - -/** - * Predicate push down - */ -object IdFilterPushDown extends Rule { - - override def rule(implicit - context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = { - case filter: Filter => idFilterPushDown(filter) - } - - private def idFilterPushDown(filter: Filter): LogicalOperator = { - var hasPushDown: Boolean = false - val updatedRule = filter.rule match { - case rule: LogicRule => - rule.getExpr match { - case BinaryOpExpr(opName, - UnaryOpExpr(GetField(Constants.NODE_ID_KEY), Ref(refName)), - r) => - opName match { - case BIn => r match { - case _ => (refName, r, BIn) - } - case BEqual => r match { - case _ => (refName, r, BEqual) - } - case _ => null - } - case _ => null - } - case _ => null - } - if (updatedRule == null) { - return filter - } - - var boundedVarLenExpandEdgeNameSet: Set[String] = Set.empty - def rewriter: PartialFunction[LogicalOperator, LogicalOperator] = { - case boundedVarLenExpand: BoundedVarLenExpand => - boundedVarLenExpandEdgeNameSet += boundedVarLenExpand.edgePattern.edge.alias - boundedVarLenExpandEdgeNameSet += boundedVarLenExpand.edgePattern.dst.alias - boundedVarLenExpandEdgeNameSet += boundedVarLenExpand.edgePattern.src.alias - boundedVarLenExpand - case expandInto: ExpandInto => - val res = updatePatternFilterRule(expandInto.pattern, updatedRule, - null, boundedVarLenExpandEdgeNameSet) - if (res._1) { - hasPushDown = true - expandInto.copy(pattern = res._2) - } else { - expandInto - } - case patternScan: PatternScan => - val res = updatePatternFilterRule(patternScan.pattern, updatedRule, - patternScan.pattern.root.alias, boundedVarLenExpandEdgeNameSet) - if (res._1) { - hasPushDown = true - patternScan.copy(pattern = res._2) - } else { - patternScan - } - } - val newFilter = TopDown[LogicalOperator](rewriter).transform(filter).asInstanceOf[Filter] - if (hasPushDown) { - newFilter.in - } else { - filter - } - } - - private def fillInRule( - filterRule: com.antgroup.openspg.reasoner.lube.common.rule.Rule, - alias: String, - pattern: Pattern, boundedVarLenExpandEdgeNameSet: Set[String]): (Boolean, Pattern) = { - var pushToAlias = "" - if (!boundedVarLenExpandEdgeNameSet.contains(alias)) { - pattern match { - case NodePattern(node) => - if (node.alias.equals(alias)) { - pushToAlias = node.alias - } - case EdgePattern(_, _, edge) => - if (edge.alias.equals(alias)) { - pushToAlias = edge.alias - } - case PartialGraphPattern(_, nodes, edges) => - if (nodes.contains(alias)) { - pushToAlias = alias; - } - val edgeSet = edges.values.flatten - for (e <- edgeSet) { - if (StringUtils.isBlank(pushToAlias) && - e.alias.equals(alias)) { - pushToAlias = e.alias - } - } - case _ => - } - } - - if (StringUtils.isNotBlank(pushToAlias)) { - (true, pattern.fillInRule(filterRule, pushToAlias)) - } else { - (false, pattern) - } - } - - private def updatePatternFilterRule( - pattern: Pattern, - updateExpr: (String, Expr, BinaryOpSet), - startAlias: String, - boundedVarLenExpandEdgeNameSet: Set[String]): (Boolean, Pattern) = { - var updatedPattern = pattern - - val alias = updateExpr._1 - val expr = updateExpr._2 - val opName = updateExpr._3 - var isChange = false - // node rule - if (alias.equals(startAlias)) { - val filterRule = LogicRule( - "generate_id_filter_" + alias, - "", - BinaryOpExpr(opName, UnaryOpExpr(GetField(Constants.NODE_ID_KEY), Ref(alias)), expr)) - val res = fillInRule(filterRule, alias, updatedPattern, boundedVarLenExpandEdgeNameSet) - isChange = res._1 || isChange - updatedPattern = res._2 - } - - // edge filter - val inEdges = getInConnection(alias, updatedPattern) - inEdges.foreach(x => { - val filterInEdgeRule = LogicRule( - "generate_in_edge_id_filter_" + x.alias, - "", - BinaryOpExpr(opName, UnaryOpExpr(GetField(Constants.EDGE_TO_ID_KEY), Ref(x.alias)), expr)) - val res = fillInRule(filterInEdgeRule, x.alias, updatedPattern, - boundedVarLenExpandEdgeNameSet) - - isChange = res._1 || isChange - updatedPattern = res._2 - }) - - val outEdges = getOutConnection(alias, updatedPattern) - outEdges.foreach(x => { - val filterOutEdgeRule = LogicRule( - "generate_out_edge_id_filter_" + x.alias, - "", - BinaryOpExpr(opName, UnaryOpExpr(GetField(Constants.EDGE_FROM_ID_KEY), Ref(x.alias)), expr)) - val res = fillInRule(filterOutEdgeRule, x.alias, - updatedPattern, boundedVarLenExpandEdgeNameSet) - isChange = res._1 || isChange - updatedPattern = res._2 - }) - - (isChange, updatedPattern) - } - - private def getConnection( - alias: String, - pattern: Pattern, - direction: com.antgroup.openspg.reasoner.common.graph.edge.Direction): Set[Connection] = { - pattern.topology - .flatMap(edgeSet => { - edgeSet._2 - .map { case c: PatternConnection => - val compareAlias = - if (c.direction.equals(direction)) c.source else c.target - if (compareAlias.equals(alias)) { - c - } else { - null - } - } - .filter(_ != null) - }) - .toSet - } - - private def getInConnection(alias: String, pattern: Pattern): Set[Connection] = { - getConnection(alias, pattern, com.antgroup.openspg.reasoner.common.graph.edge.Direction.IN) - } - - private def getOutConnection(alias: String, pattern: Pattern): Set[Connection] = { - getConnection(alias, pattern, com.antgroup.openspg.reasoner.common.graph.edge.Direction.OUT) - } - - override def direction: Direction = Up - - override def maxIterations: Int = 1 -} diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/NodeIdToEdgeProperty.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/NodeIdToEdgeProperty.scala new file mode 100644 index 00000000..baf2d603 --- /dev/null +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/NodeIdToEdgeProperty.scala @@ -0,0 +1,223 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ + +package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +import com.antgroup.openspg.reasoner.common.constants.Constants +import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException +import com.antgroup.openspg.reasoner.common.graph.edge +import com.antgroup.openspg.reasoner.common.types.KTString +import com.antgroup.openspg.reasoner.lube.block.{AddPredicate, DDLOp} +import com.antgroup.openspg.reasoner.lube.catalog.struct.Field +import com.antgroup.openspg.reasoner.lube.common.expr.Expr +import com.antgroup.openspg.reasoner.lube.common.graph.{IRField, IRNode, IRProperty, IRVariable} +import com.antgroup.openspg.reasoner.lube.common.pattern.{Connection, NodePattern, Pattern, VariablePatternConnection} +import com.antgroup.openspg.reasoner.lube.logical.{NodeVar, PropertyVar, Var} +import com.antgroup.openspg.reasoner.lube.logical.operators._ +import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up} +import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext +import com.antgroup.openspg.reasoner.lube.utils.{ExprUtils, RuleUtils} + +object NodeIdToEdgeProperty extends Rule { + private val NODE_DEFAULT_PROPS = Set.apply(Constants.NODE_ID_KEY, Constants.CONTEXT_LABEL) + + def ruleWithContext(implicit context: LogicalPlannerContext): PartialFunction[ + (LogicalOperator, Map[String, Object]), + (LogicalOperator, Map[String, Object])] = { + case (expandInto: ExpandInto, map) => + if (!canPushDown(expandInto)) { + expandInto -> map + } else { + val toEdge = targetConnection(expandInto) + expandInto -> (map + (expandInto.pattern.root.alias -> toEdge)) + } + case (filter: Filter, map) => + if (map.isEmpty) { + filter -> map + } else { + filterUpdate(filter, map) -> map + } + case (select: Select, map) => + if (map.isEmpty) { + select -> map + } else { + selectUpdate(select, map) -> map + } + case (ddl: DDL, map) => + if (map.isEmpty) { + ddl -> map + } else { + ddlUpdate(ddl, map) -> map + } + case (varLenExpand @ BoundedVarLenExpand(_, expandInto: ExpandInto, edgePattern, _), map) => + if (edgePattern.edge.upper == 1 && canPushDown(expandInto)) { + val toEdge = targetConnection(expandInto) + varLenExpand -> (map + (expandInto.pattern.root.alias -> toEdge)) + } else if (canPushDown(expandInto)) { + varLenExpand -> (map - expandInto.pattern.root.alias) + } else { + varLenExpand -> map + } + } + + private def genField(direction: edge.Direction, fieldName: String): String = { + (direction, fieldName) match { + case (edge.Direction.OUT, Constants.NODE_ID_KEY) => Constants.EDGE_TO_ID_KEY + case (edge.Direction.OUT, Constants.CONTEXT_LABEL) => Constants.EDGE_TO_ID_TYPE_KEY + case (edge.Direction.IN, Constants.NODE_ID_KEY) => Constants.EDGE_FROM_ID_KEY + case (edge.Direction.IN, Constants.CONTEXT_LABEL) => Constants.EDGE_FROM_ID_TYPE_KEY + case (_, _) => + throw UnsupportedOperationException(s"""unsupport (${direction}, ${fieldName})""") + } + } + + private def filterUpdate(filter: Filter, map: Map[String, Object]): Filter = { + val input = RuleUtils.getAllInputFieldInRule( + filter.rule, + filter.solved.getNodeAliasSet, + filter.solved.getEdgeAliasSet) + val replaceVar = new mutable.HashMap[IRField, IRField] + for (irField <- input) { + if (irField.isInstanceOf[IRNode] && map.contains(irField.name)) { + for (propName <- irField.asInstanceOf[IRNode].fields) { + if (NODE_DEFAULT_PROPS.contains(propName)) { + val edgeInfo = map(irField.name).asInstanceOf[Connection] + replaceVar.put( + IRProperty(irField.name, propName), + IRProperty(edgeInfo.alias, genField(edgeInfo.direction, propName))) + replaceVar.put(IRVariable(irField.name), IRVariable(edgeInfo.alias)) + } + } + } + } + if (replaceVar.isEmpty) { + filter + } else { + val newRule = RuleUtils.renameVariableInRule(filter.rule, replaceVar.toMap) + filter.copy(rule = newRule) + } + } + + private def selectUpdate(select: Select, map: Map[String, Object]): Select = { + val newFields = new ListBuffer[Var]() + for (field <- select.fields) { + if (field.isInstanceOf[PropertyVar] && map.contains(field.name)) { + val edgeInfo = map(field.name).asInstanceOf[Connection] + val propName = field.asInstanceOf[PropertyVar].field.name + if (NODE_DEFAULT_PROPS.contains(propName)) { + newFields.append( + PropertyVar( + edgeInfo.alias, + new Field(genField(edgeInfo.direction, propName), KTString, true))) + } else { + newFields.append(field) + } + } else { + newFields.append(field) + } + } + select.copy(fields = newFields.toList) + } + + private def ddlUpdate(ddl: DDL, map: Map[String, Object]): DDL = { + val ddlOps = new mutable.HashSet[DDLOp]() + for (ddlOp <- ddl.ddlOp) { + ddlOp match { + case AddPredicate(predicate) => + val newFields = new mutable.HashMap[String, Expr]() + for (field <- predicate.fields) { + val input = ExprUtils.getAllInputFieldInRule(field._2, null, null) + val replaceVar = new mutable.HashMap[IRField, IRProperty] + for (irField <- input) { + if (irField.isInstanceOf[IRNode] && map.contains(irField.name)) { + for (propName <- irField.asInstanceOf[IRNode].fields) { + if (NODE_DEFAULT_PROPS.contains(propName)) { + val edgeInfo = map(irField.name).asInstanceOf[Connection] + replaceVar.put( + IRProperty(irField.name, propName), + IRProperty(edgeInfo.alias, genField(edgeInfo.direction, propName))) + } + } + } + } + if (replaceVar.isEmpty) { + newFields.put(field._1, field._2) + } else { + newFields.put(field._1, ExprUtils.renameVariableInExpr(field._2, replaceVar.toMap)) + } + } + ddlOps.add(AddPredicate(predicate.copy(fields = newFields.toMap))) + case _ => ddlOps.add(ddlOp) + } + } + ddl.copy(ddlOp = ddlOps.toSet) + } + + private def targetConnection(expandInto: ExpandInto): Connection = { + val alias = expandInto.pattern.root.alias + val edgeAlias = expandInto.transform[Connection] { + case (patternScan: PatternScan, _) => + targetConnection(alias, patternScan.pattern) + case (expandInto: ExpandInto, list) => + if (!list.isEmpty && list.head != null) { + list.head + } else { + targetConnection(alias, expandInto.pattern) + } + case (_, list) => + if (list.isEmpty) { + null + } else { + list.head + } + } + edgeAlias + } + + private def targetConnection(alias: String, pattern: Pattern): Connection = { + val connections = pattern.topology.values.flatten.filter(_.target.equals(alias)) + val fixedEdges = connections.filter(!_.isInstanceOf[VariablePatternConnection]) + val varEdges = connections + .filter(_.isInstanceOf[VariablePatternConnection]) + .map(_.asInstanceOf[VariablePatternConnection]) + .filter(_.upper == 1) + if (fixedEdges.isEmpty && varEdges.isEmpty) { + null + } else if (fixedEdges.isEmpty) { + varEdges.head + } else { + fixedEdges.head + } + } + + private def canPushDown(expandInto: ExpandInto): Boolean = { + if (!expandInto.pattern.isInstanceOf[NodePattern]) { + false + } else { + val fieldNames = expandInto.refFields.head.asInstanceOf[NodeVar].fields.map(_.name) + val normalNames = fieldNames.filter(!NODE_DEFAULT_PROPS.contains(_)) + if (normalNames.isEmpty) { + true + } else { + false + } + } + } + + override def direction: Direction = Up + + override def maxIterations: Int = 1 +} diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/PatternJoinPure.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/PatternJoinPure.scala new file mode 100644 index 00000000..e2659384 --- /dev/null +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/PatternJoinPure.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ + +package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules + +import com.antgroup.openspg.reasoner.lube.common.pattern.NodePattern +import com.antgroup.openspg.reasoner.lube.logical.operators.{BoundedVarLenExpand, LogicalOperator, PatternJoin, PatternScan} +import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, SimpleRule, Up} +import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext + +object PatternJoinPure extends SimpleRule { + override def rule(implicit + context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = { + case patternJoin @ PatternJoin( + boundedVarLenExpand: BoundedVarLenExpand, + scan: PatternScan, + _) => + if (scan.pattern.isInstanceOf[NodePattern]) { + boundedVarLenExpand + } else { + patternJoin + } + } + + override def direction: Direction = Up + + override def maxIterations: Int = 1 +} diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/ProjectMerge.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/ProjectMerge.scala index ca27bd73..6615a624 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/ProjectMerge.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/ProjectMerge.scala @@ -19,21 +19,21 @@ import com.antgroup.openspg.reasoner.common.exception.SystemError import com.antgroup.openspg.reasoner.lube.common.expr.{Directly, Expr} import com.antgroup.openspg.reasoner.lube.logical._ import com.antgroup.openspg.reasoner.lube.logical.operators.{LogicalOperator, Project} -import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up} +import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, SimpleRule, Up} import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext /** * Merge adjacent Project operators */ -object ProjectMerge extends Rule { +object ProjectMerge extends SimpleRule { override def rule(implicit context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = { - case project @ Project(in, _, _) => - if (!in.isInstanceOf[Project] || !canMerge(project)) { + case project @ Project(in: Project, _, _) => + if (!canMerge(project, in)) { project } else { - val parentProjects = in.asInstanceOf[Project].expr + val parentProjects = in.expr val newProjects = new mutable.HashMap[Var, Expr]() for (pair <- parentProjects) { if (!pair._2.isInstanceOf[Directly.type]) { @@ -68,7 +68,7 @@ object ProjectMerge extends Rule { newProjects.put(pair._1, pair._2) } } - Project(in.asInstanceOf[Project].in, newProjects.toMap, in.solved) + Project(in.in, newProjects.toMap, in.solved) } } @@ -88,19 +88,17 @@ object ProjectMerge extends Rule { } } - private def canMerge(project: Project): Boolean = { - val parentOutput = - project.in.asInstanceOf[Project].expr.filter(!_._2.isInstanceOf[Directly.type]).keySet + private def canMerge(project: Project, in: Project): Boolean = { + val parentOutput = in.expr.filter(!_._2.isInstanceOf[Directly.type]).keySet val computeExprs = project.expr.filter(!_._2.isInstanceOf[Directly.type]) for (expr <- computeExprs) { val fields = ExprUtil.getReferProperties(expr._2) for (pair <- fields) { - val outputVar = parentOutput.filter(_.name.equals(pair._1)) - if (!outputVar.isEmpty && outputVar.head - .asInstanceOf[PropertyVar] - .field - .name - .equals(pair._2)) { + val outputVar = parentOutput + .filter(_.name.equals(pair._1)) + .map(_.asInstanceOf[PropertyVar]) + .map(_.field.name) + if (outputVar.contains(pair._2)) { return false } } diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/Pure.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/Pure.scala index f8ddb8c4..ffc07115 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/Pure.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/Pure.scala @@ -19,7 +19,7 @@ import com.antgroup.openspg.reasoner.common.exception.InvalidRefVariable import com.antgroup.openspg.reasoner.lube.common.expr.Directly import com.antgroup.openspg.reasoner.lube.logical.{PathVar, RepeatPathVar, SolvedModel, Var} import com.antgroup.openspg.reasoner.lube.logical.operators._ -import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Down, Rule} +import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Down, SimpleRule} import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext /** @@ -27,7 +27,7 @@ import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext * if and only if the output of the current Op is not dependent on the downstream node, * add a ProjectOp for attribute clipping */ -object Pure extends Rule { +object Pure extends SimpleRule { override def rule(implicit context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = { diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/SolvedModelPure.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/SolvedModelPure.scala index 7ec0f6bc..91a9d2f8 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/SolvedModelPure.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/rules/SolvedModelPure.scala @@ -18,13 +18,13 @@ import scala.collection.mutable import com.antgroup.openspg.reasoner.common.trees.BottomUp import com.antgroup.openspg.reasoner.lube.logical.{SolvedModel, Var} import com.antgroup.openspg.reasoner.lube.logical.operators._ -import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up} +import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, SimpleRule, Up} import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext /** * Pure solvedModel after EdgeToProperty */ -object SolvedModelPure extends Rule { +object SolvedModelPure extends SimpleRule { override def rule(implicit context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = { @@ -39,12 +39,17 @@ object SolvedModelPure extends Rule { private def resolvedModel(input: LogicalOperator): SolvedModel = { val fields = input.transform[List[Var]] { - case (scan @ PatternScan(_, _), _) => scan.refFields - case (expandInto @ ExpandInto(_, _, _), tupleList) => + case (scan: PatternScan, _) => scan.refFields + case (expandInto: ExpandInto, tupleList) => val list = new mutable.ListBuffer[Var]() list.++=(tupleList.flatten) list.++=(expandInto.refFields) list.toList + case (linkedExpand: LinkedExpand, tupleList) => + val list = new mutable.ListBuffer[Var]() + list.++=(tupleList.flatten) + list.++=(linkedExpand.refFields) + list.toList case (_, tupleList) => tupleList.flatten } val fieldMap = fields.map(f => (f.name, f)).toMap diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/planning/LogicalPlanner.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/planning/LogicalPlanner.scala index 154f7100..805ee322 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/planning/LogicalPlanner.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/planning/LogicalPlanner.scala @@ -16,11 +16,7 @@ package com.antgroup.openspg.reasoner.lube.logical.planning import scala.collection.mutable import com.antgroup.openspg.reasoner.common.constants.Constants -import com.antgroup.openspg.reasoner.common.exception.{ - NotImplementedException, - SchemaException, - UnsupportedOperationException -} +import com.antgroup.openspg.reasoner.common.exception.{NotImplementedException, SchemaException, UnsupportedOperationException} import com.antgroup.openspg.reasoner.common.graph.edge.SPO import com.antgroup.openspg.reasoner.lube.block._ import com.antgroup.openspg.reasoner.lube.catalog.{Catalog, SemanticPropertyGraph} @@ -172,15 +168,20 @@ object LogicalPlanner { field match { case node: IRNode => if (types(node.name).isEmpty && node.fields.nonEmpty) { - throw SchemaException(s"Cannot find $node.name in $types") + throw SchemaException(s"Cannot find ${node.name} in $types") } val fields = node.fields.map(graph.graphSchema.getNodeField(types(node.name), _)) NodeVar(node.name, fields) case edge: IREdge => if (types(edge.name).isEmpty && edge.fields.nonEmpty) { - throw SchemaException(s"Cannot find $edge.name in $types") + throw SchemaException(s"Cannot find ${edge.name} in $types") } - val fields = edge.fields.map(graph.graphSchema.getEdgeField(types(edge.name), _)) + val fullFields = edge.fields ++ Set.apply( + Constants.EDGE_FROM_ID_KEY, + Constants.EDGE_FROM_ID_TYPE_KEY, + Constants.EDGE_TO_ID_KEY, + Constants.EDGE_TO_ID_TYPE_KEY) + val fields = fullFields.map(graph.graphSchema.getEdgeField(types(edge.name), _)) EdgeVar(edge.name, fields) case array: IRRepeatPath => RepeatPathVar( diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/planning/PatternMatchPlanner.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/planning/PatternMatchPlanner.scala index 9d7ae49f..dfd7e8c8 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/planning/PatternMatchPlanner.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/planning/PatternMatchPlanner.scala @@ -17,13 +17,12 @@ import scala.collection.mutable import scala.collection.mutable.ListBuffer import com.antgroup.openspg.reasoner.common.constants.Constants -import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException +import com.antgroup.openspg.reasoner.common.exception.{SystemError, UnsupportedOperationException} import com.antgroup.openspg.reasoner.lube.catalog.SemanticPropertyGraph import com.antgroup.openspg.reasoner.lube.common.pattern._ import com.antgroup.openspg.reasoner.lube.common.pattern.ElementOps.toPattenElement import com.antgroup.openspg.reasoner.lube.logical.SolvedModel import com.antgroup.openspg.reasoner.lube.logical.operators._ - /** * QueryPath splitting * @param pattern GraphPath, some times are called QueryGraph @@ -60,7 +59,7 @@ class PatternMatchPlanner(val pattern: GraphPattern)(implicit context: LogicalPl val dst = getSrcAndDst(connection, chosenNodes)._2 chosenEdges.add(connection) val driving = Driving(dependency.graph, src, dependency.solved) - val scan = PatternScan(driving, buildEdgePattern(connection)) + val scan = PatternScan(driving, buildEdgePattern(src, connection)) val rhsPlanner = new PatternMatchPlanner(parts._2.copy(rootAlias = dst)) val oneRhsOperator = rhsPlanner.plan(scan, chosenNodes.clone(), chosenEdges.clone()) if (oneRhsOperator != null && rhsOperator != null) { @@ -90,7 +89,7 @@ class PatternMatchPlanner(val pattern: GraphPattern)(implicit context: LogicalPl lhsOperator = connection match { case conn: VariablePatternConnection => val edgePattern = - buildEdgePattern(conn).asInstanceOf[EdgePattern[VariablePatternConnection]] + buildEdgePattern(src, conn).asInstanceOf[EdgePattern[VariablePatternConnection]] val repeatOperator = buildBoundVarLenExpand(src, edgePattern, lhsOperator) if (rhsOperator == null) { repeatOperator @@ -109,8 +108,14 @@ class PatternMatchPlanner(val pattern: GraphPattern)(implicit context: LogicalPl } } - private def buildEdgePattern(conn: Connection) = { - EdgePattern(pattern.getNode(conn.source), pattern.getNode(conn.target), conn) + private def buildEdgePattern(root: String, conn: Connection) = { + if (root.equals(conn.source)) { + EdgePattern(pattern.getNode(conn.source), pattern.getNode(conn.target), conn) + } else if (root.equals(conn.target)) { + EdgePattern(pattern.getNode(conn.target), pattern.getNode(conn.source), conn.reverse) + } else { + throw SystemError(s"PatternMatchPlanner error, root=${root}, conn=${conn}") + } } private def getSrcAndDst( diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/validate/Validator.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/validate/Validator.scala index 7df878bc..2c97ce41 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/validate/Validator.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/validate/Validator.scala @@ -127,7 +127,7 @@ object Validator extends Logging { field match { case node: IRNode => if (!types.contains(node.name) || (types(node.name).isEmpty && node.fields.nonEmpty)) { - throw SchemaException(s"Cannot find $node.name in $types") + throw SchemaException(s"Cannot find ${node.name} in $types") } varMap.put( node.name, @@ -136,7 +136,7 @@ object Validator extends Logging { node.fields.map(graph.graphSchema.getNodeField(types(node.name), _)).toSet)) case edge: IREdge => if (types(edge.name).isEmpty && edge.fields.nonEmpty) { - throw SchemaException(s"Cannot find $edge.name in $types") + throw SchemaException(s"Cannot find ${edge.name} in $types") } varMap.put( edge.name, diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/validate/semantic/SemanticExplainer.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/validate/semantic/SemanticExplainer.scala index d8463f2b..0f32a569 100644 --- a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/validate/semantic/SemanticExplainer.scala +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/validate/semantic/SemanticExplainer.scala @@ -15,10 +15,7 @@ package com.antgroup.openspg.reasoner.lube.logical.validate.semantic import com.antgroup.openspg.reasoner.lube.block.Block import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext -import com.antgroup.openspg.reasoner.lube.logical.validate.semantic.rules.{ - ConceptExplain, - SpatioTemporalExplain -} +import com.antgroup.openspg.reasoner.lube.logical.validate.semantic.rules.{ConceptExplain, NodeIdTransform, SpatioTemporalExplain} object SemanticExplainer { diff --git a/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/validate/semantic/rules/NodeIdTransform.scala b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/validate/semantic/rules/NodeIdTransform.scala new file mode 100644 index 00000000..bae9eb25 --- /dev/null +++ b/reasoner/lube-logical/src/main/scala/com/antgroup/openspg/reasoner/lube/logical/validate/semantic/rules/NodeIdTransform.scala @@ -0,0 +1,135 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ + +package com.antgroup.openspg.reasoner.lube.logical.validate.semantic.rules + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +import com.antgroup.openspg.reasoner.common.constants.Constants +import com.antgroup.openspg.reasoner.lube.block.{Block, MatchBlock, OrderedFields, TableResultBlock} +import com.antgroup.openspg.reasoner.lube.catalog.{Catalog, SemanticPropertyGraph} +import com.antgroup.openspg.reasoner.lube.common.graph.{IRField, IRProperty} +import com.antgroup.openspg.reasoner.lube.common.pattern.{GraphPattern, VariablePatternConnection} +import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext +import com.antgroup.openspg.reasoner.lube.logical.validate.semantic.Explain + +object NodeIdTransform extends Explain { + + /** + * Rewrite [[Block]] tree TopDown + * + * @param context + * @return + */ + override def explain(implicit context: LogicalPlannerContext): PartialFunction[Block, Block] = { + case tableResult: TableResultBlock => + tableResultTransform(tableResult, context.catalog.getGraph(Catalog.defaultGraphName)) + } + + private def tableResultTransform( + tableResultBlock: TableResultBlock, + graph: SemanticPropertyGraph): Block = { + val matchBlock = tableResultBlock.transform[MatchBlock] { + case (matchBlock: MatchBlock, _) => matchBlock + case (_, list) => + if (list == null || list.isEmpty) { + null + } else { + list.head + } + + } + val pattern = matchBlock.patterns.values.head.graphPattern + val props = tableResultBlock.selectList.orderedFields.groupBy(_.name) + val selects = new ListBuffer[IRField] + var needModResolve = false + val removed = new mutable.HashSet[String]() + for (select <- tableResultBlock.selectList.orderedFields) { + if (select.isInstanceOf[IRProperty] && props(select.name).size == 1 && select + .asInstanceOf[IRProperty] + .field + .equals(Constants.NODE_ID_KEY)) { + selects.append(transform(select.name, pattern, graph)) + removed.add(select.name) + needModResolve = true + } else { + selects.append(select) + } + } + + val newBlock = tableResultBlock.copy(selectList = OrderedFields(selects.toList)) + if (needModResolve) { + newBlock.rewrite { case matchBlock @ MatchBlock(dependencies, patterns) => + val props = new mutable.HashMap[String, Set[String]]() + for (prop <- pattern.properties) { + if (removed.contains(prop._1)) { + props.put(prop._1, Set.empty) + } else { + props.put(prop._1, prop._2) + } + } + val selectMap = selects.filter(_.isInstanceOf[IRProperty]).groupBy(_.name) + for (select <- selectMap) { + val propNames = + props(select._1) ++ (select._2.map(_.asInstanceOf[IRProperty].field).toSet) + props.put(select._1, propNames) + } + val newPattern = pattern.copy(properties = props.toMap) + val path = patterns.head._2.copy(graphPattern = newPattern) + MatchBlock(dependencies, Map.apply((patterns.head._1 -> path))) + } + } else { + newBlock + } + + } + + private def transform( + alias: String, + pattern: GraphPattern, + graph: SemanticPropertyGraph): IRProperty = { + var ir: IRProperty = null + if (pattern.edges.contains(alias)) { + val fixedEdges = pattern.edges(alias).filter(!_.isInstanceOf[VariablePatternConnection]) + val varEdges = pattern + .edges(alias) + .filter(_.isInstanceOf[VariablePatternConnection]) + .map(_.asInstanceOf[VariablePatternConnection]) + .filter(_.upper == 1) + if (!fixedEdges.isEmpty) { + ir = IRProperty(fixedEdges.head.alias, Constants.EDGE_FROM_ID_KEY) + } else if (!varEdges.isEmpty) { + ir = IRProperty(varEdges.head.alias, Constants.EDGE_FROM_ID_KEY) + } + } else { + val inEdges = pattern.edges.values.flatten.filter(_.target.equals(alias)) + val fixedEdges = inEdges.filter(!_.isInstanceOf[VariablePatternConnection]) + val varEdges = inEdges + .filter(_.isInstanceOf[VariablePatternConnection]) + .map(_.asInstanceOf[VariablePatternConnection]) + .filter(_.upper == 1) + if (!fixedEdges.isEmpty) { + ir = IRProperty(fixedEdges.head.alias, Constants.EDGE_TO_ID_KEY) + } else if (!varEdges.isEmpty) { + ir = IRProperty(varEdges.head.alias, Constants.EDGE_TO_ID_KEY) + } + } + if (ir == null) { + IRProperty(alias, Constants.NODE_ID_KEY) + } else { + ir + } + } + +} diff --git a/reasoner/lube-logical/src/test/scala/com/antgroup/openspg/reasoner/lube/logical/LogicalPlannerTests.scala b/reasoner/lube-logical/src/test/scala/com/antgroup/openspg/reasoner/lube/logical/LogicalPlannerTests.scala index 03de5ac4..de18e507 100644 --- a/reasoner/lube-logical/src/test/scala/com/antgroup/openspg/reasoner/lube/logical/LogicalPlannerTests.scala +++ b/reasoner/lube-logical/src/test/scala/com/antgroup/openspg/reasoner/lube/logical/LogicalPlannerTests.scala @@ -13,9 +13,10 @@ package com.antgroup.openspg.reasoner.lube.logical +import com.antgroup.openspg.reasoner.common.constants.Constants import com.antgroup.openspg.reasoner.lube.catalog.impl.PropertyGraphCatalog import com.antgroup.openspg.reasoner.lube.logical.LogicalOperatorOps.RichLogicalOperator -import com.antgroup.openspg.reasoner.lube.logical.operators.{Aggregate, BoundedVarLenExpand, PatternScan, Project, Start} +import com.antgroup.openspg.reasoner.lube.logical.operators._ import com.antgroup.openspg.reasoner.lube.logical.optimizer.LogicalOptimizer import com.antgroup.openspg.reasoner.lube.logical.planning.{LogicalPlanner, LogicalPlannerContext} import com.antgroup.openspg.reasoner.lube.logical.validate.Validator @@ -268,7 +269,9 @@ class LogicalPlannerTests extends AnyFunSpec { val schema: Map[String, Set[String]] = Map.apply( "TuringCrowd" -> Set.empty, "User" -> Set.apply("id"), - "User_belongTo_TuringCrowd" -> Set.empty) + "User_belongTo_TuringCrowd" -> Set.apply( + Constants.EDGE_FROM_ID_KEY, + Constants.EDGE_TO_ID_KEY)) val catalog = new PropertyGraphCatalog(schema) catalog.init() implicit val context: LogicalPlannerContext = diff --git a/reasoner/lube-logical/src/test/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/NodeIdToEdgePropertyTests.scala b/reasoner/lube-logical/src/test/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/NodeIdToEdgePropertyTests.scala new file mode 100644 index 00000000..97afd9c6 --- /dev/null +++ b/reasoner/lube-logical/src/test/scala/com/antgroup/openspg/reasoner/lube/logical/optimizer/NodeIdToEdgePropertyTests.scala @@ -0,0 +1,187 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ + +package com.antgroup.openspg.reasoner.lube.logical.optimizer + +import com.antgroup.openspg.reasoner.common.constants.Constants +import com.antgroup.openspg.reasoner.lube.catalog.impl.PropertyGraphCatalog +import com.antgroup.openspg.reasoner.lube.logical.LogicalOperatorOps.RichLogicalOperator +import com.antgroup.openspg.reasoner.lube.logical.PropertyVar +import com.antgroup.openspg.reasoner.lube.logical.operators._ +import com.antgroup.openspg.reasoner.lube.logical.optimizer.rules._ +import com.antgroup.openspg.reasoner.lube.logical.planning.{LogicalPlanner, LogicalPlannerContext} +import com.antgroup.openspg.reasoner.lube.logical.validate.Validator +import com.antgroup.openspg.reasoner.lube.utils.transformer.impl.Expr2QlexpressTransformer +import com.antgroup.openspg.reasoner.parser.OpenSPGDslParser +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, equal} + +class NodeIdToEdgePropertyTests extends AnyFunSpec { + it("test select nodeId should not to edge property") { + val dsl = + """ + |GraphStructure { + | (A:User)-[e1:lk]->(B:User)-[e2:lk]->(C:User) + |} + |Rule { + | R1(""): e1.weight < e2.weight + | R2(""): C.height > 170 + |} + |Action { + | get(A.id, B.name, C.name) + |} + |""".stripMargin + val parser = new OpenSPGDslParser() + val block = parser.parse(dsl) + val schema: Map[String, Set[String]] = + Map.apply( + "User" -> Set.apply("id", "name", "age", "height", "weight"), + "User_lk_User" -> Set.apply("weight")) + val catalog = new PropertyGraphCatalog(schema) + catalog.init() + implicit val context: LogicalPlannerContext = + LogicalPlannerContext( + catalog, + parser, + Map + .apply( + Constants.SPG_REASONER_MULTI_VERSION_ENABLE -> true, + Constants.START_ALIAS -> "A") + .asInstanceOf[Map[String, Object]]) + val dag = Validator.validate(List.apply(block)) + val logicalPlan = LogicalPlanner.plan(dag).popRoot() + val rule = Seq(NodeIdToEdgeProperty, FilterPushDown, ExpandIntoPure, SolvedModelPure) + val optimizedLogicalPlan = LogicalOptimizer.optimize(logicalPlan, rule) + optimizedLogicalPlan.findExactlyOne { case select: Select => + select.fields.map(f => s"${f.name}.${f.asInstanceOf[PropertyVar].field.name}") should equal( + List.apply("A.id", "B.name", "C.name")) + } + val cnt = logicalPlan.transform[Int] { + case (scan: PatternScan, cnt) => cnt.sum + 1 + case (scan: ExpandInto, cnt) => cnt.sum + 1 + case (_, cnt) => + if (cnt.isEmpty) { + 0 + } else { + cnt.sum + } + } + cnt should equal(3) + } + + it("test nodeId to edge property for select") { + val dsl = + """ + |GraphStructure { + | (s:Pkg)-[p:target]->(o:User) + |} + |Rule { + | + |} + |Action { + | get(s.id,o.id) + |} + |""".stripMargin + val parser = new OpenSPGDslParser() + val block = parser.parse(dsl) + val schema: Map[String, Set[String]] = + Map.apply( + "Pkg" -> Set.apply("id"), + "User" -> Set.apply("id"), + "Pkg_target_User" -> Set.empty) + val catalog = new PropertyGraphCatalog(schema) + catalog.init() + implicit val context: LogicalPlannerContext = + LogicalPlannerContext( + catalog, + parser, + Map + .apply(Constants.SPG_REASONER_MULTI_VERSION_ENABLE -> true, Constants.START_ALIAS -> "s") + .asInstanceOf[Map[String, Object]]) + val dag = Validator.validate(List.apply(block)) + val logicalPlan = LogicalPlanner.plan(dag).popRoot() + val rule = Seq(NodeIdToEdgeProperty, FilterPushDown, ExpandIntoPure, SolvedModelPure) + val optimizedLogicalPlan = LogicalOptimizer.optimize(logicalPlan, rule) + optimizedLogicalPlan.findExactlyOne { case select: Select => + select.fields.map(f => s"${f.name}.${f.asInstanceOf[PropertyVar].field.name}") should equal( + List.apply("s.id", s"p.${Constants.EDGE_TO_ID_KEY}")) + } + val cnt = optimizedLogicalPlan.transform[Int] { + case (scan: PatternScan, cnt) => cnt.sum + 1 + case (scan: ExpandInto, cnt) => cnt.sum + 1 + case (_, cnt) => + if (cnt.isEmpty) { + 0 + } else { + cnt.sum + } + } + cnt should equal(1) + } + + it("test nodeId to edge property for select with filter") { + val dsl = + """ + |GraphStructure { + | (A:User)-[e1:lk]->(B:User)-[e2:lk]->(C:User) + |} + |Rule { + | R1(""): e1.weight < e2.weight + | R2(""): C.id in ['123456789'] + |} + |Action { + | get(A.id, B.id, C.id) + |} + |""".stripMargin + val parser = new OpenSPGDslParser() + val block = parser.parse(dsl) + val schema: Map[String, Set[String]] = + Map.apply( + "User" -> Set.apply("id", "name", "age", "height", "weight"), + "User_lk_User" -> Set.apply("weight")) + val catalog = new PropertyGraphCatalog(schema) + catalog.init() + implicit val context: LogicalPlannerContext = + LogicalPlannerContext( + catalog, + parser, + Map + .apply(Constants.SPG_REASONER_MULTI_VERSION_ENABLE -> true) + .asInstanceOf[Map[String, Object]]) + val dag = Validator.validate(List.apply(block)) + val logicalPlan = LogicalPlanner.plan(dag).popRoot() + val rule = Seq(NodeIdToEdgeProperty, FilterPushDown, ExpandIntoPure, SolvedModelPure) + val optimizedLogicalPlan = LogicalOptimizer.optimize(logicalPlan, rule) + optimizedLogicalPlan.findExactlyOne { case select: Select => + select.fields.map(f => s"${f.name}.${f.asInstanceOf[PropertyVar].field.name}") should equal( + List.apply(s"e1.${Constants.EDGE_FROM_ID_KEY}", "B.id", s"e2.${Constants.EDGE_TO_ID_KEY}")) + } + val cnt = optimizedLogicalPlan.transform[Int] { + case (scan: PatternScan, cnt) => + val rule = scan.pattern.topology.values.flatten.filter(_.rule != null).head.rule + val qlTransformer = new Expr2QlexpressTransformer() + qlTransformer.transform(rule).head should equal( + s"""e2.${Constants.EDGE_TO_ID_KEY} in ["123456789"]""") + cnt.sum + 1 + case (scan: ExpandInto, cnt) => cnt.sum + 1 + case (_, cnt) => + if (cnt.isEmpty) { + 0 + } else { + cnt.sum + } + } + cnt should equal(1) + } + +} diff --git a/reasoner/runner/local-runner/src/test/java/com/antgroup/openspg/reasoner/runner/local/main/transitive/TransitiveOptionalTest.java b/reasoner/runner/local-runner/src/test/java/com/antgroup/openspg/reasoner/runner/local/main/transitive/TransitiveOptionalTest.java index 6857e572..7d43b3cf 100644 --- a/reasoner/runner/local-runner/src/test/java/com/antgroup/openspg/reasoner/runner/local/main/transitive/TransitiveOptionalTest.java +++ b/reasoner/runner/local-runner/src/test/java/com/antgroup/openspg/reasoner/runner/local/main/transitive/TransitiveOptionalTest.java @@ -965,7 +965,7 @@ public class TransitiveOptionalTest { + " A [TestFinParty.RelatedParty, __start__='true']\n" + " B, C [TestFinParty.RelatedParty]\n" + "// 1.17的B 必须存在\n" - + " C->B [votingRatio] repeat(0,20) as F1\n" + + " C->B [votingRatio] repeat(0,2) as F1\n" + "// 1.19的C D 可以不存在\n" + " B->A [votingRatio, __optional__='true'] as F2\n" + "}\n" diff --git a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/utils/RunnerUtil.java b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/utils/RunnerUtil.java index 31aeb9d9..ea3f716f 100644 --- a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/utils/RunnerUtil.java +++ b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/utils/RunnerUtil.java @@ -378,6 +378,20 @@ public class RunnerUtil { } vertexList.add(targetVertex); edgeProperty.put("nodes", vertexList); + if (CollectionUtils.isNotEmpty(edge.getEdgeList())) { + Object fromIdObj = edge.getEdgeList().get(0).getValue().get(Constants.EDGE_FROM_ID_KEY); + if (null != fromIdObj) { + edgeProperty.put(Constants.EDGE_FROM_ID_KEY, fromIdObj); + } + Object toIdObj = + edge.getEdgeList() + .get(edge.getEdgeList().size() - 1) + .getValue() + .get(Constants.EDGE_TO_ID_KEY); + if (null != toIdObj) { + edgeProperty.put(Constants.EDGE_TO_ID_KEY, toIdObj); + } + } return edgeProperty; } diff --git a/reasoner/runner/runner-common/src/main/scala/com/antgroup/openspg/reasoner/util/LoaderUtil.scala b/reasoner/runner/runner-common/src/main/scala/com/antgroup/openspg/reasoner/util/LoaderUtil.scala index 6468a965..638a8e12 100644 --- a/reasoner/runner/runner-common/src/main/scala/com/antgroup/openspg/reasoner/util/LoaderUtil.scala +++ b/reasoner/runner/runner-common/src/main/scala/com/antgroup/openspg/reasoner/util/LoaderUtil.scala @@ -351,23 +351,26 @@ object LoaderUtil { } case edgeVar: EdgeVar => for (typeName <- solvedModel.getTypes(field.name)) { - val edge = logicalPlan.graph.getEdge(typeName) - if (edge.resolved) { - val edgeTypeName = edge.startNode + "_" + edge.typeName + "_" + edge.endNode - val edgeLoaderConfig = new EdgeLoaderConfig() - edgeLoaderConfig.setEdgeType(edgeTypeName) - edgeLoaderConfig.setNeedProperties( - edgeVar.fields.filter(_.resolved).map(_.name).asJava) - edgeLoaderConfig.setConnection( - new util.HashSet(catalog.getConnection(edgeTypeName).asJava)) - if (edgeLoaderConfigMap.contains(edgeTypeName)) { - val oldEdgeLoaderConfig = edgeLoaderConfigMap(edgeTypeName) - edgeLoaderConfig.merge(oldEdgeLoaderConfig) + val graph = logicalPlan.graph + if (graph.containsEdge(typeName)) { + val edge = graph.getEdge(typeName) + if (edge.resolved) { + val edgeTypeName = edge.startNode + "_" + edge.typeName + "_" + edge.endNode + val edgeLoaderConfig = new EdgeLoaderConfig() + edgeLoaderConfig.setEdgeType(edgeTypeName) + edgeLoaderConfig.setNeedProperties( + edgeVar.fields.filter(_.resolved).map(_.name).asJava) + edgeLoaderConfig.setConnection( + new util.HashSet(catalog.getConnection(edgeTypeName).asJava)) + if (edgeLoaderConfigMap.contains(edgeTypeName)) { + val oldEdgeLoaderConfig = edgeLoaderConfigMap(edgeTypeName) + edgeLoaderConfig.merge(oldEdgeLoaderConfig) + } + edgeLoaderConfig.setLoadDirection( + edgeLoadDirectionMap.getOrElse(edgeTypeName, Direction.BOTH)) + edgeLoaderConfig.addEndVertexAliasSet(edgeEndVertexAliasSet.get(edge.typeName)) + edgeLoaderConfigMap.put(edgeTypeName, edgeLoaderConfig) } - edgeLoaderConfig.setLoadDirection( - edgeLoadDirectionMap.getOrElse(edgeTypeName, Direction.BOTH)) - edgeLoaderConfig.addEndVertexAliasSet(edgeEndVertexAliasSet.get(edge.typeName)) - edgeLoaderConfigMap.put(edgeTypeName, edgeLoaderConfig) } } case _ => diff --git a/reasoner/runner/runner-common/src/test/scala/com/antgroup/reasoner/session/ReasonerSessionTests.scala b/reasoner/runner/runner-common/src/test/scala/com/antgroup/reasoner/session/ReasonerSessionTests.scala index f27d6804..9a03d414 100644 --- a/reasoner/runner/runner-common/src/test/scala/com/antgroup/reasoner/session/ReasonerSessionTests.scala +++ b/reasoner/runner/runner-common/src/test/scala/com/antgroup/reasoner/session/ReasonerSessionTests.scala @@ -200,7 +200,7 @@ class ReasonerSessionTests extends AnyFunSpec { val physicalOpOrder = getPhysicalPlanOrder(subqueryPhysicalOp) println(physicalOpOrder) physicalOpOrder should equal( - "Start,Cache,DrivingRDG,PatternScan,LinkedExpand,ExpandInto,ExpandInto,Drop,Filter,Drop,DDL,Join,PatternScan,Cache,DrivingRDG,PatternScan,LinkedExpand,ExpandInto,ExpandInto,Drop,Filter,Drop,DDL,Join,ExpandInto,ExpandInto,Drop,Filter,Drop,Select") + "Start,Cache,DrivingRDG,PatternScan,LinkedExpand,ExpandInto,ExpandInto,Drop,Filter,Drop,DDL,Join,PatternScan,Cache,DrivingRDG,PatternScan,LinkedExpand,ExpandInto,ExpandInto,Drop,Filter,Drop,DDL,Join,ExpandInto,Drop,Filter,Drop,Select") } private def getPhysicalPlanOrder[T <: RDG[T]](physicalOperator: PhysicalOperator[T]) = { @@ -497,7 +497,7 @@ class ReasonerSessionTests extends AnyFunSpec { cnt.sum } } - cnt should equal(6) + cnt should equal(5) } it("test concept check") { @@ -532,7 +532,7 @@ class ReasonerSessionTests extends AnyFunSpec { cnt.sum } } - cnt should equal(2) + cnt should equal(1) } @@ -575,7 +575,7 @@ class ReasonerSessionTests extends AnyFunSpec { cnt.sum } } - cnt should equal(6) + cnt should equal(3) } @@ -678,7 +678,7 @@ class ReasonerSessionTests extends AnyFunSpec { cnt.sum } } - cnt should equal(2) + cnt should equal(1) } it("test id push down 4") { @@ -760,8 +760,8 @@ class ReasonerSessionTests extends AnyFunSpec { .asInstanceOf[Map[String, Object]]) } catch { case ex: SchemaException => - println(ex.getMessage) - ex.getMessage.contains("Cannot find p3 types in") should equal(true) + ex.printStackTrace() + ex.getMessage.contains("Cannot find p3") should equal(true) } } @@ -799,8 +799,8 @@ class ReasonerSessionTests extends AnyFunSpec { .asInstanceOf[Map[String, Object]]) } catch { case ex: SchemaException => - println(ex.getMessage) - ex.getMessage.contains("Cannot find IRNode(D,Set()).name") should equal(true) + ex.printStackTrace() + ex.getMessage.contains("Cannot find D") should equal(true) } } }