diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml
index 39645768205..f4776733c9b 100644
--- a/conf/openmetadata.yaml
+++ b/conf/openmetadata.yaml
@@ -170,6 +170,15 @@ elasticsearch:
socketTimeoutSecs: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
batchSize: ${ELASTICSEARCH_BATCH_SIZE:-10}
+# eventMonitoringConfiguration:
+ # eventMonitor: ${EVENT_MONITOR:-cloudwatch} # Possible values are "cloudwatch"
+ # batchSize: ${EVENT_MONITOR_BATCH_SIZE:-10}
+ # it will use the default auth provider for AWS services if parameters are not set
+ # parameters:
+ # region: ${OM_MONITOR_REGION:-""}
+ # accessKeyId: ${OM_MONITOR_ACCESS_KEY_ID:-""}
+ # secretAccessKey: ${OM_MONITOR_ACCESS_KEY:-""}
+
eventHandlerConfiguration:
eventHandlerClassNames:
- "org.openmetadata.service.events.AuditEventHandler"
diff --git a/openmetadata-service/pom.xml b/openmetadata-service/pom.xml
index cb596eb4b39..4ab90944b5f 100644
--- a/openmetadata-service/pom.xml
+++ b/openmetadata-service/pom.xml
@@ -237,6 +237,13 @@
${awssdk.version}
+
+
+ software.amazon.awssdk
+ cloudwatch
+ ${awssdk.version}
+
+
org.glassfish.jersey.core
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java
index 83915ff063c..bdd1389a1ab 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java
@@ -73,6 +73,9 @@ import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
import org.openmetadata.service.migration.Migration;
import org.openmetadata.service.migration.MigrationConfiguration;
+import org.openmetadata.service.monitoring.EventMonitor;
+import org.openmetadata.service.monitoring.EventMonitorFactory;
+import org.openmetadata.service.monitoring.EventMonitorPublisher;
import org.openmetadata.service.resources.CollectionRegistry;
import org.openmetadata.service.secrets.SecretsManager;
import org.openmetadata.service.secrets.SecretsManagerFactory;
@@ -303,6 +306,16 @@ public class OpenMetadataApplication extends Application requests = buildMetricRequest(event);
+ requests.forEach(client::putMetricData);
+ }
+
+ protected List buildMetricRequest(ChangeEvent event) {
+ String fqn = event.getEntityFullyQualifiedName();
+ IngestionPipeline ingestionPipeline = (IngestionPipeline) event.getEntity();
+ String pipelineType = ingestionPipeline.getPipelineType().toString();
+ Long timestamp = event.getTimestamp();
+
+ List metricRequests = Collections.emptyList();
+
+ try {
+ switch (event.getEventType()) {
+ case ENTITY_CREATED:
+ metricRequests = List.of(logPipelineCreated(fqn, pipelineType, timestamp));
+ break;
+ case ENTITY_UPDATED:
+ // we can have multiple updates bundled together
+ metricRequests = logPipelineUpdated(fqn, pipelineType, timestamp, event.getChangeDescription());
+ break;
+ case ENTITY_DELETED:
+ case ENTITY_SOFT_DELETED:
+ metricRequests = List.of(logPipelineDeleted(fqn, pipelineType, timestamp));
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid EventType " + event.getEventType());
+ }
+ } catch (IllegalArgumentException | CloudWatchException e) {
+ LOG.error("Failed to publish IngestionPipeline Cloudwatch metric due to " + e.getMessage());
+ }
+
+ return metricRequests;
+ }
+
+ protected PutMetricDataRequest logPipelineCreated(String fqn, String pipelineType, Long timestamp) {
+ return logPipelineStatus(fqn, pipelineType, timestamp, INGESTION_PIPELINE_CREATED);
+ }
+
+ protected PutMetricDataRequest logPipelineDeleted(String fqn, String pipelineType, Long timestamp) {
+ return logPipelineStatus(fqn, pipelineType, timestamp, INGESTION_PIPELINE_DELETED);
+ }
+
+ protected List logPipelineUpdated(
+ String fqn, String pipelineType, Long timestamp, ChangeDescription changeDescription) {
+ return changeDescription.getFieldsUpdated().stream()
+ .map(
+ change -> {
+ if (change.getName().equals(PIPELINE_STATUS) && change.getNewValue() != null) {
+ PipelineStatus pipelineStatus = (PipelineStatus) change.getNewValue();
+ return logPipelineStatus(
+ fqn, pipelineType, timestamp, getMetricNameByStatus(pipelineStatus.getPipelineState()));
+ } else {
+ LOG.debug("Ignoring Ingestion Pipeline change type " + change.getName());
+ }
+ return null;
+ })
+ .collect(Collectors.toList());
+ }
+
+ private String getMetricNameByStatus(PipelineStatusType statusType) {
+ return INGESTION_PIPELINE_UPDATED + statusType.toString().toUpperCase();
+ }
+
+ protected PutMetricDataRequest logPipelineStatus(String fqn, String pipelineType, Long timestamp, String metricName) {
+ Dimension dimension = Dimension.builder().name(pipelineType).value(fqn).build();
+ Instant instant = Instant.ofEpochMilli(timestamp);
+
+ MetricDatum datum =
+ MetricDatum.builder()
+ .metricName(metricName)
+ .unit(StandardUnit.COUNT)
+ .value(1.0)
+ .timestamp(instant)
+ .dimensions(dimension)
+ .build();
+
+ return PutMetricDataRequest.builder().namespace(buildMetricNamespace(NAMESPACE)).metricData(datum).build();
+ }
+
+ @Override
+ protected void close() {
+ client.close();
+ }
+}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitor.java b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitor.java
new file mode 100644
index 00000000000..481149e0e92
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitor.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2022 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openmetadata.service.monitoring;
+
+import lombok.Getter;
+import org.openmetadata.schema.monitoring.EventMonitorProvider;
+import org.openmetadata.schema.type.ChangeEvent;
+
+public abstract class EventMonitor {
+
+ @Getter private final String clusterPrefix;
+
+ @Getter private final EventMonitorProvider eventMonitoringProvider;
+ @Getter private final EventMonitorConfiguration eventMonitorConfiguration;
+
+ protected EventMonitor(
+ EventMonitorProvider eventMonitorProvider, EventMonitorConfiguration config, String clusterPrefix) {
+ this.eventMonitoringProvider = eventMonitorProvider;
+ this.clusterPrefix = clusterPrefix;
+ this.eventMonitorConfiguration = config;
+ }
+
+ protected String buildMetricNamespace(String namespace) {
+ return String.format("%s/%s", this.clusterPrefix, namespace);
+ }
+
+ protected abstract void pushMetric(ChangeEvent event);
+
+ protected abstract void close();
+}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorConfiguration.java b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorConfiguration.java
new file mode 100644
index 00000000000..ca7cb5f5c8e
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorConfiguration.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2022 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openmetadata.service.monitoring;
+
+import java.util.Map;
+import lombok.Getter;
+import lombok.Setter;
+import org.openmetadata.schema.monitoring.EventMonitorProvider;
+
+@Getter
+@Setter
+public class EventMonitorConfiguration {
+
+ private EventMonitorProvider eventMonitor;
+
+ private int batchSize;
+
+ private Map parameters;
+}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorFactory.java b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorFactory.java
new file mode 100644
index 00000000000..d4ff86fa688
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2022 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openmetadata.service.monitoring;
+
+import org.openmetadata.schema.monitoring.EventMonitorProvider;
+
+public class EventMonitorFactory {
+
+ public static EventMonitor createEventMonitor(EventMonitorConfiguration config, String clusterName) {
+
+ EventMonitorProvider eventMonitorProvider = config != null ? config.getEventMonitor() : null;
+
+ if (eventMonitorProvider == EventMonitorProvider.CLOUDWATCH) {
+ return new CloudwatchEventMonitor(eventMonitorProvider, config, clusterName);
+ }
+
+ throw new IllegalArgumentException("Not implemented Event monitor: " + eventMonitorProvider);
+ }
+}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorPublisher.java b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorPublisher.java
new file mode 100644
index 00000000000..e4a382d073f
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorPublisher.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2022 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openmetadata.service.monitoring;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import lombok.extern.slf4j.Slf4j;
+import org.openmetadata.schema.type.ChangeEvent;
+import org.openmetadata.service.Entity;
+import org.openmetadata.service.events.AbstractEventPublisher;
+import org.openmetadata.service.events.errors.EventPublisherException;
+import org.openmetadata.service.resources.events.EventResource;
+
+@Slf4j
+public class EventMonitorPublisher extends AbstractEventPublisher {
+
+ private final EventMonitor eventMonitor;
+
+ public EventMonitorPublisher(EventMonitorConfiguration config, EventMonitor eventMonitor) {
+ super(config.getBatchSize(), new ArrayList<>());
+ this.eventMonitor = eventMonitor;
+ }
+
+ @Override
+ public void publish(EventResource.ChangeEventList events) throws EventPublisherException, JsonProcessingException {
+ for (ChangeEvent event : events.getData()) {
+ String entityType = event.getEntityType();
+ if (Entity.INGESTION_PIPELINE.equals(entityType)) {
+ this.eventMonitor.pushMetric(event);
+ }
+ }
+ }
+
+ @Override
+ public void onStart() {
+ LOG.info("Event Monitor Publisher Started");
+ }
+
+ @Override
+ public void onShutdown() {
+ eventMonitor.close();
+ LOG.info("Event Monitor Publisher Closed");
+ }
+}
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/monitoring/CloudWatchEventMonitorTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/monitoring/CloudWatchEventMonitorTest.java
new file mode 100644
index 00000000000..d4099c4071c
--- /dev/null
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/monitoring/CloudWatchEventMonitorTest.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2022 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openmetadata.service.monitoring;
+
+import static org.junit.Assert.assertEquals;
+import static org.openmetadata.service.resources.services.ingestionpipelines.IngestionPipelineResourceTest.DATABASE_METADATA_CONFIG;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.UUID;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
+import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
+import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
+import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType;
+import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
+import org.openmetadata.schema.monitoring.EventMonitorProvider;
+import org.openmetadata.schema.type.ChangeDescription;
+import org.openmetadata.schema.type.ChangeEvent;
+import org.openmetadata.schema.type.EventType;
+import org.openmetadata.schema.type.FieldChange;
+import org.openmetadata.service.Entity;
+import software.amazon.awssdk.services.cloudwatch.model.Dimension;
+import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
+import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
+import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
+
+public class CloudWatchEventMonitorTest {
+
+ private static final String CLUSTER_NAME = "openmetadata";
+ private static final String NAMESPACE = "INGESTION_PIPELINE";
+ private static final String EXPECTED_NAMESPACE = "openmetadata/INGESTION_PIPELINE";
+ private static final String FQN = "service.ingestion";
+ private static EventMonitorConfiguration config;
+
+ private static CloudwatchEventMonitor eventMonitor;
+
+ public static final Long current_ts = System.currentTimeMillis();
+ public static final Instant instant = Instant.ofEpochMilli(current_ts);
+
+ public static final IngestionPipeline INGESTION_PIPELINE =
+ new IngestionPipeline()
+ .withName("ingestion")
+ .withId(UUID.randomUUID())
+ .withPipelineType(PipelineType.METADATA)
+ .withSourceConfig(DATABASE_METADATA_CONFIG)
+ .withAirflowConfig(new AirflowConfig().withStartDate(new DateTime("2022-06-10T15:06:47+00:00").toDate()));
+
+ private ChangeEvent buildChangeEvent(EventType eventType) {
+ return new ChangeEvent()
+ .withEntityType(Entity.INGESTION_PIPELINE)
+ .withEventType(eventType)
+ .withEntityFullyQualifiedName(FQN)
+ .withTimestamp(current_ts)
+ .withEntity(INGESTION_PIPELINE);
+ }
+
+ private Dimension buildDimension(String pipelineType, String fqn) {
+ return Dimension.builder().name(pipelineType).value(fqn).build();
+ }
+
+ @BeforeAll
+ static void setUp() {
+ config = new EventMonitorConfiguration();
+ config.setEventMonitor(EventMonitorProvider.CLOUDWATCH);
+ config.setBatchSize(10);
+ eventMonitor = new CloudwatchEventMonitor(EventMonitorProvider.CLOUDWATCH, config, CLUSTER_NAME);
+ }
+
+ @Test
+ void buildMetricNamespaceTest() {
+ assertEquals(eventMonitor.buildMetricNamespace(NAMESPACE), EXPECTED_NAMESPACE);
+ }
+
+ @Test
+ void buildMetricRequestForCreatedIngestionPipelineTest() {
+ ChangeEvent event = buildChangeEvent(EventType.ENTITY_CREATED);
+ List metricRequests = eventMonitor.buildMetricRequest(event);
+
+ PutMetricDataRequest expectedMetric =
+ PutMetricDataRequest.builder()
+ .namespace(EXPECTED_NAMESPACE)
+ .metricData(
+ MetricDatum.builder()
+ .metricName("INGESTION_PIPELINE_CREATED")
+ .unit(StandardUnit.COUNT)
+ .value(1.0)
+ .timestamp(instant)
+ .dimensions(buildDimension("metadata", FQN))
+ .build())
+ .build();
+
+ assertEquals(metricRequests.get(0), expectedMetric);
+ }
+
+ @Test
+ void buildMetricRequestForDeletedIngestionPipelineTest() {
+ ChangeEvent event = buildChangeEvent(EventType.ENTITY_DELETED);
+ List metricRequests = eventMonitor.buildMetricRequest(event);
+
+ PutMetricDataRequest expectedMetric =
+ PutMetricDataRequest.builder()
+ .namespace(EXPECTED_NAMESPACE)
+ .metricData(
+ MetricDatum.builder()
+ .metricName("INGESTION_PIPELINE_DELETED")
+ .unit(StandardUnit.COUNT)
+ .value(1.0)
+ .timestamp(instant)
+ .dimensions(buildDimension("metadata", FQN))
+ .build())
+ .build();
+
+ assertEquals(metricRequests.get(0), expectedMetric);
+ }
+
+ @Test
+ void buildMetricRequestForUpdatedIngestionPipelineTest() {
+ ChangeEvent event = buildChangeEvent(EventType.ENTITY_UPDATED);
+ event.withChangeDescription(
+ new ChangeDescription()
+ .withFieldsUpdated(
+ List.of(
+ new FieldChange()
+ .withName("pipelineStatus")
+ .withOldValue(null)
+ .withNewValue(new PipelineStatus().withPipelineState(PipelineStatusType.RUNNING)))));
+
+ List metricRequests = eventMonitor.buildMetricRequest(event);
+
+ PutMetricDataRequest expectedMetric =
+ PutMetricDataRequest.builder()
+ .namespace(EXPECTED_NAMESPACE)
+ .metricData(
+ MetricDatum.builder()
+ .metricName("INGESTION_PIPELINE_RUNNING")
+ .unit(StandardUnit.COUNT)
+ .value(1.0)
+ .timestamp(instant)
+ .dimensions(buildDimension("metadata", FQN))
+ .build())
+ .build();
+
+ assertEquals(metricRequests.get(0), expectedMetric);
+ }
+}
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/monitoring/EventMonitorFactoryTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/monitoring/EventMonitorFactoryTest.java
new file mode 100644
index 00000000000..6b060e72ef0
--- /dev/null
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/monitoring/EventMonitorFactoryTest.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2022 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openmetadata.service.monitoring;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.openmetadata.schema.monitoring.EventMonitorProvider;
+
+public class EventMonitorFactoryTest {
+
+ private EventMonitorConfiguration config;
+
+ private static final String CLUSTER_NAME = "openmetadata";
+
+ @BeforeEach
+ void setUp() {
+ config = new EventMonitorConfiguration();
+ config.setParameters(new HashMap<>());
+ }
+
+ @Test
+ void testIsCreatedItCloudwatchEventMonitor() {
+ config.setEventMonitor(EventMonitorProvider.CLOUDWATCH);
+ assertTrue(EventMonitorFactory.createEventMonitor(config, CLUSTER_NAME) instanceof CloudwatchEventMonitor);
+ }
+}
diff --git a/openmetadata-spec/src/main/resources/json/schema/monitoring/eventMonitorProvider.json b/openmetadata-spec/src/main/resources/json/schema/monitoring/eventMonitorProvider.json
new file mode 100644
index 00000000000..72747f75442
--- /dev/null
+++ b/openmetadata-spec/src/main/resources/json/schema/monitoring/eventMonitorProvider.json
@@ -0,0 +1,10 @@
+{
+ "$id": "https://open-metadata.org/schema/monitoring/eventMonitorProvider.json",
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "title": "Event Monitor Provider",
+ "description": "OpenMetadata Event Monitor Provider.",
+ "type": "string",
+ "javaType": "org.openmetadata.schema.monitoring.EventMonitorProvider",
+ "enum": ["cloudwatch"],
+ "additionalProperties": false
+}