mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-09 17:12:02 +00:00
Remove info log and using JsonUtils to convert the Ingestion Pipeline (#14759)
This commit is contained in:
parent
bfbd953b53
commit
70b74d7d4f
@ -273,14 +273,12 @@ public abstract class AbstractEventConsumer
|
|||||||
@Override
|
@Override
|
||||||
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
|
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
|
||||||
// Must Have , Before Execute the Init, Quartz Requires a Non-Arg Constructor
|
// Must Have , Before Execute the Init, Quartz Requires a Non-Arg Constructor
|
||||||
|
this.init(jobExecutionContext);
|
||||||
|
// Poll Events from Change Event Table
|
||||||
|
List<ChangeEvent> batch = pollEvents(offset, 100);
|
||||||
|
int batchSize = batch.size();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
this.init(jobExecutionContext);
|
|
||||||
|
|
||||||
// Poll Events from Change Event Table
|
|
||||||
List<ChangeEvent> batch = pollEvents(offset, 100);
|
|
||||||
int batchSize = batch.size();
|
|
||||||
|
|
||||||
// Retry Failed Events
|
// Retry Failed Events
|
||||||
Set<FailedEvent> failedEventsList =
|
Set<FailedEvent> failedEventsList =
|
||||||
JsonUtils.convertValue(
|
JsonUtils.convertValue(
|
||||||
@ -293,7 +291,9 @@ public abstract class AbstractEventConsumer
|
|||||||
.toList();
|
.toList();
|
||||||
batch.addAll(failedChangeEvents);
|
batch.addAll(failedChangeEvents);
|
||||||
}
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error in executing the Job : {} ", e.getMessage());
|
||||||
|
} finally {
|
||||||
if (!batch.isEmpty()) {
|
if (!batch.isEmpty()) {
|
||||||
// Publish Events
|
// Publish Events
|
||||||
alertMetrics.withTotalEvents(alertMetrics.getTotalEvents() + batch.size());
|
alertMetrics.withTotalEvents(alertMetrics.getTotalEvents() + batch.size());
|
||||||
@ -303,9 +303,6 @@ public abstract class AbstractEventConsumer
|
|||||||
offset += batchSize;
|
offset += batchSize;
|
||||||
commit(jobExecutionContext);
|
commit(jobExecutionContext);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Error in executing the Job : {} ", e.getMessage());
|
|
||||||
} finally {
|
|
||||||
// Call stop to close the client
|
// Call stop to close the client
|
||||||
this.stop();
|
this.stop();
|
||||||
}
|
}
|
||||||
|
@ -85,6 +85,6 @@ public class EmailPublisher extends AbstractEventConsumer {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() {
|
public void stop() {
|
||||||
LOG.info("Email Publisher Stopped");
|
LOG.debug("Email Publisher Stopped");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -297,7 +297,8 @@ public class AlertsRuleEvaluator {
|
|||||||
}
|
}
|
||||||
for (FieldChange fieldChange : changeEvent.getChangeDescription().getFieldsUpdated()) {
|
for (FieldChange fieldChange : changeEvent.getChangeDescription().getFieldsUpdated()) {
|
||||||
if (fieldChange.getName().equals("pipelineStatus") && fieldChange.getNewValue() != null) {
|
if (fieldChange.getName().equals("pipelineStatus") && fieldChange.getNewValue() != null) {
|
||||||
PipelineStatus pipelineStatus = (PipelineStatus) fieldChange.getNewValue();
|
PipelineStatus pipelineStatus =
|
||||||
|
JsonUtils.convertValue(fieldChange.getNewValue(), PipelineStatus.class);
|
||||||
PipelineStatusType status = pipelineStatus.getPipelineState();
|
PipelineStatusType status = pipelineStatus.getPipelineState();
|
||||||
for (String givenStatus : pipelineState) {
|
for (String givenStatus : pipelineState) {
|
||||||
if (givenStatus.equals(status.value())) {
|
if (givenStatus.equals(status.value())) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user