Fix Fetching Successfully Sent Change Events For Debuggability API (#18493)

* fix getSuccessfullySentChangeEventsForAlert

* migrations.

* offset-based pagination

* add startingOffset in EventSubscriptionDiagnosticInfo

* fix missing paginationOffset

* Record successfully sent change events in a separate table, clean up associated successful and failed records on alert deletion.

* Remove oldest 30 records if record count reaches 100 for successful_sent_change_events.

* list unprocessed changeEvents based on alert subscription rules, ensuring only relevant unprocessed events are listed.

* refactor: enhance eventSubscriptionDiagnosticInfo

Added new fields to the eventSubscriptionDiagnosticInfo schema to support detailed diagnostics:
- `relevantUnprocessedEventsCount`: Count of relevant unprocessed events based on alert filtering rules.
- `relevantUnprocessedEventsList`: List of relevant unprocessed events based on alert filtering rules.
- `totalUnprocessedEventsCount`: Count of all unprocessed events.
- `totalUnprocessedEventsList`: List of all unprocessed events.

* Refactor: delete 30 oldest records per alert when the count reaches 100 records

* fix: table creation issue in PostgreSQL

* fix tests.

* scheduleCleanupJob

* remove in-line deletion of records.

* fix postgres table structure issue.
This commit is contained in:
Siddhant 2024-11-08 17:17:16 +05:30 committed by GitHub
parent c05a8ab6be
commit 2f39b76c33
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 561 additions and 145 deletions

View File

@ -12,4 +12,27 @@ CREATE TABLE IF NOT EXISTS apps_data_store (
ALTER TABLE consumers_dlq ADD COLUMN source VARCHAR(255);
-- Create an index on the source column in the consumers_dlq table
CREATE INDEX idx_consumers_dlq_source ON consumers_dlq (source);
CREATE INDEX idx_consumers_dlq_source ON consumers_dlq (source);
-- Rename 'offset' to 'currentOffset' and add 'startingOffset'
UPDATE change_event_consumers
SET json = JSON_SET(
JSON_REMOVE(json, '$.offset'),
'$.currentOffset', JSON_EXTRACT(json, '$.offset'),
'$.startingOffset', JSON_EXTRACT(json, '$.offset')
)
WHERE JSON_EXTRACT(json, '$.offset') IS NOT NULL
AND jsonSchema = 'eventSubscriptionOffset';
-- Create table successful_sent_change_events for storing successfully sent events per alert
CREATE TABLE IF NOT EXISTS successful_sent_change_events (
id VARCHAR(36) NOT NULL,
change_event_id VARCHAR(36) NOT NULL,
event_subscription_id VARCHAR(36) NOT NULL,
json JSON NOT NULL,
timestamp BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (id)
);
-- Create an index on the event_subscription_id column in the successful_sent_change_events table
CREATE INDEX idx_event_subscription_id ON successful_sent_change_events (event_subscription_id);

View File

@ -12,4 +12,26 @@ CREATE TABLE IF NOT EXISTS apps_data_store (
ALTER TABLE consumers_dlq ADD COLUMN source VARCHAR(255);
-- Create an index on the source column in the consumers_dlq table
CREATE INDEX idx_consumers_dlq_source ON consumers_dlq (source);
CREATE INDEX idx_consumers_dlq_source ON consumers_dlq (source);
-- Rename 'offset' to 'currentOffset' and add 'startingOffset'
UPDATE change_event_consumers
SET json = jsonb_set(
jsonb_set(json, '{currentOffset}', json -> 'offset'),
'{startingOffset}', json -> 'offset'
)
WHERE json -> 'offset' IS NOT NULL
AND jsonSchema = 'eventSubscriptionOffset';
-- Create table successful_sent_change_events for storing successfully sent events per alert
CREATE TABLE IF NOT EXISTS successful_sent_change_events (
id VARCHAR(36) PRIMARY KEY,
change_event_id VARCHAR(36) NOT NULL,
event_subscription_id VARCHAR(36) NOT NULL,
json jsonb NOT NULL,
timestamp BIGINT NOT NULL
);
-- Create an index on the event_subscription_id column in the successful_sent_change_events table
CREATE INDEX idx_event_subscription_id ON successful_sent_change_events (event_subscription_id);

View File

@ -55,6 +55,8 @@ public abstract class AbstractEventConsumer
public static final String FAILED_EVENT_EXTENSION = "eventSubscription.failedEvent";
private long offset = -1;
private long startingOffset = -1;
private AlertMetrics alertMetrics;
@Getter @Setter private JobDetail jobDetail;
@ -68,7 +70,8 @@ public abstract class AbstractEventConsumer
(EventSubscription) context.getJobDetail().getJobDataMap().get(ALERT_INFO_KEY);
this.jobDetail = context.getJobDetail();
this.eventSubscription = sub;
this.offset = loadInitialOffset(context);
this.offset = loadInitialOffset(context).getCurrentOffset();
this.startingOffset = loadInitialOffset(context).getStartingOffset();
this.alertMetrics = loadInitialMetrics();
this.destinationMap = loadDestinationsMap(context);
this.doInit(context);
@ -110,18 +113,29 @@ public abstract class AbstractEventConsumer
source.toString());
}
private long loadInitialOffset(JobExecutionContext context) {
private void recordSuccessfulChangeEvent(UUID eventSubscriptionId, ChangeEvent event) {
Entity.getCollectionDAO()
.eventSubscriptionDAO()
.insertSuccessfulChangeEvent(
UUID.randomUUID().toString(),
event.getId().toString(),
eventSubscriptionId.toString(),
JsonUtils.pojoToJson(event),
event.getTimestamp());
}
private EventSubscriptionOffset loadInitialOffset(JobExecutionContext context) {
EventSubscriptionOffset jobStoredOffset =
(EventSubscriptionOffset) jobDetail.getJobDataMap().get(ALERT_OFFSET_KEY);
// If the Job Data Map has the latest offset, use it
if (jobStoredOffset != null) {
return jobStoredOffset.getOffset();
return jobStoredOffset;
} else {
EventSubscriptionOffset eventSubscriptionOffset =
getStartingOffset(eventSubscription.getId());
// Update the Job Data Map with the latest offset
context.getJobDetail().getJobDataMap().put(ALERT_OFFSET_KEY, eventSubscriptionOffset);
return eventSubscriptionOffset.getOffset();
return eventSubscriptionOffset;
}
}
@ -171,6 +185,7 @@ public abstract class AbstractEventConsumer
for (UUID receiverId : eventWithReceivers.getValue()) {
try {
sendAlert(receiverId, eventWithReceivers.getKey());
recordSuccessfulChangeEvent(eventSubscription.getId(), eventWithReceivers.getKey());
alertMetrics.withSuccessEvents(alertMetrics.getSuccessEvents() + 1);
} catch (EventPublisherException e) {
alertMetrics.withFailedEvents(alertMetrics.getFailedEvents() + 1);
@ -185,7 +200,10 @@ public abstract class AbstractEventConsumer
long currentTime = System.currentTimeMillis();
// Upsert Offset
EventSubscriptionOffset eventSubscriptionOffset =
new EventSubscriptionOffset().withOffset(offset).withTimestamp(currentTime);
new EventSubscriptionOffset()
.withCurrentOffset(offset)
.withStartingOffset(startingOffset)
.withTimestamp(currentTime);
Entity.getCollectionDAO()
.eventSubscriptionDAO()

View File

@ -0,0 +1,41 @@
package org.openmetadata.service.events.scheduled;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.service.Entity;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@Slf4j
public class EventSubscriptionCleanupJob implements Job {
private static final int TARGET_COUNT = 50;
private static final int THRESHOLD = 100;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
performCleanup();
}
@Transaction
public static void performCleanup() {
List<String> subscriptionsToClean =
Entity.getCollectionDAO().eventSubscriptionDAO().findSubscriptionsAboveThreshold(THRESHOLD);
for (String subscriptionId : subscriptionsToClean) {
int recordCount =
Entity.getCollectionDAO().eventSubscriptionDAO().getRecordCount(subscriptionId);
int excessRecords = recordCount - TARGET_COUNT;
if (excessRecords > 0) {
Entity.getCollectionDAO()
.eventSubscriptionDAO()
.deleteOldRecords(subscriptionId, excessRecords);
}
}
LOG.debug(
"Performed cleanup for subscriptions, retaining {} records per subscription.",
TARGET_COUNT);
}
}

View File

@ -19,6 +19,7 @@ import static org.openmetadata.service.events.subscription.AlertUtil.getStarting
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
@ -36,7 +37,10 @@ import org.openmetadata.schema.entity.events.SubscriptionStatus;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.changeEvent.AlertPublisher;
import org.openmetadata.service.events.subscription.AlertUtil;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.EventSubscriptionRepository;
import org.openmetadata.service.resources.events.subscription.TypedEvent;
import org.openmetadata.service.util.JsonUtils;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
@ -56,6 +60,8 @@ public class EventSubscriptionScheduler {
public static final String ALERT_TRIGGER_GROUP = "OMAlertJobGroup";
private static EventSubscriptionScheduler instance;
private static volatile boolean initialized = false;
public static volatile boolean cleanupJobInitialised = false;
private final Scheduler alertsScheduler = new StdSchedulerFactory().getScheduler();
private EventSubscriptionScheduler() throws SchedulerException {
@ -111,6 +117,7 @@ public class EventSubscriptionScheduler {
// Schedule the Job
alertsScheduler.scheduleJob(jobDetail, trigger);
instance.scheduleCleanupJob();
LOG.info(
"Event Subscription started as {} : status {} for all Destinations",
@ -163,6 +170,44 @@ public class EventSubscriptionScheduler {
LOG.info("Alert publisher deleted for {}", deletedEntity.getName());
}
public void scheduleCleanupJob() {
if (!cleanupJobInitialised) {
try {
JobDetail cleanupJob =
JobBuilder.newJob(EventSubscriptionCleanupJob.class)
.withIdentity("CleanupJob", ALERT_JOB_GROUP)
.build();
Trigger cleanupTrigger =
TriggerBuilder.newTrigger()
.withIdentity("CleanupTrigger", ALERT_TRIGGER_GROUP)
.withSchedule(
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(10)
.repeatForever())
.startNow()
.build();
alertsScheduler.scheduleJob(cleanupJob, cleanupTrigger);
cleanupJobInitialised = true;
LOG.info("Scheduled periodic cleanup job to run every 10 seconds.");
} catch (SchedulerException e) {
LOG.error("Failed to schedule cleanup job", e);
}
}
}
@Transaction
public void deleteSuccessfulAndFailedEventsRecordByAlert(UUID id) {
Entity.getCollectionDAO()
.eventSubscriptionDAO()
.deleteSuccessfulChangeEventBySubscriptionId(id.toString());
Entity.getCollectionDAO()
.eventSubscriptionDAO()
.deleteFailedRecordsBySubscriptionId(id.toString());
}
public SubscriptionStatus getStatusForEventSubscription(UUID subscriptionId, UUID destinationId) {
Optional<EventSubscription> eventSubscriptionOpt =
getEventSubscriptionFromScheduledJob(subscriptionId);
@ -218,38 +263,42 @@ public class EventSubscriptionScheduler {
}
public EventSubscriptionDiagnosticInfo getEventSubscriptionDiagnosticInfo(
UUID subscriptionId, int limit) {
boolean isAllEventsPublished = checkIfPublisherPublishedAllEvents(subscriptionId);
EventSubscriptionOffset latestOffset = getLatestOffset();
UUID subscriptionId, int limit, int paginationOffset) {
boolean hasProcessedAllEvents = checkIfPublisherPublishedAllEvents(subscriptionId);
Optional<EventSubscriptionOffset> eventSubscriptionOffsetOptional =
getEventSubscriptionOffset(subscriptionId);
long currentOffset =
getEventSubscriptionOffset(subscriptionId)
.map(EventSubscriptionOffset::getOffset)
.orElse(0L);
eventSubscriptionOffsetOptional.map(EventSubscriptionOffset::getCurrentOffset).orElse(0L);
long unpublishedEventCount = getUnpublishedEventCount(subscriptionId);
long startingOffset =
eventSubscriptionOffsetOptional.map(EventSubscriptionOffset::getStartingOffset).orElse(0L);
long totalUnprocessedEventCount = getUnpublishedEventCount(subscriptionId);
List<ChangeEvent> unprocessedEvents =
Optional.ofNullable(getUnpublishedEvents(subscriptionId, limit))
Optional.ofNullable(getRelevantUnprocessedEvents(subscriptionId, limit, paginationOffset))
.orElse(Collections.emptyList());
return new EventSubscriptionDiagnosticInfo()
.withLatestOffset(latestOffset.getOffset())
.withCurrentOffset(currentOffset)
.withHasProcessedAllEvents(isAllEventsPublished)
.withUnprocessedEventsCount(unpublishedEventCount)
.withUnprocessedEventsList(unprocessedEvents);
}
List<ChangeEvent> allUnprocessedEvents =
getAllUnprocessedEvents(subscriptionId, limit, paginationOffset);
public static EventSubscriptionOffset getLatestOffset() {
return new EventSubscriptionOffset()
.withOffset(Entity.getCollectionDAO().changeEventDAO().getLatestOffset());
return new EventSubscriptionDiagnosticInfo()
.withLatestOffset(Entity.getCollectionDAO().changeEventDAO().getLatestOffset())
.withCurrentOffset(currentOffset)
.withStartingOffset(startingOffset)
.withHasProcessedAllEvents(hasProcessedAllEvents)
.withTotalUnprocessedEventsCount(totalUnprocessedEventCount)
.withTotalUnprocessedEventsList(allUnprocessedEvents)
.withRelevantUnprocessedEventsCount((long) unprocessedEvents.size())
.withRelevantUnprocessedEventsList(unprocessedEvents);
}
public boolean checkIfPublisherPublishedAllEvents(UUID subscriptionID) {
long countOfEvents = Entity.getCollectionDAO().changeEventDAO().getLatestOffset();
return getEventSubscriptionOffset(subscriptionID)
.map(offset -> offset.getOffset() == countOfEvents)
.map(offset -> offset.getCurrentOffset() == countOfEvents)
.orElse(false);
}
@ -257,67 +306,137 @@ public class EventSubscriptionScheduler {
long countOfEvents = Entity.getCollectionDAO().changeEventDAO().getLatestOffset();
return getEventSubscriptionOffset(subscriptionID)
.map(offset -> Math.abs(countOfEvents - offset.getOffset()))
.map(offset -> Math.abs(countOfEvents - offset.getCurrentOffset()))
.orElse(countOfEvents);
}
public List<ChangeEvent> getUnpublishedEvents(UUID subscriptionId, int limit) {
public List<ChangeEvent> getRelevantUnprocessedEvents(
UUID subscriptionId, int limit, int paginationOffset) {
long offset =
getEventSubscriptionOffset(subscriptionId)
.map(EventSubscriptionOffset::getOffset)
.map(EventSubscriptionOffset::getCurrentOffset)
.orElse(Entity.getCollectionDAO().changeEventDAO().getLatestOffset());
List<String> unprocessedEventJsonList =
Entity.getCollectionDAO().changeEventDAO().listUnprocessedEvents(offset, limit);
return Entity.getCollectionDAO()
.changeEventDAO()
.listUnprocessedEvents(offset, limit, paginationOffset)
.parallelStream()
.map(
eventJson -> {
ChangeEvent event = JsonUtils.readValue(eventJson, ChangeEvent.class);
return AlertUtil.checkIfChangeEventIsAllowed(
event, getEventSubscription(subscriptionId).getFilteringRules())
? event
: null;
})
.filter(Objects::nonNull) // Remove null entries (events that did not pass filtering)
.toList();
}
return unprocessedEventJsonList.stream()
public List<ChangeEvent> getAllUnprocessedEvents(
UUID subscriptionId, int limit, int paginationOffset) {
long offset =
getEventSubscriptionOffset(subscriptionId)
.map(EventSubscriptionOffset::getCurrentOffset)
.orElse(Entity.getCollectionDAO().changeEventDAO().getLatestOffset());
return Entity.getCollectionDAO()
.changeEventDAO()
.listUnprocessedEvents(offset, limit, paginationOffset)
.parallelStream()
.map(eventJson -> JsonUtils.readValue(eventJson, ChangeEvent.class))
.collect(Collectors.toList());
}
public List<FailedEventResponse> getFailedEventsByIdAndSource(
UUID subscriptionId, String source, int limit) {
UUID subscriptionId, String source, int limit, int paginationOffset) {
if (CommonUtil.nullOrEmpty(source)) {
return Entity.getCollectionDAO()
.changeEventDAO()
.listFailedEventsById(subscriptionId.toString(), limit);
.listFailedEventsById(subscriptionId.toString(), limit, paginationOffset);
} else {
return Entity.getCollectionDAO()
.changeEventDAO()
.listFailedEventsByIdAndSource(subscriptionId.toString(), source, limit);
.listFailedEventsByIdAndSource(
subscriptionId.toString(), source, limit, paginationOffset);
}
}
public List<FailedEventResponse> getFailedEventsById(UUID subscriptionId, int limit) {
public List<TypedEvent> listEventsForSubscription(UUID subscriptionId, int limit, long offset) {
Optional<EventSubscriptionOffset> eventSubscriptionOffset =
getEventSubscriptionOffset(subscriptionId);
if (eventSubscriptionOffset.isEmpty()) {
return Collections.emptyList();
}
List<TypedEvent> typedEvents =
Entity.getCollectionDAO()
.changeEventDAO()
.listAllEventsWithStatuses(
subscriptionId.toString(),
eventSubscriptionOffset.get().getCurrentOffset(),
limit,
offset);
EventSubscription eventSubscription = getEventSubscription(subscriptionId);
return typedEvents.parallelStream()
.filter(
event -> {
if (TypedEvent.Status.UNPROCESSED.equals(event.getStatus())
&& !CommonUtil.nullOrEmpty(event.getData())) {
for (Object eventData : event.getData()) {
if (eventData instanceof ChangeEvent changeEvent) {
return AlertUtil.checkIfChangeEventIsAllowed(
changeEvent, eventSubscription.getFilteringRules());
}
}
}
return true; // Keep FAILED and SUCCESSFUL records
})
.toList();
}
private EventSubscription getEventSubscription(UUID eventSubscriptionId) {
EventSubscriptionRepository repository =
(EventSubscriptionRepository) Entity.getEntityRepository(Entity.EVENT_SUBSCRIPTION);
return repository.get(null, eventSubscriptionId, repository.getFields("*"));
}
public List<FailedEventResponse> getFailedEventsById(UUID subscriptionId, int limit, int offset) {
return Entity.getCollectionDAO()
.changeEventDAO()
.listFailedEventsById(subscriptionId.toString(), limit);
.listFailedEventsById(subscriptionId.toString(), limit, offset);
}
public List<FailedEventResponse> getAllFailedEvents(String source, int limit) {
public List<FailedEventResponse> getAllFailedEvents(
String source, int limit, int paginationOffset) {
if (CommonUtil.nullOrEmpty(source)) {
return Entity.getCollectionDAO().changeEventDAO().listAllFailedEvents(limit);
return Entity.getCollectionDAO()
.changeEventDAO()
.listAllFailedEvents(limit, paginationOffset);
} else {
return Entity.getCollectionDAO().changeEventDAO().listAllFailedEventsBySource(source, limit);
return Entity.getCollectionDAO()
.changeEventDAO()
.listAllFailedEventsBySource(source, limit, paginationOffset);
}
}
public List<ChangeEvent> getSuccessfullySentChangeEventsForAlert(UUID id, int limit) {
public List<ChangeEvent> getSuccessfullySentChangeEventsForAlert(
UUID id, int limit, int paginationOffset) {
Optional<EventSubscriptionOffset> eventSubscriptionOffset = getEventSubscriptionOffset(id);
if (eventSubscriptionOffset.isEmpty()) {
return Collections.emptyList();
}
return eventSubscriptionOffset
.map(
offset -> {
List<String> jsonEvents =
Entity.getCollectionDAO()
.changeEventDAO()
.listChangeEventsBeforeOffset(limit, offset.getOffset());
List<String> successfullySentChangeEvents =
Entity.getCollectionDAO()
.eventSubscriptionDAO()
.getSuccessfulChangeEventBySubscriptionId(id.toString(), limit, paginationOffset);
return jsonEvents.stream()
.map(json -> JsonUtils.readValue(json, ChangeEvent.class))
.collect(Collectors.toList());
})
.orElse(Collections.emptyList());
return successfullySentChangeEvents.stream()
.map(e -> JsonUtils.readValue(e, ChangeEvent.class))
.collect(Collectors.toList());
}
public Optional<EventSubscription> getEventSubscriptionFromScheduledJob(UUID id) {

View File

@ -185,7 +185,7 @@ public final class AlertUtil {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private static boolean checkIfChangeEventIsAllowed(
public static boolean checkIfChangeEventIsAllowed(
ChangeEvent event, FilteringRules filteringRules) {
boolean triggerChangeEvent = AlertUtil.shouldTriggerAlert(event, filteringRules);
@ -203,7 +203,8 @@ public final class AlertUtil {
}
public static EventSubscriptionOffset getStartingOffset(UUID eventSubscriptionId) {
long eventSubscriptionOffset;
long startingOffset;
long currentOffset;
String json =
Entity.getCollectionDAO()
.eventSubscriptionDAO()
@ -211,11 +212,15 @@ public final class AlertUtil {
if (json != null) {
EventSubscriptionOffset offsetFromDb =
JsonUtils.readValue(json, EventSubscriptionOffset.class);
eventSubscriptionOffset = offsetFromDb.getOffset();
startingOffset = offsetFromDb.getStartingOffset();
currentOffset = offsetFromDb.getCurrentOffset();
} else {
eventSubscriptionOffset = Entity.getCollectionDAO().changeEventDAO().getLatestOffset();
currentOffset = Entity.getCollectionDAO().changeEventDAO().getLatestOffset();
startingOffset = currentOffset;
}
return new EventSubscriptionOffset().withOffset(eventSubscriptionOffset);
return new EventSubscriptionOffset()
.withCurrentOffset(currentOffset)
.withStartingOffset(startingOffset);
}
public static FilteringRules validateAndBuildFilteringConditions(

View File

@ -124,6 +124,7 @@ import org.openmetadata.schema.settings.SettingsType;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.TestDefinition;
import org.openmetadata.schema.tests.TestSuite;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.EventType;
import org.openmetadata.schema.type.Include;
@ -140,6 +141,7 @@ import org.openmetadata.service.jdbi3.CollectionDAO.UsageDAO.UsageDetailsMapper;
import org.openmetadata.service.jdbi3.FeedRepository.FilterType;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlQuery;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlUpdate;
import org.openmetadata.service.resources.events.subscription.TypedEvent;
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.service.resources.tags.TagLabelUtil;
import org.openmetadata.service.util.EntityUtil;
@ -2108,6 +2110,52 @@ public interface CollectionDAO {
@Bind("extension") String extension,
@Bind("json") String json,
@Bind("source") String source);
@ConnectionAwareSqlUpdate(
value =
"INSERT INTO successful_sent_change_events (id, change_event_id, event_subscription_id, json, timestamp) VALUES (:id, :change_event_id, :event_subscription_id, :json, :timestamp)",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"INSERT INTO successful_sent_change_events (id, change_event_id, event_subscription_id, json, timestamp) VALUES (:id, :change_event_id, :event_subscription_id, CAST(:json AS jsonb), :timestamp)",
connectionType = POSTGRES)
void insertSuccessfulChangeEvent(
@Bind("id") String id,
@Bind("change_event_id") String changeEventId,
@Bind("event_subscription_id") String eventSubscriptionId,
@Bind("json") String json,
@Bind("timestamp") long timestamp);
@SqlQuery(
"SELECT COUNT(*) FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId")
int getRecordCount(@Bind("eventSubscriptionId") String eventSubscriptionId);
@SqlQuery(
"SELECT event_subscription_id FROM successful_sent_change_events "
+ "GROUP BY event_subscription_id "
+ "HAVING COUNT(*) >= :threshold")
List<String> findSubscriptionsAboveThreshold(@Bind("threshold") int threshold);
@SqlUpdate(
"DELETE FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId ORDER BY timestamp ASC LIMIT :limit")
void deleteOldRecords(
@Bind("eventSubscriptionId") String eventSubscriptionId, @Bind("limit") int limit);
@SqlQuery(
"SELECT json FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId LIMIT :limit OFFSET :paginationOffset")
List<String> getSuccessfulChangeEventBySubscriptionId(
@Bind("eventSubscriptionId") String eventSubscriptionId,
@Bind("limit") int limit,
@Bind("paginationOffset") long paginationOffset);
@SqlUpdate(
"DELETE FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId")
void deleteSuccessfulChangeEventBySubscriptionId(
@Bind("eventSubscriptionId") String eventSubscriptionId);
@SqlUpdate("DELETE FROM consumers_dlq WHERE id = :eventSubscriptionId")
void deleteFailedRecordsBySubscriptionId(
@Bind("eventSubscriptionId") String eventSubscriptionId);
}
interface ChartDAO extends EntityDAO<Chart> {
@ -3933,34 +3981,80 @@ public interface CollectionDAO {
interface ChangeEventDAO {
@SqlQuery(
"SELECT json FROM change_event ce where ce.offset > :offset ORDER BY ce.eventTime ASC LIMIT :limit")
List<String> listUnprocessedEvents(@Bind("offset") long offset, @Bind("limit") long limit);
"SELECT json FROM change_event ce where ce.offset > :offset ORDER BY ce.eventTime ASC LIMIT :limit OFFSET :paginationOffset")
List<String> listUnprocessedEvents(
@Bind("offset") long offset,
@Bind("limit") int limit,
@Bind("paginationOffset") int paginationOffset);
@SqlQuery(
"SELECT json, source FROM consumers_dlq WHERE id = :id ORDER BY timestamp ASC LIMIT :limit")
"SELECT json, source FROM consumers_dlq WHERE id = :id ORDER BY timestamp ASC LIMIT :limit OFFSET :paginationOffset")
@RegisterRowMapper(FailedEventResponseMapper.class)
List<FailedEventResponse> listFailedEventsById(
@Bind("id") String id, @Bind("limit") long limit);
@Bind("id") String id,
@Bind("limit") int limit,
@Bind("paginationOffset") int paginationOffset);
@SqlQuery(
"SELECT json, source FROM consumers_dlq WHERE id = :id AND source = :source ORDER BY timestamp ASC LIMIT :limit")
"SELECT json, source FROM consumers_dlq WHERE id = :id AND source = :source ORDER BY timestamp ASC LIMIT :limit OFFSET :paginationOffset")
@RegisterRowMapper(FailedEventResponseMapper.class)
List<FailedEventResponse> listFailedEventsByIdAndSource(
@Bind("id") String id, @Bind("source") String source, @Bind("limit") long limit);
@Bind("id") String id,
@Bind("source") String source,
@Bind("limit") int limit,
@Bind("paginationOffset") int paginationOffset);
@SqlQuery("SELECT json, source FROM consumers_dlq LIMIT :limit")
@SqlQuery("SELECT json, source FROM consumers_dlq LIMIT :limit OFFSET :paginationOffset")
@RegisterRowMapper(FailedEventResponseMapper.class)
List<FailedEventResponse> listAllFailedEvents(@Bind("limit") long limit);
@SqlQuery("SELECT json, source FROM consumers_dlq WHERE source = :source LIMIT :limit")
@RegisterRowMapper(FailedEventResponseMapper.class)
List<FailedEventResponse> listAllFailedEventsBySource(
@Bind("source") String source, @Bind("limit") long limit);
List<FailedEventResponse> listAllFailedEvents(
@Bind("limit") int limit, @Bind("paginationOffset") int paginationOffset);
@SqlQuery(
"SELECT json FROM change_event ce where ce.offset < :offset ORDER BY ce.eventTime ASC LIMIT :limit")
List<String> listChangeEventsBeforeOffset(
@Bind("limit") long limit, @Bind("offset") long offset);
"SELECT json, source FROM consumers_dlq WHERE source = :source LIMIT :limit OFFSET :paginationOffset")
@RegisterRowMapper(FailedEventResponseMapper.class)
List<FailedEventResponse> listAllFailedEventsBySource(
@Bind("source") String source,
@Bind("limit") int limit,
@Bind("paginationOffset") int paginationOffset);
@ConnectionAwareSqlQuery(
value =
"SELECT json, status, timestamp "
+ "FROM ( "
+ " SELECT json, 'FAILED' AS status, timestamp "
+ " FROM consumers_dlq WHERE id = :id "
+ " UNION ALL "
+ " SELECT json, 'SUCCESSFUL' AS status, timestamp "
+ " FROM successful_sent_change_events WHERE event_subscription_id = :id "
+ " UNION ALL "
+ " SELECT ce.json, 'UNPROCESSED' AS status, ce.eventTime AS timestamp "
+ " FROM change_event ce WHERE ce.offset > :currentOffset "
+ ") AS combined_events "
+ "ORDER BY timestamp ASC "
+ "LIMIT :limit OFFSET :paginationOffset",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT json, status, timestamp "
+ "FROM ( "
+ " SELECT json, 'failed' AS status, timestamp "
+ " FROM consumers_dlq WHERE id = :id "
+ " UNION ALL "
+ " SELECT json, 'successful' AS status, timestamp "
+ " FROM successful_sent_change_events WHERE event_subscription_id = :id "
+ " UNION ALL "
+ " SELECT ce.json, 'unprocessed' AS status, ce.eventTime AS timestamp "
+ " FROM change_event ce WHERE ce.offset > :currentOffset "
+ ") AS combined_events "
+ "ORDER BY timestamp ASC "
+ "LIMIT :limit OFFSET :paginationOffset",
connectionType = POSTGRES)
@RegisterRowMapper(EventResponseMapper.class)
List<TypedEvent> listAllEventsWithStatuses(
@Bind("id") String id,
@Bind("currentOffset") long currentOffset,
@Bind("limit") int limit,
@Bind("paginationOffset") long paginationOffset);
@SqlQuery(
"SELECT CASE WHEN EXISTS (SELECT 1 FROM event_subscription_entity WHERE id = :id) THEN 1 ELSE 0 END AS record_exists")
@ -4028,6 +4122,28 @@ public interface CollectionDAO {
}
}
class EventResponseMapper implements RowMapper<TypedEvent> {
@Override
public TypedEvent map(ResultSet rs, StatementContext ctx) throws SQLException {
TypedEvent response = new TypedEvent();
String status = rs.getString("status").toLowerCase();
if (TypedEvent.Status.FAILED.value().equalsIgnoreCase(status)) {
FailedEvent failedEvent = JsonUtils.readValue(rs.getString("json"), FailedEvent.class);
response.setData(List.of(failedEvent));
response.setStatus(TypedEvent.Status.FAILED);
} else {
ChangeEvent changeEvent = JsonUtils.readValue(rs.getString("json"), ChangeEvent.class);
response.setData(List.of(changeEvent));
response.setStatus(TypedEvent.Status.fromValue(status));
}
long timestampMillis = rs.getLong("timestamp");
response.setTimestamp((double) timestampMillis);
return response;
}
}
interface TypeEntityDAO extends EntityDAO<Type> {
@Override
default String getTableName() {

View File

@ -31,7 +31,6 @@ import io.swagger.v3.oas.annotations.tags.Tag;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -417,33 +416,6 @@ public class EventSubscriptionResource
return super.listVersionsInternal(securityContext, id);
}
@GET
@Path("/{id}/processedEvents")
@Operation(
operationId = "checkIfThePublisherProcessedALlEvents",
summary = "Check If the Publisher Processed All Events",
description =
"Return a boolean 'true' or 'false' to indicate if the publisher processed all events",
responses = {
@ApiResponse(
responseCode = "200",
description = "List of Event Subscription versions",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = EntityHistory.class)))
})
public Response checkIfThePublisherProcessedALlEvents(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the Event Subscription", schema = @Schema(type = "UUID"))
@PathParam("id")
UUID id) {
return Response.ok()
.entity(EventSubscriptionScheduler.getInstance().checkIfPublisherPublishedAllEvents(id))
.build();
}
@GET
@Path("/{id}/versions/{version}")
@Operation(
@ -502,6 +474,7 @@ public class EventSubscriptionResource
throws SchedulerException {
EventSubscription eventSubscription = repository.get(null, id, repository.getFields("id"));
EventSubscriptionScheduler.getInstance().deleteEventSubscriptionPublisher(eventSubscription);
EventSubscriptionScheduler.getInstance().deleteSuccessfulAndFailedEventsRecordByAlert(id);
return delete(uriInfo, securityContext, id, true, true);
}
@ -525,6 +498,8 @@ public class EventSubscriptionResource
EventSubscription eventSubscription =
repository.getByName(null, name, repository.getFields("id"));
EventSubscriptionScheduler.getInstance().deleteEventSubscriptionPublisher(eventSubscription);
EventSubscriptionScheduler.getInstance()
.deleteSuccessfulAndFailedEventsRecordByAlert(eventSubscription.getId());
return deleteByName(uriInfo, securityContext, name, true, true);
}
@ -631,6 +606,33 @@ public class EventSubscriptionResource
AlertUtil.validateExpression(expression, Boolean.class);
}
@GET
@Path("/{id}/processedEvents")
@Operation(
operationId = "checkIfThePublisherProcessedALlEvents",
summary = "Check If the Publisher Processed All Events",
description =
"Return a boolean 'true' or 'false' to indicate if the publisher processed all events",
responses = {
@ApiResponse(
responseCode = "200",
description = "List of Event Subscription versions",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = EntityHistory.class)))
})
public Response checkIfThePublisherProcessedALlEvents(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the Event Subscription", schema = @Schema(type = "UUID"))
@PathParam("id")
UUID id) {
return Response.ok()
.entity(EventSubscriptionScheduler.getInstance().checkIfPublisherPublishedAllEvents(id))
.build();
}
@GET
@Path("id/{id}/listEvents")
@Operation(
@ -662,10 +664,15 @@ public class EventSubscriptionResource
description = "Maximum number of events to retrieve",
schema = @Schema(type = "integer"))
@QueryParam("limit")
@DefaultValue("100")
@DefaultValue("15")
@Min(0)
int limit) {
int limit,
@Parameter(
description = "Offset for pagination (starting point for records)",
schema = @Schema(type = "integer"))
@QueryParam("paginationOffset")
@DefaultValue("0")
int paginationOffset) {
authorizer.authorizeAdmin(securityContext);
try {
@ -683,13 +690,9 @@ public class EventSubscriptionResource
}
if (status == null) {
combinedEvents.addAll(fetchEvents(TypedEvent.Status.FAILED, id, limit));
combinedEvents.addAll(fetchEvents(TypedEvent.Status.SUCCESSFUL, id, limit));
combinedEvents.addAll(fetchEvents(TypedEvent.Status.UNPROCESSED, id, limit));
// Sort combined events by timestamp in ascending order.
combinedEvents.sort(Comparator.comparing(TypedEvent::getTimestamp));
combinedEvents.addAll(fetchEvents(id, limit, paginationOffset));
} else {
combinedEvents.addAll(fetchEvents(status, id, limit));
combinedEvents.addAll(fetchEvents(id, limit, paginationOffset, status));
}
return Response.ok().entity(combinedEvents).build();
@ -706,16 +709,18 @@ public class EventSubscriptionResource
}
}
private List<TypedEvent> fetchEvents(TypedEvent.Status status, UUID id, int limit) {
private List<TypedEvent> fetchEvents(
UUID id, int limit, int paginationOffset, TypedEvent.Status status) {
List<?> events;
switch (status) {
case FAILED -> events =
EventSubscriptionScheduler.getInstance().getFailedEventsById(id, limit);
EventSubscriptionScheduler.getInstance().getFailedEventsById(id, limit, paginationOffset);
case SUCCESSFUL -> events =
EventSubscriptionScheduler.getInstance()
.getSuccessfullySentChangeEventsForAlert(id, limit);
.getSuccessfullySentChangeEventsForAlert(id, limit, paginationOffset);
case UNPROCESSED -> events =
EventSubscriptionScheduler.getInstance().getUnpublishedEvents(id, limit);
EventSubscriptionScheduler.getInstance()
.getRelevantUnprocessedEvents(id, limit, paginationOffset);
default -> throw new IllegalArgumentException("Unknown event status: " + status);
}
@ -729,6 +734,11 @@ public class EventSubscriptionResource
.toList();
}
private List<TypedEvent> fetchEvents(UUID id, int limit, int paginationOffset) {
return EventSubscriptionScheduler.getInstance()
.listEventsForSubscription(id, limit, paginationOffset);
}
private Long extractTimestamp(Object event) {
if (event instanceof ChangeEvent changeEvent) {
return changeEvent.getTimestamp();
@ -760,10 +770,16 @@ public class EventSubscriptionResource
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Maximum number of unprocessed events returned")
@DefaultValue("100")
@DefaultValue("15")
@Min(0)
@QueryParam("limit")
int limit,
@Parameter(
description = "Offset for pagination (starting point for records)",
schema = @Schema(type = "integer"))
@QueryParam("paginationOffset")
@DefaultValue("0")
int paginationOffset,
@Parameter(description = "UUID of the Event Subscription", schema = @Schema(type = "UUID"))
@PathParam("subscriptionId")
UUID subscriptionId) {
@ -777,7 +793,7 @@ public class EventSubscriptionResource
EventSubscriptionDiagnosticInfo diagnosticInfo =
EventSubscriptionScheduler.getInstance()
.getEventSubscriptionDiagnosticInfo(subscriptionId, limit);
.getEventSubscriptionDiagnosticInfo(subscriptionId, limit, paginationOffset);
return Response.ok().entity(diagnosticInfo).build();
} catch (Exception e) {
@ -812,10 +828,16 @@ public class EventSubscriptionResource
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Maximum number of unprocessed events returned")
@DefaultValue("100")
@DefaultValue("15")
@Min(0)
@QueryParam("limit")
int limit,
@Parameter(
description = "Offset for pagination (starting point for records)",
schema = @Schema(type = "integer"))
@QueryParam("paginationOffset")
@DefaultValue("0")
int paginationOffset,
@Parameter(description = "Name of the Event Subscription", schema = @Schema(type = "string"))
@PathParam("subscriptionName")
String subscriptionName) {
@ -832,7 +854,7 @@ public class EventSubscriptionResource
EventSubscriptionDiagnosticInfo diagnosticInfo =
EventSubscriptionScheduler.getInstance()
.getEventSubscriptionDiagnosticInfo(subscription.getId(), limit);
.getEventSubscriptionDiagnosticInfo(subscription.getId(), limit, paginationOffset);
return Response.ok().entity(diagnosticInfo).build();
} catch (Exception e) {
@ -872,9 +894,15 @@ public class EventSubscriptionResource
description = "Maximum number of failed events to retrieve",
schema = @Schema(type = "integer"))
@QueryParam("limit")
@DefaultValue("100")
@DefaultValue("15")
@Min(0)
int limit,
@Parameter(
description = "Offset for pagination (starting point for records)",
schema = @Schema(type = "integer"))
@QueryParam("paginationOffset")
@DefaultValue("0")
int paginationOffset,
@Parameter(description = "Source of the failed events", schema = @Schema(type = "string"))
@QueryParam("source")
String source) {
@ -882,7 +910,8 @@ public class EventSubscriptionResource
try {
List<FailedEventResponse> failedEvents =
EventSubscriptionScheduler.getInstance().getFailedEventsByIdAndSource(id, source, limit);
EventSubscriptionScheduler.getInstance()
.getFailedEventsByIdAndSource(id, source, limit, paginationOffset);
return Response.ok().entity(failedEvents).build();
} catch (Exception e) {
@ -920,9 +949,15 @@ public class EventSubscriptionResource
description = "Maximum number of failed events to retrieve",
schema = @Schema(type = "integer"))
@QueryParam("limit")
@DefaultValue("100")
@DefaultValue("15")
@Min(0)
int limit,
@Parameter(
description = "Offset for pagination (starting point for records)",
schema = @Schema(type = "integer"))
@QueryParam("paginationOffset")
@DefaultValue("0")
int paginationOffset,
@Parameter(description = "Source of the failed events", schema = @Schema(type = "string"))
@QueryParam("source")
String source) {
@ -933,7 +968,7 @@ public class EventSubscriptionResource
List<FailedEventResponse> failedEvents =
EventSubscriptionScheduler.getInstance()
.getFailedEventsByIdAndSource(subscription.getId(), source, limit);
.getFailedEventsByIdAndSource(subscription.getId(), source, limit, paginationOffset);
return Response.ok().entity(failedEvents).build();
@ -973,9 +1008,15 @@ public class EventSubscriptionResource
description = "Maximum number of failed events to retrieve",
schema = @Schema(type = "integer"))
@QueryParam("limit")
@DefaultValue("100")
@DefaultValue("15")
@Min(0)
int limit,
@Parameter(
description = "Offset for pagination (starting point for records)",
schema = @Schema(type = "integer"))
@QueryParam("paginationOffset")
@DefaultValue("0")
int paginationOffset,
@Parameter(description = "Source of the failed events", schema = @Schema(type = "string"))
@QueryParam("source")
String source) {
@ -983,7 +1024,8 @@ public class EventSubscriptionResource
try {
List<FailedEventResponse> failedEvents =
EventSubscriptionScheduler.getInstance().getAllFailedEvents(source, limit);
EventSubscriptionScheduler.getInstance()
.getAllFailedEvents(source, limit, paginationOffset);
return Response.ok().entity(failedEvents).build();
} catch (Exception e) {
@ -1024,16 +1066,21 @@ public class EventSubscriptionResource
description = "Maximum number of change events to retrieve",
schema = @Schema(type = "integer"))
@QueryParam("limit")
@DefaultValue("100")
@DefaultValue("15")
@Min(0)
int limit) {
int limit,
@Parameter(
description = "Offset for pagination (starting point for records)",
schema = @Schema(type = "integer"))
@QueryParam("paginationOffset")
@DefaultValue("0")
int paginationOffset) {
authorizer.authorizeAdmin(securityContext);
try {
List<ChangeEvent> changeEvents =
EventSubscriptionScheduler.getInstance()
.getSuccessfullySentChangeEventsForAlert(id, limit);
.getSuccessfullySentChangeEventsForAlert(id, limit, paginationOffset);
return Response.ok().entity(changeEvents).build();
} catch (EntityNotFoundException e) {
@ -1082,9 +1129,15 @@ public class EventSubscriptionResource
description = "Maximum number of change events to retrieve",
schema = @Schema(type = "integer"))
@QueryParam("limit")
@DefaultValue("100")
@DefaultValue("15")
@Min(0)
int limit) {
int limit,
@Parameter(
description = "Offset for pagination (starting point for records)",
schema = @Schema(type = "integer"))
@QueryParam("paginationOffset")
@DefaultValue("0")
int paginationOffset) {
authorizer.authorizeAdmin(securityContext);
try {
@ -1092,7 +1145,8 @@ public class EventSubscriptionResource
List<ChangeEvent> changeEvents =
EventSubscriptionScheduler.getInstance()
.getSuccessfullySentChangeEventsForAlert(subscription.getId(), limit);
.getSuccessfullySentChangeEventsForAlert(
subscription.getId(), limit, paginationOffset);
return Response.ok().entity(changeEvents).build();
} catch (EntityNotFoundException e) {

View File

@ -1,8 +1,8 @@
{
"$id": "https://open-metadata.org/schema/events/api/eventSubscriptionDiagnosticInfo.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Event Subscription Status Response",
"description": "Schema defining the response for event subscription status, including details about processed and unprocessed events.",
"title": "Event Subscription Diagnostic Info",
"description": "Schema defining the response for event subscription diagnostics, including details about processed and unprocessed events.",
"type": "object",
"javaType": "org.openmetadata.schema.api.events.EventSubscriptionDiagnosticInfo",
"properties": {
@ -14,22 +14,35 @@
"description": "The current offset of the event subscription.",
"existingJavaType": "java.lang.Long"
},
"startingOffset": {
"description": "The initial offset of the event subscription when it started processing.",
"existingJavaType": "java.lang.Long"
},
"hasProcessedAllEvents": {
"description": "Indicates whether all events have been processed.",
"type": "boolean"
},
"unprocessedEventsCount": {
"description": "The count of unprocessed events.",
"relevantUnprocessedEventsCount": {
"description": "The number of relevant unprocessed events based on the alert's filtering rules.",
"existingJavaType": "java.lang.Long"
},
"unprocessedEventsList": {
"description": "The list of unprocessed events.",
"relevantUnprocessedEventsList": {
"description": "A list of relevant unprocessed events based on the alert's filtering criteria.",
"type": "array",
"items": {
"$ref": "../../type/changeEvent.json"
}
}
},
"totalUnprocessedEventsCount": {
"description": "The total number of unprocessed events.",
"existingJavaType": "java.lang.Long"
},
"totalUnprocessedEventsList": {
"description": "A list of all unprocessed events.",
"type": "array",
"items": {
"$ref": "../../type/changeEvent.json"
} }
},
"additionalProperties": false
}

View File

@ -2,12 +2,17 @@
"$id": "https://open-metadata.org/schema/events/eventSubscriptionOffset.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "EventSubscriptionOffset",
"description": "Event Subscription Offset",
"description": "Represents the offsets for an event subscription, tracking the starting point and current position of events processed.",
"type": "object",
"javaType": "org.openmetadata.schema.entity.events.EventSubscriptionOffset",
"properties": {
"offset": {
"description": "Name of this Event Filter.",
"startingOffset": {
"description": "The offset from where event processing starts.",
"type": "integer",
"existingJavaType": "Long"
},
"currentOffset": {
"description": "The current position in the events.",
"type": "integer",
"existingJavaType": "Long"
},
@ -16,6 +21,6 @@
"$ref": "../type/basic.json#/definitions/timestamp"
}
},
"required": ["offset"],
"required": ["startingOffset", "currentOffset"],
"additionalProperties": false
}
}