MINOR: feat(apps): support event subscriptions (#19338)

* feat(apps): support event subscriptions

- added support for apps with event subscriptions.
- added support for custom consumers on event subscriptions.
- use native application methods instead of reflection in ApplicationHandler

* removed runMethodFromApplication reflection and use concrete methods from AbstractNativeApplication

* format
This commit is contained in:
Imri Paran 2025-01-21 08:10:26 +02:00 committed by GitHub
parent 00a37c6180
commit 3abae45403
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 1335 additions and 108 deletions

View File

@ -1,5 +1,6 @@
package org.openmetadata.service.apps;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.service.apps.scheduler.AppScheduler.APPS_JOB_GROUP;
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY;
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME;
@ -7,18 +8,29 @@ import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_NAME;
import io.dropwizard.configuration.ConfigurationException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.api.configuration.apps.AppPrivateConfig;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppMarketPlaceDefinition;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.apps.scheduler.AppScheduler;
import org.openmetadata.service.exception.UnhandledServerException;
import org.openmetadata.service.events.scheduled.EventSubscriptionScheduler;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.AppMarketPlaceRepository;
import org.openmetadata.service.jdbi3.AppRepository;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.EventSubscriptionRepository;
import org.openmetadata.service.resources.events.subscription.EventSubscriptionMapper;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
@ -34,11 +46,15 @@ public class ApplicationHandler {
@Getter private static ApplicationHandler instance;
private final OpenMetadataApplicationConfig config;
private final AppRepository appRepository;
private final AppMarketPlaceRepository appMarketPlaceRepository;
private final EventSubscriptionRepository eventSubscriptionRepository;
private final ConfigurationReader configReader = new ConfigurationReader();
private ApplicationHandler(OpenMetadataApplicationConfig config) {
this.config = config;
this.appRepository = new AppRepository();
this.appMarketPlaceRepository = new AppMarketPlaceRepository();
this.eventSubscriptionRepository = new EventSubscriptionRepository();
}
public static void initialize(OpenMetadataApplicationConfig config) {
@ -80,25 +96,120 @@ public class ApplicationHandler {
public void triggerApplicationOnDemand(
App app, CollectionDAO daoCollection, SearchRepository searchRepository) {
runMethodFromApplication(app, daoCollection, searchRepository, "triggerOnDemand");
try {
runAppInit(app, daoCollection, searchRepository).triggerOnDemand();
} catch (ClassNotFoundException
| NoSuchMethodException
| InvocationTargetException
| InstantiationException
| IllegalAccessException e) {
LOG.error("Failed to install application {}", app.getName(), e);
throw AppException.byMessage(
app.getName(), "triggerOnDemand", e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR);
}
}
public void installApplication(
App app, CollectionDAO daoCollection, SearchRepository searchRepository) {
runMethodFromApplication(app, daoCollection, searchRepository, "install");
App app, CollectionDAO daoCollection, SearchRepository searchRepository, String installedBy) {
try {
runAppInit(app, daoCollection, searchRepository).install();
installEventSubscriptions(app, installedBy);
} catch (ClassNotFoundException
| NoSuchMethodException
| InvocationTargetException
| InstantiationException
| IllegalAccessException e) {
LOG.error("Failed to install application {}", app.getName(), e);
throw AppException.byMessage(
app.getName(), "install", e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR);
}
}
private void installEventSubscriptions(App app, String installedBy) {
AppMarketPlaceDefinition definition = appMarketPlaceRepository.getDefinition(app);
Map<String, EntityReference> eventSubscriptionsReferences =
listOrEmpty(app.getEventSubscriptions()).stream()
.collect(Collectors.toMap(EntityReference::getName, e -> e));
definition.getEventSubscriptions().stream()
.map(
request ->
Optional.ofNullable(eventSubscriptionsReferences.get(request.getName()))
.flatMap(
sub ->
Optional.ofNullable(
eventSubscriptionRepository.findByNameOrNull(
sub.getName(), Include.ALL)))
.orElseGet(
() -> {
EventSubscription createdEventSub =
eventSubscriptionRepository.create(
null,
// TODO need to get the actual user
new EventSubscriptionMapper()
.createToEntity(request, installedBy));
appRepository.addEventSubscription(app, createdEventSub);
return createdEventSub;
}))
.forEach(
eventSub -> {
try {
EventSubscriptionScheduler.getInstance().addSubscriptionPublisher(eventSub, true);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
public void configureApplication(
App app, CollectionDAO daoCollection, SearchRepository searchRepository) {
runMethodFromApplication(app, daoCollection, searchRepository, "configure");
try {
runAppInit(app, daoCollection, searchRepository).configure();
} catch (ClassNotFoundException
| NoSuchMethodException
| InvocationTargetException
| InstantiationException
| IllegalAccessException e) {
LOG.error("Failed to configure application {}", app.getName(), e);
throw AppException.byMessage(
app.getName(), "configure", e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR);
}
}
public void performCleanup(
App app, CollectionDAO daoCollection, SearchRepository searchRepository) {
runMethodFromApplication(app, daoCollection, searchRepository, "cleanup");
App app, CollectionDAO daoCollection, SearchRepository searchRepository, String deletedBy) {
try {
runAppInit(app, daoCollection, searchRepository).cleanup();
} catch (ClassNotFoundException
| NoSuchMethodException
| InvocationTargetException
| InstantiationException
| IllegalAccessException e) {
throw new RuntimeException(e);
}
deleteEventSubscriptions(app, deletedBy);
}
public Object runAppInit(App app, CollectionDAO daoCollection, SearchRepository searchRepository)
private void deleteEventSubscriptions(App app, String deletedBy) {
listOrEmpty(app.getEventSubscriptions())
.forEach(
eventSubscriptionReference -> {
try {
EventSubscription eventSub =
eventSubscriptionRepository.find(
eventSubscriptionReference.getId(), Include.ALL);
EventSubscriptionScheduler.getInstance().deleteEventSubscriptionPublisher(eventSub);
eventSubscriptionRepository.delete(deletedBy, eventSub.getId(), false, true);
} catch (EntityNotFoundException e) {
LOG.debug("Event subscription {} not found", eventSubscriptionReference.getId());
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
});
}
public AbstractNativeApplication runAppInit(
App app, CollectionDAO daoCollection, SearchRepository searchRepository)
throws ClassNotFoundException,
NoSuchMethodException,
InvocationTargetException,
@ -106,47 +217,21 @@ public class ApplicationHandler {
IllegalAccessException {
// add private runtime properties
setAppRuntimeProperties(app);
Class<?> clz = Class.forName(app.getClassName());
Object resource =
Class<? extends AbstractNativeApplication> clz =
Class.forName(app.getClassName()).asSubclass(AbstractNativeApplication.class);
AbstractNativeApplication resource =
clz.getDeclaredConstructor(CollectionDAO.class, SearchRepository.class)
.newInstance(daoCollection, searchRepository);
// Raise preview message if the app is in Preview mode
if (Boolean.TRUE.equals(app.getPreview())) {
Method preview = resource.getClass().getMethod("raisePreviewMessage", App.class);
preview.invoke(resource, app);
resource.raisePreviewMessage(app);
}
// Call init Method
Method initMethod = resource.getClass().getMethod("init", App.class);
initMethod.invoke(resource, app);
resource.init(app);
return resource;
}
/**
* Load an App from its className and call its methods dynamically
*/
public void runMethodFromApplication(
App app, CollectionDAO daoCollection, SearchRepository searchRepository, String methodName) {
// Native Application
setAppRuntimeProperties(app);
try {
Object resource = runAppInit(app, daoCollection, searchRepository);
// Call method on demand
Method scheduleMethod = resource.getClass().getMethod(methodName);
scheduleMethod.invoke(resource);
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException e) {
LOG.error("Exception encountered", e);
throw new UnhandledServerException(e.getMessage());
} catch (ClassNotFoundException e) {
throw new UnhandledServerException(e.getMessage());
} catch (InvocationTargetException e) {
throw AppException.byMessage(app.getName(), methodName, e.getTargetException().getMessage());
}
}
public void migrateQuartzConfig(App application) throws SchedulerException {
JobDetail jobDetails =
AppScheduler.getInstance()

View File

@ -17,6 +17,7 @@ import static org.openmetadata.service.apps.bundles.changeEvent.AbstractEventCon
import static org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer.ALERT_OFFSET_KEY;
import static org.openmetadata.service.events.subscription.AlertUtil.getStartingOffset;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@ -37,6 +38,7 @@ import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.entity.events.SubscriptionStatus;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer;
import org.openmetadata.service.apps.bundles.changeEvent.AlertPublisher;
import org.openmetadata.service.events.subscription.AlertUtil;
import org.openmetadata.service.jdbi3.EntityRepository;
@ -87,9 +89,23 @@ public class EventSubscriptionScheduler {
}
@Transaction
public void addSubscriptionPublisher(EventSubscription eventSubscription)
throws SchedulerException {
AlertPublisher alertPublisher = new AlertPublisher();
public void addSubscriptionPublisher(EventSubscription eventSubscription, boolean reinstall)
throws SchedulerException,
ClassNotFoundException,
NoSuchMethodException,
InvocationTargetException,
InstantiationException,
IllegalAccessException {
Class<? extends AbstractEventConsumer> defaultClass = AlertPublisher.class;
Class<? extends AbstractEventConsumer> clazz =
Class.forName(
Optional.ofNullable(eventSubscription.getClassName())
.orElse(defaultClass.getCanonicalName()))
.asSubclass(AbstractEventConsumer.class);
AbstractEventConsumer publisher = clazz.getDeclaredConstructor().newInstance();
if (reinstall && isSubscriptionRegistered(eventSubscription)) {
deleteEventSubscriptionPublisher(eventSubscription);
}
if (Boolean.FALSE.equals(
eventSubscription.getEnabled())) { // Only add webhook that is enabled for publishing events
eventSubscription
@ -111,7 +127,7 @@ public class EventSubscriptionScheduler {
getSubscriptionStatusAtCurrentTime(SubscriptionStatus.Status.ACTIVE)));
JobDetail jobDetail =
jobBuilder(
alertPublisher,
publisher,
eventSubscription,
String.format("%s", eventSubscription.getId().toString()));
Trigger trigger = trigger(eventSubscription);
@ -127,8 +143,17 @@ public class EventSubscriptionScheduler {
}
}
public boolean isSubscriptionRegistered(EventSubscription eventSubscription) {
try {
return alertsScheduler.checkExists(getJobKey(eventSubscription));
} catch (SchedulerException e) {
LOG.error("Failed to check if subscription is registered: {}", eventSubscription.getId(), e);
return false;
}
}
private JobDetail jobBuilder(
AlertPublisher publisher, EventSubscription eventSubscription, String jobIdentity) {
AbstractEventConsumer publisher, EventSubscription eventSubscription, String jobIdentity) {
JobDataMap dataMap = new JobDataMap();
dataMap.put(ALERT_INFO_KEY, eventSubscription);
dataMap.put(ALERT_OFFSET_KEY, getStartingOffset(eventSubscription.getId()));
@ -158,7 +183,7 @@ public class EventSubscriptionScheduler {
// Remove Existing Subscription Publisher
deleteEventSubscriptionPublisher(eventSubscription);
if (Boolean.TRUE.equals(eventSubscription.getEnabled())) {
addSubscriptionPublisher(eventSubscription);
addSubscriptionPublisher(eventSubscription, true);
}
}
@ -536,6 +561,14 @@ public class EventSubscriptionScheduler {
return Entity.getCollectionDAO().changeEventDAO().recordExists(id.toString()) > 0;
}
public static JobKey getJobKey(EventSubscription eventSubscription) {
return getJobKey(eventSubscription.getId());
}
private static JobKey getJobKey(UUID subscriptionId) {
return new JobKey(subscriptionId.toString(), ALERT_JOB_GROUP);
}
public static void shutDown() throws SchedulerException {
LOG.info("Shutting Down Event Subscription Scheduler");
if (instance != null) {

View File

@ -1,6 +1,8 @@
package org.openmetadata.service.jdbi3;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppMarketPlaceDefinition;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.apps.AppMarketPlaceResource;
import org.openmetadata.service.util.EntityUtil;
@ -24,6 +26,10 @@ public class AppMarketPlaceRepository extends EntityRepository<AppMarketPlaceDef
/* Nothing to do */
}
public AppMarketPlaceDefinition getDefinition(App app) {
return findByName(app.getName(), Include.NON_DELETED);
}
@Override
public void clearFields(AppMarketPlaceDefinition entity, EntityUtil.Fields fields) {
/* Nothing to do */

View File

@ -1,5 +1,6 @@
package org.openmetadata.service.jdbi3;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.util.UserUtil.getUser;
@ -14,6 +15,7 @@ import org.openmetadata.schema.entity.Bot;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppExtension;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.teams.AuthenticationMechanism;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.type.EntityReference;
@ -380,6 +382,20 @@ public class AppRepository extends EntityRepository<App> {
return new AppRepository.AppUpdater(original, updated, operation);
}
public void addEventSubscription(App app, EventSubscription eventSubscription) {
addRelationship(
app.getId(),
eventSubscription.getId(),
Entity.APPLICATION,
Entity.EVENT_SUBSCRIPTION,
Relationship.CONTAINS);
List<EntityReference> newSubs = new ArrayList<>(listOrEmpty(app.getEventSubscriptions()));
newSubs.add(eventSubscription.getEntityReference());
App updated = JsonUtils.deepCopy(app, App.class).withEventSubscriptions(newSubs);
updated.setOpenMetadataServerConnection(null);
getUpdater(app, updated, EntityRepository.Operation.PATCH).update();
}
public class AppUpdater extends EntityUpdater {
public AppUpdater(App original, App updated, Operation operation) {
super(original, updated, operation);
@ -391,6 +407,8 @@ public class AppRepository extends EntityRepository<App> {
"appConfiguration", original.getAppConfiguration(), updated.getAppConfiguration());
recordChange("appSchedule", original.getAppSchedule(), updated.getAppSchedule());
recordChange("bot", original.getBot(), updated.getBot());
recordChange(
"eventSubscriptions", original.getEventSubscriptions(), updated.getEventSubscriptions());
}
}
}

View File

@ -54,7 +54,8 @@ public class AppMapper implements EntityMapper<App, CreateApp> {
.withSourcePythonClass(marketPlaceDefinition.getSourcePythonClass())
.withAllowConfiguration(marketPlaceDefinition.getAllowConfiguration())
.withSystem(marketPlaceDefinition.getSystem())
.withSupportsInterrupt(marketPlaceDefinition.getSupportsInterrupt());
.withSupportsInterrupt(marketPlaceDefinition.getSupportsInterrupt())
.withFullyQualifiedName(marketPlaceDefinition.getFullyQualifiedName());
// validate Bot if provided
validateAndAddBot(app, createAppRequest.getBot());

View File

@ -38,7 +38,8 @@ public class AppMarketPlaceMapper
.withSourcePythonClass(create.getSourcePythonClass())
.withAllowConfiguration(create.getAllowConfiguration())
.withSystem(create.getSystem())
.withSupportsInterrupt(create.getSupportsInterrupt());
.withSupportsInterrupt(create.getSupportsInterrupt())
.withEventSubscriptions(create.getEventSubscriptions());
// Validate App
validateApplication(app);

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.resources.apps;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.service.Entity.ADMIN_USER_NAME;
import static org.openmetadata.service.Entity.APPLICATION;
import static org.openmetadata.service.Entity.FIELD_OWNERS;
import static org.openmetadata.service.jdbi3.EntityRepository.getEntitiesFromSeedData;
@ -133,18 +134,19 @@ public class AppResource extends EntityResource<App, AppRepository> {
try {
App app = getAppForInit(createApp.getName());
if (app == null) {
app =
mapper.createToEntity(createApp, "admin").withFullyQualifiedName(createApp.getName());
app = mapper.createToEntity(createApp, ADMIN_USER_NAME);
repository.initializeEntity(app);
}
// Schedule
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(app, Entity.getCollectionDAO(), searchRepository);
.installApplication(
app, Entity.getCollectionDAO(), searchRepository, ADMIN_USER_NAME);
}
} catch (Exception ex) {
LOG.error("Failed in Creation/Initialization of Application : {}", createApp.getName(), ex);
repository.deleteByName("admin", createApp.getName(), false, true);
}
}
}
@ -618,9 +620,11 @@ public class AppResource extends EntityResource<App, AppRepository> {
new OperationContext(Entity.APPLICATION, MetadataOperation.CREATE));
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(app, Entity.getCollectionDAO(), searchRepository);
ApplicationHandler.getInstance()
.configureApplication(app, Entity.getCollectionDAO(), searchRepository);
.installApplication(
app,
Entity.getCollectionDAO(),
searchRepository,
securityContext.getUserPrincipal().getName());
}
// We don't want to store this information
unsetAppRuntimeProperties(app);
@ -663,7 +667,11 @@ public class AppResource extends EntityResource<App, AppRepository> {
App updatedApp = (App) response.getEntity();
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(updatedApp, Entity.getCollectionDAO(), searchRepository);
.installApplication(
updatedApp,
Entity.getCollectionDAO(),
searchRepository,
securityContext.getUserPrincipal().getName());
}
// We don't want to store this information
unsetAppRuntimeProperties(updatedApp);
@ -707,7 +715,11 @@ public class AppResource extends EntityResource<App, AppRepository> {
App updatedApp = (App) response.getEntity();
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(updatedApp, Entity.getCollectionDAO(), searchRepository);
.installApplication(
updatedApp,
Entity.getCollectionDAO(),
searchRepository,
securityContext.getUserPrincipal().getName());
}
// We don't want to store this information
unsetAppRuntimeProperties(updatedApp);
@ -735,7 +747,11 @@ public class AppResource extends EntityResource<App, AppRepository> {
AppScheduler.getInstance().deleteScheduledApplication(app);
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(app, Entity.getCollectionDAO(), searchRepository);
.installApplication(
app,
Entity.getCollectionDAO(),
searchRepository,
securityContext.getUserPrincipal().getName());
}
// We don't want to store this information
unsetAppRuntimeProperties(app);
@ -773,7 +789,11 @@ public class AppResource extends EntityResource<App, AppRepository> {
}
ApplicationHandler.getInstance()
.performCleanup(app, Entity.getCollectionDAO(), searchRepository);
.performCleanup(
app,
Entity.getCollectionDAO(),
searchRepository,
securityContext.getUserPrincipal().getName());
limits.invalidateCache(entityType);
// Remove from Pipeline Service
@ -810,7 +830,11 @@ public class AppResource extends EntityResource<App, AppRepository> {
}
ApplicationHandler.getInstance()
.performCleanup(app, Entity.getCollectionDAO(), searchRepository);
.performCleanup(
app,
Entity.getCollectionDAO(),
searchRepository,
securityContext.getUserPrincipal().getName());
// Remove from Pipeline Service
deleteApp(securityContext, app);
@ -842,7 +866,11 @@ public class AppResource extends EntityResource<App, AppRepository> {
App app = (App) response.getEntity();
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(app, Entity.getCollectionDAO(), searchRepository);
.installApplication(
app,
Entity.getCollectionDAO(),
searchRepository,
securityContext.getUserPrincipal().getName());
}
// We don't want to store this information
unsetAppRuntimeProperties(app);
@ -878,7 +906,12 @@ public class AppResource extends EntityResource<App, AppRepository> {
repository.getByName(uriInfo, name, new EntityUtil.Fields(repository.getAllowedFields()));
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(app, repository.getDaoCollection(), searchRepository);
.installApplication(
app,
repository.getDaoCollection(),
searchRepository,
securityContext.getUserPrincipal().getName());
return Response.status(Response.Status.OK).entity("App is Scheduled.").build();
}
throw new IllegalArgumentException("App is not of schedule type Scheduled.");
@ -912,15 +945,9 @@ public class AppResource extends EntityResource<App, AppRepository> {
repository.getByName(uriInfo, name, new EntityUtil.Fields(repository.getAllowedFields()));
// The application will have the updated appConfiguration we can use to run the `configure`
// logic
try {
ApplicationHandler.getInstance()
.configureApplication(app, repository.getDaoCollection(), searchRepository);
return Response.status(Response.Status.OK).entity("App has been configured.").build();
} catch (RuntimeException e) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(String.format("Error configuring app [%s]", e.getMessage()))
.build();
}
ApplicationHandler.getInstance()
.configureApplication(app, repository.getDaoCollection(), searchRepository);
return Response.status(Response.Status.OK).entity("App has been configured.").build();
}
@POST
@ -1029,7 +1056,11 @@ public class AppResource extends EntityResource<App, AppRepository> {
App app = repository.getByName(uriInfo, name, fields);
if (app.getAppType().equals(AppType.Internal)) {
ApplicationHandler.getInstance()
.installApplication(app, Entity.getCollectionDAO(), searchRepository);
.installApplication(
app,
Entity.getCollectionDAO(),
searchRepository,
securityContext.getUserPrincipal().getName());
return Response.status(Response.Status.OK).entity("Application Deployed").build();
} else {
if (!app.getPipelines().isEmpty()) {

View File

@ -6,10 +6,14 @@ import static org.openmetadata.service.fernet.Fernet.encryptWebhookSecretKey;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import javax.ws.rs.BadRequestException;
import org.openmetadata.schema.api.events.CreateEventSubscription;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer;
import org.openmetadata.service.apps.bundles.changeEvent.AlertPublisher;
import org.openmetadata.service.mapper.EntityMapper;
public class EventSubscriptionMapper
@ -28,7 +32,20 @@ public class EventSubscriptionMapper
.withProvider(create.getProvider())
.withRetries(create.getRetries())
.withPollInterval(create.getPollInterval())
.withInput(create.getInput());
.withInput(create.getInput())
.withClassName(
validateConsumerClass(
Optional.ofNullable(create.getClassName())
.orElse(AlertPublisher.class.getCanonicalName())));
}
private String validateConsumerClass(String className) {
try {
Class.forName(className).asSubclass(AbstractEventConsumer.class);
return className;
} catch (ClassNotFoundException e) {
throw new BadRequestException("Consumer class not found: " + className);
}
}
private List<SubscriptionDestination> getSubscriptions(

View File

@ -27,6 +27,7 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -143,19 +144,18 @@ public class EventSubscriptionResource
}
private void initializeEventSubscriptions() {
try {
CollectionDAO daoCollection = repository.getDaoCollection();
List<String> listAllEventsSubscriptions =
daoCollection.eventSubscriptionDAO().listAllEventsSubscriptions();
List<EventSubscription> eventSubList =
JsonUtils.readObjects(listAllEventsSubscriptions, EventSubscription.class);
for (EventSubscription subscription : eventSubList) {
EventSubscriptionScheduler.getInstance().addSubscriptionPublisher(subscription);
}
} catch (Exception ex) {
// Starting application should not fail
LOG.warn("Exception during initializeEventSubscriptions", ex);
}
CollectionDAO daoCollection = repository.getDaoCollection();
daoCollection.eventSubscriptionDAO().listAllEventsSubscriptions().stream()
.map(obj -> JsonUtils.readValue(obj, EventSubscription.class))
.forEach(
subscription -> {
try {
EventSubscriptionScheduler.getInstance()
.addSubscriptionPublisher(subscription, true);
} catch (Exception ex) {
LOG.error("Failed to initialize subscription: {}", subscription.getId(), ex);
}
});
}
@GET
@ -293,12 +293,17 @@ public class EventSubscriptionResource
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Valid CreateEventSubscription request)
throws SchedulerException {
throws SchedulerException,
ClassNotFoundException,
InvocationTargetException,
NoSuchMethodException,
InstantiationException,
IllegalAccessException {
EventSubscription eventSub =
mapper.createToEntity(request, securityContext.getUserPrincipal().getName());
// Only one Creation is allowed
Response response = create(uriInfo, securityContext, eventSub);
EventSubscriptionScheduler.getInstance().addSubscriptionPublisher(eventSub);
EventSubscriptionScheduler.getInstance().addSubscriptionPublisher(eventSub, false);
return response;
}

View File

@ -1,6 +1,8 @@
package org.openmetadata.service.resources.apps;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.CREATED;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK;
import static org.openmetadata.common.utils.CommonUtil.listOf;
import static org.openmetadata.schema.type.ColumnDataType.INT;
@ -18,6 +20,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
@ -27,6 +30,8 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.HttpResponseException;
import org.apache.http.util.EntityUtils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.analytics.PageViewData;
@ -34,23 +39,33 @@ import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.analytics.WebAnalyticEventData;
import org.openmetadata.schema.analytics.type.WebAnalyticEventType;
import org.openmetadata.schema.api.data.CreateTableProfile;
import org.openmetadata.schema.api.events.CreateEventSubscription;
import org.openmetadata.schema.api.services.CreateDatabaseService;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppConfiguration;
import org.openmetadata.schema.entity.app.AppExtension;
import org.openmetadata.schema.entity.app.AppMarketPlaceDefinition;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.AppSchedule;
import org.openmetadata.schema.entity.app.AppType;
import org.openmetadata.schema.entity.app.CreateApp;
import org.openmetadata.schema.entity.app.CreateAppMarketPlaceDefinitionReq;
import org.openmetadata.schema.entity.app.NativeAppPermission;
import org.openmetadata.schema.entity.app.ScheduleTimeline;
import org.openmetadata.schema.entity.app.ScheduleType;
import org.openmetadata.schema.entity.app.ScheduledExecutionContext;
import org.openmetadata.schema.entity.data.Database;
import org.openmetadata.schema.entity.data.DatabaseSchema;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.services.DatabaseService;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.type.AccessDetails;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.Column;
import org.openmetadata.schema.type.EventType;
import org.openmetadata.schema.type.LifeCycle;
import org.openmetadata.schema.type.ProviderType;
import org.openmetadata.schema.type.TableProfile;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils;
@ -62,6 +77,8 @@ import org.openmetadata.service.resources.EntityResourceTest;
import org.openmetadata.service.resources.databases.DatabaseResourceTest;
import org.openmetadata.service.resources.databases.DatabaseSchemaResourceTest;
import org.openmetadata.service.resources.databases.TableResourceTest;
import org.openmetadata.service.resources.events.BaseCallbackResource;
import org.openmetadata.service.resources.events.EventSubscriptionResourceTest;
import org.openmetadata.service.resources.services.DatabaseServiceResourceTest;
import org.openmetadata.service.resources.teams.UserResourceTest;
import org.openmetadata.service.security.SecurityUtil;
@ -384,11 +401,119 @@ public class AppsResourceTest extends EntityResourceTest<App, CreateApp> {
"App does not support manual trigger.");
}
@SneakyThrows
@Test
void app_with_event_subscription() {
String subscriptionName = "TestEventSubscription";
// register app in marketplace
EventSubscriptionResourceTest eventSubscriptionResourceTest =
new EventSubscriptionResourceTest();
CreateAppMarketPlaceDefinitionReq createRequest =
new CreateAppMarketPlaceDefinitionReq()
.withName("TestAppEventSubscription")
.withDisplayName("Test App Event Subscription")
.withDescription("A Test application with event subscriptions.")
.withFeatures("nothing really")
.withDeveloper("Collate Inc.")
.withDeveloperUrl("https://www.example.com")
.withPrivacyPolicyUrl("https://www.example.com/privacy")
.withSupportEmail("support@example.com")
.withClassName("org.openmetadata.service.resources.apps.TestApp")
.withAppType(AppType.Internal)
.withScheduleType(ScheduleType.Scheduled)
.withRuntime(new ScheduledExecutionContext().withEnabled(true))
.withAppConfiguration(new AppConfiguration())
.withPermission(NativeAppPermission.All)
.withEventSubscriptions(
List.of(
new CreateEventSubscription()
.withName(subscriptionName)
.withDisplayName("Test Event Subscription")
.withDescription(
"Consume EntityChange Events in order to trigger reverse metadata changes.")
.withAlertType(CreateEventSubscription.AlertType.NOTIFICATION)
.withResources(List.of("all"))
.withProvider(ProviderType.USER)
.withPollInterval(5)
.withEnabled(true)));
String endpoint =
"http://localhost:" + APP.getLocalPort() + "/api/v1/test/webhook/" + subscriptionName;
createRequest
.getEventSubscriptions()
.get(0)
.setDestinations(eventSubscriptionResourceTest.getWebhook(endpoint));
createAppMarketPlaceDefinition(createRequest, ADMIN_AUTH_HEADERS);
// install app
CreateApp installApp =
new CreateApp()
.withName(createRequest.getName())
.withAppConfiguration(new AppConfiguration());
createEntity(installApp, ADMIN_AUTH_HEADERS);
TestUtils.get(
getResource(String.format("events/subscriptions/name/%s", subscriptionName)),
EventSubscription.class,
ADMIN_AUTH_HEADERS);
// make change in the system
TableResourceTest tableResourceTest = new TableResourceTest();
Table table =
tableResourceTest.getEntityByName(TEST_TABLE1.getFullyQualifiedName(), ADMIN_AUTH_HEADERS);
Table updated = JsonUtils.deepCopy(table, Table.class);
updated.setDescription("Updated Description");
tableResourceTest.patchEntity(
table.getId(), JsonUtils.pojoToJson(table), updated, ADMIN_AUTH_HEADERS);
// assert webhook was called
Awaitility.await()
.timeout(
Duration.ofSeconds(createRequest.getEventSubscriptions().get(0).getPollInterval() + 10))
.untilAsserted(
() -> {
BaseCallbackResource.EventDetails<ChangeEvent> result =
webhookCallbackResource.getEventDetails(subscriptionName);
Assertions.assertNotNull(result);
Assertions.assertTrue(
result.getEvents().stream()
.anyMatch(
e ->
e.getEventType().equals(EventType.ENTITY_UPDATED)
&& e.getChangeDescription()
.getFieldsUpdated()
.get(0)
.getNewValue()
.equals("Updated Description")));
});
// uninstall app
deleteEntityByName(installApp.getName(), true, true, ADMIN_AUTH_HEADERS);
Table updated2 = JsonUtils.deepCopy(updated, Table.class);
updated2.setDescription("Updated Description 2");
tableResourceTest.patchEntity(
table.getId(), JsonUtils.pojoToJson(table), updated2, ADMIN_AUTH_HEADERS);
// assert event subscription was deleted
TestUtils.assertResponse(
() ->
TestUtils.get(
getResource(String.format("events/subscriptions/name/%s", subscriptionName)),
EventSubscription.class,
ADMIN_AUTH_HEADERS),
NOT_FOUND,
String.format("eventsubscription instance for %s not found", subscriptionName));
}
@Override
public void validateCreatedEntity(
App createdEntity, CreateApp request, Map<String, String> authHeaders)
throws HttpResponseException {}
public void createAppMarketPlaceDefinition(
CreateAppMarketPlaceDefinitionReq create, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = getResource("apps/marketplace");
TestUtils.post(
target, create, AppMarketPlaceDefinition.class, CREATED.getStatusCode(), authHeaders);
}
@Override
public void compareEntities(App expected, App updated, Map<String, String> authHeaders)
throws HttpResponseException {}

View File

@ -0,0 +1,11 @@
package org.openmetadata.service.resources.apps;
import org.openmetadata.service.apps.AbstractNativeApplication;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.SearchRepository;
public class TestApp extends AbstractNativeApplication {
public TestApp(CollectionDAO collectionDAO, SearchRepository searchRepository) {
super(collectionDAO, searchRepository);
}
}

View File

@ -141,7 +141,7 @@ public abstract class BaseCallbackResource<T> {
entityCallbackMap.clear();
}
static class EventDetails<T> {
public static class EventDetails<T> {
@Getter @Setter long firstEventTime;
@Getter @Setter long latestEventTime;
@Getter final ConcurrentLinkedQueue<T> events = new ConcurrentLinkedQueue<>();

View File

@ -255,6 +255,10 @@
"description": "If the app run can be interrupted as part of the execution.",
"type": "boolean",
"default": false
},
"eventSubscriptions": {
"description": "Event Subscriptions for the Application.",
"$ref": "../../type/entityReferenceList.json"
}
},
"additionalProperties": false,

View File

@ -149,6 +149,14 @@
"description": "If the app run can be interrupted as part of the execution.",
"type": "boolean",
"default": false
},
"eventSubscriptions": {
"description": "Event subscriptions that will be created when the application is installed.",
"type": "array",
"default": [],
"items": {
"$ref": "../../../events/api/createEventSubscription.json"
}
}
},
"additionalProperties": false,

View File

@ -111,6 +111,14 @@
"description": "If the app run can be interrupted as part of the execution.",
"type": "boolean",
"default": false
},
"eventSubscriptions": {
"description": "Event subscriptions that will be created when the application is installed.",
"type": "array",
"default": [],
"items": {
"$ref": "../../../events/api/createEventSubscription.json"
}
}
},
"additionalProperties": false,

View File

@ -11,6 +11,10 @@
"description": "Name that uniquely identifies this Alert.",
"$ref": "../../type/basic.json#/definitions/entityName"
},
"className": {
"description": "Consumer Class for the Event Subscription. Will use 'AlertPublisher' if not provided.",
"type": "string"
},
"displayName": {
"description": "Display name for this Alert.",
"type": "string"

View File

@ -155,6 +155,9 @@
},
{
"$ref": "./emailAlertConfig.json"
},
{
"$ref": "../type/basic.json#/definitions/map"
}
]
}
@ -231,6 +234,10 @@
"description": "Unique identifier that identifies this Event Subscription.",
"$ref": "../type/basic.json#/definitions/uuid"
},
"className": {
"description": "Java class for the Event Subscription.",
"type": "string"
},
"name": {
"description": "Name that uniquely identifies this Event Subscription.",
"$ref": "../type/basic.json#/definitions/entityName"

View File

@ -173,6 +173,12 @@
".{1,}": { "type": "string" }
}
},
"map": {
"description": "A generic map that can be deserialized later.",
"existingJavaType" : "java.util.Map<String,Object>",
"type" : "object",
"additionalProperties": true
},
"status" : {
"javaType": "org.openmetadata.schema.type.ApiStatus",
"description": "State of an action over API.",

View File

@ -1,5 +1,5 @@
/*
* Copyright 2024 Collate.
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@ -10,9 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
/**
* This schema defines the applications for Open-Metadata.
*/
export interface App {
@ -78,6 +76,10 @@ export interface App {
* it belongs to.
*/
domain?: EntityReference;
/**
* Event Subscriptions for the Application.
*/
eventSubscriptions?: EntityReference[];
/**
* Features of the Application.
*/
@ -283,8 +285,12 @@ export interface CollateAIAppConfig {
*
* Remove Owner Action Type
*
* Add a Custom Property to the selected assets.
*
* Add owners to the selected assets.
*
* Remove Custom Properties Action Type
*
* Propagate description, tags and glossary terms via lineage
*
* ML Tagging action configuration for external automator.
@ -314,6 +320,9 @@ export interface Action {
* Update the description even if they are already defined in the asset. By default, we'll
* only add the descriptions to assets without the description set.
*
* Update the Custom Property even if it is defined in the asset. By default, we will only
* apply the owners to assets without the given Custom Property informed.
*
* Update the tier even if it is defined in the asset. By default, we will only apply the
* tier to assets without tier.
*
@ -343,6 +352,12 @@ export interface Action {
* Description to apply
*/
description?: string;
/**
* Owners to apply
*
* Custom Properties keys to remove
*/
customProperties?: any;
/**
* tier to apply
*/
@ -546,6 +561,8 @@ export interface Style {
*
* Add Description Action Type.
*
* Add Custom Properties Action Type.
*
* Remove Description Action Type
*
* Add Tier Action Type.
@ -554,11 +571,14 @@ export interface Style {
*
* Remove Owner Action Type
*
* Remove Custom Properties Action Type.
*
* Lineage propagation action type.
*
* ML PII Tagging action type.
*/
export enum ActionType {
AddCustomPropertiesAction = "AddCustomPropertiesAction",
AddDescriptionAction = "AddDescriptionAction",
AddDomainAction = "AddDomainAction",
AddOwnerAction = "AddOwnerAction",
@ -566,6 +586,7 @@ export enum ActionType {
AddTierAction = "AddTierAction",
LineagePropagationAction = "LineagePropagationAction",
MLTaggingAction = "MLTaggingAction",
RemoveCustomPropertiesAction = "RemoveCustomPropertiesAction",
RemoveDescriptionAction = "RemoveDescriptionAction",
RemoveDomainAction = "RemoveDomainAction",
RemoveOwnerAction = "RemoveOwnerAction",

View File

@ -1,5 +1,5 @@
/*
* Copyright 2024 Collate.
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@ -10,9 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
/**
* This schema defines the applications for Open-Metadata.
*/
export interface AppMarketPlaceDefinition {
@ -70,6 +68,10 @@ export interface AppMarketPlaceDefinition {
* it belongs to.
*/
domain?: EntityReference;
/**
* Event subscriptions that will be created when the application is installed.
*/
eventSubscriptions?: CreateEventSubscription[];
/**
* Features of the Application.
*/
@ -268,8 +270,12 @@ export interface CollateAIAppConfig {
*
* Remove Owner Action Type
*
* Add a Custom Property to the selected assets.
*
* Add owners to the selected assets.
*
* Remove Custom Properties Action Type
*
* Propagate description, tags and glossary terms via lineage
*
* ML Tagging action configuration for external automator.
@ -299,6 +305,9 @@ export interface Action {
* Update the description even if they are already defined in the asset. By default, we'll
* only add the descriptions to assets without the description set.
*
* Update the Custom Property even if it is defined in the asset. By default, we will only
* apply the owners to assets without the given Custom Property informed.
*
* Update the tier even if it is defined in the asset. By default, we will only apply the
* tier to assets without tier.
*
@ -328,6 +337,12 @@ export interface Action {
* Description to apply
*/
description?: string;
/**
* Owners to apply
*
* Custom Properties keys to remove
*/
customProperties?: any;
/**
* tier to apply
*/
@ -529,6 +544,8 @@ export interface Style {
*
* Add Description Action Type.
*
* Add Custom Properties Action Type.
*
* Remove Description Action Type
*
* Add Tier Action Type.
@ -537,11 +554,14 @@ export interface Style {
*
* Remove Owner Action Type
*
* Remove Custom Properties Action Type.
*
* Lineage propagation action type.
*
* ML PII Tagging action type.
*/
export enum ActionType {
AddCustomPropertiesAction = "AddCustomPropertiesAction",
AddDescriptionAction = "AddDescriptionAction",
AddDomainAction = "AddDomainAction",
AddOwnerAction = "AddOwnerAction",
@ -549,6 +569,7 @@ export enum ActionType {
AddTierAction = "AddTierAction",
LineagePropagationAction = "LineagePropagationAction",
MLTaggingAction = "MLTaggingAction",
RemoveCustomPropertiesAction = "RemoveCustomPropertiesAction",
RemoveDescriptionAction = "RemoveDescriptionAction",
RemoveDomainAction = "RemoveDomainAction",
RemoveOwnerAction = "RemoveOwnerAction",
@ -665,6 +686,398 @@ export interface FieldChange {
oldValue?: any;
}
/**
* This defines schema for sending alerts for OpenMetadata
*/
export interface CreateEventSubscription {
/**
* Type of Alert
*/
alertType: AlertType;
/**
* Maximum number of events sent in a batch (Default 10).
*/
batchSize?: number;
/**
* Consumer Class for the Event Subscription. Will use 'AlertPublisher' if not provided.
*/
className?: string;
/**
* A short description of the Alert, comprehensible to regular users.
*/
description?: string;
/**
* Subscription Config.
*/
destinations?: Destination[];
/**
* Display name for this Alert.
*/
displayName?: string;
/**
* Fully qualified name of the domain the Table belongs to.
*/
domain?: string;
/**
* Is the alert enabled.
*/
enabled?: boolean;
/**
* Input for the Filters.
*/
input?: AlertFilteringInput;
/**
* Name that uniquely identifies this Alert.
*/
name: string;
/**
* Owners of this Alert.
*/
owners?: EntityReference[];
/**
* Poll Interval in seconds.
*/
pollInterval?: number;
provider?: ProviderType;
/**
* Defines a list of resources that triggers the Event Subscription, Eg All, User, Teams etc.
*/
resources?: string[];
/**
* Number of times to retry callback on failure. (Default 3).
*/
retries?: number;
trigger?: Trigger;
}
/**
* Type of Alert
*
* Type of Alerts supported.
*/
export enum AlertType {
ActivityFeed = "ActivityFeed",
GovernanceWorkflowChangeEvent = "GovernanceWorkflowChangeEvent",
Notification = "Notification",
Observability = "Observability",
}
/**
* Subscription which has a type and the config.
*/
export interface Destination {
category: SubscriptionCategory;
config?: Webhook;
/**
* Is the subscription enabled.
*/
enabled?: boolean;
/**
* Unique identifier that identifies this Event Subscription.
*/
id?: string;
/**
* Read timeout in seconds. (Default 12s).
*/
readTimeout?: number;
statusDetails?: TionStatus;
/**
* Connection timeout in seconds. (Default 10s).
*/
timeout?: number;
type: SubscriptionType;
}
/**
* Subscription Endpoint Type.
*/
export enum SubscriptionCategory {
Admins = "Admins",
Assignees = "Assignees",
External = "External",
Followers = "Followers",
Mentions = "Mentions",
Owners = "Owners",
Teams = "Teams",
Users = "Users",
}
/**
* This schema defines webhook for receiving events from OpenMetadata.
*
* This schema defines email config for receiving events from OpenMetadata.
*
* A generic map that can be deserialized later.
*/
export interface Webhook {
/**
* Endpoint to receive the webhook events over POST requests.
*/
endpoint?: string;
/**
* Custom headers to be sent with the webhook request.
*/
headers?: { [key: string]: any };
/**
* HTTP operation to send the webhook request. Supports POST or PUT.
*/
httpMethod?: HTTPMethod;
/**
* List of receivers to send mail to
*/
receivers?: string[];
/**
* Secret set by the webhook client used for computing HMAC SHA256 signature of webhook
* payload and sent in `X-OM-Signature` header in POST requests to publish the events.
*/
secretKey?: string;
/**
* Send the Event to Admins
*
* Send the Mails to Admins
*/
sendToAdmins?: boolean;
/**
* Send the Event to Followers
*
* Send the Mails to Followers
*/
sendToFollowers?: boolean;
/**
* Send the Event to Owners
*
* Send the Mails to Owners
*/
sendToOwners?: boolean;
[property: string]: any;
}
/**
* HTTP operation to send the webhook request. Supports POST or PUT.
*/
export enum HTTPMethod {
Post = "POST",
Put = "PUT",
}
/**
* Current status of the subscription, including details on the last successful and failed
* attempts, and retry information.
*
* Detailed status of the destination during a test operation, including HTTP response
* information.
*/
export interface TionStatus {
/**
* Timestamp of the last failed callback in UNIX UTC epoch time in milliseconds.
*/
lastFailedAt?: number;
/**
* Detailed reason for the last failure received during callback.
*/
lastFailedReason?: string;
/**
* HTTP status code received during the last failed callback attempt.
*/
lastFailedStatusCode?: number;
/**
* Timestamp of the last successful callback in UNIX UTC epoch time in milliseconds.
*/
lastSuccessfulAt?: number;
/**
* Timestamp for the next retry attempt in UNIX epoch time in milliseconds. Only valid if
* `status` is `awaitingRetry`.
*/
nextAttempt?: number;
/**
* Status is `disabled` when the event subscription was created with `enabled` set to false
* and it never started publishing events. Status is `active` when the event subscription is
* functioning normally and a 200 OK response was received for the callback notification.
* Status is `failed` when a bad callback URL, connection failures, or `1xx` or `3xx`
* response was received for the callback notification. Status is `awaitingRetry` when the
* previous attempt at callback timed out or received a `4xx` or `5xx` response. Status is
* `retryLimitReached` after all retries fail.
*
* Overall test status, indicating if the test operation succeeded or failed.
*/
status?: Status;
/**
* Current timestamp of this status in UNIX epoch time in milliseconds.
*
* Timestamp when the response was received, in UNIX epoch time milliseconds.
*/
timestamp?: number;
/**
* Body of the HTTP response, if any, returned by the server.
*/
entity?: string;
/**
* HTTP headers returned in the response as a map of header names to values.
*/
headers?: any;
/**
* URL location if the response indicates a redirect or newly created resource.
*/
location?: string;
/**
* Media type of the response entity, if specified (e.g., application/json).
*/
mediaType?: string;
/**
* Detailed reason for failure if the test did not succeed.
*/
reason?: string;
/**
* HTTP status code of the response (e.g., 200 for OK, 404 for Not Found).
*/
statusCode?: number;
/**
* HTTP status reason phrase associated with the status code (e.g., 'Not Found').
*/
statusInfo?: string;
}
/**
* Status is `disabled` when the event subscription was created with `enabled` set to false
* and it never started publishing events. Status is `active` when the event subscription is
* functioning normally and a 200 OK response was received for the callback notification.
* Status is `failed` when a bad callback URL, connection failures, or `1xx` or `3xx`
* response was received for the callback notification. Status is `awaitingRetry` when the
* previous attempt at callback timed out or received a `4xx` or `5xx` response. Status is
* `retryLimitReached` after all retries fail.
*
* Overall test status, indicating if the test operation succeeded or failed.
*/
export enum Status {
Active = "active",
AwaitingRetry = "awaitingRetry",
Disabled = "disabled",
Failed = "failed",
RetryLimitReached = "retryLimitReached",
StatusFailed = "Failed",
Success = "Success",
}
/**
* Subscription Endpoint Type.
*/
export enum SubscriptionType {
ActivityFeed = "ActivityFeed",
Email = "Email",
GChat = "GChat",
GovernanceWorkflowChangeEvent = "GovernanceWorkflowChangeEvent",
MSTeams = "MsTeams",
Slack = "Slack",
Webhook = "Webhook",
}
/**
* Input for the Filters.
*
* Observability of the event subscription.
*/
export interface AlertFilteringInput {
/**
* List of filters for the event subscription.
*/
actions?: ArgumentsInput[];
/**
* List of filters for the event subscription.
*/
filters?: ArgumentsInput[];
}
/**
* Observability Filters for Event Subscription.
*/
export interface ArgumentsInput {
/**
* Arguments List
*/
arguments?: Argument[];
effect?: Effect;
/**
* Name of the filter
*/
name?: string;
/**
* Prefix Condition for the filter.
*/
prefixCondition?: PrefixCondition;
}
/**
* Argument for the filter.
*/
export interface Argument {
/**
* Value of the Argument
*/
input?: string[];
/**
* Name of the Argument
*/
name?: string;
}
export enum Effect {
Exclude = "exclude",
Include = "include",
}
/**
* Prefix Condition for the filter.
*
* Prefix Condition to be applied to the Condition.
*/
export enum PrefixCondition {
And = "AND",
Or = "OR",
}
/**
* Type of provider of an entity. Some entities are provided by the `system`. Some are
* entities created and provided by the `user`. Typically `system` provide entities can't be
* deleted and can only be disabled.
*/
export enum ProviderType {
System = "system",
User = "user",
}
/**
* Trigger Configuration for Alerts.
*/
export interface Trigger {
/**
* Cron Expression in case of Custom scheduled Trigger
*/
cronExpression?: string;
/**
* Schedule Info
*/
scheduleInfo?: ScheduleInfo;
triggerType: TriggerType;
}
/**
* Schedule Info
*/
export enum ScheduleInfo {
Custom = "Custom",
Daily = "Daily",
Monthly = "Monthly",
Weekly = "Weekly",
}
/**
* Trigger Configuration for Alerts.
*/
export enum TriggerType {
RealTime = "RealTime",
Scheduled = "Scheduled",
}
/**
* Permission used by Native Applications.
*

View File

@ -1,5 +1,5 @@
/*
* Copyright 2024 Collate.
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@ -10,9 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
/**
* This schema defines the applications for Open-Metadata.
*/
export interface CreateAppMarketPlaceDefinitionReq {
@ -61,6 +59,10 @@ export interface CreateAppMarketPlaceDefinitionReq {
* Fully qualified name of the domain the Table belongs to.
*/
domain?: string;
/**
* Event subscriptions that will be created when the application is installed.
*/
eventSubscriptions?: CreateEventSubscription[];
/**
* Features of the Application.
*/
@ -229,8 +231,12 @@ export interface CollateAIAppConfig {
*
* Remove Owner Action Type
*
* Add a Custom Property to the selected assets.
*
* Add owners to the selected assets.
*
* Remove Custom Properties Action Type
*
* Propagate description, tags and glossary terms via lineage
*
* ML Tagging action configuration for external automator.
@ -260,6 +266,9 @@ export interface Action {
* Update the description even if they are already defined in the asset. By default, we'll
* only add the descriptions to assets without the description set.
*
* Update the Custom Property even if it is defined in the asset. By default, we will only
* apply the owners to assets without the given Custom Property informed.
*
* Update the tier even if it is defined in the asset. By default, we will only apply the
* tier to assets without tier.
*
@ -289,6 +298,12 @@ export interface Action {
* Description to apply
*/
description?: string;
/**
* Owners to apply
*
* Custom Properties keys to remove
*/
customProperties?: any;
/**
* tier to apply
*/
@ -487,6 +502,8 @@ export interface Style {
*
* Add Description Action Type.
*
* Add Custom Properties Action Type.
*
* Remove Description Action Type
*
* Add Tier Action Type.
@ -495,11 +512,14 @@ export interface Style {
*
* Remove Owner Action Type
*
* Remove Custom Properties Action Type.
*
* Lineage propagation action type.
*
* ML PII Tagging action type.
*/
export enum ActionType {
AddCustomPropertiesAction = "AddCustomPropertiesAction",
AddDescriptionAction = "AddDescriptionAction",
AddDomainAction = "AddDomainAction",
AddOwnerAction = "AddOwnerAction",
@ -507,6 +527,7 @@ export enum ActionType {
AddTierAction = "AddTierAction",
LineagePropagationAction = "LineagePropagationAction",
MLTaggingAction = "MLTaggingAction",
RemoveCustomPropertiesAction = "RemoveCustomPropertiesAction",
RemoveDescriptionAction = "RemoveDescriptionAction",
RemoveDomainAction = "RemoveDomainAction",
RemoveOwnerAction = "RemoveOwnerAction",
@ -582,6 +603,398 @@ export enum AppType {
Internal = "internal",
}
/**
* This defines schema for sending alerts for OpenMetadata
*/
export interface CreateEventSubscription {
/**
* Type of Alert
*/
alertType: AlertType;
/**
* Maximum number of events sent in a batch (Default 10).
*/
batchSize?: number;
/**
* Consumer Class for the Event Subscription. Will use 'AlertPublisher' if not provided.
*/
className?: string;
/**
* A short description of the Alert, comprehensible to regular users.
*/
description?: string;
/**
* Subscription Config.
*/
destinations?: Destination[];
/**
* Display name for this Alert.
*/
displayName?: string;
/**
* Fully qualified name of the domain the Table belongs to.
*/
domain?: string;
/**
* Is the alert enabled.
*/
enabled?: boolean;
/**
* Input for the Filters.
*/
input?: AlertFilteringInput;
/**
* Name that uniquely identifies this Alert.
*/
name: string;
/**
* Owners of this Alert.
*/
owners?: EntityReference[];
/**
* Poll Interval in seconds.
*/
pollInterval?: number;
provider?: ProviderType;
/**
* Defines a list of resources that triggers the Event Subscription, Eg All, User, Teams etc.
*/
resources?: string[];
/**
* Number of times to retry callback on failure. (Default 3).
*/
retries?: number;
trigger?: Trigger;
}
/**
* Type of Alert
*
* Type of Alerts supported.
*/
export enum AlertType {
ActivityFeed = "ActivityFeed",
GovernanceWorkflowChangeEvent = "GovernanceWorkflowChangeEvent",
Notification = "Notification",
Observability = "Observability",
}
/**
* Subscription which has a type and the config.
*/
export interface Destination {
category: SubscriptionCategory;
config?: Webhook;
/**
* Is the subscription enabled.
*/
enabled?: boolean;
/**
* Unique identifier that identifies this Event Subscription.
*/
id?: string;
/**
* Read timeout in seconds. (Default 12s).
*/
readTimeout?: number;
statusDetails?: TionStatus;
/**
* Connection timeout in seconds. (Default 10s).
*/
timeout?: number;
type: SubscriptionType;
}
/**
* Subscription Endpoint Type.
*/
export enum SubscriptionCategory {
Admins = "Admins",
Assignees = "Assignees",
External = "External",
Followers = "Followers",
Mentions = "Mentions",
Owners = "Owners",
Teams = "Teams",
Users = "Users",
}
/**
* This schema defines webhook for receiving events from OpenMetadata.
*
* This schema defines email config for receiving events from OpenMetadata.
*
* A generic map that can be deserialized later.
*/
export interface Webhook {
/**
* Endpoint to receive the webhook events over POST requests.
*/
endpoint?: string;
/**
* Custom headers to be sent with the webhook request.
*/
headers?: { [key: string]: any };
/**
* HTTP operation to send the webhook request. Supports POST or PUT.
*/
httpMethod?: HTTPMethod;
/**
* List of receivers to send mail to
*/
receivers?: string[];
/**
* Secret set by the webhook client used for computing HMAC SHA256 signature of webhook
* payload and sent in `X-OM-Signature` header in POST requests to publish the events.
*/
secretKey?: string;
/**
* Send the Event to Admins
*
* Send the Mails to Admins
*/
sendToAdmins?: boolean;
/**
* Send the Event to Followers
*
* Send the Mails to Followers
*/
sendToFollowers?: boolean;
/**
* Send the Event to Owners
*
* Send the Mails to Owners
*/
sendToOwners?: boolean;
[property: string]: any;
}
/**
* HTTP operation to send the webhook request. Supports POST or PUT.
*/
export enum HTTPMethod {
Post = "POST",
Put = "PUT",
}
/**
* Current status of the subscription, including details on the last successful and failed
* attempts, and retry information.
*
* Detailed status of the destination during a test operation, including HTTP response
* information.
*/
export interface TionStatus {
/**
* Timestamp of the last failed callback in UNIX UTC epoch time in milliseconds.
*/
lastFailedAt?: number;
/**
* Detailed reason for the last failure received during callback.
*/
lastFailedReason?: string;
/**
* HTTP status code received during the last failed callback attempt.
*/
lastFailedStatusCode?: number;
/**
* Timestamp of the last successful callback in UNIX UTC epoch time in milliseconds.
*/
lastSuccessfulAt?: number;
/**
* Timestamp for the next retry attempt in UNIX epoch time in milliseconds. Only valid if
* `status` is `awaitingRetry`.
*/
nextAttempt?: number;
/**
* Status is `disabled` when the event subscription was created with `enabled` set to false
* and it never started publishing events. Status is `active` when the event subscription is
* functioning normally and a 200 OK response was received for the callback notification.
* Status is `failed` when a bad callback URL, connection failures, or `1xx` or `3xx`
* response was received for the callback notification. Status is `awaitingRetry` when the
* previous attempt at callback timed out or received a `4xx` or `5xx` response. Status is
* `retryLimitReached` after all retries fail.
*
* Overall test status, indicating if the test operation succeeded or failed.
*/
status?: Status;
/**
* Current timestamp of this status in UNIX epoch time in milliseconds.
*
* Timestamp when the response was received, in UNIX epoch time milliseconds.
*/
timestamp?: number;
/**
* Body of the HTTP response, if any, returned by the server.
*/
entity?: string;
/**
* HTTP headers returned in the response as a map of header names to values.
*/
headers?: any;
/**
* URL location if the response indicates a redirect or newly created resource.
*/
location?: string;
/**
* Media type of the response entity, if specified (e.g., application/json).
*/
mediaType?: string;
/**
* Detailed reason for failure if the test did not succeed.
*/
reason?: string;
/**
* HTTP status code of the response (e.g., 200 for OK, 404 for Not Found).
*/
statusCode?: number;
/**
* HTTP status reason phrase associated with the status code (e.g., 'Not Found').
*/
statusInfo?: string;
}
/**
* Status is `disabled` when the event subscription was created with `enabled` set to false
* and it never started publishing events. Status is `active` when the event subscription is
* functioning normally and a 200 OK response was received for the callback notification.
* Status is `failed` when a bad callback URL, connection failures, or `1xx` or `3xx`
* response was received for the callback notification. Status is `awaitingRetry` when the
* previous attempt at callback timed out or received a `4xx` or `5xx` response. Status is
* `retryLimitReached` after all retries fail.
*
* Overall test status, indicating if the test operation succeeded or failed.
*/
export enum Status {
Active = "active",
AwaitingRetry = "awaitingRetry",
Disabled = "disabled",
Failed = "failed",
RetryLimitReached = "retryLimitReached",
StatusFailed = "Failed",
Success = "Success",
}
/**
* Subscription Endpoint Type.
*/
export enum SubscriptionType {
ActivityFeed = "ActivityFeed",
Email = "Email",
GChat = "GChat",
GovernanceWorkflowChangeEvent = "GovernanceWorkflowChangeEvent",
MSTeams = "MsTeams",
Slack = "Slack",
Webhook = "Webhook",
}
/**
* Input for the Filters.
*
* Observability of the event subscription.
*/
export interface AlertFilteringInput {
/**
* List of filters for the event subscription.
*/
actions?: ArgumentsInput[];
/**
* List of filters for the event subscription.
*/
filters?: ArgumentsInput[];
}
/**
* Observability Filters for Event Subscription.
*/
export interface ArgumentsInput {
/**
* Arguments List
*/
arguments?: Argument[];
effect?: Effect;
/**
* Name of the filter
*/
name?: string;
/**
* Prefix Condition for the filter.
*/
prefixCondition?: PrefixCondition;
}
/**
* Argument for the filter.
*/
export interface Argument {
/**
* Value of the Argument
*/
input?: string[];
/**
* Name of the Argument
*/
name?: string;
}
export enum Effect {
Exclude = "exclude",
Include = "include",
}
/**
* Prefix Condition for the filter.
*
* Prefix Condition to be applied to the Condition.
*/
export enum PrefixCondition {
And = "AND",
Or = "OR",
}
/**
* Type of provider of an entity. Some entities are provided by the `system`. Some are
* entities created and provided by the `user`. Typically `system` provide entities can't be
* deleted and can only be disabled.
*/
export enum ProviderType {
System = "system",
User = "user",
}
/**
* Trigger Configuration for Alerts.
*/
export interface Trigger {
/**
* Cron Expression in case of Custom scheduled Trigger
*/
cronExpression?: string;
/**
* Schedule Info
*/
scheduleInfo?: ScheduleInfo;
triggerType: TriggerType;
}
/**
* Schedule Info
*/
export enum ScheduleInfo {
Custom = "Custom",
Daily = "Daily",
Monthly = "Monthly",
Weekly = "Weekly",
}
/**
* Trigger Configuration for Alerts.
*/
export enum TriggerType {
RealTime = "RealTime",
Scheduled = "Scheduled",
}
/**
* Permission used by Native Applications.
*

View File

@ -1,5 +1,5 @@
/*
* Copyright 2024 Collate.
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@ -10,9 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
/**
* This defines schema for sending alerts for OpenMetadata
*/
export interface CreateEventSubscription {
@ -24,6 +22,10 @@ export interface CreateEventSubscription {
* Maximum number of events sent in a batch (Default 10).
*/
batchSize?: number;
/**
* Consumer Class for the Event Subscription. Will use 'AlertPublisher' if not provided.
*/
className?: string;
/**
* A short description of the Alert, comprehensible to regular users.
*/
@ -128,6 +130,8 @@ export enum SubscriptionCategory {
* This schema defines webhook for receiving events from OpenMetadata.
*
* This schema defines email config for receiving events from OpenMetadata.
*
* A generic map that can be deserialized later.
*/
export interface Webhook {
/**
@ -169,6 +173,7 @@ export interface Webhook {
* Send the Mails to Owners
*/
sendToOwners?: boolean;
[property: string]: any;
}
/**

View File

@ -1,5 +1,5 @@
/*
* Copyright 2024 Collate.
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@ -10,9 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
/**
* This schema defines the EventSubscription entity. An Event Subscription has trigger,
* filters and Subscription
*/
@ -29,6 +27,10 @@ export interface EventSubscription {
* Change that led to this version of the Event Subscription.
*/
changeDescription?: ChangeDescription;
/**
* Java class for the Event Subscription.
*/
className?: string;
/**
* A short description of the Event Subscription, comprehensible to regular users.
*/
@ -204,6 +206,8 @@ export enum SubscriptionCategory {
* This schema defines webhook for receiving events from OpenMetadata.
*
* This schema defines email config for receiving events from OpenMetadata.
*
* A generic map that can be deserialized later.
*/
export interface Webhook {
/**
@ -245,6 +249,7 @@ export interface Webhook {
* Send the Mails to Owners
*/
sendToOwners?: boolean;
[property: string]: any;
}
/**