diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java index ab7ddaa8b76..8660eb2ed9f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java @@ -1,10 +1,11 @@ package org.openmetadata.service.apps; import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.JOB_LISTENER_NAME; -import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY; +import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_ID_KEY; import static org.openmetadata.service.exception.CatalogExceptionMessage.LIVE_APP_SCHEDULE_ERR; import java.util.List; +import java.util.UUID; import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -69,6 +70,14 @@ public class AbstractNativeApplication implements NativeApplication { } if (app.getAppType() == AppType.Internal && app.getScheduleType().equals(ScheduleType.Scheduled)) { + try { + ApplicationHandler.getInstance().migrateQuartzConfig(app); + } catch (SchedulerException e) { + throw AppException.byMessage( + "ApplicationHandler", + "SchedulerError", + "Error while migrating application configuration: " + app.getName()); + } scheduleInternal(); } else if (app.getAppType() == AppType.External && app.getScheduleType().equals(ScheduleType.Scheduled)) { @@ -203,9 +212,9 @@ public class AbstractNativeApplication implements NativeApplication { @Override public void execute(JobExecutionContext jobExecutionContext) { // This is the part of the code that is executed by the scheduler - App jobApp = - JsonUtils.readOrConvertValue( - jobExecutionContext.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class); + UUID appID = (UUID) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_ID_KEY); + App jobApp = collectionDAO.applicationDAO().findEntityById(appID); + ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp); // Initialise the Application this.init(jobApp); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java index 24ed7b716da..7b91e33856b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/ApplicationHandler.java @@ -1,6 +1,8 @@ package org.openmetadata.service.apps; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; +import static org.openmetadata.service.apps.scheduler.AppScheduler.APPS_JOB_GROUP; +import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -10,10 +12,18 @@ import org.openmetadata.schema.api.configuration.apps.AppPrivateConfig; import org.openmetadata.schema.api.configuration.apps.AppsPrivateConfiguration; import org.openmetadata.schema.entity.app.App; import org.openmetadata.service.OpenMetadataApplicationConfig; +import org.openmetadata.service.apps.scheduler.AppScheduler; import org.openmetadata.service.exception.UnhandledServerException; +import org.openmetadata.service.jdbi3.AppRepository; import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.search.SearchRepository; +import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.OpenMetadataConnectionBuilder; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobKey; +import org.quartz.SchedulerException; @Slf4j public class ApplicationHandler { @@ -21,10 +31,12 @@ public class ApplicationHandler { @Getter private static ApplicationHandler instance; private final OpenMetadataApplicationConfig config; private final AppsPrivateConfiguration privateConfiguration; + private final AppRepository appRepository; private ApplicationHandler(OpenMetadataApplicationConfig config) { this.config = config; this.privateConfiguration = config.getAppsPrivateConfiguration(); + this.appRepository = new AppRepository(); } public static void initialize(OpenMetadataApplicationConfig config) { @@ -37,7 +49,7 @@ public class ApplicationHandler { /** * Load the apps' OM configuration and private parameters */ - private void setAppRuntimeProperties(App app) { + public void setAppRuntimeProperties(App app) { app.setOpenMetadataServerConnection( new OpenMetadataConnectionBuilder(config, app.getBot().getName()).build()); @@ -93,7 +105,9 @@ public class ApplicationHandler { return resource; } - /** Load an App from its className and call its methods dynamically */ + /** + * Load an App from its className and call its methods dynamically + */ public void runMethodFromApplication( App app, CollectionDAO daoCollection, SearchRepository searchRepository, String methodName) { // Native Application @@ -112,4 +126,33 @@ public class ApplicationHandler { throw new AppException(e.getTargetException().getMessage()); } } + + public void migrateQuartzConfig(App application) throws SchedulerException { + JobDetail jobDetails = + AppScheduler.getInstance() + .getScheduler() + .getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP)); + if (jobDetails == null) { + return; + } + JobDataMap jobDataMap = jobDetails.getJobDataMap(); + if (jobDataMap == null) { + return; + } + String appInfo = jobDataMap.getString(APP_INFO_KEY); + if (appInfo == null) { + return; + } + LOG.info("migrating app quartz configuration for {}", application.getName()); + App updatedApp = JsonUtils.readOrConvertValue(appInfo, App.class); + updatedApp.setOpenMetadataServerConnection(null); + updatedApp.setPrivateConfiguration(null); + App currentApp = appRepository.getDao().findEntityById(application.getId()); + EntityRepository.EntityUpdater updater = + appRepository.getUpdater(currentApp, updatedApp, EntityRepository.Operation.PATCH); + updater.update(); + AppScheduler.getInstance().deleteScheduledApplication(updatedApp); + AppScheduler.getInstance().addApplicationSchedule(updatedApp); + LOG.info("migrated app configuration for {}", application.getName()); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsReportApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsReportApp.java index adedbf76f77..dbf65c59579 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsReportApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsReportApp.java @@ -8,7 +8,7 @@ import static org.openmetadata.schema.entity.events.SubscriptionDestination.Subs import static org.openmetadata.schema.type.DataReportIndex.ENTITY_REPORT_DATA_INDEX; import static org.openmetadata.service.Entity.KPI; import static org.openmetadata.service.Entity.TEAM; -import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY; +import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_ID_KEY; import static org.openmetadata.service.util.SubscriptionUtil.getAdminsData; import static org.openmetadata.service.util.Utilities.getMonthAndDateFromEpoch; @@ -25,6 +25,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.openmetadata.common.utils.CommonUtil; @@ -70,9 +71,8 @@ public class DataInsightsReportApp extends AbstractNativeApplication { @Override public void execute(JobExecutionContext jobExecutionContext) { - App app = - JsonUtils.readOrConvertValue( - jobExecutionContext.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class); + UUID appID = (UUID) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_ID_KEY); + App app = collectionDAO.applicationDAO().findEntityById(appID); // Calculate time diff long currentTime = Instant.now().toEpochMilli(); long scheduleTime = currentTime - 604800000L; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java index 92ec89099f1..00ebaf06bc3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java @@ -1,6 +1,6 @@ package org.openmetadata.service.apps.scheduler; -import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY; +import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_ID_KEY; import java.util.HashMap; import java.util.Map; @@ -10,6 +10,7 @@ import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.AppRunRecord; import org.openmetadata.schema.entity.app.FailureContext; import org.openmetadata.schema.entity.app.SuccessContext; +import org.openmetadata.service.apps.ApplicationHandler; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.util.JsonUtils; import org.quartz.JobDataMap; @@ -34,36 +35,48 @@ public abstract class AbstractOmAppJobListener implements JobListener { @Override public void jobToBeExecuted(JobExecutionContext jobExecutionContext) { - String runType = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"); - App jobApp = - JsonUtils.readOrConvertValue( - jobExecutionContext.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class); - JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); - long jobStartTime = System.currentTimeMillis(); AppRunRecord runRecord; + long jobStartTime = System.currentTimeMillis(); + UUID appID = UUID.fromString("00000000-0000-0000-0000-000000000000"); + String runType = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"); boolean update = false; - if (jobExecutionContext.isRecovering()) { - runRecord = - JsonUtils.readValue( - collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()), - AppRunRecord.class); - update = true; - } else { + try { + App jobApp = collectionDAO.applicationDAO().findEntityById(appID); + ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp); + JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); + if (jobExecutionContext.isRecovering()) { + runRecord = + JsonUtils.readValue( + collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()), + AppRunRecord.class); + update = true; + } else { + runRecord = + new AppRunRecord() + .withAppId(jobApp.getId()) + .withStartTime(jobStartTime) + .withTimestamp(jobStartTime) + .withRunType(runType) + .withStatus(AppRunRecord.Status.RUNNING) + .withScheduleInfo(jobApp.getAppSchedule()); + } + // Put the Context in the Job Data Map + dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); + } catch (Exception ex) { + Map failure = new HashMap<>(); + failure.put("message", "TriggerFailed:" + ex.getMessage()); + failure.put("jobStackTrace", ExceptionUtils.getStackTrace(ex)); runRecord = new AppRunRecord() - .withAppId(jobApp.getId()) + .withAppId(appID) + .withRunType(runType) + .withStatus(AppRunRecord.Status.FAILED) .withStartTime(jobStartTime) .withTimestamp(jobStartTime) - .withRunType(runType) - .withStatus(AppRunRecord.Status.RUNNING) - .withScheduleInfo(jobApp.getAppSchedule()); + .withFailureContext(new FailureContext().withAdditionalProperty("failure", failure)); } - // Put the Context in the Job Data Map - dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); - // Insert new Record Run pushApplicationStatusUpdates(jobExecutionContext, runRecord, update); - this.doJobToBeExecuted(jobExecutionContext); } @@ -126,10 +139,8 @@ public abstract class AbstractOmAppJobListener implements JobListener { dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); // Push Updates to the Database - App jobApp = - JsonUtils.readOrConvertValue( - context.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class); - updateStatus(jobApp.getId(), runRecord, update); + UUID appId = (UUID) context.getJobDetail().getJobDataMap().get(APP_ID_KEY); + updateStatus(appId, runRecord, update); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java index cb02c3f7f96..9aa7085d9e5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java @@ -25,7 +25,6 @@ import org.openmetadata.service.exception.UnhandledServerException; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.locator.ConnectionType; import org.openmetadata.service.search.SearchRepository; -import org.openmetadata.service.util.JsonUtils; import org.quartz.CronScheduleBuilder; import org.quartz.JobBuilder; import org.quartz.JobDataMap; @@ -67,7 +66,7 @@ public class AppScheduler { public static final String APPS_JOB_GROUP = "OMAppsJobGroup"; public static final String APPS_TRIGGER_GROUP = "OMAppsJobGroup"; public static final String APP_INFO_KEY = "applicationInfoKey"; - public static final String SEARCH_CLIENT_KEY = "searchClientKey"; + public static final String APP_ID_KEY = "appID"; private static AppScheduler instance; private static volatile boolean initialized = false; @Getter private final Scheduler scheduler; @@ -170,7 +169,7 @@ public class AppScheduler { private JobDetail jobBuilder(App app, String jobIdentity) throws ClassNotFoundException { JobDataMap dataMap = new JobDataMap(); - dataMap.put(APP_INFO_KEY, JsonUtils.pojoToJson(app)); + dataMap.put(APP_ID_KEY, app.getId()); dataMap.put("triggerType", app.getAppSchedule().getScheduleTimeline().value()); Class clz = (Class) Class.forName(app.getClassName()); @@ -241,6 +240,7 @@ public class AppScheduler { JobDetail newJobDetail = jobBuilder(application, String.format("%s-%s", application.getName(), ON_DEMAND_JOB)); newJobDetail.getJobDataMap().put("triggerType", ON_DEMAND_JOB); + newJobDetail.getJobDataMap().put(APP_ID_KEY, application.getId()); Trigger trigger = TriggerBuilder.newTrigger() .withIdentity( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java index d8d2dd4144f..a409de0392f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java @@ -136,15 +136,14 @@ public class AppRepository extends EntityRepository { @Override public void storeEntity(App entity, boolean update) { - EntityReference botUserRef = entity.getBot(); EntityReference ownerRef = entity.getOwner(); - entity.withBot(null).withOwner(null); + entity.withOwner(null); // Store store(entity, update); // Restore entity fields - entity.withBot(botUserRef).withOwner(ownerRef); + entity.withOwner(ownerRef); } public EntityReference getBotUser(App application) { @@ -228,6 +227,7 @@ public class AppRepository extends EntityRepository { recordChange( "appConfiguration", original.getAppConfiguration(), updated.getAppConfiguration()); recordChange("appSchedule", original.getAppSchedule(), updated.getAppSchedule()); + recordChange("bot", original.getBot(), updated.getBot()); } } }