mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-24 17:59:52 +00:00
* feat(apps): remove data from quartz 1. Remove data from quartz application. Store only app id. 2. Added exception handling when trying to trigger an app. 3. Save bot info in the app data in DB. 4. Added runtime properties in AbstractNativeApplication.execute.
This commit is contained in:
parent
f28f196c34
commit
de38ab53c0
@ -1,10 +1,11 @@
|
||||
package org.openmetadata.service.apps;
|
||||
|
||||
import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.JOB_LISTENER_NAME;
|
||||
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY;
|
||||
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_ID_KEY;
|
||||
import static org.openmetadata.service.exception.CatalogExceptionMessage.LIVE_APP_SCHEDULE_ERR;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import lombok.Getter;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -69,6 +70,14 @@ public class AbstractNativeApplication implements NativeApplication {
|
||||
}
|
||||
if (app.getAppType() == AppType.Internal
|
||||
&& app.getScheduleType().equals(ScheduleType.Scheduled)) {
|
||||
try {
|
||||
ApplicationHandler.getInstance().migrateQuartzConfig(app);
|
||||
} catch (SchedulerException e) {
|
||||
throw AppException.byMessage(
|
||||
"ApplicationHandler",
|
||||
"SchedulerError",
|
||||
"Error while migrating application configuration: " + app.getName());
|
||||
}
|
||||
scheduleInternal();
|
||||
} else if (app.getAppType() == AppType.External
|
||||
&& app.getScheduleType().equals(ScheduleType.Scheduled)) {
|
||||
@ -203,9 +212,9 @@ public class AbstractNativeApplication implements NativeApplication {
|
||||
@Override
|
||||
public void execute(JobExecutionContext jobExecutionContext) {
|
||||
// This is the part of the code that is executed by the scheduler
|
||||
App jobApp =
|
||||
JsonUtils.readOrConvertValue(
|
||||
jobExecutionContext.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class);
|
||||
UUID appID = (UUID) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_ID_KEY);
|
||||
App jobApp = collectionDAO.applicationDAO().findEntityById(appID);
|
||||
ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp);
|
||||
// Initialise the Application
|
||||
this.init(jobApp);
|
||||
|
||||
|
@ -1,6 +1,8 @@
|
||||
package org.openmetadata.service.apps;
|
||||
|
||||
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
|
||||
import static org.openmetadata.service.apps.scheduler.AppScheduler.APPS_JOB_GROUP;
|
||||
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
@ -10,10 +12,18 @@ import org.openmetadata.schema.api.configuration.apps.AppPrivateConfig;
|
||||
import org.openmetadata.schema.api.configuration.apps.AppsPrivateConfiguration;
|
||||
import org.openmetadata.schema.entity.app.App;
|
||||
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||
import org.openmetadata.service.apps.scheduler.AppScheduler;
|
||||
import org.openmetadata.service.exception.UnhandledServerException;
|
||||
import org.openmetadata.service.jdbi3.AppRepository;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.jdbi3.EntityRepository;
|
||||
import org.openmetadata.service.search.SearchRepository;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
|
||||
import org.quartz.JobDataMap;
|
||||
import org.quartz.JobDetail;
|
||||
import org.quartz.JobKey;
|
||||
import org.quartz.SchedulerException;
|
||||
|
||||
@Slf4j
|
||||
public class ApplicationHandler {
|
||||
@ -21,10 +31,12 @@ public class ApplicationHandler {
|
||||
@Getter private static ApplicationHandler instance;
|
||||
private final OpenMetadataApplicationConfig config;
|
||||
private final AppsPrivateConfiguration privateConfiguration;
|
||||
private final AppRepository appRepository;
|
||||
|
||||
private ApplicationHandler(OpenMetadataApplicationConfig config) {
|
||||
this.config = config;
|
||||
this.privateConfiguration = config.getAppsPrivateConfiguration();
|
||||
this.appRepository = new AppRepository();
|
||||
}
|
||||
|
||||
public static void initialize(OpenMetadataApplicationConfig config) {
|
||||
@ -37,7 +49,7 @@ public class ApplicationHandler {
|
||||
/**
|
||||
* Load the apps' OM configuration and private parameters
|
||||
*/
|
||||
private void setAppRuntimeProperties(App app) {
|
||||
public void setAppRuntimeProperties(App app) {
|
||||
app.setOpenMetadataServerConnection(
|
||||
new OpenMetadataConnectionBuilder(config, app.getBot().getName()).build());
|
||||
|
||||
@ -93,7 +105,9 @@ public class ApplicationHandler {
|
||||
return resource;
|
||||
}
|
||||
|
||||
/** Load an App from its className and call its methods dynamically */
|
||||
/**
|
||||
* Load an App from its className and call its methods dynamically
|
||||
*/
|
||||
public void runMethodFromApplication(
|
||||
App app, CollectionDAO daoCollection, SearchRepository searchRepository, String methodName) {
|
||||
// Native Application
|
||||
@ -112,4 +126,33 @@ public class ApplicationHandler {
|
||||
throw new AppException(e.getTargetException().getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void migrateQuartzConfig(App application) throws SchedulerException {
|
||||
JobDetail jobDetails =
|
||||
AppScheduler.getInstance()
|
||||
.getScheduler()
|
||||
.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP));
|
||||
if (jobDetails == null) {
|
||||
return;
|
||||
}
|
||||
JobDataMap jobDataMap = jobDetails.getJobDataMap();
|
||||
if (jobDataMap == null) {
|
||||
return;
|
||||
}
|
||||
String appInfo = jobDataMap.getString(APP_INFO_KEY);
|
||||
if (appInfo == null) {
|
||||
return;
|
||||
}
|
||||
LOG.info("migrating app quartz configuration for {}", application.getName());
|
||||
App updatedApp = JsonUtils.readOrConvertValue(appInfo, App.class);
|
||||
updatedApp.setOpenMetadataServerConnection(null);
|
||||
updatedApp.setPrivateConfiguration(null);
|
||||
App currentApp = appRepository.getDao().findEntityById(application.getId());
|
||||
EntityRepository<App>.EntityUpdater updater =
|
||||
appRepository.getUpdater(currentApp, updatedApp, EntityRepository.Operation.PATCH);
|
||||
updater.update();
|
||||
AppScheduler.getInstance().deleteScheduledApplication(updatedApp);
|
||||
AppScheduler.getInstance().addApplicationSchedule(updatedApp);
|
||||
LOG.info("migrated app configuration for {}", application.getName());
|
||||
}
|
||||
}
|
||||
|
@ -8,7 +8,7 @@ import static org.openmetadata.schema.entity.events.SubscriptionDestination.Subs
|
||||
import static org.openmetadata.schema.type.DataReportIndex.ENTITY_REPORT_DATA_INDEX;
|
||||
import static org.openmetadata.service.Entity.KPI;
|
||||
import static org.openmetadata.service.Entity.TEAM;
|
||||
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY;
|
||||
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_ID_KEY;
|
||||
import static org.openmetadata.service.util.SubscriptionUtil.getAdminsData;
|
||||
import static org.openmetadata.service.util.Utilities.getMonthAndDateFromEpoch;
|
||||
|
||||
@ -25,6 +25,7 @@ import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.openmetadata.common.utils.CommonUtil;
|
||||
@ -70,9 +71,8 @@ public class DataInsightsReportApp extends AbstractNativeApplication {
|
||||
|
||||
@Override
|
||||
public void execute(JobExecutionContext jobExecutionContext) {
|
||||
App app =
|
||||
JsonUtils.readOrConvertValue(
|
||||
jobExecutionContext.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class);
|
||||
UUID appID = (UUID) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_ID_KEY);
|
||||
App app = collectionDAO.applicationDAO().findEntityById(appID);
|
||||
// Calculate time diff
|
||||
long currentTime = Instant.now().toEpochMilli();
|
||||
long scheduleTime = currentTime - 604800000L;
|
||||
|
@ -1,6 +1,6 @@
|
||||
package org.openmetadata.service.apps.scheduler;
|
||||
|
||||
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY;
|
||||
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_ID_KEY;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@ -10,6 +10,7 @@ import org.openmetadata.schema.entity.app.App;
|
||||
import org.openmetadata.schema.entity.app.AppRunRecord;
|
||||
import org.openmetadata.schema.entity.app.FailureContext;
|
||||
import org.openmetadata.schema.entity.app.SuccessContext;
|
||||
import org.openmetadata.service.apps.ApplicationHandler;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.quartz.JobDataMap;
|
||||
@ -34,36 +35,48 @@ public abstract class AbstractOmAppJobListener implements JobListener {
|
||||
|
||||
@Override
|
||||
public void jobToBeExecuted(JobExecutionContext jobExecutionContext) {
|
||||
String runType = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType");
|
||||
App jobApp =
|
||||
JsonUtils.readOrConvertValue(
|
||||
jobExecutionContext.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class);
|
||||
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
|
||||
long jobStartTime = System.currentTimeMillis();
|
||||
AppRunRecord runRecord;
|
||||
long jobStartTime = System.currentTimeMillis();
|
||||
UUID appID = UUID.fromString("00000000-0000-0000-0000-000000000000");
|
||||
String runType = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType");
|
||||
boolean update = false;
|
||||
if (jobExecutionContext.isRecovering()) {
|
||||
runRecord =
|
||||
JsonUtils.readValue(
|
||||
collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()),
|
||||
AppRunRecord.class);
|
||||
update = true;
|
||||
} else {
|
||||
try {
|
||||
App jobApp = collectionDAO.applicationDAO().findEntityById(appID);
|
||||
ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp);
|
||||
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
|
||||
if (jobExecutionContext.isRecovering()) {
|
||||
runRecord =
|
||||
JsonUtils.readValue(
|
||||
collectionDAO.appExtensionTimeSeriesDao().getLatestAppRun(jobApp.getId()),
|
||||
AppRunRecord.class);
|
||||
update = true;
|
||||
} else {
|
||||
runRecord =
|
||||
new AppRunRecord()
|
||||
.withAppId(jobApp.getId())
|
||||
.withStartTime(jobStartTime)
|
||||
.withTimestamp(jobStartTime)
|
||||
.withRunType(runType)
|
||||
.withStatus(AppRunRecord.Status.RUNNING)
|
||||
.withScheduleInfo(jobApp.getAppSchedule());
|
||||
}
|
||||
// Put the Context in the Job Data Map
|
||||
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));
|
||||
} catch (Exception ex) {
|
||||
Map<String, Object> failure = new HashMap<>();
|
||||
failure.put("message", "TriggerFailed:" + ex.getMessage());
|
||||
failure.put("jobStackTrace", ExceptionUtils.getStackTrace(ex));
|
||||
runRecord =
|
||||
new AppRunRecord()
|
||||
.withAppId(jobApp.getId())
|
||||
.withAppId(appID)
|
||||
.withRunType(runType)
|
||||
.withStatus(AppRunRecord.Status.FAILED)
|
||||
.withStartTime(jobStartTime)
|
||||
.withTimestamp(jobStartTime)
|
||||
.withRunType(runType)
|
||||
.withStatus(AppRunRecord.Status.RUNNING)
|
||||
.withScheduleInfo(jobApp.getAppSchedule());
|
||||
.withFailureContext(new FailureContext().withAdditionalProperty("failure", failure));
|
||||
}
|
||||
// Put the Context in the Job Data Map
|
||||
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));
|
||||
|
||||
// Insert new Record Run
|
||||
pushApplicationStatusUpdates(jobExecutionContext, runRecord, update);
|
||||
|
||||
this.doJobToBeExecuted(jobExecutionContext);
|
||||
}
|
||||
|
||||
@ -126,10 +139,8 @@ public abstract class AbstractOmAppJobListener implements JobListener {
|
||||
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));
|
||||
|
||||
// Push Updates to the Database
|
||||
App jobApp =
|
||||
JsonUtils.readOrConvertValue(
|
||||
context.getJobDetail().getJobDataMap().get(APP_INFO_KEY), App.class);
|
||||
updateStatus(jobApp.getId(), runRecord, update);
|
||||
UUID appId = (UUID) context.getJobDetail().getJobDataMap().get(APP_ID_KEY);
|
||||
updateStatus(appId, runRecord, update);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -25,7 +25,6 @@ import org.openmetadata.service.exception.UnhandledServerException;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.jdbi3.locator.ConnectionType;
|
||||
import org.openmetadata.service.search.SearchRepository;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.quartz.CronScheduleBuilder;
|
||||
import org.quartz.JobBuilder;
|
||||
import org.quartz.JobDataMap;
|
||||
@ -67,7 +66,7 @@ public class AppScheduler {
|
||||
public static final String APPS_JOB_GROUP = "OMAppsJobGroup";
|
||||
public static final String APPS_TRIGGER_GROUP = "OMAppsJobGroup";
|
||||
public static final String APP_INFO_KEY = "applicationInfoKey";
|
||||
public static final String SEARCH_CLIENT_KEY = "searchClientKey";
|
||||
public static final String APP_ID_KEY = "appID";
|
||||
private static AppScheduler instance;
|
||||
private static volatile boolean initialized = false;
|
||||
@Getter private final Scheduler scheduler;
|
||||
@ -170,7 +169,7 @@ public class AppScheduler {
|
||||
|
||||
private JobDetail jobBuilder(App app, String jobIdentity) throws ClassNotFoundException {
|
||||
JobDataMap dataMap = new JobDataMap();
|
||||
dataMap.put(APP_INFO_KEY, JsonUtils.pojoToJson(app));
|
||||
dataMap.put(APP_ID_KEY, app.getId());
|
||||
dataMap.put("triggerType", app.getAppSchedule().getScheduleTimeline().value());
|
||||
Class<? extends NativeApplication> clz =
|
||||
(Class<? extends NativeApplication>) Class.forName(app.getClassName());
|
||||
@ -241,6 +240,7 @@ public class AppScheduler {
|
||||
JobDetail newJobDetail =
|
||||
jobBuilder(application, String.format("%s-%s", application.getName(), ON_DEMAND_JOB));
|
||||
newJobDetail.getJobDataMap().put("triggerType", ON_DEMAND_JOB);
|
||||
newJobDetail.getJobDataMap().put(APP_ID_KEY, application.getId());
|
||||
Trigger trigger =
|
||||
TriggerBuilder.newTrigger()
|
||||
.withIdentity(
|
||||
|
@ -136,15 +136,14 @@ public class AppRepository extends EntityRepository<App> {
|
||||
|
||||
@Override
|
||||
public void storeEntity(App entity, boolean update) {
|
||||
EntityReference botUserRef = entity.getBot();
|
||||
EntityReference ownerRef = entity.getOwner();
|
||||
entity.withBot(null).withOwner(null);
|
||||
entity.withOwner(null);
|
||||
|
||||
// Store
|
||||
store(entity, update);
|
||||
|
||||
// Restore entity fields
|
||||
entity.withBot(botUserRef).withOwner(ownerRef);
|
||||
entity.withOwner(ownerRef);
|
||||
}
|
||||
|
||||
public EntityReference getBotUser(App application) {
|
||||
@ -228,6 +227,7 @@ public class AppRepository extends EntityRepository<App> {
|
||||
recordChange(
|
||||
"appConfiguration", original.getAppConfiguration(), updated.getAppConfiguration());
|
||||
recordChange("appSchedule", original.getAppSchedule(), updated.getAppSchedule());
|
||||
recordChange("bot", original.getBot(), updated.getBot());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user