diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java index 47b18f1cbaa..63958d2747e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java @@ -273,14 +273,12 @@ public abstract class AbstractEventConsumer @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { // Must Have , Before Execute the Init, Quartz Requires a Non-Arg Constructor + this.init(jobExecutionContext); + // Poll Events from Change Event Table + List batch = pollEvents(offset, 100); + int batchSize = batch.size(); try { - this.init(jobExecutionContext); - - // Poll Events from Change Event Table - List batch = pollEvents(offset, 100); - int batchSize = batch.size(); - // Retry Failed Events Set failedEventsList = JsonUtils.convertValue( @@ -293,7 +291,9 @@ public abstract class AbstractEventConsumer .toList(); batch.addAll(failedChangeEvents); } - + } catch (Exception e) { + LOG.error("Error in executing the Job : {} ", e.getMessage()); + } finally { if (!batch.isEmpty()) { // Publish Events alertMetrics.withTotalEvents(alertMetrics.getTotalEvents() + batch.size()); @@ -303,9 +303,6 @@ public abstract class AbstractEventConsumer offset += batchSize; commit(jobExecutionContext); } - } catch (Exception e) { - LOG.error("Error in executing the Job : {} ", e.getMessage()); - } finally { // Call stop to close the client this.stop(); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/email/EmailPublisher.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/email/EmailPublisher.java index 82b3df8ce8e..10d4cb4b39f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/email/EmailPublisher.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/email/EmailPublisher.java @@ -85,6 +85,6 @@ public class EmailPublisher extends AbstractEventConsumer { @Override public void stop() { - LOG.info("Email Publisher Stopped"); + LOG.debug("Email Publisher Stopped"); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertsRuleEvaluator.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertsRuleEvaluator.java index 964bec950ab..9e9361574d4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertsRuleEvaluator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertsRuleEvaluator.java @@ -297,7 +297,8 @@ public class AlertsRuleEvaluator { } for (FieldChange fieldChange : changeEvent.getChangeDescription().getFieldsUpdated()) { if (fieldChange.getName().equals("pipelineStatus") && fieldChange.getNewValue() != null) { - PipelineStatus pipelineStatus = (PipelineStatus) fieldChange.getNewValue(); + PipelineStatus pipelineStatus = + JsonUtils.convertValue(fieldChange.getNewValue(), PipelineStatus.class); PipelineStatusType status = pipelineStatus.getPipelineState(); for (String givenStatus : pipelineState) { if (givenStatus.equals(status.value())) {