diff --git a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java index 6d8ec0ad..4cfff384 100644 --- a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java +++ b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java @@ -58,21 +58,21 @@ public class Neo4jSinkWriter extends BaseSinkWriter { private Project project; private static final String DOT = "."; private static RejectedExecutionHandler handler = - (r, executor) -> { - try { - executor.getQueue().put(r); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }; + (r, executor) -> { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }; private static ExecutorService executor = - new ThreadPoolExecutor( + new ThreadPoolExecutor( NUM_THREADS, - MAX_NUM_THREADS, - 2 * 60L, + MAX_NUM_THREADS, + 2 * 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(200), - handler); + new LinkedBlockingQueue<>(200), + handler); public Neo4jSinkWriter(String id, String name, Neo4jSinkNodeConfig config) { super(id, name, config); diff --git a/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/impl/Utils.java b/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/impl/Utils.java index 43711e03..954d4a24 100644 --- a/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/impl/Utils.java +++ b/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/impl/Utils.java @@ -12,38 +12,31 @@ */ package com.antgroup.openspg.server.core.reasoner.service.impl; -import com.antgroup.openspg.common.util.tuple.Tuple2; import com.antgroup.openspg.reasoner.common.constants.Constants; import com.antgroup.openspg.reasoner.common.graph.edge.Direction; import com.antgroup.openspg.reasoner.common.graph.edge.IEdge; import com.antgroup.openspg.reasoner.common.graph.edge.SPO; import com.antgroup.openspg.reasoner.common.graph.property.IProperty; -import com.antgroup.openspg.reasoner.common.graph.property.impl.VertexProperty; import com.antgroup.openspg.reasoner.common.graph.property.impl.VertexVersionProperty; import com.antgroup.openspg.reasoner.common.graph.vertex.IVertex; import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId; import com.antgroup.openspg.reasoner.common.graph.vertex.impl.Vertex; import com.antgroup.openspg.reasoner.common.graph.vertex.impl.VertexBizId; import com.antgroup.openspg.reasoner.graphstate.GraphState; +import com.antgroup.openspg.reasoner.udf.model.LinkedUdtfResult; +import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; -/** - * @author donghai.ydh - * @version Utils.java, v 0.1 2024-04-17 10:32 donghai.ydh - */ @Slf4j public class Utils { - public static Map>> getAllRdfEntity( + public static List getAllRdfEntity( GraphState graphState, IVertexId id) { - Map>> result = new HashMap<>(); + List result = new ArrayList<>(); // find vertex prop IVertex vertex = graphState.getVertex(id, null, null); @@ -57,14 +50,23 @@ public class Utils { } IVertex propVertex = new Vertex<>(); propVertex.setId(new VertexBizId(String.valueOf(pValue), "Text")); - propVertex.setValue(new VertexVersionProperty( + propVertex.setValue( + new VertexVersionProperty( "id", String.valueOf(pValue), - "name", String.valueOf(pValue) - )); + "name", String.valueOf(pValue))); graphState.addVertex(propVertex); - result - .computeIfAbsent(propertyName, k -> new HashSet<>()) - .add(new Tuple2<>(String.valueOf(pValue), "Text")); + LinkedUdtfResult udtfRes = new LinkedUdtfResult(); + udtfRes.setEdgeType(propertyName); + udtfRes.getTargetVertexIdList().add(String.valueOf(pValue)); + if (pValue instanceof Integer) { + udtfRes.getTargetVertexTypeList().add("Int"); + } else if (pValue instanceof Double || pValue instanceof Float) { + udtfRes.getTargetVertexTypeList().add("Float"); + } else { + udtfRes.getTargetVertexTypeList().add("Text"); + } + udtfRes.getEdgePropertyMap().put("value", pValue); + result.add(udtfRes); } } @@ -79,9 +81,17 @@ public class Utils { } SPO spo = new SPO(edge.getType()); log.info("TargetRdfProperty,id={},,edgeType={}", id, edge.getType()); - result - .computeIfAbsent(spo.getP(), k -> new HashSet<>()) - .add(new Tuple2<>(String.valueOf(toIdObj), edge.getTargetId().getType())); + LinkedUdtfResult udtfRes = new LinkedUdtfResult(); + udtfRes.setEdgeType(spo.getP()); + udtfRes.getTargetVertexIdList().add(String.valueOf(toIdObj)); + udtfRes.getTargetVertexTypeList().add(edge.getTargetId().getType()); + for (String propKey : edge.getValue().getKeySet()) { + if (propKey.startsWith("_")) { + continue; + } + udtfRes.getEdgePropertyMap().put(propKey, edge.getValue().get(propKey)); + } + result.add(udtfRes); } } return result; diff --git a/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/udtf/RdfExpand.java b/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/udtf/RdfExpand.java index 48db7a38..d7d4b8e3 100644 --- a/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/udtf/RdfExpand.java +++ b/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/udtf/RdfExpand.java @@ -12,7 +12,6 @@ */ package com.antgroup.openspg.server.core.reasoner.service.udtf; -import com.antgroup.openspg.common.util.tuple.Tuple2; import com.antgroup.openspg.reasoner.common.constants.Constants; import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId; import com.antgroup.openspg.reasoner.common.graph.vertex.impl.VertexBizId; @@ -27,7 +26,6 @@ import com.antgroup.openspg.server.core.reasoner.service.impl.Utils; import com.google.common.collect.Lists; import java.util.List; import java.util.Map; -import java.util.Set; import lombok.extern.slf4j.Slf4j; /** @@ -76,17 +74,9 @@ public class RdfExpand extends BaseUdtf { vertexType = (String) sMap.get(Constants.CONTEXT_LABEL); } IVertexId id = new VertexBizId(bizId, vertexType); - // 结果 - Map>> validBizIdMap = Utils.getAllRdfEntity(graphState, id); - - for (Map.Entry>> entry : validBizIdMap.entrySet()) { - LinkedUdtfResult udtfResult = new LinkedUdtfResult(); - udtfResult.setEdgeType(entry.getKey()); - for (Tuple2 data : entry.getValue()) { - udtfResult.getTargetVertexIdList().add(data.first); - udtfResult.getTargetVertexTypeList().add(data.second); - } + List validBizIds = Utils.getAllRdfEntity(graphState, id); + for (LinkedUdtfResult udtfResult : validBizIds) { forward(Lists.newArrayList(udtfResult)); } }