From 520d5e2657721d97695ed6e16a1a3af23f94f6ff Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 22 Jun 2023 06:53:56 +0200 Subject: [PATCH] Add Pipeline Service Status Job to publish to Prometheus (#12046) * Add Pipeline Service Status Job to publish to Prometheus * Add Pipeline Service Status Job to publish to Prometheus * Add config for health check interval --- conf/openmetadata.yaml | 1 + .../development/docker-compose-postgres.yml | 1 + docker/development/docker-compose.yml | 1 + .../docker-compose-openmetadata-postgres.yml | 1 + .../docker-compose-openmetadata.yml | 1 + .../docker-compose-postgres.yml | 1 + .../docker-compose.yml | 1 + .../deployment/upgrade/index.md | 1 + .../service/OpenMetadataApplication.java | 7 ++ .../pipeline/airflow/AirflowRESTClient.java | 5 +- .../scheduled/PipelineServiceStatusJob.java | 57 ++++++++++++ .../PipelineServiceStatusJobHandler.java | 88 +++++++++++++++++++ .../resources/openmetadata-secure-test.yaml | 1 + .../pipelineServiceClientConfiguration.json | 5 ++ 14 files changed, 170 insertions(+), 1 deletion(-) create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJob.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJobHandler.java diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml index 78ef0a003f7..f09fae1d85f 100644 --- a/conf/openmetadata.yaml +++ b/conf/openmetadata.yaml @@ -246,6 +246,7 @@ pipelineServiceClientConfiguration: metadataApiEndpoint: ${SERVER_HOST_API_URL:-http://localhost:8585/api} ingestionIpInfoEnabled: ${PIPELINE_SERVICE_IP_INFO_ENABLED:-false} hostIp: ${PIPELINE_SERVICE_CLIENT_HOST_IP:-""} + healthCheckInterval: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300} # This SSL information is about the OpenMetadata server. # It will be picked up from the pipelineServiceClient to use/ignore SSL when connecting to the OpenMetadata server. 
verifySSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} # Possible values are "no-ssl", "ignore", "validate" diff --git a/docker/development/docker-compose-postgres.yml b/docker/development/docker-compose-postgres.yml index 3a710e5518a..0cf39f308ba 100644 --- a/docker/development/docker-compose-postgres.yml +++ b/docker/development/docker-compose-postgres.yml @@ -87,6 +87,7 @@ services: JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"} # OpenMetadata Server Pipeline Service Client Configuration PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080} + PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300} SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api} PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} diff --git a/docker/development/docker-compose.yml b/docker/development/docker-compose.yml index 1d3c82862ea..e0975b43faf 100644 --- a/docker/development/docker-compose.yml +++ b/docker/development/docker-compose.yml @@ -86,6 +86,7 @@ services: JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"} # OpenMetadata Server Pipeline Service Client Configuration PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080} + PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300} SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api} PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} diff --git a/docker/docker-compose-openmetadata/docker-compose-openmetadata-postgres.yml b/docker/docker-compose-openmetadata/docker-compose-openmetadata-postgres.yml index 071e43b79f9..bb51c90d228 100644 --- 
a/docker/docker-compose-openmetadata/docker-compose-openmetadata-postgres.yml +++ b/docker/docker-compose-openmetadata/docker-compose-openmetadata-postgres.yml @@ -40,6 +40,7 @@ services: JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"} # OpenMetadata Server Pipeline Service Client Configuration PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080} + PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300} SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api} PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} diff --git a/docker/docker-compose-openmetadata/docker-compose-openmetadata.yml b/docker/docker-compose-openmetadata/docker-compose-openmetadata.yml index 8a3a6a06310..a3e4fb656cf 100644 --- a/docker/docker-compose-openmetadata/docker-compose-openmetadata.yml +++ b/docker/docker-compose-openmetadata/docker-compose-openmetadata.yml @@ -40,6 +40,7 @@ services: JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"} # OpenMetadata Server Pipeline Service Client Configuration PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080} + PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300} SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api} PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} diff --git a/docker/docker-compose-quickstart/docker-compose-postgres.yml b/docker/docker-compose-quickstart/docker-compose-postgres.yml index acd1bf9ea14..d0f96e1b8a9 100644 --- a/docker/docker-compose-quickstart/docker-compose-postgres.yml +++ b/docker/docker-compose-quickstart/docker-compose-postgres.yml @@ 
-81,6 +81,7 @@ services: JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"} # OpenMetadata Server Pipeline Service Client Configuration PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080} + PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300} SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api} PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} diff --git a/docker/docker-compose-quickstart/docker-compose.yml b/docker/docker-compose-quickstart/docker-compose.yml index aa5c0ef02c1..0c4412fb5e0 100644 --- a/docker/docker-compose-quickstart/docker-compose.yml +++ b/docker/docker-compose-quickstart/docker-compose.yml @@ -79,6 +79,7 @@ services: JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"} # OpenMetadata Server Pipeline Service Client Configuration PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080} + PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300} SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api} PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} diff --git a/openmetadata-docs/content/v1.1.0-snapshot/deployment/upgrade/index.md b/openmetadata-docs/content/v1.1.0-snapshot/deployment/upgrade/index.md index 899bb1986e6..a0276971389 100644 --- a/openmetadata-docs/content/v1.1.0-snapshot/deployment/upgrade/index.md +++ b/openmetadata-docs/content/v1.1.0-snapshot/deployment/upgrade/index.md @@ -77,6 +77,7 @@ pipelineServiceClientConfiguration: # Secrets Manager Loader: specify to the Ingestion Framework how to load the SM credentials from its env # Supported: noop, airflow, env 
secretsManagerLoader: ${PIPELINE_SERVICE_CLIENT_SECRETS_MANAGER_LOADER:-"noop"} + healthCheckInterval: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300} ``` ### Secrets Manager YAML config diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java index 09388e3d733..f6cdb08bf91 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java @@ -75,6 +75,7 @@ import org.openmetadata.schema.auth.SSOAuthMechanism; import org.openmetadata.service.elasticsearch.ElasticSearchEventPublisher; import org.openmetadata.service.events.EventFilter; import org.openmetadata.service.events.EventPubSub; +import org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler; import org.openmetadata.service.events.scheduled.ReportsHandler; import org.openmetadata.service.exception.CatalogGenericExceptionMapper; import org.openmetadata.service.exception.ConstraintViolationExceptionMapper; @@ -215,6 +216,12 @@ public class OpenMetadataApplication extends Application status = + buildUnhealthyStatus(String.format("Failed to get REST status due to [%s].", e.getMessage())); + + return Response.ok(status, MediaType.APPLICATION_JSON_TYPE).build(); } throw new PipelineServiceClientException(String.format("Failed to get REST status due to %s.", response.body())); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJob.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJob.java new file mode 100644 index 00000000000..855f7db572f --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJob.java @@ -0,0 +1,57 @@ +package 
org.openmetadata.service.events.scheduled; + +import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_CLUSTER_NAME; +import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_METER_REGISTRY; +import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_PIPELINE_SERVICE_CLIENT; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import java.util.Map; +import javax.ws.rs.core.Response; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.sdk.PipelineServiceClient; +import org.quartz.Job; +import org.quartz.JobExecutionContext; + +@Slf4j +public class PipelineServiceStatusJob implements Job { + + private static final String HEALTHY_STATUS = "healthy"; + private static final String STATUS_KEY = "status"; + private static final String COUNTER_NAME = "pipelineServiceClientStatus.counter"; + private static final String UNHEALTHY_TAG_NAME = "unhealthy"; + private static final String CLUSTER_TAG_NAME = "clusterName"; + + @Override + public void execute(JobExecutionContext jobExecutionContext) { + + PipelineServiceClient pipelineServiceClient = + (PipelineServiceClient) + jobExecutionContext.getJobDetail().getJobDataMap().get(JOB_CONTEXT_PIPELINE_SERVICE_CLIENT); + PrometheusMeterRegistry meterRegistry = + (PrometheusMeterRegistry) jobExecutionContext.getJobDetail().getJobDataMap().get(JOB_CONTEXT_METER_REGISTRY); + String clusterName = (String) jobExecutionContext.getJobDetail().getJobDataMap().get(JOB_CONTEXT_CLUSTER_NAME); + try { + registerStatusMetric(pipelineServiceClient, meterRegistry, clusterName); + } catch (Exception e) { + LOG.error("[Pipeline Service Status Job] Failed in sending metric due to", e); + publishUnhealthyCounter(meterRegistry, clusterName); + } + } + + private void registerStatusMetric( + PipelineServiceClient pipelineServiceClient, PrometheusMeterRegistry 
meterRegistry, String clusterName) { + Response response = pipelineServiceClient.getServiceStatus(); + Map<?, ?> responseMap = (Map<?, ?>) response.getEntity(); + if (!HEALTHY_STATUS.equals(responseMap.get(STATUS_KEY))) { + publishUnhealthyCounter(meterRegistry, clusterName); + } + } + + private void publishUnhealthyCounter(PrometheusMeterRegistry meterRegistry, String clusterName) { + Counter.builder(COUNTER_NAME) + .tags(STATUS_KEY, UNHEALTHY_TAG_NAME, CLUSTER_TAG_NAME, clusterName) + .register(meterRegistry) + .increment(); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJobHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJobHandler.java new file mode 100644 index 00000000000..8a9400d128e --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJobHandler.java @@ -0,0 +1,88 @@ +package org.openmetadata.service.events.scheduled; + +import io.micrometer.prometheus.PrometheusMeterRegistry; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration; +import org.openmetadata.sdk.PipelineServiceClient; +import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory; +import org.openmetadata.service.util.MicrometerBundleSingleton; +import org.quartz.JobBuilder; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; +import org.quartz.impl.StdSchedulerFactory; + +@Slf4j +public class PipelineServiceStatusJobHandler { + + public static final String PIPELINE_SERVICE_STATUS_JOB = "pipelineServiceStatusJob"; + public static final String STATUS_GROUP = "status"; + public static final
String STATUS_CRON_TRIGGER = "statusTrigger"; + public static final String JOB_CONTEXT_PIPELINE_SERVICE_CLIENT = "pipelineServiceClient"; + public static final String JOB_CONTEXT_METER_REGISTRY = "meterRegistry"; + public static final String JOB_CONTEXT_CLUSTER_NAME = "clusterName"; + + private final PipelineServiceClient pipelineServiceClient; + private final PrometheusMeterRegistry meterRegistry; + private final String clusterName; + private final Integer healthCheckInterval; + private final Scheduler scheduler = new StdSchedulerFactory().getScheduler(); + + private static PipelineServiceStatusJobHandler INSTANCE; + + private PipelineServiceStatusJobHandler(PipelineServiceClientConfiguration config, String clusterName) + throws SchedulerException { + this.pipelineServiceClient = PipelineServiceClientFactory.createPipelineServiceClient(config); + this.meterRegistry = MicrometerBundleSingleton.prometheusMeterRegistry; + this.clusterName = clusterName; + this.healthCheckInterval = config.getHealthCheckInterval(); + this.scheduler.start(); + } + + public static PipelineServiceStatusJobHandler create(PipelineServiceClientConfiguration config, String clusterName) { + if (INSTANCE != null) return INSTANCE; + + try { + INSTANCE = new PipelineServiceStatusJobHandler(config, clusterName); + } catch (Exception ex) { + LOG.error("Failed to initialize the Pipeline Service Status Handler", ex); + } + + return INSTANCE; + } + + private JobDetail jobBuilder() { + JobDataMap dataMap = new JobDataMap(); + dataMap.put(JOB_CONTEXT_PIPELINE_SERVICE_CLIENT, pipelineServiceClient); + dataMap.put(JOB_CONTEXT_METER_REGISTRY, meterRegistry); + dataMap.put(JOB_CONTEXT_CLUSTER_NAME, clusterName); + + JobBuilder jobBuilder = + JobBuilder.newJob(PipelineServiceStatusJob.class) + .withIdentity(PIPELINE_SERVICE_STATUS_JOB, STATUS_GROUP) + .usingJobData(dataMap); + + return jobBuilder.build(); + } + + private Trigger getTrigger() { + return TriggerBuilder.newTrigger() +
.withIdentity(STATUS_CRON_TRIGGER, STATUS_GROUP) + .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(healthCheckInterval).repeatForever()) + .build(); + } + + public void addPipelineServiceStatusJob() { + try { + JobDetail jobDetail = jobBuilder(); + Trigger trigger = getTrigger(); + scheduler.scheduleJob(jobDetail, trigger); + } catch (Exception ex) { + LOG.error("Failed in setting up job Scheduler for Pipeline Service Status", ex); + } + } +} diff --git a/openmetadata-service/src/test/resources/openmetadata-secure-test.yaml b/openmetadata-service/src/test/resources/openmetadata-secure-test.yaml index 09320ab119b..328c2f8ade9 100644 --- a/openmetadata-service/src/test/resources/openmetadata-secure-test.yaml +++ b/openmetadata-service/src/test/resources/openmetadata-secure-test.yaml @@ -204,6 +204,7 @@ pipelineServiceClientConfiguration: metadataApiEndpoint: http://localhost:8585/api apiEndpoint: http://localhost:8080 hostIp: "" + healthCheckInterval: 300 verifySSL: "no-ssl" authProvider: "openmetadata" diff --git a/openmetadata-spec/src/main/resources/json/schema/configuration/pipelineServiceClientConfiguration.json b/openmetadata-spec/src/main/resources/json/schema/configuration/pipelineServiceClientConfiguration.json index 03020eb39ec..90228f16f44 100644 --- a/openmetadata-spec/src/main/resources/json/schema/configuration/pipelineServiceClientConfiguration.json +++ b/openmetadata-spec/src/main/resources/json/schema/configuration/pipelineServiceClientConfiguration.json @@ -18,6 +18,11 @@ "description": "Pipeline Service Client host IP that will be used to connect to the sources.", "type": "string" }, + "healthCheckInterval": { + "description": "Interval in seconds that the server will use to check the /status of the pipelineServiceClient and flag any errors in a Prometheus metric `pipelineServiceClientStatus.counter`.", + "type": "integer", + "default": 300 + }, "ingestionIpInfoEnabled": { "description": "Enable or disable the API that 
fetches the public IP running the ingestion process.", "type": "boolean",