From de38ab53c0754e61b6415e0c48e3e51a2ec92223 Mon Sep 17 00:00:00 2001 From: Imri Paran Date: Tue, 26 Mar 2024 10:48:43 +0100 Subject: [PATCH] Fixes #15614: remove data from quartz (#15612) * feat(apps): remove data from quartz 1. Remove data from quartz application. Store only app id. 2. Added exception handling when trying to trigger an app. 3. Save bot info in the app data in DB. 4. Added runtime properties in AbstractNativeApplication.execute. --- .../apps/AbstractNativeApplication.java | 17 +++-- .../service/apps/ApplicationHandler.java | 47 +++++++++++++- .../insights/DataInsightsReportApp.java | 8 +-- .../scheduler/AbstractOmAppJobListener.java | 63 +++++++++++-------- .../service/apps/scheduler/AppScheduler.java | 6 +- .../service/jdbi3/AppRepository.java | 6 +- 6 files changed, 105 insertions(+), 42 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java index ab7ddaa8b76..8660eb2ed9f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java @@ -1,10 +1,11 @@ package org.openmetadata.service.apps; import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.JOB_LISTENER_NAME; -import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY; +import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_ID_KEY; import static org.openmetadata.service.exception.CatalogExceptionMessage.LIVE_APP_SCHEDULE_ERR; import java.util.List; +import java.util.UUID; import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -69,6 +70,14 @@ public class AbstractNativeApplication implements NativeApplication { } if (app.getAppType() == AppType.Internal && app.getScheduleType().equals(ScheduleType.Scheduled)) { + try { + ApplicationHandler.getInstance().migrateQuartzConfig(app); + } catch (SchedulerException e) { + throw AppException.byMessage( + "ApplicationHandler", + "SchedulerError", + "Error while migrating application configuration: " + app.getName()); + } scheduleInternal(); } else if (app.getAppType() == AppType.External && app.getScheduleType().equals(ScheduleType.Scheduled)) { @@ -203,9 +212,9 @@ public class AbstractNativeApplication implements NativeApplication { @Override public void execute(JobExecutionContext jobExecutionContext) { // This is the part of the code that is executed by the scheduler - App jobApp = - JsonUtils.readOrConvertValue( - jobExecutionContext.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class); + UUID appID = (UUID) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_ID_KEY); + App jobApp = collectionDAO.applicationDAO().findEntityById(appID); + ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp); // Initialise the Application this.init(jobApp); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java index 24ed7b716da..7b91e33856b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java @@ -1,6 +1,8 @@ package org.openmetadata.service.apps; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; +import static org.openmetadata.service.apps.scheduler.AppScheduler.APPS_JOB_GROUP; +import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -10,10 +12,18 @@ import org.openmetadata.schema.api.configuration.apps.AppPrivateConfig; import org.openmetadata.schema.api.configuration.apps.AppsPrivateConfiguration; import org.openmetadata.schema.entity.app.App; import org.openmetadata.service.OpenMetadataApplicationConfig; +import org.openmetadata.service.apps.scheduler.AppScheduler; import org.openmetadata.service.exception.UnhandledServerException; +import org.openmetadata.service.jdbi3.AppRepository; import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.search.SearchRepository; +import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.OpenMetadataConnectionBuilder; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobKey; +import org.quartz.SchedulerException; @Slf4j public class ApplicationHandler { @@ -21,10 +31,12 @@ public class ApplicationHandler { @Getter private static ApplicationHandler instance; private final OpenMetadataApplicationConfig config; private final AppsPrivateConfiguration privateConfiguration; + private final AppRepository appRepository; private ApplicationHandler(OpenMetadataApplicationConfig config) { this.config = config; this.privateConfiguration = config.getAppsPrivateConfiguration(); + this.appRepository = new AppRepository(); } public static void initialize(OpenMetadataApplicationConfig config) { @@ -37,7 +49,7 @@ public class ApplicationHandler { /** * Load the apps' OM configuration and private parameters */ - private void setAppRuntimeProperties(App app) { + public void setAppRuntimeProperties(App app) { app.setOpenMetadataServerConnection( new OpenMetadataConnectionBuilder(config, app.getBot().getName()).build()); @@ -93,7 +105,9 @@ public class ApplicationHandler { return resource; } - /** Load an App from its className and call its methods dynamically */ + /** + * Load an App from its className and call its methods dynamically + */ public void runMethodFromApplication( App app, CollectionDAO daoCollection, SearchRepository searchRepository, String methodName) { // Native Application @@ -112,4 +126,33 @@ public class ApplicationHandler { throw new AppException(e.getTargetException().getMessage()); } } + + public void migrateQuartzConfig(App application) throws SchedulerException { + JobDetail jobDetails = + AppScheduler.getInstance() + .getScheduler() + .getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP)); + if (jobDetails == null) { + return; + } + JobDataMap jobDataMap = jobDetails.getJobDataMap(); + if (jobDataMap == null) { + return; + } + String appInfo = jobDataMap.getString(APP_INFO_KEY); + if (appInfo == null) { + return; + } + LOG.info("migrating app quartz configuration for {}", application.getName()); + App updatedApp = JsonUtils.readOrConvertValue(appInfo, App.class); + updatedApp.setOpenMetadataServerConnection(null); + updatedApp.setPrivateConfiguration(null); + App currentApp = appRepository.getDao().findEntityById(application.getId()); + EntityRepository.EntityUpdater updater = + appRepository.getUpdater(currentApp, updatedApp, EntityRepository.Operation.PATCH); + updater.update(); + AppScheduler.getInstance().deleteScheduledApplication(updatedApp); + AppScheduler.getInstance().addApplicationSchedule(updatedApp); + LOG.info("migrated app configuration for {}", application.getName()); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsReportApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsReportApp.java index adedbf76f77..dbf65c59579 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsReportApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsReportApp.java @@ -8,7 +8,7 @@ import static org.openmetadata.schema.entity.events.SubscriptionDestination.Subs import static org.openmetadata.schema.type.DataReportIndex.ENTITY_REPORT_DATA_INDEX; import static org.openmetadata.service.Entity.KPI; import static org.openmetadata.service.Entity.TEAM; -import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY; +import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_ID_KEY; import static org.openmetadata.service.util.SubscriptionUtil.getAdminsData; import static org.openmetadata.service.util.Utilities.getMonthAndDateFromEpoch; @@ -25,6 +25,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.openmetadata.common.utils.CommonUtil; @@ -70,9 +71,8 @@ public class DataInsightsReportApp extends AbstractNativeApplication { @Override public void execute(JobExecutionContext jobExecutionContext) { - App app = - JsonUtils.readOrConvertValue( - jobExecutionContext.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class); + UUID appID = (UUID) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_ID_KEY); + App app = collectionDAO.applicationDAO().findEntityById(appID); // Calculate time diff long currentTime = Instant.now().toEpochMilli(); long scheduleTime = currentTime - 604800000L; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java index 92ec89099f1..00ebaf06bc3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java @@ -1,6 +1,6 @@ package org.openmetadata.service.apps.scheduler; -import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY; +import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_ID_KEY; import java.util.HashMap; import java.util.Map; @@ -10,6 +10,7 @@ import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.AppRunRecord; import org.openmetadata.schema.entity.app.FailureContext; import org.openmetadata.schema.entity.app.SuccessContext; +import org.openmetadata.service.apps.ApplicationHandler; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.util.JsonUtils; import org.quartz.JobDataMap; @@ -34,36 +35,48 @@ public abstract class AbstractOmAppJobListener implements JobListener { @Override public void jobToBeExecuted(JobExecutionContext jobExecutionContext) { - String runType = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"); - App jobApp = - JsonUtils.readOrConvertValue( - jobExecutionContext.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class); - JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); - long jobStartTime = System.currentTimeMillis(); AppRunRecord runRecord; + long jobStartTime = System.currentTimeMillis(); + UUID appID = UUID.fromString("00000000-0000-0000-0000-000000000000"); + String runType = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"); boolean update = false; - if (jobExecutionContext.isRecovering()) { - runRecord = - JsonUtils.readValue( - collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()), - AppRunRecord.class); - update = true; - } else { + try { + App jobApp = collectionDAO.applicationDAO().findEntityById(appID); + ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp); + JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); + if (jobExecutionContext.isRecovering()) { + runRecord = + JsonUtils.readValue( + collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()), + AppRunRecord.class); + update = true; + } else { + runRecord = + new AppRunRecord() + .withAppId(jobApp.getId()) + .withStartTime(jobStartTime) + .withTimestamp(jobStartTime) + .withRunType(runType) + .withStatus(AppRunRecord.Status.RUNNING) + .withScheduleInfo(jobApp.getAppSchedule()); + } + // Put the Context in the Job Data Map + dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); + } catch (Exception ex) { + Map failure = new HashMap<>(); + failure.put("message", "TriggerFailed:" + ex.getMessage()); + failure.put("jobStackTrace", ExceptionUtils.getStackTrace(ex)); runRecord = new AppRunRecord() - .withAppId(jobApp.getId()) + .withAppId(appID) + .withRunType(runType) + .withStatus(AppRunRecord.Status.FAILED) .withStartTime(jobStartTime) .withTimestamp(jobStartTime) - .withRunType(runType) - .withStatus(AppRunRecord.Status.RUNNING) - .withScheduleInfo(jobApp.getAppSchedule()); + .withFailureContext(new FailureContext().withAdditionalProperty("failure", failure)); } - // Put the Context in the Job Data Map - dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); - // Insert new Record Run pushApplicationStatusUpdates(jobExecutionContext, runRecord, update); - this.doJobToBeExecuted(jobExecutionContext); } @@ -126,10 +139,8 @@ public abstract class AbstractOmAppJobListener implements JobListener { dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); // Push Updates to the Database - App jobApp = - JsonUtils.readOrConvertValue( - context.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class); - updateStatus(jobApp.getId(), runRecord, update); + UUID appId = (UUID) context.getJobDetail().getJobDataMap().get(APP_ID_KEY); + updateStatus(appId, runRecord, update); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java index cb02c3f7f96..9aa7085d9e5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java @@ -25,7 +25,6 @@ import org.openmetadata.service.exception.UnhandledServerException; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.locator.ConnectionType; import org.openmetadata.service.search.SearchRepository; -import org.openmetadata.service.util.JsonUtils; import org.quartz.CronScheduleBuilder; import org.quartz.JobBuilder; import org.quartz.JobDataMap; @@ -67,7 +66,7 @@ public class AppScheduler { public static final String APPS_JOB_GROUP = "OMAppsJobGroup"; public static final String APPS_TRIGGER_GROUP = "OMAppsJobGroup"; public static final String APP_INFO_KEY = "applicationInfoKey"; - public static final String SEARCH_CLIENT_KEY = "searchClientKey"; + public static final String APP_ID_KEY = "appID"; private static AppScheduler instance; private static volatile boolean initialized = false; @Getter private final Scheduler scheduler; @@ -170,7 +169,7 @@ public class AppScheduler { private JobDetail jobBuilder(App app, String jobIdentity) throws ClassNotFoundException { JobDataMap dataMap = new JobDataMap(); - dataMap.put(APP_INFO_KEY, JsonUtils.pojoToJson(app)); + dataMap.put(APP_ID_KEY, app.getId()); dataMap.put("triggerType", app.getAppSchedule().getScheduleTimeline().value()); Class clz = (Class) Class.forName(app.getClassName()); @@ -241,6 +240,7 @@ public class AppScheduler { JobDetail newJobDetail = jobBuilder(application, String.format("%s-%s", application.getName(), ON_DEMAND_JOB)); newJobDetail.getJobDataMap().put("triggerType", ON_DEMAND_JOB); + newJobDetail.getJobDataMap().put(APP_ID_KEY, application.getId()); Trigger trigger = TriggerBuilder.newTrigger() .withIdentity( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java index d8d2dd4144f..a409de0392f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java @@ -136,15 +136,14 @@ public class AppRepository extends EntityRepository { @Override public void storeEntity(App entity, boolean update) { - EntityReference botUserRef = entity.getBot(); EntityReference ownerRef = entity.getOwner(); - entity.withBot(null).withOwner(null); + entity.withOwner(null); // Store store(entity, update); // Restore entity fields - entity.withBot(botUserRef).withOwner(ownerRef); + entity.withOwner(ownerRef); } public EntityReference getBotUser(App application) { @@ -228,6 +227,7 @@ public class AppRepository extends EntityRepository { recordChange( "appConfiguration", original.getAppConfiguration(), updated.getAppConfiguration()); recordChange("appSchedule", original.getAppSchedule(), updated.getAppSchedule()); + recordChange("bot", original.getBot(), updated.getBot()); } } }