MINOR: use separate quartz scheduler (#19953)

* fix: use separate quartz scheduler

* format

* added threadCount

* throw error if ServicesStatusJobHandler fails to start
This commit is contained in:
Imri Paran 2025-02-27 11:50:40 +01:00 committed by GitHub
parent ab7666b8fe
commit 415d798a37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 22 additions and 2 deletions

View File

@ -16,12 +16,15 @@ package org.openmetadata.service.events.scheduled;
import static org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer.ALERT_INFO_KEY;
import static org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer.ALERT_OFFSET_KEY;
import static org.openmetadata.service.events.subscription.AlertUtil.getStartingOffset;
import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME;
import static org.quartz.impl.StdSchedulerFactory.PROP_THREAD_POOL_PREFIX;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
@ -71,7 +74,24 @@ public class EventSubscriptionScheduler {
public static final String ALERT_TRIGGER_GROUP = "OMAlertJobGroup";
private static EventSubscriptionScheduler instance;
private static volatile boolean initialized = false;
private final Scheduler alertsScheduler = new StdSchedulerFactory().getScheduler();
private static final Scheduler alertsScheduler;
private static final String SCHEDULER_NAME = "OpenMetadataEventSubscriptionScheduler";
private static final int SCHEDULER_THREAD_COUNT = 5;
static {
Properties properties = new Properties();
properties.setProperty(PROP_SCHED_INSTANCE_NAME, SCHEDULER_NAME);
properties.setProperty(
PROP_THREAD_POOL_PREFIX + ".threadCount", String.valueOf(SCHEDULER_THREAD_COUNT));
try {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(properties);
alertsScheduler = factory.getScheduler();
} catch (SchedulerException e) {
throw new ExceptionInInitializerError("Failed to initialize scheduler: " + e.getMessage());
}
}
private record CustomJobFactory(DIContainer di) implements JobFactory {

View File

@ -63,7 +63,7 @@ public class ServicesStatusJobHandler {
try {
instance = new ServicesStatusJobHandler(eventMonitorConfiguration, config, clusterName);
} catch (Exception ex) {
LOG.error("Failed to initialize the Pipeline Service Status Handler");
throw new RuntimeException("Failed to initialize the Pipeline Service Status Handler", ex);
}
return instance;
}