diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java index d2d959c1624..8e23e7be7f4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java @@ -53,7 +53,7 @@ public abstract class AbstractOmAppJobListener implements JobListener { .withRunType(runType) .withStatus(AppRunRecord.Status.RUNNING) .withScheduleInfo(jobApp.getAppSchedule()); - ; + boolean update = false; if (jobExecutionContext.isRecovering()) { AppRunRecord latestRunRecord = @@ -89,6 +89,7 @@ public abstract class AbstractOmAppJobListener implements JobListener { Object jobStats = jobExecutionContext.getJobDetail().getJobDataMap().get(APP_RUN_STATS); long endTime = System.currentTimeMillis(); runRecord.withEndTime(endTime); + runRecord.setExecutionTime(endTime - runRecord.getStartTime()); if (jobException == null && !(runRecord.getStatus() == AppRunRecord.Status.FAILED diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java index 3d3a5928f69..9c1d355bd76 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java @@ -1,6 +1,7 @@ package org.openmetadata.service.util; import static org.flywaydb.core.internal.info.MigrationInfoDumper.dumpToAsciiTable; +import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.service.Entity.FIELD_OWNER; import ch.qos.logback.classic.Level; @@ -29,6 +30,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import javax.validation.Validator; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.flywaydb.core.Flyway; import org.flywaydb.core.api.MigrationVersion; @@ -53,6 +55,7 @@ import org.openmetadata.service.apps.ApplicationHandler; import org.openmetadata.service.apps.scheduler.AppScheduler; import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory; import org.openmetadata.service.exception.EntityNotFoundException; +import org.openmetadata.service.exception.UnhandledServerException; import org.openmetadata.service.fernet.Fernet; import org.openmetadata.service.jdbi3.AppRepository; import org.openmetadata.service.jdbi3.CollectionDAO; @@ -260,49 +263,86 @@ public class OpenMetadataOperations implements Callable { AppScheduler.initialize(config, collectionDAO, searchRepository); String appName = "SearchIndexingApplication"; - App searchIndexApp = - new App() - .withId(UUID.randomUUID()) - .withName(appName) - .withFullyQualifiedName(appName) - .withClassName("org.openmetadata.service.apps.bundles.searchIndex.SearchIndexApp") - .withAppSchedule(new AppSchedule().withScheduleTimeline(ScheduleTimeline.DAILY)) - .withAppConfiguration( - new EventPublisherJob() - .withEntities(new HashSet<>(List.of("all"))) - .withRecreateIndex(recreateIndexes) - .withBatchSize(batchSize) - .withSearchIndexMappingLanguage( - config.getElasticSearchConfiguration().getSearchIndexMappingLanguage())) - .withRuntime(new ScheduledExecutionContext().withEnabled(true)); + + App searchIndexApp = getSearchIndexingApp(appName, batchSize, recreateIndexes); AppScheduler.getInstance().triggerOnDemandApplication(searchIndexApp); - do { - Thread.sleep(3000l); - } while (!AppScheduler.getInstance().getScheduler().getCurrentlyExecutingJobs().isEmpty()); - AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION); - AppRunRecord appRunRecord = appRepository.getLatestAppRuns(searchIndexApp.getId()); - if (appRunRecord != null) { - List columns = - new ArrayList<>( - List.of("status", "startTime", "endTime", "executionTime", "success", "failure")); - List> rows = new ArrayList<>(); - rows.add( - Arrays.asList( - appRunRecord.getStatus().value(), - appRunRecord.getStartTime().toString(), - appRunRecord.getEndTime().toString(), - appRunRecord.getExecutionTime().toString(), - appRunRecord.getSuccessContext().toString(), - appRunRecord.getFailureContext().toString())); - printToAsciiTable(columns, rows, "No Search Indexing Application found"); - } - return 0; + return waitAndReturnReindexingAppStatus(searchIndexApp); } catch (Exception e) { LOG.error("Failed to reindex due to ", e); return 1; } } + private App getSearchIndexingApp(String appName, int batchSize, boolean recreateIndexes) { + try { + AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION); + App searchApp = + appRepository.getByName(null, "SearchIndexingApplication", appRepository.getFields("id")); + return searchApp; + } catch (EntityNotFoundException ex) { + return new App() + .withId(UUID.randomUUID()) + .withName(appName) + .withFullyQualifiedName(appName) + .withClassName("org.openmetadata.service.apps.bundles.searchIndex.SearchIndexApp") + .withAppSchedule(new AppSchedule().withScheduleTimeline(ScheduleTimeline.DAILY)) + .withAppConfiguration( + new EventPublisherJob() + .withEntities(new HashSet<>(List.of("all"))) + .withRecreateIndex(recreateIndexes) + .withBatchSize(batchSize) + .withSearchIndexMappingLanguage( + config.getElasticSearchConfiguration().getSearchIndexMappingLanguage())) + .withRuntime(new ScheduledExecutionContext().withEnabled(true)); + } + } + + @SneakyThrows + private int waitAndReturnReindexingAppStatus(App searchIndexApp) { + AppRunRecord appRunRecord; + do { + try { + AppRepository appRepository = + (AppRepository) Entity.getEntityRepository(Entity.APPLICATION); + appRunRecord = appRepository.getLatestAppRuns(searchIndexApp.getId()); + if (appRunRecord != null) { + List columns = + new ArrayList<>( + List.of("status", "startTime", "endTime", "executionTime", "success", "failure")); + List> rows = new ArrayList<>(); + rows.add( + Arrays.asList( + appRunRecord.getStatus().value(), + appRunRecord.getStartTime().toString(), + appRunRecord.getEndTime().toString(), + appRunRecord.getExecutionTime().toString(), + nullOrEmpty(appRunRecord.getSuccessContext()) + ? "Unavailable" + : JsonUtils.pojoToJson(appRunRecord.getSuccessContext()), + nullOrEmpty(appRunRecord.getFailureContext()) + ? "Unavailable" + : JsonUtils.pojoToJson(appRunRecord.getFailureContext()))); + printToAsciiTable(columns, rows, "Failed to run Search Reindexing"); + } + } catch (UnhandledServerException e) { + LOG.info( + "Reindexing Status not available yet, waiting for 10 seconds to fetch the status again."); + appRunRecord = null; + Thread.sleep(10000); + } + } while (appRunRecord == null + && (appRunRecord.getStatus().equals(AppRunRecord.Status.SUCCESS) + || appRunRecord.getStatus().equals(AppRunRecord.Status.FAILED))); + + if (appRunRecord.getStatus().equals(AppRunRecord.Status.SUCCESS) + || appRunRecord.getStatus().equals(AppRunRecord.Status.COMPLETED)) { + LOG.debug("Reindexing Completed Successfully."); + return 0; + } + LOG.error("Reindexing completed in Failure."); + return 1; + } + @Command(name = "deploy-pipelines", description = "Deploy all the service pipelines.") public Integer deployPipelines() { try {