Merge branch 'master' into rac_udf_senyu

This commit is contained in:
wenchengyao 2024-04-17 11:12:06 +08:00
commit 1aa6f3b617
29 changed files with 700 additions and 52 deletions

View File

@ -53,7 +53,8 @@ OpenSPG Core Capabilities:
## Advanced tutorials
* [OpenSPG User Guide](https://openspg.yuque.com/ndx6g9/ns5nw2)
* [OpenSPG User Guide](https://openspg.yuque.com/ndx6g9/ps5q6b)
* [OneKE User Guide](https://openspg.yuque.com/ndx6g9/ps5q6b/vfoi61ks3mqwygvy)
# How to contribute

View File

@ -52,7 +52,8 @@ OpenSPG核心能力模型包括
## 进阶教程
* [用户手册](https://openspg.yuque.com/ndx6g9/ooil9x)
* [OpenSPG用户手册](https://openspg.yuque.com/ndx6g9/nmwkzz)
* [OneKE用户手册](https://openspg.yuque.com/ndx6g9/nmwkzz/dht0wtgycuw032gd)
# 如何贡献代码

View File

@ -139,9 +139,6 @@ class OpenSPGDslParser extends ParserInterface {
parseBaseRuleDefine(ctx.base_rule_define(), ddlBlockWithNodes._2, ddlBlockWithNodes._3)
val ddlBlockOp = ddlBlockWithNodes._1.ddlOp.head
val ruleBlock = ddlInfo._1
if (ddlInfo._2.nonEmpty) {
return DDLBlock(ddlInfo._2, List.apply(ruleBlock))
}
ddlBlockOp match {
case AddProperty(s, propertyName, _) =>
val isLastAssignTargetAlis = ruleBlock match {
@ -176,7 +173,7 @@ class OpenSPGDslParser extends ParserInterface {
ProjectRule(
IRProperty(s.alias, propertyName),
Ref(ddlBlockWithNodes._3.target.alias)))))
DDLBlock(Set.apply(ddlBlockOp), List.apply(prjBlk))
DDLBlock(Set.apply(ddlBlockOp) ++ ddlInfo._2, List.apply(prjBlk))
case AddPredicate(predicate) =>
val attrFields = new mutable.HashMap[String, Expr]()
addPropertiesMap.foreach(x =>
@ -209,9 +206,9 @@ class OpenSPGDslParser extends ParserInterface {
predicate.source,
predicate.target,
attrFields.toMap,
predicate.direction))),
predicate.direction))) ++ ddlInfo._2,
List.apply(depBlk))
case _ => DDLBlock(Set.apply(ddlBlockOp), List.apply(ruleBlock))
case _ => DDLBlock(Set.apply(ddlBlockOp) ++ ddlInfo._2, List.apply(ruleBlock))
}
}
@ -298,7 +295,7 @@ class OpenSPGDslParser extends ParserInterface {
predicate: PredicateElement): (Block, Set[DDLOp]) = {
val matchBlock = parseGraphStructure(ctx.the_graph_structure(), head, predicate)
val ruleBlock = parseRule(ctx.the_rule(), matchBlock)
val ddlOp = parseCreateAction(ctx.create_action())
val ddlOp = parseCreateAction(ctx.create_action(), matchBlock)
(ruleBlock, ddlOp)
}
@ -600,11 +597,31 @@ class OpenSPGDslParser extends ParserInterface {
curBlock
}
def parseCreateAction(ctx: Create_actionContext): Set[DDLOp] = {
def parseCreateAction(ctx: Create_actionContext, matchBlock: MatchBlock): Set[DDLOp] = {
if (ctx == null) {
Set.empty
} else {
ctx.create_action_body().asScala.map(x => parseCreateActionBody(x)).toSet
val ddlBlockSet = ctx.create_action_body().asScala.map(x => parseCreateActionBody(x)).toSet
val matchEleInfo = matchBlock.patterns.map(x => x._2.graphPattern.nodes).flatten
val allEleInfo = ddlBlockSet.map {
case AddVertex(s, _) => s.alias -> s
case _ => null
}.filter(_ != null).toMap ++ matchEleInfo
ddlBlockSet.map {
case c: AddVertex => c
case c: AddProperty => c
case c: AddPredicate => AddPredicate(
PredicateElement(
c.predicate.label,
c.predicate.alias,
allEleInfo(c.predicate.source.alias),
allEleInfo(c.predicate.target.alias),
c.predicate.fields,
c.predicate.direction
)
)
}.toSet
}
}

View File

@ -387,8 +387,8 @@ class OpenSPGDslParserTest extends AnyFunSpec {
val block = parser.parse(dsl)
print(block.pretty)
block.dependencies.head.isInstanceOf[ProjectBlock] should equal(true)
block.asInstanceOf[DDLBlock].ddlOp.size should equal(2)
block.asInstanceOf[DDLBlock].ddlOp.head.isInstanceOf[AddVertex] should equal(true)
block.asInstanceOf[DDLBlock].ddlOp.size should equal(3)
block.asInstanceOf[DDLBlock].ddlOp.head.isInstanceOf[AddProperty] should equal(true)
}
it("addNodeException") {
@ -504,8 +504,8 @@ class OpenSPGDslParserTest extends AnyFunSpec {
val block = parser.parse(dsl)
print(block.pretty)
block.dependencies.head.isInstanceOf[MatchBlock] should equal(true)
block.asInstanceOf[DDLBlock].ddlOp.size should equal(1)
block.asInstanceOf[DDLBlock].ddlOp.head.isInstanceOf[AddVertex] should equal(true)
block.asInstanceOf[DDLBlock].ddlOp.size should equal(2)
block.asInstanceOf[DDLBlock].ddlOp.head.isInstanceOf[AddPredicate] should equal(true)
}

View File

@ -15,8 +15,12 @@ package com.antgroup.openspg.reasoner.lube.utils
import scala.collection.mutable
import com.antgroup.openspg.reasoner.common.constants.Constants
import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException
import com.antgroup.openspg.reasoner.common.graph.edge.SPO
import com.antgroup.openspg.reasoner.lube.block._
import com.antgroup.openspg.reasoner.lube.common.expr.{BEqual, BIn, BinaryOpExpr}
import com.antgroup.openspg.reasoner.lube.common.graph.IRNode
import com.antgroup.openspg.reasoner.lube.common.pattern.GraphPath
import com.antgroup.openspg.reasoner.lube.utils.transformer.impl.Block2GraphPathTransformer
@ -41,15 +45,95 @@ object BlockUtils {
predicate.target.typeNames.head).toString)
case AddProperty(s, propertyName, _) =>
defines.add(s.typeNames.head + "." + propertyName)
case AddVertex(s, _) =>
// defines.add(s.typeNames.head)
return Set.apply("result")
case _ =>
}
})
case _ => defines.add("result")
}
defines.toSet
if (defines.isEmpty) {
Set.apply("result")
} else {
defines.toSet
}
}
def getStarts(block: Block): Set[String] = {
val start = block.transform[Set[String]] {
case (AggregationBlock(_, _, group), groupList) =>
val groupAlias = group.map(_.name).toSet
if (groupList.head.isEmpty) {
groupAlias
} else {
val commonGroups = groupList.head.intersect(groupAlias)
if (commonGroups.isEmpty) {
throw UnsupportedOperationException(
s"cannot support groups ${groupAlias}, ${groupList.head}")
} else {
commonGroups
}
}
case (DDLBlock(ddlOp, _), list) =>
val starts = new mutable.HashSet[String]()
for (ddl <- ddlOp) {
ddl match {
case AddProperty(s, _, _) => starts.add(s.alias)
case AddPredicate(p) =>
starts.add(p.source.alias)
starts.add(p.target.alias)
case _ =>
}
}
if (list.head.isEmpty) {
starts.toSet
} else if (starts.isEmpty) {
list.head
} else {
val commonStart = list.head.intersect(starts)
if (commonStart.isEmpty) {
throw UnsupportedOperationException(
s"cannot support non-common starts ${list.head}, ${starts}")
} else {
commonStart
}
}
case (SourceBlock(_), _) => Set.empty
case (_, groupList) => groupList.head
}
if (start.isEmpty) {
getFilterStarts(block)
} else {
start
}
}
private def getFilterStarts(block: Block): Set[String] = {
block.transform[Set[String]] {
case (FilterBlock(_, rule), list) =>
rule.getExpr match {
case BinaryOpExpr(BEqual | BIn, _, _) =>
val irFields = ExprUtils.getAllInputFieldInRule(rule.getExpr, null, null)
if (irFields.size != 1 || !irFields.head.isInstanceOf[IRNode] || !irFields.head
.asInstanceOf[IRNode]
.fields
.equals(Set.apply(Constants.NODE_ID_KEY))) {
list.head
} else {
if (list.head.isEmpty) {
Set.apply(irFields.head.name)
} else {
val commonStart = list.head.intersect(Set.apply(irFields.head.name))
if (commonStart.isEmpty) {
list.head
} else {
commonStart
}
}
}
case _ => list.head
}
case (SourceBlock(_), _) => Set.empty
case (_, groupList) => groupList.head
}
}
}

