diff --git a/reasoner/pom.xml b/reasoner/pom.xml
index 0f70033a..4a896737 100644
--- a/reasoner/pom.xml
+++ b/reasoner/pom.xml
@@ -352,6 +352,14 @@
+
+
+ com.aliyun.openservices
+ log-loghub-producer
+ 0.1.4
+
+
+
com.opencsv
opencsv
diff --git a/reasoner/runner/runner-common/pom.xml b/reasoner/runner/runner-common/pom.xml
index 068de278..0939c87d 100644
--- a/reasoner/runner/runner-common/pom.xml
+++ b/reasoner/runner/runner-common/pom.xml
@@ -144,5 +144,12 @@
+
+
+ com.aliyun.openservices
+ log-loghub-producer
+
+
+
diff --git a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/io/IoFactory.java b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/io/IoFactory.java
index 8dd9b7f3..82a1b89d 100644
--- a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/io/IoFactory.java
+++ b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/io/IoFactory.java
@@ -30,9 +30,12 @@ import com.antgroup.openspg.reasoner.io.hive.HiveWriterSession;
import com.antgroup.openspg.reasoner.io.model.AbstractTableInfo;
import com.antgroup.openspg.reasoner.io.model.HiveTableInfo;
import com.antgroup.openspg.reasoner.io.model.OdpsTableInfo;
+import com.antgroup.openspg.reasoner.io.model.SLSTableInfo;
import com.antgroup.openspg.reasoner.io.odps.OdpsReader;
import com.antgroup.openspg.reasoner.io.odps.OdpsUtils;
import com.antgroup.openspg.reasoner.io.odps.OdpsWriter;
+import com.antgroup.openspg.reasoner.io.sls.SlsWriter;
+import com.antgroup.openspg.reasoner.io.sls.SlsWriterSession;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -74,6 +77,11 @@ public class IoFactory {
HiveWriterSession hiveWriterSession = HiveUtils.createHiveWriterSession(hiveTableInfo);
sessionId = hiveWriterSession.getSessionId();
SESSION_MAP.put(hiveTableInfo, new Tuple2<>(sessionId, hiveWriterSession));
+ } else if (tableInfo instanceof SLSTableInfo) {
+ SLSTableInfo slsTableInfo = (SLSTableInfo) tableInfo;
+ SlsWriterSession slsWriterSession = new SlsWriterSession(slsTableInfo);
+ sessionId = slsWriterSession.getSessionId();
+ SESSION_MAP.put(slsTableInfo, new Tuple2<>(sessionId, slsWriterSession));
}
log.info(
"createWriterSession,table_info="
@@ -118,6 +126,21 @@ public class IoFactory {
TABLE_WRITER_CACHE.put(cacheKey, hiveWriter);
}
return TABLE_WRITER_CACHE.getIfPresent(cacheKey);
+ } else if (tableInfo instanceof SLSTableInfo) {
+ resultWriter = TABLE_WRITER_CACHE.getIfPresent(cacheKey);
+ if (null != resultWriter) {
+ return resultWriter;
+ }
+ synchronized (TABLE_WRITER_CACHE) {
+ resultWriter = TABLE_WRITER_CACHE.getIfPresent(cacheKey);
+ if (null != resultWriter) {
+ return resultWriter;
+ }
+ SlsWriter slsWriter = new SlsWriter();
+ slsWriter.open(index, parallel, tableInfo);
+ TABLE_WRITER_CACHE.put(cacheKey, slsWriter);
+ }
+ return TABLE_WRITER_CACHE.getIfPresent(cacheKey);
}
throw new NotImplementedException(
"table type not support," + tableInfo.getClass().getName(), null);
diff --git a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/io/model/SLSTableInfo.java b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/io/model/SLSTableInfo.java
new file mode 100644
index 00000000..57d91432
--- /dev/null
+++ b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/io/model/SLSTableInfo.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2023 OpenSPG Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied.
+ */
+package com.antgroup.openspg.reasoner.io.model;
+
+import lombok.Data;
+
+/**
+ * @author kejian
+ * @version SLSTableInfo.java, v 0.1 2024年03月05日 11:49 AM kejian
+ */
+@Data
+public class SLSTableInfo extends AbstractTableInfo {
+ /** sls project */
+ private String project;
+
+ /** sls endpoint */
+ private String endpoint;
+
+ /** sls logStore */
+ private String logStore;
+
+ /** sls accessId */
+ private String accessId;
+
+ /** sls accessKey */
+ private String accessKey;
+
+ /** akg Task dev id */
+ private String taskId;
+}
diff --git a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/io/sls/SlsWriter.java b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/io/sls/SlsWriter.java
new file mode 100644
index 00000000..2888d586
--- /dev/null
+++ b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/io/sls/SlsWriter.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2023 OpenSPG Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied.
+ */
+package com.antgroup.openspg.reasoner.io.sls;
+
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.openservices.log.common.LogItem;
+import com.aliyun.openservices.log.producer.LogProducer;
+import com.aliyun.openservices.log.producer.ProducerConfig;
+import com.aliyun.openservices.log.producer.ProjectConfig;
+import com.antgroup.openspg.reasoner.common.table.Field;
+import com.antgroup.openspg.reasoner.io.ITableWriter;
+import com.antgroup.openspg.reasoner.io.model.AbstractTableInfo;
+import com.antgroup.openspg.reasoner.io.model.SLSTableInfo;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class SlsWriter implements ITableWriter {
+ private int taskIndex;
+ private SLSTableInfo slsTableInfo;
+ private transient LogProducer logProducer;
+ private String taskDevId;
+ private List outputColumnNameList;
+ private List logItemList;
+ private static final int cacheSize = 2000;
+ private Long writeCount = 0L;
+
+ @Override
+ public void open(int taskIndex, int parallel, AbstractTableInfo tableInfo) {
+ this.taskIndex = taskIndex;
+ this.slsTableInfo = (SLSTableInfo) tableInfo;
+ this.taskDevId = slsTableInfo.getTaskId();
+ logItemList = new ArrayList<>(cacheSize);
+ this.outputColumnNameList =
+ slsTableInfo.getColumns().stream().map(Field::getName).collect(Collectors.toList());
+ log.info("open SlsWriter,index=" + this.taskIndex + ",slsTableInfo=" + this.slsTableInfo);
+ initLogProducer();
+ }
+
+ private void initLogProducer() {
+ ProducerConfig producerConfig = new ProducerConfig();
+ producerConfig.packageTimeoutInMS = 3000;
+ LogProducer producer = new LogProducer(producerConfig);
+ ProjectConfig projectConfig =
+ new ProjectConfig(
+ slsTableInfo.getProject(),
+ slsTableInfo.getEndpoint(),
+ slsTableInfo.getAccessId(),
+ slsTableInfo.getAccessKey());
+ producer.setProjectConfig(projectConfig);
+ this.logProducer = producer;
+ }
+
+ @Override
+ public void write(Object[] data) {
+ writeCount++;
+ Map content = new HashMap<>();
+ for (int i = 0; i < data.length; i++) {
+ content.put(outputColumnNameList.get(i), data[i]);
+ }
+ content.put("taskId", taskDevId);
+ String dataStr = JSONObject.toJSONString(content);
+ LogItem item = new LogItem();
+ item.PushBack("content", dataStr);
+ logItemList.add(item);
+ if (writeCount % 1000 == 0) {
+ log.info("write_sls_record content=" + dataStr + ", writer_count=" + writeCount);
+ }
+ if (logItemList.size() >= cacheSize) {
+ flush();
+ }
+ }
+
+ private void flush() {
+ try {
+ logProducer.send(
+ slsTableInfo.getProject(),
+ slsTableInfo.getLogStore(),
+ slsTableInfo.getLogStore(),
+ null,
+ null,
+ logItemList);
+ logProducer.flush();
+ logItemList.clear();
+ } catch (Exception e) {
+ throw new RuntimeException("SLS logProducer write error", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (null == logProducer) {
+ return;
+ }
+ try {
+ if (!logItemList.isEmpty()) {
+ flush();
+ }
+ log.info(
+ "sls_writer_close, index="
+ + this.taskIndex
+ + ", info="
+ + slsTableInfo
+ + ", odps_write_count="
+ + writeCount);
+ // Wait for the sending thread to end
+ Thread.sleep(1000);
+ logProducer.close();
+ } catch (Exception e) {
+ log.error("close_sls_writer_error", e);
+ throw new RuntimeException(e);
+ } finally {
+ logProducer = null;
+ writeCount = 0L;
+ logItemList.clear();
+ }
+ }
+
+ @Override
+ public long writeCount() {
+ return writeCount;
+ }
+}
diff --git a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/io/sls/SlsWriterSession.java b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/io/sls/SlsWriterSession.java
new file mode 100644
index 00000000..6991b508
--- /dev/null
+++ b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/io/sls/SlsWriterSession.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2023 OpenSPG Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied.
+ */
+package com.antgroup.openspg.reasoner.io.sls;
+
+import com.antgroup.openspg.reasoner.io.model.SLSTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SlsWriterSession {
+ private static final Logger log = LoggerFactory.getLogger(SlsWriterSession.class);
+ private final SLSTableInfo slsTableInfo;
+
+ public SlsWriterSession(SLSTableInfo slsTableInfo) {
+ this.slsTableInfo = slsTableInfo;
+ }
+
+ public String getSessionId() {
+ return String.valueOf(this.slsTableInfo.hashCode());
+ }
+}
diff --git a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/runner/ConfigKey.java b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/runner/ConfigKey.java
index ee358211..49152061 100644
--- a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/runner/ConfigKey.java
+++ b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/runner/ConfigKey.java
@@ -157,4 +157,7 @@ public class ConfigKey {
/** edge extra identifier, separated by commas */
public static final String EDGE_EXTRA_IDENTIFIER = "kg.reasoner.edge.extra.identifier";
+
+ /** the devId of akg task */
+ public static final String DEV_ID = "devId";
}
diff --git a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/sink/KgReasonerSinkType.java b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/sink/KgReasonerSinkType.java
index ab9af654..c3e93c4f 100644
--- a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/sink/KgReasonerSinkType.java
+++ b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/sink/KgReasonerSinkType.java
@@ -25,5 +25,8 @@ public enum KgReasonerSinkType implements Serializable {
/** hive table */
HIVE,
/** openspg canvas */
- CANVAS
+ CANVAS,
+
+ /** sls */
+ REALTIME;
}
diff --git a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/sink/KgReasonerSinkUtils.java b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/sink/KgReasonerSinkUtils.java
index 858aa233..19805b3a 100644
--- a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/sink/KgReasonerSinkUtils.java
+++ b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/sink/KgReasonerSinkUtils.java
@@ -19,6 +19,7 @@ import com.antgroup.openspg.reasoner.io.model.AbstractTableInfo;
import com.antgroup.openspg.reasoner.io.model.CanvasTableInfo;
import com.antgroup.openspg.reasoner.io.model.HiveTableInfo;
import com.antgroup.openspg.reasoner.io.model.OdpsTableInfo;
+import com.antgroup.openspg.reasoner.io.model.SLSTableInfo;
import com.antgroup.openspg.reasoner.progress.DecryptUtils;
import com.antgroup.openspg.reasoner.runner.ConfigKey;
import java.util.HashMap;
@@ -33,7 +34,7 @@ public class KgReasonerSinkUtils {
return KgReasonerSinkType.LOG;
}
String outputType = outputTableConfig.getString("type");
- return KgReasonerSinkType.valueOf(outputType);
+ return KgReasonerSinkType.valueOf(outputType.toUpperCase());
}
/** get sink table info from config */
@@ -70,6 +71,22 @@ public class KgReasonerSinkUtils {
canvasTableInfo.setQueryId(outputTableConfig.getString("queryId"));
canvasTableInfo.setApiPath(outputTableConfig.getString("canvasUrl"));
return canvasTableInfo;
+ } else if (KgReasonerSinkType.REALTIME.equals(sinkType)) {
+ JSONObject outputTableConfig = getOutputTableConfig(params);
+ assert outputTableConfig != null;
+ String slsConfigStr = outputTableConfig.getString("SLS");
+ assert slsConfigStr != null;
+ JSONObject slsConfigs = JSON.parseObject(slsConfigStr);
+ Object devId = params.get(ConfigKey.DEV_ID);
+ assert devId != null;
+ SLSTableInfo slsTableInfo = new SLSTableInfo();
+ slsTableInfo.setProject(slsConfigs.getString("project"));
+ slsTableInfo.setEndpoint(slsConfigs.getString("endpoint"));
+ slsTableInfo.setLogStore(slsConfigs.getString("logStore"));
+ slsTableInfo.setAccessId(slsConfigs.getString("accessId"));
+ slsTableInfo.setAccessKey(DecryptUtils.decryptAccessInfo(slsConfigs.getString("accessKey")));
+ slsTableInfo.setTaskId(String.valueOf(devId));
+ return slsTableInfo;
}
return null;
}