From 3b5218b923154df83b2cc117807f8cec7adf4a0b Mon Sep 17 00:00:00 2001 From: Imri Paran Date: Wed, 19 Feb 2025 16:23:46 +0100 Subject: [PATCH] MINOR: feat(event-subscriptions): event consumer extension (#19591) * feat(event-subscriptions): event consumer extension - implemented DI for event consumers - added custom config for event consumers - initialize the event scheduler along with application - added workflow custom type * - removed the ConsumerService as it is specific for each consumer implementation - added a "custom" alert type * wip * cleanup * reverted pom.xml changes * reverted pom.xml changes * reverted pom.xml changes * removed abstract layer AbstractOmAppJobListener * fixed import AbstractOmAppJobListener -> OmAppJobListener * fixed constructor * - add event subscription if it doesnt exist (set operation) - add deleteEventSubscription * - use patch mode when updating app.eventSubscriptions * added docs for DIContainer * aligned py tests with new automation schema * added AppRunRecord.properties * format * changes after merge * generated missing ts * use openmetadata config builder instead of the config * add json logic fixes for reverse metadata * ref: reverted changes ro workflows they will be part of https://github.com/open-metadata/OpenMetadata/pull/19723 * ref: reverted changes to workflows they will be part of https://github.com/open-metadata/OpenMetadata/pull/19723 * reverted ts changes * reverted ts changes * reverted ts changes * fix: initialization sequence for event subscription scheduler --------- Co-authored-by: karanh37 Co-authored-by: Karan Hotchandani <33024356+karanh37@users.noreply.github.com> --- .../apps/AbstractNativeApplication.java | 2 +- .../changeEvent/AbstractEventConsumer.java | 7 +- .../bundles/changeEvent/AlertPublisher.java | 4 + .../bundles/insights/DataInsightsApp.java | 4 +- .../bundles/searchIndex/SearchIndexApp.java | 4 +- .../scheduler/AbstractOmAppJobListener.java | 182 ------------------ .../service/apps/scheduler/AppScheduler.java | 2 +- .../apps/scheduler/OmAppJobListener.java | 159 ++++++++++++++- .../scheduled/EventSubscriptionScheduler.java | 52 ++++- .../service/jdbi3/AppRepository.java | 44 ++++- .../jdbi3/EventSubscriptionRepository.java | 1 + .../resources/automations/WorkflowMapper.java | 11 -- .../automations/WorkflowResource.java | 12 +- .../subscription/EventSubscriptionMapper.java | 3 +- .../EventSubscriptionResource.java | 3 +- .../service/util/DIContainer.java | 35 ++++ .../entity/applications/appRunRecord.json | 4 + .../events/api/createEventSubscription.json | 3 + .../json/schema/events/eventSubscription.json | 5 +- .../ui/src/enums/AdvancedSearch.enum.ts | 1 + .../entity/applications/appRunRecord.ts | 4 + .../events/api/createEventSubscription.ts | 1 + .../src/generated/events/eventSubscription.ts | 2 + .../ui/src/utils/JSONLogicSearchClassBase.ts | 13 ++ .../ui/src/utils/QueryBuilderUtils.tsx | 15 +- 25 files changed, 347 insertions(+), 226 deletions(-) delete mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/util/DIContainer.java diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java index 203599c3af0..11bc9da0a66 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/AbstractNativeApplication.java @@ -1,7 +1,7 @@ package org.openmetadata.service.apps; -import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.JOB_LISTENER_NAME; import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME; +import static org.openmetadata.service.apps.scheduler.OmAppJobListener.JOB_LISTENER_NAME; import static org.openmetadata.service.exception.CatalogExceptionMessage.NO_MANUAL_TRIGGER_ERR; import static org.openmetadata.service.resources.apps.AppResource.SCHEDULED_TYPES; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java index 7db4c4e4f89..93dbfefa207 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java @@ -36,6 +36,7 @@ import org.openmetadata.schema.system.EntityError; import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.service.Entity; import org.openmetadata.service.events.errors.EventPublisherException; +import org.openmetadata.service.util.DIContainer; import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.ResultList; import org.quartz.DisallowConcurrentExecution; @@ -55,7 +56,7 @@ public abstract class AbstractEventConsumer public static final String OFFSET_EXTENSION = "eventSubscription.Offset"; public static final String METRICS_EXTENSION = "eventSubscription.metrics"; public static final String FAILED_EVENT_EXTENSION = "eventSubscription.failedEvent"; - + protected final DIContainer dependencies; private long offset = -1; private long startingOffset = -1; @@ -65,7 +66,9 @@ public abstract class AbstractEventConsumer protected EventSubscription eventSubscription; protected Map> destinationMap; - protected AbstractEventConsumer() {} + protected AbstractEventConsumer(DIContainer dependencies) { + this.dependencies = dependencies; + } private void init(JobExecutionContext context) { EventSubscription sub = diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AlertPublisher.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AlertPublisher.java index ccd6017dd36..921b001dff8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AlertPublisher.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AlertPublisher.java @@ -4,9 +4,13 @@ import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.service.events.errors.EventPublisherException; +import org.openmetadata.service.util.DIContainer; @Slf4j public class AlertPublisher extends AbstractEventConsumer { + public AlertPublisher(DIContainer di) { + super(di); + } @Override public void sendAlert(UUID receiverId, ChangeEvent event) throws EventPublisherException { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java index d46f625b6bb..e57558368b7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java @@ -1,8 +1,8 @@ package org.openmetadata.service.apps.bundles.insights; -import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS; -import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.WEBSOCKET_STATUS_CHANNEL; import static org.openmetadata.service.apps.scheduler.AppScheduler.ON_DEMAND_JOB; +import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_RUN_STATS; +import static org.openmetadata.service.apps.scheduler.OmAppJobListener.WEBSOCKET_STATUS_CHANNEL; import static org.openmetadata.service.socket.WebSocketManager.DATA_INSIGHTS_JOB_BROADCAST_CHANNEL; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getInitialStatsForEntities; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index 49d23f87b74..6ea11fe3fcc 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -3,9 +3,9 @@ package org.openmetadata.service.apps.bundles.searchIndex; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.service.Entity.TEST_CASE_RESOLUTION_STATUS; import static org.openmetadata.service.Entity.TEST_CASE_RESULT; -import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS; -import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.WEBSOCKET_STATUS_CHANNEL; import static org.openmetadata.service.apps.scheduler.AppScheduler.ON_DEMAND_JOB; +import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_RUN_STATS; +import static org.openmetadata.service.apps.scheduler.OmAppJobListener.WEBSOCKET_STATUS_CHANNEL; import static org.openmetadata.service.socket.WebSocketManager.SEARCH_INDEX_JOB_BROADCAST_CHANNEL; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java deleted file mode 100644 index d110da423fc..00000000000 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AbstractOmAppJobListener.java +++ /dev/null @@ -1,182 +0,0 @@ -package org.openmetadata.service.apps.scheduler; - -import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; -import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.openmetadata.schema.entity.app.App; -import org.openmetadata.schema.entity.app.AppExtension; -import org.openmetadata.schema.entity.app.AppRunRecord; -import org.openmetadata.schema.entity.app.FailureContext; -import org.openmetadata.schema.entity.app.SuccessContext; -import org.openmetadata.service.apps.ApplicationHandler; -import org.openmetadata.service.jdbi3.AppRepository; -import org.openmetadata.service.jdbi3.CollectionDAO; -import org.openmetadata.service.socket.WebSocketManager; -import org.openmetadata.service.util.JsonUtils; -import org.quartz.JobDataMap; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.quartz.JobListener; - -@Slf4j -public abstract class AbstractOmAppJobListener implements JobListener { - private final CollectionDAO collectionDAO; - private final AppRepository repository; - private static final String SCHEDULED_APP_RUN_EXTENSION = "AppScheduleRun"; - public static final String WEBSOCKET_STATUS_CHANNEL = "WebsocketStatusUpdateExtension"; - - public static final String APP_RUN_STATS = "AppRunStats"; - public static final String JOB_LISTENER_NAME = "OM_JOB_LISTENER"; - - protected AbstractOmAppJobListener(CollectionDAO dao) { - this.collectionDAO = dao; - this.repository = new AppRepository(); - } - - @Override - public String getName() { - return JOB_LISTENER_NAME; - } - - @Override - public void jobToBeExecuted(JobExecutionContext jobExecutionContext) { - try { - String runType = - (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"); - String appName = (String) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_NAME); - App jobApp = collectionDAO.applicationDAO().findEntityByName(appName); - ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp); - JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); - long jobStartTime = System.currentTimeMillis(); - AppRunRecord runRecord = - new AppRunRecord() - .withAppId(jobApp.getId()) - .withAppName(jobApp.getName()) - .withStartTime(jobStartTime) - .withTimestamp(jobStartTime) - .withRunType(runType) - .withStatus(AppRunRecord.Status.RUNNING) - .withScheduleInfo(jobApp.getAppSchedule()) - .withConfig(JsonUtils.getMap(jobApp.getAppConfiguration())); - - boolean update = false; - if (jobExecutionContext.isRecovering()) { - AppRunRecord latestRunRecord = - repository.getLatestExtensionById( - jobApp, AppRunRecord.class, AppExtension.ExtensionType.STATUS); - if (latestRunRecord != null) { - runRecord = latestRunRecord; - } - update = true; - } - // Put the Context in the Job Data Map - dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); - - // Insert new Record Run - pushApplicationStatusUpdates(jobExecutionContext, runRecord, update); - this.doJobToBeExecuted(jobExecutionContext); - } catch (Exception e) { - LOG.info("Error while setting up the job context", e); - } - } - - @Override - public void jobExecutionVetoed(JobExecutionContext jobExecutionContext) {} - - @Override - public void jobWasExecuted( - JobExecutionContext jobExecutionContext, JobExecutionException jobException) { - AppRunRecord runRecord = - JsonUtils.readOrConvertValue( - jobExecutionContext.getJobDetail().getJobDataMap().get(SCHEDULED_APP_RUN_EXTENSION), - AppRunRecord.class); - Object jobStats = jobExecutionContext.getJobDetail().getJobDataMap().get(APP_RUN_STATS); - long endTime = System.currentTimeMillis(); - runRecord.withEndTime(endTime); - runRecord.setExecutionTime(endTime - runRecord.getStartTime()); - - if (jobException == null - && !(runRecord.getStatus() == AppRunRecord.Status.FAILED - || runRecord.getStatus() == AppRunRecord.Status.ACTIVE_ERROR)) { - runRecord.withStatus(AppRunRecord.Status.SUCCESS); - SuccessContext context = new SuccessContext(); - if (runRecord.getSuccessContext() != null) { - context = runRecord.getSuccessContext(); - } - context.getAdditionalProperties().put("stats", JsonUtils.getMap(jobStats)); - runRecord.setSuccessContext(context); - } else { - runRecord.withStatus(AppRunRecord.Status.FAILED); - FailureContext context = new FailureContext(); - if (runRecord.getFailureContext() != null) { - context = runRecord.getFailureContext(); - } - if (jobException != null) { - Map failure = new HashMap<>(); - failure.put("message", jobException.getMessage()); - failure.put("jobStackTrace", ExceptionUtils.getStackTrace(jobException)); - context.withAdditionalProperty("failure", failure); - } - runRecord.setFailureContext(context); - } - - // Push Update on WebSocket - String webSocketChannelName = - (String) jobExecutionContext.getJobDetail().getJobDataMap().get(WEBSOCKET_STATUS_CHANNEL); - if (!nullOrEmpty(webSocketChannelName) && WebSocketManager.getInstance() != null) { - WebSocketManager.getInstance() - .broadCastMessageToAll(webSocketChannelName, JsonUtils.pojoToJson(runRecord)); - } - - // Update App Run Record - pushApplicationStatusUpdates(jobExecutionContext, runRecord, true); - - this.doJobWasExecuted(jobExecutionContext, jobException); - } - - public AppRunRecord getAppRunRecordForJob(JobExecutionContext context) { - JobDataMap dataMap = context.getJobDetail().getJobDataMap(); - return JsonUtils.readOrConvertValue( - dataMap.get(SCHEDULED_APP_RUN_EXTENSION), AppRunRecord.class); - } - - public void pushApplicationStatusUpdates( - JobExecutionContext context, AppRunRecord runRecord, boolean update) { - JobDataMap dataMap = context.getJobDetail().getJobDataMap(); - if (dataMap.containsKey(SCHEDULED_APP_RUN_EXTENSION)) { - // Update the Run Record in Data Map - dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); - - // Push Updates to the Database - String appName = (String) context.getJobDetail().getJobDataMap().get(APP_NAME); - UUID appId = collectionDAO.applicationDAO().findEntityByName(appName).getId(); - updateStatus(appId, runRecord, update); - } - } - - private void updateStatus(UUID appId, AppRunRecord appRunRecord, boolean update) { - if (update) { - collectionDAO - .appExtensionTimeSeriesDao() - .update( - appId.toString(), - JsonUtils.pojoToJson(appRunRecord), - appRunRecord.getTimestamp(), - AppExtension.ExtensionType.STATUS.toString()); - } else { - collectionDAO - .appExtensionTimeSeriesDao() - .insert(JsonUtils.pojoToJson(appRunRecord), AppExtension.ExtensionType.STATUS.toString()); - } - } - - protected void doJobWasExecuted( - JobExecutionContext jobExecutionContext, JobExecutionException jobException) {} - - protected void doJobToBeExecuted(JobExecutionContext jobExecutionContext) {} -} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java index 50c1fee4098..80a0e1cb8ab 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java @@ -95,7 +95,7 @@ public class AppScheduler { // Add OMJob Listener this.scheduler .getListenerManager() - .addJobListener(new OmAppJobListener(dao), jobGroupEquals(APPS_JOB_GROUP)); + .addJobListener(new OmAppJobListener(), jobGroupEquals(APPS_JOB_GROUP)); ScheduledExecutorService threadScheduler = Executors.newScheduledThreadPool(1); threadScheduler.scheduleAtFixedRate(this::resetErrorTriggers, 0, 24, TimeUnit.HOURS); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java index 5eccaccb1d3..5fc30cf41d2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java @@ -1,9 +1,160 @@ package org.openmetadata.service.apps.scheduler; -import org.openmetadata.service.jdbi3.CollectionDAO; +import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; +import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME; -public class OmAppJobListener extends AbstractOmAppJobListener { - protected OmAppJobListener(CollectionDAO dao) { - super(dao); +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.openmetadata.schema.entity.app.App; +import org.openmetadata.schema.entity.app.AppExtension; +import org.openmetadata.schema.entity.app.AppRunRecord; +import org.openmetadata.schema.entity.app.FailureContext; +import org.openmetadata.schema.entity.app.SuccessContext; +import org.openmetadata.schema.type.Include; +import org.openmetadata.service.apps.ApplicationHandler; +import org.openmetadata.service.jdbi3.AppRepository; +import org.openmetadata.service.socket.WebSocketManager; +import org.openmetadata.service.util.JsonUtils; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.JobListener; + +@Slf4j +public class OmAppJobListener implements JobListener { + private final AppRepository repository; + private static final String SCHEDULED_APP_RUN_EXTENSION = "AppScheduleRun"; + public static final String WEBSOCKET_STATUS_CHANNEL = "WebsocketStatusUpdateExtension"; + + public static final String APP_RUN_STATS = "AppRunStats"; + public static final String JOB_LISTENER_NAME = "OM_JOB_LISTENER"; + + protected OmAppJobListener() { + this.repository = new AppRepository(); + } + + @Override + public String getName() { + return JOB_LISTENER_NAME; + } + + @Override + public void jobToBeExecuted(JobExecutionContext jobExecutionContext) { + try { + String runType = + (String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType"); + String appName = (String) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_NAME); + App jobApp = repository.findByName(appName, Include.NON_DELETED); + ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp); + JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); + long jobStartTime = System.currentTimeMillis(); + AppRunRecord runRecord = + new AppRunRecord() + .withAppId(jobApp.getId()) + .withAppName(jobApp.getName()) + .withStartTime(jobStartTime) + .withTimestamp(jobStartTime) + .withRunType(runType) + .withStatus(AppRunRecord.Status.RUNNING) + .withScheduleInfo(jobApp.getAppSchedule()) + .withConfig(JsonUtils.getMap(jobApp.getAppConfiguration())); + + boolean update = false; + if (jobExecutionContext.isRecovering()) { + AppRunRecord latestRunRecord = + repository.getLatestExtensionById( + jobApp, AppRunRecord.class, AppExtension.ExtensionType.STATUS); + if (latestRunRecord != null) { + runRecord = latestRunRecord; + } + update = true; + } + // Put the Context in the Job Data Map + dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); + + // Insert new Record Run + pushApplicationStatusUpdates(jobExecutionContext, runRecord, update); + } catch (Exception e) { + LOG.info("Error while setting up the job context", e); + } + } + + @Override + public void jobExecutionVetoed(JobExecutionContext jobExecutionContext) {} + + @Override + public void jobWasExecuted( + JobExecutionContext jobExecutionContext, JobExecutionException jobException) { + AppRunRecord runRecord = + JsonUtils.readOrConvertValue( + jobExecutionContext.getJobDetail().getJobDataMap().get(SCHEDULED_APP_RUN_EXTENSION), + AppRunRecord.class); + Object jobStats = jobExecutionContext.getJobDetail().getJobDataMap().get(APP_RUN_STATS); + long endTime = System.currentTimeMillis(); + runRecord.withEndTime(endTime); + runRecord.setExecutionTime(endTime - runRecord.getStartTime()); + + if (jobException == null + && !(runRecord.getStatus() == AppRunRecord.Status.FAILED + || runRecord.getStatus() == AppRunRecord.Status.ACTIVE_ERROR)) { + runRecord.withStatus(AppRunRecord.Status.SUCCESS); + SuccessContext context = new SuccessContext(); + if (runRecord.getSuccessContext() != null) { + context = runRecord.getSuccessContext(); + } + context.getAdditionalProperties().put("stats", JsonUtils.getMap(jobStats)); + runRecord.setSuccessContext(context); + } else { + runRecord.withStatus(AppRunRecord.Status.FAILED); + FailureContext context = new FailureContext(); + if (runRecord.getFailureContext() != null) { + context = runRecord.getFailureContext(); + } + if (jobException != null) { + Map failure = new HashMap<>(); + failure.put("message", jobException.getMessage()); + failure.put("jobStackTrace", ExceptionUtils.getStackTrace(jobException)); + context.withAdditionalProperty("failure", failure); + } + runRecord.setFailureContext(context); + } + + // Push Update on WebSocket + String webSocketChannelName = + (String) jobExecutionContext.getJobDetail().getJobDataMap().get(WEBSOCKET_STATUS_CHANNEL); + if (!nullOrEmpty(webSocketChannelName) && WebSocketManager.getInstance() != null) { + WebSocketManager.getInstance() + .broadCastMessageToAll(webSocketChannelName, JsonUtils.pojoToJson(runRecord)); + } + + // Update App Run Record + pushApplicationStatusUpdates(jobExecutionContext, runRecord, true); + } + + public AppRunRecord getAppRunRecordForJob(JobExecutionContext context) { + JobDataMap dataMap = context.getJobDetail().getJobDataMap(); + return JsonUtils.readOrConvertValue( + dataMap.get(SCHEDULED_APP_RUN_EXTENSION), AppRunRecord.class); + } + + public void pushApplicationStatusUpdates( + JobExecutionContext context, AppRunRecord runRecord, boolean update) { + JobDataMap dataMap = context.getJobDetail().getJobDataMap(); + if (dataMap.containsKey(SCHEDULED_APP_RUN_EXTENSION)) { + // Update the Run Record in Data Map + dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord)); + + // Push Updates to the Database + String appName = (String) context.getJobDetail().getJobDataMap().get(APP_NAME); + UUID appId = repository.findByName(appName, Include.NON_DELETED).getId(); + if (update) { + repository.updateAppStatus(appId, runRecord); + } else { + repository.addAppStatus(runRecord); + } + } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java index e8ce32bb0a0..8ec4ff160a9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java @@ -37,14 +37,20 @@ import org.openmetadata.schema.entity.events.FailedEventResponse; import org.openmetadata.schema.entity.events.SubscriptionDestination; import org.openmetadata.schema.entity.events.SubscriptionStatus; import org.openmetadata.schema.type.ChangeEvent; +import org.openmetadata.sdk.PipelineServiceClientInterface; import org.openmetadata.service.Entity; +import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer; import org.openmetadata.service.apps.bundles.changeEvent.AlertPublisher; +import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory; import org.openmetadata.service.events.subscription.AlertUtil; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.EventSubscriptionRepository; import org.openmetadata.service.resources.events.subscription.TypedEvent; +import org.openmetadata.service.util.DIContainer; import org.openmetadata.service.util.JsonUtils; +import org.openmetadata.service.util.OpenMetadataConnectionBuilder; +import org.quartz.Job; import org.quartz.JobBuilder; import org.quartz.JobDataMap; import org.quartz.JobDetail; @@ -56,6 +62,8 @@ import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.TriggerKey; import org.quartz.impl.StdSchedulerFactory; +import org.quartz.spi.JobFactory; +import org.quartz.spi.TriggerFiredBundle; @Slf4j public class EventSubscriptionScheduler { @@ -63,24 +71,53 @@ public class EventSubscriptionScheduler { public static final String ALERT_TRIGGER_GROUP = "OMAlertJobGroup"; private static EventSubscriptionScheduler instance; private static volatile boolean initialized = false; - private final Scheduler alertsScheduler = new StdSchedulerFactory().getScheduler(); - private EventSubscriptionScheduler() throws SchedulerException { + private record CustomJobFactory(DIContainer di) implements JobFactory { + + @Override + public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException { + try { + JobDetail jobDetail = bundle.getJobDetail(); + Class jobClass = jobDetail.getJobClass(); + Job job = jobClass.getDeclaredConstructor(DIContainer.class).newInstance(di); + return job; + } catch (Exception e) { + throw new SchedulerException("Failed to create job instance", e); + } + } + } + + private EventSubscriptionScheduler( + PipelineServiceClientInterface pipelineServiceClient, + OpenMetadataConnectionBuilder openMetadataConnectionBuilder) + throws SchedulerException { + DIContainer di = new DIContainer(); + di.registerResource(PipelineServiceClientInterface.class, pipelineServiceClient); + di.registerResource(OpenMetadataConnectionBuilder.class, openMetadataConnectionBuilder); + this.alertsScheduler.setJobFactory(new CustomJobFactory(di)); this.alertsScheduler.start(); } @SneakyThrows public static EventSubscriptionScheduler getInstance() { if (!initialized) { - initialize(); + throw new RuntimeException("Event Subscription Scheduler is not initialized"); } return instance; } - private static void initialize() throws SchedulerException { + public static void initialize(OpenMetadataApplicationConfig openMetadataApplicationConfig) { if (!initialized) { - instance = new EventSubscriptionScheduler(); + try { + instance = + new EventSubscriptionScheduler( + PipelineServiceClientFactory.createPipelineServiceClient( + openMetadataApplicationConfig.getPipelineServiceClientConfiguration()), + new OpenMetadataConnectionBuilder(openMetadataApplicationConfig)); + } catch (SchedulerException e) { + throw new RuntimeException("Failed to initialize Event Subscription Scheduler", e); + } initialized = true; } else { LOG.info("Event Subscription Scheduler is already initialized"); @@ -101,7 +138,10 @@ public class EventSubscriptionScheduler { Optional.ofNullable(eventSubscription.getClassName()) .orElse(defaultClass.getCanonicalName())) .asSubclass(AbstractEventConsumer.class); - AbstractEventConsumer publisher = clazz.getDeclaredConstructor().newInstance(); + // we can use an empty dependency container here because when initializing + // the consumer because it does need to access any state + AbstractEventConsumer publisher = + clazz.getDeclaredConstructor(DIContainer.class).newInstance(new DIContainer()); if (reinstall && isSubscriptionRegistered(eventSubscription)) { deleteEventSubscriptionPublisher(eventSubscription); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java index 1cd40337fd8..6d14a5fe87d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java @@ -382,7 +382,15 @@ public class AppRepository extends EntityRepository { return new AppRepository.AppUpdater(original, updated, operation); } - public void addEventSubscription(App app, EventSubscription eventSubscription) { + public App addEventSubscription(App app, EventSubscription eventSubscription) { + EntityReference existing = + listOrEmpty(app.getEventSubscriptions()).stream() + .filter(e -> e.getId().equals(eventSubscription.getId())) + .findFirst() + .orElse(null); + if (existing != null) { + return app; + } addRelationship( app.getId(), eventSubscription.getId(), @@ -393,7 +401,39 @@ public class AppRepository extends EntityRepository { newSubs.add(eventSubscription.getEntityReference()); App updated = JsonUtils.deepCopy(app, App.class).withEventSubscriptions(newSubs); updated.setOpenMetadataServerConnection(null); - getUpdater(app, updated, EntityRepository.Operation.PATCH).update(); + getUpdater(app, updated, EntityRepository.Operation.PUT).update(); + return updated; + } + + public App deleteEventSubscription(App app, UUID eventSubscriptionId) { + deleteRelationship( + app.getId(), + Entity.APPLICATION, + eventSubscriptionId, + Entity.EVENT_SUBSCRIPTION, + Relationship.CONTAINS); + List newSubs = new ArrayList<>(listOrEmpty(app.getEventSubscriptions())); + newSubs.removeIf(sub -> sub.getId().equals(eventSubscriptionId)); + App updated = JsonUtils.deepCopy(app, App.class).withEventSubscriptions(newSubs); + updated.setOpenMetadataServerConnection(null); + getUpdater(app, updated, EntityRepository.Operation.PUT).update(); + return updated; + } + + public void updateAppStatus(UUID appID, AppRunRecord record) { + daoCollection + .appExtensionTimeSeriesDao() + .update( + appID.toString(), + JsonUtils.pojoToJson(record), + record.getTimestamp(), + AppExtension.ExtensionType.STATUS.toString()); + } + + public void addAppStatus(AppRunRecord record) { + daoCollection + .appExtensionTimeSeriesDao() + .insert(JsonUtils.pojoToJson(record), AppExtension.ExtensionType.STATUS.toString()); } public class AppUpdater extends EntityUpdater { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EventSubscriptionRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EventSubscriptionRepository.java index 52ac95ba0e4..d6c120896d7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EventSubscriptionRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EventSubscriptionRepository.java @@ -175,6 +175,7 @@ public class EventSubscriptionRepository extends EntityRepository { - private final OpenMetadataApplicationConfig openMetadataApplicationConfig; - - public WorkflowMapper(OpenMetadataApplicationConfig config) { - this.openMetadataApplicationConfig = config; - } @Override public Workflow createToEntity(CreateWorkflow create, String user) { - OpenMetadataConnection openMetadataServerConnection = - new OpenMetadataConnectionBuilder(openMetadataApplicationConfig).build(); return copy(new Workflow(), create, user) .withDescription(create.getDescription()) .withRequest(create.getRequest()) @@ -25,7 +15,6 @@ public class WorkflowMapper implements EntityMapper { .withDisplayName(create.getDisplayName()) .withResponse(create.getResponse()) .withStatus(create.getStatus()) - .withOpenMetadataServerConnection(openMetadataServerConnection) .withName(create.getName()); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/automations/WorkflowResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/automations/WorkflowResource.java index 01f22137cca..fa95cfe95ef 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/automations/WorkflowResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/automations/WorkflowResource.java @@ -85,7 +85,7 @@ public class WorkflowResource extends EntityResource { @@ -359,8 +359,7 @@ public class WorkflowResource extends EntityResource { public static final String COLLECTION_PATH = "/v1/events/subscriptions"; @@ -132,6 +132,7 @@ public class EventSubscriptionResource @Override public void initialize(OpenMetadataApplicationConfig config) { try { + EventSubscriptionScheduler.initialize(config); EventsSubscriptionRegistry.initialize( listOrEmpty(EventSubscriptionResource.getNotificationsFilterDescriptors()), listOrEmpty(EventSubscriptionResource.getObservabilityFilterDescriptors())); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/DIContainer.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/DIContainer.java new file mode 100644 index 00000000000..da3b58fab57 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/DIContainer.java @@ -0,0 +1,35 @@ +package org.openmetadata.service.util; + +import java.util.HashMap; +import java.util.Map; + +/** + * Dependency Injection container to manage resources. This is a simple implementation and does not + * support any lifecycle management. It allows registering resources and retrieving them by a + * a consuming class. Consuming class might use a subset of the resources registered (or none at all) + * Example: + * + *
+ *     DIContainer container = new DIContainer();
+ *     container.registerResource(MyService.class, new MyService());
+ *     MyService service = container.getResource(MyService.class);
+ *     service.doSomething();
+ * 
+ * + * TODO - Add support for lifecycle management. + */ +public class DIContainer { + private final Map, Object> resources = new HashMap<>(); + + public void registerResource(Class resourceClass, T resource) { + resources.put(resourceClass, resource); + } + + public T getResource(Class resourceClass) { + T resource = resourceClass.cast(resources.get(resourceClass)); + if (resource == null) { + throw new IllegalStateException("Resource not initialized: " + resourceClass.getName()); + } + return resource; + } +} diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/applications/appRunRecord.json b/openmetadata-spec/src/main/resources/json/schema/entity/applications/appRunRecord.json index 6dde1cf433e..9f9e33a16f5 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/applications/appRunRecord.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/applications/appRunRecord.json @@ -68,6 +68,10 @@ "config": { "descripton": "The configuration used for this application run. It's type will be based on the application type. Old runs might not be compatible with schema of app configuration.", "$ref": "../../type/basic.json#/definitions/map" + }, + "properties": { + "description": "Arbitrary metadata that will be attached to the report.", + "$ref": "../../type/basic.json#/definitions/map" } }, "additionalProperties": false diff --git a/openmetadata-spec/src/main/resources/json/schema/events/api/createEventSubscription.json b/openmetadata-spec/src/main/resources/json/schema/events/api/createEventSubscription.json index d21e07f750b..672c0930741 100644 --- a/openmetadata-spec/src/main/resources/json/schema/events/api/createEventSubscription.json +++ b/openmetadata-spec/src/main/resources/json/schema/events/api/createEventSubscription.json @@ -79,6 +79,9 @@ "domain" : { "description": "Fully qualified name of the domain the Table belongs to.", "type": "string" + }, + "config": { + "$ref": "../../type/basic.json#/definitions/map" } }, "required": ["name", "alertType"], diff --git a/openmetadata-spec/src/main/resources/json/schema/events/eventSubscription.json b/openmetadata-spec/src/main/resources/json/schema/events/eventSubscription.json index 596536916e5..e0dbc7770b0 100644 --- a/openmetadata-spec/src/main/resources/json/schema/events/eventSubscription.json +++ b/openmetadata-spec/src/main/resources/json/schema/events/eventSubscription.json @@ -78,7 +78,7 @@ "alertType": { "description": "Type of Alerts supported.", "type": "string", - "enum": ["Notification", "Observability", "ActivityFeed", "GovernanceWorkflowChangeEvent"], + "enum": ["Notification", "Observability", "ActivityFeed", "GovernanceWorkflowChangeEvent", "Custom"], "default": "Notification" }, "subscriptionCategory": { @@ -328,6 +328,9 @@ "domain" : { "description": "Domain the asset belongs to. When not set, the asset inherits the domain from the parent it belongs to.", "$ref": "../type/entityReference.json" + }, + "config": { + "$ref": "../type/basic.json#/definitions/map" } }, "required": ["id", "name", "alertType", "destinations"], diff --git a/openmetadata-ui/src/main/resources/ui/src/enums/AdvancedSearch.enum.ts b/openmetadata-ui/src/main/resources/ui/src/enums/AdvancedSearch.enum.ts index f138dad2994..58498fee734 100644 --- a/openmetadata-ui/src/main/resources/ui/src/enums/AdvancedSearch.enum.ts +++ b/openmetadata-ui/src/main/resources/ui/src/enums/AdvancedSearch.enum.ts @@ -84,4 +84,5 @@ export enum EntityReferenceFields { TIER = 'tier.tagFQN', TABLE_TYPE = 'tableType', EXTENSION = 'extension', + SERVICE = 'service.fullyQualifiedName', } diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/applications/appRunRecord.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/applications/appRunRecord.ts index ad440fb58cf..436e2a74492 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/applications/appRunRecord.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/applications/appRunRecord.ts @@ -39,6 +39,10 @@ export interface AppRunRecord { * Failure Context for the Application. */ failureContext?: { [key: string]: any }; + /** + * Arbitrary metadata that will be attached to the report. + */ + properties?: { [key: string]: any }; /** * This schema defines the type of application Run. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/events/api/createEventSubscription.ts b/openmetadata-ui/src/main/resources/ui/src/generated/events/api/createEventSubscription.ts index d7a75280f01..d1d120a8303 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/events/api/createEventSubscription.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/events/api/createEventSubscription.ts @@ -26,6 +26,7 @@ export interface CreateEventSubscription { * Consumer Class for the Event Subscription. Will use 'AlertPublisher' if not provided. */ className?: string; + config?: { [key: string]: any }; /** * A short description of the Alert, comprehensible to regular users. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/events/eventSubscription.ts b/openmetadata-ui/src/main/resources/ui/src/generated/events/eventSubscription.ts index 6f658bb19ff..f48c91d2963 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/events/eventSubscription.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/events/eventSubscription.ts @@ -31,6 +31,7 @@ export interface EventSubscription { * Java class for the Event Subscription. */ className?: string; + config?: { [key: string]: any }; /** * A short description of the Event Subscription, comprehensible to regular users. */ @@ -116,6 +117,7 @@ export interface EventSubscription { */ export enum AlertType { ActivityFeed = "ActivityFeed", + Custom = "Custom", GovernanceWorkflowChangeEvent = "GovernanceWorkflowChangeEvent", Notification = "Notification", Observability = "Observability", diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/JSONLogicSearchClassBase.ts b/openmetadata-ui/src/main/resources/ui/src/utils/JSONLogicSearchClassBase.ts index 625dd967e5a..88bddfb2e75 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/JSONLogicSearchClassBase.ts +++ b/openmetadata-ui/src/main/resources/ui/src/utils/JSONLogicSearchClassBase.ts @@ -240,6 +240,19 @@ class JSONLogicSearchClassBase { tierOptions?: Promise; }) => { return { + [EntityReferenceFields.SERVICE]: { + label: t('label.service'), + type: 'select', + mainWidgetProps: this.mainWidgetProps, + operators: this.defaultSelectOperators, + fieldSettings: { + asyncFetch: advancedSearchClassBase.autocomplete({ + searchIndex: SearchIndex.ALL, + entityField: EntityFields.SERVICE, + }), + useAsyncSearch: true, + }, + }, [EntityReferenceFields.OWNERS]: { label: t('label.owner-plural'), type: 'select', diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/QueryBuilderUtils.tsx b/openmetadata-ui/src/main/resources/ui/src/utils/QueryBuilderUtils.tsx index db49beb86e1..6330f7db373 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/QueryBuilderUtils.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/utils/QueryBuilderUtils.tsx @@ -33,6 +33,9 @@ import { generateUUID } from './StringsUtils'; export const JSONLOGIC_FIELDS_TO_IGNORE_SPLIT = [ EntityReferenceFields.EXTENSION, + EntityReferenceFields.SERVICE, + EntityReferenceFields.DATABASE, + EntityReferenceFields.DATABASE_SCHEMA, ]; const resolveFieldType = ( @@ -534,9 +537,15 @@ export const elasticsearchToJsonLogic = ( if (field.includes('.')) { const [parentField, childField] = field.split('.'); - return JSONLOGIC_FIELDS_TO_IGNORE_SPLIT.includes( - parentField as EntityReferenceFields - ) + const shouldIgnoreSplit = + JSONLOGIC_FIELDS_TO_IGNORE_SPLIT.includes( + parentField as EntityReferenceFields + ) || + JSONLOGIC_FIELDS_TO_IGNORE_SPLIT.includes( + field as EntityReferenceFields + ); + + return shouldIgnoreSplit ? { '==': [{ var: field }, value] } : { some: [