- fix Reindex status in Operations (#16065)

* - fix Status

* - TODO stats print

* - Fix Stats

* - Update Error Message
This commit is contained in:
Mohit Yadav 2024-04-29 09:28:45 +05:30 committed by GitHub
parent c6afcce52d
commit 4272c5ffee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 78 additions and 37 deletions

View File

@ -53,7 +53,7 @@ public abstract class AbstractOmAppJobListener implements JobListener {
.withRunType(runType)
.withStatus(AppRunRecord.Status.RUNNING)
.withScheduleInfo(jobApp.getAppSchedule());
;
boolean update = false;
if (jobExecutionContext.isRecovering()) {
AppRunRecord latestRunRecord =
@ -89,6 +89,7 @@ public abstract class AbstractOmAppJobListener implements JobListener {
Object jobStats = jobExecutionContext.getJobDetail().getJobDataMap().get(APP_RUN_STATS);
long endTime = System.currentTimeMillis();
runRecord.withEndTime(endTime);
runRecord.setExecutionTime(endTime - runRecord.getStartTime());
if (jobException == null
&& !(runRecord.getStatus() == AppRunRecord.Status.FAILED

View File

@ -1,6 +1,7 @@
package org.openmetadata.service.util;
import static org.flywaydb.core.internal.info.MigrationInfoDumper.dumpToAsciiTable;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.FIELD_OWNER;
import ch.qos.logback.classic.Level;
@ -29,6 +30,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import javax.validation.Validator;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.MigrationVersion;
@ -53,6 +55,7 @@ import org.openmetadata.service.apps.ApplicationHandler;
import org.openmetadata.service.apps.scheduler.AppScheduler;
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.exception.UnhandledServerException;
import org.openmetadata.service.fernet.Fernet;
import org.openmetadata.service.jdbi3.AppRepository;
import org.openmetadata.service.jdbi3.CollectionDAO;
@ -260,49 +263,86 @@ public class OpenMetadataOperations implements Callable<Integer> {
AppScheduler.initialize(config, collectionDAO, searchRepository);
String appName = "SearchIndexingApplication";
App searchIndexApp =
new App()
.withId(UUID.randomUUID())
.withName(appName)
.withFullyQualifiedName(appName)
.withClassName("org.openmetadata.service.apps.bundles.searchIndex.SearchIndexApp")
.withAppSchedule(new AppSchedule().withScheduleTimeline(ScheduleTimeline.DAILY))
.withAppConfiguration(
new EventPublisherJob()
.withEntities(new HashSet<>(List.of("all")))
.withRecreateIndex(recreateIndexes)
.withBatchSize(batchSize)
.withSearchIndexMappingLanguage(
config.getElasticSearchConfiguration().getSearchIndexMappingLanguage()))
.withRuntime(new ScheduledExecutionContext().withEnabled(true));
App searchIndexApp = getSearchIndexingApp(appName, batchSize, recreateIndexes);
AppScheduler.getInstance().triggerOnDemandApplication(searchIndexApp);
do {
Thread.sleep(3000l);
} while (!AppScheduler.getInstance().getScheduler().getCurrentlyExecutingJobs().isEmpty());
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
AppRunRecord appRunRecord = appRepository.getLatestAppRuns(searchIndexApp.getId());
if (appRunRecord != null) {
List<String> columns =
new ArrayList<>(
List.of("status", "startTime", "endTime", "executionTime", "success", "failure"));
List<List<String>> rows = new ArrayList<>();
rows.add(
Arrays.asList(
appRunRecord.getStatus().value(),
appRunRecord.getStartTime().toString(),
appRunRecord.getEndTime().toString(),
appRunRecord.getExecutionTime().toString(),
appRunRecord.getSuccessContext().toString(),
appRunRecord.getFailureContext().toString()));
printToAsciiTable(columns, rows, "No Search Indexing Application found");
}
return 0;
return waitAndReturnReindexingAppStatus(searchIndexApp);
} catch (Exception e) {
LOG.error("Failed to reindex due to ", e);
return 1;
}
}
private App getSearchIndexingApp(String appName, int batchSize, boolean recreateIndexes) {
try {
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
App searchApp =
appRepository.getByName(null, "SearchIndexingApplication", appRepository.getFields("id"));
return searchApp;
} catch (EntityNotFoundException ex) {
return new App()
.withId(UUID.randomUUID())
.withName(appName)
.withFullyQualifiedName(appName)
.withClassName("org.openmetadata.service.apps.bundles.searchIndex.SearchIndexApp")
.withAppSchedule(new AppSchedule().withScheduleTimeline(ScheduleTimeline.DAILY))
.withAppConfiguration(
new EventPublisherJob()
.withEntities(new HashSet<>(List.of("all")))
.withRecreateIndex(recreateIndexes)
.withBatchSize(batchSize)
.withSearchIndexMappingLanguage(
config.getElasticSearchConfiguration().getSearchIndexMappingLanguage()))
.withRuntime(new ScheduledExecutionContext().withEnabled(true));
}
}
@SneakyThrows
private int waitAndReturnReindexingAppStatus(App searchIndexApp) {
AppRunRecord appRunRecord;
do {
try {
AppRepository appRepository =
(AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
appRunRecord = appRepository.getLatestAppRuns(searchIndexApp.getId());
if (appRunRecord != null) {
List<String> columns =
new ArrayList<>(
List.of("status", "startTime", "endTime", "executionTime", "success", "failure"));
List<List<String>> rows = new ArrayList<>();
rows.add(
Arrays.asList(
appRunRecord.getStatus().value(),
appRunRecord.getStartTime().toString(),
appRunRecord.getEndTime().toString(),
appRunRecord.getExecutionTime().toString(),
nullOrEmpty(appRunRecord.getSuccessContext())
? "Unavailable"
: JsonUtils.pojoToJson(appRunRecord.getSuccessContext()),
nullOrEmpty(appRunRecord.getFailureContext())
? "Unavailable"
: JsonUtils.pojoToJson(appRunRecord.getFailureContext())));
printToAsciiTable(columns, rows, "Failed to run Search Reindexing");
}
} catch (UnhandledServerException e) {
LOG.info(
"Reindexing Status not available yet, waiting for 10 seconds to fetch the status again.");
appRunRecord = null;
Thread.sleep(10000);
}
} while (appRunRecord == null
&& (appRunRecord.getStatus().equals(AppRunRecord.Status.SUCCESS)
|| appRunRecord.getStatus().equals(AppRunRecord.Status.FAILED)));
if (appRunRecord.getStatus().equals(AppRunRecord.Status.SUCCESS)
|| appRunRecord.getStatus().equals(AppRunRecord.Status.COMPLETED)) {
LOG.debug("Reindexing Completed Successfully.");
return 0;
}
LOG.error("Reindexing completed in Failure.");
return 1;
}
@Command(name = "deploy-pipelines", description = "Deploy all the service pipelines.")
public Integer deployPipelines() {
try {