From ee4f1b4e51d14f355faf48bb3df96e2e3b42cc61 Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Mon, 10 Apr 2023 13:15:44 +0530 Subject: [PATCH] Add Stop Functionality For Reindex (#10971) * Add stop Reindexing Job * Languages * Add Status * Add Status * Add messages --- .../resources/search/SearchResource.java | 19 +++++++++++++++ .../service/util/ReIndexingHandler.java | 11 +++++++++ .../searchIndex/SearchIndexWorkflow.java | 23 +++++++++++++------ .../json/schema/system/eventPublisherJob.json | 3 ++- .../src/main/resources/ui/src/jsons/en.ts | 1 + .../ui/src/locale/languages/en-us.json | 2 ++ .../ui/src/locale/languages/es-es.json | 2 ++ .../ui/src/locale/languages/fr-fr.json | 2 ++ .../ui/src/locale/languages/ja-jp.json | 2 ++ .../ui/src/locale/languages/pt-br.json | 2 ++ .../ui/src/locale/languages/zh-cn.json | 2 ++ .../ElasticSearchReIndexPage.component.tsx | 18 +++++++++++++++ .../ui/src/rest/elasticSearchReIndexAPI.ts | 6 +++++ .../ui/src/utils/EventPublisherUtils.tsx | 4 ++++ 14 files changed, 89 insertions(+), 8 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java index a5aab45c391..e5fc0bdd900 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java @@ -36,6 +36,7 @@ import javax.validation.Valid; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; +import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; @@ -520,6 +521,24 @@ public class SearchResource { .build(); } + @PUT + @Path("/reindex/stop/{jobId}") + @Operation( + operationId = "stopAJobWithId", + summary = "Stop Reindex Job", + description = "Stop a Reindex Job", + responses = { + @ApiResponse(responseCode = "200", description = "Success"), + @ApiResponse(responseCode = "404", description = "Bot for instance {id} is not found") + }) + public Response stopReindexJob( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "jobId Id", schema = @Schema(type = "UUID")) @PathParam("jobId") UUID id) { + authorizer.authorizeAdmin(securityContext); + return Response.status(Response.Status.OK).entity(ReIndexingHandler.getInstance().stopRunningJob(id)).build(); + } + private SearchSourceBuilder buildAggregateSearchBuilder(String query, int from, int size) { QueryStringQueryBuilder queryBuilder = QueryBuilders.queryStringQuery(query).lenient(true); SearchSourceBuilder searchSourceBuilder = searchBuilder(queryBuilder, null, from, size); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/ReIndexingHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/ReIndexingHandler.java index 7be1a25d231..72384d9724d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/ReIndexingHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/ReIndexingHandler.java @@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import javax.ws.rs.core.Response; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.client.RestHighLevelClient; @@ -40,6 +41,7 @@ import org.openmetadata.schema.system.Failure; import org.openmetadata.schema.system.Stats; import org.openmetadata.service.Entity; import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition; +import org.openmetadata.service.exception.CustomExceptionMessage; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.workflows.searchIndex.ReindexingUtil; import org.openmetadata.service.workflows.searchIndex.SearchIndexWorkflow; @@ -132,6 +134,15 @@ public class ReIndexingHandler { && entry.getValue().getJobData().getStatus() != EventPublisherJob.Status.RUNNING); } + public EventPublisherJob stopRunningJob(UUID jobId) { + SearchIndexWorkflow job = REINDEXING_JOB_MAP.get(jobId); + if (job != null) { + job.stopJob(); + return job.getJobData(); + } + throw new CustomExceptionMessage(Response.Status.BAD_REQUEST, "Job is not in Running state."); + } + private void validateJob(CreateEventPublisherJob job) { Objects.requireNonNull(job); Set storedEntityList = new HashSet<>(Entity.getEntityList()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java index f020788e030..69d27287ac1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.java @@ -63,6 +63,7 @@ public class SearchIndexWorkflow implements Runnable { private final ElasticSearchIndexDefinition elasticSearchIndexDefinition; @Getter private final EventPublisherJob jobData; private final CollectionDAO dao; + private volatile boolean stopped = false; public SearchIndexWorkflow( CollectionDAO dao, @@ -127,7 +128,7 @@ public class SearchIndexWorkflow implements Runnable { reCreateIndexes(paginatedEntitiesSource.getEntityType()); contextData.put(ENTITY_TYPE_KEY, paginatedEntitiesSource.getEntityType()); ResultList resultList; - while (!paginatedEntitiesSource.isDone()) { + while (!stopped && !paginatedEntitiesSource.isDone()) { long currentTime = System.currentTimeMillis(); int requestToProcess = jobData.getBatchSize(); int failed = requestToProcess; @@ -188,7 +189,7 @@ public class SearchIndexWorkflow implements Runnable { reCreateIndexes(paginatedDataInsightSource.getEntityType()); contextData.put(ENTITY_TYPE_KEY, paginatedDataInsightSource.getEntityType()); ResultList resultList; - while (!paginatedDataInsightSource.isDone()) { + while (!stopped && !paginatedDataInsightSource.isDone()) { long currentTime = System.currentTimeMillis(); int requestToProcess = jobData.getBatchSize(); int failed = requestToProcess; @@ -374,12 +375,16 @@ public class SearchIndexWorkflow implements Runnable { } private void updateJobStatus() { - if (jobData.getFailure().getSinkError() != null - || jobData.getFailure().getSourceError() != null - || jobData.getFailure().getProcessorError() != null) { - jobData.setStatus(EventPublisherJob.Status.FAILED); + if (stopped) { + jobData.setStatus(EventPublisherJob.Status.STOPPED); } else { - jobData.setStatus(EventPublisherJob.Status.COMPLETED); + if (jobData.getFailure().getSinkError() != null + || jobData.getFailure().getSourceError() != null + || jobData.getFailure().getProcessorError() != null) { + jobData.setStatus(EventPublisherJob.Status.FAILED); + } else { + jobData.setStatus(EventPublisherJob.Status.COMPLETED); + } } } @@ -390,4 +395,8 @@ public class SearchIndexWorkflow implements Runnable { private FailureDetails getFailureDetails(String context, String reason, long time) { return new FailureDetails().withContext(context).withLastFailedReason(reason).withLastFailedAt(time); } + + public void stopJob() { + stopped = true; + } } diff --git a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json index 7a84e771266..ba70e989059 100644 --- a/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json +++ b/openmetadata-spec/src/main/resources/json/schema/system/eventPublisherJob.json @@ -124,7 +124,8 @@ "COMPLETED", "FAILED", "ACTIVE", - "ACTIVE_WITH_ERROR" + "ACTIVE_WITH_ERROR", + "STOPPED" ] }, "failure": { diff --git a/openmetadata-ui/src/main/resources/ui/src/jsons/en.ts b/openmetadata-ui/src/main/resources/ui/src/jsons/en.ts index 99b319f3f0b..5a5fec37bd7 100644 --- a/openmetadata-ui/src/main/resources/ui/src/jsons/en.ts +++ b/openmetadata-ui/src/main/resources/ui/src/jsons/en.ts @@ -189,6 +189,7 @@ const jsonData = { 'account-verify-success': 'Email verified successfully!', 'update-password-success': 'Password updated successfully!', 'fetch-re-index-all': 'Re-index started', + 'stop-re-index': 'Re-indexing Stopped', }, 'form-error-messages': { 'empty-email': 'Email is required.', diff --git a/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json b/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json index 1775894f4f7..5192f2b9443 100644 --- a/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json +++ b/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json @@ -745,6 +745,8 @@ "started-following": "Started following", "status": "Status", "stay-up-to-date": "Stay Up-to-date", + "stop-re-index-all": "Stop Re-Index", + "stopped": "Stopped", "sub-team-plural": "Sub Teams", "submit": "Submit", "success": "Success", diff --git a/openmetadata-ui/src/main/resources/ui/src/locale/languages/es-es.json b/openmetadata-ui/src/main/resources/ui/src/locale/languages/es-es.json index 7478ab9bf95..2d6251775a8 100644 --- a/openmetadata-ui/src/main/resources/ui/src/locale/languages/es-es.json +++ b/openmetadata-ui/src/main/resources/ui/src/locale/languages/es-es.json @@ -745,6 +745,8 @@ "started-following": "Comenzó a seguir", "status": "Estado", "stay-up-to-date": "Manténgase Actualizado", + "stop-re-index-all": "Stop Re-Index", + "stopped": "Stopped", "sub-team-plural": "Sub Equipos", "submit": "Enviar", "success": "Éxito", diff --git a/openmetadata-ui/src/main/resources/ui/src/locale/languages/fr-fr.json b/openmetadata-ui/src/main/resources/ui/src/locale/languages/fr-fr.json index 817e46df7c4..b2dc403646d 100644 --- a/openmetadata-ui/src/main/resources/ui/src/locale/languages/fr-fr.json +++ b/openmetadata-ui/src/main/resources/ui/src/locale/languages/fr-fr.json @@ -745,6 +745,8 @@ "started-following": "Started following", "status": "Statut", "stay-up-to-date": "Stay Up-to-date", + "stop-re-index-all": "Stop Re-Index", + "stopped": "Stopped", "sub-team-plural": "Sub Teams", "submit": "Envoi", "success": "Succès", diff --git a/openmetadata-ui/src/main/resources/ui/src/locale/languages/ja-jp.json b/openmetadata-ui/src/main/resources/ui/src/locale/languages/ja-jp.json index 92be91b9b45..293f77c4138 100644 --- a/openmetadata-ui/src/main/resources/ui/src/locale/languages/ja-jp.json +++ b/openmetadata-ui/src/main/resources/ui/src/locale/languages/ja-jp.json @@ -745,6 +745,8 @@ "started-following": "フォローを開始", "status": "ステータス", "stay-up-to-date": "最新を維持", + "stop-re-index-all": "Stop Re-Index", + "stopped": "Stopped", "sub-team-plural": "サブチーム", "submit": "Submit", "success": "成功", diff --git a/openmetadata-ui/src/main/resources/ui/src/locale/languages/pt-br.json b/openmetadata-ui/src/main/resources/ui/src/locale/languages/pt-br.json index d756ffbb1af..5d08731d143 100644 --- a/openmetadata-ui/src/main/resources/ui/src/locale/languages/pt-br.json +++ b/openmetadata-ui/src/main/resources/ui/src/locale/languages/pt-br.json @@ -745,6 +745,8 @@ "started-following": "Começou a seguir", "status": "Status", "stay-up-to-date": "Mantenha-se atualizado", + "stop-re-index-all": "Stop Re-Index", + "stopped": "Stopped", "sub-team-plural": "Sub-equipes", "submit": "Enviar", "success": "Sucesso", diff --git a/openmetadata-ui/src/main/resources/ui/src/locale/languages/zh-cn.json b/openmetadata-ui/src/main/resources/ui/src/locale/languages/zh-cn.json index 60a8c1d975d..1f018278e73 100644 --- a/openmetadata-ui/src/main/resources/ui/src/locale/languages/zh-cn.json +++ b/openmetadata-ui/src/main/resources/ui/src/locale/languages/zh-cn.json @@ -745,6 +745,8 @@ "started-following": "Started following", "status": "状态", "stay-up-to-date": "Stay Up-to-date", + "stop-re-index-all": "Stop Re-Index", + "stopped": "Stopped", "sub-team-plural": "Sub Teams", "submit": "提交", "success": "成功", diff --git a/openmetadata-ui/src/main/resources/ui/src/pages/ElasticSearchIndexPage/ElasticSearchReIndexPage.component.tsx b/openmetadata-ui/src/main/resources/ui/src/pages/ElasticSearchIndexPage/ElasticSearchReIndexPage.component.tsx index df792411037..3a33f105ade 100644 --- a/openmetadata-ui/src/main/resources/ui/src/pages/ElasticSearchIndexPage/ElasticSearchReIndexPage.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/pages/ElasticSearchIndexPage/ElasticSearchReIndexPage.component.tsx @@ -28,6 +28,7 @@ import { getBatchJobReIndexStatus, getStreamJobReIndexStatus, reIndexByPublisher, + stopBatchJobReIndex, } from 'rest/elasticSearchReIndexAPI'; import { SOCKET_EVENTS } from '../../constants/constants'; import { CreateEventPublisherJob } from '../../generated/api/createEventPublisherJob'; @@ -73,6 +74,15 @@ const ElasticSearchIndexPage = () => { } }; + const stopBatchReIndexedJob = async () => { + try { + const response = await stopBatchJobReIndex(batchJobData?.id); + showSuccessToast(jsonData['api-success-messages']['stop-re-index']); + } catch (error) { + showErrorToast(error as AxiosError); + } + }; + const fetchStreamReIndexedData = async () => { try { setStreamLoading(true); @@ -162,6 +172,14 @@ const ElasticSearchIndexPage = () => { title={t('label.refresh-log')} onClick={fetchBatchReIndexedData} /> +