Add Stop Functionality For Reindex (#10971)

* Add stop Reindexing Job

* Languages

* Add Status

* Add Status

* Add messages
This commit is contained in:
Mohit Yadav 2023-04-10 13:15:44 +05:30 committed by GitHub
parent 751919e6ca
commit ee4f1b4e51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 89 additions and 8 deletions

View File

@ -36,6 +36,7 @@ import javax.validation.Valid;
import javax.ws.rs.DefaultValue; import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.POST; import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.PathParam; import javax.ws.rs.PathParam;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
@ -520,6 +521,24 @@ public class SearchResource {
.build(); .build();
} }
@PUT
@Path("/reindex/stop/{jobId}")
@Operation(
operationId = "stopAJobWithId",
summary = "Stop Reindex Job",
description = "Stop a Reindex Job",
responses = {
@ApiResponse(responseCode = "200", description = "Success"),
@ApiResponse(responseCode = "404", description = "Bot for instance {id} is not found")
})
public Response stopReindexJob(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "jobId Id", schema = @Schema(type = "UUID")) @PathParam("jobId") UUID id) {
authorizer.authorizeAdmin(securityContext);
return Response.status(Response.Status.OK).entity(ReIndexingHandler.getInstance().stopRunningJob(id)).build();
}
private SearchSourceBuilder buildAggregateSearchBuilder(String query, int from, int size) { private SearchSourceBuilder buildAggregateSearchBuilder(String query, int from, int size) {
QueryStringQueryBuilder queryBuilder = QueryBuilders.queryStringQuery(query).lenient(true); QueryStringQueryBuilder queryBuilder = QueryBuilders.queryStringQuery(query).lenient(true);
SearchSourceBuilder searchSourceBuilder = searchBuilder(queryBuilder, null, from, size); SearchSourceBuilder searchSourceBuilder = searchBuilder(queryBuilder, null, from, size);

View File

@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
@ -40,6 +41,7 @@ import org.openmetadata.schema.system.Failure;
import org.openmetadata.schema.system.Stats; import org.openmetadata.schema.system.Stats;
import org.openmetadata.service.Entity; import org.openmetadata.service.Entity;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition; import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.exception.CustomExceptionMessage;
import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil; import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
import org.openmetadata.service.workflows.searchIndex.SearchIndexWorkflow; import org.openmetadata.service.workflows.searchIndex.SearchIndexWorkflow;
@ -132,6 +134,15 @@ public class ReIndexingHandler {
&& entry.getValue().getJobData().getStatus() != EventPublisherJob.Status.RUNNING); && entry.getValue().getJobData().getStatus() != EventPublisherJob.Status.RUNNING);
} }
public EventPublisherJob stopRunningJob(UUID jobId) {
SearchIndexWorkflow job = REINDEXING_JOB_MAP.get(jobId);
if (job != null) {
job.stopJob();
return job.getJobData();
}
throw new CustomExceptionMessage(Response.Status.BAD_REQUEST, "Job is not in Running state.");
}
private void validateJob(CreateEventPublisherJob job) { private void validateJob(CreateEventPublisherJob job) {
Objects.requireNonNull(job); Objects.requireNonNull(job);
Set<String> storedEntityList = new HashSet<>(Entity.getEntityList()); Set<String> storedEntityList = new HashSet<>(Entity.getEntityList());

View File

@ -63,6 +63,7 @@ public class SearchIndexWorkflow implements Runnable {
private final ElasticSearchIndexDefinition elasticSearchIndexDefinition; private final ElasticSearchIndexDefinition elasticSearchIndexDefinition;
@Getter private final EventPublisherJob jobData; @Getter private final EventPublisherJob jobData;
private final CollectionDAO dao; private final CollectionDAO dao;
private volatile boolean stopped = false;
public SearchIndexWorkflow( public SearchIndexWorkflow(
CollectionDAO dao, CollectionDAO dao,
@ -127,7 +128,7 @@ public class SearchIndexWorkflow implements Runnable {
reCreateIndexes(paginatedEntitiesSource.getEntityType()); reCreateIndexes(paginatedEntitiesSource.getEntityType());
contextData.put(ENTITY_TYPE_KEY, paginatedEntitiesSource.getEntityType()); contextData.put(ENTITY_TYPE_KEY, paginatedEntitiesSource.getEntityType());
ResultList<? extends EntityInterface> resultList; ResultList<? extends EntityInterface> resultList;
while (!paginatedEntitiesSource.isDone()) { while (!stopped && !paginatedEntitiesSource.isDone()) {
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
int requestToProcess = jobData.getBatchSize(); int requestToProcess = jobData.getBatchSize();
int failed = requestToProcess; int failed = requestToProcess;
@ -188,7 +189,7 @@ public class SearchIndexWorkflow implements Runnable {
reCreateIndexes(paginatedDataInsightSource.getEntityType()); reCreateIndexes(paginatedDataInsightSource.getEntityType());
contextData.put(ENTITY_TYPE_KEY, paginatedDataInsightSource.getEntityType()); contextData.put(ENTITY_TYPE_KEY, paginatedDataInsightSource.getEntityType());
ResultList<ReportData> resultList; ResultList<ReportData> resultList;
while (!paginatedDataInsightSource.isDone()) { while (!stopped && !paginatedDataInsightSource.isDone()) {
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
int requestToProcess = jobData.getBatchSize(); int requestToProcess = jobData.getBatchSize();
int failed = requestToProcess; int failed = requestToProcess;
@ -374,12 +375,16 @@ public class SearchIndexWorkflow implements Runnable {
} }
private void updateJobStatus() { private void updateJobStatus() {
if (jobData.getFailure().getSinkError() != null if (stopped) {
|| jobData.getFailure().getSourceError() != null jobData.setStatus(EventPublisherJob.Status.STOPPED);
|| jobData.getFailure().getProcessorError() != null) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
} else { } else {
jobData.setStatus(EventPublisherJob.Status.COMPLETED); if (jobData.getFailure().getSinkError() != null
|| jobData.getFailure().getSourceError() != null
|| jobData.getFailure().getProcessorError() != null) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
} else {
jobData.setStatus(EventPublisherJob.Status.COMPLETED);
}
} }
} }
@ -390,4 +395,8 @@ public class SearchIndexWorkflow implements Runnable {
private FailureDetails getFailureDetails(String context, String reason, long time) { private FailureDetails getFailureDetails(String context, String reason, long time) {
return new FailureDetails().withContext(context).withLastFailedReason(reason).withLastFailedAt(time); return new FailureDetails().withContext(context).withLastFailedReason(reason).withLastFailedAt(time);
} }
public void stopJob() {
stopped = true;
}
} }

View File

@ -124,7 +124,8 @@
"COMPLETED", "COMPLETED",
"FAILED", "FAILED",
"ACTIVE", "ACTIVE",
"ACTIVE_WITH_ERROR" "ACTIVE_WITH_ERROR",
"STOPPED"
] ]
}, },
"failure": { "failure": {

View File

@ -189,6 +189,7 @@ const jsonData = {
'account-verify-success': 'Email verified successfully!', 'account-verify-success': 'Email verified successfully!',
'update-password-success': 'Password updated successfully!', 'update-password-success': 'Password updated successfully!',
'fetch-re-index-all': 'Re-index started', 'fetch-re-index-all': 'Re-index started',
'stop-re-index': 'Re-indexing Stopped',
}, },
'form-error-messages': { 'form-error-messages': {
'empty-email': 'Email is required.', 'empty-email': 'Email is required.',

View File

@ -745,6 +745,8 @@
"started-following": "Started following", "started-following": "Started following",
"status": "Status", "status": "Status",
"stay-up-to-date": "Stay Up-to-date", "stay-up-to-date": "Stay Up-to-date",
"stop-re-index-all": "Stop Re-Index",
"stopped": "Stopped",
"sub-team-plural": "Sub Teams", "sub-team-plural": "Sub Teams",
"submit": "Submit", "submit": "Submit",
"success": "Success", "success": "Success",

View File

@ -745,6 +745,8 @@
"started-following": "Comenzó a seguir", "started-following": "Comenzó a seguir",
"status": "Estado", "status": "Estado",
"stay-up-to-date": "Manténgase Actualizado", "stay-up-to-date": "Manténgase Actualizado",
"stop-re-index-all": "Stop Re-Index",
"stopped": "Stopped",
"sub-team-plural": "Sub Equipos", "sub-team-plural": "Sub Equipos",
"submit": "Enviar", "submit": "Enviar",
"success": "Éxito", "success": "Éxito",

View File

@ -745,6 +745,8 @@
"started-following": "Started following", "started-following": "Started following",
"status": "Statut", "status": "Statut",
"stay-up-to-date": "Stay Up-to-date", "stay-up-to-date": "Stay Up-to-date",
"stop-re-index-all": "Stop Re-Index",
"stopped": "Stopped",
"sub-team-plural": "Sub Teams", "sub-team-plural": "Sub Teams",
"submit": "Envoi", "submit": "Envoi",
"success": "Succès", "success": "Succès",

View File

@ -745,6 +745,8 @@
"started-following": "フォローを開始", "started-following": "フォローを開始",
"status": "ステータス", "status": "ステータス",
"stay-up-to-date": "最新を維持", "stay-up-to-date": "最新を維持",
"stop-re-index-all": "Stop Re-Index",
"stopped": "Stopped",
"sub-team-plural": "サブチーム", "sub-team-plural": "サブチーム",
"submit": "Submit", "submit": "Submit",
"success": "成功", "success": "成功",

View File

@ -745,6 +745,8 @@
"started-following": "Começou a seguir", "started-following": "Começou a seguir",
"status": "Status", "status": "Status",
"stay-up-to-date": "Mantenha-se atualizado", "stay-up-to-date": "Mantenha-se atualizado",
"stop-re-index-all": "Stop Re-Index",
"stopped": "Stopped",
"sub-team-plural": "Sub-equipes", "sub-team-plural": "Sub-equipes",
"submit": "Enviar", "submit": "Enviar",
"success": "Sucesso", "success": "Sucesso",

View File

@ -745,6 +745,8 @@
"started-following": "Started following", "started-following": "Started following",
"status": "状态", "status": "状态",
"stay-up-to-date": "Stay Up-to-date", "stay-up-to-date": "Stay Up-to-date",
"stop-re-index-all": "Stop Re-Index",
"stopped": "Stopped",
"sub-team-plural": "Sub Teams", "sub-team-plural": "Sub Teams",
"submit": "提交", "submit": "提交",
"success": "成功", "success": "成功",

View File

@ -28,6 +28,7 @@ import {
getBatchJobReIndexStatus, getBatchJobReIndexStatus,
getStreamJobReIndexStatus, getStreamJobReIndexStatus,
reIndexByPublisher, reIndexByPublisher,
stopBatchJobReIndex,
} from 'rest/elasticSearchReIndexAPI'; } from 'rest/elasticSearchReIndexAPI';
import { SOCKET_EVENTS } from '../../constants/constants'; import { SOCKET_EVENTS } from '../../constants/constants';
import { CreateEventPublisherJob } from '../../generated/api/createEventPublisherJob'; import { CreateEventPublisherJob } from '../../generated/api/createEventPublisherJob';
@ -73,6 +74,15 @@ const ElasticSearchIndexPage = () => {
} }
}; };
const stopBatchReIndexedJob = async () => {
try {
const response = await stopBatchJobReIndex(batchJobData?.id);
showSuccessToast(jsonData['api-success-messages']['stop-re-index']);
} catch (error) {
showErrorToast(error as AxiosError);
}
};
const fetchStreamReIndexedData = async () => { const fetchStreamReIndexedData = async () => {
try { try {
setStreamLoading(true); setStreamLoading(true);
@ -162,6 +172,14 @@ const ElasticSearchIndexPage = () => {
title={t('label.refresh-log')} title={t('label.refresh-log')}
onClick={fetchBatchReIndexedData} onClick={fetchBatchReIndexedData}
/> />
<Button
data-testid="elastic-search-stop-batch-re-index"
disabled={!isAdminUser}
size="small"
type="primary"
onClick={stopBatchReIndexedJob}>
{t('label.stop-re-index-all')}
</Button>
<Button <Button
data-testid="elastic-search-re-index-all" data-testid="elastic-search-re-index-all"
disabled={!isAdminUser} disabled={!isAdminUser}

View File

@ -33,6 +33,12 @@ export const getBatchJobReIndexStatus = async () => {
return res.data; return res.data;
}; };
export const stopBatchJobReIndex = async (id: string) => {
const response = await axiosClient.put(`search/reindex/stop/${id}`);
return response.data;
};
export const reIndexByPublisher = async (data: CreateEventPublisherJob) => { export const reIndexByPublisher = async (data: CreateEventPublisherJob) => {
const payload = { const payload = {
...data, ...data,

View File

@ -21,6 +21,8 @@ import { ReactComponent as IconSuccessBadge } from '../assets/svg/success-badge.
export const getStatusResultBadgeIcon = (status?: string) => { export const getStatusResultBadgeIcon = (status?: string) => {
switch (status) { switch (status) {
case Status.Stopped:
return <IconTaskOpen height={14} width={14} />;
case Status.Completed: case Status.Completed:
return <IconSuccessBadge height={14} width={14} />; return <IconSuccessBadge height={14} width={14} />;
@ -40,6 +42,8 @@ export const getStatusResultBadgeIcon = (status?: string) => {
export const getEventPublisherStatusText = (status?: string) => { export const getEventPublisherStatusText = (status?: string) => {
switch (status) { switch (status) {
case Status.Stopped:
return t('label.stopped');
case Status.Failed: case Status.Failed:
return t('label.failed'); return t('label.failed');
case Status.Running: case Status.Running: