feat(reasoner): udf load from multiple package path (#200)

Co-authored-by: peilong <peilong.zpl@antgroup.com>
Co-authored-by: wenchengyao <wenchengyao.wcy@antgroup.com>
Co-authored-by: FishJoy <chengqiang.cq@antgroup.com>
This commit is contained in:
Donghai 2024-04-24 10:10:23 +08:00 committed by GitHub
parent 6b97110423
commit b3d00a2a07
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 185 additions and 36 deletions

View File

@ -24,6 +24,9 @@ public class PropertyMeta {
@JSONField(name = "name")
private String name;
@JSONField(name = "nameZh")
private String nameZh;
@JSONField(name = "attrRangeDetail")
private PropertyRangeDetail propRange;

View File

@ -25,6 +25,9 @@ public class RelationTypeDetail {
@JSONField(name = "name")
private String name;
@JSONField(name = "nameZh")
private String nameZh;
@JSONField(name = "startEntityTypeDetail")
private VertexMeta startEntityType;

View File

@ -25,6 +25,9 @@ public class VertexMeta {
@JSONField(name = "name")
private String name;
@JSONField(name = "nameZh")
private String nameZh;
@JSONField(name = "attributeTypeDetailList")
private List<PropertyMeta> attributeTypeDetailList;

View File

@ -84,6 +84,7 @@ public class Utils {
case "java.util.Date":
return KTDate$.MODULE$;
case "java.util.List<java.lang.Object>":
case "java.util.List":
return new KTList(KTObject$.MODULE$);
case "java.util.List<null>":
return new KTList(null);

View File

@ -21,7 +21,7 @@ import java.util.HashMap;
import java.util.Map;
public class EdgeProperty implements IProperty {
private final Map<String, Object> props;
protected final Map<String, Object> props;
/**
* new edge property with property data

View File

@ -20,7 +20,7 @@ import java.util.HashMap;
import java.util.Map;
public class VertexProperty implements IProperty {
private final Map<String, Object> props;
protected final Map<String, Object> props;
public VertexProperty() {
this.props = new HashMap<>();

View File

@ -27,7 +27,7 @@ import java.util.stream.Collectors;
import scala.Tuple2;
public class VertexVersionProperty implements IVersionProperty {
private final Map<String, TreeMap<Long, Object>> props;
protected final Map<String, TreeMap<Long, Object>> props;
/** default constructor */
public VertexVersionProperty() {

View File

@ -18,6 +18,7 @@ import com.antgroup.openspg.reasoner.common.graph.vertex.IVertex;
import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId;
import com.antgroup.openspg.reasoner.common.graph.vertex.impl.MirrorVertex;
import com.antgroup.openspg.reasoner.common.graph.vertex.impl.NoneVertex;
import com.antgroup.openspg.reasoner.common.graph.vertex.impl.VertexBizId;
import com.antgroup.openspg.reasoner.graphstate.GraphState;
import com.antgroup.openspg.reasoner.kggraph.KgGraph;
import com.antgroup.openspg.reasoner.lube.common.expr.Expr;
@ -83,6 +84,7 @@ public class LocalPropertyGraph implements PropertyGraph<LocalRDG> {
isCarryTraversalGraph);
result.setMaxPathLimit(getMaxPathLimit());
result.setStrictMaxPathLimit(getStrictMaxPathLimit());
result.setDisableDropOp(getDisableDropOp());
return result;
}
@ -114,6 +116,7 @@ public class LocalPropertyGraph implements PropertyGraph<LocalRDG> {
false);
result.setMaxPathLimit(getMaxPathLimit());
result.setStrictMaxPathLimit(getStrictMaxPathLimit());
result.setDisableDropOp(getDisableDropOp());
return result;
}
@ -144,7 +147,7 @@ public class LocalPropertyGraph implements PropertyGraph<LocalRDG> {
}
for (String type : JavaConversions.asJavaCollection(types)) {
for (String idStr : idStrList) {
startIdSet.add(IVertexId.from(idStr, type));
startIdSet.add(new VertexBizId(idStr, type));
}
}
if (startIdSet.isEmpty()) {
@ -160,9 +163,10 @@ public class LocalPropertyGraph implements PropertyGraph<LocalRDG> {
getTaskId(),
// subquery can not carry all graph
getExecutionRecorder(),
false);
isCarryTraversalGraph);
result.setMaxPathLimit(getMaxPathLimit());
result.setStrictMaxPathLimit(getStrictMaxPathLimit());
result.setDisableDropOp(getDisableDropOp());
return result;
}
@ -260,6 +264,14 @@ public class LocalPropertyGraph implements PropertyGraph<LocalRDG> {
return Long.parseLong(String.valueOf(maxPathLimitObj));
}
/**
 * Reads the {@code ConfigKey.REASONER_DISABLE_DROP_OP} switch from the task parameters.
 *
 * @return {@code true} only when the parameter's string form equals {@code "true"} exactly;
 *     {@code false} when the task, its parameter map, or the parameter itself is missing
 */
private boolean getDisableDropOp() {
  // No task or no parameter map means the switch was never configured.
  if (null == this.task || null == this.task.getParams()) {
    return false;
  }
  Object rawValue = this.task.getParams().get(ConfigKey.REASONER_DISABLE_DROP_OP);
  // String.valueOf turns a missing (null) value into "null", which does not match "true".
  return "true".equals(String.valueOf(rawValue));
}
private IExecutionRecorder getExecutionRecorder() {
if (null == task) {
return new EmptyRecorder();

View File

@ -102,6 +102,7 @@ import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
@ -153,6 +154,9 @@ public class LocalRDG extends RDG<LocalRDG> {
/** carry all traversal graph data */
protected boolean isCarryTraversalGraph = false;
/** disable drop op */
protected boolean disableDropOp = false;
private java.util.Set<IVertexId> getStartId(java.util.List<KgGraph<IVertexId>> kgGraphList) {
java.util.Set<IVertexId> startIdSet = new HashSet<>();
for (KgGraph<IVertexId> kgGraph : kgGraphList) {
@ -317,6 +321,7 @@ public class LocalRDG extends RDG<LocalRDG> {
@Override
public LocalRDG linkedExpand(EdgePattern<LinkedPatternConnection> pattern) {
long startTime = System.currentTimeMillis();
java.util.List<KgGraph<IVertexId>> newKgGraphList = new ArrayList<>();
UdtfMeta udtfMeta = RunnerUtil.chooseUdtfMeta(pattern);
@ -328,8 +333,29 @@ public class LocalRDG extends RDG<LocalRDG> {
LinkEdgeImpl linkEdge =
new LinkEdgeImpl(
this.taskId, this.kgGraphSchema, staticParameters, pattern, udtfMeta, null, graphState);
java.util.List<CompletableFuture<java.util.List<KgGraph<IVertexId>>>> futureList =
new ArrayList<>();
for (KgGraph<IVertexId> kgGraph : this.kgGraphList) {
java.util.List<KgGraph<IVertexId>> splitedKgGraphList = linkEdge.link(kgGraph);
CompletableFuture<java.util.List<KgGraph<IVertexId>>> future =
CompletableFuture.supplyAsync(
new Supplier<java.util.List<KgGraph<IVertexId>>>() {
@Override
public java.util.List<KgGraph<IVertexId>> get() {
return linkEdge.link(kgGraph);
}
});
futureList.add(future);
}
for (CompletableFuture<java.util.List<KgGraph<IVertexId>>> future : futureList) {
java.util.List<KgGraph<IVertexId>> splitedKgGraphList;
try {
splitedKgGraphList = future.get(this.executorTimeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
log.warn("linkedExpandTimeout,funcName=" + pattern.edge().funcName(), e);
continue;
} catch (Exception e) {
throw new RuntimeException("patternScan error " + e.getMessage(), e);
}
if (CollectionUtils.isNotEmpty(splitedKgGraphList)) {
KgGraph<IVertexId> result = new KgGraphImpl();
result.merge(splitedKgGraphList, null);
@ -351,7 +377,9 @@ public class LocalRDG extends RDG<LocalRDG> {
+ ",matchCount="
+ count
+ ", linkedTargetVertexSize="
+ targetVertexSize);
+ targetVertexSize
+ " cost time="
+ (System.currentTimeMillis() - startTime));
this.executionRecorder.stageResultWithDetail(
"linkedExpand(" + RunnerUtil.getReadablePattern(pattern) + ")",
this.kgGraphList.size(),
@ -903,6 +931,9 @@ public class LocalRDG extends RDG<LocalRDG> {
@Override
public LocalRDG dropFields(Set<Var> fields) {
if (disableDropOp) {
return this;
}
java.util.Set<Var> dropFieldSet = new HashSet<>(JavaConversions.asJavaCollection(fields));
if (CollectionUtils.isEmpty(dropFieldSet)) {
return this;
@ -1372,4 +1403,8 @@ public class LocalRDG extends RDG<LocalRDG> {
public void setStrictMaxPathLimit(Long strictMaxPathLimit) {
this.strictMaxPathLimit = strictMaxPathLimit;
}
/**
 * Sets whether drop operations are disabled for this RDG.
 *
 * <p>When the flag is {@code true}, {@code dropFields} returns this RDG unchanged.
 *
 * @param disableDropOp {@code true} to disable drop operations
 */
public void setDisableDropOp(boolean disableDropOp) {
this.disableDropOp = disableDropOp;
}
}

View File

@ -18,6 +18,7 @@ import com.antgroup.openspg.reasoner.common.graph.edge.IEdge;
import com.antgroup.openspg.reasoner.common.graph.edge.impl.Edge;
import com.antgroup.openspg.reasoner.common.graph.property.IProperty;
import com.antgroup.openspg.reasoner.common.graph.property.impl.EdgeProperty;
import com.antgroup.openspg.reasoner.common.graph.property.impl.VertexProperty;
import com.antgroup.openspg.reasoner.common.graph.vertex.IVertex;
import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId;
import com.antgroup.openspg.reasoner.common.graph.vertex.impl.Vertex;
@ -104,8 +105,10 @@ public class LinkEdgeImpl implements Serializable {
paramList.add(parameter);
}
String sourceAlias = linkedEdgePattern.src().alias();
Set<String> targetTypeSet = JavaConversions.setAsJavaSet(linkedEdgePattern.dst().typeNames());
BaseUdtf tableFunction = udtfMeta.createTableFunction();
tableFunction.initialize(graphState);
tableFunction.initialize(graphState, context, sourceAlias, targetTypeSet);
tableFunction.process(paramList);
List<List<Object>> udtfResult = tableFunction.getCollector();
List<LinkedUdtfResult> linkedUdtfResultList =
@ -123,7 +126,6 @@ public class LinkEdgeImpl implements Serializable {
if (CollectionUtils.isEmpty(linkedUdtfResultList)) {
continue;
}
String sourceAlias = linkedEdgePattern.src().alias();
List<IVertex<IVertexId, IProperty>> sourceList = path.getVertex(sourceAlias);
if (null == sourceList || sourceList.size() != 1) {
throw new RuntimeException("There is more than one start vertex in kgGraph path");
@ -137,8 +139,8 @@ public class LinkEdgeImpl implements Serializable {
for (LinkedUdtfResult linkedUdtfResult : linkedUdtfResultList) {
for (String targetIdStr : linkedUdtfResult.getTargetVertexIdList()) {
// add target vertex
String targetAlias = pc.target();
PatternElement targetVertexMeta = linkedEdgePattern.dst();
String targetAlias = targetVertexMeta.alias();
List<String> targetVertexTypes =
new ArrayList<>(JavaConversions.setAsJavaSet(targetVertexMeta.typeNames()));
if (targetVertexTypes.size() == 0) {
@ -147,13 +149,17 @@ public class LinkEdgeImpl implements Serializable {
}
for (String targetVertexType : targetVertexTypes) {
IVertexId targetId = new VertexId(targetIdStr, targetVertexType);
Map<String, Object> propertyMap = new HashMap<>();
VertexProperty vertexProperty = new VertexProperty(propertyMap);
vertexProperty.put(Constants.NODE_ID_KEY, targetIdStr);
vertexProperty.put(Constants.CONTEXT_LABEL, targetVertexType);
if (partitioner != null && !partitioner.canPartition(targetId)) {
continue;
}
// need add property with id
Set<IVertex<IVertexId, IProperty>> newVertexSet =
newAliasVertexMap.computeIfAbsent(targetAlias, k -> new HashSet<>());
newVertexSet.add(new Vertex<>(targetId));
newVertexSet.add(new Vertex<>(targetId, vertexProperty));
Map<String, Object> props = new HashMap<>(linkedUdtfResult.getEdgePropertyMap());
props.put(Constants.EDGE_TO_ID_KEY, targetIdStr);

View File

@ -162,4 +162,7 @@ public class ConfigKey {
/** the devId of akg task */
public static final String DEV_ID = "devId";
/** disable drop */
public static final String REASONER_DISABLE_DROP_OP = "kg.reasoner.disable.drop.op";
}

View File

@ -152,7 +152,24 @@ abstract class KGReasonerSession[T <: RDG[T]: TypeTag](
* @return
*/
def plan(query: String, params: Map[String, Object]): List[PhysicalOperator[T]] = {
optimizedLogicalPlan = plan2OptimizedLogicalPlan(query, params)
val start = System.currentTimeMillis()
val blocks = plan2UnresolvedLogicalPlan(query, params)
if (ParameterUtils.isEnableSPGPlanPrettyPrint(params)) {
for (block: Block <- blocks) {
logger.info(block.pretty)
}
}
logger.info(
"benchmark main plan plan2UnresolvedLogicalPlan cost = "
+ (System.currentTimeMillis() - start))
planBlock(blocks, params)
}
/**
 * Generate the optimized physical plan from the given blocks.
 *
 * The optimized logical plan computed from `blocks` is stored in `optimizedLogicalPlan`
 * before it is converted to the physical plan.
 *
 * @param blocks the blocks to plan
 * @param params planning parameters, passed to both the logical and the physical planning step
 * @return the physical operators produced from the optimized logical plan
 */
def planBlock(blocks: List[Block], params: Map[String, Object]): List[PhysicalOperator[T]] = {
optimizedLogicalPlan = plan2OptimizedLogicalPlan(blocks, params)
planLogicalPlan2PhysicalPlan(optimizedLogicalPlan, params)
}
@ -166,20 +183,9 @@ abstract class KGReasonerSession[T <: RDG[T]: TypeTag](
* @return
*/
def plan2OptimizedLogicalPlan(
query: String,
blocks: List[Block],
params: Map[String, Object]): List[LogicalOperator] = {
var start = System.currentTimeMillis()
val blocks = plan2UnresolvedLogicalPlan(query, params)
if (ParameterUtils.isEnableSPGPlanPrettyPrint(params)) {
for (block: Block <- blocks) {
logger.info(block.pretty)
}
}
logger.info(
"benchmark main plan plan2UnresolvedLogicalPlan cost = "
+ (System.currentTimeMillis() - start))
start = System.currentTimeMillis()
val start = System.currentTimeMillis()
val optimizedLogicalPlan = plan2LogicalPlan(blocks, params)
logger.info(
"benchmark main plan plan2LogicalPlan cost = "

View File

@ -40,6 +40,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import scala.Tuple2;
@Slf4j(topic = "userlogger")
@ -56,18 +57,83 @@ public class UdfMngImpl implements UdfMng {
if (null == instance) {
synchronized (UdfMngImpl.class) {
if (null == instance) {
instance = createInstance();
instance =
createInstance(
Lists.newArrayList(KGDSL_UDF_PACKAGE_PATH),
Lists.newArrayList(KGDSL_UDAF_PACKAGE_PATH),
Lists.newArrayList(KGDSL_UDTF_PACKAGE_PATH),
null,
null,
null);
}
}
}
return instance;
}
private static UdfMngImpl createInstance() {
/**
 * Returns the shared {@link UdfMngImpl}, creating it on first use from the built-in package paths
 * plus any caller-supplied package paths and metas.
 *
 * <p>The arguments are only consulted when no instance exists yet. Once the instance has been
 * created, every later call returns it unchanged and ignores its arguments.
 *
 * @param udfPackagePaths extra packages to scan for UDFs; may be null or empty
 * @param udafPackagePaths extra packages to scan for UDAFs; may be null or empty
 * @param udtfPackagePaths extra packages to scan for UDTFs; may be null or empty
 * @param udfMetaList UDF metas to register directly; may be null or empty
 * @param udafMetaList UDAF metas to register directly; may be null or empty
 * @param udtfMetaList UDTF metas to register directly; may be null or empty
 * @return the shared instance
 */
public static UdfMngImpl getInstance(
    List<String> udfPackagePaths,
    List<String> udafPackagePaths,
    List<String> udtfPackagePaths,
    List<UdfMeta> udfMetaList,
    List<UdafMeta> udafMetaList,
    List<UdtfMeta> udtfMetaList) {
  if (null != instance) {
    return instance;
  }
  // NOTE(review): the unlocked check above relies on how `instance` is declared; confirm it is
  // volatile.
  synchronized (UdfMngImpl.class) {
    // Re-check under the lock: another thread may have created the instance in the meantime.
    if (null == instance) {
      List<String> allUdfPaths = withBuiltinPath(KGDSL_UDF_PACKAGE_PATH, udfPackagePaths);
      List<String> allUdafPaths = withBuiltinPath(KGDSL_UDAF_PACKAGE_PATH, udafPackagePaths);
      List<String> allUdtfPaths = withBuiltinPath(KGDSL_UDTF_PACKAGE_PATH, udtfPackagePaths);
      instance =
          createInstance(
              allUdfPaths, allUdafPaths, allUdtfPaths, udfMetaList, udafMetaList, udtfMetaList);
    }
  }
  return instance;
}

/** Builds a new list holding the built-in package path followed by the extra paths, if any. */
private static List<String> withBuiltinPath(String builtinPath, List<String> extraPaths) {
  List<String> merged = Lists.newArrayList(builtinPath);
  if (CollectionUtils.isNotEmpty(extraPaths)) {
    merged.addAll(extraPaths);
  }
  return merged;
}
/**
 * Builds a new {@link UdfMngImpl}: scans every given package path for UDFs, UDAFs and UDTFs,
 * registers the explicitly supplied metas, and finally runs {@code udfCheck()}.
 *
 * @param udfPackagePaths packages to scan for UDFs; iterated directly, so must not be null
 * @param udafPackagePaths packages to scan for UDAFs; iterated directly, so must not be null
 * @param udtfPackagePaths packages to scan for UDTFs; iterated directly, so must not be null
 * @param udfMetaList UDF metas to add after scanning; skipped when null or empty
 * @param udafMetaList UDAF metas to add after scanning; skipped when null or empty
 * @param udtfMetaList UDTF metas to add after scanning; skipped when null or empty
 * @return the newly created manager
 */
private static UdfMngImpl createInstance(
List<String> udfPackagePaths,
List<String> udafPackagePaths,
List<String> udtfPackagePaths,
List<UdfMeta> udfMetaList,
List<UdafMeta> udafMetaList,
List<UdtfMeta> udtfMetaList) {
UdfMngImpl udfMng = new UdfMngImpl();
udfMng.getAllUdf();
udfMng.getAllUdaf();
udfMng.getAllUdtf();
for (String packagePath : udfPackagePaths) {
udfMng.getUdfInPath(packagePath);
}
for (String packagePath : udafPackagePaths) {
udfMng.getUdafInPath(packagePath);
}
for (String packagePath : udtfPackagePaths) {
udfMng.getUdtfInPath(packagePath);
}
// Explicitly supplied metas are registered after the package scans.
if (CollectionUtils.isNotEmpty(udfMetaList)) {
udfMetaList.forEach(udfMng::addUdfMeta);
}
if (CollectionUtils.isNotEmpty(udafMetaList)) {
udafMetaList.forEach(udfMng::addUdafMeta);
}
if (CollectionUtils.isNotEmpty(udtfMetaList)) {
udtfMetaList.forEach(udfMng::addUdtfMeta);
}
udfMng.udfCheck();
return udfMng;
}
@ -77,8 +143,8 @@ public class UdfMngImpl implements UdfMng {
private static final String KGDSL_UDF_PACKAGE_PATH =
"com.antgroup.openspg.reasoner.udf.builtin.udf";
private void getAllUdf() {
FastClasspathScanner classpathScanner = new FastClasspathScanner(KGDSL_UDF_PACKAGE_PATH);
private void getUdfInPath(String packagePath) {
FastClasspathScanner classpathScanner = new FastClasspathScanner(packagePath);
classpathScanner.addClassLoader(getClass().getClassLoader());
classpathScanner
.matchAllStandardClasses(
@ -130,8 +196,8 @@ public class UdfMngImpl implements UdfMng {
private static final String KGDSL_UDAF_PACKAGE_PATH =
"com.antgroup.openspg.reasoner.udf.builtin.udaf";
private void getAllUdaf() {
FastClasspathScanner classpathScanner = new FastClasspathScanner(KGDSL_UDAF_PACKAGE_PATH);
private void getUdafInPath(String packagePath) {
FastClasspathScanner classpathScanner = new FastClasspathScanner(packagePath);
classpathScanner.addClassLoader(getClass().getClassLoader());
classpathScanner
.matchClassesImplementing(
@ -158,8 +224,8 @@ public class UdfMngImpl implements UdfMng {
private static final String KGDSL_UDTF_PACKAGE_PATH =
"com.antgroup.openspg.reasoner.udf.builtin.udtf";
private void getAllUdtf() {
FastClasspathScanner classpathScanner = new FastClasspathScanner(KGDSL_UDTF_PACKAGE_PATH);
private void getUdtfInPath(String packagePath) {
FastClasspathScanner classpathScanner = new FastClasspathScanner(packagePath);
classpathScanner.addClassLoader(getClass().getClassLoader());
classpathScanner
.matchClassesWithAnnotation(

View File

@ -68,6 +68,9 @@ public class GraphLoaderConfig implements Serializable {
/** kgstate schema url */
protected String schemaUrl = null;
/** kgstate schema retry times */
protected int schemaRetryTimes = 3;
/** enable binary property or not */
protected Boolean binary = false;
@ -318,6 +321,14 @@ public class GraphLoaderConfig implements Serializable {
this.binary = binary;
}
/**
 * Returns the configured kgstate schema retry times.
 *
 * @return the current value of {@code schemaRetryTimes}
 */
public int getSchemaRetryTimes() {
return schemaRetryTimes;
}
/**
 * Sets the kgstate schema retry times.
 *
 * @param schemaRetryTimes the new value; stored as given, without validation
 */
public void setSchemaRetryTimes(int schemaRetryTimes) {
this.schemaRetryTimes = schemaRetryTimes;
}
/** verify config */
public GraphLoaderConfig verify() {
if (CollectionUtils.isEmpty(this.edgeLoaderConfigs)) {