feat(reasoner): supports export to SLS (#157)

This commit is contained in:
wangshaofei 2024-03-19 16:41:11 +08:00 committed by GitHub
parent 64059f36d0
commit a781b5a312
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 270 additions and 2 deletions

View File

@ -352,6 +352,14 @@
</dependency>
<!-- hive end -->
<!-- sls start -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>log-loghub-producer</artifactId>
<version>0.1.4</version>
</dependency>
<!-- sls end -->
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>

View File

@ -144,5 +144,12 @@
</dependency>
<!-- hive end -->
<!-- sls start -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>log-loghub-producer</artifactId>
</dependency>
<!-- sls end -->
</dependencies>
</project>

View File

@ -30,9 +30,12 @@ import com.antgroup.openspg.reasoner.io.hive.HiveWriterSession;
import com.antgroup.openspg.reasoner.io.model.AbstractTableInfo;
import com.antgroup.openspg.reasoner.io.model.HiveTableInfo;
import com.antgroup.openspg.reasoner.io.model.OdpsTableInfo;
import com.antgroup.openspg.reasoner.io.model.SLSTableInfo;
import com.antgroup.openspg.reasoner.io.odps.OdpsReader;
import com.antgroup.openspg.reasoner.io.odps.OdpsUtils;
import com.antgroup.openspg.reasoner.io.odps.OdpsWriter;
import com.antgroup.openspg.reasoner.io.sls.SlsWriter;
import com.antgroup.openspg.reasoner.io.sls.SlsWriterSession;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@ -74,6 +77,11 @@ public class IoFactory {
HiveWriterSession hiveWriterSession = HiveUtils.createHiveWriterSession(hiveTableInfo);
sessionId = hiveWriterSession.getSessionId();
SESSION_MAP.put(hiveTableInfo, new Tuple2<>(sessionId, hiveWriterSession));
} else if (tableInfo instanceof SLSTableInfo) {
SLSTableInfo slsTableInfo = (SLSTableInfo) tableInfo;
SlsWriterSession slsWriterSession = new SlsWriterSession(slsTableInfo);
sessionId = slsWriterSession.getSessionId();
SESSION_MAP.put(slsTableInfo, new Tuple2<>(sessionId, slsWriterSession));
}
log.info(
"createWriterSession,table_info="
@ -118,6 +126,21 @@ public class IoFactory {
TABLE_WRITER_CACHE.put(cacheKey, hiveWriter);
}
return TABLE_WRITER_CACHE.getIfPresent(cacheKey);
} else if (tableInfo instanceof SLSTableInfo) {
resultWriter = TABLE_WRITER_CACHE.getIfPresent(cacheKey);
if (null != resultWriter) {
return resultWriter;
}
synchronized (TABLE_WRITER_CACHE) {
resultWriter = TABLE_WRITER_CACHE.getIfPresent(cacheKey);
if (null != resultWriter) {
return resultWriter;
}
SlsWriter slsWriter = new SlsWriter();
slsWriter.open(index, parallel, tableInfo);
TABLE_WRITER_CACHE.put(cacheKey, slsWriter);
}
return TABLE_WRITER_CACHE.getIfPresent(cacheKey);
}
throw new NotImplementedException(
"table type not support," + tableInfo.getClass().getName(), null);

View File

@ -0,0 +1,40 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.io.model;
import lombok.Data;
/**
* @author kejian
* @version SLSTableInfo.java, v 0.1 2024年03月05日 11:49 AM kejian
*/
@Data
public class SLSTableInfo extends AbstractTableInfo {
/** sls project */
private String project;
/** sls endpoint */
private String endpoint;
/** sls logStore */
private String logStore;
/** sls accessId */
private String accessId;
/** sls accessKey */
private String accessKey;
/** akg Task dev id */
private String taskId;
}

View File

@ -0,0 +1,137 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.io.sls;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.producer.LogProducer;
import com.aliyun.openservices.log.producer.ProducerConfig;
import com.aliyun.openservices.log.producer.ProjectConfig;
import com.antgroup.openspg.reasoner.common.table.Field;
import com.antgroup.openspg.reasoner.io.ITableWriter;
import com.antgroup.openspg.reasoner.io.model.AbstractTableInfo;
import com.antgroup.openspg.reasoner.io.model.SLSTableInfo;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SlsWriter implements ITableWriter {
private int taskIndex;
private SLSTableInfo slsTableInfo;
private transient LogProducer logProducer;
private String taskDevId;
private List<String> outputColumnNameList;
private List<LogItem> logItemList;
private static final int cacheSize = 2000;
private Long writeCount = 0L;
@Override
public void open(int taskIndex, int parallel, AbstractTableInfo tableInfo) {
this.taskIndex = taskIndex;
this.slsTableInfo = (SLSTableInfo) tableInfo;
this.taskDevId = slsTableInfo.getTaskId();
logItemList = new ArrayList<>(cacheSize);
this.outputColumnNameList =
slsTableInfo.getColumns().stream().map(Field::getName).collect(Collectors.toList());
log.info("open SlsWriter,index=" + this.taskIndex + ",slsTableInfo=" + this.slsTableInfo);
initLogProducer();
}
private void initLogProducer() {
ProducerConfig producerConfig = new ProducerConfig();
producerConfig.packageTimeoutInMS = 3000;
LogProducer producer = new LogProducer(producerConfig);
ProjectConfig projectConfig =
new ProjectConfig(
slsTableInfo.getProject(),
slsTableInfo.getEndpoint(),
slsTableInfo.getAccessId(),
slsTableInfo.getAccessKey());
producer.setProjectConfig(projectConfig);
this.logProducer = producer;
}
@Override
public void write(Object[] data) {
writeCount++;
Map<String, Object> content = new HashMap<>();
for (int i = 0; i < data.length; i++) {
content.put(outputColumnNameList.get(i), data[i]);
}
content.put("taskId", taskDevId);
String dataStr = JSONObject.toJSONString(content);
LogItem item = new LogItem();
item.PushBack("content", dataStr);
logItemList.add(item);
if (writeCount % 1000 == 0) {
log.info("write_sls_record content=" + dataStr + ", writer_count=" + writeCount);
}
if (logItemList.size() >= cacheSize) {
flush();
}
}
private void flush() {
try {
logProducer.send(
slsTableInfo.getProject(),
slsTableInfo.getLogStore(),
slsTableInfo.getLogStore(),
null,
null,
logItemList);
logProducer.flush();
logItemList.clear();
} catch (Exception e) {
throw new RuntimeException("SLS logProducer write error", e);
}
}
@Override
public void close() {
if (null == logProducer) {
return;
}
try {
if (!logItemList.isEmpty()) {
flush();
}
log.info(
"sls_writer_close, index="
+ this.taskIndex
+ ", info="
+ slsTableInfo
+ ", odps_write_count="
+ writeCount);
// Wait for the sending thread to end
Thread.sleep(1000);
logProducer.close();
} catch (Exception e) {
log.error("close_sls_writer_error", e);
throw new RuntimeException(e);
} finally {
logProducer = null;
writeCount = 0L;
logItemList.clear();
}
}
@Override
public long writeCount() {
return writeCount;
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.io.sls;
import com.antgroup.openspg.reasoner.io.model.SLSTableInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SlsWriterSession {
private static final Logger log = LoggerFactory.getLogger(SlsWriterSession.class);
private final SLSTableInfo slsTableInfo;
public SlsWriterSession(SLSTableInfo slsTableInfo) {
this.slsTableInfo = slsTableInfo;
}
public String getSessionId() {
return String.valueOf(this.slsTableInfo.hashCode());
}
}

View File

@ -157,4 +157,7 @@ public class ConfigKey {
/** edge extra identifier, separated by commas */
public static final String EDGE_EXTRA_IDENTIFIER = "kg.reasoner.edge.extra.identifier";
/** the devId of akg task */
public static final String DEV_ID = "devId";
}

View File

@ -25,5 +25,8 @@ public enum KgReasonerSinkType implements Serializable {
/** hive table */
HIVE,
/** openspg canvas */
CANVAS
CANVAS,
/** sls */
REALTIME;
}

View File

@ -19,6 +19,7 @@ import com.antgroup.openspg.reasoner.io.model.AbstractTableInfo;
import com.antgroup.openspg.reasoner.io.model.CanvasTableInfo;
import com.antgroup.openspg.reasoner.io.model.HiveTableInfo;
import com.antgroup.openspg.reasoner.io.model.OdpsTableInfo;
import com.antgroup.openspg.reasoner.io.model.SLSTableInfo;
import com.antgroup.openspg.reasoner.progress.DecryptUtils;
import com.antgroup.openspg.reasoner.runner.ConfigKey;
import java.util.HashMap;
@ -33,7 +34,7 @@ public class KgReasonerSinkUtils {
return KgReasonerSinkType.LOG;
}
String outputType = outputTableConfig.getString("type");
return KgReasonerSinkType.valueOf(outputType);
return KgReasonerSinkType.valueOf(outputType.toUpperCase());
}
/** get sink table info from config */
@ -70,6 +71,22 @@ public class KgReasonerSinkUtils {
canvasTableInfo.setQueryId(outputTableConfig.getString("queryId"));
canvasTableInfo.setApiPath(outputTableConfig.getString("canvasUrl"));
return canvasTableInfo;
} else if (KgReasonerSinkType.REALTIME.equals(sinkType)) {
JSONObject outputTableConfig = getOutputTableConfig(params);
assert outputTableConfig != null;
String slsConfigStr = outputTableConfig.getString("SLS");
assert slsConfigStr != null;
JSONObject slsConfigs = JSON.parseObject(slsConfigStr);
Object devId = params.get(ConfigKey.DEV_ID);
assert devId != null;
SLSTableInfo slsTableInfo = new SLSTableInfo();
slsTableInfo.setProject(slsConfigs.getString("project"));
slsTableInfo.setEndpoint(slsConfigs.getString("endpoint"));
slsTableInfo.setLogStore(slsConfigs.getString("logStore"));
slsTableInfo.setAccessId(slsConfigs.getString("accessId"));
slsTableInfo.setAccessKey(DecryptUtils.decryptAccessInfo(slsConfigs.getString("accessKey")));
slsTableInfo.setTaskId(String.valueOf(devId));
return slsTableInfo;
}
return null;
}