fix(server): fix graph write oom (#414)

This commit is contained in:
royzhao 2024-12-13 17:59:51 +08:00 committed by GitHub
parent 7b99252b9b
commit b05c57b451
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 51 additions and 40 deletions

View File

@ -51,13 +51,13 @@ public class Neo4jSinkWriter extends BaseSinkWriter<Neo4jSinkNodeConfig> {
private static final int NUM_THREADS = 10; private static final int NUM_THREADS = 10;
private static final int MAX_NUM_THREADS = 200;
private ExecuteNode node; private ExecuteNode node;
private Neo4jStoreClient client; private Neo4jStoreClient client;
private Project project; private Project project;
private static final String DOT = "."; private static final String DOT = ".";
ExecutorService executor; private static RejectedExecutionHandler handler =
RejectedExecutionHandler handler =
(r, executor) -> { (r, executor) -> {
try { try {
executor.getQueue().put(r); executor.getQueue().put(r);
@ -65,6 +65,14 @@ public class Neo4jSinkWriter extends BaseSinkWriter<Neo4jSinkNodeConfig> {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
}; };
private static ExecutorService executor =
new ThreadPoolExecutor(
NUM_THREADS,
MAX_NUM_THREADS,
2 * 60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
handler);
public Neo4jSinkWriter(String id, String name, Neo4jSinkNodeConfig config) { public Neo4jSinkWriter(String id, String name, Neo4jSinkNodeConfig config) {
super(id, name, config); super(id, name, config);
@ -79,14 +87,6 @@ public class Neo4jSinkWriter extends BaseSinkWriter<Neo4jSinkNodeConfig> {
} }
client = new Neo4jStoreClient(context.getGraphStoreUrl()); client = new Neo4jStoreClient(context.getGraphStoreUrl());
project = JSON.parseObject(context.getProject(), Project.class); project = JSON.parseObject(context.getProject(), Project.class);
executor =
new ThreadPoolExecutor(
NUM_THREADS,
NUM_THREADS,
2 * 60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
handler);
} }
@Override @Override

View File

@ -12,34 +12,31 @@
*/ */
package com.antgroup.openspg.server.core.reasoner.service.impl; package com.antgroup.openspg.server.core.reasoner.service.impl;
import com.antgroup.openspg.common.util.tuple.Tuple2;
import com.antgroup.openspg.reasoner.common.constants.Constants; import com.antgroup.openspg.reasoner.common.constants.Constants;
import com.antgroup.openspg.reasoner.common.graph.edge.Direction; import com.antgroup.openspg.reasoner.common.graph.edge.Direction;
import com.antgroup.openspg.reasoner.common.graph.edge.IEdge; import com.antgroup.openspg.reasoner.common.graph.edge.IEdge;
import com.antgroup.openspg.reasoner.common.graph.edge.SPO; import com.antgroup.openspg.reasoner.common.graph.edge.SPO;
import com.antgroup.openspg.reasoner.common.graph.property.IProperty; import com.antgroup.openspg.reasoner.common.graph.property.IProperty;
import com.antgroup.openspg.reasoner.common.graph.property.impl.VertexVersionProperty;
import com.antgroup.openspg.reasoner.common.graph.vertex.IVertex; import com.antgroup.openspg.reasoner.common.graph.vertex.IVertex;
import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId; import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId;
import com.antgroup.openspg.reasoner.common.graph.vertex.impl.Vertex;
import com.antgroup.openspg.reasoner.common.graph.vertex.impl.VertexBizId;
import com.antgroup.openspg.reasoner.graphstate.GraphState; import com.antgroup.openspg.reasoner.graphstate.GraphState;
import com.antgroup.openspg.reasoner.udf.model.LinkedUdtfResult;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
/**
* @author donghai.ydh
* @version Utils.java, v 0.1 2024-04-17 10:32 donghai.ydh
*/
@Slf4j @Slf4j
public class Utils { public class Utils {
public static Map<String, Set<Tuple2<String, String>>> getAllRdfEntity( public static List<LinkedUdtfResult> getAllRdfEntity(
GraphState<IVertexId> graphState, IVertexId id) { GraphState<IVertexId> graphState, IVertexId id) {
Map<String, Set<Tuple2<String, String>>> result = new HashMap<>(); List<LinkedUdtfResult> result = new ArrayList<>();
// find vertex prop // find vertex prop
IVertex<IVertexId, IProperty> vertex = graphState.getVertex(id, null, null); IVertex<IVertexId, IProperty> vertex = graphState.getVertex(id, null, null);
@ -48,12 +45,28 @@ public class Utils {
log.info("vertex_property,{}", vertex); log.info("vertex_property,{}", vertex);
for (String propertyName : vertex.getValue().getKeySet()) { for (String propertyName : vertex.getValue().getKeySet()) {
Object pValue = vertex.getValue().get(propertyName); Object pValue = vertex.getValue().get(propertyName);
if (null == pValue) { if (null == pValue || propertyName.startsWith("_")) {
continue; continue;
} }
result IVertex<IVertexId, IProperty> propVertex = new Vertex<>();
.computeIfAbsent(propertyName, k -> new HashSet<>()) propVertex.setId(new VertexBizId(String.valueOf(pValue), "Text"));
.add(new Tuple2<>(String.valueOf(pValue), "Text")); propVertex.setValue(
new VertexVersionProperty(
"id", String.valueOf(pValue),
"name", String.valueOf(pValue)));
graphState.addVertex(propVertex);
LinkedUdtfResult udtfRes = new LinkedUdtfResult();
udtfRes.setEdgeType(propertyName);
udtfRes.getTargetVertexIdList().add(String.valueOf(pValue));
if (pValue instanceof Integer) {
udtfRes.getTargetVertexTypeList().add("Int");
} else if (pValue instanceof Double || pValue instanceof Float) {
udtfRes.getTargetVertexTypeList().add("Float");
} else {
udtfRes.getTargetVertexTypeList().add("Text");
}
udtfRes.getEdgePropertyMap().put("value", pValue);
result.add(udtfRes);
} }
} }
@ -68,9 +81,17 @@ public class Utils {
} }
SPO spo = new SPO(edge.getType()); SPO spo = new SPO(edge.getType());
log.info("TargetRdfProperty,id={},,edgeType={}", id, edge.getType()); log.info("TargetRdfProperty,id={},,edgeType={}", id, edge.getType());
result LinkedUdtfResult udtfRes = new LinkedUdtfResult();
.computeIfAbsent(spo.getP(), k -> new HashSet<>()) udtfRes.setEdgeType(spo.getP());
.add(new Tuple2<>(String.valueOf(toIdObj), edge.getTargetId().getType())); udtfRes.getTargetVertexIdList().add(String.valueOf(toIdObj));
udtfRes.getTargetVertexTypeList().add(edge.getTargetId().getType());
for (String propKey : edge.getValue().getKeySet()) {
if (propKey.startsWith("_")) {
continue;
}
udtfRes.getEdgePropertyMap().put(propKey, edge.getValue().get(propKey));
}
result.add(udtfRes);
} }
} }
return result; return result;

View File

@ -12,7 +12,6 @@
*/ */
package com.antgroup.openspg.server.core.reasoner.service.udtf; package com.antgroup.openspg.server.core.reasoner.service.udtf;
import com.antgroup.openspg.common.util.tuple.Tuple2;
import com.antgroup.openspg.reasoner.common.constants.Constants; import com.antgroup.openspg.reasoner.common.constants.Constants;
import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId; import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId;
import com.antgroup.openspg.reasoner.common.graph.vertex.impl.VertexBizId; import com.antgroup.openspg.reasoner.common.graph.vertex.impl.VertexBizId;
@ -27,7 +26,6 @@ import com.antgroup.openspg.server.core.reasoner.service.impl.Utils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
@ -76,17 +74,9 @@ public class RdfExpand extends BaseUdtf {
vertexType = (String) sMap.get(Constants.CONTEXT_LABEL); vertexType = (String) sMap.get(Constants.CONTEXT_LABEL);
} }
IVertexId id = new VertexBizId(bizId, vertexType); IVertexId id = new VertexBizId(bizId, vertexType);
// 结果 // 结果
Map<String, Set<Tuple2<String, String>>> validBizIdMap = Utils.getAllRdfEntity(graphState, id); List<LinkedUdtfResult> validBizIds = Utils.getAllRdfEntity(graphState, id);
for (LinkedUdtfResult udtfResult : validBizIds) {
for (Map.Entry<String, Set<Tuple2<String, String>>> entry : validBizIdMap.entrySet()) {
LinkedUdtfResult udtfResult = new LinkedUdtfResult();
udtfResult.setEdgeType(entry.getKey());
for (Tuple2<String, String> data : entry.getValue()) {
udtfResult.getTargetVertexIdList().add(data.first);
udtfResult.getTargetVertexTypeList().add(data.second);
}
forward(Lists.newArrayList(udtfResult)); forward(Lists.newArrayList(udtfResult));
} }
} }