fix(all): version 0.7.1 (#545)

This commit is contained in:
Andy 2025-04-25 16:19:27 +08:00 committed by GitHub
parent 43203a8128
commit c0eebf0244
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 201 additions and 64 deletions

View File

@ -10,7 +10,7 @@
# or implied. # or implied.
docker buildx build -f Dockerfile --platform linux/arm64/v8,linux/amd64 --push \ docker buildx build -f Dockerfile --platform linux/arm64/v8,linux/amd64 --push \
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-mysql:0.6 \ -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-mysql:0.7 \
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-mysql:latest \ -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-mysql:latest \
-t openspg/openspg-mysql:0.5.1 \ -t openspg/openspg-mysql:0.5.1 \
-t openspg/openspg-mysql:latest \ -t openspg/openspg-mysql:latest \

View File

@ -50,14 +50,3 @@ RUN . /etc/profile && echo ${JAVA_HOME} && mkdir -p /home/admin/ && chmod -R 777
pip install openspg-kag==0.7.0 &&\ pip install openspg-kag==0.7.0 &&\
pip install pemja==0.4.0 && \ pip install pemja==0.4.0 && \
pip cache purge pip cache purge
# RUN python3 -m venv /openspg_venv && \
# . /openspg_venv/bin/activate && \
# export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-$(dpkg --print-architecture) && \
# pip3 install openspg-kag==0.6.0b9 -i https://artifacts.antgroup-inc.cn/artifact/repositories/simple-dev/ && \
# pip3 install pemja==0.4.0 && \
# pip3 install -U "http://alps-common.oss-cn-hangzhou-zmf.aliyuncs.com/nscommon/shiji/nscommon-0.0.1.tar.gz" && \
# echo "if (tty -s); then \n . /openspg_venv/bin/activate \nfi" >> ~/.bashrc
# ADD openspg/dev/release/python/lib/builder*.jar /home/admin/miniconda3/lib/python3.10/site-packages/knext/builder/lib

View File

@ -10,7 +10,7 @@
# or implied. # or implied.
alias docker=podman alias docker=podman
IMAGE="spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python" IMAGE="spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python"
VERSION="0.6" VERSION="0.7"
LATEST="latest" LATEST="latest"
cd ../../../../ cd ../../../../

View File

@ -11,8 +11,8 @@
# for amd64 # for amd64
docker build -f Dockerfile --platform linux/amd64 --push \ docker build -f Dockerfile --platform linux/amd64 --push \
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python:0.5.1 \ -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python:0.7 \
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python:latest \ -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python:latest \
-t openspg/openspg-python:0.5.1 \ -t openspg/openspg-python:0.7 \
-t openspg/openspg-python:latest \ -t openspg/openspg-python:latest \
. .

View File

@ -10,6 +10,6 @@
# or implied. # or implied.
docker buildx build -f Dockerfile --platform linux/arm64/v8,linux/amd64 --push \ docker buildx build -f Dockerfile --platform linux/arm64/v8,linux/amd64 --push \
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-server:0.6 \ -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-server:0.7 \
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-server:latest \ -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-server:latest \
. .

View File

@ -38,3 +38,18 @@ services:
volumes: volumes:
- /etc/localtime:/etc/localtime:ro - /etc/localtime:/etc/localtime:ro
- $HOME/dozerdb/logs:/logs - $HOME/dozerdb/logs:/logs
minio:
image: spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-minio:latest
container_name: release-openspg-minio
command: server --console-address ":9001" /data
restart: always
environment:
MINIO_ACCESS_KEY: minio
MINIO_SECRET_KEY: minio@openspg
TZ: Asia/Shanghai
ports:
- 9000:9000
- 9001:9001
volumes:
- /etc/localtime:/etc/localtime:ro

View File

@ -36,6 +36,8 @@ public class KagBuilderRequest extends BaseRequest {
private Integer workerGpu; private Integer workerGpu;
private String workerGpuType;
private Integer workerMemory; private Integer workerMemory;
private Integer workerStorage; private Integer workerStorage;

View File

@ -15,6 +15,7 @@
# | server | # | server |
# * ----------------------- */ # * ----------------------- */
# spring # spring
env=default
spring.application.name=openspg spring.application.name=openspg
spring.servlet.multipart.max-file-size=100GB spring.servlet.multipart.max-file-size=100GB
spring.servlet.multipart.max-request-size=100GB spring.servlet.multipart.max-request-size=100GB

View File

@ -150,5 +150,9 @@
<version>0.36.4-public</version> <version>0.36.4-public</version>
</dependency> </dependency>
<!-- dataSource end --> <!-- dataSource end -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,81 @@
package com.antgroup.openspg.server.common.service.account.impl;
import com.antgroup.openspg.server.api.facade.Paged;
import com.antgroup.openspg.server.api.http.client.account.AccountService;
import com.antgroup.openspg.server.common.model.account.Account;
import java.io.IOException;
import java.util.List;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletResponse;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
@Service
@ConditionalOnProperty(name = "env", havingValue = "default")
public class AccountServiceDefaultImpl implements AccountService {
@Override
public Account getLoginUser() {
return null;
}
@Override
public List<Account> getAccountByKeyword(String keyword) {
return null;
}
@Override
public Account getByUserNo(String userNo) {
return null;
}
@Override
public Account getWithPrivateByUserNo(String userNo) {
return null;
}
@Override
public Integer create(Account account) {
return null;
}
@Override
public Integer updatePassword(Account account) {
return null;
}
@Override
public Integer deleteAccount(String workNo) {
return null;
}
@Override
public Paged<Account> getAccountList(String account, Integer page, Integer size) {
return null;
}
@Override
public String getSha256HexPassword(String password, String salt) {
return null;
}
@Override
public Account getCurrentAccount(Cookie[] cookies) throws IOException {
return null;
}
@Override
public boolean login(Account account, HttpServletResponse response) {
return false;
}
@Override
public String logout(String workNo, String redirectUrl) {
return null;
}
@Override
public int updateUserConfig(Account account, Cookie[] cookies) {
return 0;
}
}

View File

@ -38,7 +38,7 @@ public class DefaultValue {
@Value("${schema.uri:}") @Value("${schema.uri:}")
private String schemaUrlHost; private String schemaUrlHost;
@Value("${builder.model.execute.num:5}") @Value("${builder.model.execute.num:20}")
private Integer modelExecuteNum; private Integer modelExecuteNum;
@Value("${python.exec:}") @Value("${python.exec:}")

View File

@ -15,6 +15,7 @@ package com.antgroup.openspg.server.core.scheduler.service.common;
import com.antgroup.openspg.common.util.DateTimeUtils; import com.antgroup.openspg.common.util.DateTimeUtils;
import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum; import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum;
import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerTask; import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerTask;
import com.antgroup.openspg.server.core.scheduler.service.api.SchedulerService;
import java.util.Date; import java.util.Date;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -27,12 +28,15 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Service @Service
@Slf4j @Slf4j
public class MemoryTaskServer { public class MemoryTaskServer {
@Autowired private SchedulerService schedulerService;
private final ConcurrentMap<String, SchedulerTask> taskMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, SchedulerTask> taskMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Future<?>> futureMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, Future<?>> futureMap = new ConcurrentHashMap<>();
@ -45,21 +49,24 @@ public class MemoryTaskServer {
new LinkedBlockingQueue<>(1000), new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()); new ThreadPoolExecutor.CallerRunsPolicy());
public String submit(MemoryTaskCallable<String> taskCallable, String taskId) { public String submit(MemoryTaskCallable<String> taskCallable, String taskId, Long instanceId) {
SchedulerTask taskInfo = new SchedulerTask(); SchedulerTask taskInfo = new SchedulerTask();
taskInfo.setNodeId(taskId); taskInfo.setNodeId(taskId);
taskInfo.setStatus(SchedulerEnum.TaskStatus.WAIT); taskInfo.setStatus(SchedulerEnum.TaskStatus.WAIT);
taskInfo.setInstanceId(instanceId);
taskMap.put(taskId, taskInfo); taskMap.put(taskId, taskInfo);
taskCallable.setTask(taskInfo); taskCallable.setTask(taskInfo);
Future<?> future = Future<?> future =
CompletableFuture.supplyAsync(() -> executeTask(taskId, taskCallable), executorService); CompletableFuture.supplyAsync(
() -> executeTask(taskId, instanceId, taskCallable), executorService);
futureMap.put(taskId, future); futureMap.put(taskId, future);
return taskId; return taskId;
} }
private String executeTask(String taskId, MemoryTaskCallable<String> taskCallable) { private String executeTask(
String taskId, Long instanceId, MemoryTaskCallable<String> taskCallable) {
SchedulerTask taskInfo = taskMap.get(taskId); SchedulerTask taskInfo = taskMap.get(taskId);
taskInfo.setStatus(SchedulerEnum.TaskStatus.RUNNING); taskInfo.setStatus(SchedulerEnum.TaskStatus.RUNNING);
taskInfo.setBeginTime(new Date()); taskInfo.setBeginTime(new Date());
@ -70,9 +77,10 @@ public class MemoryTaskServer {
} catch (Exception e) { } catch (Exception e) {
taskInfo.setStatus(SchedulerEnum.TaskStatus.ERROR); taskInfo.setStatus(SchedulerEnum.TaskStatus.ERROR);
taskInfo.setTraceLog(ExceptionUtils.getStackTrace(e)); taskInfo.setTraceLog(ExceptionUtils.getStackTrace(e));
log.error("executeTask Exception", e); log.error("executeTask Exception instanceId:" + instanceId, e);
} finally { } finally {
taskInfo.setFinishTime(new Date()); taskInfo.setFinishTime(new Date());
schedulerService.triggerInstance(instanceId);
} }
return taskId; return taskId;
} }
@ -87,9 +95,6 @@ public class MemoryTaskServer {
if (future != null && !future.isDone()) { if (future != null && !future.isDone()) {
boolean cancelled = future.cancel(true); boolean cancelled = future.cancel(true);
if (cancelled) { if (cancelled) {
SchedulerTask taskInfo = taskMap.get(taskId);
taskInfo.setStatus(SchedulerEnum.TaskStatus.TERMINATE);
taskMap.put(taskId, taskInfo);
futureMap.remove(taskId); futureMap.remove(taskId);
return true; return true;
} }

View File

@ -200,17 +200,16 @@ public class SchedulerHandlerClient {
if (CollectionUtils.isEmpty(infoLogs)) { if (CollectionUtils.isEmpty(infoLogs)) {
return true; return true;
} }
SchedulerInfoLog schedulerLog = infoLogs.get(infoLogs.size() - 1);
Date nowDate = new Date(); Date nowDate = new Date();
if (SchedulerInfoStatus.RUNNING.equals(schedulerInfo.getStatus())) { if (SchedulerInfoStatus.RUNNING.equals(schedulerInfo.getStatus())) {
Long hostExceptionTimeout = schedulerInfo.getHostExceptionTimeout(); Long hostExceptionTimeout = schedulerInfo.getHostExceptionTimeout();
if (hostExceptionTimeout != null if (hostExceptionTimeout != null
&& nowDate.getTime() - schedulerInfo.getGmtModified().getTime() && nowDate.getTime() - schedulerLog.getRt().getTime() >= hostExceptionTimeout * 1000) {
>= hostExceptionTimeout * 1000) {
log.info("running and timeout to pull again {} {}", name, hostExceptionTimeout); log.info("running and timeout to pull again {} {}", name, hostExceptionTimeout);
return true; return true;
} }
} }
SchedulerInfoLog schedulerLog = infoLogs.get(infoLogs.size() - 1);
if (SchedulerInfoStatus.WAIT.equals(schedulerInfo.getStatus()) if (SchedulerInfoStatus.WAIT.equals(schedulerInfo.getStatus())
&& nowDate.getTime() - schedulerLog.getRt().getTime() && nowDate.getTime() - schedulerLog.getRt().getTime()
>= schedulerInfo.getPeriod() * 1000) { >= schedulerInfo.getPeriod() * 1000) {

View File

@ -138,6 +138,9 @@ public abstract class TaskExecuteTemplate implements TaskExecute {
if (StringUtils.isBlank(old.getOutput())) { if (StringUtils.isBlank(old.getOutput())) {
old.setOutput(task.getOutput()); old.setOutput(task.getOutput());
} }
if (old.getFinishTime() == null) {
old.setFinishTime(task.getFinishTime());
}
task = old; task = old;
} }

View File

@ -67,7 +67,8 @@ public class KagAlignmentAsyncTask extends AsyncTaskExecuteTemplate {
List<String> inputs = SchedulerUtils.getTaskInputs(taskService, instance, task); List<String> inputs = SchedulerUtils.getTaskInputs(taskService, instance, task);
String taskId = String taskId =
memoryTaskServer.submit(new VectorizerTaskCallable(value, context, inputs), key); memoryTaskServer.submit(
new VectorizerTaskCallable(value, context, inputs), key, instance.getId());
context.addTraceLog("Alignment task has been successfully created!"); context.addTraceLog("Alignment task has been successfully created!");
return taskId; return taskId;
} }

View File

@ -58,7 +58,9 @@ public class KagBuilderAsyncTask extends AsyncTaskExecuteTemplate {
} }
String taskId = String taskId =
memoryTaskServer.submit( memoryTaskServer.submit(
new BuilderTaskCallable(value, builderJobService, projectService, context), key); new BuilderTaskCallable(value, builderJobService, projectService, context),
key,
context.getInstance().getId());
context.addTraceLog("Builder task has been successfully created!"); context.addTraceLog("Builder task has been successfully created!");
return taskId; return taskId;
} }

View File

@ -119,7 +119,7 @@ public class KagExtractorAsyncTask extends AsyncTaskExecuteTemplate {
JSONObject llm = JSONObject.parseObject(extractConfig.getString(CommonConstants.LLM)); JSONObject llm = JSONObject.parseObject(extractConfig.getString(CommonConstants.LLM));
String taskId = String taskId =
memoryTaskServer.submit( memoryTaskServer.submit(
new ExtractorTaskCallable(value, llm, project, context, inputs), key); new ExtractorTaskCallable(value, llm, project, context, inputs), key, instance.getId());
context.addTraceLog("Extractor task has been successfully created!"); context.addTraceLog("Extractor task has been successfully created!");
return taskId; return taskId;
} }

View File

@ -74,7 +74,8 @@ public class KagSplitterAsyncTask extends AsyncTaskExecuteTemplate {
String taskId = String taskId =
memoryTaskServer.submit( memoryTaskServer.submit(
new SplitterTaskCallable(value, builderJobService, projectService, context, inputs), new SplitterTaskCallable(value, builderJobService, projectService, context, inputs),
key); key,
instance.getId());
context.addTraceLog("Splitter task has been successfully created!"); context.addTraceLog("Splitter task has been successfully created!");
return taskId; return taskId;
} }

View File

@ -38,6 +38,7 @@ import com.antgroup.openspg.server.core.scheduler.service.utils.SchedulerUtils;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -72,7 +73,9 @@ public class KagVectorizerAsyncTask extends AsyncTaskExecuteTemplate {
List<String> inputs = SchedulerUtils.getTaskInputs(taskService, instance, task); List<String> inputs = SchedulerUtils.getTaskInputs(taskService, instance, task);
String taskId = String taskId =
memoryTaskServer.submit( memoryTaskServer.submit(
new VectorizerTaskCallable(value, projectService, context, inputs), key); new VectorizerTaskCallable(value, projectService, context, inputs),
key,
instance.getId());
context.addTraceLog("Vectorizer task has been successfully created!"); context.addTraceLog("Vectorizer task has been successfully created!");
return taskId; return taskId;
} }
@ -165,11 +168,14 @@ public class KagVectorizerAsyncTask extends AsyncTaskExecuteTemplate {
String fileKey = String fileKey =
CommonUtils.getTaskStorageFileKey( CommonUtils.getTaskStorageFileKey(
task.getProjectId(), task.getInstanceId(), task.getId(), task.getType()); task.getProjectId(), task.getInstanceId(), task.getId(), task.getType());
objectStorageClient.saveString( String results = JSON.toJSONString(subGraphList);
value.getBuilderBucketName(), JSON.toJSONString(subGraphList), fileKey); Long statr = System.currentTimeMillis();
byte[] bytes = results.getBytes(StandardCharsets.UTF_8);
addTraceLog("Start Store the results of the vector operator! byte length:%s", bytes.length);
objectStorageClient.saveData(value.getBuilderBucketName(), bytes, fileKey);
addTraceLog( addTraceLog(
"Store the results of the vector operator. file:%s/%s", "Store the results of the vector operator. file:%s/%s cons:%s",
value.getBuilderBucketName(), fileKey); value.getBuilderBucketName(), fileKey, System.currentTimeMillis() - statr);
return fileKey; return fileKey;
} }

