diff --git a/bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql index 7f90aff02ae..e5f8effea67 100644 --- a/bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql @@ -12,4 +12,27 @@ CREATE TABLE IF NOT EXISTS apps_data_store ( ALTER TABLE consumers_dlq ADD COLUMN source VARCHAR(255); -- Create an index on the source column in the consumers_dlq table -CREATE INDEX idx_consumers_dlq_source ON consumers_dlq (source); \ No newline at end of file +CREATE INDEX idx_consumers_dlq_source ON consumers_dlq (source); + +-- Rename 'offset' to 'currentOffset' and add 'startingOffset' +UPDATE change_event_consumers +SET json = JSON_SET( + JSON_REMOVE(json, '$.offset'), + '$.currentOffset', JSON_EXTRACT(json, '$.offset'), + '$.startingOffset', JSON_EXTRACT(json, '$.offset') +) +WHERE JSON_EXTRACT(json, '$.offset') IS NOT NULL + AND jsonSchema = 'eventSubscriptionOffset'; + +-- Create table successful_sent_change_events for storing successfully sent events per alert +CREATE TABLE IF NOT EXISTS successful_sent_change_events ( + id VARCHAR(36) NOT NULL, + change_event_id VARCHAR(36) NOT NULL, + event_subscription_id VARCHAR(36) NOT NULL, + json JSON NOT NULL, + timestamp BIGINT UNSIGNED NOT NULL, + PRIMARY KEY (id) +); + +-- Create an index on the event_subscription_id column in the successful_sent_change_events table +CREATE INDEX idx_event_subscription_id ON successful_sent_change_events (event_subscription_id); \ No newline at end of file diff --git a/bootstrap/sql/migrations/native/1.6.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.6.0/postgres/schemaChanges.sql index 7f90aff02ae..f2cecb79040 100644 --- a/bootstrap/sql/migrations/native/1.6.0/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.6.0/postgres/schemaChanges.sql @@ -12,4 +12,26 @@ CREATE TABLE IF NOT EXISTS apps_data_store ( ALTER TABLE consumers_dlq ADD COLUMN source VARCHAR(255); -- Create an index on the source column in the consumers_dlq table -CREATE INDEX idx_consumers_dlq_source ON consumers_dlq (source); \ No newline at end of file +CREATE INDEX idx_consumers_dlq_source ON consumers_dlq (source); + +-- Rename 'offset' to 'currentOffset' and add 'startingOffset' +UPDATE change_event_consumers +SET json = jsonb_set( + jsonb_set(json, '{currentOffset}', json -> 'offset'), + '{startingOffset}', json -> 'offset' +) +WHERE json -> 'offset' IS NOT NULL + AND jsonSchema = 'eventSubscriptionOffset'; + + +-- Create table successful_sent_change_events for storing successfully sent events per alert +CREATE TABLE IF NOT EXISTS successful_sent_change_events ( + id VARCHAR(36) PRIMARY KEY, + change_event_id VARCHAR(36) NOT NULL, + event_subscription_id VARCHAR(36) NOT NULL, + json jsonb NOT NULL, + timestamp BIGINT NOT NULL +); + +-- Create an index on the event_subscription_id column in the successful_sent_change_events table +CREATE INDEX idx_event_subscription_id ON successful_sent_change_events (event_subscription_id); \ No newline at end of file diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java index 9f5085b57aa..718ae326086 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java @@ -55,6 +55,8 @@ public abstract class AbstractEventConsumer public static final String FAILED_EVENT_EXTENSION = "eventSubscription.failedEvent"; private long offset = -1; + private long startingOffset = -1; + private AlertMetrics alertMetrics; @Getter @Setter private JobDetail jobDetail; @@ -68,7 +70,8 @@ public abstract class AbstractEventConsumer (EventSubscription) context.getJobDetail().getJobDataMap().get(ALERT_INFO_KEY); this.jobDetail = context.getJobDetail(); this.eventSubscription = sub; - this.offset = loadInitialOffset(context); + this.offset = loadInitialOffset(context).getCurrentOffset(); + this.startingOffset = loadInitialOffset(context).getStartingOffset(); this.alertMetrics = loadInitialMetrics(); this.destinationMap = loadDestinationsMap(context); this.doInit(context); @@ -110,18 +113,29 @@ public abstract class AbstractEventConsumer source.toString()); } - private long loadInitialOffset(JobExecutionContext context) { + private void recordSuccessfulChangeEvent(UUID eventSubscriptionId, ChangeEvent event) { + Entity.getCollectionDAO() + .eventSubscriptionDAO() + .insertSuccessfulChangeEvent( + UUID.randomUUID().toString(), + event.getId().toString(), + eventSubscriptionId.toString(), + JsonUtils.pojoToJson(event), + event.getTimestamp()); + } + + private EventSubscriptionOffset loadInitialOffset(JobExecutionContext context) { EventSubscriptionOffset jobStoredOffset = (EventSubscriptionOffset) jobDetail.getJobDataMap().get(ALERT_OFFSET_KEY); // If the Job Data Map has the latest offset, use it if (jobStoredOffset != null) { - return jobStoredOffset.getOffset(); + return jobStoredOffset; } else { EventSubscriptionOffset eventSubscriptionOffset = getStartingOffset(eventSubscription.getId()); // Update the Job Data Map with the latest offset context.getJobDetail().getJobDataMap().put(ALERT_OFFSET_KEY, eventSubscriptionOffset); - return eventSubscriptionOffset.getOffset(); + return eventSubscriptionOffset; } } @@ -171,6 +185,7 @@ public abstract class AbstractEventConsumer for (UUID receiverId : eventWithReceivers.getValue()) { try { sendAlert(receiverId, eventWithReceivers.getKey()); + recordSuccessfulChangeEvent(eventSubscription.getId(), eventWithReceivers.getKey()); alertMetrics.withSuccessEvents(alertMetrics.getSuccessEvents() + 1); } catch (EventPublisherException e) { alertMetrics.withFailedEvents(alertMetrics.getFailedEvents() + 1); @@ -185,7 +200,10 @@ public abstract class AbstractEventConsumer long currentTime = System.currentTimeMillis(); // Upsert Offset EventSubscriptionOffset eventSubscriptionOffset = - new EventSubscriptionOffset().withOffset(offset).withTimestamp(currentTime); + new EventSubscriptionOffset() + .withCurrentOffset(offset) + .withStartingOffset(startingOffset) + .withTimestamp(currentTime); Entity.getCollectionDAO() .eventSubscriptionDAO() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionCleanupJob.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionCleanupJob.java new file mode 100644 index 00000000000..1bba401b12f --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionCleanupJob.java @@ -0,0 +1,41 @@ +package org.openmetadata.service.events.scheduled; + +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.sqlobject.transaction.Transaction; +import org.openmetadata.service.Entity; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +@Slf4j +public class EventSubscriptionCleanupJob implements Job { + private static final int TARGET_COUNT = 50; + private static final int THRESHOLD = 100; + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + performCleanup(); + } + + @Transaction + public static void performCleanup() { + List subscriptionsToClean = + Entity.getCollectionDAO().eventSubscriptionDAO().findSubscriptionsAboveThreshold(THRESHOLD); + + for (String subscriptionId : subscriptionsToClean) { + int recordCount = + Entity.getCollectionDAO().eventSubscriptionDAO().getRecordCount(subscriptionId); + + int excessRecords = recordCount - TARGET_COUNT; + if (excessRecords > 0) { + Entity.getCollectionDAO() + .eventSubscriptionDAO() + .deleteOldRecords(subscriptionId, excessRecords); + } + } + LOG.debug( + "Performed cleanup for subscriptions, retaining {} records per subscription.", + TARGET_COUNT); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java index 68e3cbde3ec..3d3d435f6b8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java @@ -19,6 +19,7 @@ import static org.openmetadata.service.events.subscription.AlertUtil.getStarting import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; @@ -36,7 +37,10 @@ import org.openmetadata.schema.entity.events.SubscriptionStatus; import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.service.Entity; import org.openmetadata.service.apps.bundles.changeEvent.AlertPublisher; +import org.openmetadata.service.events.subscription.AlertUtil; import org.openmetadata.service.jdbi3.EntityRepository; +import org.openmetadata.service.jdbi3.EventSubscriptionRepository; +import org.openmetadata.service.resources.events.subscription.TypedEvent; import org.openmetadata.service.util.JsonUtils; import org.quartz.JobBuilder; import org.quartz.JobDataMap; @@ -56,6 +60,8 @@ public class EventSubscriptionScheduler { public static final String ALERT_TRIGGER_GROUP = "OMAlertJobGroup"; private static EventSubscriptionScheduler instance; private static volatile boolean initialized = false; + public static volatile boolean cleanupJobInitialised = false; + private final Scheduler alertsScheduler = new StdSchedulerFactory().getScheduler(); private EventSubscriptionScheduler() throws SchedulerException { @@ -111,6 +117,7 @@ public class EventSubscriptionScheduler { // Schedule the Job alertsScheduler.scheduleJob(jobDetail, trigger); + instance.scheduleCleanupJob(); LOG.info( "Event Subscription started as {} : status {} for all Destinations", @@ -163,6 +170,44 @@ public class EventSubscriptionScheduler { LOG.info("Alert publisher deleted for {}", deletedEntity.getName()); } + public void scheduleCleanupJob() { + if (!cleanupJobInitialised) { + try { + JobDetail cleanupJob = + JobBuilder.newJob(EventSubscriptionCleanupJob.class) + .withIdentity("CleanupJob", ALERT_JOB_GROUP) + .build(); + + Trigger cleanupTrigger = + TriggerBuilder.newTrigger() + .withIdentity("CleanupTrigger", ALERT_TRIGGER_GROUP) + .withSchedule( + SimpleScheduleBuilder.simpleSchedule() + .withIntervalInSeconds(10) + .repeatForever()) + .startNow() + .build(); + + alertsScheduler.scheduleJob(cleanupJob, cleanupTrigger); + cleanupJobInitialised = true; + LOG.info("Scheduled periodic cleanup job to run every 10 seconds."); + } catch (SchedulerException e) { + LOG.error("Failed to schedule cleanup job", e); + } + } + } + + @Transaction + public void deleteSuccessfulAndFailedEventsRecordByAlert(UUID id) { + Entity.getCollectionDAO() + .eventSubscriptionDAO() + .deleteSuccessfulChangeEventBySubscriptionId(id.toString()); + + Entity.getCollectionDAO() + .eventSubscriptionDAO() + .deleteFailedRecordsBySubscriptionId(id.toString()); + } + public SubscriptionStatus getStatusForEventSubscription(UUID subscriptionId, UUID destinationId) { Optional eventSubscriptionOpt = getEventSubscriptionFromScheduledJob(subscriptionId); @@ -218,38 +263,42 @@ public class EventSubscriptionScheduler { } public EventSubscriptionDiagnosticInfo getEventSubscriptionDiagnosticInfo( - UUID subscriptionId, int limit) { - boolean isAllEventsPublished = checkIfPublisherPublishedAllEvents(subscriptionId); - EventSubscriptionOffset latestOffset = getLatestOffset(); + UUID subscriptionId, int limit, int paginationOffset) { + boolean hasProcessedAllEvents = checkIfPublisherPublishedAllEvents(subscriptionId); + + Optional eventSubscriptionOffsetOptional = + getEventSubscriptionOffset(subscriptionId); long currentOffset = - getEventSubscriptionOffset(subscriptionId) - .map(EventSubscriptionOffset::getOffset) - .orElse(0L); + eventSubscriptionOffsetOptional.map(EventSubscriptionOffset::getCurrentOffset).orElse(0L); - long unpublishedEventCount = getUnpublishedEventCount(subscriptionId); + long startingOffset = + eventSubscriptionOffsetOptional.map(EventSubscriptionOffset::getStartingOffset).orElse(0L); + + long totalUnprocessedEventCount = getUnpublishedEventCount(subscriptionId); List unprocessedEvents = - Optional.ofNullable(getUnpublishedEvents(subscriptionId, limit)) + Optional.ofNullable(getRelevantUnprocessedEvents(subscriptionId, limit, paginationOffset)) .orElse(Collections.emptyList()); - return new EventSubscriptionDiagnosticInfo() - .withLatestOffset(latestOffset.getOffset()) - .withCurrentOffset(currentOffset) - .withHasProcessedAllEvents(isAllEventsPublished) - .withUnprocessedEventsCount(unpublishedEventCount) - .withUnprocessedEventsList(unprocessedEvents); - } + List allUnprocessedEvents = + getAllUnprocessedEvents(subscriptionId, limit, paginationOffset); - public static EventSubscriptionOffset getLatestOffset() { - return new EventSubscriptionOffset() - .withOffset(Entity.getCollectionDAO().changeEventDAO().getLatestOffset()); + return new EventSubscriptionDiagnosticInfo() + .withLatestOffset(Entity.getCollectionDAO().changeEventDAO().getLatestOffset()) + .withCurrentOffset(currentOffset) + .withStartingOffset(startingOffset) + .withHasProcessedAllEvents(hasProcessedAllEvents) + .withTotalUnprocessedEventsCount(totalUnprocessedEventCount) + .withTotalUnprocessedEventsList(allUnprocessedEvents) + .withRelevantUnprocessedEventsCount((long) unprocessedEvents.size()) + .withRelevantUnprocessedEventsList(unprocessedEvents); } public boolean checkIfPublisherPublishedAllEvents(UUID subscriptionID) { long countOfEvents = Entity.getCollectionDAO().changeEventDAO().getLatestOffset(); return getEventSubscriptionOffset(subscriptionID) - .map(offset -> offset.getOffset() == countOfEvents) + .map(offset -> offset.getCurrentOffset() == countOfEvents) .orElse(false); } @@ -257,67 +306,137 @@ public class EventSubscriptionScheduler { long countOfEvents = Entity.getCollectionDAO().changeEventDAO().getLatestOffset(); return getEventSubscriptionOffset(subscriptionID) - .map(offset -> Math.abs(countOfEvents - offset.getOffset())) + .map(offset -> Math.abs(countOfEvents - offset.getCurrentOffset())) .orElse(countOfEvents); } - public List getUnpublishedEvents(UUID subscriptionId, int limit) { + public List getRelevantUnprocessedEvents( + UUID subscriptionId, int limit, int paginationOffset) { long offset = getEventSubscriptionOffset(subscriptionId) - .map(EventSubscriptionOffset::getOffset) + .map(EventSubscriptionOffset::getCurrentOffset) .orElse(Entity.getCollectionDAO().changeEventDAO().getLatestOffset()); - List unprocessedEventJsonList = - Entity.getCollectionDAO().changeEventDAO().listUnprocessedEvents(offset, limit); + return Entity.getCollectionDAO() + .changeEventDAO() + .listUnprocessedEvents(offset, limit, paginationOffset) + .parallelStream() + .map( + eventJson -> { + ChangeEvent event = JsonUtils.readValue(eventJson, ChangeEvent.class); + return AlertUtil.checkIfChangeEventIsAllowed( + event, getEventSubscription(subscriptionId).getFilteringRules()) + ? event + : null; + }) + .filter(Objects::nonNull) // Remove null entries (events that did not pass filtering) + .toList(); + } - return unprocessedEventJsonList.stream() + public List getAllUnprocessedEvents( + UUID subscriptionId, int limit, int paginationOffset) { + long offset = + getEventSubscriptionOffset(subscriptionId) + .map(EventSubscriptionOffset::getCurrentOffset) + .orElse(Entity.getCollectionDAO().changeEventDAO().getLatestOffset()); + + return Entity.getCollectionDAO() + .changeEventDAO() + .listUnprocessedEvents(offset, limit, paginationOffset) + .parallelStream() .map(eventJson -> JsonUtils.readValue(eventJson, ChangeEvent.class)) .collect(Collectors.toList()); } public List getFailedEventsByIdAndSource( - UUID subscriptionId, String source, int limit) { + UUID subscriptionId, String source, int limit, int paginationOffset) { if (CommonUtil.nullOrEmpty(source)) { return Entity.getCollectionDAO() .changeEventDAO() - .listFailedEventsById(subscriptionId.toString(), limit); + .listFailedEventsById(subscriptionId.toString(), limit, paginationOffset); } else { return Entity.getCollectionDAO() .changeEventDAO() - .listFailedEventsByIdAndSource(subscriptionId.toString(), source, limit); + .listFailedEventsByIdAndSource( + subscriptionId.toString(), source, limit, paginationOffset); } } - public List getFailedEventsById(UUID subscriptionId, int limit) { + public List listEventsForSubscription(UUID subscriptionId, int limit, long offset) { + Optional eventSubscriptionOffset = + getEventSubscriptionOffset(subscriptionId); + if (eventSubscriptionOffset.isEmpty()) { + return Collections.emptyList(); + } + + List typedEvents = + Entity.getCollectionDAO() + .changeEventDAO() + .listAllEventsWithStatuses( + subscriptionId.toString(), + eventSubscriptionOffset.get().getCurrentOffset(), + limit, + offset); + + EventSubscription eventSubscription = getEventSubscription(subscriptionId); + + return typedEvents.parallelStream() + .filter( + event -> { + if (TypedEvent.Status.UNPROCESSED.equals(event.getStatus()) + && !CommonUtil.nullOrEmpty(event.getData())) { + for (Object eventData : event.getData()) { + if (eventData instanceof ChangeEvent changeEvent) { + return AlertUtil.checkIfChangeEventIsAllowed( + changeEvent, eventSubscription.getFilteringRules()); + } + } + } + return true; // Keep FAILED and SUCCESSFUL records + }) + .toList(); + } + + private EventSubscription getEventSubscription(UUID eventSubscriptionId) { + EventSubscriptionRepository repository = + (EventSubscriptionRepository) Entity.getEntityRepository(Entity.EVENT_SUBSCRIPTION); + return repository.get(null, eventSubscriptionId, repository.getFields("*")); + } + + public List getFailedEventsById(UUID subscriptionId, int limit, int offset) { return Entity.getCollectionDAO() .changeEventDAO() - .listFailedEventsById(subscriptionId.toString(), limit); + .listFailedEventsById(subscriptionId.toString(), limit, offset); } - public List getAllFailedEvents(String source, int limit) { + public List getAllFailedEvents( + String source, int limit, int paginationOffset) { if (CommonUtil.nullOrEmpty(source)) { - return Entity.getCollectionDAO().changeEventDAO().listAllFailedEvents(limit); + return Entity.getCollectionDAO() + .changeEventDAO() + .listAllFailedEvents(limit, paginationOffset); } else { - return Entity.getCollectionDAO().changeEventDAO().listAllFailedEventsBySource(source, limit); + return Entity.getCollectionDAO() + .changeEventDAO() + .listAllFailedEventsBySource(source, limit, paginationOffset); } } - public List getSuccessfullySentChangeEventsForAlert(UUID id, int limit) { + public List getSuccessfullySentChangeEventsForAlert( + UUID id, int limit, int paginationOffset) { Optional eventSubscriptionOffset = getEventSubscriptionOffset(id); + if (eventSubscriptionOffset.isEmpty()) { + return Collections.emptyList(); + } - return eventSubscriptionOffset - .map( - offset -> { - List jsonEvents = - Entity.getCollectionDAO() - .changeEventDAO() - .listChangeEventsBeforeOffset(limit, offset.getOffset()); + List successfullySentChangeEvents = + Entity.getCollectionDAO() + .eventSubscriptionDAO() + .getSuccessfulChangeEventBySubscriptionId(id.toString(), limit, paginationOffset); - return jsonEvents.stream() - .map(json -> JsonUtils.readValue(json, ChangeEvent.class)) - .collect(Collectors.toList()); - }) - .orElse(Collections.emptyList()); + return successfullySentChangeEvents.stream() + .map(e -> JsonUtils.readValue(e, ChangeEvent.class)) + .collect(Collectors.toList()); } public Optional getEventSubscriptionFromScheduledJob(UUID id) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertUtil.java index 625aa8ae0be..5371cd3bad3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertUtil.java @@ -185,7 +185,7 @@ public final class AlertUtil { .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - private static boolean checkIfChangeEventIsAllowed( + public static boolean checkIfChangeEventIsAllowed( ChangeEvent event, FilteringRules filteringRules) { boolean triggerChangeEvent = AlertUtil.shouldTriggerAlert(event, filteringRules); @@ -203,7 +203,8 @@ public final class AlertUtil { } public static EventSubscriptionOffset getStartingOffset(UUID eventSubscriptionId) { - long eventSubscriptionOffset; + long startingOffset; + long currentOffset; String json = Entity.getCollectionDAO() .eventSubscriptionDAO() @@ -211,11 +212,15 @@ public final class AlertUtil { if (json != null) { EventSubscriptionOffset offsetFromDb = JsonUtils.readValue(json, EventSubscriptionOffset.class); - eventSubscriptionOffset = offsetFromDb.getOffset(); + startingOffset = offsetFromDb.getStartingOffset(); + currentOffset = offsetFromDb.getCurrentOffset(); } else { - eventSubscriptionOffset = Entity.getCollectionDAO().changeEventDAO().getLatestOffset(); + currentOffset = Entity.getCollectionDAO().changeEventDAO().getLatestOffset(); + startingOffset = currentOffset; } - return new EventSubscriptionOffset().withOffset(eventSubscriptionOffset); + return new EventSubscriptionOffset() + .withCurrentOffset(currentOffset) + .withStartingOffset(startingOffset); } public static FilteringRules validateAndBuildFilteringConditions( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 2d1adf7cb9b..c6775441015 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -124,6 +124,7 @@ import org.openmetadata.schema.settings.SettingsType; import org.openmetadata.schema.tests.TestCase; import org.openmetadata.schema.tests.TestDefinition; import org.openmetadata.schema.tests.TestSuite; +import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.EventType; import org.openmetadata.schema.type.Include; @@ -140,6 +141,7 @@ import org.openmetadata.service.jdbi3.CollectionDAO.UsageDAO.UsageDetailsMapper; import org.openmetadata.service.jdbi3.FeedRepository.FilterType; import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlQuery; import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlUpdate; +import org.openmetadata.service.resources.events.subscription.TypedEvent; import org.openmetadata.service.resources.feeds.MessageParser.EntityLink; import org.openmetadata.service.resources.tags.TagLabelUtil; import org.openmetadata.service.util.EntityUtil; @@ -2108,6 +2110,52 @@ public interface CollectionDAO { @Bind("extension") String extension, @Bind("json") String json, @Bind("source") String source); + + @ConnectionAwareSqlUpdate( + value = + "INSERT INTO successful_sent_change_events (id, change_event_id, event_subscription_id, json, timestamp) VALUES (:id, :change_event_id, :event_subscription_id, :json, :timestamp)", + connectionType = MYSQL) + @ConnectionAwareSqlUpdate( + value = + "INSERT INTO successful_sent_change_events (id, change_event_id, event_subscription_id, json, timestamp) VALUES (:id, :change_event_id, :event_subscription_id, CAST(:json AS jsonb), :timestamp)", + connectionType = POSTGRES) + void insertSuccessfulChangeEvent( + @Bind("id") String id, + @Bind("change_event_id") String changeEventId, + @Bind("event_subscription_id") String eventSubscriptionId, + @Bind("json") String json, + @Bind("timestamp") long timestamp); + + @SqlQuery( + "SELECT COUNT(*) FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId") + int getRecordCount(@Bind("eventSubscriptionId") String eventSubscriptionId); + + @SqlQuery( + "SELECT event_subscription_id FROM successful_sent_change_events " + + "GROUP BY event_subscription_id " + + "HAVING COUNT(*) >= :threshold") + List findSubscriptionsAboveThreshold(@Bind("threshold") int threshold); + + @SqlUpdate( + "DELETE FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId ORDER BY timestamp ASC LIMIT :limit") + void deleteOldRecords( + @Bind("eventSubscriptionId") String eventSubscriptionId, @Bind("limit") int limit); + + @SqlQuery( + "SELECT json FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId LIMIT :limit OFFSET :paginationOffset") + List getSuccessfulChangeEventBySubscriptionId( + @Bind("eventSubscriptionId") String eventSubscriptionId, + @Bind("limit") int limit, + @Bind("paginationOffset") long paginationOffset); + + @SqlUpdate( + "DELETE FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId") + void deleteSuccessfulChangeEventBySubscriptionId( + @Bind("eventSubscriptionId") String eventSubscriptionId); + + @SqlUpdate("DELETE FROM consumers_dlq WHERE id = :eventSubscriptionId") + void deleteFailedRecordsBySubscriptionId( + @Bind("eventSubscriptionId") String eventSubscriptionId); } interface ChartDAO extends EntityDAO { @@ -3933,34 +3981,80 @@ public interface CollectionDAO { interface ChangeEventDAO { @SqlQuery( - "SELECT json FROM change_event ce where ce.offset > :offset ORDER BY ce.eventTime ASC LIMIT :limit") - List listUnprocessedEvents(@Bind("offset") long offset, @Bind("limit") long limit); + "SELECT json FROM change_event ce where ce.offset > :offset ORDER BY ce.eventTime ASC LIMIT :limit OFFSET :paginationOffset") + List listUnprocessedEvents( + @Bind("offset") long offset, + @Bind("limit") int limit, + @Bind("paginationOffset") int paginationOffset); @SqlQuery( - "SELECT json, source FROM consumers_dlq WHERE id = :id ORDER BY timestamp ASC LIMIT :limit") + "SELECT json, source FROM consumers_dlq WHERE id = :id ORDER BY timestamp ASC LIMIT :limit OFFSET :paginationOffset") @RegisterRowMapper(FailedEventResponseMapper.class) List listFailedEventsById( - @Bind("id") String id, @Bind("limit") long limit); + @Bind("id") String id, + @Bind("limit") int limit, + @Bind("paginationOffset") int paginationOffset); @SqlQuery( - "SELECT json, source FROM consumers_dlq WHERE id = :id AND source = :source ORDER BY timestamp ASC LIMIT :limit") + "SELECT json, source FROM consumers_dlq WHERE id = :id AND source = :source ORDER BY timestamp ASC LIMIT :limit OFFSET :paginationOffset") @RegisterRowMapper(FailedEventResponseMapper.class) List listFailedEventsByIdAndSource( - @Bind("id") String id, @Bind("source") String source, @Bind("limit") long limit); + @Bind("id") String id, + @Bind("source") String source, + @Bind("limit") int limit, + @Bind("paginationOffset") int paginationOffset); - @SqlQuery("SELECT json, source FROM consumers_dlq LIMIT :limit") + @SqlQuery("SELECT json, source FROM consumers_dlq LIMIT :limit OFFSET :paginationOffset") @RegisterRowMapper(FailedEventResponseMapper.class) - List listAllFailedEvents(@Bind("limit") long limit); - - @SqlQuery("SELECT json, source FROM consumers_dlq WHERE source = :source LIMIT :limit") - @RegisterRowMapper(FailedEventResponseMapper.class) - List listAllFailedEventsBySource( - @Bind("source") String source, @Bind("limit") long limit); + List listAllFailedEvents( + @Bind("limit") int limit, @Bind("paginationOffset") int paginationOffset); @SqlQuery( - "SELECT json FROM change_event ce where ce.offset < :offset ORDER BY ce.eventTime ASC LIMIT :limit") - List listChangeEventsBeforeOffset( - @Bind("limit") long limit, @Bind("offset") long offset); + "SELECT json, source FROM consumers_dlq WHERE source = :source LIMIT :limit OFFSET :paginationOffset") + @RegisterRowMapper(FailedEventResponseMapper.class) + List listAllFailedEventsBySource( + @Bind("source") String source, + @Bind("limit") int limit, + @Bind("paginationOffset") int paginationOffset); + + @ConnectionAwareSqlQuery( + value = + "SELECT json, status, timestamp " + + "FROM ( " + + " SELECT json, 'FAILED' AS status, timestamp " + + " FROM consumers_dlq WHERE id = :id " + + " UNION ALL " + + " SELECT json, 'SUCCESSFUL' AS status, timestamp " + + " FROM successful_sent_change_events WHERE event_subscription_id = :id " + + " UNION ALL " + + " SELECT ce.json, 'UNPROCESSED' AS status, ce.eventTime AS timestamp " + + " FROM change_event ce WHERE ce.offset > :currentOffset " + + ") AS combined_events " + + "ORDER BY timestamp ASC " + + "LIMIT :limit OFFSET :paginationOffset", + connectionType = MYSQL) + @ConnectionAwareSqlQuery( + value = + "SELECT json, status, timestamp " + + "FROM ( " + + " SELECT json, 'failed' AS status, timestamp " + + " FROM consumers_dlq WHERE id = :id " + + " UNION ALL " + + " SELECT json, 'successful' AS status, timestamp " + + " FROM successful_sent_change_events WHERE event_subscription_id = :id " + + " UNION ALL " + + " SELECT ce.json, 'unprocessed' AS status, ce.eventTime AS timestamp " + + " FROM change_event ce WHERE ce.offset > :currentOffset " + + ") AS combined_events " + + "ORDER BY timestamp ASC " + + "LIMIT :limit OFFSET :paginationOffset", + connectionType = POSTGRES) + @RegisterRowMapper(EventResponseMapper.class) + List listAllEventsWithStatuses( + @Bind("id") String id, + @Bind("currentOffset") long currentOffset, + @Bind("limit") int limit, + @Bind("paginationOffset") long paginationOffset); @SqlQuery( "SELECT CASE WHEN EXISTS (SELECT 1 FROM event_subscription_entity WHERE id = :id) THEN 1 ELSE 0 END AS record_exists") @@ -4028,6 +4122,28 @@ public interface CollectionDAO { } } + class EventResponseMapper implements RowMapper { + @Override + public TypedEvent map(ResultSet rs, StatementContext ctx) throws SQLException { + TypedEvent response = new TypedEvent(); + String status = rs.getString("status").toLowerCase(); + + if (TypedEvent.Status.FAILED.value().equalsIgnoreCase(status)) { + FailedEvent failedEvent = JsonUtils.readValue(rs.getString("json"), FailedEvent.class); + response.setData(List.of(failedEvent)); + response.setStatus(TypedEvent.Status.FAILED); + } else { + ChangeEvent changeEvent = JsonUtils.readValue(rs.getString("json"), ChangeEvent.class); + response.setData(List.of(changeEvent)); + response.setStatus(TypedEvent.Status.fromValue(status)); + } + + long timestampMillis = rs.getLong("timestamp"); + response.setTimestamp((double) timestampMillis); + return response; + } + } + interface TypeEntityDAO extends EntityDAO { @Override default String getTableName() { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/events/subscription/EventSubscriptionResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/events/subscription/EventSubscriptionResource.java index 48760fa9d98..dbcb615bbb5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/events/subscription/EventSubscriptionResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/events/subscription/EventSubscriptionResource.java @@ -31,7 +31,6 @@ import io.swagger.v3.oas.annotations.tags.Tag; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -417,33 +416,6 @@ public class EventSubscriptionResource return super.listVersionsInternal(securityContext, id); } - @GET - @Path("/{id}/processedEvents") - @Operation( - operationId = "checkIfThePublisherProcessedALlEvents", - summary = "Check If the Publisher Processed All Events", - description = - "Return a boolean 'true' or 'false' to indicate if the publisher processed all events", - responses = { - @ApiResponse( - responseCode = "200", - description = "List of Event Subscription versions", - content = - @Content( - mediaType = "application/json", - schema = @Schema(implementation = EntityHistory.class))) - }) - public Response checkIfThePublisherProcessedALlEvents( - @Context UriInfo uriInfo, - @Context SecurityContext securityContext, - @Parameter(description = "Id of the Event Subscription", schema = @Schema(type = "UUID")) - @PathParam("id") - UUID id) { - return Response.ok() - .entity(EventSubscriptionScheduler.getInstance().checkIfPublisherPublishedAllEvents(id)) - .build(); - } - @GET @Path("/{id}/versions/{version}") @Operation( @@ -502,6 +474,7 @@ public class EventSubscriptionResource throws SchedulerException { EventSubscription eventSubscription = repository.get(null, id, repository.getFields("id")); EventSubscriptionScheduler.getInstance().deleteEventSubscriptionPublisher(eventSubscription); + EventSubscriptionScheduler.getInstance().deleteSuccessfulAndFailedEventsRecordByAlert(id); return delete(uriInfo, securityContext, id, true, true); } @@ -525,6 +498,8 @@ public class EventSubscriptionResource EventSubscription eventSubscription = repository.getByName(null, name, repository.getFields("id")); EventSubscriptionScheduler.getInstance().deleteEventSubscriptionPublisher(eventSubscription); + EventSubscriptionScheduler.getInstance() + .deleteSuccessfulAndFailedEventsRecordByAlert(eventSubscription.getId()); return deleteByName(uriInfo, securityContext, name, true, true); } @@ -631,6 +606,33 @@ public class EventSubscriptionResource AlertUtil.validateExpression(expression, Boolean.class); } + @GET + @Path("/{id}/processedEvents") + @Operation( + operationId = "checkIfThePublisherProcessedALlEvents", + summary = "Check If the Publisher Processed All Events", + description = + "Return a boolean 'true' or 'false' to indicate if the publisher processed all events", + responses = { + @ApiResponse( + responseCode = "200", + description = "List of Event Subscription versions", + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = EntityHistory.class))) + }) + public Response checkIfThePublisherProcessedALlEvents( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Id of the Event Subscription", schema = @Schema(type = "UUID")) + @PathParam("id") + UUID id) { + return Response.ok() + .entity(EventSubscriptionScheduler.getInstance().checkIfPublisherPublishedAllEvents(id)) + .build(); + } + @GET @Path("id/{id}/listEvents") @Operation( @@ -662,10 +664,15 @@ public class EventSubscriptionResource description = "Maximum number of events to retrieve", schema = @Schema(type = "integer")) @QueryParam("limit") - @DefaultValue("100") + @DefaultValue("15") @Min(0) - int limit) { - + int limit, + @Parameter( + description = "Offset for pagination (starting point for records)", + schema = @Schema(type = "integer")) + @QueryParam("paginationOffset") + @DefaultValue("0") + int paginationOffset) { authorizer.authorizeAdmin(securityContext); try { @@ -683,13 +690,9 @@ public class EventSubscriptionResource } if (status == null) { - combinedEvents.addAll(fetchEvents(TypedEvent.Status.FAILED, id, limit)); - combinedEvents.addAll(fetchEvents(TypedEvent.Status.SUCCESSFUL, id, limit)); - combinedEvents.addAll(fetchEvents(TypedEvent.Status.UNPROCESSED, id, limit)); - // Sort combined events by timestamp in ascending order. - combinedEvents.sort(Comparator.comparing(TypedEvent::getTimestamp)); + combinedEvents.addAll(fetchEvents(id, limit, paginationOffset)); } else { - combinedEvents.addAll(fetchEvents(status, id, limit)); + combinedEvents.addAll(fetchEvents(id, limit, paginationOffset, status)); } return Response.ok().entity(combinedEvents).build(); @@ -706,16 +709,18 @@ public class EventSubscriptionResource } } - private List fetchEvents(TypedEvent.Status status, UUID id, int limit) { + private List fetchEvents( + UUID id, int limit, int paginationOffset, TypedEvent.Status status) { List events; switch (status) { case FAILED -> events = - EventSubscriptionScheduler.getInstance().getFailedEventsById(id, limit); + EventSubscriptionScheduler.getInstance().getFailedEventsById(id, limit, paginationOffset); case SUCCESSFUL -> events = EventSubscriptionScheduler.getInstance() - .getSuccessfullySentChangeEventsForAlert(id, limit); + .getSuccessfullySentChangeEventsForAlert(id, limit, paginationOffset); case UNPROCESSED -> events = - EventSubscriptionScheduler.getInstance().getUnpublishedEvents(id, limit); + EventSubscriptionScheduler.getInstance() + .getRelevantUnprocessedEvents(id, limit, paginationOffset); default -> throw new IllegalArgumentException("Unknown event status: " + status); } @@ -729,6 +734,11 @@ public class EventSubscriptionResource .toList(); } + private List fetchEvents(UUID id, int limit, int paginationOffset) { + return EventSubscriptionScheduler.getInstance() + .listEventsForSubscription(id, limit, paginationOffset); + } + private Long extractTimestamp(Object event) { if (event instanceof ChangeEvent changeEvent) { return changeEvent.getTimestamp(); @@ -760,10 +770,16 @@ public class EventSubscriptionResource @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Parameter(description = "Maximum number of unprocessed events returned") - @DefaultValue("100") + @DefaultValue("15") @Min(0) @QueryParam("limit") int limit, + @Parameter( + description = "Offset for pagination (starting point for records)", + schema = @Schema(type = "integer")) + @QueryParam("paginationOffset") + @DefaultValue("0") + int paginationOffset, @Parameter(description = "UUID of the Event Subscription", schema = @Schema(type = "UUID")) @PathParam("subscriptionId") UUID subscriptionId) { @@ -777,7 +793,7 @@ public class EventSubscriptionResource EventSubscriptionDiagnosticInfo diagnosticInfo = EventSubscriptionScheduler.getInstance() - .getEventSubscriptionDiagnosticInfo(subscriptionId, limit); + .getEventSubscriptionDiagnosticInfo(subscriptionId, limit, paginationOffset); return Response.ok().entity(diagnosticInfo).build(); } catch (Exception e) { @@ -812,10 +828,16 @@ public class EventSubscriptionResource @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Parameter(description = "Maximum number of unprocessed events returned") - @DefaultValue("100") + @DefaultValue("15") @Min(0) @QueryParam("limit") int limit, + @Parameter( + description = "Offset for pagination (starting point for records)", + schema = @Schema(type = "integer")) + @QueryParam("paginationOffset") + @DefaultValue("0") + int paginationOffset, @Parameter(description = "Name of the Event Subscription", schema = @Schema(type = "string")) @PathParam("subscriptionName") String subscriptionName) { @@ -832,7 +854,7 @@ public class EventSubscriptionResource EventSubscriptionDiagnosticInfo diagnosticInfo = EventSubscriptionScheduler.getInstance() - .getEventSubscriptionDiagnosticInfo(subscription.getId(), limit); + .getEventSubscriptionDiagnosticInfo(subscription.getId(), limit, paginationOffset); return Response.ok().entity(diagnosticInfo).build(); } catch (Exception e) { @@ -872,9 +894,15 @@ public class EventSubscriptionResource description = "Maximum number of failed events to retrieve", schema = @Schema(type = "integer")) @QueryParam("limit") - @DefaultValue("100") + @DefaultValue("15") @Min(0) int limit, + @Parameter( + description = "Offset for pagination (starting point for records)", + schema = @Schema(type = "integer")) + @QueryParam("paginationOffset") + @DefaultValue("0") + int paginationOffset, @Parameter(description = "Source of the failed events", schema = @Schema(type = "string")) @QueryParam("source") String source) { @@ -882,7 +910,8 @@ public class EventSubscriptionResource try { List failedEvents = - EventSubscriptionScheduler.getInstance().getFailedEventsByIdAndSource(id, source, limit); + EventSubscriptionScheduler.getInstance() + .getFailedEventsByIdAndSource(id, source, limit, paginationOffset); return Response.ok().entity(failedEvents).build(); } catch (Exception e) { @@ -920,9 +949,15 @@ public class EventSubscriptionResource description = "Maximum number of failed events to retrieve", schema = @Schema(type = "integer")) @QueryParam("limit") - @DefaultValue("100") + @DefaultValue("15") @Min(0) int limit, + @Parameter( + description = "Offset for pagination (starting point for records)", + schema = @Schema(type = "integer")) + @QueryParam("paginationOffset") + @DefaultValue("0") + int paginationOffset, @Parameter(description = "Source of the failed events", schema = @Schema(type = "string")) @QueryParam("source") String source) { @@ -933,7 +968,7 @@ public class EventSubscriptionResource List failedEvents = EventSubscriptionScheduler.getInstance() - .getFailedEventsByIdAndSource(subscription.getId(), source, limit); + .getFailedEventsByIdAndSource(subscription.getId(), source, limit, paginationOffset); return Response.ok().entity(failedEvents).build(); @@ -973,9 +1008,15 @@ public class EventSubscriptionResource description = "Maximum number of failed events to retrieve", schema = @Schema(type = "integer")) @QueryParam("limit") - @DefaultValue("100") + @DefaultValue("15") @Min(0) int limit, + @Parameter( + description = "Offset for pagination (starting point for records)", + schema = @Schema(type = "integer")) + @QueryParam("paginationOffset") + @DefaultValue("0") + int paginationOffset, @Parameter(description = "Source of the failed events", schema = @Schema(type = "string")) @QueryParam("source") String source) { @@ -983,7 +1024,8 @@ public class EventSubscriptionResource try { List failedEvents = - EventSubscriptionScheduler.getInstance().getAllFailedEvents(source, limit); + EventSubscriptionScheduler.getInstance() + .getAllFailedEvents(source, limit, paginationOffset); return Response.ok().entity(failedEvents).build(); } catch (Exception e) { @@ -1024,16 +1066,21 @@ public class EventSubscriptionResource description = "Maximum number of change events to retrieve", schema = @Schema(type = "integer")) @QueryParam("limit") - @DefaultValue("100") + @DefaultValue("15") @Min(0) - int limit) { - + int limit, + @Parameter( + description = "Offset for pagination (starting point for records)", + schema = @Schema(type = "integer")) + @QueryParam("paginationOffset") + @DefaultValue("0") + int paginationOffset) { authorizer.authorizeAdmin(securityContext); try { List changeEvents = EventSubscriptionScheduler.getInstance() - .getSuccessfullySentChangeEventsForAlert(id, limit); + .getSuccessfullySentChangeEventsForAlert(id, limit, paginationOffset); return Response.ok().entity(changeEvents).build(); } catch (EntityNotFoundException e) { @@ -1082,9 +1129,15 @@ public class EventSubscriptionResource description = "Maximum number of change events to retrieve", schema = @Schema(type = "integer")) @QueryParam("limit") - @DefaultValue("100") + @DefaultValue("15") @Min(0) - int limit) { + int limit, + @Parameter( + description = "Offset for pagination (starting point for records)", + schema = @Schema(type = "integer")) + @QueryParam("paginationOffset") + @DefaultValue("0") + int paginationOffset) { authorizer.authorizeAdmin(securityContext); try { @@ -1092,7 +1145,8 @@ public class EventSubscriptionResource List changeEvents = EventSubscriptionScheduler.getInstance() - .getSuccessfullySentChangeEventsForAlert(subscription.getId(), limit); + .getSuccessfullySentChangeEventsForAlert( + subscription.getId(), limit, paginationOffset); return Response.ok().entity(changeEvents).build(); } catch (EntityNotFoundException e) { diff --git a/openmetadata-spec/src/main/resources/json/schema/events/api/eventSubscriptionDiagnosticInfo.json b/openmetadata-spec/src/main/resources/json/schema/events/api/eventSubscriptionDiagnosticInfo.json index 5fb1ffd030d..9b0e2e56408 100644 --- a/openmetadata-spec/src/main/resources/json/schema/events/api/eventSubscriptionDiagnosticInfo.json +++ b/openmetadata-spec/src/main/resources/json/schema/events/api/eventSubscriptionDiagnosticInfo.json @@ -1,8 +1,8 @@ { "$id": "https://open-metadata.org/schema/events/api/eventSubscriptionDiagnosticInfo.json", "$schema": "http://json-schema.org/draft-07/schema#", - "title": "Event Subscription Status Response", - "description": "Schema defining the response for event subscription status, including details about processed and unprocessed events.", + "title": "Event Subscription Diagnostic Info", + "description": "Schema defining the response for event subscription diagnostics, including details about processed and unprocessed events.", "type": "object", "javaType": "org.openmetadata.schema.api.events.EventSubscriptionDiagnosticInfo", "properties": { @@ -14,22 +14,35 @@ "description": "The current offset of the event subscription.", "existingJavaType": "java.lang.Long" }, + "startingOffset": { + "description": "The initial offset of the event subscription when it started processing.", + "existingJavaType": "java.lang.Long" + }, "hasProcessedAllEvents": { "description": "Indicates whether all events have been processed.", "type": "boolean" }, - "unprocessedEventsCount": { - "description": "The count of unprocessed events.", + "relevantUnprocessedEventsCount": { + "description": "The number of relevant unprocessed events based on the alert's filtering rules.", "existingJavaType": "java.lang.Long" }, - "unprocessedEventsList": { - "description": "The list of unprocessed events.", + "relevantUnprocessedEventsList": { + "description": "A list of relevant unprocessed events based on the alert's filtering criteria.", "type": "array", "items": { "$ref": "../../type/changeEvent.json" - } - } + }, + "totalUnprocessedEventsCount": { + "description": "The total number of unprocessed events.", + "existingJavaType": "java.lang.Long" + }, + "totalUnprocessedEventsList": { + "description": "A list of all unprocessed events.", + "type": "array", + "items": { + "$ref": "../../type/changeEvent.json" + } } }, "additionalProperties": false } \ No newline at end of file diff --git a/openmetadata-spec/src/main/resources/json/schema/events/eventSubscriptionOffset.json b/openmetadata-spec/src/main/resources/json/schema/events/eventSubscriptionOffset.json index cc0c61dfa5e..03cd198dcdb 100644 --- a/openmetadata-spec/src/main/resources/json/schema/events/eventSubscriptionOffset.json +++ b/openmetadata-spec/src/main/resources/json/schema/events/eventSubscriptionOffset.json @@ -2,12 +2,17 @@ "$id": "https://open-metadata.org/schema/events/eventSubscriptionOffset.json", "$schema": "http://json-schema.org/draft-07/schema#", "title": "EventSubscriptionOffset", - "description": "Event Subscription Offset", + "description": "Represents the offsets for an event subscription, tracking the starting point and current position of events processed.", "type": "object", "javaType": "org.openmetadata.schema.entity.events.EventSubscriptionOffset", "properties": { - "offset": { - "description": "Name of this Event Filter.", + "startingOffset": { + "description": "The offset from where event processing starts.", + "type": "integer", + "existingJavaType": "Long" + }, + "currentOffset": { + "description": "The current position in the events.", "type": "integer", "existingJavaType": "Long" }, @@ -16,6 +21,6 @@ "$ref": "../type/basic.json#/definitions/timestamp" } }, - "required": ["offset"], + "required": ["startingOffset", "currentOffset"], "additionalProperties": false -} +} \ No newline at end of file