diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionCleanupJob.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionCleanupJob.java index 1bba401b12f..f0eb43ab9cc 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionCleanupJob.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionCleanupJob.java @@ -24,10 +24,10 @@ public class EventSubscriptionCleanupJob implements Job { Entity.getCollectionDAO().eventSubscriptionDAO().findSubscriptionsAboveThreshold(THRESHOLD); for (String subscriptionId : subscriptionsToClean) { - int recordCount = - Entity.getCollectionDAO().eventSubscriptionDAO().getRecordCount(subscriptionId); + long recordCount = + Entity.getCollectionDAO().eventSubscriptionDAO().getSuccessfulRecordCount(subscriptionId); - int excessRecords = recordCount - TARGET_COUNT; + long excessRecords = recordCount - TARGET_COUNT; if (excessRecords > 0) { Entity.getCollectionDAO() .eventSubscriptionDAO() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java index 3d3d435f6b8..5d02360cc3c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java @@ -263,19 +263,40 @@ public class EventSubscriptionScheduler { } public EventSubscriptionDiagnosticInfo getEventSubscriptionDiagnosticInfo( - UUID subscriptionId, int limit, int paginationOffset) { - boolean hasProcessedAllEvents = checkIfPublisherPublishedAllEvents(subscriptionId); - + UUID subscriptionId, int limit, int paginationOffset, boolean listCountOnly) { Optional eventSubscriptionOffsetOptional = getEventSubscriptionOffset(subscriptionId); long currentOffset = eventSubscriptionOffsetOptional.map(EventSubscriptionOffset::getCurrentOffset).orElse(0L); - + long latestOffset = Entity.getCollectionDAO().changeEventDAO().getLatestOffset(); long startingOffset = eventSubscriptionOffsetOptional.map(EventSubscriptionOffset::getStartingOffset).orElse(0L); + long failedEventsCount = + Entity.getCollectionDAO().changeEventDAO().countFailedEvents(subscriptionId.toString()); + + long successfulEventsCount = + Entity.getCollectionDAO() + .eventSubscriptionDAO() + .getSuccessfulRecordCount(subscriptionId.toString()); long totalUnprocessedEventCount = getUnpublishedEventCount(subscriptionId); + + boolean hasProcessedAllEvents = checkIfPublisherPublishedAllEvents(subscriptionId); + + if (listCountOnly) { + return new EventSubscriptionDiagnosticInfo() + .withLatestOffset(latestOffset) + .withCurrentOffset(currentOffset) + .withStartingOffset(startingOffset) + .withHasProcessedAllEvents(hasProcessedAllEvents) + .withSuccessfulEventsCount(successfulEventsCount) + .withFailedEventsCount(failedEventsCount) + .withTotalUnprocessedEventsCount(totalUnprocessedEventCount) + .withRelevantUnprocessedEventsList(null) + .withTotalUnprocessedEventsList(null); + } + List unprocessedEvents = Optional.ofNullable(getRelevantUnprocessedEvents(subscriptionId, limit, paginationOffset)) .orElse(Collections.emptyList()); @@ -288,6 +309,8 @@ public class EventSubscriptionScheduler { .withCurrentOffset(currentOffset) .withStartingOffset(startingOffset) .withHasProcessedAllEvents(hasProcessedAllEvents) + .withSuccessfulEventsCount(successfulEventsCount) + .withFailedEventsCount(failedEventsCount) .withTotalUnprocessedEventsCount(totalUnprocessedEventCount) .withTotalUnprocessedEventsList(allUnprocessedEvents) .withRelevantUnprocessedEventsCount((long) unprocessedEvents.size()) @@ -369,32 +392,9 @@ public class EventSubscriptionScheduler { return Collections.emptyList(); } - List typedEvents = - Entity.getCollectionDAO() - .changeEventDAO() - .listAllEventsWithStatuses( - subscriptionId.toString(), - eventSubscriptionOffset.get().getCurrentOffset(), - limit, - offset); - - EventSubscription eventSubscription = getEventSubscription(subscriptionId); - - return typedEvents.parallelStream() - .filter( - event -> { - if (TypedEvent.Status.UNPROCESSED.equals(event.getStatus()) - && !CommonUtil.nullOrEmpty(event.getData())) { - for (Object eventData : event.getData()) { - if (eventData instanceof ChangeEvent changeEvent) { - return AlertUtil.checkIfChangeEventIsAllowed( - changeEvent, eventSubscription.getFilteringRules()); - } - } - } - return true; // Keep FAILED and SUCCESSFUL records - }) - .toList(); + return Entity.getCollectionDAO() + .changeEventDAO() + .listAllEventsWithStatuses(subscriptionId.toString(), limit, offset); } private EventSubscription getEventSubscription(UUID eventSubscriptionId) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index c6775441015..fa0a5ec25f3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -2128,7 +2128,7 @@ public interface CollectionDAO { @SqlQuery( "SELECT COUNT(*) FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId") - int getRecordCount(@Bind("eventSubscriptionId") String eventSubscriptionId); + long getSuccessfulRecordCount(@Bind("eventSubscriptionId") String eventSubscriptionId); @SqlQuery( "SELECT event_subscription_id FROM successful_sent_change_events " @@ -2139,10 +2139,10 @@ public interface CollectionDAO { @SqlUpdate( "DELETE FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId ORDER BY timestamp ASC LIMIT :limit") void deleteOldRecords( - @Bind("eventSubscriptionId") String eventSubscriptionId, @Bind("limit") int limit); + @Bind("eventSubscriptionId") String eventSubscriptionId, @Bind("limit") long limit); @SqlQuery( - "SELECT json FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId LIMIT :limit OFFSET :paginationOffset") + "SELECT json FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId ORDER BY timestamp DESC LIMIT :limit OFFSET :paginationOffset") List getSuccessfulChangeEventBySubscriptionId( @Bind("eventSubscriptionId") String eventSubscriptionId, @Bind("limit") int limit, @@ -3981,22 +3981,25 @@ public interface CollectionDAO { interface ChangeEventDAO { @SqlQuery( - "SELECT json FROM change_event ce where ce.offset > :offset ORDER BY ce.eventTime ASC LIMIT :limit OFFSET :paginationOffset") + "SELECT json FROM change_event ce where ce.offset > :offset ORDER BY ce.eventTime DESC LIMIT :limit OFFSET :paginationOffset") List listUnprocessedEvents( @Bind("offset") long offset, @Bind("limit") int limit, @Bind("paginationOffset") int paginationOffset); @SqlQuery( - "SELECT json, source FROM consumers_dlq WHERE id = :id ORDER BY timestamp ASC LIMIT :limit OFFSET :paginationOffset") + "SELECT json, source FROM consumers_dlq WHERE id = :id ORDER BY timestamp DESC LIMIT :limit OFFSET :paginationOffset") @RegisterRowMapper(FailedEventResponseMapper.class) List listFailedEventsById( @Bind("id") String id, @Bind("limit") int limit, @Bind("paginationOffset") int paginationOffset); + @SqlQuery("SELECT COUNT(*) FROM consumers_dlq WHERE id = :id") + long countFailedEvents(@Bind("id") String id); + @SqlQuery( - "SELECT json, source FROM consumers_dlq WHERE id = :id AND source = :source ORDER BY timestamp ASC LIMIT :limit OFFSET :paginationOffset") + "SELECT json, source FROM consumers_dlq WHERE id = :id AND source = :source ORDER BY timestamp DESC LIMIT :limit OFFSET :paginationOffset") @RegisterRowMapper(FailedEventResponseMapper.class) List listFailedEventsByIdAndSource( @Bind("id") String id, @@ -4004,13 +4007,14 @@ public interface CollectionDAO { @Bind("limit") int limit, @Bind("paginationOffset") int paginationOffset); - @SqlQuery("SELECT json, source FROM consumers_dlq LIMIT :limit OFFSET :paginationOffset") + @SqlQuery( + "SELECT json, source FROM consumers_dlq ORDER BY timestamp DESC LIMIT :limit OFFSET :paginationOffset") @RegisterRowMapper(FailedEventResponseMapper.class) List listAllFailedEvents( @Bind("limit") int limit, @Bind("paginationOffset") int paginationOffset); @SqlQuery( - "SELECT json, source FROM consumers_dlq WHERE source = :source LIMIT :limit OFFSET :paginationOffset") + "SELECT json, source FROM consumers_dlq WHERE source = :source ORDER BY timestamp DESC LIMIT :limit OFFSET :paginationOffset") @RegisterRowMapper(FailedEventResponseMapper.class) List listAllFailedEventsBySource( @Bind("source") String source, @@ -4026,11 +4030,8 @@ public interface CollectionDAO { + " UNION ALL " + " SELECT json, 'SUCCESSFUL' AS status, timestamp " + " FROM successful_sent_change_events WHERE event_subscription_id = :id " - + " UNION ALL " - + " SELECT ce.json, 'UNPROCESSED' AS status, ce.eventTime AS timestamp " - + " FROM change_event ce WHERE ce.offset > :currentOffset " + ") AS combined_events " - + "ORDER BY timestamp ASC " + + "ORDER BY timestamp DESC " + "LIMIT :limit OFFSET :paginationOffset", connectionType = MYSQL) @ConnectionAwareSqlQuery( @@ -4042,9 +4043,6 @@ public interface CollectionDAO { + " UNION ALL " + " SELECT json, 'successful' AS status, timestamp " + " FROM successful_sent_change_events WHERE event_subscription_id = :id " - + " UNION ALL " - + " SELECT ce.json, 'unprocessed' AS status, ce.eventTime AS timestamp " - + " FROM change_event ce WHERE ce.offset > :currentOffset " + ") AS combined_events " + "ORDER BY timestamp ASC " + "LIMIT :limit OFFSET :paginationOffset", @@ -4052,7 +4050,6 @@ public interface CollectionDAO { @RegisterRowMapper(EventResponseMapper.class) List listAllEventsWithStatuses( @Bind("id") String id, - @Bind("currentOffset") long currentOffset, @Bind("limit") int limit, @Bind("paginationOffset") long paginationOffset); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/events/subscription/EventSubscriptionResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/events/subscription/EventSubscriptionResource.java index dbcb615bbb5..8056111cc66 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/events/subscription/EventSubscriptionResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/events/subscription/EventSubscriptionResource.java @@ -650,11 +650,11 @@ public class EventSubscriptionResource @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Parameter( - description = "Status of events to retrieve (failed, successful, unprocessed)", + description = "Status of events to retrieve (failed, successful)", schema = @Schema( type = "string", - allowableValues = {"failed", "successful", "unprocessed"})) + allowableValues = {"failed", "successful"})) @QueryParam("status") String statusParam, @Parameter(description = "ID of the alert or destination", schema = @Schema(type = "UUID")) @@ -684,8 +684,7 @@ public class EventSubscriptionResource status = TypedEvent.Status.fromValue(statusParam); } catch (IllegalArgumentException e) { throw new WebApplicationException( - "Invalid status. Must be 'failed', 'successful', or 'unprocessed'.", - Response.Status.BAD_REQUEST); + "Invalid status. Must be 'failed' or 'successful'.", Response.Status.BAD_REQUEST); } } @@ -718,9 +717,6 @@ public class EventSubscriptionResource case SUCCESSFUL -> events = EventSubscriptionScheduler.getInstance() .getSuccessfullySentChangeEventsForAlert(id, limit, paginationOffset); - case UNPROCESSED -> events = - EventSubscriptionScheduler.getInstance() - .getRelevantUnprocessedEvents(id, limit, paginationOffset); default -> throw new IllegalArgumentException("Unknown event status: " + status); } @@ -782,7 +778,11 @@ public class EventSubscriptionResource int paginationOffset, @Parameter(description = "UUID of the Event Subscription", schema = @Schema(type = "UUID")) @PathParam("subscriptionId") - UUID subscriptionId) { + UUID subscriptionId, + @Parameter(description = "Return only count if true") + @QueryParam("listCountOnly") + @DefaultValue("false") + boolean listCountOnly) { authorizer.authorizeAdmin(securityContext); try { if (!EventSubscriptionScheduler.getInstance().doesRecordExist(subscriptionId)) { @@ -793,7 +793,8 @@ public class EventSubscriptionResource EventSubscriptionDiagnosticInfo diagnosticInfo = EventSubscriptionScheduler.getInstance() - .getEventSubscriptionDiagnosticInfo(subscriptionId, limit, paginationOffset); + .getEventSubscriptionDiagnosticInfo( + subscriptionId, limit, paginationOffset, listCountOnly); return Response.ok().entity(diagnosticInfo).build(); } catch (Exception e) { @@ -840,7 +841,11 @@ public class EventSubscriptionResource int paginationOffset, @Parameter(description = "Name of the Event Subscription", schema = @Schema(type = "string")) @PathParam("subscriptionName") - String subscriptionName) { + String subscriptionName, + @Parameter(description = "Return only count if true") + @QueryParam("listCountOnly") + @DefaultValue("false") + boolean listCountOnly) { authorizer.authorizeAdmin(securityContext); try { EventSubscription subscription = @@ -854,7 +859,8 @@ public class EventSubscriptionResource EventSubscriptionDiagnosticInfo diagnosticInfo = EventSubscriptionScheduler.getInstance() - .getEventSubscriptionDiagnosticInfo(subscription.getId(), limit, paginationOffset); + .getEventSubscriptionDiagnosticInfo( + subscription.getId(), limit, paginationOffset, listCountOnly); return Response.ok().entity(diagnosticInfo).build(); } catch (Exception e) { diff --git a/openmetadata-spec/src/main/resources/json/schema/events/api/eventSubscriptionDiagnosticInfo.json b/openmetadata-spec/src/main/resources/json/schema/events/api/eventSubscriptionDiagnosticInfo.json index 9b0e2e56408..229ee9daaf1 100644 --- a/openmetadata-spec/src/main/resources/json/schema/events/api/eventSubscriptionDiagnosticInfo.json +++ b/openmetadata-spec/src/main/resources/json/schema/events/api/eventSubscriptionDiagnosticInfo.json @@ -22,10 +22,22 @@ "description": "Indicates whether all events have been processed.", "type": "boolean" }, + "successfulEventsCount": { + "description": "Count of successful events.", + "existingJavaType": "java.lang.Long" + }, + "failedEventsCount": { + "description": "Count of failed events.", + "existingJavaType": "java.lang.Long" + }, "relevantUnprocessedEventsCount": { "description": "The number of relevant unprocessed events based on the alert's filtering rules.", "existingJavaType": "java.lang.Long" }, + "totalUnprocessedEventsCount": { + "description": "The total number of unprocessed events.", + "existingJavaType": "java.lang.Long" + }, "relevantUnprocessedEventsList": { "description": "A list of relevant unprocessed events based on the alert's filtering criteria.", "type": "array", @@ -33,10 +45,6 @@ "$ref": "../../type/changeEvent.json" } }, - "totalUnprocessedEventsCount": { - "description": "The total number of unprocessed events.", - "existingJavaType": "java.lang.Long" - }, "totalUnprocessedEventsList": { "description": "A list of all unprocessed events.", "type": "array",