Fix #9044 - Add prometheus event monitor (#9176)

* Add prometheus event monitor

* Format

* Format
This commit is contained in:
Pere Miquel Brull 2022-12-09 11:51:39 +01:00 committed by GitHub
parent becc7c1dde
commit ba9109d318
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 124 additions and 4 deletions

View File

@ -171,7 +171,7 @@ elasticsearch:
batchSize: ${ELASTICSEARCH_BATCH_SIZE:-10}
# eventMonitoringConfiguration:
# eventMonitor: ${EVENT_MONITOR:-cloudwatch} # Possible values are "cloudwatch"
# eventMonitor: ${EVENT_MONITOR:-prometheus} # Possible values are "prometheus", "cloudwatch"
# batchSize: ${EVENT_MONITOR_BATCH_SIZE:-10}
# it will use the default auth provider for AWS services if parameters are not set
# parameters:

View File

@ -51,8 +51,6 @@ public class CloudwatchEventMonitor extends EventMonitor {
private final CloudWatchClient client;
private static CloudwatchEventMonitor INSTANCE;
public CloudwatchEventMonitor(
EventMonitorProvider eventMonitorProvider, EventMonitorConfiguration config, String clusterPrefix) {
super(eventMonitorProvider, config, clusterPrefix);

View File

@ -22,6 +22,8 @@ public class EventMonitorFactory {
if (eventMonitorProvider == EventMonitorProvider.CLOUDWATCH) {
return new CloudwatchEventMonitor(eventMonitorProvider, config, clusterName);
} else if (eventMonitorProvider == EventMonitorProvider.PROMETHEUS) {
return new PrometheusEventMonitor(eventMonitorProvider, config, clusterName);
}
throw new IllegalArgumentException("Not implemented Event monitor: " + eventMonitorProvider);

View File

@ -0,0 +1,104 @@
/*
* Copyright 2022 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.monitoring;
import io.micrometer.core.instrument.Counter;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.monitoring.EventMonitorProvider;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.util.MicrometerBundleSingleton;
import software.amazon.awssdk.services.cloudwatch.model.CloudWatchException;
@Slf4j
public class PrometheusEventMonitor extends EventMonitor {
private final PrometheusMeterRegistry meterRegistry;
private static final String PIPELINE_STATUS = "pipelineStatus";
private static final String COUNTER_NAME = "ingestionPipeline.counter";
private static final String FQN_TAG_NAME = "fqn";
private static final String PIPELINE_TYPE_TAG_NAME = "pipelineType";
private static final String EVENT_TYPE_TAG_NAME = "eventType";
private static final String CLUSTER_TAG_NAME = "clusterName";
public PrometheusEventMonitor(
EventMonitorProvider eventMonitorProvider, EventMonitorConfiguration config, String clusterPrefix) {
super(eventMonitorProvider, config, clusterPrefix);
meterRegistry = MicrometerBundleSingleton.prometheusMeterRegistry;
}
@Override
protected void pushMetric(ChangeEvent event) {
String fqn = event.getEntityFullyQualifiedName();
IngestionPipeline ingestionPipeline = (IngestionPipeline) event.getEntity();
String pipelineType = ingestionPipeline.getPipelineType().toString();
try {
switch (event.getEventType()) {
case ENTITY_DELETED:
case ENTITY_SOFT_DELETED:
case ENTITY_CREATED:
incrementIngestionPipelineCounter(fqn, pipelineType, event.getEventType().value());
break;
case ENTITY_UPDATED:
// we can have multiple updates bundled together
event
.getChangeDescription()
.getFieldsUpdated()
.forEach(
change -> {
if (change.getName().equals(PIPELINE_STATUS) && change.getNewValue() != null) {
PipelineStatus pipelineStatus = (PipelineStatus) change.getNewValue();
incrementIngestionPipelineCounter(fqn, pipelineType, pipelineStatus.getPipelineState().value());
}
});
default:
throw new IllegalArgumentException("Invalid EventType " + event.getEventType());
}
} catch (IllegalArgumentException | CloudWatchException e) {
LOG.error("Failed to publish IngestionPipeline Cloudwatch metric due to " + e.getMessage());
}
}
@Override
protected void close() {
meterRegistry.close();
}
/**
* A new counter will be created only if it does not exist for the given set of tags. Otherwise, micrometer will
* increase the count of the existing counter. Ref <a
* href="https://stackoverflow.com/questions/59592118/dynamic-tag-values-for-the-counter-metric-in-micrometer">...</a>
*
* @param fqn Pipeline FQN
* @param pipelineType Metadata, Profiler,...
* @param eventType running, failed, entityCreated,...
*/
public void incrementIngestionPipelineCounter(String fqn, String pipelineType, String eventType) {
Counter.builder(COUNTER_NAME)
.tags(
FQN_TAG_NAME,
fqn,
PIPELINE_TYPE_TAG_NAME,
pipelineType,
EVENT_TYPE_TAG_NAME,
eventType,
CLUSTER_TAG_NAME,
getClusterPrefix())
.register(meterRegistry)
.increment();
}
}

View File

@ -1,11 +1,25 @@
/*
* Copyright 2022 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.util;
import io.github.maksymdolgykh.dropwizard.micrometer.MicrometerBundle;
import io.micrometer.core.instrument.Timer;
import io.micrometer.prometheus.PrometheusMeterRegistry;
public class MicrometerBundleSingleton {
private static MicrometerBundle INSTANCE;
public static Timer webAnalyticEvents;
public static PrometheusMeterRegistry prometheusMeterRegistry;
private MicrometerBundleSingleton() {}
@ -13,6 +27,8 @@ public class MicrometerBundleSingleton {
if (INSTANCE == null) {
INSTANCE = new MicrometerBundle();
webAnalyticEvents = MicrometerBundle.prometheusRegistry.timer("web.analytics.events");
// We'll use this registry to add monitoring around Ingestion Pipelines
prometheusMeterRegistry = MicrometerBundle.prometheusRegistry;
}
return INSTANCE;

View File

@ -5,6 +5,6 @@
"description": "OpenMetadata Event Monitor Provider.",
"type": "string",
"javaType": "org.openmetadata.schema.monitoring.EventMonitorProvider",
"enum": ["cloudwatch"],
"enum": ["cloudwatch", "prometheus"],
"additionalProperties": false
}