diff --git a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java index fe227d4b..4cfff384 100644 --- a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java +++ b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java @@ -51,13 +51,13 @@ public class Neo4jSinkWriter extends BaseSinkWriter { private static final int NUM_THREADS = 10; + private static final int MAX_NUM_THREADS = 200; + private ExecuteNode node; private Neo4jStoreClient client; private Project project; private static final String DOT = "."; - ExecutorService executor; - - RejectedExecutionHandler handler = + private static RejectedExecutionHandler handler = (r, executor) -> { try { executor.getQueue().put(r); @@ -65,6 +65,14 @@ public class Neo4jSinkWriter extends BaseSinkWriter { Thread.currentThread().interrupt(); } }; + private static ExecutorService executor = + new ThreadPoolExecutor( + NUM_THREADS, + MAX_NUM_THREADS, + 2 * 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(200), + handler); public Neo4jSinkWriter(String id, String name, Neo4jSinkNodeConfig config) { super(id, name, config); @@ -79,14 +87,6 @@ public class Neo4jSinkWriter extends BaseSinkWriter { } client = new Neo4jStoreClient(context.getGraphStoreUrl()); project = JSON.parseObject(context.getProject(), Project.class); - executor = - new ThreadPoolExecutor( - NUM_THREADS, - NUM_THREADS, - 2 * 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(100), - handler); } @Override diff --git a/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/impl/Utils.java b/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/impl/Utils.java index ad5a6373..954d4a24 100644 --- a/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/impl/Utils.java +++ b/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/impl/Utils.java @@ -12,34 +12,31 @@ */ package com.antgroup.openspg.server.core.reasoner.service.impl; -import com.antgroup.openspg.common.util.tuple.Tuple2; import com.antgroup.openspg.reasoner.common.constants.Constants; import com.antgroup.openspg.reasoner.common.graph.edge.Direction; import com.antgroup.openspg.reasoner.common.graph.edge.IEdge; import com.antgroup.openspg.reasoner.common.graph.edge.SPO; import com.antgroup.openspg.reasoner.common.graph.property.IProperty; +import com.antgroup.openspg.reasoner.common.graph.property.impl.VertexVersionProperty; import com.antgroup.openspg.reasoner.common.graph.vertex.IVertex; import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId; +import com.antgroup.openspg.reasoner.common.graph.vertex.impl.Vertex; +import com.antgroup.openspg.reasoner.common.graph.vertex.impl.VertexBizId; import com.antgroup.openspg.reasoner.graphstate.GraphState; +import com.antgroup.openspg.reasoner.udf.model.LinkedUdtfResult; +import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; -/** - * @author donghai.ydh - * @version Utils.java, v 0.1 2024-04-17 10:32 donghai.ydh - */ @Slf4j public class Utils { - public static Map>> getAllRdfEntity( + public static List getAllRdfEntity( GraphState graphState, IVertexId id) { - Map>> result = new HashMap<>(); + List result = new ArrayList<>(); // find vertex prop IVertex vertex = graphState.getVertex(id, null, null); @@ -48,12 +45,28 @@ public class Utils { log.info("vertex_property,{}", vertex); for (String propertyName : vertex.getValue().getKeySet()) { Object pValue = vertex.getValue().get(propertyName); - if (null == pValue) { + if (null == pValue || propertyName.startsWith("_")) { continue; } - result - .computeIfAbsent(propertyName, k -> new HashSet<>()) - .add(new Tuple2<>(String.valueOf(pValue), "Text")); + IVertex propVertex = new Vertex<>(); + propVertex.setId(new VertexBizId(String.valueOf(pValue), "Text")); + propVertex.setValue( + new VertexVersionProperty( + "id", String.valueOf(pValue), + "name", String.valueOf(pValue))); + graphState.addVertex(propVertex); + LinkedUdtfResult udtfRes = new LinkedUdtfResult(); + udtfRes.setEdgeType(propertyName); + udtfRes.getTargetVertexIdList().add(String.valueOf(pValue)); + if (pValue instanceof Integer) { + udtfRes.getTargetVertexTypeList().add("Int"); + } else if (pValue instanceof Double || pValue instanceof Float) { + udtfRes.getTargetVertexTypeList().add("Float"); + } else { + udtfRes.getTargetVertexTypeList().add("Text"); + } + udtfRes.getEdgePropertyMap().put("value", pValue); + result.add(udtfRes); } } @@ -68,9 +81,17 @@ public class Utils { } SPO spo = new SPO(edge.getType()); log.info("TargetRdfProperty,id={},,edgeType={}", id, edge.getType()); - result - .computeIfAbsent(spo.getP(), k -> new HashSet<>()) - .add(new Tuple2<>(String.valueOf(toIdObj), edge.getTargetId().getType())); + LinkedUdtfResult udtfRes = new LinkedUdtfResult(); + udtfRes.setEdgeType(spo.getP()); + udtfRes.getTargetVertexIdList().add(String.valueOf(toIdObj)); + udtfRes.getTargetVertexTypeList().add(edge.getTargetId().getType()); + for (String propKey : edge.getValue().getKeySet()) { + if (propKey.startsWith("_")) { + continue; + } + udtfRes.getEdgePropertyMap().put(propKey, edge.getValue().get(propKey)); + } + result.add(udtfRes); } } return result; diff --git a/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/udtf/RdfExpand.java b/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/udtf/RdfExpand.java index 48db7a38..d7d4b8e3 100644 --- a/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/udtf/RdfExpand.java +++ b/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/udtf/RdfExpand.java @@ -12,7 +12,6 @@ */ package com.antgroup.openspg.server.core.reasoner.service.udtf; -import com.antgroup.openspg.common.util.tuple.Tuple2; import com.antgroup.openspg.reasoner.common.constants.Constants; import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId; import com.antgroup.openspg.reasoner.common.graph.vertex.impl.VertexBizId; @@ -27,7 +26,6 @@ import com.antgroup.openspg.server.core.reasoner.service.impl.Utils; import com.google.common.collect.Lists; import java.util.List; import java.util.Map; -import java.util.Set; import lombok.extern.slf4j.Slf4j; /** @@ -76,17 +74,9 @@ public class RdfExpand extends BaseUdtf { vertexType = (String) sMap.get(Constants.CONTEXT_LABEL); } IVertexId id = new VertexBizId(bizId, vertexType); - // 结果 - Map>> validBizIdMap = Utils.getAllRdfEntity(graphState, id); - - for (Map.Entry>> entry : validBizIdMap.entrySet()) { - LinkedUdtfResult udtfResult = new LinkedUdtfResult(); - udtfResult.setEdgeType(entry.getKey()); - for (Tuple2 data : entry.getValue()) { - udtfResult.getTargetVertexIdList().add(data.first); - udtfResult.getTargetVertexTypeList().add(data.second); - } + List validBizIds = Utils.getAllRdfEntity(graphState, id); + for (LinkedUdtfResult udtfResult : validBizIds) { forward(Lists.newArrayList(udtfResult)); } }