mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-12 17:26:43 +00:00
* Prep monitoring skeleton * Fix naming * Push Ingestion Pipeline status to cloudwatch * Review comments and tests * import * Remove singleton object * Format
This commit is contained in:
parent
ba1e405427
commit
dda8f8054e
@ -170,6 +170,15 @@ elasticsearch:
|
|||||||
socketTimeoutSecs: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
|
socketTimeoutSecs: ${ELASTICSEARCH_SOCKET_TIMEOUT_SECS:-60}
|
||||||
batchSize: ${ELASTICSEARCH_BATCH_SIZE:-10}
|
batchSize: ${ELASTICSEARCH_BATCH_SIZE:-10}
|
||||||
|
|
||||||
|
# eventMonitoringConfiguration:
|
||||||
|
# eventMonitor: ${EVENT_MONITOR:-cloudwatch} # Possible values are "cloudwatch"
|
||||||
|
# batchSize: ${EVENT_MONITOR_BATCH_SIZE:-10}
|
||||||
|
# it will use the default auth provider for AWS services if parameters are not set
|
||||||
|
# parameters:
|
||||||
|
# region: ${OM_MONITOR_REGION:-""}
|
||||||
|
# accessKeyId: ${OM_MONITOR_ACCESS_KEY_ID:-""}
|
||||||
|
# secretAccessKey: ${OM_MONITOR_ACCESS_KEY:-""}
|
||||||
|
|
||||||
eventHandlerConfiguration:
|
eventHandlerConfiguration:
|
||||||
eventHandlerClassNames:
|
eventHandlerClassNames:
|
||||||
- "org.openmetadata.service.events.AuditEventHandler"
|
- "org.openmetadata.service.events.AuditEventHandler"
|
||||||
|
@ -237,6 +237,13 @@
|
|||||||
<version>${awssdk.version}</version>
|
<version>${awssdk.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Dependencies for cloudwatch monitoring -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>software.amazon.awssdk</groupId>
|
||||||
|
<artifactId>cloudwatch</artifactId>
|
||||||
|
<version>${awssdk.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!--test dependencies-->
|
<!--test dependencies-->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.glassfish.jersey.core</groupId>
|
<groupId>org.glassfish.jersey.core</groupId>
|
||||||
|
@ -73,6 +73,9 @@ import org.openmetadata.service.jdbi3.CollectionDAO;
|
|||||||
import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
|
import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
|
||||||
import org.openmetadata.service.migration.Migration;
|
import org.openmetadata.service.migration.Migration;
|
||||||
import org.openmetadata.service.migration.MigrationConfiguration;
|
import org.openmetadata.service.migration.MigrationConfiguration;
|
||||||
|
import org.openmetadata.service.monitoring.EventMonitor;
|
||||||
|
import org.openmetadata.service.monitoring.EventMonitorFactory;
|
||||||
|
import org.openmetadata.service.monitoring.EventMonitorPublisher;
|
||||||
import org.openmetadata.service.resources.CollectionRegistry;
|
import org.openmetadata.service.resources.CollectionRegistry;
|
||||||
import org.openmetadata.service.secrets.SecretsManager;
|
import org.openmetadata.service.secrets.SecretsManager;
|
||||||
import org.openmetadata.service.secrets.SecretsManagerFactory;
|
import org.openmetadata.service.secrets.SecretsManagerFactory;
|
||||||
@ -303,6 +306,16 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
|
|||||||
openMetadataApplicationConfig.getElasticSearchConfiguration(), jdbi.onDemand(CollectionDAO.class));
|
openMetadataApplicationConfig.getElasticSearchConfiguration(), jdbi.onDemand(CollectionDAO.class));
|
||||||
EventPubSub.addEventHandler(elasticSearchEventPublisher);
|
EventPubSub.addEventHandler(elasticSearchEventPublisher);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (openMetadataApplicationConfig.getEventMonitorConfiguration() != null) {
|
||||||
|
final EventMonitor eventMonitor =
|
||||||
|
EventMonitorFactory.createEventMonitor(
|
||||||
|
openMetadataApplicationConfig.getEventMonitorConfiguration(),
|
||||||
|
openMetadataApplicationConfig.getClusterName());
|
||||||
|
EventMonitorPublisher eventMonitorPublisher =
|
||||||
|
new EventMonitorPublisher(openMetadataApplicationConfig.getEventMonitorConfiguration(), eventMonitor);
|
||||||
|
EventPubSub.addEventHandler(eventMonitorPublisher);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void registerResources(OpenMetadataApplicationConfig config, Environment environment, Jdbi jdbi) {
|
private void registerResources(OpenMetadataApplicationConfig config, Environment environment, Jdbi jdbi) {
|
||||||
|
@ -33,6 +33,7 @@ import org.openmetadata.schema.api.security.jwt.JWTTokenConfiguration;
|
|||||||
import org.openmetadata.schema.api.slackChat.SlackChatConfiguration;
|
import org.openmetadata.schema.api.slackChat.SlackChatConfiguration;
|
||||||
import org.openmetadata.schema.email.SmtpSettings;
|
import org.openmetadata.schema.email.SmtpSettings;
|
||||||
import org.openmetadata.service.migration.MigrationConfiguration;
|
import org.openmetadata.service.migration.MigrationConfiguration;
|
||||||
|
import org.openmetadata.service.monitoring.EventMonitorConfiguration;
|
||||||
import org.openmetadata.service.secrets.SecretsManagerConfiguration;
|
import org.openmetadata.service.secrets.SecretsManagerConfiguration;
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
@ -87,6 +88,9 @@ public class OpenMetadataApplicationConfig extends Configuration {
|
|||||||
@JsonProperty("secretsManagerConfiguration")
|
@JsonProperty("secretsManagerConfiguration")
|
||||||
private SecretsManagerConfiguration secretsManagerConfiguration;
|
private SecretsManagerConfiguration secretsManagerConfiguration;
|
||||||
|
|
||||||
|
@JsonProperty("eventMonitoringConfiguration")
|
||||||
|
private EventMonitorConfiguration eventMonitorConfiguration;
|
||||||
|
|
||||||
@JsonProperty("clusterName")
|
@JsonProperty("clusterName")
|
||||||
private String clusterName;
|
private String clusterName;
|
||||||
|
|
||||||
|
@ -147,7 +147,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
|
|||||||
|
|
||||||
private ChangeDescription addPipelineStatusChangeDescription(Double version, Object newValue, Object oldValue) {
|
private ChangeDescription addPipelineStatusChangeDescription(Double version, Object newValue, Object oldValue) {
|
||||||
FieldChange fieldChange =
|
FieldChange fieldChange =
|
||||||
new FieldChange().withName("testCaseResult").withNewValue(newValue).withOldValue(oldValue);
|
new FieldChange().withName("pipelineStatus").withNewValue(newValue).withOldValue(oldValue);
|
||||||
ChangeDescription change = new ChangeDescription().withPreviousVersion(version);
|
ChangeDescription change = new ChangeDescription().withPreviousVersion(version);
|
||||||
change.getFieldsUpdated().add(fieldChange);
|
change.getFieldsUpdated().add(fieldChange);
|
||||||
return change;
|
return change;
|
||||||
|
@ -0,0 +1,173 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2022 Collate
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.openmetadata.service.monitoring;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.logging.log4j.util.Strings;
|
||||||
|
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
|
||||||
|
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
|
||||||
|
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType;
|
||||||
|
import org.openmetadata.schema.monitoring.EventMonitorProvider;
|
||||||
|
import org.openmetadata.schema.type.ChangeDescription;
|
||||||
|
import org.openmetadata.schema.type.ChangeEvent;
|
||||||
|
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
|
||||||
|
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
||||||
|
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
|
||||||
|
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
|
||||||
|
import software.amazon.awssdk.regions.Region;
|
||||||
|
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
|
||||||
|
import software.amazon.awssdk.services.cloudwatch.model.CloudWatchException;
|
||||||
|
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
|
||||||
|
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
|
||||||
|
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
|
||||||
|
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class CloudwatchEventMonitor extends EventMonitor {
|
||||||
|
|
||||||
|
public static final String ACCESS_KEY_ID = "accessKeyId";
|
||||||
|
public static final String SECRET_ACCESS_KEY = "secretAccessKey";
|
||||||
|
public static final String REGION = "region";
|
||||||
|
|
||||||
|
public static final String INGESTION_PIPELINE_CREATED = "INGESTION_PIPELINE_CREATED";
|
||||||
|
public static final String INGESTION_PIPELINE_UPDATED = "INGESTION_PIPELINE_";
|
||||||
|
public static final String INGESTION_PIPELINE_DELETED = "INGESTION_PIPELINE_DELETED";
|
||||||
|
public static final String NAMESPACE = "INGESTION_PIPELINE";
|
||||||
|
public static final String PIPELINE_STATUS = "pipelineStatus";
|
||||||
|
|
||||||
|
private final CloudWatchClient client;
|
||||||
|
|
||||||
|
private static CloudwatchEventMonitor INSTANCE;
|
||||||
|
|
||||||
|
public CloudwatchEventMonitor(
|
||||||
|
EventMonitorProvider eventMonitorProvider, EventMonitorConfiguration config, String clusterPrefix) {
|
||||||
|
super(eventMonitorProvider, config, clusterPrefix);
|
||||||
|
|
||||||
|
if (config != null
|
||||||
|
&& config.getParameters() != null
|
||||||
|
&& !Strings.isBlank(config.getParameters().getOrDefault(REGION, ""))) {
|
||||||
|
String region = config.getParameters().getOrDefault(REGION, "");
|
||||||
|
String accessKeyId = config.getParameters().getOrDefault(ACCESS_KEY_ID, "");
|
||||||
|
String secretAccessKey = config.getParameters().getOrDefault(SECRET_ACCESS_KEY, "");
|
||||||
|
AwsCredentialsProvider credentialsProvider;
|
||||||
|
if (Strings.isBlank(accessKeyId) && Strings.isBlank(secretAccessKey)) {
|
||||||
|
credentialsProvider = DefaultCredentialsProvider.create();
|
||||||
|
} else {
|
||||||
|
credentialsProvider =
|
||||||
|
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
|
||||||
|
}
|
||||||
|
this.client =
|
||||||
|
CloudWatchClient.builder().region(Region.of(region)).credentialsProvider(credentialsProvider).build();
|
||||||
|
} else {
|
||||||
|
this.client = CloudWatchClient.create();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We want to control the lifecycle of an Ingestion Pipeline. We will push metrics for: 1. eventType "entityCreated":
|
||||||
|
* log when a pipeline was first created. Push the FQN and timestamp 2. eventType "entityUpdated": log when there is a
|
||||||
|
* `pipelineStatus` change with the status type 3. eventType "entityDeleted": log when an ingestionPipeline is removed
|
||||||
|
*
|
||||||
|
* @param event ChangeEvent for an IngestionPipeline
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void pushMetric(ChangeEvent event) {
|
||||||
|
List<PutMetricDataRequest> requests = buildMetricRequest(event);
|
||||||
|
requests.forEach(client::putMetricData);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<PutMetricDataRequest> buildMetricRequest(ChangeEvent event) {
|
||||||
|
String fqn = event.getEntityFullyQualifiedName();
|
||||||
|
IngestionPipeline ingestionPipeline = (IngestionPipeline) event.getEntity();
|
||||||
|
String pipelineType = ingestionPipeline.getPipelineType().toString();
|
||||||
|
Long timestamp = event.getTimestamp();
|
||||||
|
|
||||||
|
List<PutMetricDataRequest> metricRequests = Collections.emptyList();
|
||||||
|
|
||||||
|
try {
|
||||||
|
switch (event.getEventType()) {
|
||||||
|
case ENTITY_CREATED:
|
||||||
|
metricRequests = List.of(logPipelineCreated(fqn, pipelineType, timestamp));
|
||||||
|
break;
|
||||||
|
case ENTITY_UPDATED:
|
||||||
|
// we can have multiple updates bundled together
|
||||||
|
metricRequests = logPipelineUpdated(fqn, pipelineType, timestamp, event.getChangeDescription());
|
||||||
|
break;
|
||||||
|
case ENTITY_DELETED:
|
||||||
|
case ENTITY_SOFT_DELETED:
|
||||||
|
metricRequests = List.of(logPipelineDeleted(fqn, pipelineType, timestamp));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("Invalid EventType " + event.getEventType());
|
||||||
|
}
|
||||||
|
} catch (IllegalArgumentException | CloudWatchException e) {
|
||||||
|
LOG.error("Failed to publish IngestionPipeline Cloudwatch metric due to " + e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
return metricRequests;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected PutMetricDataRequest logPipelineCreated(String fqn, String pipelineType, Long timestamp) {
|
||||||
|
return logPipelineStatus(fqn, pipelineType, timestamp, INGESTION_PIPELINE_CREATED);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected PutMetricDataRequest logPipelineDeleted(String fqn, String pipelineType, Long timestamp) {
|
||||||
|
return logPipelineStatus(fqn, pipelineType, timestamp, INGESTION_PIPELINE_DELETED);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<PutMetricDataRequest> logPipelineUpdated(
|
||||||
|
String fqn, String pipelineType, Long timestamp, ChangeDescription changeDescription) {
|
||||||
|
return changeDescription.getFieldsUpdated().stream()
|
||||||
|
.map(
|
||||||
|
change -> {
|
||||||
|
if (change.getName().equals(PIPELINE_STATUS) && change.getNewValue() != null) {
|
||||||
|
PipelineStatus pipelineStatus = (PipelineStatus) change.getNewValue();
|
||||||
|
return logPipelineStatus(
|
||||||
|
fqn, pipelineType, timestamp, getMetricNameByStatus(pipelineStatus.getPipelineState()));
|
||||||
|
} else {
|
||||||
|
LOG.debug("Ignoring Ingestion Pipeline change type " + change.getName());
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
})
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getMetricNameByStatus(PipelineStatusType statusType) {
|
||||||
|
return INGESTION_PIPELINE_UPDATED + statusType.toString().toUpperCase();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected PutMetricDataRequest logPipelineStatus(String fqn, String pipelineType, Long timestamp, String metricName) {
|
||||||
|
Dimension dimension = Dimension.builder().name(pipelineType).value(fqn).build();
|
||||||
|
Instant instant = Instant.ofEpochMilli(timestamp);
|
||||||
|
|
||||||
|
MetricDatum datum =
|
||||||
|
MetricDatum.builder()
|
||||||
|
.metricName(metricName)
|
||||||
|
.unit(StandardUnit.COUNT)
|
||||||
|
.value(1.0)
|
||||||
|
.timestamp(instant)
|
||||||
|
.dimensions(dimension)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
return PutMetricDataRequest.builder().namespace(buildMetricNamespace(NAMESPACE)).metricData(datum).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void close() {
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,40 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2022 Collate
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.openmetadata.service.monitoring;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
import org.openmetadata.schema.monitoring.EventMonitorProvider;
|
||||||
|
import org.openmetadata.schema.type.ChangeEvent;
|
||||||
|
|
||||||
|
public abstract class EventMonitor {
|
||||||
|
|
||||||
|
@Getter private final String clusterPrefix;
|
||||||
|
|
||||||
|
@Getter private final EventMonitorProvider eventMonitoringProvider;
|
||||||
|
@Getter private final EventMonitorConfiguration eventMonitorConfiguration;
|
||||||
|
|
||||||
|
protected EventMonitor(
|
||||||
|
EventMonitorProvider eventMonitorProvider, EventMonitorConfiguration config, String clusterPrefix) {
|
||||||
|
this.eventMonitoringProvider = eventMonitorProvider;
|
||||||
|
this.clusterPrefix = clusterPrefix;
|
||||||
|
this.eventMonitorConfiguration = config;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String buildMetricNamespace(String namespace) {
|
||||||
|
return String.format("%s/%s", this.clusterPrefix, namespace);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void pushMetric(ChangeEvent event);
|
||||||
|
|
||||||
|
protected abstract void close();
|
||||||
|
}
|
@ -0,0 +1,30 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2022 Collate
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.openmetadata.service.monitoring;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
|
import org.openmetadata.schema.monitoring.EventMonitorProvider;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
public class EventMonitorConfiguration {
|
||||||
|
|
||||||
|
private EventMonitorProvider eventMonitor;
|
||||||
|
|
||||||
|
private int batchSize;
|
||||||
|
|
||||||
|
private Map<String, String> parameters;
|
||||||
|
}
|
@ -0,0 +1,29 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2022 Collate
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.openmetadata.service.monitoring;
|
||||||
|
|
||||||
|
import org.openmetadata.schema.monitoring.EventMonitorProvider;
|
||||||
|
|
||||||
|
public class EventMonitorFactory {
|
||||||
|
|
||||||
|
public static EventMonitor createEventMonitor(EventMonitorConfiguration config, String clusterName) {
|
||||||
|
|
||||||
|
EventMonitorProvider eventMonitorProvider = config != null ? config.getEventMonitor() : null;
|
||||||
|
|
||||||
|
if (eventMonitorProvider == EventMonitorProvider.CLOUDWATCH) {
|
||||||
|
return new CloudwatchEventMonitor(eventMonitorProvider, config, clusterName);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new IllegalArgumentException("Not implemented Event monitor: " + eventMonitorProvider);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,54 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2022 Collate
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.openmetadata.service.monitoring;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.openmetadata.schema.type.ChangeEvent;
|
||||||
|
import org.openmetadata.service.Entity;
|
||||||
|
import org.openmetadata.service.events.AbstractEventPublisher;
|
||||||
|
import org.openmetadata.service.events.errors.EventPublisherException;
|
||||||
|
import org.openmetadata.service.resources.events.EventResource;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class EventMonitorPublisher extends AbstractEventPublisher {
|
||||||
|
|
||||||
|
private final EventMonitor eventMonitor;
|
||||||
|
|
||||||
|
public EventMonitorPublisher(EventMonitorConfiguration config, EventMonitor eventMonitor) {
|
||||||
|
super(config.getBatchSize(), new ArrayList<>());
|
||||||
|
this.eventMonitor = eventMonitor;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void publish(EventResource.ChangeEventList events) throws EventPublisherException, JsonProcessingException {
|
||||||
|
for (ChangeEvent event : events.getData()) {
|
||||||
|
String entityType = event.getEntityType();
|
||||||
|
if (Entity.INGESTION_PIPELINE.equals(entityType)) {
|
||||||
|
this.eventMonitor.pushMetric(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onStart() {
|
||||||
|
LOG.info("Event Monitor Publisher Started");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onShutdown() {
|
||||||
|
eventMonitor.close();
|
||||||
|
LOG.info("Event Monitor Publisher Closed");
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,159 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2022 Collate
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.openmetadata.service.monitoring;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.openmetadata.service.resources.services.ingestionpipelines.IngestionPipelineResourceTest.DATABASE_METADATA_CONFIG;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
|
||||||
|
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
|
||||||
|
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
|
||||||
|
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType;
|
||||||
|
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
|
||||||
|
import org.openmetadata.schema.monitoring.EventMonitorProvider;
|
||||||
|
import org.openmetadata.schema.type.ChangeDescription;
|
||||||
|
import org.openmetadata.schema.type.ChangeEvent;
|
||||||
|
import org.openmetadata.schema.type.EventType;
|
||||||
|
import org.openmetadata.schema.type.FieldChange;
|
||||||
|
import org.openmetadata.service.Entity;
|
||||||
|
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
|
||||||
|
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
|
||||||
|
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
|
||||||
|
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
|
||||||
|
|
||||||
|
public class CloudWatchEventMonitorTest {
|
||||||
|
|
||||||
|
private static final String CLUSTER_NAME = "openmetadata";
|
||||||
|
private static final String NAMESPACE = "INGESTION_PIPELINE";
|
||||||
|
private static final String EXPECTED_NAMESPACE = "openmetadata/INGESTION_PIPELINE";
|
||||||
|
private static final String FQN = "service.ingestion";
|
||||||
|
private static EventMonitorConfiguration config;
|
||||||
|
|
||||||
|
private static CloudwatchEventMonitor eventMonitor;
|
||||||
|
|
||||||
|
public static final Long current_ts = System.currentTimeMillis();
|
||||||
|
public static final Instant instant = Instant.ofEpochMilli(current_ts);
|
||||||
|
|
||||||
|
public static final IngestionPipeline INGESTION_PIPELINE =
|
||||||
|
new IngestionPipeline()
|
||||||
|
.withName("ingestion")
|
||||||
|
.withId(UUID.randomUUID())
|
||||||
|
.withPipelineType(PipelineType.METADATA)
|
||||||
|
.withSourceConfig(DATABASE_METADATA_CONFIG)
|
||||||
|
.withAirflowConfig(new AirflowConfig().withStartDate(new DateTime("2022-06-10T15:06:47+00:00").toDate()));
|
||||||
|
|
||||||
|
private ChangeEvent buildChangeEvent(EventType eventType) {
|
||||||
|
return new ChangeEvent()
|
||||||
|
.withEntityType(Entity.INGESTION_PIPELINE)
|
||||||
|
.withEventType(eventType)
|
||||||
|
.withEntityFullyQualifiedName(FQN)
|
||||||
|
.withTimestamp(current_ts)
|
||||||
|
.withEntity(INGESTION_PIPELINE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Dimension buildDimension(String pipelineType, String fqn) {
|
||||||
|
return Dimension.builder().name(pipelineType).value(fqn).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
static void setUp() {
|
||||||
|
config = new EventMonitorConfiguration();
|
||||||
|
config.setEventMonitor(EventMonitorProvider.CLOUDWATCH);
|
||||||
|
config.setBatchSize(10);
|
||||||
|
eventMonitor = new CloudwatchEventMonitor(EventMonitorProvider.CLOUDWATCH, config, CLUSTER_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void buildMetricNamespaceTest() {
|
||||||
|
assertEquals(eventMonitor.buildMetricNamespace(NAMESPACE), EXPECTED_NAMESPACE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void buildMetricRequestForCreatedIngestionPipelineTest() {
|
||||||
|
ChangeEvent event = buildChangeEvent(EventType.ENTITY_CREATED);
|
||||||
|
List<PutMetricDataRequest> metricRequests = eventMonitor.buildMetricRequest(event);
|
||||||
|
|
||||||
|
PutMetricDataRequest expectedMetric =
|
||||||
|
PutMetricDataRequest.builder()
|
||||||
|
.namespace(EXPECTED_NAMESPACE)
|
||||||
|
.metricData(
|
||||||
|
MetricDatum.builder()
|
||||||
|
.metricName("INGESTION_PIPELINE_CREATED")
|
||||||
|
.unit(StandardUnit.COUNT)
|
||||||
|
.value(1.0)
|
||||||
|
.timestamp(instant)
|
||||||
|
.dimensions(buildDimension("metadata", FQN))
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
assertEquals(metricRequests.get(0), expectedMetric);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void buildMetricRequestForDeletedIngestionPipelineTest() {
|
||||||
|
ChangeEvent event = buildChangeEvent(EventType.ENTITY_DELETED);
|
||||||
|
List<PutMetricDataRequest> metricRequests = eventMonitor.buildMetricRequest(event);
|
||||||
|
|
||||||
|
PutMetricDataRequest expectedMetric =
|
||||||
|
PutMetricDataRequest.builder()
|
||||||
|
.namespace(EXPECTED_NAMESPACE)
|
||||||
|
.metricData(
|
||||||
|
MetricDatum.builder()
|
||||||
|
.metricName("INGESTION_PIPELINE_DELETED")
|
||||||
|
.unit(StandardUnit.COUNT)
|
||||||
|
.value(1.0)
|
||||||
|
.timestamp(instant)
|
||||||
|
.dimensions(buildDimension("metadata", FQN))
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
assertEquals(metricRequests.get(0), expectedMetric);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void buildMetricRequestForUpdatedIngestionPipelineTest() {
|
||||||
|
ChangeEvent event = buildChangeEvent(EventType.ENTITY_UPDATED);
|
||||||
|
event.withChangeDescription(
|
||||||
|
new ChangeDescription()
|
||||||
|
.withFieldsUpdated(
|
||||||
|
List.of(
|
||||||
|
new FieldChange()
|
||||||
|
.withName("pipelineStatus")
|
||||||
|
.withOldValue(null)
|
||||||
|
.withNewValue(new PipelineStatus().withPipelineState(PipelineStatusType.RUNNING)))));
|
||||||
|
|
||||||
|
List<PutMetricDataRequest> metricRequests = eventMonitor.buildMetricRequest(event);
|
||||||
|
|
||||||
|
PutMetricDataRequest expectedMetric =
|
||||||
|
PutMetricDataRequest.builder()
|
||||||
|
.namespace(EXPECTED_NAMESPACE)
|
||||||
|
.metricData(
|
||||||
|
MetricDatum.builder()
|
||||||
|
.metricName("INGESTION_PIPELINE_RUNNING")
|
||||||
|
.unit(StandardUnit.COUNT)
|
||||||
|
.value(1.0)
|
||||||
|
.timestamp(instant)
|
||||||
|
.dimensions(buildDimension("metadata", FQN))
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
assertEquals(metricRequests.get(0), expectedMetric);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,39 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2022 Collate
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.openmetadata.service.monitoring;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.openmetadata.schema.monitoring.EventMonitorProvider;
|
||||||
|
|
||||||
|
public class EventMonitorFactoryTest {
|
||||||
|
|
||||||
|
private EventMonitorConfiguration config;
|
||||||
|
|
||||||
|
private static final String CLUSTER_NAME = "openmetadata";
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() {
|
||||||
|
config = new EventMonitorConfiguration();
|
||||||
|
config.setParameters(new HashMap<>());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testIsCreatedItCloudwatchEventMonitor() {
|
||||||
|
config.setEventMonitor(EventMonitorProvider.CLOUDWATCH);
|
||||||
|
assertTrue(EventMonitorFactory.createEventMonitor(config, CLUSTER_NAME) instanceof CloudwatchEventMonitor);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,10 @@
|
|||||||
|
{
|
||||||
|
"$id": "https://open-metadata.org/schema/monitoring/eventMonitorProvider.json",
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"title": "Event Monitor Provider",
|
||||||
|
"description": "OpenMetadata Event Monitor Provider.",
|
||||||
|
"type": "string",
|
||||||
|
"javaType": "org.openmetadata.schema.monitoring.EventMonitorProvider",
|
||||||
|
"enum": ["cloudwatch"],
|
||||||
|
"additionalProperties": false
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user