From 70b74d7d4f03f182985daa7d76f33d7a068f199f Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Wed, 17 Jan 2024 17:25:33 +0530 Subject: [PATCH] Remove info log and using JsonUtils to convert the Ingestion Pipeline (#14759) --- .../changeEvent/AbstractEventConsumer.java | 17 +++++++---------- .../changeEvent/email/EmailPublisher.java | 2 +- .../subscription/AlertsRuleEvaluator.java | 3 ++- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java index 47b18f1cbaa..63958d2747e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java @@ -273,14 +273,12 @@ public abstract class AbstractEventConsumer @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { // Must Have , Before Execute the Init, Quartz Requires a Non-Arg Constructor + this.init(jobExecutionContext); + // Poll Events from Change Event Table + List batch = pollEvents(offset, 100); + int batchSize = batch.size(); try { - this.init(jobExecutionContext); - - // Poll Events from Change Event Table - List batch = pollEvents(offset, 100); - int batchSize = batch.size(); - // Retry Failed Events Set failedEventsList = JsonUtils.convertValue( @@ -293,7 +291,9 @@ public abstract class AbstractEventConsumer .toList(); batch.addAll(failedChangeEvents); } - + } catch (Exception e) { + LOG.error("Error in executing the Job : {} ", e.getMessage()); + } finally { if (!batch.isEmpty()) { // Publish Events alertMetrics.withTotalEvents(alertMetrics.getTotalEvents() + batch.size()); @@ -303,9 +303,6 @@ public abstract class AbstractEventConsumer offset += batchSize; commit(jobExecutionContext); } - } catch (Exception e) { - LOG.error("Error in executing the Job : {} ", e.getMessage()); - } finally { // Call stop to close the client this.stop(); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/email/EmailPublisher.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/email/EmailPublisher.java index 82b3df8ce8e..10d4cb4b39f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/email/EmailPublisher.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/email/EmailPublisher.java @@ -85,6 +85,6 @@ public class EmailPublisher extends AbstractEventConsumer { @Override public void stop() { - LOG.info("Email Publisher Stopped"); + LOG.debug("Email Publisher Stopped"); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertsRuleEvaluator.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertsRuleEvaluator.java index 964bec950ab..9e9361574d4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertsRuleEvaluator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/subscription/AlertsRuleEvaluator.java @@ -297,7 +297,8 @@ public class AlertsRuleEvaluator { } for (FieldChange fieldChange : changeEvent.getChangeDescription().getFieldsUpdated()) { if (fieldChange.getName().equals("pipelineStatus") && fieldChange.getNewValue() != null) { - PipelineStatus pipelineStatus = (PipelineStatus) fieldChange.getNewValue(); + PipelineStatus pipelineStatus = + JsonUtils.convertValue(fieldChange.getNewValue(), PipelineStatus.class); PipelineStatusType status = pipelineStatus.getPipelineState(); for (String givenStatus : pipelineState) { if (givenStatus.equals(status.value())) {