diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java index a784860d6bb..f92d546dbde 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java @@ -70,7 +70,9 @@ public class AbstractNativeApplication implements NativeApplication { if (app.getAppType() == AppType.Internal && app.getScheduleType().equals(ScheduleType.Scheduled)) { try { + ApplicationHandler.getInstance().removeOldJobs(app); ApplicationHandler.getInstance().migrateQuartzConfig(app); + ApplicationHandler.getInstance().fixCorruptedInstallation(app); } catch (SchedulerException e) { throw AppException.byMessage( "ApplicationHandler", diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java index 6bab8df810e..54ffa8ce8b7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java @@ -3,9 +3,11 @@ package org.openmetadata.service.apps; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.service.apps.scheduler.AppScheduler.APPS_JOB_GROUP; import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY; +import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.Collection; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.api.configuration.apps.AppPrivateConfig; @@ -24,6 +26,7 @@ import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobKey; import org.quartz.SchedulerException; +import org.quartz.impl.matchers.GroupMatcher; @Slf4j public class ApplicationHandler { @@ -159,4 +162,46 @@ public class ApplicationHandler { AppScheduler.getInstance().addApplicationSchedule(updatedApp); LOG.info("migrated app configuration for {}", application.getName()); } + + public void fixCorruptedInstallation(App application) throws SchedulerException { + JobDetail jobDetails = + AppScheduler.getInstance() + .getScheduler() + .getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP)); + if (jobDetails == null) { + return; + } + JobDataMap jobDataMap = jobDetails.getJobDataMap(); + if (jobDataMap == null) { + return; + } + String appName = jobDataMap.getString(APP_NAME); + if (appName == null) { + LOG.info("corrupt entry for app {}, reinstalling", application.getName()); + App app = appRepository.getDao().findEntityByName(application.getName()); + AppScheduler.getInstance().deleteScheduledApplication(app); + AppScheduler.getInstance().addApplicationSchedule(app); + } + } + + public void removeOldJobs(App app) throws SchedulerException { + Collection jobKeys = + AppScheduler.getInstance() + .getScheduler() + .getJobKeys(GroupMatcher.groupContains(APPS_JOB_GROUP)); + jobKeys.forEach( + jobKey -> { + try { + Class clz = + AppScheduler.getInstance().getScheduler().getJobDetail(jobKey).getJobClass(); + if (!jobKey.getName().equals(app.getName()) + && clz.getName().equals(app.getClassName())) { + LOG.info("deleting old job {}", jobKey.getName()); + AppScheduler.getInstance().getScheduler().deleteJob(jobKey); + } + } catch (SchedulerException e) { + LOG.error("Error deleting job {}", jobKey.getName(), e); + } + }); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java index 1c105cf43ff..cadf140af8f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java @@ -41,23 +41,25 @@ public abstract class AbstractOmAppJobListener implements JobListener { ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp); JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); long jobStartTime = System.currentTimeMillis(); - AppRunRecord runRecord; + AppRunRecord runRecord = + new AppRunRecord() + .withAppId(jobApp.getId()) + .withStartTime(jobStartTime) + .withTimestamp(jobStartTime) + .withRunType(runType) + .withStatus(AppRunRecord.Status.RUNNING) + .withScheduleInfo(jobApp.getAppSchedule()); + ; boolean update = false; if (jobExecutionContext.isRecovering()) { - runRecord = + AppRunRecord latestRunRecord = JsonUtils.readValue( collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()), AppRunRecord.class); + if (latestRunRecord != null) { + runRecord = latestRunRecord; + } update = true; - } else { - runRecord = - new AppRunRecord() - .withAppId(jobApp.getId()) - .withStartTime(jobStartTime) - .withTimestamp(jobStartTime) - .withRunType(runType) - .withStatus(AppRunRecord.Status.RUNNING) - .withScheduleInfo(jobApp.getAppSchedule()); } // Put the Context in the Job Data Map dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java index 6abefec69fd..98e84ccdf12 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java @@ -169,7 +169,7 @@ public class AppScheduler { private JobDetail jobBuilder(App app, String jobIdentity) throws ClassNotFoundException { JobDataMap dataMap = new JobDataMap(); - dataMap.put(APP_NAME, app.getFullyQualifiedName()); + dataMap.put(APP_NAME, app.getName()); dataMap.put("triggerType", app.getAppSchedule().getScheduleTimeline().value()); Class clz = (Class) Class.forName(app.getClassName());