feat(reasoner): supports full node id pushdown (#101)

Co-authored-by: youdonghai <donghai.ydh@antgroup.com>
This commit is contained in:
FishJoy 2024-01-30 14:08:32 +08:00 committed by GitHub
parent 3c7d5b41f2
commit b0911bd345
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
35 changed files with 898 additions and 358 deletions

View File

@ -120,6 +120,41 @@ case class TopDown[T <: AbstractTreeNode[T]: ClassTag](rule: PartialFunction[T,
}
/**
* Applies the given partial function starting from the root of this tree.
* An additional context is being recursively passed
* from the leftmost child to its siblings and eventually to its parent.
*/
case class TopDownWithContext[T <: AbstractTreeNode[T]: ClassTag, C](
rule: PartialFunction[(T, C), (T, C)])
extends TreeRewriterWithContext[T, C] {
def transform(tree: T, context: C): (T, C) = {
var (afterSelf, updatedContext) = if (rule.isDefinedAt(tree -> context)) {
rule(tree -> context)
} else {
tree -> context
}
val childrenLength = afterSelf.children.length
val afterChildren = if (childrenLength == 0) {
afterSelf
} else {
val updatedChildren = new Array[T](childrenLength)
var i = 0
while (i < childrenLength) {
val pair = transform(afterSelf.children(i), updatedContext)
updatedChildren(i) = pair._1
updatedContext = pair._2
i += 1
}
afterSelf.withNewChildren(updatedChildren)
}
afterChildren -> updatedContext
}
}
/**
* Applies the given transformation starting from the leaves of this tree.
*/

View File

@ -13,7 +13,7 @@
package com.antgroup.opensog.reasoner.common.trees
import com.antgroup.openspg.reasoner.common.trees.AbstractTreeNode
import com.antgroup.openspg.reasoner.common.trees.{AbstractTreeNode, BottomUpWithContext, TopDownWithContext}
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, equal}
@ -28,6 +28,22 @@ class AbstractTreeNodeTest extends AnyFunSpec {
| ├─Number(v=4)
| └─Number(v=3)""".stripMargin)
}
it("rewrites with context up") {
val calculation = Add(Number(5), Add(Number(4), Number(3)))
val sumOnce: PartialFunction[(CalcExpr, Boolean), (CalcExpr, Boolean)] = {
case (Add(n1: Number, n2: Number), false) => Number(n1.v + n2.v) -> true
}
val expected = Add(Number(5), Number(7)) -> true
val up = BottomUpWithContext(sumOnce).transform(calculation, false)
up should equal(expected)
val down = TopDownWithContext(sumOnce).transform(calculation, false)
down should equal(expected)
}
}
case class Number(v: Int) extends CalcExpr {
@ -41,3 +57,4 @@ abstract class CalcExpr extends AbstractTreeNode[CalcExpr] {
case class Add(left: CalcExpr, right: CalcExpr) extends CalcExpr {
def eval: Int = left.eval + right.eval
}

View File

@ -25,7 +25,7 @@ import com.antgroup.openspg.reasoner.common.types._
import com.antgroup.openspg.reasoner.lube.block._
import com.antgroup.openspg.reasoner.lube.common.expr._
import com.antgroup.openspg.reasoner.lube.common.graph._
import com.antgroup.openspg.reasoner.lube.common.pattern.{Element, GraphPath, PatternElement, PredicateElement}
import com.antgroup.openspg.reasoner.lube.common.pattern.{Element, EntityElement, GraphPath, PatternElement, PredicateElement}
import com.antgroup.openspg.reasoner.lube.common.rule.{LogicRule, ProjectRule, Rule}
import com.antgroup.openspg.reasoner.lube.parser.ParserInterface
import com.antgroup.openspg.reasoner.lube.utils.{ExprUtils, RuleUtils}
@ -162,11 +162,31 @@ class OpenSPGDslParser extends ParserInterface {
Ref(ddlBlockWithNodes._3.target.alias)))))
DDLBlock(Set.apply(ddlBlockOp), List.apply(prjBlk))
case AddPredicate(predicate) =>
var attrFields: Map[String, Ref] = Map.empty
val attrFields = new mutable.HashMap[String, Expr]()
addPropertiesMap.foreach(x =>
if (x.name == predicate.alias) {
attrFields += (x.field -> Ref(generateIRPropertyTmpVariable(x).name))
})
attrFields.put(
Constants.EDGE_FROM_ID_KEY,
UnaryOpExpr(GetField(Constants.NODE_ID_KEY), Ref(predicate.source.alias)))
attrFields.put(
Constants.EDGE_FROM_ID_TYPE_KEY,
VString(predicate.source.typeNames.head))
if (predicate.target.isInstanceOf[EntityElement]) {
attrFields.put(
Constants.EDGE_TO_ID_KEY,
VString(predicate.target.asInstanceOf[EntityElement].id))
} else {
attrFields.put(
Constants.EDGE_TO_ID_KEY,
UnaryOpExpr(GetField(Constants.NODE_ID_KEY), Ref(predicate.target.alias)))
}
attrFields.put(
Constants.EDGE_TO_ID_TYPE_KEY,
VString(predicate.target.typeNames.head))
val depBlk = ruleBlock
DDLBlock(
@ -177,7 +197,7 @@ class OpenSPGDslParser extends ParserInterface {
predicate.alias,
predicate.source,
predicate.target,
attrFields,
attrFields.toMap,
predicate.direction))),
List.apply(depBlk))
case _ => DDLBlock(Set.apply(ddlBlockOp), List.apply(ruleBlock))
@ -337,8 +357,8 @@ class OpenSPGDslParser extends ParserInterface {
graphAggExpr.by.map {
case Ref(refName) => refName
case UnaryOpExpr(GetField(fieldName), Ref(refName)) => refName + "." + fieldName
case x => throw
new KGDSLGrammarException("OrderAndSliceBlock can not group " + x.pretty)
case x =>
throw new KGDSLGrammarException("OrderAndSliceBlock can not group " + x.pretty)
})
case AggIfOpExpr(_, _) | AggOpExpr(_, _) =>
if (realLValue == null) {

View File

@ -22,6 +22,7 @@ import com.antgroup.openspg.reasoner.lube.common.graph._
import com.antgroup.openspg.reasoner.lube.common.pattern.{EntityElement, LinkedPatternConnection}
import com.antgroup.openspg.reasoner.lube.common.rule.ProjectRule
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.must.Matchers.contain
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, equal}
class OpenSPGDslParserTest extends AnyFunSpec {
@ -660,8 +661,7 @@ class OpenSPGDslParserTest extends AnyFunSpec {
.asInstanceOf[AddPredicate]
.predicate
.fields
.head
._1 should equal("same_domain_num")
.keySet should contain ("same_domain_num")
blocks(1).asInstanceOf[DDLBlock].ddlOp.head.isInstanceOf[AddProperty] should equal(true)
blocks(1)
@ -1015,7 +1015,7 @@ class OpenSPGDslParserTest extends AnyFunSpec {
.predicate
.label equals ("belongTo")
val text =
"""└─DDLBlock(ddlOp=Set(AddPredicate(PredicateElement(belongTo,p,(s:User,BinaryOpExpr(name=BEqual)),EntityElement(cardUser,accountQueryCrowd,o),Map(),OUT))))
"""└─DDLBlock(ddlOp=Set(AddPredicate(PredicateElement(belongTo,p,(s:User,BinaryOpExpr(name=BEqual)),EntityElement(cardUser,accountQueryCrowd,o),Map(__to_id_type__->VString(value=accountQueryCrowd/cardUser),__to_id__->VString(value=cardUser),__from_id__->UnaryOpExpr(name=GetField(id)),__from_id_type__->VString(value=User)),OUT))))
* └─FilterBlock(rules=LogicRule(R5,智信确权,BinaryOpExpr(name=BEqual)))
* └─FilterBlock(rules=LogicRule(R4,绑定数目,BinaryOpExpr(name=BGreaterThan)))
* └─AggregationBlock(aggregations=Aggregations(Map(IRVariable(BindNum) -> AggOpExpr(name=Sum))), group=List(IRNode(s,Set(zhixin))))

View File

@ -15,9 +15,10 @@ package com.antgroup.openspg.reasoner.lube.catalog
import scala.collection.mutable
import com.antgroup.openspg.reasoner.common.constants.Constants
import com.antgroup.openspg.reasoner.common.exception.SchemaException
import com.antgroup.openspg.reasoner.common.graph.edge.{Direction, SPO}
import com.antgroup.openspg.reasoner.common.types.KgType
import com.antgroup.openspg.reasoner.common.types.{KgType, KTString}
import com.antgroup.openspg.reasoner.lube.catalog.struct.{Edge, Field, Node, NodeType}
import org.json4s._
import org.json4s.ext.EnumNameSerializer
@ -70,6 +71,11 @@ class PropertyGraphSchema(val nodes: mutable.Map[String, Node], val edges: mutab
}
def getEdgeField(spoStr: Set[String], fieldName: String): Field = {
if (fieldName.equals(Constants.EDGE_FROM_ID_KEY) || fieldName.equals(
Constants.EDGE_TO_ID_KEY) || fieldName.equals(
Constants.EDGE_FROM_ID_TYPE_KEY) || fieldName.equals(Constants.EDGE_TO_ID_TYPE_KEY)) {
return new Field(fieldName, KTString, true)
}
for (spo <- spoStr) {
val field = getEdgeField(spo, fieldName)
if (field != null) {
@ -105,6 +111,11 @@ class PropertyGraphSchema(val nodes: mutable.Map[String, Node], val edges: mutab
typeName: String,
endNode: String,
fieldName: String): Field = {
if (fieldName.equals(Constants.EDGE_FROM_ID_KEY) || fieldName.equals(
Constants.EDGE_TO_ID_KEY) || fieldName.equals(
Constants.EDGE_FROM_ID_TYPE_KEY) || fieldName.equals(Constants.EDGE_TO_ID_TYPE_KEY)) {
return new Field(fieldName, KTString, true)
}
val spo = new SPO(startNode, typeName, endNode)
val edge = edges.get(spo)
if (edge.isEmpty) {

View File

@ -15,7 +15,8 @@ package com.antgroup.openspg.reasoner.lube.utils
import scala.collection.mutable
import com.antgroup.openspg.reasoner.common.trees.{BottomUp, Transform}
import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException
import com.antgroup.openspg.reasoner.common.trees.{BottomUp, TopDown, Transform}
import com.antgroup.openspg.reasoner.lube.common.expr.{Expr, GetField, Ref, UnaryOpExpr}
import com.antgroup.openspg.reasoner.lube.common.graph._
@ -104,11 +105,11 @@ object ExprUtils {
* @param renameFunc
* @return
*/
def renameVariableInExpr(expr: Expr, replaceVar: Map[IRField, IRProperty]): Expr = {
def renameVariableInExpr(expr: Expr, replaceVar: Map[IRField, IRField]): Expr = {
val trans: PartialFunction[Expr, Expr] = {
case expr @ UnaryOpExpr(GetField(name), Ref(alis)) =>
if (replaceVar.contains(IRProperty(alis, name))) {
val newProp = replaceVar.get(IRProperty(alis, name)).get
val newProp = replaceVar.get(IRProperty(alis, name)).get.asInstanceOf[IRProperty]
UnaryOpExpr(GetField(newProp.field), Ref(newProp.name))
} else {
expr
@ -116,13 +117,20 @@ object ExprUtils {
case expr @ Ref(name) =>
if (replaceVar.contains(IRVariable(name))) {
val newProp = replaceVar.get(IRVariable(name)).get
UnaryOpExpr(GetField(newProp.field), Ref(newProp.name))
newProp match {
case IRVariable(name) => Ref(name)
case IRProperty(name, field) => UnaryOpExpr(GetField(field), Ref(name))
case _ =>
throw UnsupportedOperationException(
s"rename unsupported expr=${expr}, replaceVar=${replaceVar}")
}
} else {
expr
}
case x => x
}
BottomUp(trans).transform(expr)
TopDown(trans).transform(expr)
}
def renameAliasInExpr(expr: Expr, replaceVar: Map[String, String]): Expr = {
@ -183,4 +191,5 @@ object ExprUtils {
}
variable
}
}

View File

@ -49,7 +49,7 @@ object RuleUtils {
* @param renameFunc
* @return
*/
def renameVariableInRule(rule: Rule, replaceVar: Map[IRField, IRProperty]): Rule = {
def renameVariableInRule(rule: Rule, replaceVar: Map[IRField, IRField]): Rule = {
if (null == rule) {
return null
}

View File

@ -14,7 +14,7 @@
package com.antgroup.openspg.reasoner.lube.logical.operators
import com.antgroup.openspg.reasoner.lube.common.pattern.{EdgePattern, LinkedPatternConnection}
import com.antgroup.openspg.reasoner.lube.logical.{SolvedModel, Var}
import com.antgroup.openspg.reasoner.lube.logical.{EdgeVar, SolvedModel, Var}
final case class LinkedExpand(
in: LogicalOperator,
@ -33,12 +33,12 @@ final case class LinkedExpand(
*
* @return
*/
override def refFields: List[Var] = List.empty
override def refFields: List[Var] = List.apply(solved.getVar(edgePattern.edge.alias))
/**
* the output fields of current operator
*
* @return
*/
override def fields: List[Var] = in.fields
override def fields: List[Var] = in.fields ++ refFields
}

View File

@ -15,12 +15,13 @@ package com.antgroup.openspg.reasoner.lube.logical.operators
import scala.collection.mutable
import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException
import com.antgroup.openspg.reasoner.common.types.KTObject
import com.antgroup.openspg.reasoner.lube.catalog.struct.Field
import com.antgroup.openspg.reasoner.lube.common._
import com.antgroup.openspg.reasoner.lube.common.expr.{Directly, Expr}
import com.antgroup.openspg.reasoner.lube.common.graph.IRVariable
import com.antgroup.openspg.reasoner.lube.logical.{EdgeVar, ExprUtil, NodeVar, PropertyVar, SolvedModel, Var}
import com.antgroup.openspg.reasoner.lube.common.graph.{IREdge, IRNode}
import com.antgroup.openspg.reasoner.lube.logical._
import com.antgroup.openspg.reasoner.lube.utils.ExprUtils
final case class Project(in: LogicalOperator, expr: Map[Var, Expr], solved: SolvedModel)
extends StackingLogicalOperator {
@ -31,17 +32,17 @@ final case class Project(in: LogicalOperator, expr: Map[Var, Expr], solved: Solv
pair._2 match {
case Directly => fieldsMap.put(pair._1.name, pair._1)
case e: Expr =>
val refPair = ExprUtil.getReferProperties(e)
for (ref <- refPair) {
val alias = if (ref._1 != null) ref._1 else solved.getField(IRVariable(ref._2)).name
solved.fields.get(alias).get match {
case NodeVar(name, _) =>
val node = NodeVar(name, Set.apply(new Field(ref._2, KTObject, true)))
val fields =
ExprUtils.getAllInputFieldInRule(e, solved.getNodeAliasSet, solved.getEdgeAliasSet)
for (ref <- fields) {
ref match {
case IRNode(name, fields) =>
val node = NodeVar(name, fields.map(new Field(_, KTObject, true)))
fieldsMap.put(name, node.merge(fieldsMap.get(name)))
case EdgeVar(name, _) =>
val edge = EdgeVar(name, Set.apply(new Field(ref._2, KTObject, true)))
case IREdge(name, fields) =>
val edge = EdgeVar(name, fields.map(new Field(_, KTObject, true)))
fieldsMap.put(name, edge.merge(fieldsMap.get(name)))
case _ =>
case _ => throw UnsupportedOperationException(s"unsupported $expr")
}
}
}

View File

@ -13,6 +13,7 @@
package com.antgroup.openspg.reasoner.lube.logical.optimizer
import com.antgroup.openspg.reasoner.common.trees.{BottomUpWithContext, TopDownWithContext}
import com.antgroup.openspg.reasoner.lube.logical.operators.LogicalOperator
import com.antgroup.openspg.reasoner.lube.logical.optimizer.rules._
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
@ -24,16 +25,17 @@ object LogicalOptimizer {
var LOGICAL_OPT_RULES: Seq[Rule] =
Seq(
PatternJoinPure,
GroupNode,
IdFilterPushDown,
DistinctGet,
NodeIdToEdgeProperty,
FilterPushDown,
ExpandIntoPure,
FilterMerge,
SolvedModelPure,
DistinctGet,
AggregatePushDown,
Pure,
ProjectMerge)
ProjectMerge,
SolvedModelPure)
def optimize(input: LogicalOperator, optRuleList: Seq[Rule])(implicit
context: LogicalPlannerContext): LogicalOperator = {
@ -41,9 +43,9 @@ object LogicalOptimizer {
for (rule <- optRuleList) {
for (i <- 0 until (rule.maxIterations)) {
if (rule.direction.equals(Up)) {
root = root.rewrite(rule.rule)
root = BottomUpWithContext(rule.ruleWithContext).transform(root, Map.empty)._1
} else {
root = root.rewriteTopDown(rule.rule)
root = TopDownWithContext(rule.ruleWithContext).transform(root, Map.empty)._1
}
}
}

View File

@ -23,10 +23,26 @@ case object Down extends Direction
trait Rule {
def rule(implicit
context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator]
def ruleWithContext(implicit context: LogicalPlannerContext): PartialFunction[
(LogicalOperator, Map[String, Object]),
(LogicalOperator, Map[String, Object])]
def direction: Direction
def maxIterations: Int
}
abstract class SimpleRule extends Rule {
override def ruleWithContext(implicit context: LogicalPlannerContext): PartialFunction[
(LogicalOperator, Map[String, Object]),
(LogicalOperator, Map[String, Object])] = {
case (operator, c) if rule.isDefinedAt(operator) =>
val transformedOperator = rule(context)(operator)
(transformedOperator, c)
}
def rule(implicit
context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator]
}

View File

@ -21,14 +21,14 @@ import com.antgroup.openspg.reasoner.lube.common.expr._
import com.antgroup.openspg.reasoner.lube.common.pattern.Pattern
import com.antgroup.openspg.reasoner.lube.logical._
import com.antgroup.openspg.reasoner.lube.logical.operators.{Filter, _}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, SimpleRule, Up}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
import com.antgroup.openspg.reasoner.lube.utils.{ExprUtils, RuleUtils}
/**
* Aggregation push down
*/
object AggregatePushDown extends Rule {
object AggregatePushDown extends SimpleRule {
override def rule(implicit
context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = {

View File

@ -15,10 +15,10 @@ package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules
import com.antgroup.openspg.reasoner.lube.logical.NodeVar
import com.antgroup.openspg.reasoner.lube.logical.operators.{Aggregate, LogicalOperator, Select}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, SimpleRule, Up}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
case object DistinctGet extends Rule {
case object DistinctGet extends SimpleRule {
override def rule(implicit
context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = {

View File

@ -25,7 +25,7 @@ import com.antgroup.openspg.reasoner.lube.common.rule.{Rule => LogicalRule}
import com.antgroup.openspg.reasoner.lube.logical.NodeVar
import com.antgroup.openspg.reasoner.lube.logical.PatternOps.PatternOps
import com.antgroup.openspg.reasoner.lube.logical.operators.{ExpandInto, LogicalOperator, PatternScan}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Down, Rule}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Down, SimpleRule}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
import com.antgroup.openspg.reasoner.lube.utils.RuleUtils
@ -37,7 +37,7 @@ import com.antgroup.openspg.reasoner.lube.utils.RuleUtils
* 3. cur node is advanced, then travel down to find cur node is transferred from a attribute.
* All possibilities require a downward traversal, and the downward traversal can cover all cases.
*/
object EdgeToProperty extends Rule {
object EdgeToProperty extends SimpleRule {
override def rule(implicit
context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = {

View File

@ -13,9 +13,12 @@
package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules
import scala.collection.mutable
import com.antgroup.openspg.reasoner.lube.common.pattern.NodePattern
import com.antgroup.openspg.reasoner.lube.logical.operators.{ExpandInto, LogicalOperator}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up}
import com.antgroup.openspg.reasoner.lube.logical.{EdgeVar, NodeVar, PropertyVar, Var}
import com.antgroup.openspg.reasoner.lube.logical.operators._
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Down, Rule}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
/**
@ -23,23 +26,62 @@ import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
*/
object ExpandIntoPure extends Rule {
override def rule(implicit
context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = {
case expandInto @ ExpandInto(in, target, _) =>
val needPure = canPure(expandInto)
def ruleWithContext(implicit context: LogicalPlannerContext): PartialFunction[
(LogicalOperator, Map[String, Object]),
(LogicalOperator, Map[String, Object])] = {
case (select: Select, map) =>
select -> merge(map, select.refFields, select.solved.getNodeAliasSet)
case (ddl: DDL, map) => ddl -> merge(map, ddl.refFields, ddl.solved.getNodeAliasSet)
case (filter: Filter, map) =>
filter -> merge(map, filter.refFields, filter.solved.getNodeAliasSet)
case (project: Project, map) =>
project -> merge(map, project.refFields, project.solved.getNodeAliasSet)
case (aggregate: Aggregate, map) =>
aggregate -> merge(map, aggregate.refFields, aggregate.solved.getNodeAliasSet)
case (order: OrderAndLimit, map) =>
order -> merge(map, order.refFields, order.solved.getNodeAliasSet)
case (expandInto @ ExpandInto(in, _, _), map) =>
val needPure = canPure(expandInto, map.asInstanceOf[Map[String, Var]])
if (needPure) {
in
in -> map
} else {
expandInto
expandInto -> map
}
}
private def canPure(expandInto: ExpandInto): Boolean = {
private def merge(
map: Map[String, Object],
fields: List[Var],
nodes: Set[String]): Map[String, Object] = {
val varMap = new mutable.HashMap[String, Var]()
varMap.++=(map.asInstanceOf[Map[String, Var]])
for (field <- fields) {
if (varMap.contains(field.name)) {
varMap.put(field.name, varMap(field.name).merge(Option.apply(field)))
} else if (field.isInstanceOf[PropertyVar]) {
if (nodes.contains(field.name)) {
varMap.put(
field.name,
NodeVar(field.name, Set.apply(field.asInstanceOf[PropertyVar].field)))
} else {
varMap.put(
field.name,
EdgeVar(field.name, Set.apply(field.asInstanceOf[PropertyVar].field)))
}
} else {
varMap.put(field.name, field)
}
}
varMap.toMap
}
private def canPure(expandInto: ExpandInto, map: Map[String, Var]): Boolean = {
if (!expandInto.pattern.isInstanceOf[NodePattern]) {
false
} else {
val refFields = expandInto.refFields
if (refFields.head.isEmpty) {
val alias = expandInto.pattern.root.alias
if (!map.contains(alias) || map(alias).isEmpty) {
true
} else {
false
@ -47,7 +89,7 @@ object ExpandIntoPure extends Rule {
}
}
override def direction: Direction = Up
override def direction: Direction = Down
override def maxIterations: Int = 1
override def maxIterations: Int = 10
}

View File

@ -14,10 +14,10 @@
package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules
import com.antgroup.openspg.reasoner.lube.logical.operators.{Filter, LogicalOperator}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Down, Rule}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Down, Rule, SimpleRule}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
object FilterMerge extends Rule {
object FilterMerge extends SimpleRule {
override def rule(implicit
context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = {

View File

@ -17,16 +17,11 @@ import scala.collection.mutable
import com.antgroup.openspg.reasoner.common.trees.BottomUp
import com.antgroup.openspg.reasoner.lube.common.graph.{IREdge, IRNode}
import com.antgroup.openspg.reasoner.lube.common.pattern.{
EdgePattern,
NodePattern,
PartialGraphPattern,
Pattern
}
import com.antgroup.openspg.reasoner.lube.common.pattern.{EdgePattern, NodePattern, PartialGraphPattern, Pattern}
import com.antgroup.openspg.reasoner.lube.logical.{EdgeVar, NodeVar, Var}
import com.antgroup.openspg.reasoner.lube.logical.PatternOps.PatternOps
import com.antgroup.openspg.reasoner.lube.logical.operators._
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, SimpleRule, Up}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
import com.antgroup.openspg.reasoner.lube.utils.RuleUtils
import org.apache.commons.lang3.StringUtils
@ -34,7 +29,7 @@ import org.apache.commons.lang3.StringUtils
/**
* Predicate push down
*/
object FilterPushDown extends Rule {
object FilterPushDown extends SimpleRule {
override def rule(implicit
context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = {

View File

@ -15,18 +15,17 @@ package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.break
import com.antgroup.openspg.reasoner.common.constants.Constants
import com.antgroup.openspg.reasoner.lube.logical.{NodeVar, PropertyVar, Var}
import com.antgroup.openspg.reasoner.lube.logical.operators._
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, SimpleRule, Up}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
/**
* Convert group node property to group node, when properties contains node id.
*/
object GroupNode extends Rule {
object GroupNode extends SimpleRule {
override def rule(implicit
context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = {
@ -56,6 +55,8 @@ object GroupNode extends Rule {
groups.++=(kv._2)
}
}
} else {
groups.++=(kv._2)
}
}
aggregate.copy(group = groups.toList)

View File

@ -1,218 +0,0 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules
import com.antgroup.openspg.reasoner.common.constants.Constants
import com.antgroup.openspg.reasoner.common.trees.TopDown
import com.antgroup.openspg.reasoner.lube.common.expr.{Filter => _, _}
import com.antgroup.openspg.reasoner.lube.common.pattern._
import com.antgroup.openspg.reasoner.lube.common.rule.LogicRule
import com.antgroup.openspg.reasoner.lube.logical.PatternOps.PatternOps
import com.antgroup.openspg.reasoner.lube.logical.operators._
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
import org.apache.commons.lang3.StringUtils
/**
* Predicate push down
*/
object IdFilterPushDown extends Rule {
override def rule(implicit
context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = {
case filter: Filter => idFilterPushDown(filter)
}
private def idFilterPushDown(filter: Filter): LogicalOperator = {
var hasPushDown: Boolean = false
val updatedRule = filter.rule match {
case rule: LogicRule =>
rule.getExpr match {
case BinaryOpExpr(opName,
UnaryOpExpr(GetField(Constants.NODE_ID_KEY), Ref(refName)),
r) =>
opName match {
case BIn => r match {
case _ => (refName, r, BIn)
}
case BEqual => r match {
case _ => (refName, r, BEqual)
}
case _ => null
}
case _ => null
}
case _ => null
}
if (updatedRule == null) {
return filter
}
var boundedVarLenExpandEdgeNameSet: Set[String] = Set.empty
def rewriter: PartialFunction[LogicalOperator, LogicalOperator] = {
case boundedVarLenExpand: BoundedVarLenExpand =>
boundedVarLenExpandEdgeNameSet += boundedVarLenExpand.edgePattern.edge.alias
boundedVarLenExpandEdgeNameSet += boundedVarLenExpand.edgePattern.dst.alias
boundedVarLenExpandEdgeNameSet += boundedVarLenExpand.edgePattern.src.alias
boundedVarLenExpand
case expandInto: ExpandInto =>
val res = updatePatternFilterRule(expandInto.pattern, updatedRule,
null, boundedVarLenExpandEdgeNameSet)
if (res._1) {
hasPushDown = true
expandInto.copy(pattern = res._2)
} else {
expandInto
}
case patternScan: PatternScan =>
val res = updatePatternFilterRule(patternScan.pattern, updatedRule,
patternScan.pattern.root.alias, boundedVarLenExpandEdgeNameSet)
if (res._1) {
hasPushDown = true
patternScan.copy(pattern = res._2)
} else {
patternScan
}
}
val newFilter = TopDown[LogicalOperator](rewriter).transform(filter).asInstanceOf[Filter]
if (hasPushDown) {
newFilter.in
} else {
filter
}
}
private def fillInRule(
filterRule: com.antgroup.openspg.reasoner.lube.common.rule.Rule,
alias: String,
pattern: Pattern, boundedVarLenExpandEdgeNameSet: Set[String]): (Boolean, Pattern) = {
var pushToAlias = ""
if (!boundedVarLenExpandEdgeNameSet.contains(alias)) {
pattern match {
case NodePattern(node) =>
if (node.alias.equals(alias)) {
pushToAlias = node.alias
}
case EdgePattern(_, _, edge) =>
if (edge.alias.equals(alias)) {
pushToAlias = edge.alias
}
case PartialGraphPattern(_, nodes, edges) =>
if (nodes.contains(alias)) {
pushToAlias = alias;
}
val edgeSet = edges.values.flatten
for (e <- edgeSet) {
if (StringUtils.isBlank(pushToAlias) &&
e.alias.equals(alias)) {
pushToAlias = e.alias
}
}
case _ =>
}
}
if (StringUtils.isNotBlank(pushToAlias)) {
(true, pattern.fillInRule(filterRule, pushToAlias))
} else {
(false, pattern)
}
}
private def updatePatternFilterRule(
pattern: Pattern,
updateExpr: (String, Expr, BinaryOpSet),
startAlias: String,
boundedVarLenExpandEdgeNameSet: Set[String]): (Boolean, Pattern) = {
var updatedPattern = pattern
val alias = updateExpr._1
val expr = updateExpr._2
val opName = updateExpr._3
var isChange = false
// node rule
if (alias.equals(startAlias)) {
val filterRule = LogicRule(
"generate_id_filter_" + alias,
"",
BinaryOpExpr(opName, UnaryOpExpr(GetField(Constants.NODE_ID_KEY), Ref(alias)), expr))
val res = fillInRule(filterRule, alias, updatedPattern, boundedVarLenExpandEdgeNameSet)
isChange = res._1 || isChange
updatedPattern = res._2
}
// edge filter
val inEdges = getInConnection(alias, updatedPattern)
inEdges.foreach(x => {
val filterInEdgeRule = LogicRule(
"generate_in_edge_id_filter_" + x.alias,
"",
BinaryOpExpr(opName, UnaryOpExpr(GetField(Constants.EDGE_TO_ID_KEY), Ref(x.alias)), expr))
val res = fillInRule(filterInEdgeRule, x.alias, updatedPattern,
boundedVarLenExpandEdgeNameSet)
isChange = res._1 || isChange
updatedPattern = res._2
})
val outEdges = getOutConnection(alias, updatedPattern)
outEdges.foreach(x => {
val filterOutEdgeRule = LogicRule(
"generate_out_edge_id_filter_" + x.alias,
"",
BinaryOpExpr(opName, UnaryOpExpr(GetField(Constants.EDGE_FROM_ID_KEY), Ref(x.alias)), expr))
val res = fillInRule(filterOutEdgeRule, x.alias,
updatedPattern, boundedVarLenExpandEdgeNameSet)
isChange = res._1 || isChange
updatedPattern = res._2
})
(isChange, updatedPattern)
}
private def getConnection(
alias: String,
pattern: Pattern,
direction: com.antgroup.openspg.reasoner.common.graph.edge.Direction): Set[Connection] = {
pattern.topology
.flatMap(edgeSet => {
edgeSet._2
.map { case c: PatternConnection =>
val compareAlias =
if (c.direction.equals(direction)) c.source else c.target
if (compareAlias.equals(alias)) {
c
} else {
null
}
}
.filter(_ != null)
})
.toSet
}
private def getInConnection(alias: String, pattern: Pattern): Set[Connection] = {
getConnection(alias, pattern, com.antgroup.openspg.reasoner.common.graph.edge.Direction.IN)
}
private def getOutConnection(alias: String, pattern: Pattern): Set[Connection] = {
getConnection(alias, pattern, com.antgroup.openspg.reasoner.common.graph.edge.Direction.OUT)
}
override def direction: Direction = Up
override def maxIterations: Int = 1
}

View File

@ -0,0 +1,223 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import com.antgroup.openspg.reasoner.common.constants.Constants
import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException
import com.antgroup.openspg.reasoner.common.graph.edge
import com.antgroup.openspg.reasoner.common.types.KTString
import com.antgroup.openspg.reasoner.lube.block.{AddPredicate, DDLOp}
import com.antgroup.openspg.reasoner.lube.catalog.struct.Field
import com.antgroup.openspg.reasoner.lube.common.expr.Expr
import com.antgroup.openspg.reasoner.lube.common.graph.{IRField, IRNode, IRProperty, IRVariable}
import com.antgroup.openspg.reasoner.lube.common.pattern.{Connection, NodePattern, Pattern, VariablePatternConnection}
import com.antgroup.openspg.reasoner.lube.logical.{NodeVar, PropertyVar, Var}
import com.antgroup.openspg.reasoner.lube.logical.operators._
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
import com.antgroup.openspg.reasoner.lube.utils.{ExprUtils, RuleUtils}
object NodeIdToEdgeProperty extends Rule {
private val NODE_DEFAULT_PROPS = Set.apply(Constants.NODE_ID_KEY, Constants.CONTEXT_LABEL)
def ruleWithContext(implicit context: LogicalPlannerContext): PartialFunction[
(LogicalOperator, Map[String, Object]),
(LogicalOperator, Map[String, Object])] = {
case (expandInto: ExpandInto, map) =>
if (!canPushDown(expandInto)) {
expandInto -> map
} else {
val toEdge = targetConnection(expandInto)
expandInto -> (map + (expandInto.pattern.root.alias -> toEdge))
}
case (filter: Filter, map) =>
if (map.isEmpty) {
filter -> map
} else {
filterUpdate(filter, map) -> map
}
case (select: Select, map) =>
if (map.isEmpty) {
select -> map
} else {
selectUpdate(select, map) -> map
}
case (ddl: DDL, map) =>
if (map.isEmpty) {
ddl -> map
} else {
ddlUpdate(ddl, map) -> map
}
case (varLenExpand @ BoundedVarLenExpand(_, expandInto: ExpandInto, edgePattern, _), map) =>
if (edgePattern.edge.upper == 1 && canPushDown(expandInto)) {
val toEdge = targetConnection(expandInto)
varLenExpand -> (map + (expandInto.pattern.root.alias -> toEdge))
} else if (canPushDown(expandInto)) {
varLenExpand -> (map - expandInto.pattern.root.alias)
} else {
varLenExpand -> map
}
}
private def genField(direction: edge.Direction, fieldName: String): String = {
(direction, fieldName) match {
case (edge.Direction.OUT, Constants.NODE_ID_KEY) => Constants.EDGE_TO_ID_KEY
case (edge.Direction.OUT, Constants.CONTEXT_LABEL) => Constants.EDGE_TO_ID_TYPE_KEY
case (edge.Direction.IN, Constants.NODE_ID_KEY) => Constants.EDGE_FROM_ID_KEY
case (edge.Direction.IN, Constants.CONTEXT_LABEL) => Constants.EDGE_FROM_ID_TYPE_KEY
case (_, _) =>
throw UnsupportedOperationException(s"""unsupport (${direction}, ${fieldName})""")
}
}
private def filterUpdate(filter: Filter, map: Map[String, Object]): Filter = {
val input = RuleUtils.getAllInputFieldInRule(
filter.rule,
filter.solved.getNodeAliasSet,
filter.solved.getEdgeAliasSet)
val replaceVar = new mutable.HashMap[IRField, IRField]
for (irField <- input) {
if (irField.isInstanceOf[IRNode] && map.contains(irField.name)) {
for (propName <- irField.asInstanceOf[IRNode].fields) {
if (NODE_DEFAULT_PROPS.contains(propName)) {
val edgeInfo = map(irField.name).asInstanceOf[Connection]
replaceVar.put(
IRProperty(irField.name, propName),
IRProperty(edgeInfo.alias, genField(edgeInfo.direction, propName)))
replaceVar.put(IRVariable(irField.name), IRVariable(edgeInfo.alias))
}
}
}
}
if (replaceVar.isEmpty) {
filter
} else {
val newRule = RuleUtils.renameVariableInRule(filter.rule, replaceVar.toMap)
filter.copy(rule = newRule)
}
}
private def selectUpdate(select: Select, map: Map[String, Object]): Select = {
val newFields = new ListBuffer[Var]()
for (field <- select.fields) {
if (field.isInstanceOf[PropertyVar] && map.contains(field.name)) {
val edgeInfo = map(field.name).asInstanceOf[Connection]
val propName = field.asInstanceOf[PropertyVar].field.name
if (NODE_DEFAULT_PROPS.contains(propName)) {
newFields.append(
PropertyVar(
edgeInfo.alias,
new Field(genField(edgeInfo.direction, propName), KTString, true)))
} else {
newFields.append(field)
}
} else {
newFields.append(field)
}
}
select.copy(fields = newFields.toList)
}
private def ddlUpdate(ddl: DDL, map: Map[String, Object]): DDL = {
val ddlOps = new mutable.HashSet[DDLOp]()
for (ddlOp <- ddl.ddlOp) {
ddlOp match {
case AddPredicate(predicate) =>
val newFields = new mutable.HashMap[String, Expr]()
for (field <- predicate.fields) {
val input = ExprUtils.getAllInputFieldInRule(field._2, null, null)
val replaceVar = new mutable.HashMap[IRField, IRProperty]
for (irField <- input) {
if (irField.isInstanceOf[IRNode] && map.contains(irField.name)) {
for (propName <- irField.asInstanceOf[IRNode].fields) {
if (NODE_DEFAULT_PROPS.contains(propName)) {
val edgeInfo = map(irField.name).asInstanceOf[Connection]
replaceVar.put(
IRProperty(irField.name, propName),
IRProperty(edgeInfo.alias, genField(edgeInfo.direction, propName)))
}
}
}
}
if (replaceVar.isEmpty) {
newFields.put(field._1, field._2)
} else {
newFields.put(field._1, ExprUtils.renameVariableInExpr(field._2, replaceVar.toMap))
}
}
ddlOps.add(AddPredicate(predicate.copy(fields = newFields.toMap)))
case _ => ddlOps.add(ddlOp)
}
}
ddl.copy(ddlOp = ddlOps.toSet)
}
private def targetConnection(expandInto: ExpandInto): Connection = {
val alias = expandInto.pattern.root.alias
val edgeAlias = expandInto.transform[Connection] {
case (patternScan: PatternScan, _) =>
targetConnection(alias, patternScan.pattern)
case (expandInto: ExpandInto, list) =>
if (!list.isEmpty && list.head != null) {
list.head
} else {
targetConnection(alias, expandInto.pattern)
}
case (_, list) =>
if (list.isEmpty) {
null
} else {
list.head
}
}
edgeAlias
}
private def targetConnection(alias: String, pattern: Pattern): Connection = {
val connections = pattern.topology.values.flatten.filter(_.target.equals(alias))
val fixedEdges = connections.filter(!_.isInstanceOf[VariablePatternConnection])
val varEdges = connections
.filter(_.isInstanceOf[VariablePatternConnection])
.map(_.asInstanceOf[VariablePatternConnection])
.filter(_.upper == 1)
if (fixedEdges.isEmpty && varEdges.isEmpty) {
null
} else if (fixedEdges.isEmpty) {
varEdges.head
} else {
fixedEdges.head
}
}
private def canPushDown(expandInto: ExpandInto): Boolean = {
if (!expandInto.pattern.isInstanceOf[NodePattern]) {
false
} else {
val fieldNames = expandInto.refFields.head.asInstanceOf[NodeVar].fields.map(_.name)
val normalNames = fieldNames.filter(!NODE_DEFAULT_PROPS.contains(_))
if (normalNames.isEmpty) {
true
} else {
false
}
}
}
override def direction: Direction = Up
override def maxIterations: Int = 1
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules
import com.antgroup.openspg.reasoner.lube.common.pattern.NodePattern
import com.antgroup.openspg.reasoner.lube.logical.operators.{BoundedVarLenExpand, LogicalOperator, PatternJoin, PatternScan}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, SimpleRule, Up}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
object PatternJoinPure extends SimpleRule {
override def rule(implicit
context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = {
case patternJoin @ PatternJoin(
boundedVarLenExpand: BoundedVarLenExpand,
scan: PatternScan,
_) =>
if (scan.pattern.isInstanceOf[NodePattern]) {
boundedVarLenExpand
} else {
patternJoin
}
}
override def direction: Direction = Up
override def maxIterations: Int = 1
}

View File

@ -19,21 +19,21 @@ import com.antgroup.openspg.reasoner.common.exception.SystemError
import com.antgroup.openspg.reasoner.lube.common.expr.{Directly, Expr}
import com.antgroup.openspg.reasoner.lube.logical._
import com.antgroup.openspg.reasoner.lube.logical.operators.{LogicalOperator, Project}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, SimpleRule, Up}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
/**
* Merge adjacent Project operators
*/
object ProjectMerge extends Rule {
object ProjectMerge extends SimpleRule {
override def rule(implicit
context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = {
case project @ Project(in, _, _) =>
if (!in.isInstanceOf[Project] || !canMerge(project)) {
case project @ Project(in: Project, _, _) =>
if (!canMerge(project, in)) {
project
} else {
val parentProjects = in.asInstanceOf[Project].expr
val parentProjects = in.expr
val newProjects = new mutable.HashMap[Var, Expr]()
for (pair <- parentProjects) {
if (!pair._2.isInstanceOf[Directly.type]) {
@ -68,7 +68,7 @@ object ProjectMerge extends Rule {
newProjects.put(pair._1, pair._2)
}
}
Project(in.asInstanceOf[Project].in, newProjects.toMap, in.solved)
Project(in.in, newProjects.toMap, in.solved)
}
}
@ -88,19 +88,17 @@ object ProjectMerge extends Rule {
}
}
private def canMerge(project: Project): Boolean = {
val parentOutput =
project.in.asInstanceOf[Project].expr.filter(!_._2.isInstanceOf[Directly.type]).keySet
private def canMerge(project: Project, in: Project): Boolean = {
val parentOutput = in.expr.filter(!_._2.isInstanceOf[Directly.type]).keySet
val computeExprs = project.expr.filter(!_._2.isInstanceOf[Directly.type])
for (expr <- computeExprs) {
val fields = ExprUtil.getReferProperties(expr._2)
for (pair <- fields) {
val outputVar = parentOutput.filter(_.name.equals(pair._1))
if (!outputVar.isEmpty && outputVar.head
.asInstanceOf[PropertyVar]
.field
.name
.equals(pair._2)) {
val outputVar = parentOutput
.filter(_.name.equals(pair._1))
.map(_.asInstanceOf[PropertyVar])
.map(_.field.name)
if (outputVar.contains(pair._2)) {
return false
}
}

View File

@ -19,7 +19,7 @@ import com.antgroup.openspg.reasoner.common.exception.InvalidRefVariable
import com.antgroup.openspg.reasoner.lube.common.expr.Directly
import com.antgroup.openspg.reasoner.lube.logical.{PathVar, RepeatPathVar, SolvedModel, Var}
import com.antgroup.openspg.reasoner.lube.logical.operators._
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Down, Rule}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Down, SimpleRule}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
/**
@ -27,7 +27,7 @@ import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
* if and only if the output of the current Op is not dependent on the downstream node,
* add a ProjectOp for attribute clipping
*/
object Pure extends Rule {
object Pure extends SimpleRule {
override def rule(implicit
context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = {

View File

@ -18,13 +18,13 @@ import scala.collection.mutable
import com.antgroup.openspg.reasoner.common.trees.BottomUp
import com.antgroup.openspg.reasoner.lube.logical.{SolvedModel, Var}
import com.antgroup.openspg.reasoner.lube.logical.operators._
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up}
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, SimpleRule, Up}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
/**
* Pure solvedModel after EdgeToProperty
*/
object SolvedModelPure extends Rule {
object SolvedModelPure extends SimpleRule {
override def rule(implicit
context: LogicalPlannerContext): PartialFunction[LogicalOperator, LogicalOperator] = {
@ -39,12 +39,17 @@ object SolvedModelPure extends Rule {
private def resolvedModel(input: LogicalOperator): SolvedModel = {
val fields = input.transform[List[Var]] {
case (scan @ PatternScan(_, _), _) => scan.refFields
case (expandInto @ ExpandInto(_, _, _), tupleList) =>
case (scan: PatternScan, _) => scan.refFields
case (expandInto: ExpandInto, tupleList) =>
val list = new mutable.ListBuffer[Var]()
list.++=(tupleList.flatten)
list.++=(expandInto.refFields)
list.toList
case (linkedExpand: LinkedExpand, tupleList) =>
val list = new mutable.ListBuffer[Var]()
list.++=(tupleList.flatten)
list.++=(linkedExpand.refFields)
list.toList
case (_, tupleList) => tupleList.flatten
}
val fieldMap = fields.map(f => (f.name, f)).toMap

View File

@ -16,11 +16,7 @@ package com.antgroup.openspg.reasoner.lube.logical.planning
import scala.collection.mutable
import com.antgroup.openspg.reasoner.common.constants.Constants
import com.antgroup.openspg.reasoner.common.exception.{
NotImplementedException,
SchemaException,
UnsupportedOperationException
}
import com.antgroup.openspg.reasoner.common.exception.{NotImplementedException, SchemaException, UnsupportedOperationException}
import com.antgroup.openspg.reasoner.common.graph.edge.SPO
import com.antgroup.openspg.reasoner.lube.block._
import com.antgroup.openspg.reasoner.lube.catalog.{Catalog, SemanticPropertyGraph}
@ -172,15 +168,20 @@ object LogicalPlanner {
field match {
case node: IRNode =>
if (types(node.name).isEmpty && node.fields.nonEmpty) {
throw SchemaException(s"Cannot find $node.name in $types")
throw SchemaException(s"Cannot find ${node.name} in $types")
}
val fields = node.fields.map(graph.graphSchema.getNodeField(types(node.name), _))
NodeVar(node.name, fields)
case edge: IREdge =>
if (types(edge.name).isEmpty && edge.fields.nonEmpty) {
throw SchemaException(s"Cannot find $edge.name in $types")
throw SchemaException(s"Cannot find ${edge.name} in $types")
}
val fields = edge.fields.map(graph.graphSchema.getEdgeField(types(edge.name), _))
val fullFields = edge.fields ++ Set.apply(
Constants.EDGE_FROM_ID_KEY,
Constants.EDGE_FROM_ID_TYPE_KEY,
Constants.EDGE_TO_ID_KEY,
Constants.EDGE_TO_ID_TYPE_KEY)
val fields = fullFields.map(graph.graphSchema.getEdgeField(types(edge.name), _))
EdgeVar(edge.name, fields)
case array: IRRepeatPath =>
RepeatPathVar(

View File

@ -17,13 +17,12 @@ import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import com.antgroup.openspg.reasoner.common.constants.Constants
import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException
import com.antgroup.openspg.reasoner.common.exception.{SystemError, UnsupportedOperationException}
import com.antgroup.openspg.reasoner.lube.catalog.SemanticPropertyGraph
import com.antgroup.openspg.reasoner.lube.common.pattern._
import com.antgroup.openspg.reasoner.lube.common.pattern.ElementOps.toPattenElement
import com.antgroup.openspg.reasoner.lube.logical.SolvedModel
import com.antgroup.openspg.reasoner.lube.logical.operators._
/**
* QueryPath splitting
* @param pattern GraphPath, some times are called QueryGraph
@ -60,7 +59,7 @@ class PatternMatchPlanner(val pattern: GraphPattern)(implicit context: LogicalPl
val dst = getSrcAndDst(connection, chosenNodes)._2
chosenEdges.add(connection)
val driving = Driving(dependency.graph, src, dependency.solved)
val scan = PatternScan(driving, buildEdgePattern(connection))
val scan = PatternScan(driving, buildEdgePattern(src, connection))
val rhsPlanner = new PatternMatchPlanner(parts._2.copy(rootAlias = dst))
val oneRhsOperator = rhsPlanner.plan(scan, chosenNodes.clone(), chosenEdges.clone())
if (oneRhsOperator != null && rhsOperator != null) {
@ -90,7 +89,7 @@ class PatternMatchPlanner(val pattern: GraphPattern)(implicit context: LogicalPl
lhsOperator = connection match {
case conn: VariablePatternConnection =>
val edgePattern =
buildEdgePattern(conn).asInstanceOf[EdgePattern[VariablePatternConnection]]
buildEdgePattern(src, conn).asInstanceOf[EdgePattern[VariablePatternConnection]]
val repeatOperator = buildBoundVarLenExpand(src, edgePattern, lhsOperator)
if (rhsOperator == null) {
repeatOperator
@ -109,8 +108,14 @@ class PatternMatchPlanner(val pattern: GraphPattern)(implicit context: LogicalPl
}
}
private def buildEdgePattern(conn: Connection) = {
EdgePattern(pattern.getNode(conn.source), pattern.getNode(conn.target), conn)
private def buildEdgePattern(root: String, conn: Connection) = {
if (root.equals(conn.source)) {
EdgePattern(pattern.getNode(conn.source), pattern.getNode(conn.target), conn)
} else if (root.equals(conn.target)) {
EdgePattern(pattern.getNode(conn.target), pattern.getNode(conn.source), conn.reverse)
} else {
throw SystemError(s"PatternMatchPlanner error, root=${root}, conn=${conn}")
}
}
private def getSrcAndDst(

View File

@ -127,7 +127,7 @@ object Validator extends Logging {
field match {
case node: IRNode =>
if (!types.contains(node.name) || (types(node.name).isEmpty && node.fields.nonEmpty)) {
throw SchemaException(s"Cannot find $node.name in $types")
throw SchemaException(s"Cannot find ${node.name} in $types")
}
varMap.put(
node.name,
@ -136,7 +136,7 @@ object Validator extends Logging {
node.fields.map(graph.graphSchema.getNodeField(types(node.name), _)).toSet))
case edge: IREdge =>
if (types(edge.name).isEmpty && edge.fields.nonEmpty) {
throw SchemaException(s"Cannot find $edge.name in $types")
throw SchemaException(s"Cannot find ${edge.name} in $types")
}
varMap.put(
edge.name,

View File

@ -15,10 +15,7 @@ package com.antgroup.openspg.reasoner.lube.logical.validate.semantic
import com.antgroup.openspg.reasoner.lube.block.Block
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
import com.antgroup.openspg.reasoner.lube.logical.validate.semantic.rules.{
ConceptExplain,
SpatioTemporalExplain
}
import com.antgroup.openspg.reasoner.lube.logical.validate.semantic.rules.{ConceptExplain, NodeIdTransform, SpatioTemporalExplain}
object SemanticExplainer {

View File

@ -0,0 +1,135 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.lube.logical.validate.semantic.rules
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import com.antgroup.openspg.reasoner.common.constants.Constants
import com.antgroup.openspg.reasoner.lube.block.{Block, MatchBlock, OrderedFields, TableResultBlock}
import com.antgroup.openspg.reasoner.lube.catalog.{Catalog, SemanticPropertyGraph}
import com.antgroup.openspg.reasoner.lube.common.graph.{IRField, IRProperty}
import com.antgroup.openspg.reasoner.lube.common.pattern.{GraphPattern, VariablePatternConnection}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
import com.antgroup.openspg.reasoner.lube.logical.validate.semantic.Explain
object NodeIdTransform extends Explain {
/**
* Rewrite [[Block]] tree TopDown
*
* @param context
* @return
*/
override def explain(implicit context: LogicalPlannerContext): PartialFunction[Block, Block] = {
case tableResult: TableResultBlock =>
tableResultTransform(tableResult, context.catalog.getGraph(Catalog.defaultGraphName))
}
private def tableResultTransform(
tableResultBlock: TableResultBlock,
graph: SemanticPropertyGraph): Block = {
val matchBlock = tableResultBlock.transform[MatchBlock] {
case (matchBlock: MatchBlock, _) => matchBlock
case (_, list) =>
if (list == null || list.isEmpty) {
null
} else {
list.head
}
}
val pattern = matchBlock.patterns.values.head.graphPattern
val props = tableResultBlock.selectList.orderedFields.groupBy(_.name)
val selects = new ListBuffer[IRField]
var needModResolve = false
val removed = new mutable.HashSet[String]()
for (select <- tableResultBlock.selectList.orderedFields) {
if (select.isInstanceOf[IRProperty] && props(select.name).size == 1 && select
.asInstanceOf[IRProperty]
.field
.equals(Constants.NODE_ID_KEY)) {
selects.append(transform(select.name, pattern, graph))
removed.add(select.name)
needModResolve = true
} else {
selects.append(select)
}
}
val newBlock = tableResultBlock.copy(selectList = OrderedFields(selects.toList))
if (needModResolve) {
newBlock.rewrite { case matchBlock @ MatchBlock(dependencies, patterns) =>
val props = new mutable.HashMap[String, Set[String]]()
for (prop <- pattern.properties) {
if (removed.contains(prop._1)) {
props.put(prop._1, Set.empty)
} else {
props.put(prop._1, prop._2)
}
}
val selectMap = selects.filter(_.isInstanceOf[IRProperty]).groupBy(_.name)
for (select <- selectMap) {
val propNames =
props(select._1) ++ (select._2.map(_.asInstanceOf[IRProperty].field).toSet)
props.put(select._1, propNames)
}
val newPattern = pattern.copy(properties = props.toMap)
val path = patterns.head._2.copy(graphPattern = newPattern)
MatchBlock(dependencies, Map.apply((patterns.head._1 -> path)))
}
} else {
newBlock
}
}
private def transform(
alias: String,
pattern: GraphPattern,
graph: SemanticPropertyGraph): IRProperty = {
var ir: IRProperty = null
if (pattern.edges.contains(alias)) {
val fixedEdges = pattern.edges(alias).filter(!_.isInstanceOf[VariablePatternConnection])
val varEdges = pattern
.edges(alias)
.filter(_.isInstanceOf[VariablePatternConnection])
.map(_.asInstanceOf[VariablePatternConnection])
.filter(_.upper == 1)
if (!fixedEdges.isEmpty) {
ir = IRProperty(fixedEdges.head.alias, Constants.EDGE_FROM_ID_KEY)
} else if (!varEdges.isEmpty) {
ir = IRProperty(varEdges.head.alias, Constants.EDGE_FROM_ID_KEY)
}
} else {
val inEdges = pattern.edges.values.flatten.filter(_.target.equals(alias))
val fixedEdges = inEdges.filter(!_.isInstanceOf[VariablePatternConnection])
val varEdges = inEdges
.filter(_.isInstanceOf[VariablePatternConnection])
.map(_.asInstanceOf[VariablePatternConnection])
.filter(_.upper == 1)
if (!fixedEdges.isEmpty) {
ir = IRProperty(fixedEdges.head.alias, Constants.EDGE_TO_ID_KEY)
} else if (!varEdges.isEmpty) {
ir = IRProperty(varEdges.head.alias, Constants.EDGE_TO_ID_KEY)
}
}
if (ir == null) {
IRProperty(alias, Constants.NODE_ID_KEY)
} else {
ir
}
}
}

View File

@ -13,9 +13,10 @@
package com.antgroup.openspg.reasoner.lube.logical
import com.antgroup.openspg.reasoner.common.constants.Constants
import com.antgroup.openspg.reasoner.lube.catalog.impl.PropertyGraphCatalog
import com.antgroup.openspg.reasoner.lube.logical.LogicalOperatorOps.RichLogicalOperator
import com.antgroup.openspg.reasoner.lube.logical.operators.{Aggregate, BoundedVarLenExpand, PatternScan, Project, Start}
import com.antgroup.openspg.reasoner.lube.logical.operators._
import com.antgroup.openspg.reasoner.lube.logical.optimizer.LogicalOptimizer
import com.antgroup.openspg.reasoner.lube.logical.planning.{LogicalPlanner, LogicalPlannerContext}
import com.antgroup.openspg.reasoner.lube.logical.validate.Validator
@ -268,7 +269,9 @@ class LogicalPlannerTests extends AnyFunSpec {
val schema: Map[String, Set[String]] = Map.apply(
"TuringCrowd" -> Set.empty,
"User" -> Set.apply("id"),
"User_belongTo_TuringCrowd" -> Set.empty)
"User_belongTo_TuringCrowd" -> Set.apply(
Constants.EDGE_FROM_ID_KEY,
Constants.EDGE_TO_ID_KEY))
val catalog = new PropertyGraphCatalog(schema)
catalog.init()
implicit val context: LogicalPlannerContext =

View File

@ -0,0 +1,187 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.lube.logical.optimizer
import com.antgroup.openspg.reasoner.common.constants.Constants
import com.antgroup.openspg.reasoner.lube.catalog.impl.PropertyGraphCatalog
import com.antgroup.openspg.reasoner.lube.logical.LogicalOperatorOps.RichLogicalOperator
import com.antgroup.openspg.reasoner.lube.logical.PropertyVar
import com.antgroup.openspg.reasoner.lube.logical.operators._
import com.antgroup.openspg.reasoner.lube.logical.optimizer.rules._
import com.antgroup.openspg.reasoner.lube.logical.planning.{LogicalPlanner, LogicalPlannerContext}
import com.antgroup.openspg.reasoner.lube.logical.validate.Validator
import com.antgroup.openspg.reasoner.lube.utils.transformer.impl.Expr2QlexpressTransformer
import com.antgroup.openspg.reasoner.parser.OpenSPGDslParser
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, equal}
class NodeIdToEdgePropertyTests extends AnyFunSpec {
it("test select nodeId should not to edge property") {
val dsl =
"""
|GraphStructure {
| (A:User)-[e1:lk]->(B:User)-[e2:lk]->(C:User)
|}
|Rule {
| R1(""): e1.weight < e2.weight
| R2(""): C.height > 170
|}
|Action {
| get(A.id, B.name, C.name)
|}
|""".stripMargin
val parser = new OpenSPGDslParser()
val block = parser.parse(dsl)
val schema: Map[String, Set[String]] =
Map.apply(
"User" -> Set.apply("id", "name", "age", "height", "weight"),
"User_lk_User" -> Set.apply("weight"))
val catalog = new PropertyGraphCatalog(schema)
catalog.init()
implicit val context: LogicalPlannerContext =
LogicalPlannerContext(
catalog,
parser,
Map
.apply(
Constants.SPG_REASONER_MULTI_VERSION_ENABLE -> true,
Constants.START_ALIAS -> "A")
.asInstanceOf[Map[String, Object]])
val dag = Validator.validate(List.apply(block))
val logicalPlan = LogicalPlanner.plan(dag).popRoot()
val rule = Seq(NodeIdToEdgeProperty, FilterPushDown, ExpandIntoPure, SolvedModelPure)
val optimizedLogicalPlan = LogicalOptimizer.optimize(logicalPlan, rule)
optimizedLogicalPlan.findExactlyOne { case select: Select =>
select.fields.map(f => s"${f.name}.${f.asInstanceOf[PropertyVar].field.name}") should equal(
List.apply("A.id", "B.name", "C.name"))
}
val cnt = logicalPlan.transform[Int] {
case (scan: PatternScan, cnt) => cnt.sum + 1
case (scan: ExpandInto, cnt) => cnt.sum + 1
case (_, cnt) =>
if (cnt.isEmpty) {
0
} else {
cnt.sum
}
}
cnt should equal(3)
}
it("test nodeId to edge property for select") {
val dsl =
"""
|GraphStructure {
| (s:Pkg)-[p:target]->(o:User)
|}
|Rule {
|
|}
|Action {
| get(s.id,o.id)
|}
|""".stripMargin
val parser = new OpenSPGDslParser()
val block = parser.parse(dsl)
val schema: Map[String, Set[String]] =
Map.apply(
"Pkg" -> Set.apply("id"),
"User" -> Set.apply("id"),
"Pkg_target_User" -> Set.empty)
val catalog = new PropertyGraphCatalog(schema)
catalog.init()
implicit val context: LogicalPlannerContext =
LogicalPlannerContext(
catalog,
parser,
Map
.apply(Constants.SPG_REASONER_MULTI_VERSION_ENABLE -> true, Constants.START_ALIAS -> "s")
.asInstanceOf[Map[String, Object]])
val dag = Validator.validate(List.apply(block))
val logicalPlan = LogicalPlanner.plan(dag).popRoot()
val rule = Seq(NodeIdToEdgeProperty, FilterPushDown, ExpandIntoPure, SolvedModelPure)
val optimizedLogicalPlan = LogicalOptimizer.optimize(logicalPlan, rule)
optimizedLogicalPlan.findExactlyOne { case select: Select =>
select.fields.map(f => s"${f.name}.${f.asInstanceOf[PropertyVar].field.name}") should equal(
List.apply("s.id", s"p.${Constants.EDGE_TO_ID_KEY}"))
}
val cnt = optimizedLogicalPlan.transform[Int] {
case (scan: PatternScan, cnt) => cnt.sum + 1
case (scan: ExpandInto, cnt) => cnt.sum + 1
case (_, cnt) =>
if (cnt.isEmpty) {
0
} else {
cnt.sum
}
}
cnt should equal(1)
}
it("test nodeId to edge property for select with filter") {
val dsl =
"""
|GraphStructure {
| (A:User)-[e1:lk]->(B:User)-[e2:lk]->(C:User)
|}
|Rule {
| R1(""): e1.weight < e2.weight
| R2(""): C.id in ['123456789']
|}
|Action {
| get(A.id, B.id, C.id)
|}
|""".stripMargin
val parser = new OpenSPGDslParser()
val block = parser.parse(dsl)
val schema: Map[String, Set[String]] =
Map.apply(
"User" -> Set.apply("id", "name", "age", "height", "weight"),
"User_lk_User" -> Set.apply("weight"))
val catalog = new PropertyGraphCatalog(schema)
catalog.init()
implicit val context: LogicalPlannerContext =
LogicalPlannerContext(
catalog,
parser,
Map
.apply(Constants.SPG_REASONER_MULTI_VERSION_ENABLE -> true)
.asInstanceOf[Map[String, Object]])
val dag = Validator.validate(List.apply(block))
val logicalPlan = LogicalPlanner.plan(dag).popRoot()
val rule = Seq(NodeIdToEdgeProperty, FilterPushDown, ExpandIntoPure, SolvedModelPure)
val optimizedLogicalPlan = LogicalOptimizer.optimize(logicalPlan, rule)
optimizedLogicalPlan.findExactlyOne { case select: Select =>
select.fields.map(f => s"${f.name}.${f.asInstanceOf[PropertyVar].field.name}") should equal(
List.apply(s"e1.${Constants.EDGE_FROM_ID_KEY}", "B.id", s"e2.${Constants.EDGE_TO_ID_KEY}"))
}
val cnt = optimizedLogicalPlan.transform[Int] {
case (scan: PatternScan, cnt) =>
val rule = scan.pattern.topology.values.flatten.filter(_.rule != null).head.rule
val qlTransformer = new Expr2QlexpressTransformer()
qlTransformer.transform(rule).head should equal(
s"""e2.${Constants.EDGE_TO_ID_KEY} in ["123456789"]""")
cnt.sum + 1
case (scan: ExpandInto, cnt) => cnt.sum + 1
case (_, cnt) =>
if (cnt.isEmpty) {
0
} else {
cnt.sum
}
}
cnt should equal(1)
}
}

View File

@ -965,7 +965,7 @@ public class TransitiveOptionalTest {
+ " A [TestFinParty.RelatedParty, __start__='true']\n"
+ " B, C [TestFinParty.RelatedParty]\n"
+ "// 1.17的B 必须存在\n"
+ " C->B [votingRatio] repeat(0,20) as F1\n"
+ " C->B [votingRatio] repeat(0,2) as F1\n"
+ "// 1.19的C D 可以不存在\n"
+ " B->A [votingRatio, __optional__='true'] as F2\n"
+ "}\n"

View File

@ -378,6 +378,20 @@ public class RunnerUtil {
}
vertexList.add(targetVertex);
edgeProperty.put("nodes", vertexList);
if (CollectionUtils.isNotEmpty(edge.getEdgeList())) {
Object fromIdObj = edge.getEdgeList().get(0).getValue().get(Constants.EDGE_FROM_ID_KEY);
if (null != fromIdObj) {
edgeProperty.put(Constants.EDGE_FROM_ID_KEY, fromIdObj);
}
Object toIdObj =
edge.getEdgeList()
.get(edge.getEdgeList().size() - 1)
.getValue()
.get(Constants.EDGE_TO_ID_KEY);
if (null != toIdObj) {
edgeProperty.put(Constants.EDGE_TO_ID_KEY, toIdObj);
}
}
return edgeProperty;
}

View File

@ -351,23 +351,26 @@ object LoaderUtil {
}
case edgeVar: EdgeVar =>
for (typeName <- solvedModel.getTypes(field.name)) {
val edge = logicalPlan.graph.getEdge(typeName)
if (edge.resolved) {
val edgeTypeName = edge.startNode + "_" + edge.typeName + "_" + edge.endNode
val edgeLoaderConfig = new EdgeLoaderConfig()
edgeLoaderConfig.setEdgeType(edgeTypeName)
edgeLoaderConfig.setNeedProperties(
edgeVar.fields.filter(_.resolved).map(_.name).asJava)
edgeLoaderConfig.setConnection(
new util.HashSet(catalog.getConnection(edgeTypeName).asJava))
if (edgeLoaderConfigMap.contains(edgeTypeName)) {
val oldEdgeLoaderConfig = edgeLoaderConfigMap(edgeTypeName)
edgeLoaderConfig.merge(oldEdgeLoaderConfig)
val graph = logicalPlan.graph
if (graph.containsEdge(typeName)) {
val edge = graph.getEdge(typeName)
if (edge.resolved) {
val edgeTypeName = edge.startNode + "_" + edge.typeName + "_" + edge.endNode
val edgeLoaderConfig = new EdgeLoaderConfig()
edgeLoaderConfig.setEdgeType(edgeTypeName)
edgeLoaderConfig.setNeedProperties(
edgeVar.fields.filter(_.resolved).map(_.name).asJava)
edgeLoaderConfig.setConnection(
new util.HashSet(catalog.getConnection(edgeTypeName).asJava))
if (edgeLoaderConfigMap.contains(edgeTypeName)) {
val oldEdgeLoaderConfig = edgeLoaderConfigMap(edgeTypeName)
edgeLoaderConfig.merge(oldEdgeLoaderConfig)
}
edgeLoaderConfig.setLoadDirection(
edgeLoadDirectionMap.getOrElse(edgeTypeName, Direction.BOTH))
edgeLoaderConfig.addEndVertexAliasSet(edgeEndVertexAliasSet.get(edge.typeName))
edgeLoaderConfigMap.put(edgeTypeName, edgeLoaderConfig)
}
edgeLoaderConfig.setLoadDirection(
edgeLoadDirectionMap.getOrElse(edgeTypeName, Direction.BOTH))
edgeLoaderConfig.addEndVertexAliasSet(edgeEndVertexAliasSet.get(edge.typeName))
edgeLoaderConfigMap.put(edgeTypeName, edgeLoaderConfig)
}
}
case _ =>

View File

@ -200,7 +200,7 @@ class ReasonerSessionTests extends AnyFunSpec {
val physicalOpOrder = getPhysicalPlanOrder(subqueryPhysicalOp)
println(physicalOpOrder)
physicalOpOrder should equal(
"Start,Cache,DrivingRDG,PatternScan,LinkedExpand,ExpandInto,ExpandInto,Drop,Filter,Drop,DDL,Join,PatternScan,Cache,DrivingRDG,PatternScan,LinkedExpand,ExpandInto,ExpandInto,Drop,Filter,Drop,DDL,Join,ExpandInto,ExpandInto,Drop,Filter,Drop,Select")
"Start,Cache,DrivingRDG,PatternScan,LinkedExpand,ExpandInto,ExpandInto,Drop,Filter,Drop,DDL,Join,PatternScan,Cache,DrivingRDG,PatternScan,LinkedExpand,ExpandInto,ExpandInto,Drop,Filter,Drop,DDL,Join,ExpandInto,Drop,Filter,Drop,Select")
}
private def getPhysicalPlanOrder[T <: RDG[T]](physicalOperator: PhysicalOperator[T]) = {
@ -497,7 +497,7 @@ class ReasonerSessionTests extends AnyFunSpec {
cnt.sum
}
}
cnt should equal(6)
cnt should equal(5)
}
it("test concept check") {
@ -532,7 +532,7 @@ class ReasonerSessionTests extends AnyFunSpec {
cnt.sum
}
}
cnt should equal(2)
cnt should equal(1)
}
@ -575,7 +575,7 @@ class ReasonerSessionTests extends AnyFunSpec {
cnt.sum
}
}
cnt should equal(6)
cnt should equal(3)
}
@ -678,7 +678,7 @@ class ReasonerSessionTests extends AnyFunSpec {
cnt.sum
}
}
cnt should equal(2)
cnt should equal(1)
}
it("test id push down 4") {
@ -760,8 +760,8 @@ class ReasonerSessionTests extends AnyFunSpec {
.asInstanceOf[Map[String, Object]])
} catch {
case ex: SchemaException =>
println(ex.getMessage)
ex.getMessage.contains("Cannot find p3 types in") should equal(true)
ex.printStackTrace()
ex.getMessage.contains("Cannot find p3") should equal(true)
}
}
@ -799,8 +799,8 @@ class ReasonerSessionTests extends AnyFunSpec {
.asInstanceOf[Map[String, Object]])
} catch {
case ex: SchemaException =>
println(ex.getMessage)
ex.getMessage.contains("Cannot find IRNode(D,Set()).name") should equal(true)
ex.printStackTrace()
ex.getMessage.contains("Cannot find D") should equal(true)
}
}
}