MINOR: fix application in corrupted state (#15830)

* fix application in corrupted state

* format

* remove old versions of the app

* format

* use name instead of fqn
This commit is contained in:
Imri Paran 2024-04-09 07:58:34 +02:00 committed by GitHub
parent a41dc54667
commit d9a7ebe5e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 61 additions and 12 deletions

View File

@ -70,7 +70,9 @@ public class AbstractNativeApplication implements NativeApplication {
if (app.getAppType() == AppType.Internal
&& app.getScheduleType().equals(ScheduleType.Scheduled)) {
try {
ApplicationHandler.getInstance().removeOldJobs(app);
ApplicationHandler.getInstance().migrateQuartzConfig(app);
ApplicationHandler.getInstance().fixCorruptedInstallation(app);
} catch (SchedulerException e) {
throw AppException.byMessage(
"ApplicationHandler",

View File

@ -3,9 +3,11 @@ package org.openmetadata.service.apps;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.apps.scheduler.AppScheduler.APPS_JOB_GROUP;
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY;
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.api.configuration.apps.AppPrivateConfig;
@ -24,6 +26,7 @@ import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.quartz.impl.matchers.GroupMatcher;
@Slf4j
public class ApplicationHandler {
@ -159,4 +162,46 @@ public class ApplicationHandler {
AppScheduler.getInstance().addApplicationSchedule(updatedApp);
LOG.info("migrated app configuration for {}", application.getName());
}
public void fixCorruptedInstallation(App application) throws SchedulerException {
JobDetail jobDetails =
AppScheduler.getInstance()
.getScheduler()
.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP));
if (jobDetails == null) {
return;
}
JobDataMap jobDataMap = jobDetails.getJobDataMap();
if (jobDataMap == null) {
return;
}
String appName = jobDataMap.getString(APP_NAME);
if (appName == null) {
LOG.info("corrupt entry for app {}, reinstalling", application.getName());
App app = appRepository.getDao().findEntityByName(application.getName());
AppScheduler.getInstance().deleteScheduledApplication(app);
AppScheduler.getInstance().addApplicationSchedule(app);
}
}
public void removeOldJobs(App app) throws SchedulerException {
Collection<JobKey> jobKeys =
AppScheduler.getInstance()
.getScheduler()
.getJobKeys(GroupMatcher.groupContains(APPS_JOB_GROUP));
jobKeys.forEach(
jobKey -> {
try {
Class<?> clz =
AppScheduler.getInstance().getScheduler().getJobDetail(jobKey).getJobClass();
if (!jobKey.getName().equals(app.getName())
&& clz.getName().equals(app.getClassName())) {
LOG.info("deleting old job {}", jobKey.getName());
AppScheduler.getInstance().getScheduler().deleteJob(jobKey);
}
} catch (SchedulerException e) {
LOG.error("Error deleting job {}", jobKey.getName(), e);
}
});
}
}

View File

@ -41,23 +41,25 @@ public abstract class AbstractOmAppJobListener implements JobListener {
ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp);
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
long jobStartTime = System.currentTimeMillis();
AppRunRecord runRecord;
AppRunRecord runRecord =
new AppRunRecord()
.withAppId(jobApp.getId())
.withStartTime(jobStartTime)
.withTimestamp(jobStartTime)
.withRunType(runType)
.withStatus(AppRunRecord.Status.RUNNING)
.withScheduleInfo(jobApp.getAppSchedule());
;
boolean update = false;
if (jobExecutionContext.isRecovering()) {
runRecord =
AppRunRecord latestRunRecord =
JsonUtils.readValue(
collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()),
AppRunRecord.class);
if (latestRunRecord != null) {
runRecord = latestRunRecord;
}
update = true;
} else {
runRecord =
new AppRunRecord()
.withAppId(jobApp.getId())
.withStartTime(jobStartTime)
.withTimestamp(jobStartTime)
.withRunType(runType)
.withStatus(AppRunRecord.Status.RUNNING)
.withScheduleInfo(jobApp.getAppSchedule());
}
// Put the Context in the Job Data Map
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));

View File

@ -169,7 +169,7 @@ public class AppScheduler {
private JobDetail jobBuilder(App app, String jobIdentity) throws ClassNotFoundException {
JobDataMap dataMap = new JobDataMap();
dataMap.put(APP_NAME, app.getFullyQualifiedName());
dataMap.put(APP_NAME, app.getName());
dataMap.put("triggerType", app.getAppSchedule().getScheduleTimeline().value());
Class<? extends NativeApplication> clz =
(Class<? extends NativeApplication>) Class.forName(app.getClassName());