Add prometheus counter for search and database (#19561)

This commit is contained in:
Mohit Yadav 2025-01-29 00:08:45 +05:30 committed by mohitdeuex
parent e04a9a284b
commit 4a3721b8cd
10 changed files with 169 additions and 28 deletions

View File

@ -296,6 +296,7 @@ eventMonitoringConfiguration:
batchSize: ${EVENT_MONITOR_BATCH_SIZE:-10}
pathPattern: ${EVENT_MONITOR_PATH_PATTERN:-["/api/v1/tables/*", "/api/v1/health-check"]}
latency: ${EVENT_MONITOR_LATENCY:-[0.99, 0.90]} # For value p99=0.99, p90=0.90, p50=0.50 etc.
servicesHealthCheckInterval: ${EVENT_MONITOR_SERVICES_HEALTH_CHECK_INTERVAL:-300}
# it will use the default auth provider for AWS services if parameters are not set
# parameters:
# region: ${OM_MONITOR_REGION:-""}

View File

@ -73,7 +73,7 @@ import org.openmetadata.service.config.OMWebConfiguration;
import org.openmetadata.service.events.EventFilter;
import org.openmetadata.service.events.EventPubSub;
import org.openmetadata.service.events.scheduled.EventSubscriptionScheduler;
import org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler;
import org.openmetadata.service.events.scheduled.ServicesStatusJobHandler;
import org.openmetadata.service.exception.CatalogGenericExceptionMapper;
import org.openmetadata.service.exception.ConstraintViolationExceptionMapper;
import org.openmetadata.service.exception.JsonMappingExceptionMapper;
@ -264,16 +264,23 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
// Asset Servlet Registration
registerAssetServlet(catalogConfig.getWebConfiguration(), environment);
// Handle Pipeline Service Client Status job
PipelineServiceStatusJobHandler pipelineServiceStatusJobHandler =
PipelineServiceStatusJobHandler.create(
catalogConfig.getPipelineServiceClientConfiguration(), catalogConfig.getClusterName());
pipelineServiceStatusJobHandler.addPipelineServiceStatusJob();
// Handle Services Jobs
registerHealthCheckJobs(catalogConfig);
// Register Auth Handlers
registerAuthServlets(catalogConfig, environment);
}
private void registerHealthCheckJobs(OpenMetadataApplicationConfig catalogConfig) {
  // Create (or reuse the singleton) handler that schedules all periodic
  // health probes, then register both the pipeline-service check and the
  // database/search checks that feed the Prometheus counters.
  ServicesStatusJobHandler statusJobHandler =
      ServicesStatusJobHandler.create(
          catalogConfig.getEventMonitorConfiguration(),
          catalogConfig.getPipelineServiceClientConfiguration(),
          catalogConfig.getClusterName());
  statusJobHandler.addPipelineServiceStatusJob();
  statusJobHandler.addDatabaseAndSearchStatusJobs();
}
private void registerAuthServlets(OpenMetadataApplicationConfig config, Environment environment) {
if (config.getAuthenticationConfiguration() != null
&& config

View File

@ -0,0 +1,55 @@
package org.openmetadata.service.events.scheduled;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.JOB_CONTEXT_METER_REGISTRY;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.UNHEALTHY_STATUS;
import io.micrometer.core.instrument.Counter;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.Entity;
import org.openmetadata.service.search.SearchHealthStatus;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
@Slf4j
public class DatabseAndSearchServiceStatusJob implements Job {
  // NOTE(review): class name carries a typo ("Databse"); renaming requires a
  // coordinated change to the reference in ServicesStatusJobHandler.
  private static final String SERVICE_COUNTER = "omd_service_unreachable";
  private static final String SERVICE_NAME = "service_name";
  private static final String SEARCH_SERVICE_NAME = "search";
  private static final String DATABASE_SERVICE_NAME = "database";

  /**
   * Probes the database and the search engine and bumps the
   * {@code omd_service_unreachable} Prometheus counter for each backend that
   * is unreachable or reports an unhealthy status.
   */
  @Override
  public void execute(JobExecutionContext jobExecutionContext) {
    PrometheusMeterRegistry registry =
        (PrometheusMeterRegistry)
            jobExecutionContext.getJobDetail().getJobDataMap().get(JOB_CONTEXT_METER_REGISTRY);
    checkDatabaseStatus(registry);
    checkElasticSearchStatus(registry);
  }

  // A thrown exception (connection refused, timeout, null status, ...) is
  // treated the same as an explicit unhealthy verdict.
  private void checkElasticSearchStatus(PrometheusMeterRegistry registry) {
    boolean unhealthy;
    try {
      SearchHealthStatus health =
          Entity.getSearchRepository().getSearchClient().getSearchHealthStatus();
      unhealthy = health.getStatus().equals(UNHEALTHY_STATUS);
    } catch (Exception ex) {
      LOG.error("Elastic Search encountering exception.", ex);
      unhealthy = true;
    }
    if (unhealthy) {
      publishUnhealthyCounter(registry, SERVICE_NAME, SEARCH_SERVICE_NAME);
    }
  }

  // A successful test query means the database is reachable; nothing is recorded.
  private void checkDatabaseStatus(PrometheusMeterRegistry registry) {
    try {
      Entity.getCollectionDAO().systemDAO().testConnection();
    } catch (Exception ex) {
      LOG.error("Database encountering exception.", ex);
      publishUnhealthyCounter(registry, SERVICE_NAME, DATABASE_SERVICE_NAME);
    }
  }

  // Registers (or looks up) the counter for the given tag pair and increments it.
  private void publishUnhealthyCounter(PrometheusMeterRegistry registry, String... tags) {
    Counter.builder(SERVICE_COUNTER).tags(tags).register(registry).increment();
  }
}

View File

@ -2,9 +2,9 @@ package org.openmetadata.service.events.scheduled;
import static org.openmetadata.sdk.PipelineServiceClientInterface.HEALTHY_STATUS;
import static org.openmetadata.sdk.PipelineServiceClientInterface.STATUS_KEY;
import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_CLUSTER_NAME;
import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_METER_REGISTRY;
import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_PIPELINE_SERVICE_CLIENT;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.JOB_CONTEXT_CLUSTER_NAME;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.JOB_CONTEXT_METER_REGISTRY;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.JOB_CONTEXT_PIPELINE_SERVICE_CLIENT;
import io.micrometer.core.instrument.Counter;
import io.micrometer.prometheus.PrometheusMeterRegistry;

View File

@ -5,7 +5,9 @@ import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
import org.openmetadata.service.monitoring.EventMonitorConfiguration;
import org.openmetadata.service.util.MicrometerBundleSingleton;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
@ -17,11 +19,14 @@ import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
@Slf4j
public class PipelineServiceStatusJobHandler {
public class ServicesStatusJobHandler {
public static final String HEALTHY_STATUS = "healthy";
public static final String UNHEALTHY_STATUS = "unhealthy";
public static final String DATABASE_SEARCH_STATUS_JOB = "databaseAndSearchServiceStatusJob";
public static final String PIPELINE_SERVICE_STATUS_JOB = "pipelineServiceStatusJob";
public static final String STATUS_GROUP = "status";
public static final String STATUS_CRON_TRIGGER = "statusTrigger";
public static final String PIPELINE_STATUS_CRON_TRIGGER = "pipelineStatusTrigger";
public static final String DATABSE_SEARCH_STATUS_CRON_TRIGGER = "databaseAndSearchStatusTrigger";
public static final String JOB_CONTEXT_PIPELINE_SERVICE_CLIENT = "pipelineServiceClient";
public static final String JOB_CONTEXT_METER_REGISTRY = "meterRegistry";
public static final String JOB_CONTEXT_CLUSTER_NAME = "clusterName";
@ -32,51 +37,55 @@ public class PipelineServiceStatusJobHandler {
private final String clusterName;
private final Integer healthCheckInterval;
private final Scheduler scheduler = new StdSchedulerFactory().getScheduler();
private final int servicesHealthCheckInterval;
private static ServicesStatusJobHandler instance;
private static PipelineServiceStatusJobHandler instance;
private PipelineServiceStatusJobHandler(
PipelineServiceClientConfiguration config, String clusterName) throws SchedulerException {
private ServicesStatusJobHandler(
EventMonitorConfiguration monitorConfiguration,
PipelineServiceClientConfiguration config,
String clusterName)
throws SchedulerException {
this.config = config;
this.pipelineServiceClient = PipelineServiceClientFactory.createPipelineServiceClient(config);
this.meterRegistry = MicrometerBundleSingleton.prometheusMeterRegistry;
this.clusterName = clusterName;
this.healthCheckInterval = config.getHealthCheckInterval();
this.servicesHealthCheckInterval = monitorConfiguration.getServicesHealthCheckInterval();
this.scheduler.start();
}
public static PipelineServiceStatusJobHandler create(
PipelineServiceClientConfiguration config, String clusterName) {
public static ServicesStatusJobHandler create(
EventMonitorConfiguration eventMonitorConfiguration,
PipelineServiceClientConfiguration config,
String clusterName) {
if (instance != null) return instance;
try {
instance = new PipelineServiceStatusJobHandler(config, clusterName);
instance = new ServicesStatusJobHandler(eventMonitorConfiguration, config, clusterName);
} catch (Exception ex) {
LOG.error("Failed to initialize the Pipeline Service Status Handler");
}
return instance;
}
private JobDetail jobBuilder() {
private JobDetail jobBuilder(Class<? extends Job> clazz, String jobName, String group) {
JobDataMap dataMap = new JobDataMap();
dataMap.put(JOB_CONTEXT_PIPELINE_SERVICE_CLIENT, pipelineServiceClient);
dataMap.put(JOB_CONTEXT_METER_REGISTRY, meterRegistry);
dataMap.put(JOB_CONTEXT_CLUSTER_NAME, clusterName);
JobBuilder jobBuilder =
JobBuilder.newJob(PipelineServiceStatusJob.class)
.withIdentity(PIPELINE_SERVICE_STATUS_JOB, STATUS_GROUP)
.usingJobData(dataMap);
JobBuilder.newJob(clazz).withIdentity(jobName, group).usingJobData(dataMap);
return jobBuilder.build();
}
private Trigger getTrigger() {
private Trigger getTrigger(int checkInterval, String identity, String group) {
return TriggerBuilder.newTrigger()
.withIdentity(STATUS_CRON_TRIGGER, STATUS_GROUP)
.withIdentity(identity, group)
.withSchedule(
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(healthCheckInterval)
.withIntervalInSeconds(checkInterval)
.repeatForever())
.build();
}
@ -86,12 +95,27 @@ public class PipelineServiceStatusJobHandler {
// enabled
if (config.getEnabled().equals(Boolean.TRUE)) {
try {
JobDetail jobDetail = jobBuilder();
Trigger trigger = getTrigger();
JobDetail jobDetail =
jobBuilder(PipelineServiceStatusJob.class, PIPELINE_SERVICE_STATUS_JOB, STATUS_GROUP);
Trigger trigger =
getTrigger(healthCheckInterval, PIPELINE_STATUS_CRON_TRIGGER, STATUS_GROUP);
scheduler.scheduleJob(jobDetail, trigger);
} catch (Exception ex) {
LOG.error("Failed in setting up job Scheduler for Pipeline Service Status", ex);
}
}
}
public void addDatabaseAndSearchStatusJobs() {
  // Schedules the periodic database/search health probe that publishes the
  // omd_service_unreachable Prometheus counter when either backend is down.
  // Runs every servicesHealthCheckInterval seconds, forever.
  try {
    JobDetail jobDetail =
        jobBuilder(
            DatabseAndSearchServiceStatusJob.class, DATABASE_SEARCH_STATUS_JOB, STATUS_GROUP);
    Trigger trigger =
        getTrigger(servicesHealthCheckInterval, DATABSE_SEARCH_STATUS_CRON_TRIGGER, STATUS_GROUP);
    scheduler.scheduleJob(jobDetail, trigger);
  } catch (Exception ex) {
    // Bug fix: message previously said "Pipeline Service Status" (copy-paste
    // from addPipelineServiceStatusJob), mislabeling failures of this job.
    LOG.error("Failed in setting up job Scheduler for Database and Search Status", ex);
  }
}
}

View File

@ -31,4 +31,6 @@ public class EventMonitorConfiguration {
private String[] pathPattern;
private double[] latency;
private int servicesHealthCheckInterval = 300;
}

View File

@ -390,4 +390,6 @@ public interface SearchClient {
&& !subjectContext.isBot()
&& rbacConditionEvaluator != null;
}
SearchHealthStatus getSearchHealthStatus() throws IOException;
}

View File

@ -0,0 +1,12 @@
package org.openmetadata.service.search;
import lombok.Getter;
/** Immutable holder for the search engine's overall health verdict. */
@Getter
public class SearchHealthStatus {
  // Expected values: ServicesStatusJobHandler.HEALTHY_STATUS / UNHEALTHY_STATUS.
  // Was package-private and non-final; made private final so the verdict
  // cannot be mutated after construction (accessor still comes from @Getter).
  private final String status;

  /**
   * @param status the health verdict, e.g. "healthy" or "unhealthy"
   */
  public SearchHealthStatus(String status) {
    this.status = status;
  }
}

View File

@ -13,6 +13,8 @@ import static org.openmetadata.service.Entity.GLOSSARY_TERM;
import static org.openmetadata.service.Entity.QUERY;
import static org.openmetadata.service.Entity.RAW_COST_ANALYSIS_REPORT_DATA;
import static org.openmetadata.service.Entity.TABLE;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.HEALTHY_STATUS;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.UNHEALTHY_STATUS;
import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse;
import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD;
import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD_KEYWORD;
@ -38,6 +40,9 @@ import static org.openmetadata.service.util.FullyQualifiedName.getParentFQN;
import com.fasterxml.jackson.databind.JsonNode;
import es.org.elasticsearch.ElasticsearchStatusException;
import es.org.elasticsearch.action.ActionListener;
import es.org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import es.org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import es.org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import es.org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import es.org.elasticsearch.action.bulk.BulkRequest;
@ -60,6 +65,7 @@ import es.org.elasticsearch.client.indices.GetIndexRequest;
import es.org.elasticsearch.client.indices.GetMappingsRequest;
import es.org.elasticsearch.client.indices.GetMappingsResponse;
import es.org.elasticsearch.client.indices.PutMappingRequest;
import es.org.elasticsearch.cluster.health.ClusterHealthStatus;
import es.org.elasticsearch.cluster.metadata.MappingMetadata;
import es.org.elasticsearch.common.lucene.search.function.CombineFunction;
import es.org.elasticsearch.common.lucene.search.function.FieldValueFactorFunction;
@ -166,6 +172,7 @@ import org.openmetadata.service.jdbi3.TableRepository;
import org.openmetadata.service.jdbi3.TestCaseResultRepository;
import org.openmetadata.service.search.SearchAggregation;
import org.openmetadata.service.search.SearchClient;
import org.openmetadata.service.search.SearchHealthStatus;
import org.openmetadata.service.search.SearchIndexUtils;
import org.openmetadata.service.search.SearchRequest;
import org.openmetadata.service.search.SearchSortFilter;
@ -2840,6 +2847,18 @@ public class ElasticSearchClient implements SearchClient {
return client.getLowLevelClient();
}
@Override
public SearchHealthStatus getSearchHealthStatus() throws IOException {
  // Query Elasticsearch cluster health; GREEN and YELLOW both count as
  // healthy — only RED (or an I/O failure, which propagates) is unhealthy.
  ClusterHealthResponse response =
      client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
  ClusterHealthStatus clusterStatus = response.getStatus();
  boolean healthy =
      clusterStatus.equals(ClusterHealthStatus.GREEN)
          || clusterStatus.equals(ClusterHealthStatus.YELLOW);
  return new SearchHealthStatus(healthy ? HEALTHY_STATUS : UNHEALTHY_STATUS);
}
private void buildSearchRBACQuery(
SubjectContext subjectContext, SearchSourceBuilder searchSourceBuilder) {
if (SearchClient.shouldApplyRbacConditions(subjectContext, rbacConditionEvaluator)) {

View File

@ -12,6 +12,8 @@ import static org.openmetadata.service.Entity.GLOSSARY_TERM;
import static org.openmetadata.service.Entity.QUERY;
import static org.openmetadata.service.Entity.RAW_COST_ANALYSIS_REPORT_DATA;
import static org.openmetadata.service.Entity.TABLE;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.HEALTHY_STATUS;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.UNHEALTHY_STATUS;
import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse;
import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD;
import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD_KEYWORD;
@ -86,6 +88,7 @@ import org.openmetadata.service.jdbi3.TableRepository;
import org.openmetadata.service.jdbi3.TestCaseResultRepository;
import org.openmetadata.service.search.SearchAggregation;
import org.openmetadata.service.search.SearchClient;
import org.openmetadata.service.search.SearchHealthStatus;
import org.openmetadata.service.search.SearchIndexUtils;
import org.openmetadata.service.search.SearchRequest;
import org.openmetadata.service.search.SearchSortFilter;
@ -135,6 +138,9 @@ import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
import os.org.opensearch.OpenSearchException;
import os.org.opensearch.OpenSearchStatusException;
import os.org.opensearch.action.ActionListener;
import os.org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import os.org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import os.org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
import os.org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import os.org.opensearch.action.bulk.BulkRequest;
@ -156,6 +162,7 @@ import os.org.opensearch.client.indices.GetIndexRequest;
import os.org.opensearch.client.indices.GetMappingsRequest;
import os.org.opensearch.client.indices.GetMappingsResponse;
import os.org.opensearch.client.indices.PutMappingRequest;
import os.org.opensearch.cluster.health.ClusterHealthStatus;
import os.org.opensearch.cluster.metadata.MappingMetadata;
import os.org.opensearch.common.lucene.search.function.CombineFunction;
import os.org.opensearch.common.lucene.search.function.FieldValueFactorFunction;
@ -2835,4 +2842,16 @@ public class OpenSearchClient implements SearchClient {
}
}
}
@Override
public SearchHealthStatus getSearchHealthStatus() throws IOException {
  // Query OpenSearch cluster health; GREEN and YELLOW both count as
  // healthy — only RED (or an I/O failure, which propagates) is unhealthy.
  ClusterHealthResponse response =
      client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
  ClusterHealthStatus clusterStatus = response.getStatus();
  boolean healthy =
      clusterStatus.equals(ClusterHealthStatus.GREEN)
          || clusterStatus.equals(ClusterHealthStatus.YELLOW);
  return new SearchHealthStatus(healthy ? HEALTHY_STATUS : UNHEALTHY_STATUS);
}
}