feat(reasoner): optimize the performance of the planner (#182)

FishJoy 2024-03-27 17:27:02 +08:00 committed by GitHub
parent 8c8074f92f
commit a0906f61ba
44 changed files with 390 additions and 231 deletions
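This change removes the reflection-based `children` and `withNewChildren` implementations from `AbstractTreeNode` and turns them into abstract methods, with every concrete node type (blocks, expressions, logical operators, physical operators) overriding them directly. The sketch below is a minimal illustration of that contract using made-up node types, not code from the commit:

// Minimal sketch of the pattern this commit introduces; Expr, Number and Add
// here are illustrative stand-ins, not the reasoner's real classes.
abstract class Expr {
  // Direct, reflection-free access to this node's children.
  def children: Array[Expr]
  // Rebuild this node with replacement children (same arity as `children`).
  def withNewChildren(newChildren: Array[Expr]): Expr
}

case class Number(v: Int) extends Expr {
  override def children: Array[Expr] = Array.empty
  override def withNewChildren(newChildren: Array[Expr]): Expr = this
}

case class Add(left: Expr, right: Expr) extends Expr {
  override def children: Array[Expr] = Array(left, right)
  override def withNewChildren(newChildren: Array[Expr]): Expr =
    Add(newChildren(0), newChildren(1))
}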

View File

@ -18,7 +18,6 @@ import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.reflect.runtime.currentMirror
import scala.reflect.runtime.universe._
import scala.util.hashing.MurmurHash3
import cats.data.NonEmptyList
@ -65,103 +64,7 @@ abstract class AbstractTreeNode[T <: AbstractTreeNode[T]: TypeTag]
}
}
def withNewChildren(newChildren: Array[T]): T = {
if (sameAsCurrentChildren(newChildren)) {
self
} else {
val copyMethod = AbstractTreeNode.copyMethod(self)
val copyMethodParamTypes = copyMethod.symbol.paramLists.flatten.zipWithIndex
val valueAndTypeTuples = copyMethodParamTypes.map { case (param, index) =>
val value = if (index < productArity) {
// Access product element to retrieve the value
productElement(index)
} else {
typeOf[T] // Workaround to get implicit tag without reflection
}
value -> param.typeSignature
}
val updatedConstructorParams = updateConstructorParams(newChildren, valueAndTypeTuples)
try {
copyMethod(updatedConstructorParams: _*).asInstanceOf[T]
} catch {
case e: Exception =>
throw InvalidConstructorArgument(
s"""|Expected valid constructor arguments for $productPrefix
|Old children: ${children.mkString(", ")}
|New children: ${newChildren.mkString(", ")}
|Current product: ${productIterator.mkString(", ")}
|Constructor arguments updated with new children: ${updatedConstructorParams
.mkString(", ")}.""".stripMargin,
Some(e))
}
}
}
@inline private final def updateConstructorParams(
newChildren: Array[T],
currentValuesAndTypes: List[(Any, Type)]): Array[Any] = {
// Returns true if `instance` could be an element of
// List/NonEmptyList/Option container type `tpe`
def couldBeElementOf(instance: Any, tpe: Type): Boolean = {
currentMirror.reflect(instance).symbol.toType <:< tpe.typeArgs.head
}
val (unassignedChildren, constructorParams) =
currentValuesAndTypes.foldLeft(newChildren.toList -> Vector.empty[Any]) {
case ((remainingChildren, currentConstructorParams), nextValueAndType) =>
nextValueAndType match {
case (c: T, _) =>
remainingChildren match {
case Nil =>
throw new IllegalArgumentException(
s"""|When updating with new children: Did not have a child left to assign to
| the child that was previously $c. Inferred constructor
| parameters so far: ${getClass.getSimpleName}(${currentConstructorParams
.mkString(", ")}, ...)""".stripMargin)
case h :: t => t -> (currentConstructorParams :+ h)
}
case (_: Option[_], tpe) if tpe.typeArgs.head <:< typeOf[T] =>
val option: Option[T] = remainingChildren.headOption.filter { c =>
couldBeElementOf(c, tpe)
}
remainingChildren.drop(option.size) -> (currentConstructorParams :+ option)
case (_: List[_], tpe) if tpe.typeArgs.head <:< typeOf[T] =>
val childrenList: List[T] = remainingChildren.takeWhile { c =>
couldBeElementOf(c, tpe)
}
remainingChildren.drop(
childrenList.size) -> (currentConstructorParams :+ childrenList)
case (_: NonEmptyList[_], tpe) if tpe.typeArgs.head <:< typeOf[T] =>
val childrenList = NonEmptyList.fromListUnsafe(remainingChildren.takeWhile { c =>
couldBeElementOf(c, tpe)
})
remainingChildren.drop(
childrenList.size) -> (currentConstructorParams :+ childrenList)
case (value, _) =>
remainingChildren -> (currentConstructorParams :+ value)
}
}
if (unassignedChildren.nonEmpty) {
throw new IllegalArgumentException(s"""|Could not assign children [${unassignedChildren
.mkString(", ")}] to parameters of ${getClass.getSimpleName}
|Inferred constructor parameters: ${getClass.getSimpleName}(${constructorParams
.mkString(", ")})""".stripMargin)
}
constructorParams.toArray
}
@inline private final def sameAsCurrentChildren(newChildren: Array[T]): Boolean = {
val childrenLength = children.length
if (childrenLength != newChildren.length) {
false
} else {
var i = 0
while (i < childrenLength && children(i) == newChildren(i)) i += 1
i == childrenLength
}
}
def withNewChildren(newChildren: Array[T]): T
override def foreach[O](f: T => O): Unit = transform[O] { case (node, _) =>
f(node)
@ -216,26 +119,7 @@ abstract class AbstractTreeNode[T <: AbstractTreeNode[T]: TypeTag]
lines.mkString("\n")
}
def children: Array[T] = {
if (productIterator.isEmpty) {
Array.empty[T]
} else {
val copyMethod = AbstractTreeNode.copyMethod(self)
lazy val treeType = typeOf[T].erasure
lazy val paramTypes: Seq[Type] =
copyMethod.symbol.paramLists.head.map(_.typeSignature).toIndexedSeq
productIterator.toArray.zipWithIndex.flatMap {
case (t: T, _) => Some(t)
case (o: Option[_], i) if paramTypes(i).typeArgs.head <:< treeType =>
o.asInstanceOf[Option[T]]
case (l: List[_], i) if paramTypes(i).typeArgs.head <:< treeType =>
l.asInstanceOf[List[T]]
case (nel: NonEmptyList[_], i) if paramTypes(i).typeArgs.head <:< treeType =>
nel.toList.asInstanceOf[List[T]]
case _ => Nil
}
}
}
def children: Array[T]
override def toString: String = s"${getClass.getSimpleName}${if (args.isEmpty) ""
else s"(${args.mkString(", ")})"}"
@ -293,46 +177,3 @@ abstract class AbstractTreeNode[T <: AbstractTreeNode[T]: TypeTag]
def argString: String = args.mkString(", ")
}
/**
* Caches an instance of the copy method per case class type.
*/
object AbstractTreeNode {
import scala.reflect.runtime.universe
import scala.reflect.runtime.universe._
private final lazy val mirror = universe.runtimeMirror(getClass.getClassLoader)
// No synchronization required: No problem if a cache entry is lost due to a concurrent write.
@volatile private var cachedCopyMethods = Map.empty[Class[_], MethodMirror]
@inline protected final def copyMethod(instance: AbstractTreeNode[_]): MethodMirror = {
val instanceClass = instance.getClass
cachedCopyMethods.getOrElse(
instanceClass, {
val copyMethod = reflectCopyMethod(instance)
cachedCopyMethods = cachedCopyMethods.updated(instanceClass, copyMethod)
copyMethod
})
}
@inline private final def reflectCopyMethod(instance: Object): MethodMirror = {
try {
val instanceMirror = mirror.reflect(instance)
val tpe = instanceMirror.symbol.asType.toType
val copyMethodSymbol = tpe.decl(TermName("copy")).asMethod
instanceMirror.reflectMethod(copyMethodSymbol)
} catch {
case e: Exception =>
throw new UnsupportedOperationException(
s"Could not reflect the copy method of ${instance.toString.filterNot(_ == '$')}",
e)
}
}
}
case class InvalidConstructorArgument(
message: String,
originalException: Option[Exception] = None)
extends RuntimeException(message, originalException.orNull)
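For context on why the reflective path above was removed: every `withNewChildren` call used to look up and invoke the case-class `copy` method through Scala runtime reflection, once per rebuilt node. A rough, self-contained micro-benchmark sketch of that cost follows; `SampleNode`, the loop counts, and the timing harness are assumptions for illustration, not measurements from this commit.

import scala.reflect.runtime.{universe => ru}

// Hypothetical node type used only for this sketch.
case class SampleNode(left: Int, right: Int)

object ReflectiveCopyCost {
  def main(args: Array[String]): Unit = {
    val node = SampleNode(1, 2)
    val mirror = ru.runtimeMirror(getClass.getClassLoader)
    val instanceMirror = mirror.reflect(node)
    val copySymbol = instanceMirror.symbol.toType.decl(ru.TermName("copy")).asMethod
    val copyMethod = instanceMirror.reflectMethod(copySymbol)

    def time(label: String)(body: => Unit): Unit = {
      val start = System.nanoTime()
      body
      println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
    }

    // Reflective path: roughly what the old AbstractTreeNode did per rebuild,
    // even with the MethodMirror cached.
    time("reflective copy x100k") {
      var i = 0
      while (i < 100000) { copyMethod(3, 4); i += 1 }
    }
    // Direct path: what the explicit withNewChildren overrides do now.
    time("direct copy x100k") {
      var i = 0
      while (i < 100000) { node.copy(left = 3, right = 4); i += 1 }
    }
  }
}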

View File

@ -13,6 +13,7 @@
package com.antgroup.openspg.reasoner.common.trees
import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException
import com.antgroup.openspg.reasoner.common.trees.{AbstractTreeNode, BottomUpWithContext, TopDownWithContext}
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, equal}
@ -48,6 +49,11 @@ class AbstractTreeNodeTest extends AnyFunSpec {
case class Number(v: Int) extends CalcExpr {
def eval: Int = v
override def withNewChildren(newChildren: Array[CalcExpr]): CalcExpr =
throw UnsupportedOperationException("unsupported")
override def children: Array[CalcExpr] = Array.empty
}
abstract class CalcExpr extends AbstractTreeNode[CalcExpr] {
@ -56,5 +62,9 @@ abstract class CalcExpr extends AbstractTreeNode[CalcExpr] {
case class Add(left: CalcExpr, right: CalcExpr) extends CalcExpr {
def eval: Int = left.eval + right.eval
}
override def children: Array[CalcExpr] = Array.apply(left, right)
override def withNewChildren(newChildren: Array[CalcExpr]): CalcExpr = {
Add(newChildren.apply(0), newChildren.apply(1))
}
}
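A hypothetical helper (not part of the test file) shows what the explicit children/withNewChildren contract enables without any reflection: a generic bottom-up rewrite over the CalcExpr tree defined above.

// Illustrative sketch only, reusing the test's Number/Add node types.
def foldConstants(expr: CalcExpr): CalcExpr = {
  val rebuilt =
    if (expr.children.isEmpty) expr
    else expr.withNewChildren(expr.children.map(foldConstants))
  rebuilt match {
    case Add(Number(a), Number(b)) => Number(a + b)
    case other => other
  }
}

// foldConstants(Add(Add(Number(1), Number(2)), Number(3))) == Number(6)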

View File

@ -31,6 +31,10 @@ final case class AggregationBlock(
Fields(fields.toList)
}
override def withNewChildren(newChildren: Array[Block]): Block = {
this.copy(dependencies = newChildren.toList)
}
}
final case class Aggregations(pairs: Map[IRField, Aggregator]) extends Binds {

View File

@ -43,6 +43,8 @@ abstract class Block extends AbstractTreeNode[Block] {
* @return
*/
def graph: IRGraph
override def children: Array[Block] = dependencies.toArray
}
final case class BlockType(name: String)

View File

@ -29,4 +29,8 @@ final case class FilterBlock(dependencies: List[Block], rules: Rule)
dependencies.head.binds
}
override def withNewChildren(newChildren: Array[Block]): Block = {
this.copy(dependencies = newChildren.toList)
}
}

View File

@ -43,6 +43,10 @@ final case class MatchBlock(
Fields(nodes.++(edges))
}
override def withNewChildren(newChildren: Array[Block]): Block = {
this.copy(dependencies = newChildren.toList)
}
private def edgeToIRField(edge: Connection, props: Map[String, Set[String]]) = {
edge match {
case connection: VariablePatternConnection =>

View File

@ -23,6 +23,10 @@ final case class OrderAndSliceBlock(
group: List[String])
extends BasicBlock[Binds](BlockType("order-and-slice")) {
override def binds: Binds = dependencies.head.binds
override def withNewChildren(newChildren: Array[Block]): Block = {
this.copy(dependencies = newChildren.toList)
}
}
sealed trait SortItem {

View File

@ -36,6 +36,10 @@ final case class ProjectBlock(
Fields(fields.toList)
}
override def withNewChildren(newChildren: Array[Block]): Block = {
this.copy(dependencies = newChildren.toList)
}
}
/**

View File

@ -16,11 +16,7 @@ package com.antgroup.openspg.reasoner.lube.block
import com.antgroup.openspg.reasoner.common.types.KgType
import com.antgroup.openspg.reasoner.lube.common.expr.Expr
import com.antgroup.openspg.reasoner.lube.common.graph._
import com.antgroup.openspg.reasoner.lube.common.pattern.{
Element,
PatternElement,
PredicateElement
}
import com.antgroup.openspg.reasoner.lube.common.pattern.{Element, PatternElement, PredicateElement}
/**
* the root of every operator block tree is a result block
@ -46,6 +42,11 @@ final case class TableResultBlock(
* @return
*/
override def binds: OrderedFields = selectList
override def withNewChildren(newChildren: Array[Block]): Block = {
this.copy(dependencies = newChildren.toList)
}
}
/**
@ -57,6 +58,11 @@ final case class TableResultBlock(
final case class GraphResultBlock(dependencies: List[Block], outputGraphPath: List[String])
extends ResultBlock[Binds] {
override val binds: Binds = dependencies.head.binds
override def withNewChildren(newChildren: Array[Block]): Block = {
this.copy(dependencies = newChildren.toList)
}
}
/**
@ -101,6 +107,9 @@ case class DDLBlock(ddlOp: Set[DDLOp], dependencies: List[Block]) extends Result
*/
override def binds: Fields = Fields.empty
override def withNewChildren(newChildren: Array[Block]): Block =
this.copy(dependencies = newChildren.toList)
}
final case class OrderedFields(orderedFields: List[IRField] = List.empty) extends Binds {

View File

@ -13,10 +13,15 @@
package com.antgroup.openspg.reasoner.lube.block
import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException
import com.antgroup.openspg.reasoner.lube.common.graph.IRGraph
case class SourceBlock(override val graph: IRGraph)
extends BasicBlock[Binds](BlockType("source")) {
override val dependencies: List[Block] = List.empty
override val binds: Binds = Binds.empty
override def withNewChildren(newChildren: Array[Block]): Block =
throw UnsupportedOperationException("cannot support")
}

View File

@ -13,6 +13,7 @@
package com.antgroup.openspg.reasoner.lube.common.expr
import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException
import com.antgroup.openspg.reasoner.common.trees.AbstractTreeNode
import com.antgroup.openspg.reasoner.common.types.KgType
@ -21,13 +22,38 @@ import com.antgroup.openspg.reasoner.common.types.KgType
*/
sealed abstract class Expr extends AbstractTreeNode[Expr] {}
final case class OpChainExpr(curExpr: Expr, preChainExpr: OpChainExpr) extends Expr {}
final case class OpChainExpr(curExpr: Expr, preChainExpr: OpChainExpr) extends Expr {
override def withNewChildren(newChildren: Array[Expr]): Expr = {
if (newChildren.length == 1) {
OpChainExpr(newChildren.apply(0), null)
} else {
OpChainExpr(newChildren.apply(0), newChildren.apply(1).asInstanceOf[OpChainExpr])
}
}
override def children: Array[Expr] = {
if (preChainExpr != null) {
Array.apply(curExpr, preChainExpr)
} else {
Array.apply(curExpr)
}
}
}
/**
* Strongly Typed Computational Expressions
*/
sealed trait TypeValidatedExpr extends Expr {}
sealed trait TypeValidatedExpr extends Expr
case object Directly extends Expr
case object Directly extends Expr {
override def withNewChildren(newChildren: Array[Expr]): Expr =
throw UnsupportedOperationException("unsupported")
override def children: Array[Expr] = Array.empty
}
/**
* Aggregation operator in calculation expression
@ -47,12 +73,18 @@ sealed trait AggregatorOpSet
/**
* Constant, such as '123', 'abcd'
*/
sealed trait VConstant extends Expr
sealed trait VConstant extends Expr {
override def withNewChildren(newChildren: Array[Expr]): Expr =
throw UnsupportedOperationException("unsupported")
override def children: Array[Expr] = Array.empty
}
/**
* String-to-basic-type conversion operators
*/
case object VNull extends Expr
case object VNull extends VConstant
final case class VString(value: String) extends VConstant
final case class VLong(value: String) extends VConstant
@ -64,7 +96,13 @@ final case class VBoolean(value: String) extends VConstant
* @param list
* @param listType
*/
final case class VList(list: List[String], listType: KgType) extends Expr {}
final case class VList(list: List[String], listType: KgType) extends Expr {
override def withNewChildren(newChildren: Array[Expr]): Expr =
throw UnsupportedOperationException("unsupported")
override def children: Array[Expr] = Array.empty
}
/**
* aggregator operator set
@ -77,16 +115,19 @@ case object Count extends AggregatorOpSet {}
case object First extends AggregatorOpSet
final case class AggUdf(name: String, funcArgs: List[Expr])
extends AggregatorOpSet {}
final case class AggUdf(name: String, funcArgs: List[Expr]) extends AggregatorOpSet {}
final case class StrJoin(tok: String) extends AggregatorOpSet {}
final case class Accumulate(op: String) extends AggregatorOpSet {}
final case class Get(index: Integer) extends ListOpSet {}
final case class Limit(column: Expr, num: Integer) extends TypeValidatedExpr {}
final case class Limit(column: Expr, num: Integer) extends TypeValidatedExpr {
override def withNewChildren(newChildren: Array[Expr]): Expr = Limit(newChildren.head, num)
override def children: Array[Expr] = Array.apply(column)
}
/**
* list object operator set
@ -101,10 +142,8 @@ final case class Slice(start: Integer, end: Integer) extends ListOpSet {}
* @param reduceFunc
* @param initValue
*/
final case class Reduce(ele: String,
res: String,
reduceFunc: Expr,
initValue: Expr) extends ListOpSet {}
final case class Reduce(ele: String, res: String, reduceFunc: Expr, initValue: Expr)
extends ListOpSet {}
/**
* list rule constraint operator
@ -112,15 +151,20 @@ final case class Reduce(ele: String,
* @param cur
* @param reduceFunc
*/
final case class Constraint(pre: String,
cur: String,
reduceFunc: Expr) extends ListOpSet {}
final case class Constraint(pre: String, cur: String, reduceFunc: Expr) extends ListOpSet {}
/**
* list operator expr paradigm
* @param name
* @param opInput
*/
final case class ListOpExpr(name: ListOpSet, opInput: Ref) extends TypeValidatedExpr {}
final case class ListOpExpr(name: ListOpSet, opInput: Ref) extends TypeValidatedExpr {
override def withNewChildren(newChildren: Array[Expr]): Expr =
this.copy(opInput = newChildren.head.asInstanceOf[Ref])
override def children: Array[Expr] = Array.apply(opInput)
}
/**
* order operator set
@ -134,23 +178,41 @@ case object AscExpr extends OrderOpSet {}
* @param order
* @param limit
*/
final case class OrderAndLimit(order: OrderOpSet, limit: Limit) extends TypeValidatedExpr {}
final case class OrderAndLimit(order: OrderOpSet, limit: Limit) extends TypeValidatedExpr {
override def withNewChildren(newChildren: Array[Expr]): Expr =
OrderAndLimit(order, newChildren.head.asInstanceOf[Limit])
override def children: Array[Expr] = Array.apply(limit)
}
/**
* filter operator
* @param condition
*/
final case class Filter(condition: Expr) extends TypeValidatedExpr {}
final case class Filter(condition: Expr) extends TypeValidatedExpr {
override def withNewChildren(newChildren: Array[Expr]): Expr = Filter(newChildren.head)
override def children: Array[Expr] = Array.apply(condition)
}
/**
* path operator set
*/
sealed trait PathOpSet
case object GetNodesExpr extends PathOpSet {}
case object GetEdgesExpr extends PathOpSet {}
/**
* path operator expr paradigm
*/
final case class PathOpExpr(name: PathOpSet, pathName: Ref) extends TypeValidatedExpr {}
final case class PathOpExpr(name: PathOpSet, pathName: Ref) extends TypeValidatedExpr {
override def withNewChildren(newChildren: Array[Expr]): Expr =
PathOpExpr(name, newChildren.head.asInstanceOf[Ref])
override def children: Array[Expr] = Array.apply(pathName)
}
/**
* aggregator operator expr paradigm
@ -158,17 +220,24 @@ final case class PathOpExpr(name: PathOpSet, pathName: Ref) extends TypeValidate
* @param name
* @param aggEleExpr
*/
case class AggOpExpr(name: AggregatorOpSet, aggEleExpr: Expr)
extends Aggregator {}
case class AggOpExpr(name: AggregatorOpSet, aggEleExpr: Expr) extends Aggregator {
override def withNewChildren(newChildren: Array[Expr]): Expr = AggOpExpr(name, newChildren.head)
override def children: Array[Expr] = Array.apply(aggEleExpr)
}
/**
* aggregator operator expr paradigm with condition
* @param name
* @param aggEleExpr
*/
final case class AggIfOpExpr(aggOpExpr: AggOpExpr,
condition: Expr) extends Aggregator {}
final case class AggIfOpExpr(aggOpExpr: AggOpExpr, condition: Expr) extends Aggregator {
override def withNewChildren(newChildren: Array[Expr]): Expr =
AggIfOpExpr(newChildren.apply(0).asInstanceOf[AggOpExpr], newChildren.apply(1))
override def children: Array[Expr] = Array.apply(aggOpExpr, condition)
}
/**
* This operator retrieves the multi-version data for a given property
@ -176,7 +245,13 @@ final case class AggIfOpExpr(aggOpExpr: AggOpExpr,
* @param start start date expr of the interval
* @param end end date expr of the interval
*/
final case class Window(start: Expr, end: Expr) extends TypeValidatedExpr {}
final case class Window(start: Expr, end: Expr) extends TypeValidatedExpr {
override def withNewChildren(newChildren: Array[Expr]): Expr =
Window(newChildren.apply(0), newChildren.apply(1))
override def children: Array[Expr] = Array.apply(start, end)
}
/**
* graph aggregation operator expr paradigm
@ -185,14 +260,42 @@ final case class Window(start: Expr, end: Expr) extends TypeValidatedExpr {}
* @param expr
*/
final case class GraphAggregatorExpr(pathName: String, by: List[Expr], expr: Aggregator)
extends Aggregator {}
extends Aggregator {
override def withNewChildren(newChildren: Array[Expr]): Expr = {
newChildren.apply(newChildren.length - 1) match {
case agg: Aggregator =>
GraphAggregatorExpr(
pathName,
newChildren.slice(0, newChildren.size - 1).toList,
newChildren.apply(newChildren.size - 1).asInstanceOf[Aggregator])
case _ =>
GraphAggregatorExpr(pathName, newChildren.toList, null)
}
}
override def children: Array[Expr] = {
if (expr != null) {
by.toArray :+ expr
} else {
by.toArray
}
}
}
/**
* function operator expr paradigm
* @param name
* @param funcArgs
*/
final case class FunctionExpr(name: String, funcArgs: List[Expr]) extends TypeValidatedExpr {}
final case class FunctionExpr(name: String, funcArgs: List[Expr]) extends TypeValidatedExpr {
override def withNewChildren(newChildren: Array[Expr]): Expr =
FunctionExpr(name, newChildren.toList)
override def children: Array[Expr] = funcArgs.toArray
}
/**
* unary operator set
@ -217,7 +320,13 @@ case class Cast(castType: String) extends UnaryOpSet
* @param name
* @param arg
*/
final case class UnaryOpExpr(name: UnaryOpSet, arg: Expr) extends TypeValidatedExpr {}
final case class UnaryOpExpr(name: UnaryOpSet, arg: Expr) extends TypeValidatedExpr {
override def withNewChildren(newChildren: Array[Expr]): Expr =
UnaryOpExpr(name, newChildren.head)
override def children: Array[Expr] = Array.apply(arg)
}
/**
* binary operator set
@ -253,17 +362,34 @@ case object BMod extends BinaryOpSet
* @param l
* @param r
*/
final case class BinaryOpExpr(name: BinaryOpSet, l: Expr, r: Expr) extends TypeValidatedExpr {}
final case class BinaryOpExpr(name: BinaryOpSet, l: Expr, r: Expr) extends TypeValidatedExpr {
override def withNewChildren(newChildren: Array[Expr]): Expr =
BinaryOpExpr(name, newChildren.apply(0), newChildren.apply(1))
override def children: Array[Expr] = Array.apply(l, r)
}
/**
* Get variable value by variable name
* @param refName
*/
final case class Ref(refName: String) extends TypeValidatedExpr {}
final case class Ref(refName: String) extends TypeValidatedExpr {
override def withNewChildren(newChildren: Array[Expr]): Expr =
throw UnsupportedOperationException("unsupported")
override def children: Array[Expr] = Array.empty
}
/**
* Get parameter script by param name
* @param paramName
*/
final case class Parameter(paramName: String) extends TypeValidatedExpr {}
final case class Parameter(paramName: String) extends TypeValidatedExpr {
override def withNewChildren(newChildren: Array[Expr]): Expr =
throw UnsupportedOperationException("unsupported")
override def children: Array[Expr] = Array.empty
}
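With every expression node now exposing its children explicitly, simple traversals over an Expr tree no longer need the reflective machinery either. The helper below is an illustrative sketch, not part of this file; it only assumes the Ref, BinaryOpExpr, and BMod definitions shown above.

// Collect all variable references in an expression, bottom-up and reflection-free.
def collectRefs(expr: Expr): List[String] = expr match {
  case Ref(refName) => List(refName)
  case other => other.children.toList.flatMap(collectRefs)
}

// collectRefs(BinaryOpExpr(BMod, Ref("a"), Ref("b"))) == List("a", "b")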

View File

@ -15,7 +15,7 @@ package com.antgroup.openspg.reasoner.lube.logical.operators
import scala.collection.mutable
import com.antgroup.openspg.reasoner.common.exception.InvalidRefVariable
import com.antgroup.openspg.reasoner.common.exception.{InvalidRefVariable, UnsupportedOperationException}
import com.antgroup.openspg.reasoner.lube.common.expr.Aggregator
import com.antgroup.openspg.reasoner.lube.common.graph.{IREdge, IRNode}
import com.antgroup.openspg.reasoner.lube.logical._
@ -82,4 +82,8 @@ final case class Aggregate(
fieldsMap.values.toList
}
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
this.copy(in = newChildren.head)
}
}

View File

@ -58,4 +58,7 @@ case class BoundedVarLenExpand(
varMap.values.toList
}
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
this.copy(lhs = newChildren.apply(0), rhs = newChildren.apply(1))
}
}

View File

@ -84,4 +84,8 @@ final case class DDL(in: LogicalOperator, ddlOp: Set[DDLOp])
}
override def solved: SolvedModel = in.solved.solve
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
this.copy(in = newChildren.head)
}
}

View File

@ -37,4 +37,8 @@ final case class ExpandInto(in: LogicalOperator, target: PatternElement, pattern
override def solved: SolvedModel = in.solved
override def refFields: List[Var] = pattern.toVar(solved, graph)
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
this.copy(in = newChildren.head)
}
}

View File

@ -49,4 +49,8 @@ final case class Filter(in: LogicalOperator, rule: Rule) extends StackingLogical
}
override def solved: SolvedModel = in.solved.solve
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
this.copy(in = newChildren.head)
}
}

View File

@ -41,4 +41,8 @@ final case class LinkedExpand(
* @return
*/
override def fields: List[Var] = in.fields ++ refFields
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
this.copy(in = newChildren.head)
}
}

View File

@ -13,6 +13,7 @@
package com.antgroup.openspg.reasoner.lube.logical.operators
import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException
import com.antgroup.openspg.reasoner.common.trees.AbstractTreeNode
import com.antgroup.openspg.reasoner.lube.catalog.SemanticPropertyGraph
import com.antgroup.openspg.reasoner.lube.logical.{SolvedModel, Var}
@ -43,12 +44,20 @@ abstract class LogicalOperator extends AbstractTreeNode[LogicalOperator] {
def fields: List[Var]
}
abstract class LogicalLeafOperator extends LogicalOperator
abstract class LogicalLeafOperator extends LogicalOperator {
override def children: Array[LogicalOperator] = Array.empty
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
throw UnsupportedOperationException("LogicalLeafOperator cannot construct children")
}
}
abstract class StackingLogicalOperator extends LogicalOperator {
def in: LogicalOperator
override def graph: SemanticPropertyGraph = in.graph
override def children: Array[LogicalOperator] = Array.apply(in)
}
abstract class BinaryLogicalOperator extends LogicalOperator {
@ -57,6 +66,8 @@ abstract class BinaryLogicalOperator extends LogicalOperator {
def rhs: LogicalOperator
override def graph: SemanticPropertyGraph = rhs.graph
override def children: Array[LogicalOperator] = Array.apply(lhs, rhs)
}
trait EmptyFields extends LogicalOperator {

View File

@ -56,4 +56,8 @@ case class Optional(
varMap.values.toList
}
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
this.copy(lhs = newChildren.apply(0), rhs = newChildren.apply(1))
}
}

View File

@ -66,4 +66,8 @@ final case class OrderAndLimit(
override def fields: List[Var] = in.fields
override def solved: SolvedModel = in.solved.solve
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
this.copy(in = newChildren.head)
}
}

View File

@ -64,4 +64,8 @@ case class PatternJoin(
varMap.values.toList
}
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
this.copy(lhs = newChildren.apply(0), rhs = newChildren.apply(1))
}
}

View File

@ -25,4 +25,8 @@ final case class PatternScan(in: LogicalOperator, pattern: Pattern)
override def fields: List[Var] = pattern.toVar(solved, graph)
override def solved: SolvedModel = in.solved
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
this.copy(in = newChildren.head)
}
}

View File

@ -56,4 +56,8 @@ case class PatternUnion(
varMap.values.toList
}
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
this.copy(lhs = newChildren.apply(0), rhs = newChildren.apply(1))
}
}

View File

@ -78,4 +78,8 @@ final case class Project(in: LogicalOperator, expr: Map[Var, Expr], solved: Solv
fieldsMap.values.toList
}
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
this.copy(in = newChildren.head)
}
}

View File

@ -27,4 +27,8 @@ final case class Select(
}
override def solved: SolvedModel = in.solved.solve
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
this.copy(in = newChildren.head)
}
}

View File

@ -54,5 +54,9 @@ final case class SubQuery(lhs: LogicalOperator, rhs: LogicalOperator)
varMap.values.toList
}
override def withNewChildren(newChildren: Array[LogicalOperator]): LogicalOperator = {
this.copy(lhs = newChildren.apply(0), rhs = newChildren.apply(1))
}
}

View File

@ -431,17 +431,11 @@ class LogicalPlannerTests extends AnyFunSpec {
catalog.init()
implicit val context: LogicalPlannerContext =
LogicalPlannerContext(catalog, parser, Map.empty)
val logicalPlan = LogicalPlanner.plan(block).head
print(logicalPlan.pretty)
val cnt = logicalPlan.transform[Int] {
case (agg: BoundedVarLenExpand, cnt) => cnt.sum + 1
case (_, cnt) =>
if (cnt.isEmpty) {
0
} else {
cnt.sum
}
val start = System.nanoTime()
for (i <- 0 until 10000) {
val logicalPlan = LogicalPlanner.plan(block).head
LogicalOptimizer.optimize(logicalPlan.head)
}
cnt should equal(5)
println(s"cost=${System.nanoTime() - start}")
}
}

View File

@ -21,7 +21,7 @@ import com.antgroup.openspg.reasoner.lube.logical.Var
import com.antgroup.openspg.reasoner.lube.physical.rdg.RDG
final case class AddInto[T <: RDG[T]: TypeTag](in: PhysicalOperator[T], fields: Map[Var, Expr])
extends PhysicalOperator[T] {
extends StackingPhysicalOperator[T] {
override def rdg: T = in.rdg.addFields(fields)
override def meta: List[Var] = {
@ -37,4 +37,7 @@ final case class AddInto[T <: RDG[T]: TypeTag](in: PhysicalOperator[T], fields:
fieldMap.values.toList
}
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
this.copy(in = newChildren.head)
}
}

View File

@ -24,9 +24,12 @@ final case class Aggregate[T <: RDG[T]: TypeTag](
group: List[Var],
aggregations: Map[Var, Aggregator],
meta: List[Var])
extends PhysicalOperator[T] {
extends StackingPhysicalOperator[T] {
override def rdg: T =
in.rdg.groupBy(group, aggregations)
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
this.copy(in = newChildren.head)
}
}

View File

@ -21,7 +21,7 @@ import com.antgroup.openspg.reasoner.lube.logical.Var
import com.antgroup.openspg.reasoner.lube.physical.rdg.RDG
final case class Cache[T <: RDG[T]: TypeTag](in: PhysicalOperator[T])
extends PhysicalOperator[T] {
extends StackingPhysicalOperator[T] {
/**
* The output of the current operator
@ -49,4 +49,8 @@ final case class Cache[T <: RDG[T]: TypeTag](in: PhysicalOperator[T])
override def toString: String = {
s"Cache(RdgId=${cacheName})"
}
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
this.copy(in = newChildren.head)
}
}

View File

@ -21,7 +21,7 @@ import com.antgroup.openspg.reasoner.lube.logical.Var
import com.antgroup.openspg.reasoner.lube.physical.rdg.RDG
final case class DDL[T <: RDG[T]: TypeTag](in: PhysicalOperator[T], ddlOp: Set[DDLOp])
extends PhysicalOperator[T] {
extends StackingPhysicalOperator[T] {
override def rdg: T = {
val list = new mutable.ListBuffer[DDLOp]()
@ -49,4 +49,8 @@ final case class DDL[T <: RDG[T]: TypeTag](in: PhysicalOperator[T], ddlOp: Set[D
}
override def meta: List[Var] = List.empty
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
this.copy(in = newChildren.head)
}
}

View File

@ -20,7 +20,7 @@ import com.antgroup.openspg.reasoner.lube.logical.Var
import com.antgroup.openspg.reasoner.lube.physical.rdg.RDG
final case class Drop[T <: RDG[T]: TypeTag](in: PhysicalOperator[T], fields: Set[Var])
extends PhysicalOperator[T] {
extends StackingPhysicalOperator[T] {
override def rdg: T = in.rdg.dropFields(fields)
override def meta: List[Var] = {
val fieldMap = new mutable.HashMap[String, Var]()
@ -32,4 +32,8 @@ final case class Drop[T <: RDG[T]: TypeTag](in: PhysicalOperator[T], fields: Set
}
fieldMap.values.toList
}
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
this.copy(in = newChildren.head)
}
}

View File

@ -24,9 +24,13 @@ final case class ExpandInto[T <: RDG[T]: TypeTag](
target: PatternElement,
pattern: Pattern,
meta: List[Var])
extends PhysicalOperator[T] {
extends StackingPhysicalOperator[T] {
override def rdg: T =
in.rdg.expandInto(target, pattern)
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
this.copy(in = newChildren.head)
}
}

View File

@ -20,7 +20,11 @@ import com.antgroup.openspg.reasoner.lube.logical.Var
import com.antgroup.openspg.reasoner.lube.physical.rdg.RDG
final case class Filter[T <: RDG[T]: TypeTag](in: PhysicalOperator[T], expr: Rule)
extends PhysicalOperator[T] {
extends StackingPhysicalOperator[T] {
override def rdg: T = in.rdg.filter(expr)
override def meta: List[Var] = in.meta
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
this.copy(in = newChildren.head)
}
}

View File

@ -22,7 +22,7 @@ import com.antgroup.openspg.reasoner.lube.physical.rdg.RDG
final case class Fold[T <: RDG[T]: TypeTag](
in: PhysicalOperator[T],
foldMapping: List[(List[Var], RichVar)])
extends PhysicalOperator[T] {
extends StackingPhysicalOperator[T] {
override def rdg: T = in.rdg.fold(foldMapping)
@ -39,4 +39,8 @@ final case class Fold[T <: RDG[T]: TypeTag](
outMeta.appendAll(foldMapping.map(_._2))
outMeta.toList
}
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
this.copy(in = newChildren.head)
}
}

View File

@ -27,7 +27,7 @@ final case class Join[T <: RDG[T]: TypeTag](
joinType: JoinType,
lhsSchemaMapping: Map[Var, Var],
rhsSchemaMapping: Map[Var, Var])
extends PhysicalOperator[T] {
extends BinaryPhysicalOperator[T] {
/**
* The output of the current operator
@ -57,4 +57,8 @@ final case class Join[T <: RDG[T]: TypeTag](
}
varMap.values.toList
}
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
this.copy(lhs = newChildren.apply(0), rhs = newChildren.apply(1))
}
}

View File

@ -23,7 +23,11 @@ final case class LinkedExpand[T <: RDG[T]: TypeTag](
in: PhysicalOperator[T],
pattern: EdgePattern[LinkedPatternConnection],
meta: List[Var])
extends PhysicalOperator[T] {
extends StackingPhysicalOperator[T] {
override def rdg: T = in.rdg.linkedExpand(pattern)
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
this.copy(in = newChildren.head)
}
}

View File

@ -24,11 +24,15 @@ final case class OrderBy[T <: RDG[T]: TypeTag](
sortItem: Seq[SortItem],
group: List[Var],
limit: Option[Int])
extends PhysicalOperator[T] {
extends StackingPhysicalOperator[T] {
override def rdg: T = {
in.rdg.orderBy(group, sortItem.toList, limit.get)
}
override def meta: List[Var] = in.meta
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
this.copy(in = newChildren.head)
}
}

View File

@ -23,6 +23,10 @@ final case class PatternScan[T <: RDG[T]: TypeTag](
in: PhysicalOperator[T],
pattern: Pattern,
meta: List[Var])
extends PhysicalOperator[T] {
extends StackingPhysicalOperator[T] {
override def rdg: T = in.rdg.patternScan(pattern)
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
this.copy(in = newChildren.head)
}
}

View File

@ -15,6 +15,7 @@ package com.antgroup.openspg.reasoner.lube.physical.operators
import scala.reflect.runtime.universe.TypeTag
import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException
import com.antgroup.openspg.reasoner.common.trees.AbstractTreeNode
import com.antgroup.openspg.reasoner.lube.logical.Var
import com.antgroup.openspg.reasoner.lube.physical.planning.PhysicalPlannerContext
@ -41,3 +42,31 @@ abstract class PhysicalOperator[T <: RDG[T]: TypeTag]
*/
def meta: List[Var]
}
abstract class PhysicalLeafOperator[T <: RDG[T]: TypeTag] extends PhysicalOperator[T] {
override def children: Array[PhysicalOperator[T]] = Array.empty
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
throw UnsupportedOperationException("LogicalLeafOperator cannot construct children")
}
}
abstract class StackingPhysicalOperator[T <: RDG[T]: TypeTag] extends PhysicalOperator[T] {
/**
* the input physical operator
* @return
*/
def in: PhysicalOperator[T]
override def children: Array[PhysicalOperator[T]] = Array.apply(in)
}
abstract class BinaryPhysicalOperator[T <: RDG[T]: TypeTag] extends PhysicalOperator[T] {
def lhs: PhysicalOperator[T]
def rhs: PhysicalOperator[T]
override def children: Array[PhysicalOperator[T]] = Array.apply(lhs, rhs)
}

View File

@ -23,7 +23,7 @@ final case class Select[T <: RDG[T]: TypeTag](
orderedFields: List[Var],
as: List[String],
distinct: Boolean)
extends PhysicalOperator[T] {
extends StackingPhysicalOperator[T] {
def row: Row[T] = {
var row = in.rdg.select(orderedFields, as)
@ -34,4 +34,8 @@ final case class Select[T <: RDG[T]: TypeTag](
}
override def meta: List[Var] = orderedFields
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
this.copy(in = newChildren.head)
}
}

View File

@ -24,7 +24,7 @@ final case class Start[T <: RDG[T]: TypeTag](
alias: String,
meta: List[Var],
types: Set[String])(implicit override val context: PhysicalPlannerContext[T])
extends PhysicalOperator[T] {
extends PhysicalLeafOperator[T] {
override def rdg: T = context.graphSession.getGraph(graphName).createRDG(alias, types)
}
@ -33,7 +33,7 @@ final case class DrivingRDG[T <: RDG[T]: TypeTag](
meta: List[Var],
alias: String,
workingRdgName: String)(implicit override val context: PhysicalPlannerContext[T])
extends PhysicalOperator[T] {
extends PhysicalLeafOperator[T] {
override def rdg: T = {
val workingRdg = context.graphSession.getWorkingRDG(workingRdgName)
context.graphSession.getGraph(graphName).createRDG(alias, workingRdg)

View File

@ -22,7 +22,7 @@ import com.antgroup.openspg.reasoner.lube.physical.rdg.RDG
final case class Unfold[T <: RDG[T]: TypeTag](
in: PhysicalOperator[T],
foldMapping: List[(RichVar, List[Var])])
extends PhysicalOperator[T] {
extends StackingPhysicalOperator[T] {
override def rdg: T = in.rdg.unfold(foldMapping)
@ -47,4 +47,8 @@ final case class Unfold[T <: RDG[T]: TypeTag](
}
outMeta.toList
}
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
this.copy(in = newChildren.head)
}
}

View File

@ -19,10 +19,8 @@ import scala.reflect.runtime.universe.TypeTag
import com.antgroup.openspg.reasoner.lube.logical.Var
import com.antgroup.openspg.reasoner.lube.physical.rdg.RDG
final case class Union[T <: RDG[T]: TypeTag](
lhs: PhysicalOperator[T],
rhs: PhysicalOperator[T])
extends PhysicalOperator[T] {
final case class Union[T <: RDG[T]: TypeTag](lhs: PhysicalOperator[T], rhs: PhysicalOperator[T])
extends BinaryPhysicalOperator[T] {
/**
* The output of the current operator
@ -52,4 +50,9 @@ final case class Union[T <: RDG[T]: TypeTag](
}
varMap.values.toList
}
override def withNewChildren(newChildren: Array[PhysicalOperator[T]]): PhysicalOperator[T] = {
Union(newChildren.apply(0), newChildren.apply(1))
}
}