diff --git a/dev/release/mysql/buildx-release-mysql.sh b/dev/release/mysql/buildx-release-mysql.sh index ec759a15..b609b581 100644 --- a/dev/release/mysql/buildx-release-mysql.sh +++ b/dev/release/mysql/buildx-release-mysql.sh @@ -10,7 +10,7 @@ # or implied. docker buildx build -f Dockerfile --platform linux/arm64/v8,linux/amd64 --push \ - -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-mysql:0.6 \ + -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-mysql:0.7 \ -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-mysql:latest \ -t openspg/openspg-mysql:0.5.1 \ -t openspg/openspg-mysql:latest \ diff --git a/dev/release/python/Dockerfile b/dev/release/python/Dockerfile index 60917b55..1220b9f1 100644 --- a/dev/release/python/Dockerfile +++ b/dev/release/python/Dockerfile @@ -50,14 +50,3 @@ RUN . /etc/profile && echo ${JAVA_HOME} && mkdir -p /home/admin/ && chmod -R 777 pip install openspg-kag==0.7.0 &&\ pip install pemja==0.4.0 && \ pip cache purge - - -# RUN python3 -m venv /openspg_venv && \ -# . /openspg_venv/bin/activate && \ -# export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-$(dpkg --print-architecture) && \ -# pip3 install openspg-kag==0.6.0b9 -i https://artifacts.antgroup-inc.cn/artifact/repositories/simple-dev/ && \ -# pip3 install pemja==0.4.0 && \ -# pip3 install -U "http://alps-common.oss-cn-hangzhou-zmf.aliyuncs.com/nscommon/shiji/nscommon-0.0.1.tar.gz" && \ -# echo "if (tty -s); then \n . /openspg_venv/bin/activate \nfi" >> ~/.bashrc - -# ADD openspg/dev/release/python/lib/builder*.jar /home/admin/miniconda3/lib/python3.10/site-packages/knext/builder/lib diff --git a/dev/release/python/build-release-python-aliyun.sh b/dev/release/python/build-release-python-aliyun.sh index 95210c7b..7812c06c 100644 --- a/dev/release/python/build-release-python-aliyun.sh +++ b/dev/release/python/build-release-python-aliyun.sh @@ -10,7 +10,7 @@ # or implied. alias docker=podman IMAGE="spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python" -VERSION="0.6" +VERSION="0.7" LATEST="latest" cd ../../../../ diff --git a/dev/release/python/build-release-python.sh b/dev/release/python/build-release-python.sh index c04e9a69..7b367d1b 100644 --- a/dev/release/python/build-release-python.sh +++ b/dev/release/python/build-release-python.sh @@ -11,8 +11,8 @@ # for amd64 docker build -f Dockerfile --platform linux/amd64 --push \ - -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python:0.5.1 \ + -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python:0.7 \ -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python:latest \ - -t openspg/openspg-python:0.5.1 \ + -t openspg/openspg-python:0.7 \ -t openspg/openspg-python:latest \ . diff --git a/dev/release/server/buildx-release-server.sh b/dev/release/server/buildx-release-server.sh index 96bbd481..859256d7 100644 --- a/dev/release/server/buildx-release-server.sh +++ b/dev/release/server/buildx-release-server.sh @@ -10,6 +10,6 @@ # or implied. docker buildx build -f Dockerfile --platform linux/arm64/v8,linux/amd64 --push \ - -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-server:0.6 \ + -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-server:0.7 \ -t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-server:latest \ . diff --git a/dev/test/docker-compose.yml b/dev/test/docker-compose.yml index 9062bfdc..1fe88897 100644 --- a/dev/test/docker-compose.yml +++ b/dev/test/docker-compose.yml @@ -38,3 +38,18 @@ services: volumes: - /etc/localtime:/etc/localtime:ro - $HOME/dozerdb/logs:/logs + + minio: + image: spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-minio:latest + container_name: release-openspg-minio + command: server --console-address ":9001" /data + restart: always + environment: + MINIO_ACCESS_KEY: minio + MINIO_SECRET_KEY: minio@openspg + TZ: Asia/Shanghai + ports: + - 9000:9000 + - 9001:9001 + volumes: + - /etc/localtime:/etc/localtime:ro diff --git a/server/api/facade/src/main/java/com/antgroup/openspg/server/api/facade/dto/common/request/KagBuilderRequest.java b/server/api/facade/src/main/java/com/antgroup/openspg/server/api/facade/dto/common/request/KagBuilderRequest.java index 81286b01..4ee82282 100644 --- a/server/api/facade/src/main/java/com/antgroup/openspg/server/api/facade/dto/common/request/KagBuilderRequest.java +++ b/server/api/facade/src/main/java/com/antgroup/openspg/server/api/facade/dto/common/request/KagBuilderRequest.java @@ -36,6 +36,8 @@ public class KagBuilderRequest extends BaseRequest { private Integer workerGpu; + private String workerGpuType; + private Integer workerMemory; private Integer workerStorage; diff --git a/server/arks/sofaboot/src/main/resources/config/application-default.properties b/server/arks/sofaboot/src/main/resources/config/application-default.properties index ac8ca3ea..25e47503 100644 --- a/server/arks/sofaboot/src/main/resources/config/application-default.properties +++ b/server/arks/sofaboot/src/main/resources/config/application-default.properties @@ -15,6 +15,7 @@ # | server | # * ----------------------- */ # spring +env=default spring.application.name=openspg spring.servlet.multipart.max-file-size=100GB spring.servlet.multipart.max-request-size=100GB diff --git a/server/common/service/pom.xml b/server/common/service/pom.xml index d41a73ec..2ad5ba0c 100644 --- a/server/common/service/pom.xml +++ b/server/common/service/pom.xml @@ -150,5 +150,9 @@ 0.36.4-public + + org.springframework.boot + spring-boot-autoconfigure + diff --git a/server/common/service/src/main/java/com/antgroup/openspg/server/common/service/account/impl/AccountServiceDefaultImpl.java b/server/common/service/src/main/java/com/antgroup/openspg/server/common/service/account/impl/AccountServiceDefaultImpl.java new file mode 100644 index 00000000..931031c6 --- /dev/null +++ b/server/common/service/src/main/java/com/antgroup/openspg/server/common/service/account/impl/AccountServiceDefaultImpl.java @@ -0,0 +1,81 @@ +package com.antgroup.openspg.server.common.service.account.impl; + +import com.antgroup.openspg.server.api.facade.Paged; +import com.antgroup.openspg.server.api.http.client.account.AccountService; +import com.antgroup.openspg.server.common.model.account.Account; +import java.io.IOException; +import java.util.List; +import javax.servlet.http.Cookie; +import javax.servlet.http.HttpServletResponse; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +@Service +@ConditionalOnProperty(name = "env", havingValue = "default") +public class AccountServiceDefaultImpl implements AccountService { + + @Override + public Account getLoginUser() { + return null; + } + + @Override + public List getAccountByKeyword(String keyword) { + return null; + } + + @Override + public Account getByUserNo(String userNo) { + return null; + } + + @Override + public Account getWithPrivateByUserNo(String userNo) { + return null; + } + + @Override + public Integer create(Account account) { + return null; + } + + @Override + public Integer updatePassword(Account account) { + return null; + } + + @Override + public Integer deleteAccount(String workNo) { + return null; + } + + @Override + public Paged getAccountList(String account, Integer page, Integer size) { + return null; + } + + @Override + public String getSha256HexPassword(String password, String salt) { + return null; + } + + @Override + public Account getCurrentAccount(Cookie[] cookies) throws IOException { + return null; + } + + @Override + public boolean login(Account account, HttpServletResponse response) { + return false; + } + + @Override + public String logout(String workNo, String redirectUrl) { + return null; + } + + @Override + public int updateUserConfig(Account account, Cookie[] cookies) { + return 0; + } +} diff --git a/server/common/service/src/main/java/com/antgroup/openspg/server/common/service/config/DefaultValue.java b/server/common/service/src/main/java/com/antgroup/openspg/server/common/service/config/DefaultValue.java index 60d725ca..9754ed5a 100644 --- a/server/common/service/src/main/java/com/antgroup/openspg/server/common/service/config/DefaultValue.java +++ b/server/common/service/src/main/java/com/antgroup/openspg/server/common/service/config/DefaultValue.java @@ -38,7 +38,7 @@ public class DefaultValue { @Value("${schema.uri:}") private String schemaUrlHost; - @Value("${builder.model.execute.num:5}") + @Value("${builder.model.execute.num:20}") private Integer modelExecuteNum; @Value("${python.exec:}") diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/common/MemoryTaskServer.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/common/MemoryTaskServer.java index 5fc2815c..672821fa 100644 --- a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/common/MemoryTaskServer.java +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/common/MemoryTaskServer.java @@ -15,6 +15,7 @@ package com.antgroup.openspg.server.core.scheduler.service.common; import com.antgroup.openspg.common.util.DateTimeUtils; import com.antgroup.openspg.server.common.model.scheduler.SchedulerEnum; import com.antgroup.openspg.server.core.scheduler.model.service.SchedulerTask; +import com.antgroup.openspg.server.core.scheduler.service.api.SchedulerService; import java.util.Date; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -27,12 +28,15 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service @Slf4j public class MemoryTaskServer { + @Autowired private SchedulerService schedulerService; + private final ConcurrentMap taskMap = new ConcurrentHashMap<>(); private final ConcurrentMap> futureMap = new ConcurrentHashMap<>(); @@ -45,21 +49,24 @@ public class MemoryTaskServer { new LinkedBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); - public String submit(MemoryTaskCallable taskCallable, String taskId) { + public String submit(MemoryTaskCallable taskCallable, String taskId, Long instanceId) { SchedulerTask taskInfo = new SchedulerTask(); taskInfo.setNodeId(taskId); taskInfo.setStatus(SchedulerEnum.TaskStatus.WAIT); + taskInfo.setInstanceId(instanceId); taskMap.put(taskId, taskInfo); taskCallable.setTask(taskInfo); Future future = - CompletableFuture.supplyAsync(() -> executeTask(taskId, taskCallable), executorService); + CompletableFuture.supplyAsync( + () -> executeTask(taskId, instanceId, taskCallable), executorService); futureMap.put(taskId, future); return taskId; } - private String executeTask(String taskId, MemoryTaskCallable taskCallable) { + private String executeTask( + String taskId, Long instanceId, MemoryTaskCallable taskCallable) { SchedulerTask taskInfo = taskMap.get(taskId); taskInfo.setStatus(SchedulerEnum.TaskStatus.RUNNING); taskInfo.setBeginTime(new Date()); @@ -70,9 +77,10 @@ public class MemoryTaskServer { } catch (Exception e) { taskInfo.setStatus(SchedulerEnum.TaskStatus.ERROR); taskInfo.setTraceLog(ExceptionUtils.getStackTrace(e)); - log.error("executeTask Exception", e); + log.error("executeTask Exception instanceId:" + instanceId, e); } finally { taskInfo.setFinishTime(new Date()); + schedulerService.triggerInstance(instanceId); } return taskId; } @@ -87,9 +95,6 @@ public class MemoryTaskServer { if (future != null && !future.isDone()) { boolean cancelled = future.cancel(true); if (cancelled) { - SchedulerTask taskInfo = taskMap.get(taskId); - taskInfo.setStatus(SchedulerEnum.TaskStatus.TERMINATE); - taskMap.put(taskId, taskInfo); futureMap.remove(taskId); return true; } diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/handler/client/db/SchedulerHandlerClient.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/handler/client/db/SchedulerHandlerClient.java index 4e7d4609..4438cd3f 100644 --- a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/handler/client/db/SchedulerHandlerClient.java +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/handler/client/db/SchedulerHandlerClient.java @@ -200,17 +200,16 @@ public class SchedulerHandlerClient { if (CollectionUtils.isEmpty(infoLogs)) { return true; } + SchedulerInfoLog schedulerLog = infoLogs.get(infoLogs.size() - 1); Date nowDate = new Date(); if (SchedulerInfoStatus.RUNNING.equals(schedulerInfo.getStatus())) { Long hostExceptionTimeout = schedulerInfo.getHostExceptionTimeout(); if (hostExceptionTimeout != null - && nowDate.getTime() - schedulerInfo.getGmtModified().getTime() - >= hostExceptionTimeout * 1000) { + && nowDate.getTime() - schedulerLog.getRt().getTime() >= hostExceptionTimeout * 1000) { log.info("running and timeout to pull again {} {}", name, hostExceptionTimeout); return true; } } - SchedulerInfoLog schedulerLog = infoLogs.get(infoLogs.size() - 1); if (SchedulerInfoStatus.WAIT.equals(schedulerInfo.getStatus()) && nowDate.getTime() - schedulerLog.getRt().getTime() >= schedulerInfo.getPeriod() * 1000) { diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/TaskExecuteTemplate.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/TaskExecuteTemplate.java index 76985a29..70ef2056 100644 --- a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/TaskExecuteTemplate.java +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/TaskExecuteTemplate.java @@ -138,6 +138,9 @@ public abstract class TaskExecuteTemplate implements TaskExecute { if (StringUtils.isBlank(old.getOutput())) { old.setOutput(task.getOutput()); } + if (old.getFinishTime() == null) { + old.setFinishTime(task.getFinishTime()); + } task = old; } diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagAlignmentAsyncTask.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagAlignmentAsyncTask.java index 14d60593..a8f8726e 100644 --- a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagAlignmentAsyncTask.java +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagAlignmentAsyncTask.java @@ -67,7 +67,8 @@ public class KagAlignmentAsyncTask extends AsyncTaskExecuteTemplate { List inputs = SchedulerUtils.getTaskInputs(taskService, instance, task); String taskId = - memoryTaskServer.submit(new VectorizerTaskCallable(value, context, inputs), key); + memoryTaskServer.submit( + new VectorizerTaskCallable(value, context, inputs), key, instance.getId()); context.addTraceLog("Alignment task has been successfully created!"); return taskId; } diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagBuilderAsyncTask.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagBuilderAsyncTask.java index 4a3aee14..f78601fc 100644 --- a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagBuilderAsyncTask.java +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagBuilderAsyncTask.java @@ -58,7 +58,9 @@ public class KagBuilderAsyncTask extends AsyncTaskExecuteTemplate { } String taskId = memoryTaskServer.submit( - new BuilderTaskCallable(value, builderJobService, projectService, context), key); + new BuilderTaskCallable(value, builderJobService, projectService, context), + key, + context.getInstance().getId()); context.addTraceLog("Builder task has been successfully created!"); return taskId; } diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagExtractorAsyncTask.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagExtractorAsyncTask.java index aef548a5..fe726ff6 100644 --- a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagExtractorAsyncTask.java +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagExtractorAsyncTask.java @@ -119,7 +119,7 @@ public class KagExtractorAsyncTask extends AsyncTaskExecuteTemplate { JSONObject llm = JSONObject.parseObject(extractConfig.getString(CommonConstants.LLM)); String taskId = memoryTaskServer.submit( - new ExtractorTaskCallable(value, llm, project, context, inputs), key); + new ExtractorTaskCallable(value, llm, project, context, inputs), key, instance.getId()); context.addTraceLog("Extractor task has been successfully created!"); return taskId; } diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagSplitterAsyncTask.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagSplitterAsyncTask.java index 3d47c48c..1a7211d2 100644 --- a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagSplitterAsyncTask.java +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagSplitterAsyncTask.java @@ -74,7 +74,8 @@ public class KagSplitterAsyncTask extends AsyncTaskExecuteTemplate { String taskId = memoryTaskServer.submit( new SplitterTaskCallable(value, builderJobService, projectService, context, inputs), - key); + key, + instance.getId()); context.addTraceLog("Splitter task has been successfully created!"); return taskId; } diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagVectorizerAsyncTask.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagVectorizerAsyncTask.java index 0570cb00..ade5477e 100644 --- a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagVectorizerAsyncTask.java +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagVectorizerAsyncTask.java @@ -38,6 +38,7 @@ import com.antgroup.openspg.server.core.scheduler.service.utils.SchedulerUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -72,7 +73,9 @@ public class KagVectorizerAsyncTask extends AsyncTaskExecuteTemplate { List inputs = SchedulerUtils.getTaskInputs(taskService, instance, task); String taskId = memoryTaskServer.submit( - new VectorizerTaskCallable(value, projectService, context, inputs), key); + new VectorizerTaskCallable(value, projectService, context, inputs), + key, + instance.getId()); context.addTraceLog("Vectorizer task has been successfully created!"); return taskId; } @@ -165,11 +168,14 @@ public class KagVectorizerAsyncTask extends AsyncTaskExecuteTemplate { String fileKey = CommonUtils.getTaskStorageFileKey( task.getProjectId(), task.getInstanceId(), task.getId(), task.getType()); - objectStorageClient.saveString( - value.getBuilderBucketName(), JSON.toJSONString(subGraphList), fileKey); + String results = JSON.toJSONString(subGraphList); + Long statr = System.currentTimeMillis(); + byte[] bytes = results.getBytes(StandardCharsets.UTF_8); + addTraceLog("Start Store the results of the vector operator! byte length:%s", bytes.length); + objectStorageClient.saveData(value.getBuilderBucketName(), bytes, fileKey); addTraceLog( - "Store the results of the vector operator. file:%s/%s", - value.getBuilderBucketName(), fileKey); + "Store the results of the vector operator. file:%s/%s cons:%s", + value.getBuilderBucketName(), fileKey, System.currentTimeMillis() - statr); return fileKey; } diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagWriterAsyncTask.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagWriterAsyncTask.java index 7edd12c9..2b9c90de 100644 --- a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagWriterAsyncTask.java +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/async/builder/KagWriterAsyncTask.java @@ -37,6 +37,7 @@ import com.antgroup.openspg.server.core.scheduler.service.metadata.SchedulerTask import com.antgroup.openspg.server.core.scheduler.service.task.async.AsyncTaskExecuteTemplate; import com.antgroup.openspg.server.core.scheduler.service.utils.SchedulerUtils; import com.google.common.collect.Lists; +import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -78,7 +79,8 @@ public class KagWriterAsyncTask extends AsyncTaskExecuteTemplate { String taskId = memoryTaskServer.submit( new WriterTaskCallable(value, projectManager, context, builderJob.getAction(), inputs), - key); + key, + instance.getId()); context.addTraceLog("Writer task has been successfully created!"); return taskId; } @@ -114,6 +116,7 @@ public class KagWriterAsyncTask extends AsyncTaskExecuteTemplate { memoryTaskServer.stopTask(resource); schedulerTask.setOutput(resource); removeInputs(context); + task.setFinishTime(new Date()); break; default: context.addTraceLog( diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/builder/KagMappingSyncTask.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/builder/KagMappingSyncTask.java index 4833dd35..0c6cd4f8 100644 --- a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/builder/KagMappingSyncTask.java +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/task/sync/builder/KagMappingSyncTask.java @@ -46,6 +46,8 @@ import org.springframework.stereotype.Component; @Component("kagMappingSyncTask") public class KagMappingSyncTask extends SyncTaskExecuteTemplate { + private static final Integer BATCH_MAX_NUM = 2000; + @Autowired private DefaultValue value; @Autowired private BuilderJobService builderJobService; @@ -109,28 +111,34 @@ public class KagMappingSyncTask extends SyncTaskExecuteTemplate { projectId, mapping, Maps.newHashMap()); - int index = 0; - for (Map data : datas) { - context.addTraceLog("Invoke the mapping operator. index:%s/%s", ++index, datas.size()); + + for (int i = 0; i < datas.size(); i += BATCH_MAX_NUM) { + int index = Math.min(i + BATCH_MAX_NUM, datas.size()); + context.addTraceLog("Invoke the mapping operator. index:%s/%s", index, datas.size()); + List> batch = datas.subList(i, index); List result = (List) PemjaUtils.invoke( - pemjaConfig, BuilderConstant.MAPPING_ABC, pyConfig.toJSONString(), data); + pemjaConfig, BuilderConstant.MAPPING_ABC, pyConfig.toJSONString(), batch); List records = JSON.parseObject(JSON.toJSONString(result), new TypeReference>() {}); subGraphList.addAll(records); + int nodes = 0; + int edges = 0; for (SubGraphRecord subGraphRecord : records) { - int nodes = - CollectionUtils.isEmpty(subGraphRecord.getResultNodes()) - ? 0 - : subGraphRecord.getResultNodes().size(); - int edges = - CollectionUtils.isEmpty(subGraphRecord.getResultEdges()) - ? 0 - : subGraphRecord.getResultEdges().size(); - context.addTraceLog( - "Mapping operator was invoked successfully nodes:%s edges:%s", nodes, edges); + nodes = + nodes + + (CollectionUtils.isEmpty(subGraphRecord.getResultNodes()) + ? 0 + : subGraphRecord.getResultNodes().size()); + edges = + edges + + (CollectionUtils.isEmpty(subGraphRecord.getResultEdges()) + ? 0 + : subGraphRecord.getResultEdges().size()); } + context.addTraceLog( + "Mapping operator was invoked successfully nodes:%s edges:%s", nodes, edges); } return subGraphList; } diff --git a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/translate/builder/KagStructureBuilderTranslate.java b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/translate/builder/KagStructureBuilderTranslate.java index e604d4d0..d6fd22c3 100644 --- a/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/translate/builder/KagStructureBuilderTranslate.java +++ b/server/core/scheduler/service/src/main/java/com/antgroup/openspg/server/core/scheduler/service/translate/builder/KagStructureBuilderTranslate.java @@ -12,6 +12,8 @@ */ package com.antgroup.openspg.server.core.scheduler.service.translate.builder; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.antgroup.openspg.cloudext.interfaces.objectstorage.ObjectStorageClient; import com.antgroup.openspg.cloudext.interfaces.objectstorage.ObjectStorageClientDriverManager; import com.antgroup.openspg.common.constants.BuilderConstant; @@ -76,6 +78,12 @@ public class KagStructureBuilderTranslate implements Translate { List nodes = Lists.newArrayList(); List edges = Lists.newArrayList(); + JSONObject extension = JSON.parseObject(builderJob.getExtension()); + JSONObject mappingConf = extension.getJSONObject(BuilderConstant.MAPPING_CONFIG); + String type = + (String) + mappingConf.getOrDefault(BuilderConstant.MAPPING_TYPE, BuilderConstant.ENTITY_MAPPING); + TaskExecuteDag taskDag = new TaskExecuteDag(); String checkPartitionId = UUID.randomUUID().toString(); @@ -101,12 +109,14 @@ public class KagStructureBuilderTranslate implements Translate { mapping.setTaskComponent("kagMappingSyncTask"); nodes.add(mapping); - TaskExecuteDag.Node vectorizer = new TaskExecuteDag.Node(); String vectorizerId = UUID.randomUUID().toString(); - vectorizer.setId(vectorizerId); - vectorizer.setName("Vectorizer"); - vectorizer.setTaskComponent("kagVectorizerAsyncTask"); - nodes.add(vectorizer); + if (BuilderConstant.ENTITY_MAPPING.equalsIgnoreCase(type)) { + TaskExecuteDag.Node vectorizer = new TaskExecuteDag.Node(); + vectorizer.setId(vectorizerId); + vectorizer.setName("Vectorizer"); + vectorizer.setTaskComponent("kagVectorizerAsyncTask"); + nodes.add(vectorizer); + } TaskExecuteDag.Node writer = new TaskExecuteDag.Node(); String writerId = UUID.randomUUID().toString(); @@ -127,15 +137,22 @@ public class KagStructureBuilderTranslate implements Translate { edge.setTo(mappingId); edges.add(edge); - TaskExecuteDag.Edge edge2 = new TaskExecuteDag.Edge(); - edge2.setFrom(mappingId); - edge2.setTo(vectorizerId); - edges.add(edge2); + if (BuilderConstant.ENTITY_MAPPING.equalsIgnoreCase(type)) { + TaskExecuteDag.Edge edge2 = new TaskExecuteDag.Edge(); + edge2.setFrom(mappingId); + edge2.setTo(vectorizerId); + edges.add(edge2); - TaskExecuteDag.Edge edge4 = new TaskExecuteDag.Edge(); - edge4.setFrom(vectorizerId); - edge4.setTo(writerId); - edges.add(edge4); + TaskExecuteDag.Edge edge4 = new TaskExecuteDag.Edge(); + edge4.setFrom(vectorizerId); + edge4.setTo(writerId); + edges.add(edge4); + } else { + TaskExecuteDag.Edge edge2 = new TaskExecuteDag.Edge(); + edge2.setFrom(mappingId); + edge2.setTo(writerId); + edges.add(edge2); + } taskDag.setNodes(nodes); taskDag.setEdges(edges);