diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index 218358d9544..1bbe997598f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -2,6 +2,7 @@ package org.openmetadata.service.apps.bundles.searchIndex; import static org.openmetadata.schema.system.IndexingError.ErrorSource.READER; import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS; +import static org.openmetadata.service.apps.scheduler.AppScheduler.ON_DEMAND_JOB; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex; @@ -19,7 +20,6 @@ import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.analytics.ReportData; import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.AppRunRecord; -import org.openmetadata.schema.entity.app.AppRunType; import org.openmetadata.schema.entity.app.FailureContext; import org.openmetadata.schema.entity.app.SuccessContext; import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; @@ -110,58 +110,23 @@ public class SearchIndexApp extends AbstractNativeApplication { if (request.getEntities().contains(ALL)) { request.setEntities(ALL_ENTITIES); } - int totalRecords = getTotalRequestToProcess(request.getEntities(), collectionDAO); - this.jobData = request; - this.jobData.setStats( - new Stats() - .withJobStats( - new StepStats() - .withTotalRecords(totalRecords) - .withFailedRecords(0) - .withSuccessRecords(0))); - request - .getEntities() - .forEach( - entityType -> { - if (!isDataInsightIndex(entityType)) { - List fields = List.of("*"); - PaginatedEntitiesSource source = - new PaginatedEntitiesSource(entityType, jobData.getBatchSize(), fields); - if (!CommonUtil.nullOrEmpty(request.getAfterCursor())) { - source.setCursor(request.getAfterCursor()); - } - paginatedEntitiesSources.add(source); - } else { - paginatedDataInsightSources.add( - new PaginatedDataInsightSource( - collectionDAO, entityType, jobData.getBatchSize())); - } - }); - if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) { - this.entityProcessor = new OpenSearchEntitiesProcessor(totalRecords); - this.dataInsightProcessor = new OpenSearchDataInsightProcessor(totalRecords); - this.searchIndexSink = new OpenSearchIndexSink(searchRepository, totalRecords); - } else { - this.entityProcessor = new ElasticSearchEntitiesProcessor(totalRecords); - this.dataInsightProcessor = new ElasticSearchDataInsightProcessor(totalRecords); - this.searchIndexSink = new ElasticSearchIndexSink(searchRepository, totalRecords); - } + jobData = request; } @Override public void startApp(JobExecutionContext jobExecutionContext) { try { + initializeJob(); LOG.info("Executing Reindexing Job with JobData : {}", jobData); // Update Job Status jobData.setStatus(EventPublisherJob.Status.RUNNING); // Make recreate as false for onDemand - AppRunType runType = - AppRunType.fromValue( - (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType")); + String runType = + (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"); - // Schedule Run has recreate as false always - if (runType.equals(AppRunType.Scheduled)) { + // Schedule Run has re-create set to false + if (!runType.equals(ON_DEMAND_JOB)) { jobData.setRecreateIndex(false); } @@ -187,6 +152,44 @@ public class SearchIndexApp extends AbstractNativeApplication { } } + private void initializeJob() { + int totalRecords = getTotalRequestToProcess(jobData.getEntities(), collectionDAO); + this.jobData.setStats( + new Stats() + .withJobStats( + new StepStats() + .withTotalRecords(totalRecords) + .withFailedRecords(0) + .withSuccessRecords(0))); + jobData + .getEntities() + .forEach( + entityType -> { + if (!isDataInsightIndex(entityType)) { + List fields = List.of("*"); + PaginatedEntitiesSource source = + new PaginatedEntitiesSource(entityType, jobData.getBatchSize(), fields); + if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor())) { + source.setCursor(jobData.getAfterCursor()); + } + paginatedEntitiesSources.add(source); + } else { + paginatedDataInsightSources.add( + new PaginatedDataInsightSource( + collectionDAO, entityType, jobData.getBatchSize())); + } + }); + if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) { + this.entityProcessor = new OpenSearchEntitiesProcessor(totalRecords); + this.dataInsightProcessor = new OpenSearchDataInsightProcessor(totalRecords); + this.searchIndexSink = new OpenSearchIndexSink(searchRepository, totalRecords); + } else { + this.entityProcessor = new ElasticSearchEntitiesProcessor(totalRecords); + this.dataInsightProcessor = new ElasticSearchDataInsightProcessor(totalRecords); + this.searchIndexSink = new ElasticSearchIndexSink(searchRepository, totalRecords); + } + } + public void updateRecordToDb(JobExecutionContext jobExecutionContext) { AppRunRecord appRecord = getJobRecord(jobExecutionContext); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java index 72be015f76c..92ec89099f1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java @@ -8,7 +8,6 @@ import java.util.UUID; import org.apache.commons.lang.exception.ExceptionUtils; import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.AppRunRecord; -import org.openmetadata.schema.entity.app.AppRunType; import org.openmetadata.schema.entity.app.FailureContext; import org.openmetadata.schema.entity.app.SuccessContext; import org.openmetadata.service.jdbi3.CollectionDAO; @@ -35,9 +34,7 @@ public abstract class AbstractOmAppJobListener implements JobListener { @Override public void jobToBeExecuted(JobExecutionContext jobExecutionContext) { - AppRunType runType = - AppRunType.fromValue( - (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType")); + String runType = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"); App jobApp = JsonUtils.readOrConvertValue( jobExecutionContext.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java index 2908783d71b..cb02c3f7f96 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java @@ -17,7 +17,6 @@ import lombok.extern.slf4j.Slf4j; import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.AppRuntime; import org.openmetadata.schema.entity.app.App; -import org.openmetadata.schema.entity.app.AppRunType; import org.openmetadata.schema.entity.app.AppSchedule; import org.openmetadata.schema.entity.app.ScheduleTimeline; import org.openmetadata.service.OpenMetadataApplicationConfig; @@ -44,6 +43,7 @@ import org.quartz.impl.StdSchedulerFactory; @Slf4j public class AppScheduler { private static final Map defaultAppScheduleConfig = new HashMap<>(); + public static final String ON_DEMAND_JOB = "OnDemandJob"; static { defaultAppScheduleConfig.put("org.quartz.scheduler.instanceName", "AppScheduler"); @@ -143,11 +143,7 @@ public class AppScheduler { AppRuntime context = getAppRuntime(application); if (Boolean.TRUE.equals(context.getEnabled())) { JobDetail jobDetail = jobBuilder(application, application.getName()); - if (!application - .getAppSchedule() - .getScheduleTimeline() - .value() - .equals(ScheduleTimeline.NONE)) { + if (!application.getAppSchedule().getScheduleTimeline().equals(ScheduleTimeline.NONE)) { Trigger trigger = trigger(application); scheduler.scheduleJob(jobDetail, trigger); } @@ -167,12 +163,9 @@ public class AppScheduler { // OnDemand Jobs scheduler.deleteJob( - new JobKey( - String.format("%s-%s", app.getName(), AppRunType.OnDemand.value()), APPS_JOB_GROUP)); + new JobKey(String.format("%s-%s", app.getName(), ON_DEMAND_JOB), APPS_JOB_GROUP)); scheduler.unscheduleJob( - new TriggerKey( - String.format("%s-%s", app.getName(), AppRunType.OnDemand.value()), - APPS_TRIGGER_GROUP)); + new TriggerKey(String.format("%s-%s", app.getName(), ON_DEMAND_JOB), APPS_TRIGGER_GROUP)); } private JobDetail jobBuilder(App app, String jobIdentity) throws ClassNotFoundException { @@ -230,8 +223,7 @@ public class AppScheduler { JobDetail jobDetailOnDemand = scheduler.getJobDetail( new JobKey( - String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()), - APPS_JOB_GROUP)); + String.format("%s-%s", application.getName(), ON_DEMAND_JOB), APPS_JOB_GROUP)); // Check if the job is already running List currentJobs = scheduler.getCurrentlyExecutingJobs(); for (JobExecutionContext context : currentJobs) { @@ -247,14 +239,12 @@ public class AppScheduler { AppRuntime context = getAppRuntime(application); if (Boolean.TRUE.equals(context.getEnabled())) { JobDetail newJobDetail = - jobBuilder( - application, - String.format("%s-%s", application.getName(), AppRunType.OnDemand.value())); - newJobDetail.getJobDataMap().put("triggerType", AppRunType.OnDemand.value()); + jobBuilder(application, String.format("%s-%s", application.getName(), ON_DEMAND_JOB)); + newJobDetail.getJobDataMap().put("triggerType", ON_DEMAND_JOB); Trigger trigger = TriggerBuilder.newTrigger() .withIdentity( - String.format("%s-%s", application.getName(), AppRunType.OnDemand.value()), + String.format("%s-%s", application.getName(), ON_DEMAND_JOB), APPS_TRIGGER_GROUP) .startNow() .build(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java index 50cd202c5f6..0a26c3a6f0f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java @@ -140,20 +140,6 @@ public class AppResource extends EntityResource { .installApplication(app, Entity.getCollectionDAO(), searchRepository); } } - - // Initialize installed applications - for (App installedApp : repository.listAll()) { - App appWithBot = getAppForInit(installedApp.getName()); - if (appWithBot == null) { - LOG.error( - String.format( - "Failed to init app [%s]. GET should return the installed app", - installedApp.getName())); - } else { - ApplicationHandler.getInstance().runAppInit(appWithBot, dao, searchRepository); - LOG.info(String.format("Initialized installed app [%s]", installedApp.getName())); - } - } } catch (Exception ex) { LOG.error("Failed in Create App Requests", ex); } diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/applications/appRunRecord.json b/openmetadata-spec/src/main/resources/json/schema/entity/applications/appRunRecord.json index 5f9378d7dc5..9ef46dcb9b3 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/applications/appRunRecord.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/applications/appRunRecord.json @@ -25,21 +25,8 @@ ] }, "runType": { - "javaType": "org.openmetadata.schema.entity.app.AppRunType", "description": "This schema defines the type of application Run.", - "type": "string", - "enum": [ - "Scheduled", - "OnDemand" - ], - "javaEnums": [ - { - "name": "Scheduled" - }, - { - "name": "OnDemand" - } - ] + "type": "string" }, "startTime": { "description": "Start of the job status.", diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppLogsViewer/AppLogsViewer.test.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppLogsViewer/AppLogsViewer.test.tsx index 88257deee68..fec8d841099 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppLogsViewer/AppLogsViewer.test.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppLogsViewer/AppLogsViewer.test.tsx @@ -14,7 +14,6 @@ import { render, screen } from '@testing-library/react'; import userEvent from '@testing-library/user-event'; import React from 'react'; import { - RunType, ScheduleTimeline, Status, } from '../../../../generated/entity/applications/appRunRecord'; @@ -60,7 +59,7 @@ const mockProps1 = { data: { appId: '6e4d3dcf-238d-4874-b4e4-dd863ede6544', status: Status.Success, - runType: RunType.OnDemand, + runType: 'OnDemand', startTime: 1706871884587, endTime: 1706871891251, timestamp: 1706871884587,