diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java index b74b981436b..be0f5039a15 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java @@ -16,17 +16,14 @@ package org.openmetadata.service.apps.bundles.changeEvent; import static org.openmetadata.service.events.subscription.AlertUtil.getFilteredEvents; import static org.openmetadata.service.events.subscription.AlertUtil.getStartingOffset; -import com.fasterxml.jackson.core.type.TypeReference; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.UUID; -import java.util.stream.Collectors; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -91,62 +88,17 @@ public abstract class AbstractEventConsumer failingSubscriptionId, changeEvent); - // Update Failed Event with details - FailedEvent failedEvent = - new FailedEvent() - .withFailingSubscriptionId(failingSubscriptionId) - .withChangeEvent(changeEvent) - .withRetriesLeft(eventSubscription.getRetries()) - .withTimestamp(System.currentTimeMillis()); - - if (eventSubscription.getRetries() == 0) { - Entity.getCollectionDAO() - .eventSubscriptionDAO() - .upsertFailedEvent( - eventSubscription.getId().toString(), - String.format("%s-%s", FAILED_EVENT_EXTENSION, changeEvent.getId()), - JsonUtils.pojoToJson(failedEvent)); - } else { - // Check in Qtz Map - Set failedEventsList = - JsonUtils.convertValue( - jobDetail.getJobDataMap().get(FAILED_EVENT_EXTENSION), new TypeReference<>() {}); - if (failedEventsList == null) { - failedEventsList = new HashSet<>(); - } else { - // Remove exising change event - boolean removeChangeEvent = - failedEventsList.removeIf( - failedEvent1 -> { - if (failedEvent1 - .getChangeEvent() - .getId() - .equals(failedEvent.getChangeEvent().getId()) - && failedEvent1.getFailingSubscriptionId().equals(failingSubscriptionId)) { - failedEvent.withRetriesLeft(failedEvent1.getRetriesLeft()); - return true; - } - return false; - }); - - if (removeChangeEvent) { - if (failedEvent.getRetriesLeft() == 0) { - // If the Retries are exhausted, then remove the Event from the List to DLQ - failedEvent.withRetriesLeft(0); - } else { - failedEvent.withRetriesLeft(failedEvent.getRetriesLeft() - 1); - } - } - } - failedEventsList.add(failedEvent); - jobDetail.getJobDataMap().put(FAILED_EVENT_EXTENSION, failedEventsList); - Entity.getCollectionDAO() - .eventSubscriptionDAO() - .upsertFailedEvent( - eventSubscription.getId().toString(), - String.format("%s-%s", FAILED_EVENT_EXTENSION, changeEvent.getId()), - JsonUtils.pojoToJson(failedEvent)); - } + Entity.getCollectionDAO() + .eventSubscriptionDAO() + .upsertFailedEvent( + eventSubscription.getId().toString(), + String.format("%s-%s", FAILED_EVENT_EXTENSION, changeEvent.getId()), + JsonUtils.pojoToJson( + new FailedEvent() + .withFailingSubscriptionId(failingSubscriptionId) + .withChangeEvent(changeEvent) + .withRetriesLeft(eventSubscription.getRetries()) + .withTimestamp(System.currentTimeMillis()))); } private long loadInitialOffset(JobExecutionContext context) { @@ -278,20 +230,6 @@ public abstract class AbstractEventConsumer int batchSize = batch.size(); Map> eventsWithReceivers = createEventsWithReceivers(batch); try { - // Retry Failed Events - Set failedEventsList = - JsonUtils.convertValue( - jobDetail.getJobDataMap().get(FAILED_EVENT_EXTENSION), new TypeReference<>() {}); - if (failedEventsList != null) { - Map> failedChangeEvents = - failedEventsList.stream() - .filter(failedEvent -> failedEvent.getRetriesLeft() > 0) - .collect( - Collectors.toMap( - FailedEvent::getChangeEvent, - failedEvent -> Set.of(failedEvent.getFailingSubscriptionId()))); - eventsWithReceivers.putAll(failedChangeEvents); - } // Publish Events if (!eventsWithReceivers.isEmpty()) { alertMetrics.withTotalEvents(alertMetrics.getTotalEvents() + eventsWithReceivers.size());