mirror of
https://github.com/OpenSPG/openspg.git
synced 2025-12-02 10:10:36 +00:00
feat(reasoner): support remove duplicate (#177)
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
parent
415af00a32
commit
e9f1765e47
@ -0,0 +1,43 @@
|
||||
/*
|
||||
* Copyright 2023 OpenSPG Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied.
|
||||
*/
|
||||
|
||||
package com.antgroup.openspg.common.util;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Arrays;
|
||||
|
||||
public class ArrayWrapper implements Serializable {
|
||||
private final Object[] value;
|
||||
|
||||
public ArrayWrapper(Object[] value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Arrays.hashCode(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (!(obj instanceof ArrayWrapper)) {
|
||||
return false;
|
||||
}
|
||||
ArrayWrapper other = (ArrayWrapper) obj;
|
||||
return Arrays.equals(this.value, other.value);
|
||||
}
|
||||
|
||||
public Object[] getValue() {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
@ -18,20 +18,36 @@ import scala.collection.mutable
|
||||
|
||||
import com.antgroup.openspg.reasoner.KGDSLParser._
|
||||
import com.antgroup.openspg.reasoner.common.constants.Constants
|
||||
import com.antgroup.openspg.reasoner.common.exception.{KGDSLGrammarException, KGDSLOneTaskException}
|
||||
import com.antgroup.openspg.reasoner.common.exception.{
|
||||
KGDSLGrammarException,
|
||||
KGDSLOneTaskException
|
||||
}
|
||||
import com.antgroup.openspg.reasoner.common.graph.edge.Direction
|
||||
import com.antgroup.openspg.reasoner.common.trees.BottomUp
|
||||
import com.antgroup.openspg.reasoner.common.types._
|
||||
import com.antgroup.openspg.reasoner.lube.block._
|
||||
import com.antgroup.openspg.reasoner.lube.common.expr._
|
||||
import com.antgroup.openspg.reasoner.lube.common.graph._
|
||||
import com.antgroup.openspg.reasoner.lube.common.pattern.{Element, EntityElement, GraphPath, PatternElement, PredicateElement}
|
||||
import com.antgroup.openspg.reasoner.lube.common.pattern.{
|
||||
Element,
|
||||
EntityElement,
|
||||
GraphPath,
|
||||
PatternElement,
|
||||
PredicateElement
|
||||
}
|
||||
import com.antgroup.openspg.reasoner.lube.common.rule.{LogicRule, ProjectRule, Rule}
|
||||
import com.antgroup.openspg.reasoner.lube.parser.ParserInterface
|
||||
import com.antgroup.openspg.reasoner.lube.utils.{ExprUtils, RuleUtils}
|
||||
import com.antgroup.openspg.reasoner.lube.utils.transformer.impl.{Expr2QlexpressTransformer, Rule2ExprTransformer}
|
||||
import com.antgroup.openspg.reasoner.lube.utils.transformer.impl.{
|
||||
Expr2QlexpressTransformer,
|
||||
Rule2ExprTransformer
|
||||
}
|
||||
import com.antgroup.openspg.reasoner.parser.expr.RuleExprParser
|
||||
import com.antgroup.openspg.reasoner.parser.pattern.{ConceptLabelType, EntityLabelType, PatternParser}
|
||||
import com.antgroup.openspg.reasoner.parser.pattern.{
|
||||
ConceptLabelType,
|
||||
EntityLabelType,
|
||||
PatternParser
|
||||
}
|
||||
|
||||
/**
|
||||
* parse dsl to Block
|
||||
@ -169,9 +185,7 @@ class OpenSPGDslParser extends ParserInterface {
|
||||
attrFields.put(
|
||||
Constants.EDGE_FROM_ID_KEY,
|
||||
UnaryOpExpr(GetField(Constants.NODE_ID_KEY), Ref(predicate.source.alias)))
|
||||
attrFields.put(
|
||||
Constants.EDGE_FROM_ID_TYPE_KEY,
|
||||
VString(predicate.source.typeNames.head))
|
||||
attrFields.put(Constants.EDGE_FROM_ID_TYPE_KEY, VString(predicate.source.typeNames.head))
|
||||
if (predicate.target.isInstanceOf[EntityElement]) {
|
||||
attrFields.put(
|
||||
Constants.EDGE_TO_ID_KEY,
|
||||
@ -181,10 +195,7 @@ class OpenSPGDslParser extends ParserInterface {
|
||||
Constants.EDGE_TO_ID_KEY,
|
||||
UnaryOpExpr(GetField(Constants.NODE_ID_KEY), Ref(predicate.target.alias)))
|
||||
}
|
||||
attrFields.put(
|
||||
Constants.EDGE_TO_ID_TYPE_KEY,
|
||||
VString(predicate.target.typeNames.head))
|
||||
|
||||
attrFields.put(Constants.EDGE_TO_ID_TYPE_KEY, VString(predicate.target.typeNames.head))
|
||||
|
||||
val depBlk = ruleBlock
|
||||
|
||||
@ -397,18 +408,16 @@ class OpenSPGDslParser extends ParserInterface {
|
||||
case _ =>
|
||||
ProjectBlock(
|
||||
List.apply(opBlock),
|
||||
ProjectFields(Map.apply(lValueName ->
|
||||
ProjectRule(lValueName, opChain))))
|
||||
ProjectFields(
|
||||
Map.apply(lValueName ->
|
||||
ProjectRule(lValueName, opChain))))
|
||||
}
|
||||
case AggIfOpExpr(_, _) | AggOpExpr(_, _) =>
|
||||
ProjectBlock(
|
||||
List.apply(opBlock),
|
||||
ProjectFields(
|
||||
Map.apply(
|
||||
lValueName ->
|
||||
ProjectRule(
|
||||
lValueName,
|
||||
opChain.curExpr))))
|
||||
Map.apply(lValueName ->
|
||||
ProjectRule(lValueName, opChain.curExpr))))
|
||||
case _ => null
|
||||
}
|
||||
}
|
||||
@ -667,7 +676,7 @@ class OpenSPGDslParser extends ParserInterface {
|
||||
|
||||
def parseGetAndAs(
|
||||
ctx: Get_actionContext,
|
||||
preBlock: Block): (List[String], List[String], List[Expr], Block, String) = {
|
||||
preBlock: Block): (List[String], List[String], List[Expr], Block, String, Boolean) = {
|
||||
var newPreBlock: Block = preBlock
|
||||
var columnNames = List[String]()
|
||||
var columnExpr = List[Expr]()
|
||||
@ -704,12 +713,23 @@ class OpenSPGDslParser extends ParserInterface {
|
||||
})
|
||||
}
|
||||
newPreBlock = addRuleToBlock(columnProjectRule.toSet, newPreBlock)
|
||||
(columnNames, outputColumnNames, columnExpr, newPreBlock, viewName)
|
||||
val distinct = ctx.action_get().getText match {
|
||||
case "distinctGet" => true
|
||||
case _ => false
|
||||
}
|
||||
(columnNames, outputColumnNames, columnExpr, newPreBlock, viewName, distinct)
|
||||
}
|
||||
|
||||
def parseGetAction(ctx: Get_actionContext, preBlock: Block): Block = {
|
||||
val (columnNames, outputColumnNames, _, newPreBlock, _) = parseGetAndAs(ctx, preBlock)
|
||||
parseTableResultBlock(columnNames, outputColumnNames, Set.empty, List.empty, newPreBlock)
|
||||
val (columnNames, outputColumnNames, _, newPreBlock, _, distinct) =
|
||||
parseGetAndAs(ctx, preBlock)
|
||||
parseTableResultBlock(
|
||||
distinct,
|
||||
columnNames,
|
||||
outputColumnNames,
|
||||
Set.empty,
|
||||
List.empty,
|
||||
newPreBlock)
|
||||
}
|
||||
|
||||
def parseOneElementInGet(ctx: One_element_in_getContext): (Rule, String, Boolean) = {
|
||||
@ -741,10 +761,7 @@ class OpenSPGDslParser extends ParserInterface {
|
||||
ctx.non_parenthesized_value_expression_primary_with_property())
|
||||
val defaultColumnName = parseExpr2ElementStr(expr)
|
||||
val columnName = parseAsAliasWithComment(ctx.as_alias_with_comment(), defaultColumnName)
|
||||
(
|
||||
ProjectRule(IRVariable(defaultColumnName), expr),
|
||||
columnName,
|
||||
false)
|
||||
(ProjectRule(IRVariable(defaultColumnName), expr), columnName, false)
|
||||
}
|
||||
|
||||
def parseAsAliasWithComment(ctx: As_alias_with_commentContext, defaultName: String): String = {
|
||||
@ -858,13 +875,11 @@ class OpenSPGDslParser extends ParserInterface {
|
||||
val expr = exprParser.parseValueExpression(ctx.value_expression())
|
||||
val defaultColumnName = parseExpr2ElementStr(expr)
|
||||
val columnName = parseReturnAlias(ctx.return_item_alias(), defaultColumnName)
|
||||
(
|
||||
ProjectRule(IRVariable(defaultColumnName), expr),
|
||||
columnName,
|
||||
false)
|
||||
(ProjectRule(IRVariable(defaultColumnName), expr), columnName, false)
|
||||
}
|
||||
|
||||
def parseTableResultBlock(
|
||||
distinct: Boolean,
|
||||
columnNames: List[String],
|
||||
outputColumnNames: List[String],
|
||||
rules: Set[Rule],
|
||||
@ -941,7 +956,11 @@ class OpenSPGDslParser extends ParserInterface {
|
||||
}
|
||||
|
||||
if (rules.isEmpty) {
|
||||
TableResultBlock(List.apply(dependency), OrderedFields(resultFields), outputColumnNames)
|
||||
TableResultBlock(
|
||||
List.apply(dependency),
|
||||
OrderedFields(resultFields),
|
||||
outputColumnNames,
|
||||
distinct)
|
||||
} else {
|
||||
TableResultBlock(
|
||||
List.apply(
|
||||
@ -949,7 +968,8 @@ class OpenSPGDslParser extends ParserInterface {
|
||||
List.apply(dependency),
|
||||
ProjectFields(rules.map(x => (x.getOutput, x)).toMap))),
|
||||
OrderedFields(resultFields),
|
||||
outputColumnNames)
|
||||
outputColumnNames,
|
||||
distinct)
|
||||
}
|
||||
}
|
||||
|
||||
@ -974,6 +994,7 @@ class OpenSPGDslParser extends ParserInterface {
|
||||
.filter(x => x != null)
|
||||
|
||||
parseTableResultBlock(
|
||||
false,
|
||||
columnNames,
|
||||
outputColumnNames,
|
||||
columnProjectRule.toSet,
|
||||
|
||||
@ -14,7 +14,11 @@
|
||||
package com.antgroup.openspg.reasoner.parser
|
||||
|
||||
import com.antgroup.openspg.reasoner.common.constants.Constants
|
||||
import com.antgroup.openspg.reasoner.common.exception.{KGDSLGrammarException, KGDSLInvalidTokenException, KGDSLOneTaskException}
|
||||
import com.antgroup.openspg.reasoner.common.exception.{
|
||||
KGDSLGrammarException,
|
||||
KGDSLInvalidTokenException,
|
||||
KGDSLOneTaskException
|
||||
}
|
||||
import com.antgroup.openspg.reasoner.lube.block._
|
||||
import com.antgroup.openspg.reasoner.lube.common.expr._
|
||||
import com.antgroup.openspg.reasoner.lube.common.graph._
|
||||
@ -803,7 +807,7 @@ class OpenSPGDslParserTest extends AnyFunSpec {
|
||||
|
||||
val blocks = parser.parse(dsl)
|
||||
print(blocks.pretty)
|
||||
val blockRst = """└─TableResultBlock(selectList=OrderedFields(List(IRProperty(A,id))), asList=List(A.id))
|
||||
val blockRst = """└─TableResultBlock(selectList=OrderedFields(List(IRProperty(A,id))), asList=List(A.id), distinct=false)
|
||||
| └─MatchBlock(patterns=Map(unresolved_default_path -> GraphPath(unresolved_default_path,GraphPattern(D,Map(A -> (A:test), D -> (D:test)),Map(D -> Set((D)->[D_C_2:abc]-(C)))),Map(D -> Set(), A -> Set(id), D_C_2 -> Set())),false)))
|
||||
| └─SourceBlock(graph=KG(Map(D -> IRNode(D,Set()), A -> IRNode(A,Set(id))),Map(D_C_2 -> IREdge(D_C_2,Set()))))""".stripMargin
|
||||
blocks.pretty should equal(blockRst)
|
||||
@ -848,7 +852,7 @@ class OpenSPGDslParserTest extends AnyFunSpec {
|
||||
|
||||
val blocks = parser.parseMultipleStatement(dsl, null)
|
||||
print(blocks.head.pretty)
|
||||
val blockRst = """└─TableResultBlock(selectList=OrderedFields(List(IRProperty(A,id))), asList=List(A.id))
|
||||
val blockRst = """└─TableResultBlock(selectList=OrderedFields(List(IRProperty(A,id))), asList=List(A.id), distinct=false)
|
||||
| └─MatchBlock(patterns=Map(unresolved_default_path -> GraphPath(unresolved_default_path,GraphPattern(null,Map(A -> (A:test), D -> (D:test)),Map(D -> Set((D)->[D_C_2:abc]-(C)))),Map(D -> Set(), A -> Set(id), D_C_2 -> Set())),false)))
|
||||
| └─SourceBlock(graph=KG(Map(D -> IRNode(D,Set()), A -> IRNode(A,Set(id))),Map(D_C_2 -> IREdge(D_C_2,Set()))))""".stripMargin
|
||||
blocks.head.pretty should equal(blockRst)
|
||||
@ -871,7 +875,7 @@ class OpenSPGDslParserTest extends AnyFunSpec {
|
||||
|
||||
val blocks = parser.parseMultipleStatement(dsl, Map.apply(Constants.START_ALIAS -> "D"))
|
||||
print(blocks.head.pretty)
|
||||
val blockRst = """└─TableResultBlock(selectList=OrderedFields(List(IRProperty(A,id))), asList=List(A.id))
|
||||
val blockRst = """└─TableResultBlock(selectList=OrderedFields(List(IRProperty(A,id))), asList=List(A.id), distinct=false)
|
||||
| └─MatchBlock(patterns=Map(unresolved_default_path -> GraphPath(unresolved_default_path,GraphPattern(D,Map(A -> (A:test), D -> (D:test)),Map(D -> Set((D)->[D_C_2:abc]-(C)))),Map(D -> Set(), A -> Set(id), D_C_2 -> Set())),false)))
|
||||
| └─SourceBlock(graph=KG(Map(D -> IRNode(D,Set()), A -> IRNode(A,Set(id))),Map(D_C_2 -> IREdge(D_C_2,Set()))))""".stripMargin
|
||||
blocks.head.pretty should equal(blockRst)
|
||||
@ -1077,7 +1081,7 @@ class OpenSPGDslParserTest extends AnyFunSpec {
|
||||
|
||||
val block = parser.parse(dsl)
|
||||
print(block.pretty)
|
||||
val text = """└─TableResultBlock(selectList=OrderedFields(List(IRProperty(s,id), IRVariable(o))), asList=List(s.id, b))
|
||||
val text = """└─TableResultBlock(selectList=OrderedFields(List(IRProperty(s,id), IRVariable(o))), asList=List(s.id, b), distinct=false)
|
||||
* └─ProjectBlock(projects=ProjectFields(Map(IRVariable(o) -> ProjectRule(IRVariable(o),FunctionExpr(name=rule_value)))))
|
||||
* └─FilterBlock(rules=LogicRule(R6,长得高,BinaryOpExpr(name=BOr)))
|
||||
* └─ProjectBlock(projects=ProjectFields(Map(IRVariable(R5) -> LogicRule(R5,颜值高,BinaryOpExpr(name=BGreaterThan)))))
|
||||
@ -1102,7 +1106,7 @@ class OpenSPGDslParserTest extends AnyFunSpec {
|
||||
|
||||
val block = parser.parse(dsl)
|
||||
print(block.pretty)
|
||||
val text = """└─TableResultBlock(selectList=OrderedFields(List(IRProperty(B,name), IRProperty(C,name))), asList=List(B.name, C.name))
|
||||
val text = """└─TableResultBlock(selectList=OrderedFields(List(IRProperty(B,name), IRProperty(C,name))), asList=List(B.name, C.name), distinct=false)
|
||||
* └─FilterBlock(rules=LogicRule(R2,导演编剧同性别,BinaryOpExpr(name=BEqual)))
|
||||
* └─FilterBlock(rules=LogicRule(R1,80后导演,BinaryOpExpr(name=BGreaterThan)))
|
||||
* └─MatchBlock(patterns=Map(unresolved_default_path -> GraphPath(unresolved_default_path,GraphPattern(null,Map(B -> (B:FilmDirector), C -> (C:FilmWriter), A -> (A:Film)),Map(B -> Set((B)<->[E3:workmates]-(C))), A -> Set((A)<->[E2:writerOfFilm]-(C)), (A)<->[E1:directFilm]-(B)))),Map(E3 -> Set(), A -> Set(), E2 -> Set(), E1 -> Set(), B -> Set(birthDate, gender, name), C -> Set(gender, name))),false)))
|
||||
|
||||
@ -16,7 +16,11 @@ package com.antgroup.openspg.reasoner.lube.block
|
||||
import com.antgroup.openspg.reasoner.common.types.KgType
|
||||
import com.antgroup.openspg.reasoner.lube.common.expr.Expr
|
||||
import com.antgroup.openspg.reasoner.lube.common.graph._
|
||||
import com.antgroup.openspg.reasoner.lube.common.pattern.{Element, PatternElement, PredicateElement}
|
||||
import com.antgroup.openspg.reasoner.lube.common.pattern.{
|
||||
Element,
|
||||
PatternElement,
|
||||
PredicateElement
|
||||
}
|
||||
|
||||
/**
|
||||
* every operator block tree of root is result block
|
||||
@ -32,7 +36,8 @@ abstract class ResultBlock[B <: Binds] extends BasicBlock[B](BlockType("result")
|
||||
final case class TableResultBlock(
|
||||
dependencies: List[Block],
|
||||
selectList: OrderedFields,
|
||||
asList: List[String])
|
||||
asList: List[String],
|
||||
distinct: Boolean)
|
||||
extends ResultBlock[OrderedFields] {
|
||||
|
||||
/**
|
||||
|
||||
@ -15,7 +15,11 @@ package com.antgroup.openspg.reasoner.lube.logical.operators
|
||||
|
||||
import com.antgroup.openspg.reasoner.lube.logical.{SolvedModel, Var}
|
||||
|
||||
final case class Select(in: LogicalOperator, fields: List[Var], as: List[String])
|
||||
final case class Select(
|
||||
in: LogicalOperator,
|
||||
fields: List[Var],
|
||||
as: List[String],
|
||||
distinct: Boolean)
|
||||
extends StackingLogicalOperator {
|
||||
|
||||
override def refFields: List[Var] = {
|
||||
|
||||
@ -35,7 +35,7 @@ object Pure extends SimpleRule {
|
||||
case ddl @ DDL(in, _) =>
|
||||
val projects = ddl.refFields.map((_, Directly)).toMap
|
||||
ddl.withNewChildren(Array.apply(Project(in, projects, in.solved)))
|
||||
case select @ Select(in, _, _) =>
|
||||
case select @ Select(in, _, _, _) =>
|
||||
val projects = select.refFields.map((_, Directly)).toMap
|
||||
select.withNewChildren(Array.apply(Project(in, projects, in.solved)))
|
||||
case project @ Project(in, _, _) =>
|
||||
|
||||
@ -16,7 +16,11 @@ package com.antgroup.openspg.reasoner.lube.logical.planning
|
||||
import scala.collection.mutable
|
||||
|
||||
import com.antgroup.openspg.reasoner.common.constants.Constants
|
||||
import com.antgroup.openspg.reasoner.common.exception.{NotImplementedException, SchemaException, UnsupportedOperationException}
|
||||
import com.antgroup.openspg.reasoner.common.exception.{
|
||||
NotImplementedException,
|
||||
SchemaException,
|
||||
UnsupportedOperationException
|
||||
}
|
||||
import com.antgroup.openspg.reasoner.common.graph.edge.SPO
|
||||
import com.antgroup.openspg.reasoner.lube.block._
|
||||
import com.antgroup.openspg.reasoner.lube.catalog.{Catalog, SemanticPropertyGraph}
|
||||
@ -76,7 +80,8 @@ object LogicalPlanner {
|
||||
PathVar(name, elements.map(e => planWithoutResult.solved.getVar(e.name)))
|
||||
case _ => throw UnsupportedOperationException(s"unsupported ${x}")
|
||||
}),
|
||||
t.asList)
|
||||
t.asList,
|
||||
t.distinct)
|
||||
case d: DDLBlock =>
|
||||
val newDDLs = new mutable.HashSet[DDLOp]()
|
||||
for (ddl <- d.ddlOp) {
|
||||
|
||||
@ -21,20 +21,21 @@ import com.antgroup.openspg.reasoner.lube.logical.validate.semantic.Explain
|
||||
import scala.collection.mutable.ListBuffer
|
||||
|
||||
object PathExplain extends Explain {
|
||||
|
||||
override def explain(implicit context: LogicalPlannerContext): PartialFunction[Block, Block] = {
|
||||
case tableResultBlock@TableResultBlock(dependencies, selectList, asList) =>
|
||||
case tableResultBlock @ TableResultBlock(dependencies, selectList, asList, distinct) =>
|
||||
if (selectList.fields.isEmpty) {
|
||||
tableResultBlock
|
||||
} else {
|
||||
val pathNodes = ListBuffer[String]()
|
||||
val pathEdges = ListBuffer[String]()
|
||||
val newSelectFields = selectList.fields.map {
|
||||
case path@IRPath(_, elements) =>
|
||||
case path @ IRPath(_, elements) =>
|
||||
val newPathField = elements.map {
|
||||
case node@IRNode(name, fields) =>
|
||||
case node @ IRNode(name, fields) =>
|
||||
pathNodes.+=(name)
|
||||
node.copy(fields = fields + Constants.PROPERTY_JSON_KEY)
|
||||
case edge@IREdge(name, fields) =>
|
||||
case edge @ IREdge(name, fields) =>
|
||||
pathEdges.+=(name)
|
||||
edge.copy(fields = fields + Constants.PROPERTY_JSON_KEY)
|
||||
case other => other
|
||||
@ -44,32 +45,32 @@ object PathExplain extends Explain {
|
||||
case other => other
|
||||
}
|
||||
val newSelectList = selectList.copy(orderedFields = newSelectFields)
|
||||
val newTableResultBlock = TableResultBlock(dependencies, newSelectList, asList)
|
||||
val newTableResultBlock = TableResultBlock(dependencies, newSelectList, asList, distinct)
|
||||
newTableResultBlock.rewriteTopDown(explainMatch(pathNodes, pathEdges))
|
||||
}
|
||||
}
|
||||
|
||||
private def explainMatch(pathNodes: ListBuffer[String],
|
||||
pathEdges: ListBuffer[String]): PartialFunction[Block, Block] = {
|
||||
case matchBlock@MatchBlock(dependencies, patterns) =>
|
||||
private def explainMatch(
|
||||
pathNodes: ListBuffer[String],
|
||||
pathEdges: ListBuffer[String]): PartialFunction[Block, Block] = {
|
||||
case matchBlock @ MatchBlock(dependencies, patterns) =>
|
||||
if (patterns.isEmpty) {
|
||||
matchBlock
|
||||
} else {
|
||||
val newPatterns = patterns.map {
|
||||
p =>
|
||||
val pattern = p._2.graphPattern
|
||||
val newProperties = pattern.properties.map {
|
||||
case (key, value) =>
|
||||
if (pathNodes.contains(key) || pathEdges.contains(key)) {
|
||||
(key, value + Constants.PROPERTY_JSON_KEY)
|
||||
} else {
|
||||
(key, value)
|
||||
}
|
||||
val newPatterns = patterns.map { p =>
|
||||
val pattern = p._2.graphPattern
|
||||
val newProperties = pattern.properties.map { case (key, value) =>
|
||||
if (pathNodes.contains(key) || pathEdges.contains(key)) {
|
||||
(key, value + Constants.PROPERTY_JSON_KEY)
|
||||
} else {
|
||||
(key, value)
|
||||
}
|
||||
val newPath = p._2.copy(graphPattern = pattern.copy(properties = newProperties))
|
||||
(p._1, newPath)
|
||||
}
|
||||
val newPath = p._2.copy(graphPattern = pattern.copy(properties = newProperties))
|
||||
(p._1, newPath)
|
||||
}
|
||||
MatchBlock(dependencies, newPatterns)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -16,13 +16,22 @@ package com.antgroup.openspg.reasoner.lube.physical.operators
|
||||
import scala.reflect.runtime.universe.TypeTag
|
||||
|
||||
import com.antgroup.openspg.reasoner.lube.logical.Var
|
||||
import com.antgroup.openspg.reasoner.lube.physical.rdg.RDG
|
||||
import com.antgroup.openspg.reasoner.lube.physical.rdg.{RDG, Row}
|
||||
|
||||
final case class Select[T <: RDG[T]: TypeTag](
|
||||
in: PhysicalOperator[T],
|
||||
orderedFields: List[Var],
|
||||
as: List[String])
|
||||
as: List[String],
|
||||
distinct: Boolean)
|
||||
extends PhysicalOperator[T] {
|
||||
def row: T#Records = in.rdg.select(orderedFields, as)
|
||||
|
||||
def row: Row[T] = {
|
||||
var row = in.rdg.select(orderedFields, as)
|
||||
if (distinct) {
|
||||
row = row.distinct()
|
||||
}
|
||||
row
|
||||
}
|
||||
|
||||
override def meta: List[Var] = orderedFields
|
||||
}
|
||||
|
||||
@ -20,7 +20,13 @@ import scala.reflect.runtime.universe.TypeTag
|
||||
import com.antgroup.openspg.reasoner.common.exception.NotImplementedException
|
||||
import com.antgroup.openspg.reasoner.common.graph.edge.Direction
|
||||
import com.antgroup.openspg.reasoner.lube.common.expr.Directly
|
||||
import com.antgroup.openspg.reasoner.lube.logical.{operators => LogicalOperators, NodeVar, PathVar, RepeatPathVar, Var}
|
||||
import com.antgroup.openspg.reasoner.lube.logical.{
|
||||
operators => LogicalOperators,
|
||||
NodeVar,
|
||||
PathVar,
|
||||
RepeatPathVar,
|
||||
Var
|
||||
}
|
||||
import com.antgroup.openspg.reasoner.lube.logical.operators.LogicalOperator
|
||||
import com.antgroup.openspg.reasoner.lube.logical.planning.{InnerJoin, LeftOuterJoin}
|
||||
import com.antgroup.openspg.reasoner.lube.physical.operators._
|
||||
@ -52,8 +58,8 @@ object PhysicalPlanner {
|
||||
PatternScan(plan[T](in, workingRdgName), pattern, input.fields)
|
||||
case LogicalOperators.ExpandInto(in, target, pattern) =>
|
||||
ExpandInto(plan[T](in, workingRdgName), target, pattern, input.fields)
|
||||
case LogicalOperators.Select(in, orderedFields, as) =>
|
||||
Select(plan[T](in, workingRdgName), orderedFields, as)
|
||||
case LogicalOperators.Select(in, orderedFields, as, distinct) =>
|
||||
Select(plan[T](in, workingRdgName), orderedFields, as, distinct)
|
||||
case LogicalOperators.Filter(in, expr) => Filter(plan[T](in, workingRdgName), expr)
|
||||
case LogicalOperators.DDL(in, ddlOp) => DDL(plan[T](in, workingRdgName), ddlOp)
|
||||
case LogicalOperators.Aggregate(in, group, aggregations, _) =>
|
||||
|
||||
@ -33,8 +33,6 @@ import com.antgroup.openspg.reasoner.lube.logical.planning.JoinType
|
||||
abstract class RDG[T <: RDG[T]] extends Result {
|
||||
this: T =>
|
||||
|
||||
type Records <: Row[T]
|
||||
|
||||
/**
|
||||
* Match the giving pattern on Graph
|
||||
* @param pattern specific pattern, see more [[Pattern]]
|
||||
@ -57,7 +55,7 @@ abstract class RDG[T <: RDG[T]] extends Result {
|
||||
* @param cols columns to select.
|
||||
* @return
|
||||
*/
|
||||
def select(cols: List[Var], as: List[String]): Records
|
||||
def select(cols: List[Var], as: List[String]): Row[T]
|
||||
|
||||
/**
|
||||
* Returns a [[RDG]] containing only data where the given expression evaluates to
|
||||
|
||||
@ -23,4 +23,10 @@ abstract class Row[T <: RDG[T]](val orderedFields: List[Var], val rdg: T) extend
|
||||
* @param rows number of rows to print
|
||||
*/
|
||||
def show(rows: Int = 20): Unit
|
||||
|
||||
/**
|
||||
* Remove duplicate result in row
|
||||
* @return
|
||||
*/
|
||||
def distinct(): Row[T]
|
||||
}
|
||||
|
||||
@ -13,20 +13,24 @@
|
||||
|
||||
package com.antgroup.openspg.reasoner.runner.local.rdg;
|
||||
|
||||
import com.antgroup.openspg.common.util.ArrayWrapper;
|
||||
import com.antgroup.openspg.reasoner.lube.logical.Var;
|
||||
import com.antgroup.openspg.reasoner.lube.physical.rdg.Row;
|
||||
import com.antgroup.openspg.reasoner.runner.local.model.LocalReasonerResult;
|
||||
import com.google.common.collect.Lists;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import scala.collection.JavaConversions;
|
||||
|
||||
@Slf4j
|
||||
public class LocalRow extends Row<LocalRDG> {
|
||||
private final List<String> columns;
|
||||
private final List<Object[]> rowList;
|
||||
private List<Object[]> rowList;
|
||||
|
||||
/** row implement */
|
||||
public LocalRow(
|
||||
@ -50,6 +54,14 @@ public class LocalRow extends Row<LocalRDG> {
|
||||
log.info("###############GeaflowRowShow###############");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Row<LocalRDG> distinct() {
|
||||
Set<ArrayWrapper> rowSet = new HashSet<>();
|
||||
this.rowList.forEach(row -> rowSet.add(new ArrayWrapper(row)));
|
||||
this.rowList = rowSet.stream().map(ArrayWrapper::getValue).collect(Collectors.toList());
|
||||
return this;
|
||||
}
|
||||
|
||||
/** get select result */
|
||||
public LocalReasonerResult getResult() {
|
||||
return new LocalReasonerResult(columns, rowList);
|
||||
|
||||
@ -15,14 +15,18 @@ package com.antgroup.reasoner.session
|
||||
|
||||
import com.antgroup.openspg.reasoner.lube.block.{DDLOp, SortItem}
|
||||
import com.antgroup.openspg.reasoner.lube.common.expr.{Aggregator, Expr}
|
||||
import com.antgroup.openspg.reasoner.lube.common.pattern.{EdgePattern, LinkedPatternConnection, Pattern, PatternElement}
|
||||
import com.antgroup.openspg.reasoner.lube.common.pattern.{
|
||||
EdgePattern,
|
||||
LinkedPatternConnection,
|
||||
Pattern,
|
||||
PatternElement
|
||||
}
|
||||
import com.antgroup.openspg.reasoner.lube.common.rule.Rule
|
||||
import com.antgroup.openspg.reasoner.lube.logical.planning.JoinType
|
||||
import com.antgroup.openspg.reasoner.lube.logical.{RichVar, Var}
|
||||
import com.antgroup.openspg.reasoner.lube.logical.planning.JoinType
|
||||
import com.antgroup.openspg.reasoner.lube.physical.rdg.RDG
|
||||
|
||||
class EmptyRDG extends RDG[EmptyRDG] {
|
||||
override type Records = EmptyRow
|
||||
|
||||
/**
|
||||
* Match the giving pattern on Graph
|
||||
@ -49,7 +53,8 @@ class EmptyRDG extends RDG[EmptyRDG] {
|
||||
* @param cols columns to select.
|
||||
* @return
|
||||
*/
|
||||
override def select(cols: List[Var], as: List[String]): EmptyRow = new EmptyRow(cols, this)
|
||||
override def select(cols: List[Var], as: List[String]): EmptyRow =
|
||||
new EmptyRow(cols, this)
|
||||
|
||||
/**
|
||||
* Returns a [[RDG]] containing only data where the given expression evaluates to
|
||||
|
||||
@ -25,4 +25,11 @@ class EmptyRow(orderedFields: List[Var], rdg: EmptyRDG)
|
||||
* @param rows number of rows to print
|
||||
*/
|
||||
override def show(rows: Int): Unit = {}
|
||||
|
||||
/**
|
||||
* Remove duplicate result in row
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
override def distinct(): Row[EmptyRDG] = this
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user