From 4a3721b8cd6c6a0c6c46f7f24cb6abc8a40b15fd Mon Sep 17 00:00:00 2001 From: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com> Date: Wed, 29 Jan 2025 00:08:45 +0530 Subject: [PATCH] Add prometheus counter for search and database (#19561) --- conf/openmetadata.yaml | 1 + .../service/OpenMetadataApplication.java | 19 ++++-- .../DatabseAndSearchServiceStatusJob.java | 55 ++++++++++++++++ .../scheduled/PipelineServiceStatusJob.java | 6 +- ...ler.java => ServicesStatusJobHandler.java} | 62 +++++++++++++------ .../monitoring/EventMonitorConfiguration.java | 2 + .../service/search/SearchClient.java | 2 + .../service/search/SearchHealthStatus.java | 12 ++++ .../elasticsearch/ElasticSearchClient.java | 19 ++++++ .../search/opensearch/OpenSearchClient.java | 19 ++++++ 10 files changed, 169 insertions(+), 28 deletions(-) create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/DatabseAndSearchServiceStatusJob.java rename openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/{PipelineServiceStatusJobHandler.java => ServicesStatusJobHandler.java} (55%) create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/search/SearchHealthStatus.java diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml index cf7f07b7f28..cc6909053c8 100644 --- a/conf/openmetadata.yaml +++ b/conf/openmetadata.yaml @@ -296,6 +296,7 @@ eventMonitoringConfiguration: batchSize: ${EVENT_MONITOR_BATCH_SIZE:-10} pathPattern: ${EVENT_MONITOR_PATH_PATTERN:-["/api/v1/tables/*", "/api/v1/health-check"]} latency: ${EVENT_MONITOR_LATENCY:-[0.99, 0.90]} # For value p99=0.99, p90=0.90, p50=0.50 etc. 
+ servicesHealthCheckInterval: ${EVENT_MONITOR_SERVICES_HEALTH_CHECK_INTERVAL:-300} # it will use the default auth provider for AWS services if parameters are not set # parameters: # region: ${OM_MONITOR_REGION:-""} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java index ee00e54cf12..900b34fcf0d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java @@ -73,7 +73,7 @@ import org.openmetadata.service.config.OMWebConfiguration; import org.openmetadata.service.events.EventFilter; import org.openmetadata.service.events.EventPubSub; import org.openmetadata.service.events.scheduled.EventSubscriptionScheduler; -import org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler; +import org.openmetadata.service.events.scheduled.ServicesStatusJobHandler; import org.openmetadata.service.exception.CatalogGenericExceptionMapper; import org.openmetadata.service.exception.ConstraintViolationExceptionMapper; import org.openmetadata.service.exception.JsonMappingExceptionMapper; @@ -264,16 +264,23 @@ public class OpenMetadataApplication extends Application clazz, String jobName, String group) { JobDataMap dataMap = new JobDataMap(); dataMap.put(JOB_CONTEXT_PIPELINE_SERVICE_CLIENT, pipelineServiceClient); dataMap.put(JOB_CONTEXT_METER_REGISTRY, meterRegistry); dataMap.put(JOB_CONTEXT_CLUSTER_NAME, clusterName); JobBuilder jobBuilder = - JobBuilder.newJob(PipelineServiceStatusJob.class) - .withIdentity(PIPELINE_SERVICE_STATUS_JOB, STATUS_GROUP) - .usingJobData(dataMap); + JobBuilder.newJob(clazz).withIdentity(jobName, group).usingJobData(dataMap); return jobBuilder.build(); } - private Trigger getTrigger() { + private Trigger getTrigger(int checkInterval, String identity, String group) { 
return TriggerBuilder.newTrigger() - .withIdentity(STATUS_CRON_TRIGGER, STATUS_GROUP) + .withIdentity(identity, group) .withSchedule( SimpleScheduleBuilder.simpleSchedule() - .withIntervalInSeconds(healthCheckInterval) + .withIntervalInSeconds(checkInterval) .repeatForever()) .build(); } @@ -86,12 +95,27 @@ public class PipelineServiceStatusJobHandler { // enabled if (config.getEnabled().equals(Boolean.TRUE)) { try { - JobDetail jobDetail = jobBuilder(); - Trigger trigger = getTrigger(); + JobDetail jobDetail = + jobBuilder(PipelineServiceStatusJob.class, PIPELINE_SERVICE_STATUS_JOB, STATUS_GROUP); + Trigger trigger = + getTrigger(healthCheckInterval, PIPELINE_STATUS_CRON_TRIGGER, STATUS_GROUP); scheduler.scheduleJob(jobDetail, trigger); } catch (Exception ex) { LOG.error("Failed in setting up job Scheduler for Pipeline Service Status", ex); } } } + + public void addDatabaseAndSearchStatusJobs() { + try { + JobDetail jobDetail = + jobBuilder( + DatabseAndSearchServiceStatusJob.class, DATABASE_SEARCH_STATUS_JOB, STATUS_GROUP); + Trigger trigger = + getTrigger(servicesHealthCheckInterval, DATABSE_SEARCH_STATUS_CRON_TRIGGER, STATUS_GROUP); + scheduler.scheduleJob(jobDetail, trigger); + } catch (Exception ex) { + LOG.error("Failed in setting up job Scheduler for Database and Search Service Status", ex); + } + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorConfiguration.java b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorConfiguration.java index 4e3f17b84c7..552d34db3da 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorConfiguration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorConfiguration.java @@ -31,4 +31,6 @@ public class EventMonitorConfiguration { private String[] pathPattern; private double[] latency; + + private int servicesHealthCheckInterval = 300; } diff --git
a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java index d764b11a8b0..88fb80571a3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java @@ -390,4 +390,6 @@ public interface SearchClient { && !subjectContext.isBot() && rbacConditionEvaluator != null; } + + SearchHealthStatus getSearchHealthStatus() throws IOException; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchHealthStatus.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchHealthStatus.java new file mode 100644 index 00000000000..f3d2b5cf043 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchHealthStatus.java @@ -0,0 +1,12 @@ +package org.openmetadata.service.search; + +import lombok.Getter; + +/** Search cluster health as a status string; Lombok's {@code @Getter} exposes it. */ +@Getter +public class SearchHealthStatus { + private final String status; + public SearchHealthStatus(String status) { + this.status = status; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index 4422ecb2727..8a22119ceef 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -13,6 +13,8 @@ import static org.openmetadata.service.Entity.GLOSSARY_TERM; import static org.openmetadata.service.Entity.QUERY; import static org.openmetadata.service.Entity.RAW_COST_ANALYSIS_REPORT_DATA; import static org.openmetadata.service.Entity.TABLE; +import static
org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.HEALTHY_STATUS; +import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.UNHEALTHY_STATUS; import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse; import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD; import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD_KEYWORD; @@ -38,6 +40,9 @@ import static org.openmetadata.service.util.FullyQualifiedName.getParentFQN; import com.fasterxml.jackson.databind.JsonNode; import es.org.elasticsearch.ElasticsearchStatusException; +import es.org.elasticsearch.action.ActionListener; +import es.org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import es.org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import es.org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import es.org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import es.org.elasticsearch.action.bulk.BulkRequest; @@ -60,6 +65,7 @@ import es.org.elasticsearch.client.indices.GetIndexRequest; import es.org.elasticsearch.client.indices.GetMappingsRequest; import es.org.elasticsearch.client.indices.GetMappingsResponse; import es.org.elasticsearch.client.indices.PutMappingRequest; +import es.org.elasticsearch.cluster.health.ClusterHealthStatus; import es.org.elasticsearch.cluster.metadata.MappingMetadata; import es.org.elasticsearch.common.lucene.search.function.CombineFunction; import es.org.elasticsearch.common.lucene.search.function.FieldValueFactorFunction; @@ -166,6 +172,7 @@ import org.openmetadata.service.jdbi3.TableRepository; import org.openmetadata.service.jdbi3.TestCaseResultRepository; import org.openmetadata.service.search.SearchAggregation; import org.openmetadata.service.search.SearchClient; +import org.openmetadata.service.search.SearchHealthStatus; import 
org.openmetadata.service.search.SearchIndexUtils; import org.openmetadata.service.search.SearchRequest; import org.openmetadata.service.search.SearchSortFilter; @@ -2840,6 +2847,18 @@ public class ElasticSearchClient implements SearchClient { return client.getLowLevelClient(); } + @Override + public SearchHealthStatus getSearchHealthStatus() throws IOException { + ClusterHealthResponse health = + client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT); + ClusterHealthStatus clusterStatus = health.getStatus(); + // GREEN and YELLOW are reported as healthy; any other status is unhealthy. + boolean healthy = + clusterStatus.equals(ClusterHealthStatus.GREEN) + || clusterStatus.equals(ClusterHealthStatus.YELLOW); + return new SearchHealthStatus(healthy ? HEALTHY_STATUS : UNHEALTHY_STATUS); + } + private void buildSearchRBACQuery( SubjectContext subjectContext, SearchSourceBuilder searchSourceBuilder) { if (SearchClient.shouldApplyRbacConditions(subjectContext, rbacConditionEvaluator)) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index 51da4f271de..069f3ee75e3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -12,6 +12,8 @@ import static org.openmetadata.service.Entity.GLOSSARY_TERM; import static org.openmetadata.service.Entity.QUERY; import static org.openmetadata.service.Entity.RAW_COST_ANALYSIS_REPORT_DATA; import static org.openmetadata.service.Entity.TABLE; +import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.HEALTHY_STATUS; +import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.UNHEALTHY_STATUS; import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse; import static
org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD; import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD_KEYWORD; @@ -86,6 +88,7 @@ import org.openmetadata.service.jdbi3.TableRepository; import org.openmetadata.service.jdbi3.TestCaseResultRepository; import org.openmetadata.service.search.SearchAggregation; import org.openmetadata.service.search.SearchClient; +import org.openmetadata.service.search.SearchHealthStatus; import org.openmetadata.service.search.SearchIndexUtils; import org.openmetadata.service.search.SearchRequest; import org.openmetadata.service.search.SearchSortFilter; @@ -135,6 +138,9 @@ import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.workflows.searchIndex.ReindexingUtil; import os.org.opensearch.OpenSearchException; import os.org.opensearch.OpenSearchStatusException; +import os.org.opensearch.action.ActionListener; +import os.org.opensearch.action.admin.cluster.health.ClusterHealthRequest; +import os.org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import os.org.opensearch.action.admin.indices.alias.IndicesAliasesRequest; import os.org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import os.org.opensearch.action.bulk.BulkRequest; @@ -156,6 +162,7 @@ import os.org.opensearch.client.indices.GetIndexRequest; import os.org.opensearch.client.indices.GetMappingsRequest; import os.org.opensearch.client.indices.GetMappingsResponse; import os.org.opensearch.client.indices.PutMappingRequest; +import os.org.opensearch.cluster.health.ClusterHealthStatus; import os.org.opensearch.cluster.metadata.MappingMetadata; import os.org.opensearch.common.lucene.search.function.CombineFunction; import os.org.opensearch.common.lucene.search.function.FieldValueFactorFunction; @@ -2835,4 +2842,16 @@ public class OpenSearchClient implements SearchClient { } } } + + @Override + public SearchHealthStatus getSearchHealthStatus() throws IOException {
+ ClusterHealthResponse health = + client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT); + ClusterHealthStatus clusterStatus = health.getStatus(); + // GREEN and YELLOW are reported as healthy; any other status is unhealthy. + boolean healthy = + clusterStatus.equals(ClusterHealthStatus.GREEN) + || clusterStatus.equals(ClusterHealthStatus.YELLOW); + return new SearchHealthStatus(healthy ? HEALTHY_STATUS : UNHEALTHY_STATUS); + } }