diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml index 35c184173fe..9acaf8055ae 100644 --- a/conf/openmetadata.yaml +++ b/conf/openmetadata.yaml @@ -171,7 +171,7 @@ elasticsearch: batchSize: ${ELASTICSEARCH_BATCH_SIZE:-10} # eventMonitoringConfiguration: - # eventMonitor: ${EVENT_MONITOR:-cloudwatch} # Possible values are "cloudwatch" + # eventMonitor: ${EVENT_MONITOR:-prometheus} # Possible values are "prometheus", "cloudwatch" # batchSize: ${EVENT_MONITOR_BATCH_SIZE:-10} # it will use the default auth provider for AWS services if parameters are not set # parameters: diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/CloudwatchEventMonitor.java b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/CloudwatchEventMonitor.java index eb8a28a0e25..a3985cae615 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/CloudwatchEventMonitor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/CloudwatchEventMonitor.java @@ -51,8 +51,6 @@ public class CloudwatchEventMonitor extends EventMonitor { private final CloudWatchClient client; - private static CloudwatchEventMonitor INSTANCE; - public CloudwatchEventMonitor( EventMonitorProvider eventMonitorProvider, EventMonitorConfiguration config, String clusterPrefix) { super(eventMonitorProvider, config, clusterPrefix); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorFactory.java b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorFactory.java index d4ff86fa688..43d7b178289 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorFactory.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorFactory.java @@ -22,6 +22,8 @@ public class EventMonitorFactory { if (eventMonitorProvider == EventMonitorProvider.CLOUDWATCH) { return new CloudwatchEventMonitor(eventMonitorProvider, config, clusterName); + } else if (eventMonitorProvider == EventMonitorProvider.PROMETHEUS) { + return new PrometheusEventMonitor(eventMonitorProvider, config, clusterName); } throw new IllegalArgumentException("Not implemented Event monitor: " + eventMonitorProvider); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/PrometheusEventMonitor.java b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/PrometheusEventMonitor.java new file mode 100644 index 00000000000..4bab82bc9f4 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/PrometheusEventMonitor.java @@ -0,0 +1,104 @@ +/* + * Copyright 2022 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.monitoring; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; +import org.openmetadata.schema.monitoring.EventMonitorProvider; +import org.openmetadata.schema.type.ChangeEvent; +import org.openmetadata.service.util.MicrometerBundleSingleton; +import software.amazon.awssdk.services.cloudwatch.model.CloudWatchException; + +@Slf4j +public class PrometheusEventMonitor extends EventMonitor { + + private final PrometheusMeterRegistry meterRegistry; + private static final String PIPELINE_STATUS = "pipelineStatus"; + private static final String COUNTER_NAME = "ingestionPipeline.counter"; + private static final String FQN_TAG_NAME = "fqn"; + private static final String PIPELINE_TYPE_TAG_NAME = "pipelineType"; + private static final String EVENT_TYPE_TAG_NAME = "eventType"; + private static final String CLUSTER_TAG_NAME = "clusterName"; + + public PrometheusEventMonitor( + EventMonitorProvider eventMonitorProvider, EventMonitorConfiguration config, String clusterPrefix) { + super(eventMonitorProvider, config, clusterPrefix); + meterRegistry = MicrometerBundleSingleton.prometheusMeterRegistry; + } + + @Override + protected void pushMetric(ChangeEvent event) { + String fqn = event.getEntityFullyQualifiedName(); + IngestionPipeline ingestionPipeline = (IngestionPipeline) event.getEntity(); + String pipelineType = ingestionPipeline.getPipelineType().toString(); + + try { + switch (event.getEventType()) { + case ENTITY_DELETED: + case ENTITY_SOFT_DELETED: + case ENTITY_CREATED: + incrementIngestionPipelineCounter(fqn, pipelineType, event.getEventType().value()); + break; + case ENTITY_UPDATED: + // we can have multiple updates bundled together + event + .getChangeDescription() + .getFieldsUpdated() + .forEach( + change -> { + if (change.getName().equals(PIPELINE_STATUS) && change.getNewValue() != null) { + PipelineStatus pipelineStatus = (PipelineStatus) change.getNewValue(); + incrementIngestionPipelineCounter(fqn, pipelineType, pipelineStatus.getPipelineState().value()); + } + }); + + default: + throw new IllegalArgumentException("Invalid EventType " + event.getEventType()); + } + } catch (IllegalArgumentException | CloudWatchException e) { + LOG.error("Failed to publish IngestionPipeline Cloudwatch metric due to " + e.getMessage()); + } + } + + @Override + protected void close() { + meterRegistry.close(); + } + + /** + * A new counter will be created only if it does not exist for the given set of tags. Otherwise, micrometer will + * increase the count of the existing counter. Ref ... + * + * @param fqn Pipeline FQN + * @param pipelineType Metadata, Profiler,... + * @param eventType running, failed, entityCreated,... + */ + public void incrementIngestionPipelineCounter(String fqn, String pipelineType, String eventType) { + Counter.builder(COUNTER_NAME) + .tags( + FQN_TAG_NAME, + fqn, + PIPELINE_TYPE_TAG_NAME, + pipelineType, + EVENT_TYPE_TAG_NAME, + eventType, + CLUSTER_TAG_NAME, + getClusterPrefix()) + .register(meterRegistry) + .increment(); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/MicrometerBundleSingleton.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/MicrometerBundleSingleton.java index 6fd5374a2ad..d21c65ca178 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/MicrometerBundleSingleton.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/MicrometerBundleSingleton.java @@ -1,11 +1,25 @@ +/* + * Copyright 2022 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.openmetadata.service.util; import io.github.maksymdolgykh.dropwizard.micrometer.MicrometerBundle; import io.micrometer.core.instrument.Timer; +import io.micrometer.prometheus.PrometheusMeterRegistry; public class MicrometerBundleSingleton { private static MicrometerBundle INSTANCE; public static Timer webAnalyticEvents; + public static PrometheusMeterRegistry prometheusMeterRegistry; private MicrometerBundleSingleton() {} @@ -13,6 +27,8 @@ public class MicrometerBundleSingleton { if (INSTANCE == null) { INSTANCE = new MicrometerBundle(); webAnalyticEvents = MicrometerBundle.prometheusRegistry.timer("web.analytics.events"); + // We'll use this registry to add monitoring around Ingestion Pipelines + prometheusMeterRegistry = MicrometerBundle.prometheusRegistry; } return INSTANCE; diff --git a/openmetadata-spec/src/main/resources/json/schema/monitoring/eventMonitorProvider.json b/openmetadata-spec/src/main/resources/json/schema/monitoring/eventMonitorProvider.json index 72747f75442..dcdb3f50582 100644 --- a/openmetadata-spec/src/main/resources/json/schema/monitoring/eventMonitorProvider.json +++ b/openmetadata-spec/src/main/resources/json/schema/monitoring/eventMonitorProvider.json @@ -5,6 +5,6 @@ "description": "OpenMetadata Event Monitor Provider.", "type": "string", "javaType": "org.openmetadata.schema.monitoring.EventMonitorProvider", - "enum": ["cloudwatch"], + "enum": ["cloudwatch", "prometheus"], "additionalProperties": false }