mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-25 17:04:54 +00:00
Add Pipeline Service Status Job to publish to Prometheus (#12046)
* Add Pipeline Service Status Job to publish to Prometheus * Add Pipeline Service Status Job to publish to Prometheus * Add config for health check interval
This commit is contained in:
parent
1e86b6533c
commit
520d5e2657
@ -246,6 +246,7 @@ pipelineServiceClientConfiguration:
|
||||
metadataApiEndpoint: ${SERVER_HOST_API_URL:-http://localhost:8585/api}
|
||||
ingestionIpInfoEnabled: ${PIPELINE_SERVICE_IP_INFO_ENABLED:-false}
|
||||
hostIp: ${PIPELINE_SERVICE_CLIENT_HOST_IP:-""}
|
||||
healthCheckInterval: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300}
|
||||
# This SSL information is about the OpenMetadata server.
|
||||
# It will be picked up from the pipelineServiceClient to use/ignore SSL when connecting to the OpenMetadata server.
|
||||
verifySSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} # Possible values are "no-ssl", "ignore", "validate"
|
||||
|
@ -87,6 +87,7 @@ services:
|
||||
JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"}
|
||||
# OpenMetadata Server Pipeline Service Client Configuration
|
||||
PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080}
|
||||
PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300}
|
||||
SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api}
|
||||
PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"}
|
||||
PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""}
|
||||
|
@ -86,6 +86,7 @@ services:
|
||||
JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"}
|
||||
# OpenMetadata Server Pipeline Service Client Configuration
|
||||
PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080}
|
||||
PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300}
|
||||
SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api}
|
||||
PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"}
|
||||
PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""}
|
||||
|
@ -40,6 +40,7 @@ services:
|
||||
JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"}
|
||||
# OpenMetadata Server Pipeline Service Client Configuration
|
||||
PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080}
|
||||
PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300}
|
||||
SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api}
|
||||
PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"}
|
||||
PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""}
|
||||
|
@ -40,6 +40,7 @@ services:
|
||||
JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"}
|
||||
# OpenMetadata Server Pipeline Service Client Configuration
|
||||
PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080}
|
||||
PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300}
|
||||
SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api}
|
||||
PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"}
|
||||
PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""}
|
||||
|
@ -81,6 +81,7 @@ services:
|
||||
JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"}
|
||||
# OpenMetadata Server Pipeline Service Client Configuration
|
||||
PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080}
|
||||
PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300}
|
||||
SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api}
|
||||
PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"}
|
||||
PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""}
|
||||
|
@ -79,6 +79,7 @@ services:
|
||||
JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"}
|
||||
# OpenMetadata Server Pipeline Service Client Configuration
|
||||
PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080}
|
||||
PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300}
|
||||
SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api}
|
||||
PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"}
|
||||
PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""}
|
||||
|
@ -77,6 +77,7 @@ pipelineServiceClientConfiguration:
|
||||
# Secrets Manager Loader: specify to the Ingestion Framework how to load the SM credentials from its env
|
||||
# Supported: noop, airflow, env
|
||||
secretsManagerLoader: ${PIPELINE_SERVICE_CLIENT_SECRETS_MANAGER_LOADER:-"noop"}
|
||||
healthCheckInterval: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300}
|
||||
```
|
||||
|
||||
### Secrets Manager YAML config
|
||||
|
@ -75,6 +75,7 @@ import org.openmetadata.schema.auth.SSOAuthMechanism;
|
||||
import org.openmetadata.service.elasticsearch.ElasticSearchEventPublisher;
|
||||
import org.openmetadata.service.events.EventFilter;
|
||||
import org.openmetadata.service.events.EventPubSub;
|
||||
import org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler;
|
||||
import org.openmetadata.service.events.scheduled.ReportsHandler;
|
||||
import org.openmetadata.service.exception.CatalogGenericExceptionMapper;
|
||||
import org.openmetadata.service.exception.ConstraintViolationExceptionMapper;
|
||||
@ -215,6 +216,12 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
|
||||
environment.servlets().addServlet("static", assetServlet).addMapping(pathPattern);
|
||||
|
||||
registerExtensions(catalogConfig, environment, jdbi);
|
||||
|
||||
// Handle Pipeline Service Client Status job
|
||||
PipelineServiceStatusJobHandler pipelineServiceStatusJobHandler =
|
||||
PipelineServiceStatusJobHandler.create(
|
||||
catalogConfig.getPipelineServiceClientConfiguration(), catalogConfig.getClusterName());
|
||||
pipelineServiceStatusJobHandler.addPipelineServiceStatusJob();
|
||||
}
|
||||
|
||||
private void registerExtensions(OpenMetadataApplicationConfig catalogConfig, Environment environment, Jdbi jdbi) {
|
||||
|
@ -267,7 +267,10 @@ public class AirflowRESTClient extends PipelineServiceClient {
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
throw PipelineServiceClientException.byMessage("Failed to get REST status.", e.getMessage());
|
||||
Map<String, String> status =
|
||||
buildUnhealthyStatus(String.format("Failed to get REST status due to [%s].", e.getMessage()));
|
||||
|
||||
return Response.ok(status, MediaType.APPLICATION_JSON_TYPE).build();
|
||||
}
|
||||
throw new PipelineServiceClientException(String.format("Failed to get REST status due to %s.", response.body()));
|
||||
}
|
||||
|
@ -0,0 +1,57 @@
|
||||
package org.openmetadata.service.events.scheduled;
|
||||
|
||||
import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_CLUSTER_NAME;
|
||||
import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_METER_REGISTRY;
|
||||
import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_PIPELINE_SERVICE_CLIENT;
|
||||
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.prometheus.PrometheusMeterRegistry;
|
||||
import java.util.Map;
|
||||
import javax.ws.rs.core.Response;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.sdk.PipelineServiceClient;
|
||||
import org.quartz.Job;
|
||||
import org.quartz.JobExecutionContext;
|
||||
|
||||
@Slf4j
|
||||
public class PipelineServiceStatusJob implements Job {
|
||||
|
||||
private static final String HEALTHY_STATUS = "healthy";
|
||||
private static final String STATUS_KEY = "status";
|
||||
private static final String COUNTER_NAME = "pipelineServiceClientStatus.counter";
|
||||
private static final String UNHEALTHY_TAG_NAME = "unhealthy";
|
||||
private static final String CLUSTER_TAG_NAME = "clusterName";
|
||||
|
||||
@Override
|
||||
public void execute(JobExecutionContext jobExecutionContext) {
|
||||
|
||||
PipelineServiceClient pipelineServiceClient =
|
||||
(PipelineServiceClient)
|
||||
jobExecutionContext.getJobDetail().getJobDataMap().get(JOB_CONTEXT_PIPELINE_SERVICE_CLIENT);
|
||||
PrometheusMeterRegistry meterRegistry =
|
||||
(PrometheusMeterRegistry) jobExecutionContext.getJobDetail().getJobDataMap().get(JOB_CONTEXT_METER_REGISTRY);
|
||||
String clusterName = (String) jobExecutionContext.getJobDetail().getJobDataMap().get(JOB_CONTEXT_CLUSTER_NAME);
|
||||
try {
|
||||
registerStatusMetric(pipelineServiceClient, meterRegistry, clusterName);
|
||||
} catch (Exception e) {
|
||||
LOG.error("[Pipeline Service Status Job] Failed in sending metric due to", e);
|
||||
publishUnhealthyCounter(meterRegistry, clusterName);
|
||||
}
|
||||
}
|
||||
|
||||
private void registerStatusMetric(
|
||||
PipelineServiceClient pipelineServiceClient, PrometheusMeterRegistry meterRegistry, String clusterName) {
|
||||
Response response = pipelineServiceClient.getServiceStatus();
|
||||
Map<String, String> responseMap = (Map<String, String>) response.getEntity();
|
||||
if (responseMap.get(STATUS_KEY) == null || !HEALTHY_STATUS.equals(responseMap.get(STATUS_KEY))) {
|
||||
publishUnhealthyCounter(meterRegistry, clusterName);
|
||||
}
|
||||
}
|
||||
|
||||
private void publishUnhealthyCounter(PrometheusMeterRegistry meterRegistry, String clusterName) {
|
||||
Counter.builder(COUNTER_NAME)
|
||||
.tags(STATUS_KEY, UNHEALTHY_TAG_NAME, CLUSTER_TAG_NAME, clusterName)
|
||||
.register(meterRegistry)
|
||||
.increment();
|
||||
}
|
||||
}
|
@ -0,0 +1,88 @@
|
||||
package org.openmetadata.service.events.scheduled;
|
||||
|
||||
import io.micrometer.prometheus.PrometheusMeterRegistry;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
|
||||
import org.openmetadata.sdk.PipelineServiceClient;
|
||||
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
|
||||
import org.openmetadata.service.util.MicrometerBundleSingleton;
|
||||
import org.quartz.JobBuilder;
|
||||
import org.quartz.JobDataMap;
|
||||
import org.quartz.JobDetail;
|
||||
import org.quartz.Scheduler;
|
||||
import org.quartz.SchedulerException;
|
||||
import org.quartz.SimpleScheduleBuilder;
|
||||
import org.quartz.Trigger;
|
||||
import org.quartz.TriggerBuilder;
|
||||
import org.quartz.impl.StdSchedulerFactory;
|
||||
|
||||
@Slf4j
|
||||
public class PipelineServiceStatusJobHandler {
|
||||
|
||||
public static final String PIPELINE_SERVICE_STATUS_JOB = "pipelineServiceStatusJob";
|
||||
public static final String STATUS_GROUP = "status";
|
||||
public static final String STATUS_CRON_TRIGGER = "statusTrigger";
|
||||
public static final String JOB_CONTEXT_PIPELINE_SERVICE_CLIENT = "pipelineServiceClient";
|
||||
public static final String JOB_CONTEXT_METER_REGISTRY = "meterRegistry";
|
||||
public static final String JOB_CONTEXT_CLUSTER_NAME = "clusterName";
|
||||
|
||||
private final PipelineServiceClient pipelineServiceClient;
|
||||
private final PrometheusMeterRegistry meterRegistry;
|
||||
private final String clusterName;
|
||||
private final Integer healthCheckInterval;
|
||||
private final Scheduler scheduler = new StdSchedulerFactory().getScheduler();
|
||||
|
||||
private static PipelineServiceStatusJobHandler INSTANCE;
|
||||
|
||||
private PipelineServiceStatusJobHandler(PipelineServiceClientConfiguration config, String clusterName)
|
||||
throws SchedulerException {
|
||||
this.pipelineServiceClient = PipelineServiceClientFactory.createPipelineServiceClient(config);
|
||||
this.meterRegistry = MicrometerBundleSingleton.prometheusMeterRegistry;
|
||||
this.clusterName = clusterName;
|
||||
this.healthCheckInterval = config.getHealthCheckInterval();
|
||||
this.scheduler.start();
|
||||
}
|
||||
|
||||
public static PipelineServiceStatusJobHandler create(PipelineServiceClientConfiguration config, String clusterName) {
|
||||
if (INSTANCE != null) return INSTANCE;
|
||||
|
||||
try {
|
||||
INSTANCE = new PipelineServiceStatusJobHandler(config, clusterName);
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Failed to initialize the Pipeline Service Status Handler");
|
||||
}
|
||||
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
private JobDetail jobBuilder() {
|
||||
JobDataMap dataMap = new JobDataMap();
|
||||
dataMap.put(JOB_CONTEXT_PIPELINE_SERVICE_CLIENT, pipelineServiceClient);
|
||||
dataMap.put(JOB_CONTEXT_METER_REGISTRY, meterRegistry);
|
||||
dataMap.put(JOB_CONTEXT_CLUSTER_NAME, clusterName);
|
||||
|
||||
JobBuilder jobBuilder =
|
||||
JobBuilder.newJob(PipelineServiceStatusJob.class)
|
||||
.withIdentity(PIPELINE_SERVICE_STATUS_JOB, STATUS_GROUP)
|
||||
.usingJobData(dataMap);
|
||||
|
||||
return jobBuilder.build();
|
||||
}
|
||||
|
||||
private Trigger getTrigger() {
|
||||
return TriggerBuilder.newTrigger()
|
||||
.withIdentity(STATUS_CRON_TRIGGER, STATUS_GROUP)
|
||||
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(healthCheckInterval).repeatForever())
|
||||
.build();
|
||||
}
|
||||
|
||||
public void addPipelineServiceStatusJob() {
|
||||
try {
|
||||
JobDetail jobDetail = jobBuilder();
|
||||
Trigger trigger = getTrigger();
|
||||
scheduler.scheduleJob(jobDetail, trigger);
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Failed in setting up job Scheduler for Pipeline Service Status", ex);
|
||||
}
|
||||
}
|
||||
}
|
@ -204,6 +204,7 @@ pipelineServiceClientConfiguration:
|
||||
metadataApiEndpoint: http://localhost:8585/api
|
||||
apiEndpoint: http://localhost:8080
|
||||
hostIp: ""
|
||||
healthCheckInterval: 300
|
||||
verifySSL: "no-ssl"
|
||||
authProvider: "openmetadata"
|
||||
|
||||
|
@ -18,6 +18,11 @@
|
||||
"description": "Pipeline Service Client host IP that will be used to connect to the sources.",
|
||||
"type": "string"
|
||||
},
|
||||
"healthCheckInterval": {
|
||||
"description": "Interval in seconds that the server will use to check the /status of the pipelineServiceClient and flag any errors in a Prometheus metric `pipelineServiceClientStatus.counter`.",
|
||||
"type": "integer",
|
||||
"default": 300
|
||||
},
|
||||
"ingestionIpInfoEnabled": {
|
||||
"description": "Enable or disable the API that fetches the public IP running the ingestion process.",
|
||||
"type": "boolean",
|
||||
|
Loading…
x
Reference in New Issue
Block a user