MINOR: feat(event-subscriptions): event consumer extension (#19591)

* feat(event-subscriptions): event consumer extension

- implemented DI for event consumers
- added custom config for event consumers
- initialize the event scheduler along with the application
- added workflow custom type

* - removed the ConsumerService, as it is specific to each consumer implementation
- added a "custom" alert type

* wip

* cleanup

* reverted pom.xml changes

* removed abstract layer AbstractOmAppJobListener

* fixed import AbstractOmAppJobListener -> OmAppJobListener

* fixed constructor

* - add event subscription if it doesn't exist (set operation)
- add deleteEventSubscription

* - use patch mode when updating app.eventSubscriptions

* added docs for DIContainer

* aligned py tests with new automation schema

* added AppRunRecord.properties

* format

* changes after merge

* generated missing ts

* use openmetadata config builder instead of the config

* add json logic fixes for reverse metadata

* ref: reverted changes to workflows

they will be part of https://github.com/open-metadata/OpenMetadata/pull/19723

* reverted ts changes

* fix: initialization sequence for event subscription scheduler

---------

Co-authored-by: karanh37 <karanh37@gmail.com>
Co-authored-by: Karan Hotchandani <33024356+karanh37@users.noreply.github.com>
Imri Paran 2025-02-19 16:23:46 +01:00 committed by GitHub
parent 9d51add739
commit 3b5218b923
25 changed files with 347 additions and 226 deletions

View File

@@ -1,7 +1,7 @@
package org.openmetadata.service.apps;
import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.JOB_LISTENER_NAME;
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME;
import static org.openmetadata.service.apps.scheduler.OmAppJobListener.JOB_LISTENER_NAME;
import static org.openmetadata.service.exception.CatalogExceptionMessage.NO_MANUAL_TRIGGER_ERR;
import static org.openmetadata.service.resources.apps.AppResource.SCHEDULED_TYPES;

View File

@@ -36,6 +36,7 @@ import org.openmetadata.schema.system.EntityError;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.Entity;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.util.DIContainer;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ResultList;
import org.quartz.DisallowConcurrentExecution;
@@ -55,7 +56,7 @@ public abstract class AbstractEventConsumer
public static final String OFFSET_EXTENSION = "eventSubscription.Offset";
public static final String METRICS_EXTENSION = "eventSubscription.metrics";
public static final String FAILED_EVENT_EXTENSION = "eventSubscription.failedEvent";
protected final DIContainer dependencies;
private long offset = -1;
private long startingOffset = -1;
@@ -65,7 +66,9 @@ public abstract class AbstractEventConsumer
protected EventSubscription eventSubscription;
protected Map<UUID, Destination<ChangeEvent>> destinationMap;
protected AbstractEventConsumer() {}
protected AbstractEventConsumer(DIContainer dependencies) {
this.dependencies = dependencies;
}
private void init(JobExecutionContext context) {
EventSubscription sub =

View File

@@ -4,9 +4,13 @@ import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.util.DIContainer;
@Slf4j
public class AlertPublisher extends AbstractEventConsumer {
public AlertPublisher(DIContainer di) {
super(di);
}
@Override
public void sendAlert(UUID receiverId, ChangeEvent event) throws EventPublisherException {
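
The AlertPublisher change above shows the new extension contract: a consumer exposes a single constructor taking a DIContainer, because the scheduler's job factory (see CustomJobFactory further down in this diff) instantiates consumer classes reflectively with exactly that signature. A minimal sketch of a third-party consumer under that contract; the class name, package, and method body are hypothetical, only the constructor shape and the sendAlert override come from this diff:

package org.example;

import java.util.UUID;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.util.DIContainer;

public class CustomConsumer extends AbstractEventConsumer {
  public CustomConsumer(DIContainer di) {
    super(di); // required: the job factory resolves a (DIContainer) constructor
  }

  @Override
  public void sendAlert(UUID receiverId, ChangeEvent event) throws EventPublisherException {
    // Hypothetical use of a resource registered by EventSubscriptionScheduler
    PipelineServiceClientInterface client =
        dependencies.getResource(PipelineServiceClientInterface.class);
    // ... deliver the event via the client ...
  }
}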

View File

@@ -1,8 +1,8 @@
package org.openmetadata.service.apps.bundles.insights;
import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS;
import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.WEBSOCKET_STATUS_CHANNEL;
import static org.openmetadata.service.apps.scheduler.AppScheduler.ON_DEMAND_JOB;
import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_RUN_STATS;
import static org.openmetadata.service.apps.scheduler.OmAppJobListener.WEBSOCKET_STATUS_CHANNEL;
import static org.openmetadata.service.socket.WebSocketManager.DATA_INSIGHTS_JOB_BROADCAST_CHANNEL;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getInitialStatsForEntities;

View File

@@ -3,9 +3,9 @@ package org.openmetadata.service.apps.bundles.searchIndex;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.TEST_CASE_RESOLUTION_STATUS;
import static org.openmetadata.service.Entity.TEST_CASE_RESULT;
import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS;
import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.WEBSOCKET_STATUS_CHANNEL;
import static org.openmetadata.service.apps.scheduler.AppScheduler.ON_DEMAND_JOB;
import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_RUN_STATS;
import static org.openmetadata.service.apps.scheduler.OmAppJobListener.WEBSOCKET_STATUS_CHANNEL;
import static org.openmetadata.service.socket.WebSocketManager.SEARCH_INDEX_JOB_BROADCAST_CHANNEL;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;

View File

@@ -1,182 +0,0 @@
package org.openmetadata.service.apps.scheduler;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppExtension;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.service.apps.ApplicationHandler;
import org.openmetadata.service.jdbi3.AppRepository;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.JsonUtils;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobListener;
@Slf4j
public abstract class AbstractOmAppJobListener implements JobListener {
private final CollectionDAO collectionDAO;
private final AppRepository repository;
private static final String SCHEDULED_APP_RUN_EXTENSION = "AppScheduleRun";
public static final String WEBSOCKET_STATUS_CHANNEL = "WebsocketStatusUpdateExtension";
public static final String APP_RUN_STATS = "AppRunStats";
public static final String JOB_LISTENER_NAME = "OM_JOB_LISTENER";
protected AbstractOmAppJobListener(CollectionDAO dao) {
this.collectionDAO = dao;
this.repository = new AppRepository();
}
@Override
public String getName() {
return JOB_LISTENER_NAME;
}
@Override
public void jobToBeExecuted(JobExecutionContext jobExecutionContext) {
try {
String runType =
(String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType");
String appName = (String) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_NAME);
App jobApp = collectionDAO.applicationDAO().findEntityByName(appName);
ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp);
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
long jobStartTime = System.currentTimeMillis();
AppRunRecord runRecord =
new AppRunRecord()
.withAppId(jobApp.getId())
.withAppName(jobApp.getName())
.withStartTime(jobStartTime)
.withTimestamp(jobStartTime)
.withRunType(runType)
.withStatus(AppRunRecord.Status.RUNNING)
.withScheduleInfo(jobApp.getAppSchedule())
.withConfig(JsonUtils.getMap(jobApp.getAppConfiguration()));
boolean update = false;
if (jobExecutionContext.isRecovering()) {
AppRunRecord latestRunRecord =
repository.getLatestExtensionById(
jobApp, AppRunRecord.class, AppExtension.ExtensionType.STATUS);
if (latestRunRecord != null) {
runRecord = latestRunRecord;
}
update = true;
}
// Put the Context in the Job Data Map
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));
// Insert new Record Run
pushApplicationStatusUpdates(jobExecutionContext, runRecord, update);
this.doJobToBeExecuted(jobExecutionContext);
} catch (Exception e) {
LOG.info("Error while setting up the job context", e);
}
}
@Override
public void jobExecutionVetoed(JobExecutionContext jobExecutionContext) {}
@Override
public void jobWasExecuted(
JobExecutionContext jobExecutionContext, JobExecutionException jobException) {
AppRunRecord runRecord =
JsonUtils.readOrConvertValue(
jobExecutionContext.getJobDetail().getJobDataMap().get(SCHEDULED_APP_RUN_EXTENSION),
AppRunRecord.class);
Object jobStats = jobExecutionContext.getJobDetail().getJobDataMap().get(APP_RUN_STATS);
long endTime = System.currentTimeMillis();
runRecord.withEndTime(endTime);
runRecord.setExecutionTime(endTime - runRecord.getStartTime());
if (jobException == null
&& !(runRecord.getStatus() == AppRunRecord.Status.FAILED
|| runRecord.getStatus() == AppRunRecord.Status.ACTIVE_ERROR)) {
runRecord.withStatus(AppRunRecord.Status.SUCCESS);
SuccessContext context = new SuccessContext();
if (runRecord.getSuccessContext() != null) {
context = runRecord.getSuccessContext();
}
context.getAdditionalProperties().put("stats", JsonUtils.getMap(jobStats));
runRecord.setSuccessContext(context);
} else {
runRecord.withStatus(AppRunRecord.Status.FAILED);
FailureContext context = new FailureContext();
if (runRecord.getFailureContext() != null) {
context = runRecord.getFailureContext();
}
if (jobException != null) {
Map<String, Object> failure = new HashMap<>();
failure.put("message", jobException.getMessage());
failure.put("jobStackTrace", ExceptionUtils.getStackTrace(jobException));
context.withAdditionalProperty("failure", failure);
}
runRecord.setFailureContext(context);
}
// Push Update on WebSocket
String webSocketChannelName =
(String) jobExecutionContext.getJobDetail().getJobDataMap().get(WEBSOCKET_STATUS_CHANNEL);
if (!nullOrEmpty(webSocketChannelName) && WebSocketManager.getInstance() != null) {
WebSocketManager.getInstance()
.broadCastMessageToAll(webSocketChannelName, JsonUtils.pojoToJson(runRecord));
}
// Update App Run Record
pushApplicationStatusUpdates(jobExecutionContext, runRecord, true);
this.doJobWasExecuted(jobExecutionContext, jobException);
}
public AppRunRecord getAppRunRecordForJob(JobExecutionContext context) {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
return JsonUtils.readOrConvertValue(
dataMap.get(SCHEDULED_APP_RUN_EXTENSION), AppRunRecord.class);
}
public void pushApplicationStatusUpdates(
JobExecutionContext context, AppRunRecord runRecord, boolean update) {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
if (dataMap.containsKey(SCHEDULED_APP_RUN_EXTENSION)) {
// Update the Run Record in Data Map
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));
// Push Updates to the Database
String appName = (String) context.getJobDetail().getJobDataMap().get(APP_NAME);
UUID appId = collectionDAO.applicationDAO().findEntityByName(appName).getId();
updateStatus(appId, runRecord, update);
}
}
private void updateStatus(UUID appId, AppRunRecord appRunRecord, boolean update) {
if (update) {
collectionDAO
.appExtensionTimeSeriesDao()
.update(
appId.toString(),
JsonUtils.pojoToJson(appRunRecord),
appRunRecord.getTimestamp(),
AppExtension.ExtensionType.STATUS.toString());
} else {
collectionDAO
.appExtensionTimeSeriesDao()
.insert(JsonUtils.pojoToJson(appRunRecord), AppExtension.ExtensionType.STATUS.toString());
}
}
protected void doJobWasExecuted(
JobExecutionContext jobExecutionContext, JobExecutionException jobException) {}
protected void doJobToBeExecuted(JobExecutionContext jobExecutionContext) {}
}

View File

@@ -95,7 +95,7 @@ public class AppScheduler {
// Add OMJob Listener
this.scheduler
.getListenerManager()
.addJobListener(new OmAppJobListener(dao), jobGroupEquals(APPS_JOB_GROUP));
.addJobListener(new OmAppJobListener(), jobGroupEquals(APPS_JOB_GROUP));
ScheduledExecutorService threadScheduler = Executors.newScheduledThreadPool(1);
threadScheduler.scheduleAtFixedRate(this::resetErrorTriggers, 0, 24, TimeUnit.HOURS);

View File

@@ -1,9 +1,160 @@
package org.openmetadata.service.apps.scheduler;
import org.openmetadata.service.jdbi3.CollectionDAO;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME;
public class OmAppJobListener extends AbstractOmAppJobListener {
protected OmAppJobListener(CollectionDAO dao) {
super(dao);
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppExtension;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.apps.ApplicationHandler;
import org.openmetadata.service.jdbi3.AppRepository;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.JsonUtils;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobListener;
@Slf4j
public class OmAppJobListener implements JobListener {
private final AppRepository repository;
private static final String SCHEDULED_APP_RUN_EXTENSION = "AppScheduleRun";
public static final String WEBSOCKET_STATUS_CHANNEL = "WebsocketStatusUpdateExtension";
public static final String APP_RUN_STATS = "AppRunStats";
public static final String JOB_LISTENER_NAME = "OM_JOB_LISTENER";
protected OmAppJobListener() {
this.repository = new AppRepository();
}
@Override
public String getName() {
return JOB_LISTENER_NAME;
}
@Override
public void jobToBeExecuted(JobExecutionContext jobExecutionContext) {
try {
String runType =
(String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType");
String appName = (String) jobExecutionContext.getJobDetail().getJobDataMap().get(APP_NAME);
App jobApp = repository.findByName(appName, Include.NON_DELETED);
ApplicationHandler.getInstance().setAppRuntimeProperties(jobApp);
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
long jobStartTime = System.currentTimeMillis();
AppRunRecord runRecord =
new AppRunRecord()
.withAppId(jobApp.getId())
.withAppName(jobApp.getName())
.withStartTime(jobStartTime)
.withTimestamp(jobStartTime)
.withRunType(runType)
.withStatus(AppRunRecord.Status.RUNNING)
.withScheduleInfo(jobApp.getAppSchedule())
.withConfig(JsonUtils.getMap(jobApp.getAppConfiguration()));
boolean update = false;
if (jobExecutionContext.isRecovering()) {
AppRunRecord latestRunRecord =
repository.getLatestExtensionById(
jobApp, AppRunRecord.class, AppExtension.ExtensionType.STATUS);
if (latestRunRecord != null) {
runRecord = latestRunRecord;
}
update = true;
}
// Put the Context in the Job Data Map
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));
// Insert new Record Run
pushApplicationStatusUpdates(jobExecutionContext, runRecord, update);
} catch (Exception e) {
LOG.info("Error while setting up the job context", e);
}
}
@Override
public void jobExecutionVetoed(JobExecutionContext jobExecutionContext) {}
@Override
public void jobWasExecuted(
JobExecutionContext jobExecutionContext, JobExecutionException jobException) {
AppRunRecord runRecord =
JsonUtils.readOrConvertValue(
jobExecutionContext.getJobDetail().getJobDataMap().get(SCHEDULED_APP_RUN_EXTENSION),
AppRunRecord.class);
Object jobStats = jobExecutionContext.getJobDetail().getJobDataMap().get(APP_RUN_STATS);
long endTime = System.currentTimeMillis();
runRecord.withEndTime(endTime);
runRecord.setExecutionTime(endTime - runRecord.getStartTime());
if (jobException == null
&& !(runRecord.getStatus() == AppRunRecord.Status.FAILED
|| runRecord.getStatus() == AppRunRecord.Status.ACTIVE_ERROR)) {
runRecord.withStatus(AppRunRecord.Status.SUCCESS);
SuccessContext context = new SuccessContext();
if (runRecord.getSuccessContext() != null) {
context = runRecord.getSuccessContext();
}
context.getAdditionalProperties().put("stats", JsonUtils.getMap(jobStats));
runRecord.setSuccessContext(context);
} else {
runRecord.withStatus(AppRunRecord.Status.FAILED);
FailureContext context = new FailureContext();
if (runRecord.getFailureContext() != null) {
context = runRecord.getFailureContext();
}
if (jobException != null) {
Map<String, Object> failure = new HashMap<>();
failure.put("message", jobException.getMessage());
failure.put("jobStackTrace", ExceptionUtils.getStackTrace(jobException));
context.withAdditionalProperty("failure", failure);
}
runRecord.setFailureContext(context);
}
// Push Update on WebSocket
String webSocketChannelName =
(String) jobExecutionContext.getJobDetail().getJobDataMap().get(WEBSOCKET_STATUS_CHANNEL);
if (!nullOrEmpty(webSocketChannelName) && WebSocketManager.getInstance() != null) {
WebSocketManager.getInstance()
.broadCastMessageToAll(webSocketChannelName, JsonUtils.pojoToJson(runRecord));
}
// Update App Run Record
pushApplicationStatusUpdates(jobExecutionContext, runRecord, true);
}
public AppRunRecord getAppRunRecordForJob(JobExecutionContext context) {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
return JsonUtils.readOrConvertValue(
dataMap.get(SCHEDULED_APP_RUN_EXTENSION), AppRunRecord.class);
}
public void pushApplicationStatusUpdates(
JobExecutionContext context, AppRunRecord runRecord, boolean update) {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
if (dataMap.containsKey(SCHEDULED_APP_RUN_EXTENSION)) {
// Update the Run Record in Data Map
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, JsonUtils.pojoToJson(runRecord));
// Push Updates to the Database
String appName = (String) context.getJobDetail().getJobDataMap().get(APP_NAME);
UUID appId = repository.findByName(appName, Include.NON_DELETED).getId();
if (update) {
repository.updateAppStatus(appId, runRecord);
} else {
repository.addAppStatus(runRecord);
}
}
}
}

View File

@@ -37,14 +37,20 @@ import org.openmetadata.schema.entity.events.FailedEventResponse;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.entity.events.SubscriptionStatus;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer;
import org.openmetadata.service.apps.bundles.changeEvent.AlertPublisher;
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
import org.openmetadata.service.events.subscription.AlertUtil;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.EventSubscriptionRepository;
import org.openmetadata.service.resources.events.subscription.TypedEvent;
import org.openmetadata.service.util.DIContainer;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
@@ -56,6 +62,8 @@ import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.spi.JobFactory;
import org.quartz.spi.TriggerFiredBundle;
@Slf4j
public class EventSubscriptionScheduler {
@@ -63,24 +71,53 @@ public class EventSubscriptionScheduler {
public static final String ALERT_TRIGGER_GROUP = "OMAlertJobGroup";
private static EventSubscriptionScheduler instance;
private static volatile boolean initialized = false;
private final Scheduler alertsScheduler = new StdSchedulerFactory().getScheduler();
private EventSubscriptionScheduler() throws SchedulerException {
private record CustomJobFactory(DIContainer di) implements JobFactory {
@Override
public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException {
try {
JobDetail jobDetail = bundle.getJobDetail();
Class<? extends Job> jobClass = jobDetail.getJobClass();
Job job = jobClass.getDeclaredConstructor(DIContainer.class).newInstance(di);
return job;
} catch (Exception e) {
throw new SchedulerException("Failed to create job instance", e);
}
}
}
private EventSubscriptionScheduler(
PipelineServiceClientInterface pipelineServiceClient,
OpenMetadataConnectionBuilder openMetadataConnectionBuilder)
throws SchedulerException {
DIContainer di = new DIContainer();
di.registerResource(PipelineServiceClientInterface.class, pipelineServiceClient);
di.registerResource(OpenMetadataConnectionBuilder.class, openMetadataConnectionBuilder);
this.alertsScheduler.setJobFactory(new CustomJobFactory(di));
this.alertsScheduler.start();
}
@SneakyThrows
public static EventSubscriptionScheduler getInstance() {
if (!initialized) {
initialize();
throw new RuntimeException("Event Subscription Scheduler is not initialized");
}
return instance;
}
private static void initialize() throws SchedulerException {
public static void initialize(OpenMetadataApplicationConfig openMetadataApplicationConfig) {
if (!initialized) {
instance = new EventSubscriptionScheduler();
try {
instance =
new EventSubscriptionScheduler(
PipelineServiceClientFactory.createPipelineServiceClient(
openMetadataApplicationConfig.getPipelineServiceClientConfiguration()),
new OpenMetadataConnectionBuilder(openMetadataApplicationConfig));
} catch (SchedulerException e) {
throw new RuntimeException("Failed to initialize Event Subscription Scheduler", e);
}
initialized = true;
} else {
LOG.info("Event Subscription Scheduler is already initialized");
@@ -101,7 +138,10 @@
Optional.ofNullable(eventSubscription.getClassName())
.orElse(defaultClass.getCanonicalName()))
.asSubclass(AbstractEventConsumer.class);
AbstractEventConsumer publisher = clazz.getDeclaredConstructor().newInstance();
// we can use an empty dependency container here when initializing
// the consumer because it does not need to access any state
AbstractEventConsumer publisher =
clazz.getDeclaredConstructor(DIContainer.class).newInstance(new DIContainer());
if (reinstall && isSubscriptionRegistered(eventSubscription)) {
deleteEventSubscriptionPublisher(eventSubscription);
}
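
This change also inverts the old lazy bootstrap: getInstance() now fails fast instead of self-initializing, so initialize(config) must run during server startup (hence the order = 7 on the events/subscriptions collection further down in this diff). A short sketch of the new contract, based only on the methods shown above:

// During server startup (done by EventSubscriptionResource.initialize):
EventSubscriptionScheduler.initialize(openMetadataApplicationConfig);

// Any later call site:
EventSubscriptionScheduler scheduler = EventSubscriptionScheduler.getInstance();

// Calling getInstance() before initialize(...) now throws
// RuntimeException("Event Subscription Scheduler is not initialized").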

View File

@@ -382,7 +382,15 @@ public class AppRepository extends EntityRepository<App> {
return new AppRepository.AppUpdater(original, updated, operation);
}
public void addEventSubscription(App app, EventSubscription eventSubscription) {
public App addEventSubscription(App app, EventSubscription eventSubscription) {
EntityReference existing =
listOrEmpty(app.getEventSubscriptions()).stream()
.filter(e -> e.getId().equals(eventSubscription.getId()))
.findFirst()
.orElse(null);
if (existing != null) {
return app;
}
addRelationship(
app.getId(),
eventSubscription.getId(),
@@ -393,7 +401,39 @@
newSubs.add(eventSubscription.getEntityReference());
App updated = JsonUtils.deepCopy(app, App.class).withEventSubscriptions(newSubs);
updated.setOpenMetadataServerConnection(null);
getUpdater(app, updated, EntityRepository.Operation.PATCH).update();
getUpdater(app, updated, EntityRepository.Operation.PUT).update();
return updated;
}
public App deleteEventSubscription(App app, UUID eventSubscriptionId) {
deleteRelationship(
app.getId(),
Entity.APPLICATION,
eventSubscriptionId,
Entity.EVENT_SUBSCRIPTION,
Relationship.CONTAINS);
List<EntityReference> newSubs = new ArrayList<>(listOrEmpty(app.getEventSubscriptions()));
newSubs.removeIf(sub -> sub.getId().equals(eventSubscriptionId));
App updated = JsonUtils.deepCopy(app, App.class).withEventSubscriptions(newSubs);
updated.setOpenMetadataServerConnection(null);
getUpdater(app, updated, EntityRepository.Operation.PUT).update();
return updated;
}
public void updateAppStatus(UUID appID, AppRunRecord record) {
daoCollection
.appExtensionTimeSeriesDao()
.update(
appID.toString(),
JsonUtils.pojoToJson(record),
record.getTimestamp(),
AppExtension.ExtensionType.STATUS.toString());
}
public void addAppStatus(AppRunRecord record) {
daoCollection
.appExtensionTimeSeriesDao()
.insert(JsonUtils.pojoToJson(record), AppExtension.ExtensionType.STATUS.toString());
}
public class AppUpdater extends EntityUpdater {

View File

@@ -175,6 +175,7 @@ public class EventSubscriptionRepository extends EntityRepository<EventSubscript
objectMatch,
false);
recordChange("trigger", original.getTrigger(), updated.getTrigger(), true);
recordChange("config", original.getConfig(), updated.getConfig(), true);
}
}
}

View File

@@ -2,22 +2,12 @@ package org.openmetadata.service.resources.automations;
import org.openmetadata.schema.entity.automations.CreateWorkflow;
import org.openmetadata.schema.entity.automations.Workflow;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.mapper.EntityMapper;
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
public class WorkflowMapper implements EntityMapper<Workflow, CreateWorkflow> {
private final OpenMetadataApplicationConfig openMetadataApplicationConfig;
public WorkflowMapper(OpenMetadataApplicationConfig config) {
this.openMetadataApplicationConfig = config;
}
@Override
public Workflow createToEntity(CreateWorkflow create, String user) {
OpenMetadataConnection openMetadataServerConnection =
new OpenMetadataConnectionBuilder(openMetadataApplicationConfig).build();
return copy(new Workflow(), create, user)
.withDescription(create.getDescription())
.withRequest(create.getRequest())
@@ -25,7 +15,6 @@ public class WorkflowMapper implements EntityMapper<Workflow, CreateWorkflow> {
.withDisplayName(create.getDisplayName())
.withResponse(create.getResponse())
.withStatus(create.getStatus())
.withOpenMetadataServerConnection(openMetadataServerConnection)
.withName(create.getName());
}
}

View File

@@ -85,7 +85,7 @@ public class WorkflowResource extends EntityResource<Workflow, WorkflowRepositor
static final String FIELDS = "owners";
private WorkflowMapper mapper;
private PipelineServiceClientInterface pipelineServiceClient;
private OpenMetadataApplicationConfig openMetadataApplicationConfig;
private OpenMetadataConnectionBuilder openMetadataConnectionBuilder;
public WorkflowResource(Authorizer authorizer, Limits limits) {
super(Entity.WORKFLOW, authorizer, limits);
@@ -93,11 +93,11 @@
@Override
public void initialize(OpenMetadataApplicationConfig config) {
this.openMetadataApplicationConfig = config;
this.mapper = new WorkflowMapper(config);
this.mapper = new WorkflowMapper();
this.pipelineServiceClient =
PipelineServiceClientFactory.createPipelineServiceClient(
config.getPipelineServiceClientConfiguration());
openMetadataConnectionBuilder = new OpenMetadataConnectionBuilder(config);
}
public static class WorkflowList extends ResultList<Workflow> {
@@ -359,8 +359,7 @@
@Context SecurityContext securityContext) {
EntityUtil.Fields fields = getFields(FIELD_OWNERS);
Workflow workflow = repository.get(uriInfo, id, fields);
workflow.setOpenMetadataServerConnection(
new OpenMetadataConnectionBuilder(openMetadataApplicationConfig).build());
workflow.setOpenMetadataServerConnection(openMetadataConnectionBuilder.build());
/*
We will send the encrypted Workflow to the Pipeline Service Client
It will be fetched from the API from there, since we are
@@ -571,8 +570,7 @@
return workflowConverted;
}
Workflow workflowDecrypted = secretsManager.decryptWorkflow(workflow);
OpenMetadataConnection openMetadataServerConnection =
new OpenMetadataConnectionBuilder(openMetadataApplicationConfig).build();
OpenMetadataConnection openMetadataServerConnection = openMetadataConnectionBuilder.build();
workflowDecrypted.setOpenMetadataServerConnection(
secretsManager.encryptOpenMetadataConnection(openMetadataServerConnection, false));
if (authorizer.shouldMaskPasswords(securityContext)) {

View File

@@ -36,7 +36,8 @@ public class EventSubscriptionMapper
.withClassName(
validateConsumerClass(
Optional.ofNullable(create.getClassName())
.orElse(AlertPublisher.class.getCanonicalName())));
.orElse(AlertPublisher.class.getCanonicalName())))
.withConfig(create.getConfig());
}
private String validateConsumerClass(String className) {
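
The mapper now copies the request's free-form config onto the entity, and together with the new "Custom" alertType and config map added to the schemas further down in this diff, this lets an event subscription carry consumer-specific settings. A hedged sketch using the generated Java POJO; the builder-style method names follow the project's generated-schema convention and the consumer class is hypothetical:

// Assumes java.util.Map and the generated CreateEventSubscription POJO
CreateEventSubscription create =
    new CreateEventSubscription()
        .withName("orders-custom-alert")
        .withAlertType(CreateEventSubscription.AlertType.CUSTOM) // new "Custom" enum value
        .withClassName("org.example.CustomConsumer") // hypothetical consumer implementation
        .withConfig(Map.of("endpoint", "https://example.com/hook")); // free-form map, persisted and diffed on update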

View File

@@ -102,7 +102,7 @@ import org.quartz.SchedulerException;
"The `Events` are changes to metadata and are sent when entities are created, modified, or updated. External systems can subscribe to events using event subscription API over Webhooks, Slack, or Microsoft Teams.")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "events/subscriptions")
@Collection(name = "events/subscriptions", order = 7) // must be initialized before applications
public class EventSubscriptionResource
extends EntityResource<EventSubscription, EventSubscriptionRepository> {
public static final String COLLECTION_PATH = "/v1/events/subscriptions";
@@ -132,6 +132,7 @@
@Override
public void initialize(OpenMetadataApplicationConfig config) {
try {
EventSubscriptionScheduler.initialize(config);
EventsSubscriptionRegistry.initialize(
listOrEmpty(EventSubscriptionResource.getNotificationsFilterDescriptors()),
listOrEmpty(EventSubscriptionResource.getObservabilityFilterDescriptors()));

View File

@@ -0,0 +1,35 @@
package org.openmetadata.service.util;
import java.util.HashMap;
import java.util.Map;
/**
* Dependency Injection container to manage resources. This is a simple implementation and does not
* support any lifecycle management. It allows registering resources and retrieving them by
* a consuming class. A consuming class might use only a subset of the registered resources (or none at all).
* Example:
*
* <pre>
* DIContainer container = new DIContainer();
* container.registerResource(MyService.class, new MyService());
* MyService service = container.getResource(MyService.class);
* service.doSomething();
* </pre>
*
* TODO - Add support for lifecycle management.
*/
public class DIContainer {
private final Map<Class<?>, Object> resources = new HashMap<>();
public <T> void registerResource(Class<T> resourceClass, T resource) {
resources.put(resourceClass, resource);
}
public <T> T getResource(Class<T> resourceClass) {
T resource = resourceClass.cast(resources.get(resourceClass));
if (resource == null) {
throw new IllegalStateException("Resource not initialized: " + resourceClass.getName());
}
return resource;
}
}
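
As the Javadoc notes, lookups are keyed by the exact class used at registration, and a missing resource throws rather than returning null, so a misconfigured consumer fails fast. A short usage sketch based only on the two methods above; connectionBuilder is an assumed pre-built instance:

DIContainer di = new DIContainer();
di.registerResource(OpenMetadataConnectionBuilder.class, connectionBuilder);

// Succeeds: same class token that was registered
OpenMetadataConnectionBuilder builder = di.getResource(OpenMetadataConnectionBuilder.class);

// Throws IllegalStateException("Resource not initialized: ...")
// because nothing was registered under this key
PipelineServiceClientInterface client = di.getResource(PipelineServiceClientInterface.class);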

View File

@@ -68,6 +68,10 @@
"config": {
"description": "The configuration used for this application run. Its type will be based on the application type. Old runs might not be compatible with the schema of the app configuration.",
"$ref": "../../type/basic.json#/definitions/map"
},
"properties": {
"description": "Arbitrary metadata that will be attached to the report.",
"$ref": "../../type/basic.json#/definitions/map"
}
},
"additionalProperties": false

View File

@@ -79,6 +79,9 @@
"domain" : {
"description": "Fully qualified name of the domain the Table belongs to.",
"type": "string"
},
"config": {
"$ref": "../../type/basic.json#/definitions/map"
}
},
"required": ["name", "alertType"],

View File

@@ -78,7 +78,7 @@
"alertType": {
"description": "Type of Alerts supported.",
"type": "string",
"enum": ["Notification", "Observability", "ActivityFeed", "GovernanceWorkflowChangeEvent"],
"enum": ["Notification", "Observability", "ActivityFeed", "GovernanceWorkflowChangeEvent", "Custom"],
"default": "Notification"
},
"subscriptionCategory": {
@@ -328,6 +328,9 @@
"domain" : {
"description": "Domain the asset belongs to. When not set, the asset inherits the domain from the parent it belongs to.",
"$ref": "../type/entityReference.json"
},
"config": {
"$ref": "../type/basic.json#/definitions/map"
}
},
"required": ["id", "name", "alertType", "destinations"],

View File

@@ -84,4 +84,5 @@ export enum EntityReferenceFields {
TIER = 'tier.tagFQN',
TABLE_TYPE = 'tableType',
EXTENSION = 'extension',
SERVICE = 'service.fullyQualifiedName',
}

View File

@@ -39,6 +39,10 @@ export interface AppRunRecord {
* Failure Context for the Application.
*/
failureContext?: { [key: string]: any };
/**
* Arbitrary metadata that will be attached to the report.
*/
properties?: { [key: string]: any };
/**
* This schema defines the type of application Run.
*/

View File

@@ -26,6 +26,7 @@ export interface CreateEventSubscription {
* Consumer Class for the Event Subscription. Will use 'AlertPublisher' if not provided.
*/
className?: string;
config?: { [key: string]: any };
/**
* A short description of the Alert, comprehensible to regular users.
*/

View File

@@ -31,6 +31,7 @@ export interface EventSubscription {
* Java class for the Event Subscription.
*/
className?: string;
config?: { [key: string]: any };
/**
* A short description of the Event Subscription, comprehensible to regular users.
*/
@@ -116,6 +117,7 @@
*/
export enum AlertType {
ActivityFeed = "ActivityFeed",
Custom = "Custom",
GovernanceWorkflowChangeEvent = "GovernanceWorkflowChangeEvent",
Notification = "Notification",
Observability = "Observability",

View File

@@ -240,6 +240,19 @@ class JSONLogicSearchClassBase {
tierOptions?: Promise<AsyncFetchListValues>;
}) => {
return {
[EntityReferenceFields.SERVICE]: {
label: t('label.service'),
type: 'select',
mainWidgetProps: this.mainWidgetProps,
operators: this.defaultSelectOperators,
fieldSettings: {
asyncFetch: advancedSearchClassBase.autocomplete({
searchIndex: SearchIndex.ALL,
entityField: EntityFields.SERVICE,
}),
useAsyncSearch: true,
},
},
[EntityReferenceFields.OWNERS]: {
label: t('label.owner-plural'),
type: 'select',

View File

@@ -33,6 +33,9 @@ import { generateUUID } from './StringsUtils';
export const JSONLOGIC_FIELDS_TO_IGNORE_SPLIT = [
EntityReferenceFields.EXTENSION,
EntityReferenceFields.SERVICE,
EntityReferenceFields.DATABASE,
EntityReferenceFields.DATABASE_SCHEMA,
];
const resolveFieldType = (
@@ -534,9 +537,15 @@ export const elasticsearchToJsonLogic = (
if (field.includes('.')) {
const [parentField, childField] = field.split('.');
return JSONLOGIC_FIELDS_TO_IGNORE_SPLIT.includes(
parentField as EntityReferenceFields
)
const shouldIgnoreSplit =
JSONLOGIC_FIELDS_TO_IGNORE_SPLIT.includes(
parentField as EntityReferenceFields
) ||
JSONLOGIC_FIELDS_TO_IGNORE_SPLIT.includes(
field as EntityReferenceFields
);
return shouldIgnoreSplit
? { '==': [{ var: field }, value] }
: {
some: [