mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-06 04:26:57 +00:00
Debuggability Improvements and Fixes (#18564)
* fix: list records from latest to earliest. * refactor: remove listing unprocessed events from listEvents API. * refactor: EventSubscriptionDiagnosticInfo.
This commit is contained in:
parent
b02c64931e
commit
9d9eccedda
@ -24,10 +24,10 @@ public class EventSubscriptionCleanupJob implements Job {
|
||||
Entity.getCollectionDAO().eventSubscriptionDAO().findSubscriptionsAboveThreshold(THRESHOLD);
|
||||
|
||||
for (String subscriptionId : subscriptionsToClean) {
|
||||
int recordCount =
|
||||
Entity.getCollectionDAO().eventSubscriptionDAO().getRecordCount(subscriptionId);
|
||||
long recordCount =
|
||||
Entity.getCollectionDAO().eventSubscriptionDAO().getSuccessfulRecordCount(subscriptionId);
|
||||
|
||||
int excessRecords = recordCount - TARGET_COUNT;
|
||||
long excessRecords = recordCount - TARGET_COUNT;
|
||||
if (excessRecords > 0) {
|
||||
Entity.getCollectionDAO()
|
||||
.eventSubscriptionDAO()
|
||||
|
||||
@ -263,19 +263,40 @@ public class EventSubscriptionScheduler {
|
||||
}
|
||||
|
||||
public EventSubscriptionDiagnosticInfo getEventSubscriptionDiagnosticInfo(
|
||||
UUID subscriptionId, int limit, int paginationOffset) {
|
||||
boolean hasProcessedAllEvents = checkIfPublisherPublishedAllEvents(subscriptionId);
|
||||
|
||||
UUID subscriptionId, int limit, int paginationOffset, boolean listCountOnly) {
|
||||
Optional<EventSubscriptionOffset> eventSubscriptionOffsetOptional =
|
||||
getEventSubscriptionOffset(subscriptionId);
|
||||
|
||||
long currentOffset =
|
||||
eventSubscriptionOffsetOptional.map(EventSubscriptionOffset::getCurrentOffset).orElse(0L);
|
||||
|
||||
long latestOffset = Entity.getCollectionDAO().changeEventDAO().getLatestOffset();
|
||||
long startingOffset =
|
||||
eventSubscriptionOffsetOptional.map(EventSubscriptionOffset::getStartingOffset).orElse(0L);
|
||||
long failedEventsCount =
|
||||
Entity.getCollectionDAO().changeEventDAO().countFailedEvents(subscriptionId.toString());
|
||||
|
||||
long successfulEventsCount =
|
||||
Entity.getCollectionDAO()
|
||||
.eventSubscriptionDAO()
|
||||
.getSuccessfulRecordCount(subscriptionId.toString());
|
||||
|
||||
long totalUnprocessedEventCount = getUnpublishedEventCount(subscriptionId);
|
||||
|
||||
boolean hasProcessedAllEvents = checkIfPublisherPublishedAllEvents(subscriptionId);
|
||||
|
||||
if (listCountOnly) {
|
||||
return new EventSubscriptionDiagnosticInfo()
|
||||
.withLatestOffset(latestOffset)
|
||||
.withCurrentOffset(currentOffset)
|
||||
.withStartingOffset(startingOffset)
|
||||
.withHasProcessedAllEvents(hasProcessedAllEvents)
|
||||
.withSuccessfulEventsCount(successfulEventsCount)
|
||||
.withFailedEventsCount(failedEventsCount)
|
||||
.withTotalUnprocessedEventsCount(totalUnprocessedEventCount)
|
||||
.withRelevantUnprocessedEventsList(null)
|
||||
.withTotalUnprocessedEventsList(null);
|
||||
}
|
||||
|
||||
List<ChangeEvent> unprocessedEvents =
|
||||
Optional.ofNullable(getRelevantUnprocessedEvents(subscriptionId, limit, paginationOffset))
|
||||
.orElse(Collections.emptyList());
|
||||
@ -288,6 +309,8 @@ public class EventSubscriptionScheduler {
|
||||
.withCurrentOffset(currentOffset)
|
||||
.withStartingOffset(startingOffset)
|
||||
.withHasProcessedAllEvents(hasProcessedAllEvents)
|
||||
.withSuccessfulEventsCount(successfulEventsCount)
|
||||
.withFailedEventsCount(failedEventsCount)
|
||||
.withTotalUnprocessedEventsCount(totalUnprocessedEventCount)
|
||||
.withTotalUnprocessedEventsList(allUnprocessedEvents)
|
||||
.withRelevantUnprocessedEventsCount((long) unprocessedEvents.size())
|
||||
@ -369,32 +392,9 @@ public class EventSubscriptionScheduler {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
List<TypedEvent> typedEvents =
|
||||
Entity.getCollectionDAO()
|
||||
.changeEventDAO()
|
||||
.listAllEventsWithStatuses(
|
||||
subscriptionId.toString(),
|
||||
eventSubscriptionOffset.get().getCurrentOffset(),
|
||||
limit,
|
||||
offset);
|
||||
|
||||
EventSubscription eventSubscription = getEventSubscription(subscriptionId);
|
||||
|
||||
return typedEvents.parallelStream()
|
||||
.filter(
|
||||
event -> {
|
||||
if (TypedEvent.Status.UNPROCESSED.equals(event.getStatus())
|
||||
&& !CommonUtil.nullOrEmpty(event.getData())) {
|
||||
for (Object eventData : event.getData()) {
|
||||
if (eventData instanceof ChangeEvent changeEvent) {
|
||||
return AlertUtil.checkIfChangeEventIsAllowed(
|
||||
changeEvent, eventSubscription.getFilteringRules());
|
||||
}
|
||||
}
|
||||
}
|
||||
return true; // Keep FAILED and SUCCESSFUL records
|
||||
})
|
||||
.toList();
|
||||
return Entity.getCollectionDAO()
|
||||
.changeEventDAO()
|
||||
.listAllEventsWithStatuses(subscriptionId.toString(), limit, offset);
|
||||
}
|
||||
|
||||
private EventSubscription getEventSubscription(UUID eventSubscriptionId) {
|
||||
|
||||
@ -2128,7 +2128,7 @@ public interface CollectionDAO {
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT COUNT(*) FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId")
|
||||
int getRecordCount(@Bind("eventSubscriptionId") String eventSubscriptionId);
|
||||
long getSuccessfulRecordCount(@Bind("eventSubscriptionId") String eventSubscriptionId);
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT event_subscription_id FROM successful_sent_change_events "
|
||||
@ -2139,10 +2139,10 @@ public interface CollectionDAO {
|
||||
@SqlUpdate(
|
||||
"DELETE FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId ORDER BY timestamp ASC LIMIT :limit")
|
||||
void deleteOldRecords(
|
||||
@Bind("eventSubscriptionId") String eventSubscriptionId, @Bind("limit") int limit);
|
||||
@Bind("eventSubscriptionId") String eventSubscriptionId, @Bind("limit") long limit);
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT json FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId LIMIT :limit OFFSET :paginationOffset")
|
||||
"SELECT json FROM successful_sent_change_events WHERE event_subscription_id = :eventSubscriptionId ORDER BY timestamp DESC LIMIT :limit OFFSET :paginationOffset")
|
||||
List<String> getSuccessfulChangeEventBySubscriptionId(
|
||||
@Bind("eventSubscriptionId") String eventSubscriptionId,
|
||||
@Bind("limit") int limit,
|
||||
@ -3981,22 +3981,25 @@ public interface CollectionDAO {
|
||||
|
||||
interface ChangeEventDAO {
|
||||
@SqlQuery(
|
||||
"SELECT json FROM change_event ce where ce.offset > :offset ORDER BY ce.eventTime ASC LIMIT :limit OFFSET :paginationOffset")
|
||||
"SELECT json FROM change_event ce where ce.offset > :offset ORDER BY ce.eventTime DESC LIMIT :limit OFFSET :paginationOffset")
|
||||
List<String> listUnprocessedEvents(
|
||||
@Bind("offset") long offset,
|
||||
@Bind("limit") int limit,
|
||||
@Bind("paginationOffset") int paginationOffset);
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT json, source FROM consumers_dlq WHERE id = :id ORDER BY timestamp ASC LIMIT :limit OFFSET :paginationOffset")
|
||||
"SELECT json, source FROM consumers_dlq WHERE id = :id ORDER BY timestamp DESC LIMIT :limit OFFSET :paginationOffset")
|
||||
@RegisterRowMapper(FailedEventResponseMapper.class)
|
||||
List<FailedEventResponse> listFailedEventsById(
|
||||
@Bind("id") String id,
|
||||
@Bind("limit") int limit,
|
||||
@Bind("paginationOffset") int paginationOffset);
|
||||
|
||||
@SqlQuery("SELECT COUNT(*) FROM consumers_dlq WHERE id = :id")
|
||||
long countFailedEvents(@Bind("id") String id);
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT json, source FROM consumers_dlq WHERE id = :id AND source = :source ORDER BY timestamp ASC LIMIT :limit OFFSET :paginationOffset")
|
||||
"SELECT json, source FROM consumers_dlq WHERE id = :id AND source = :source ORDER BY timestamp DESC LIMIT :limit OFFSET :paginationOffset")
|
||||
@RegisterRowMapper(FailedEventResponseMapper.class)
|
||||
List<FailedEventResponse> listFailedEventsByIdAndSource(
|
||||
@Bind("id") String id,
|
||||
@ -4004,13 +4007,14 @@ public interface CollectionDAO {
|
||||
@Bind("limit") int limit,
|
||||
@Bind("paginationOffset") int paginationOffset);
|
||||
|
||||
@SqlQuery("SELECT json, source FROM consumers_dlq LIMIT :limit OFFSET :paginationOffset")
|
||||
@SqlQuery(
|
||||
"SELECT json, source FROM consumers_dlq ORDER BY timestamp DESC LIMIT :limit OFFSET :paginationOffset")
|
||||
@RegisterRowMapper(FailedEventResponseMapper.class)
|
||||
List<FailedEventResponse> listAllFailedEvents(
|
||||
@Bind("limit") int limit, @Bind("paginationOffset") int paginationOffset);
|
||||
|
||||
@SqlQuery(
|
||||
"SELECT json, source FROM consumers_dlq WHERE source = :source LIMIT :limit OFFSET :paginationOffset")
|
||||
"SELECT json, source FROM consumers_dlq WHERE source = :source ORDER BY timestamp DESC LIMIT :limit OFFSET :paginationOffset")
|
||||
@RegisterRowMapper(FailedEventResponseMapper.class)
|
||||
List<FailedEventResponse> listAllFailedEventsBySource(
|
||||
@Bind("source") String source,
|
||||
@ -4026,11 +4030,8 @@ public interface CollectionDAO {
|
||||
+ " UNION ALL "
|
||||
+ " SELECT json, 'SUCCESSFUL' AS status, timestamp "
|
||||
+ " FROM successful_sent_change_events WHERE event_subscription_id = :id "
|
||||
+ " UNION ALL "
|
||||
+ " SELECT ce.json, 'UNPROCESSED' AS status, ce.eventTime AS timestamp "
|
||||
+ " FROM change_event ce WHERE ce.offset > :currentOffset "
|
||||
+ ") AS combined_events "
|
||||
+ "ORDER BY timestamp ASC "
|
||||
+ "ORDER BY timestamp DESC "
|
||||
+ "LIMIT :limit OFFSET :paginationOffset",
|
||||
connectionType = MYSQL)
|
||||
@ConnectionAwareSqlQuery(
|
||||
@ -4042,9 +4043,6 @@ public interface CollectionDAO {
|
||||
+ " UNION ALL "
|
||||
+ " SELECT json, 'successful' AS status, timestamp "
|
||||
+ " FROM successful_sent_change_events WHERE event_subscription_id = :id "
|
||||
+ " UNION ALL "
|
||||
+ " SELECT ce.json, 'unprocessed' AS status, ce.eventTime AS timestamp "
|
||||
+ " FROM change_event ce WHERE ce.offset > :currentOffset "
|
||||
+ ") AS combined_events "
|
||||
+ "ORDER BY timestamp ASC "
|
||||
+ "LIMIT :limit OFFSET :paginationOffset",
|
||||
@ -4052,7 +4050,6 @@ public interface CollectionDAO {
|
||||
@RegisterRowMapper(EventResponseMapper.class)
|
||||
List<TypedEvent> listAllEventsWithStatuses(
|
||||
@Bind("id") String id,
|
||||
@Bind("currentOffset") long currentOffset,
|
||||
@Bind("limit") int limit,
|
||||
@Bind("paginationOffset") long paginationOffset);
|
||||
|
||||
|
||||
@ -650,11 +650,11 @@ public class EventSubscriptionResource
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(
|
||||
description = "Status of events to retrieve (failed, successful, unprocessed)",
|
||||
description = "Status of events to retrieve (failed, successful)",
|
||||
schema =
|
||||
@Schema(
|
||||
type = "string",
|
||||
allowableValues = {"failed", "successful", "unprocessed"}))
|
||||
allowableValues = {"failed", "successful"}))
|
||||
@QueryParam("status")
|
||||
String statusParam,
|
||||
@Parameter(description = "ID of the alert or destination", schema = @Schema(type = "UUID"))
|
||||
@ -684,8 +684,7 @@ public class EventSubscriptionResource
|
||||
status = TypedEvent.Status.fromValue(statusParam);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new WebApplicationException(
|
||||
"Invalid status. Must be 'failed', 'successful', or 'unprocessed'.",
|
||||
Response.Status.BAD_REQUEST);
|
||||
"Invalid status. Must be 'failed' or 'successful'.", Response.Status.BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
|
||||
@ -718,9 +717,6 @@ public class EventSubscriptionResource
|
||||
case SUCCESSFUL -> events =
|
||||
EventSubscriptionScheduler.getInstance()
|
||||
.getSuccessfullySentChangeEventsForAlert(id, limit, paginationOffset);
|
||||
case UNPROCESSED -> events =
|
||||
EventSubscriptionScheduler.getInstance()
|
||||
.getRelevantUnprocessedEvents(id, limit, paginationOffset);
|
||||
default -> throw new IllegalArgumentException("Unknown event status: " + status);
|
||||
}
|
||||
|
||||
@ -782,7 +778,11 @@ public class EventSubscriptionResource
|
||||
int paginationOffset,
|
||||
@Parameter(description = "UUID of the Event Subscription", schema = @Schema(type = "UUID"))
|
||||
@PathParam("subscriptionId")
|
||||
UUID subscriptionId) {
|
||||
UUID subscriptionId,
|
||||
@Parameter(description = "Return only count if true")
|
||||
@QueryParam("listCountOnly")
|
||||
@DefaultValue("false")
|
||||
boolean listCountOnly) {
|
||||
authorizer.authorizeAdmin(securityContext);
|
||||
try {
|
||||
if (!EventSubscriptionScheduler.getInstance().doesRecordExist(subscriptionId)) {
|
||||
@ -793,7 +793,8 @@ public class EventSubscriptionResource
|
||||
|
||||
EventSubscriptionDiagnosticInfo diagnosticInfo =
|
||||
EventSubscriptionScheduler.getInstance()
|
||||
.getEventSubscriptionDiagnosticInfo(subscriptionId, limit, paginationOffset);
|
||||
.getEventSubscriptionDiagnosticInfo(
|
||||
subscriptionId, limit, paginationOffset, listCountOnly);
|
||||
|
||||
return Response.ok().entity(diagnosticInfo).build();
|
||||
} catch (Exception e) {
|
||||
@ -840,7 +841,11 @@ public class EventSubscriptionResource
|
||||
int paginationOffset,
|
||||
@Parameter(description = "Name of the Event Subscription", schema = @Schema(type = "string"))
|
||||
@PathParam("subscriptionName")
|
||||
String subscriptionName) {
|
||||
String subscriptionName,
|
||||
@Parameter(description = "Return only count if true")
|
||||
@QueryParam("listCountOnly")
|
||||
@DefaultValue("false")
|
||||
boolean listCountOnly) {
|
||||
authorizer.authorizeAdmin(securityContext);
|
||||
try {
|
||||
EventSubscription subscription =
|
||||
@ -854,7 +859,8 @@ public class EventSubscriptionResource
|
||||
|
||||
EventSubscriptionDiagnosticInfo diagnosticInfo =
|
||||
EventSubscriptionScheduler.getInstance()
|
||||
.getEventSubscriptionDiagnosticInfo(subscription.getId(), limit, paginationOffset);
|
||||
.getEventSubscriptionDiagnosticInfo(
|
||||
subscription.getId(), limit, paginationOffset, listCountOnly);
|
||||
|
||||
return Response.ok().entity(diagnosticInfo).build();
|
||||
} catch (Exception e) {
|
||||
|
||||
@ -22,10 +22,22 @@
|
||||
"description": "Indicates whether all events have been processed.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"successfulEventsCount": {
|
||||
"description": "Count of successful events.",
|
||||
"existingJavaType": "java.lang.Long"
|
||||
},
|
||||
"failedEventsCount": {
|
||||
"description": "Count of failed events.",
|
||||
"existingJavaType": "java.lang.Long"
|
||||
},
|
||||
"relevantUnprocessedEventsCount": {
|
||||
"description": "The number of relevant unprocessed events based on the alert's filtering rules.",
|
||||
"existingJavaType": "java.lang.Long"
|
||||
},
|
||||
"totalUnprocessedEventsCount": {
|
||||
"description": "The total number of unprocessed events.",
|
||||
"existingJavaType": "java.lang.Long"
|
||||
},
|
||||
"relevantUnprocessedEventsList": {
|
||||
"description": "A list of relevant unprocessed events based on the alert's filtering criteria.",
|
||||
"type": "array",
|
||||
@ -33,10 +45,6 @@
|
||||
"$ref": "../../type/changeEvent.json"
|
||||
}
|
||||
},
|
||||
"totalUnprocessedEventsCount": {
|
||||
"description": "The total number of unprocessed events.",
|
||||
"existingJavaType": "java.lang.Long"
|
||||
},
|
||||
"totalUnprocessedEventsList": {
|
||||
"description": "A list of all unprocessed events.",
|
||||
"type": "array",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user