View File

@ -37,6 +37,7 @@ import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerTask
import com.antgroup.openspg.server.core.scheduler.service.task.async.AsyncTaskExecuteTemplate; import com.antgroup.openspg.server.core.scheduler.service.task.async.AsyncTaskExecuteTemplate;
import com.antgroup.openspg.server.core.scheduler.service.utils.SchedulerUtils; import com.antgroup.openspg.server.core.scheduler.service.utils.SchedulerUtils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -78,7 +79,8 @@ public class KagWriterAsyncTask extends AsyncTaskExecuteTemplate {
String taskId = String taskId =
memoryTaskServer.submit( memoryTaskServer.submit(
new WriterTaskCallable(value, projectManager, context, builderJob.getAction(), inputs), new WriterTaskCallable(value, projectManager, context, builderJob.getAction(), inputs),
key); key,
instance.getId());
context.addTraceLog("Writer task has been successfully created!"); context.addTraceLog("Writer task has been successfully created!");
return taskId; return taskId;
} }
@ -114,6 +116,7 @@ public class KagWriterAsyncTask extends AsyncTaskExecuteTemplate {
memoryTaskServer.stopTask(resource); memoryTaskServer.stopTask(resource);
schedulerTask.setOutput(resource); schedulerTask.setOutput(resource);
removeInputs(context); removeInputs(context);
task.setFinishTime(new Date());
break; break;
default: default:
context.addTraceLog( context.addTraceLog(

View File

@ -46,6 +46,8 @@ import org.springframework.stereotype.Component;
@Component("kagMappingSyncTask") @Component("kagMappingSyncTask")
public class KagMappingSyncTask extends SyncTaskExecuteTemplate { public class KagMappingSyncTask extends SyncTaskExecuteTemplate {
private static final Integer BATCH_MAX_NUM = 2000;
@Autowired private DefaultValue value; @Autowired private DefaultValue value;
@Autowired private BuilderJobService builderJobService; @Autowired private BuilderJobService builderJobService;
@ -109,28 +111,34 @@ public class KagMappingSyncTask extends SyncTaskExecuteTemplate {
projectId, projectId,
mapping, mapping,
Maps.newHashMap()); Maps.newHashMap());
int index = 0;
for (Map<String, Object> data : datas) { for (int i = 0; i < datas.size(); i += BATCH_MAX_NUM) {
context.addTraceLog("Invoke the mapping operator. index:%s/%s", ++index, datas.size()); int index = Math.min(i + BATCH_MAX_NUM, datas.size());
context.addTraceLog("Invoke the mapping operator. index:%s/%s", index, datas.size());
List<Map<String, Object>> batch = datas.subList(i, index);
List<Object> result = List<Object> result =
(List<Object>) (List<Object>)
PemjaUtils.invoke( PemjaUtils.invoke(
pemjaConfig, BuilderConstant.MAPPING_ABC, pyConfig.toJSONString(), data); pemjaConfig, BuilderConstant.MAPPING_ABC, pyConfig.toJSONString(), batch);
List<SubGraphRecord> records = List<SubGraphRecord> records =
JSON.parseObject(JSON.toJSONString(result), new TypeReference<List<SubGraphRecord>>() {}); JSON.parseObject(JSON.toJSONString(result), new TypeReference<List<SubGraphRecord>>() {});
subGraphList.addAll(records); subGraphList.addAll(records);
int nodes = 0;
int edges = 0;
for (SubGraphRecord subGraphRecord : records) { for (SubGraphRecord subGraphRecord : records) {
int nodes = nodes =
CollectionUtils.isEmpty(subGraphRecord.getResultNodes()) nodes
? 0 + (CollectionUtils.isEmpty(subGraphRecord.getResultNodes())
: subGraphRecord.getResultNodes().size(); ? 0
int edges = : subGraphRecord.getResultNodes().size());
CollectionUtils.isEmpty(subGraphRecord.getResultEdges()) edges =
? 0 edges
: subGraphRecord.getResultEdges().size(); + (CollectionUtils.isEmpty(subGraphRecord.getResultEdges())
context.addTraceLog( ? 0
"Mapping operator was invoked successfully nodes:%s edges:%s", nodes, edges); : subGraphRecord.getResultEdges().size());
} }
context.addTraceLog(
"Mapping operator was invoked successfully nodes:%s edges:%s", nodes, edges);
} }
return subGraphList; return subGraphList;
} }

View File

@ -12,6 +12,8 @@
*/ */
package com.antgroup.openspg.server.core.scheduler.service.translate.builder; package com.antgroup.openspg.server.core.scheduler.service.translate.builder;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.antgroup.openspg.cloudext.interfaces.objectstorage.ObjectStorageClient; import com.antgroup.openspg.cloudext.interfaces.objectstorage.ObjectStorageClient;
import com.antgroup.openspg.cloudext.interfaces.objectstorage.ObjectStorageClientDriverManager; import com.antgroup.openspg.cloudext.interfaces.objectstorage.ObjectStorageClientDriverManager;
import com.antgroup.openspg.common.constants.BuilderConstant; import com.antgroup.openspg.common.constants.BuilderConstant;
@ -76,6 +78,12 @@ public class KagStructureBuilderTranslate implements Translate {
List<TaskExecuteDag.Node> nodes = Lists.newArrayList(); List<TaskExecuteDag.Node> nodes = Lists.newArrayList();
List<TaskExecuteDag.Edge> edges = Lists.newArrayList(); List<TaskExecuteDag.Edge> edges = Lists.newArrayList();
JSONObject extension = JSON.parseObject(builderJob.getExtension());
JSONObject mappingConf = extension.getJSONObject(BuilderConstant.MAPPING_CONFIG);
String type =
(String)
mappingConf.getOrDefault(BuilderConstant.MAPPING_TYPE, BuilderConstant.ENTITY_MAPPING);
TaskExecuteDag taskDag = new TaskExecuteDag(); TaskExecuteDag taskDag = new TaskExecuteDag();
String checkPartitionId = UUID.randomUUID().toString(); String checkPartitionId = UUID.randomUUID().toString();
@ -101,12 +109,14 @@ public class KagStructureBuilderTranslate implements Translate {
mapping.setTaskComponent("kagMappingSyncTask"); mapping.setTaskComponent("kagMappingSyncTask");
nodes.add(mapping); nodes.add(mapping);
TaskExecuteDag.Node vectorizer = new TaskExecuteDag.Node();
String vectorizerId = UUID.randomUUID().toString(); String vectorizerId = UUID.randomUUID().toString();
vectorizer.setId(vectorizerId); if (BuilderConstant.ENTITY_MAPPING.equalsIgnoreCase(type)) {
vectorizer.setName("Vectorizer"); TaskExecuteDag.Node vectorizer = new TaskExecuteDag.Node();
vectorizer.setTaskComponent("kagVectorizerAsyncTask"); vectorizer.setId(vectorizerId);
nodes.add(vectorizer); vectorizer.setName("Vectorizer");
vectorizer.setTaskComponent("kagVectorizerAsyncTask");
nodes.add(vectorizer);
}
TaskExecuteDag.Node writer = new TaskExecuteDag.Node(); TaskExecuteDag.Node writer = new TaskExecuteDag.Node();
String writerId = UUID.randomUUID().toString(); String writerId = UUID.randomUUID().toString();
@ -127,15 +137,22 @@ public class KagStructureBuilderTranslate implements Translate {
edge.setTo(mappingId); edge.setTo(mappingId);
edges.add(edge); edges.add(edge);
TaskExecuteDag.Edge edge2 = new TaskExecuteDag.Edge(); if (BuilderConstant.ENTITY_MAPPING.equalsIgnoreCase(type)) {
edge2.setFrom(mappingId); TaskExecuteDag.Edge edge2 = new TaskExecuteDag.Edge();
edge2.setTo(vectorizerId); edge2.setFrom(mappingId);
edges.add(edge2); edge2.setTo(vectorizerId);
edges.add(edge2);
TaskExecuteDag.Edge edge4 = new TaskExecuteDag.Edge(); TaskExecuteDag.Edge edge4 = new TaskExecuteDag.Edge();
edge4.setFrom(vectorizerId); edge4.setFrom(vectorizerId);
edge4.setTo(writerId); edge4.setTo(writerId);
edges.add(edge4); edges.add(edge4);
} else {
TaskExecuteDag.Edge edge2 = new TaskExecuteDag.Edge();
edge2.setFrom(mappingId);
edge2.setTo(writerId);
edges.add(edge2);
}
taskDag.setNodes(nodes); taskDag.setNodes(nodes);
taskDag.setEdges(edges); taskDag.setEdges(edges);