diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java index a777bef57ba..f77ff35ff10 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java @@ -301,6 +301,9 @@ public abstract class AbstractEventConsumer offset += batchSize; commit(jobExecutionContext); } + + // Call stop to close the client + this.stop(); } public EventSubscription getEventSubscription() { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/Alert.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/Alert.java index 36ff9b01bdd..f5bb8861eba 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/Alert.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/Alert.java @@ -17,4 +17,6 @@ import org.openmetadata.service.events.errors.EventPublisherException; public interface Alert { void sendAlert(T event) throws EventPublisherException; + + void stop(); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/Consumer.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/Consumer.java index d485814fcce..f210ab284b1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/Consumer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/Consumer.java @@ -13,12 +13,11 @@ package org.openmetadata.service.apps.bundles.changeEvent; -import io.dropwizard.lifecycle.Managed; import java.util.List; import org.openmetadata.service.events.errors.EventPublisherException; import org.quartz.JobExecutionContext; -public interface Consumer extends Managed { +public interface Consumer { List pollEvents(long offset, long batchSize); void publishEvents(List events);