format java format

This commit is contained in:
peilong 2024-12-06 10:31:33 +08:00
parent a3786a5a83
commit c5b7c7576b
3 changed files with 44 additions and 44 deletions

View File

@ -58,21 +58,21 @@ public class Neo4jSinkWriter extends BaseSinkWriter<Neo4jSinkNodeConfig> {
private Project project;
private static final String DOT = ".";
private static RejectedExecutionHandler handler =
(r, executor) -> {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
(r, executor) -> {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
private static ExecutorService executor =
new ThreadPoolExecutor(
new ThreadPoolExecutor(
NUM_THREADS,
MAX_NUM_THREADS,
2 * 60L,
MAX_NUM_THREADS,
2 * 60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
handler);
new LinkedBlockingQueue<>(200),
handler);
public Neo4jSinkWriter(String id, String name, Neo4jSinkNodeConfig config) {
super(id, name, config);

View File

@ -12,38 +12,31 @@
*/
package com.antgroup.openspg.server.core.reasoner.service.impl;
import com.antgroup.openspg.common.util.tuple.Tuple2;
import com.antgroup.openspg.reasoner.common.constants.Constants;
import com.antgroup.openspg.reasoner.common.graph.edge.Direction;
import com.antgroup.openspg.reasoner.common.graph.edge.IEdge;
import com.antgroup.openspg.reasoner.common.graph.edge.SPO;
import com.antgroup.openspg.reasoner.common.graph.property.IProperty;
import com.antgroup.openspg.reasoner.common.graph.property.impl.VertexProperty;
import com.antgroup.openspg.reasoner.common.graph.property.impl.VertexVersionProperty;
import com.antgroup.openspg.reasoner.common.graph.vertex.IVertex;
import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId;
import com.antgroup.openspg.reasoner.common.graph.vertex.impl.Vertex;
import com.antgroup.openspg.reasoner.common.graph.vertex.impl.VertexBizId;
import com.antgroup.openspg.reasoner.graphstate.GraphState;
import com.antgroup.openspg.reasoner.udf.model.LinkedUdtfResult;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
/**
* @author donghai.ydh
* @version Utils.java, v 0.1 2024-04-17 10:32 donghai.ydh
*/
@Slf4j
public class Utils {
public static Map<String, Set<Tuple2<String, String>>> getAllRdfEntity(
public static List<LinkedUdtfResult> getAllRdfEntity(
GraphState<IVertexId> graphState, IVertexId id) {
Map<String, Set<Tuple2<String, String>>> result = new HashMap<>();
List<LinkedUdtfResult> result = new ArrayList<>();
// find vertex prop
IVertex<IVertexId, IProperty> vertex = graphState.getVertex(id, null, null);
@ -57,14 +50,23 @@ public class Utils {
}
IVertex<IVertexId, IProperty> propVertex = new Vertex<>();
propVertex.setId(new VertexBizId(String.valueOf(pValue), "Text"));
propVertex.setValue(new VertexVersionProperty(
propVertex.setValue(
new VertexVersionProperty(
"id", String.valueOf(pValue),
"name", String.valueOf(pValue)
));
"name", String.valueOf(pValue)));
graphState.addVertex(propVertex);
result
.computeIfAbsent(propertyName, k -> new HashSet<>())
.add(new Tuple2<>(String.valueOf(pValue), "Text"));
LinkedUdtfResult udtfRes = new LinkedUdtfResult();
udtfRes.setEdgeType(propertyName);
udtfRes.getTargetVertexIdList().add(String.valueOf(pValue));
if (pValue instanceof Integer) {
udtfRes.getTargetVertexTypeList().add("Int");
} else if (pValue instanceof Double || pValue instanceof Float) {
udtfRes.getTargetVertexTypeList().add("Float");
} else {
udtfRes.getTargetVertexTypeList().add("Text");
}
udtfRes.getEdgePropertyMap().put("value", pValue);
result.add(udtfRes);
}
}
@ -79,9 +81,17 @@ public class Utils {
}
SPO spo = new SPO(edge.getType());
log.info("TargetRdfProperty,id={},,edgeType={}", id, edge.getType());
result
.computeIfAbsent(spo.getP(), k -> new HashSet<>())
.add(new Tuple2<>(String.valueOf(toIdObj), edge.getTargetId().getType()));
LinkedUdtfResult udtfRes = new LinkedUdtfResult();
udtfRes.setEdgeType(spo.getP());
udtfRes.getTargetVertexIdList().add(String.valueOf(toIdObj));
udtfRes.getTargetVertexTypeList().add(edge.getTargetId().getType());
for (String propKey : edge.getValue().getKeySet()) {
if (propKey.startsWith("_")) {
continue;
}
udtfRes.getEdgePropertyMap().put(propKey, edge.getValue().get(propKey));
}
result.add(udtfRes);
}
}
return result;

View File

@ -12,7 +12,6 @@
*/
package com.antgroup.openspg.server.core.reasoner.service.udtf;
import com.antgroup.openspg.common.util.tuple.Tuple2;
import com.antgroup.openspg.reasoner.common.constants.Constants;
import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId;
import com.antgroup.openspg.reasoner.common.graph.vertex.impl.VertexBizId;
@ -27,7 +26,6 @@ import com.antgroup.openspg.server.core.reasoner.service.impl.Utils;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
/**
@ -76,17 +74,9 @@ public class RdfExpand extends BaseUdtf {
vertexType = (String) sMap.get(Constants.CONTEXT_LABEL);
}
IVertexId id = new VertexBizId(bizId, vertexType);
// 结果
Map<String, Set<Tuple2<String, String>>> validBizIdMap = Utils.getAllRdfEntity(graphState, id);
for (Map.Entry<String, Set<Tuple2<String, String>>> entry : validBizIdMap.entrySet()) {
LinkedUdtfResult udtfResult = new LinkedUdtfResult();
udtfResult.setEdgeType(entry.getKey());
for (Tuple2<String, String> data : entry.getValue()) {
udtfResult.getTargetVertexIdList().add(data.first);
udtfResult.getTargetVertexTypeList().add(data.second);
}
List<LinkedUdtfResult> validBizIds = Utils.getAllRdfEntity(graphState, id);
for (LinkedUdtfResult udtfResult : validBizIds) {
forward(Lists.newArrayList(udtfResult));
}
}