View File

@ -65,7 +65,13 @@ case class SolvedModel(
}
def solve: SolvedModel = {
val tmp = tmpFields.values.map(p => fields(p.name).merge(Option.apply(p))).toList
val tmp = tmpFields.values.map(p => {
if (!fields.contains(p.name)) {
throw InvalidRefVariable(s"not found fields name : ${p} in solved fields")
}
fields(p.name).merge(Option.apply(p))
}
).toList
var newFields = fields
for (t <- tmp) {
newFields = newFields.updated(t.name, newFields(t.name).merge(Option.apply(t)))

View File

@ -14,6 +14,7 @@
package com.antgroup.openspg.reasoner.lube.logical.operators
import com.antgroup.openspg.reasoner.lube.catalog.SemanticPropertyGraph
import com.antgroup.openspg.reasoner.lube.common.expr.Expr
import com.antgroup.openspg.reasoner.lube.logical.{SolvedModel, Var}
abstract class Source extends LogicalLeafOperator {
@ -31,6 +32,18 @@ final case class Start(
override def fields: List[Var] = solved.fields.values.toList
}
final case class StartFromVertex(
graph: SemanticPropertyGraph,
id: Expr,
types: Set[String],
alias: String,
solved: SolvedModel)
extends Source {
override def refFields: List[Var] = fields
override def fields: List[Var] = solved.fields.values.toList
}
final case class Driving(graph: SemanticPropertyGraph, alias: String, solved: SolvedModel)
extends Source {
override def refFields: List[Var] = fields

View File

@ -26,6 +26,7 @@ object LogicalOptimizer {
var LOGICAL_OPT_RULES: Seq[Rule] =
Seq(
PatternJoinPure,
IdEqualPushDown,
GroupNode,
DistinctGet,
NodeIdToEdgeProperty,

View File

@ -0,0 +1,77 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.lube.logical.optimizer.rules
import com.antgroup.openspg.reasoner.common.constants.Constants
import com.antgroup.openspg.reasoner.common.trees.BottomUp
import com.antgroup.openspg.reasoner.lube.common.expr.{BEqual, BIn, BinaryOpExpr, Directly, Expr, GetField, OpChainExpr, TypeValidatedExpr, UnaryOpExpr, VConstant, VList}
import com.antgroup.openspg.reasoner.lube.common.graph.IRNode
import com.antgroup.openspg.reasoner.lube.logical.operators._
import com.antgroup.openspg.reasoner.lube.logical.optimizer.{Direction, Rule, Up}
import com.antgroup.openspg.reasoner.lube.logical.planning.LogicalPlannerContext
import com.antgroup.openspg.reasoner.lube.utils.ExprUtils
case object IdEqualPushDown extends Rule {
override def ruleWithContext(implicit context: LogicalPlannerContext): PartialFunction[
(LogicalOperator, Map[String, Object]),
(LogicalOperator, Map[String, Object])] = {
case (filter: Filter, map) =>
val start = map.get(Constants.START_ALIAS)
val idExpr = getIdExpr(filter, start)
if (idExpr == null) {
filter -> map
} else {
def rewriter: PartialFunction[LogicalOperator, LogicalOperator] = { case start: Start =>
StartFromVertex(start.graph, idExpr, start.types, start.alias, start.solved)
}
val newFilter = BottomUp[LogicalOperator](rewriter).transform(filter).asInstanceOf[Filter]
newFilter -> map
}
case (start: Start, _) =>
start -> Map.apply(Constants.START_ALIAS -> start.alias)
}
private def getIdExpr(filter: Filter, start: Option[Object]): Expr = {
if (start.isEmpty) {
return null
}
filter.rule.getExpr match {
case BinaryOpExpr(BEqual | BIn, left, right) =>
val irFields = ExprUtils.getAllInputFieldInRule(
filter.rule.getExpr,
filter.solved.getNodeAliasSet,
filter.solved.getEdgeAliasSet)
if (irFields.size != 1 || !irFields.head.isInstanceOf[IRNode] || !irFields.head
.asInstanceOf[IRNode]
.name
.equals(start.get) || !irFields.head
.asInstanceOf[IRNode]
.fields
.equals(Set.apply(Constants.NODE_ID_KEY))) {
null
} else {
left match {
case UnaryOpExpr(GetField(_), _) => right
case _ => left
}
}
case _ => null
}
}
override def direction: Direction = Up
override def maxIterations: Int = 1
}

View File

@ -16,11 +16,7 @@ package com.antgroup.openspg.reasoner.lube.logical.planning
import scala.collection.mutable
import com.antgroup.openspg.reasoner.common.constants.Constants
import com.antgroup.openspg.reasoner.common.exception.{
NotImplementedException,
SchemaException,
UnsupportedOperationException
}
import com.antgroup.openspg.reasoner.common.exception.{NotImplementedException, SchemaException, UnsupportedOperationException}
import com.antgroup.openspg.reasoner.common.graph.edge.SPO
import com.antgroup.openspg.reasoner.lube.block._
import com.antgroup.openspg.reasoner.lube.catalog.{Catalog, SemanticPropertyGraph}
@ -31,7 +27,7 @@ import com.antgroup.openspg.reasoner.lube.common.rule.Rule
import com.antgroup.openspg.reasoner.lube.logical._
import com.antgroup.openspg.reasoner.lube.logical.operators._
import com.antgroup.openspg.reasoner.lube.logical.validate.Dag
import com.antgroup.openspg.reasoner.lube.utils.{ExprUtils, RuleUtils}
import com.antgroup.openspg.reasoner.lube.utils.{BlockUtils, ExprUtils, RuleUtils}
/**
* Logical planner for KGReasoner, generate an optimal logical plan for KGDSL or GQL.
@ -59,7 +55,7 @@ object LogicalPlanner {
*/
def plan(input: Block)(implicit context: LogicalPlannerContext): List[LogicalOperator] = {
val source = resolve(input)
val groups = getStarts(input)
val groups = BlockUtils.getStarts(input)
val planWithoutResult = if (groups.isEmpty) {
planBlock(input.dependencies.head, input, None, source)
} else {
@ -402,10 +398,22 @@ object LogicalPlanner {
val starts = new mutable.HashSet[String]()
for (ddl <- ddlOp) {
ddl match {
case AddProperty(s, _, _) => starts.add(s.alias)
case AddProperty(s, _, _) =>
if (starts.isEmpty) {
starts.add(s.alias)
} else {
val common = starts.intersect(Set.apply(s.alias))
starts.clear()
starts.++=(common)
}
case AddPredicate(p) =>
starts.add(p.source.alias)
starts.add(p.target.alias)
if (starts.isEmpty) {
starts.++=(Set.apply(p.source.alias, p.target.alias))
} else {
val common = starts.intersect(Set.apply(p.source.alias, p.target.alias))
starts.clear()
starts.++=(common)
}
case _ =>
}
}

View File

@ -254,13 +254,11 @@ class SubQueryPlanner(val dag: Dag[Block])(implicit context: LogicalPlannerConte
rootAlias = predicate.target.alias
} else if (direction == Direction.OUT && predicate.direction == Direction.IN) {
rootAlias == predicate.target.alias
} else {
} else if (direction != null) {
rootAlias == predicate.source.alias
}
case AddProperty(s, _, _) =>
rootAlias = s.alias
case AddVertex(s, _) =>
rootAlias = s.alias
case _ =>
}
})

View File

@ -0,0 +1,58 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.lube.logical
import com.antgroup.openspg.reasoner.lube.utils.BlockUtils
import com.antgroup.openspg.reasoner.parser.OpenSPGDslParser
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, equal}
class BlockUtilTests extends AnyFunSpec{
it("group start test") {
val dsl =
"""
|GraphStructure {
| (s: test)-[p: abc]->(o: test)
|}
|Rule {
| amt = group(s).sum(p.amt)
|}
|Action {
| get(s.id)
|}
|""".stripMargin
val parser = new OpenSPGDslParser()
val block = parser.parse(dsl)
BlockUtils.getStarts(block) should equal (Set.apply("s"))
}
it("group filter with id test") {
val dsl =
"""
|GraphStructure {
| (s: test)-[p: abc]->(o: test)
|}
|Rule {
| R1: o.id == '1111111'
|}
|Action {
| get(s.id)
|}
|""".stripMargin
val parser = new OpenSPGDslParser()
val block = parser.parse(dsl)
BlockUtils.getStarts(block) should equal (Set.apply("o"))
}
}

View File

@ -249,4 +249,118 @@ class OptimizerTests extends AnyFunSpec {
group should equal(List.apply(NodeVar("A", Set.empty)))
}
}
it("test id equal push down") {
val dsl =
"""
|GraphStructure {
| (s: test)-[p: abc]->(o: test)
|}
|Rule {
| R1: o.id == '1111111'
|}
|Action {
| get(s.id)
|}
|""".stripMargin
val parser = new OpenSPGDslParser()
val block = parser.parse(dsl)
val schema: Map[String, Set[String]] =
Map.apply(
"test" -> Set.apply("id"),
"test_abc_test" -> Set.empty)
val catalog = new PropertyGraphCatalog(schema)
catalog.init()
implicit val context: LogicalPlannerContext =
LogicalPlannerContext(
catalog,
parser,
Map
.apply((Constants.SPG_REASONER_MULTI_VERSION_ENABLE, true))
.asInstanceOf[Map[String, Object]])
val dag = Validator.validate(List.apply(block))
val logicalPlan = LogicalPlanner.plan(dag).popRoot()
val rule = Seq(IdEqualPushDown)
val optimizedLogicalPlan = LogicalOptimizer.optimize(logicalPlan, rule)
optimizedLogicalPlan.findExactlyOne { case start: StartFromVertex =>
start.id should equal(VString("1111111"))
start.alias should equal("o")
}
}
it("test id equal push down left") {
val dsl =
"""
|GraphStructure {
| (s: test)-[p: abc]->(o: test)
|}
|Rule {
| R1: '1111111' == o.id
|}
|Action {
| get(s.id)
|}
|""".stripMargin
val parser = new OpenSPGDslParser()
val block = parser.parse(dsl)
val schema: Map[String, Set[String]] =
Map.apply(
"test" -> Set.apply("id"),
"test_abc_test" -> Set.empty)
val catalog = new PropertyGraphCatalog(schema)
catalog.init()
implicit val context: LogicalPlannerContext =
LogicalPlannerContext(
catalog,
parser,
Map
.apply((Constants.SPG_REASONER_MULTI_VERSION_ENABLE, true))
.asInstanceOf[Map[String, Object]])
val dag = Validator.validate(List.apply(block))
val logicalPlan = LogicalPlanner.plan(dag).popRoot()
val rule = Seq(IdEqualPushDown)
val optimizedLogicalPlan = LogicalOptimizer.optimize(logicalPlan, rule)
optimizedLogicalPlan.findExactlyOne { case start: StartFromVertex =>
start.id should equal(VString("1111111"))
start.alias should equal("o")
}
}
it("test id equal push down with in") {
val dsl =
"""
|GraphStructure {
| (s: test)-[p: abc]->(o: test)
|}
|Rule {
| R1: o.id in ['1111111', '2222222']
|}
|Action {
| get(s.id)
|}
|""".stripMargin
val parser = new OpenSPGDslParser()
val block = parser.parse(dsl)
val schema: Map[String, Set[String]] =
Map.apply(
"test" -> Set.apply("id"),
"test_abc_test" -> Set.empty)
val catalog = new PropertyGraphCatalog(schema)
catalog.init()
implicit val context: LogicalPlannerContext =
LogicalPlannerContext(
catalog,
parser,
Map
.apply((Constants.SPG_REASONER_MULTI_VERSION_ENABLE, true))
.asInstanceOf[Map[String, Object]])
val dag = Validator.validate(List.apply(block))
val logicalPlan = LogicalPlanner.plan(dag).popRoot()
val rule = Seq(IdEqualPushDown)
val optimizedLogicalPlan = LogicalOptimizer.optimize(logicalPlan, rule)
optimizedLogicalPlan.findExactlyOne { case start: StartFromVertex =>
start.id.asInstanceOf[VList].list should equal(List.apply("1111111", "2222222"))
start.alias should equal("o")
}
}
}

View File

@ -133,7 +133,10 @@ class NodeIdToEdgePropertyTests extends AnyFunSpec {
val dsl =
"""
|GraphStructure {
| (A:User)-[e1:lk]->(B:User)-[e2:lk]->(C:User)
| B [User, __start__ = 'true']
| A, C [User]
| A -> B [lk] as e1
| B -> C [lk] as e2
|}
|Rule {
| R1(""): e1.weight < e2.weight

View File

@ -13,6 +13,7 @@
package com.antgroup.openspg.reasoner.lube.physical
import com.antgroup.openspg.reasoner.lube.common.expr.Expr
import com.antgroup.openspg.reasoner.lube.logical.RepeatPathVar
import com.antgroup.openspg.reasoner.lube.physical.rdg.RDG
@ -35,6 +36,16 @@ trait PropertyGraph[T <: RDG[T]] {
*/
def createRDG(alias: String, rdg: T): T
/**
* Start with specific vertex.
*
* @param alias
* @param id
* @param types
* @return
*/
def createRDG(alias: String, id: Expr, types: Set[String]): T
/**
* Start with specific rdg with specific alias which in [[RepeatPathVar]]
* @param repeatVar

View File

@ -50,6 +50,9 @@ abstract class PhysicalLeafOperator[T <: RDG[T]: TypeTag] extends PhysicalOperat
throw UnsupportedOperationException("LogicalLeafOperator cannot construct children")
}
def alias: String
def types: Set[String]
}
abstract class StackingPhysicalOperator[T <: RDG[T]: TypeTag] extends PhysicalOperator[T] {

View File

@ -15,6 +15,7 @@ package com.antgroup.openspg.reasoner.lube.physical.operators
import scala.reflect.runtime.universe.TypeTag
import com.antgroup.openspg.reasoner.lube.common.expr.Expr
import com.antgroup.openspg.reasoner.lube.logical.Var
import com.antgroup.openspg.reasoner.lube.physical.planning.PhysicalPlannerContext
import com.antgroup.openspg.reasoner.lube.physical.rdg.RDG
@ -28,14 +29,27 @@ final case class Start[T <: RDG[T]: TypeTag](
override def rdg: T = context.graphSession.getGraph(graphName).createRDG(alias, types)
}
final case class StartFromVertex[T <: RDG[T]: TypeTag](
graphName: String,
alias: String,
meta: List[Var],
vId: Expr,
types: Set[String])(implicit override val context: PhysicalPlannerContext[T])
extends PhysicalLeafOperator[T] {
override def rdg: T = context.graphSession.getGraph(graphName).createRDG(alias, vId, types)
}
final case class DrivingRDG[T <: RDG[T]: TypeTag](
graphName: String,
meta: List[Var],
alias: String,
workingRdgName: String)(implicit override val context: PhysicalPlannerContext[T])
extends PhysicalLeafOperator[T] {
override def rdg: T = {
val workingRdg = context.graphSession.getWorkingRDG(workingRdgName)
context.graphSession.getGraph(graphName).createRDG(alias, workingRdg)
}
override def types: Set[String] = Set.empty
}

View File

@ -91,6 +91,10 @@ object PhysicalPlanner {
Start(start.graph.graphName, start.alias, start.fields, start.types)(
implicitly[TypeTag[T]],
context)
case start: LogicalOperators.StartFromVertex =>
StartFromVertex(start.graph.graphName, start.alias, start.fields, start.id, start.types)(
implicitly[TypeTag[T]],
context)
case driving: LogicalOperators.Driving =>
DrivingRDG(start.graph.graphName, start.fields, start.alias, workingRdgName)(
implicitly[TypeTag[T]],

View File

@ -15,19 +15,19 @@ package com.antgroup.openspg.reasoner.lube.physical.util
import scala.reflect.runtime.universe.TypeTag
import com.antgroup.openspg.reasoner.lube.physical.operators.{PhysicalOperator, Start}
import com.antgroup.openspg.reasoner.lube.physical.operators.{PhysicalLeafOperator, PhysicalOperator, Start, StartFromVertex}
import com.antgroup.openspg.reasoner.lube.physical.rdg.RDG
import com.antgroup.openspg.reasoner.lube.physical.util.PhysicalOperatorOps.RichPhysicalOperator
object PhysicalOperatorUtil {
def getStartTypes[T <: RDG[T]: TypeTag](physicalOp: PhysicalOperator[T]): Set[String] = {
getStartOp(physicalOp).types
}
def getStartOp[T <: RDG[T]: TypeTag](physicalOp: PhysicalOperator[T]): Start[T] = {
val op = physicalOp.findExactlyOne { case start: Start[T] => }
op.asInstanceOf[Start[T]]
def getStartOp[T <: RDG[T]: TypeTag](
physicalOp: PhysicalOperator[T]): PhysicalLeafOperator[T] = {
val op = physicalOp.findExactlyOne {
case start: Start[T] =>
case start: StartFromVertex[T] =>
}
op.asInstanceOf[PhysicalLeafOperator[T]]
}
}

View File

@ -20,9 +20,9 @@ import com.antgroup.openspg.reasoner.graphstate.GraphState;
import com.antgroup.openspg.reasoner.graphstate.impl.MemGraphState;
import com.antgroup.openspg.reasoner.lube.catalog.Catalog;
import com.antgroup.openspg.reasoner.lube.parser.ParserInterface;
import com.antgroup.openspg.reasoner.lube.physical.operators.PhysicalLeafOperator;
import com.antgroup.openspg.reasoner.lube.physical.operators.PhysicalOperator;
import com.antgroup.openspg.reasoner.lube.physical.operators.Select;
import com.antgroup.openspg.reasoner.lube.physical.operators.Start;
import com.antgroup.openspg.reasoner.lube.physical.util.PhysicalOperatorUtil;
import com.antgroup.openspg.reasoner.parser.OpenSPGDslParser;
import com.antgroup.openspg.reasoner.runner.ConfigKey;
@ -115,7 +115,7 @@ public class LocalReasonerRunner {
boolean isLastDsl = (i + 1 == dslDagList.size());
if (isLastDsl) {
Start<LocalRDG> start =
PhysicalLeafOperator<LocalRDG> start =
PhysicalOperatorUtil.getStartOp(
dslDagList.get(i),
com.antgroup.openspg.reasoner.runner.local.rdg.TypeTags.rdgTypeTag());

View File

@ -20,15 +20,19 @@ import com.antgroup.openspg.reasoner.common.graph.vertex.impl.MirrorVertex;
import com.antgroup.openspg.reasoner.common.graph.vertex.impl.NoneVertex;
import com.antgroup.openspg.reasoner.graphstate.GraphState;
import com.antgroup.openspg.reasoner.kggraph.KgGraph;
import com.antgroup.openspg.reasoner.lube.common.expr.Expr;
import com.antgroup.openspg.reasoner.lube.logical.RepeatPathVar;
import com.antgroup.openspg.reasoner.lube.physical.PropertyGraph;
import com.antgroup.openspg.reasoner.lube.utils.transformer.impl.Expr2QlexpressTransformer;
import com.antgroup.openspg.reasoner.recorder.EmptyRecorder;
import com.antgroup.openspg.reasoner.recorder.IExecutionRecorder;
import com.antgroup.openspg.reasoner.runner.ConfigKey;
import com.antgroup.openspg.reasoner.runner.local.model.LocalReasonerTask;
import com.antgroup.openspg.reasoner.runner.local.rdg.LocalRDG;
import com.antgroup.openspg.reasoner.udf.rule.RuleRunner;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -113,6 +117,55 @@ public class LocalPropertyGraph implements PropertyGraph<LocalRDG> {
return result;
}
@Override
public LocalRDG createRDG(String alias, Expr id, Set<String> types) {
java.util.Set<IVertexId> startIdSet = new HashSet<>();
Expr2QlexpressTransformer transformer =
new Expr2QlexpressTransformer(RuleRunner::convertPropertyName);
List<String> exprQlList =
Lists.newArrayList(JavaConversions.seqAsJavaList(transformer.transform(id)));
List<String> idStrList = new ArrayList<>();
Object idObj = RuleRunner.getInstance().executeExpression(new HashMap<>(), exprQlList, "");
if (idObj instanceof String) {
idStrList.add(String.valueOf(idObj));
} else if (idObj instanceof List) {
List idOList = (List) idObj;
for (Object ido : idOList) {
idStrList.add(String.valueOf(ido));
}
} else if (idObj instanceof String[]) {
String[] idArray = (String[]) idObj;
idStrList.addAll(Lists.newArrayList(idArray));
} else if (idObj instanceof Object[]) {
Object[] idArray = (Object[]) idObj;
for (Object idO : idArray) {
idStrList.add(String.valueOf(idO));
}
}
for (String type : JavaConversions.asJavaCollection(types)) {
for (String idStr : idStrList) {
startIdSet.add(IVertexId.from(idStr, type));
}
}
if (startIdSet.isEmpty()) {
throw new RuntimeException("can not extract start id list");
}
LocalRDG result =
new LocalRDG(
graphState,
Lists.newArrayList(startIdSet),
threadPoolExecutor,
executorTimeoutMs,
alias,
getTaskId(),
// subquery can not carry all graph
getExecutionRecorder(),
false);
result.setMaxPathLimit(getMaxPathLimit());
result.setStrictMaxPathLimit(getStrictMaxPathLimit());
return result;
}
@Override
public LocalRDG createRDGFromPath(RepeatPathVar repeatVar, String alias, LocalRDG rdg) {
return null;

View File

@ -42,6 +42,11 @@ public class KgReasonerAliasSetKFilmTest {
FileMutex.runTestWithMutex(this::doTest0);
}
@Test
public void testUseRule0() {
FileMutex.runTestWithMutex(this::doTestUseRule0);
}
private void doTest0() {
String dsl =
"\n"
@ -73,6 +78,36 @@ public class KgReasonerAliasSetKFilmTest {
Assert.assertEquals("B", result.get(0)[1]);
}
private void doTestUseRule0() {
String dsl =
"\n"
+ "GraphStructure {\n"
+ " (A:User)-[p1:trans]->(B:User)\n"
+ "}\n"
+ "Rule {\n"
+ " R1: A.id in ['A']"
+ "}\n"
+ "Action {\n"
+ " get(A.id, B.id)\n"
+ "}";
List<String[]> result =
TransBaseTestData.runTestResult(
dsl,
new HashMap<String, Object>() {
{
put(Constants.START_ALIAS, "A");
put(
ConfigKey.KG_REASONER_MOCK_GRAPH_DATA,
"Graph {\n" + " A [User]\n" + " B [User]\n" + " A->B [trans]\n" + "}");
put(ConfigKey.KG_REASONER_OUTPUT_GRAPH, "true");
}
});
Assert.assertEquals(1, result.size());
Assert.assertEquals(2, result.get(0).length);
Assert.assertEquals("A", result.get(0)[0]);
Assert.assertEquals("B", result.get(0)[1]);
}
@Test
public void test1() {
FileMutex.runTestWithMutex(this::doTest1);

View File

@ -84,7 +84,7 @@ public class KgReasonerLeadToTest {
LocalReasonerRunner runner = new LocalReasonerRunner();
LocalReasonerResult result = runner.run(task);
System.out.println(result);
Assert.assertEquals(1, result.getVertexList().size());
Assert.assertEquals(2, result.getVertexList().size());
}
public static class GraphLoaderForAddVertex extends AbstractLocalGraphLoader {

View File

@ -89,6 +89,7 @@ public class TransBaseTestData {
params.put(Constants.SPG_REASONER_PLAN_PRETTY_PRINT_LOGGER_ENABLE, true);
params.putAll(runParams);
task.setParams(params);
task.setExecutorTimeoutMs(5 * 60 * 1000);
task.setStartIdList(Lists.newArrayList(new Tuple2<>("1", "User")));

View File

@ -14,6 +14,9 @@
package com.antgroup.openspg.reasoner.runner.local.main.transitive;
import com.antgroup.openspg.reasoner.common.constants.Constants;
import com.antgroup.openspg.reasoner.common.graph.edge.IEdge;
import com.antgroup.openspg.reasoner.common.graph.property.IProperty;
import com.antgroup.openspg.reasoner.common.graph.vertex.IVertex;
import com.antgroup.openspg.reasoner.graphstate.impl.MemGraphState;
import com.antgroup.openspg.reasoner.lube.catalog.Catalog;
import com.antgroup.openspg.reasoner.lube.catalog.impl.PropertyGraphCatalog;
@ -25,9 +28,11 @@ import com.antgroup.openspg.reasoner.runner.local.loader.MockLocalGraphLoader;
import com.antgroup.openspg.reasoner.runner.local.model.LocalReasonerResult;
import com.antgroup.openspg.reasoner.runner.local.model.LocalReasonerTask;
import com.antgroup.openspg.reasoner.util.Convert2ScalaUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
@ -859,7 +864,7 @@ public class TransitiveOptionalTest {
+ "B->A [relatedReason] as F1\n"
+ "\n"
+ "// 1.8的C\n"
+ "B->C [relatedReason] repeat(1,20) as F3\n"
+ "B->C [relatedReason] repeat(1,2) as F3\n"
+ "}\n"
+ "Rule {\n"
+ " R1: A.id == 'A_730'\n"
@ -1286,4 +1291,130 @@ public class TransitiveOptionalTest {
LocalReasonerResult rst = runTest(schema, dsl, dataGraphStr2);
Assert.assertEquals(1, rst.getRows().size());
}
@Test
public void testCreateInstance() {
String dsl =
"Define (s:Custid)-[p:strNum]->(o:Int) {\n"
+ " GraphStructure {\n"
+ " (s)<-[pp:hasCust]-(str:STR)\n"
+ " }\n"
+ " Rule {\n"
+ " o = group(s).countIf(str.status == 'CLOSE', str)\n"
+ " }\n"
+ "}\n"
+ "\n"
+ "Define (s:Custid)-[p:isInWhiteBlack]->(o:Boolean) {\n"
+ " GraphStructure {\n"
+ " (s)<-[:hasCust]-(str:STR)\n"
+ " }\n"
+ " Rule {\n"
+ " R1: str.matchrule == '0202'\n"
+ " R2: str.isreport == '1' \n"
+ "\n"
+ " o = (R1 and R2)\n"
+ " }\n"
+ "}\n"
+ "\n"
+ "Define (s:Custid)-[p:isAggregator]->(o:Boolean) {\n"
+ " GraphStructure {\n"
+ " (s)<-[e:complained]-(u1:Custid)\n"
+ " }\n"
+ " Rule {\n"
+ " R0: s.isInWhiteBlack == null or s.isInWhiteBlack == false\n"
+ " complainNum = group(s).count(e)\n"
+ " R5(\"被投诉大于20条\"): complainNum >=20\n"
+ "\n"
+ " o = true\n"
+ " }\n"
+ " Action {\n"
+ " gang = createNodeInstance(\n"
+ " type=Gang,\n"
+ " value={\n"
+ " id=concat(s.id, \"_gang\")\n"
+ " }\n"
+ " )\n"
+ " createEdgeInstance(\n"
+ " src=gang,\n"
+ " dst=s,\n"
+ " type=has,\n"
+ " value={\n"
+ " }\n"
+ " )\n"
+ " }\n"
+ "}\n"
+ "GraphStructure {"
+ " A [Custid, __start__='true']\n"
+ " B [Gang]\n"
+ " B->A [has]\n"
+ "}\n"
+ "Rule {\n"
+ "}\n"
+ "Action {\n"
+ " get(A.id, A.isAggregator, B.id) \n"
+ "}";
System.out.println(dsl);
LocalReasonerTask task = new LocalReasonerTask();
task.setDsl(dsl);
// add mock catalog
Map<String, Set<String>> schema = new HashMap<>();
schema.put(
"Custid",
Convert2ScalaUtil.toScalaImmutableSet(
Sets.newHashSet(
"trdAmtIn90d",
"trdAmt90d",
"trdCntCustIn90d",
"custcntpty90CustNum90dInGenderFemale",
"custcntpty90CustNum90dInGenderMale",
"name")));
schema.put(
"STR",
Convert2ScalaUtil.toScalaImmutableSet(
Sets.newHashSet("conclusion", "name", "status", "matchrule", "isreport")));
schema.put("Gang", Convert2ScalaUtil.toScalaImmutableSet(Sets.newHashSet("cid", "name")));
schema.put("Gang_has_Custid", Convert2ScalaUtil.toScalaImmutableSet(Sets.newHashSet("info")));
schema.put(
"Custid_complained_Custid",
Convert2ScalaUtil.toScalaImmutableSet(Sets.newHashSet("createMemo")));
schema.put(
"STR_hasCust_Custid", Convert2ScalaUtil.toScalaImmutableSet(Sets.newHashSet("createMemo")));
Catalog catalog = new PropertyGraphCatalog(Convert2ScalaUtil.toScalaImmutableMap(schema));
catalog.init();
task.setCatalog(catalog);
task.setGraphLoadClass(
"com.antgroup.openspg.reasoner.runner.local.main.transitive.TransitiveOptionalTest$GangGraphLoader");
// enable subquery
Map<String, Object> params = new HashMap<>();
params.put(Constants.SPG_REASONER_LUBE_SUBQUERY_ENABLE, true);
params.put(Constants.SPG_REASONER_MULTI_VERSION_ENABLE, "true");
task.setParams(params);
LocalReasonerRunner runner = new LocalReasonerRunner();
LocalReasonerResult result = runner.run(task);
Assert.assertEquals(2, result.getRows().size());
}
public static class GangGraphLoader extends AbstractLocalGraphLoader {
@Override
public List<IVertex<String, IProperty>> genVertexList() {
return Lists.newArrayList(
constructionVertex("A1", "Custid", "name", "A1", "cid", "a1"),
constructionVertex("A2", "Custid", "name", "A2", "cid", "a2"),
constructionVertex("B1", "Gang", "name", "B2", "cid", "b1"));
}
@Override
public List<IEdge<String, IProperty>> genEdgeList() {
return Lists.newArrayList(
constructionEdge("B1", "has", "A1", "info", "b1_a1"),
constructionEdge("B1", "has", "A2", "info", "b1_a2"),
constructionEdge("A1", "complained", "A2", "info", "a1ca2"));
}
}
}

View File

@ -60,7 +60,7 @@ public class ExtractRelationImpl implements Serializable {
private final String taskId;
private final PatternElement sourceElement;
private final Element sourceElement;
private final EntityElement targetEntityElement;
private final PatternElement targetPatternElement;
@ -84,7 +84,7 @@ public class ExtractRelationImpl implements Serializable {
this.propertyRuleMap.put(propertyName, rule);
}
PatternElement sourceElement = (PatternElement) addPredicate.predicate().source();
Element se = addPredicate.predicate().source();
Element te = addPredicate.predicate().target();
EntityElement targetEntityElement = null;
PatternElement targetPatternElement = null;
@ -114,11 +114,10 @@ public class ExtractRelationImpl implements Serializable {
}
}
if (!this.propertyRuleMap.containsKey(Constants.EDGE_FROM_ID_KEY)) {
this.propertyRuleMap.put(
Constants.EDGE_FROM_ID_KEY, Lists.newArrayList(sourceElement.alias() + ".id"));
this.propertyRuleMap.put(Constants.EDGE_FROM_ID_KEY, Lists.newArrayList(se.alias() + ".id"));
}
this.sourceElement = sourceElement;
this.sourceElement = se;
this.targetEntityElement = targetEntityElement;
this.targetPatternElement = targetPatternElement;
}

View File

@ -445,6 +445,12 @@ object LoaderUtil {
} else {
merge(solvedModel, list.head)
}
case (StartFromVertex(_, _, _, _, solvedModel), list) =>
if (list == null || list.isEmpty) {
solvedModel
} else {
merge(solvedModel, list.head)
}
case (LinkedExpand(_, edgePattern), list) =>
if (edgePattern.edge.funcName.equals(Constants.CONCEPT_EDGE_EXPAND_FUNC_NAME)) {
merge(getConceptEdgeExpandSolvedModel(logicalPlan.graph, edgePattern), list.head)

View File

@ -14,6 +14,7 @@
package com.antgroup.openspg.reasoner.session
import com.antgroup.openspg.reasoner.lube.catalog.Catalog
import com.antgroup.openspg.reasoner.lube.common.expr.Expr
import com.antgroup.openspg.reasoner.lube.logical.RepeatPathVar
import com.antgroup.openspg.reasoner.lube.parser.ParserInterface
import com.antgroup.openspg.reasoner.lube.physical.PropertyGraph
@ -51,6 +52,15 @@ class EmptyPropertyGraph extends PropertyGraph[EmptyRDG] {
alias: String,
rdg: EmptyRDG): EmptyRDG = rdg
/**
* Start with specific vertex.
*
* @param alias
* @param id
* @param types
* @return
*/
override def createRDG(alias: String, id: Expr, types: Set[String]): EmptyRDG = new EmptyRDG()
}
class EmptySession(parser: ParserInterface, catalog: Catalog)

View File

@ -51,6 +51,6 @@ class PhysicalOpUtilTests extends AnyFunSpec {
catalog.init()
val session = new EmptySession(new OpenSPGDslParser(), catalog)
val rst = session.plan(dsl, Map.empty)
PhysicalOperatorUtil.getStartTypes(rst.head) should equal (Set.apply("User"))
PhysicalOperatorUtil.getStartOp(rst.head).alias should equal ("s")
}
}