Alert improvements (#9374)

* AlertAction update sould apply to all alert using alertActions + added api to get all action of alert with status

* update endpoint

* Added matchAnyField Function + removed usageSummary from activityFeed
This commit is contained in:
Mohit Yadav 2022-12-17 17:48:08 +05:30 committed by GitHub
parent e1d347c714
commit c7591eefc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 329 additions and 174 deletions

View File

@ -15,7 +15,6 @@ package org.openmetadata.client.gateway;
import com.fasterxml.jackson.annotation.JsonInclude;
import feign.Feign;
import feign.RequestInterceptor;
import feign.form.FormEncoder;
import feign.jackson.JacksonDecoder;
import feign.jackson.JacksonEncoder;
@ -83,13 +82,6 @@ public class OpenMetadata {
apiClient.addAuthorization(REQUEST_INTERCEPTOR_KEY, newInterceptor);
}
public void addRequestInterceptor(String requestInterceptorKey, RequestInterceptor interceptor) {
if (apiClient.getApiAuthorizations().containsKey(requestInterceptorKey)) {
LOG.info("Interceptor with this key already exists");
}
apiClient.addAuthorization(requestInterceptorKey, interceptor);
}
public void validateVersion() {
String[] clientVersion = getClientVersion();
String[] serverVersion = getServerVersion();

View File

@ -1,4 +1,4 @@
package org.openmetadata.client.listutils;
package org.openmetadata.client.listUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

View File

@ -122,14 +122,18 @@ public class AlertUtil {
}
public static boolean evaluateAlertConditions(ChangeEvent changeEvent, List<AlertFilterRule> alertFilterRules) {
boolean result;
String completeCondition = buildCompleteCondition(alertFilterRules);
AlertsRuleEvaluator ruleEvaluator = new AlertsRuleEvaluator(changeEvent);
StandardEvaluationContext evaluationContext = new StandardEvaluationContext(ruleEvaluator);
Expression expression = parseExpression(completeCondition);
result = Boolean.TRUE.equals(expression.getValue(evaluationContext, Boolean.class));
LOG.debug("Alert evaluated as Result : {}", result);
return result;
if (alertFilterRules.size() > 0) {
boolean result;
String completeCondition = buildCompleteCondition(alertFilterRules);
AlertsRuleEvaluator ruleEvaluator = new AlertsRuleEvaluator(changeEvent);
StandardEvaluationContext evaluationContext = new StandardEvaluationContext(ruleEvaluator);
Expression expression = parseExpression(completeCondition);
result = Boolean.TRUE.equals(expression.getValue(evaluationContext, Boolean.class));
LOG.debug("Alert evaluated as Result : {}", result);
return result;
} else {
return true;
}
}
public static String buildCompleteCondition(List<AlertFilterRule> alertFilterRules) {

View File

@ -64,7 +64,7 @@ public class AlertsActionPublisher extends AbstractAlertPublisher {
AlertActionStatus currentStatus =
new AlertActionStatus()
.withStatus(AlertActionStatus.Status.ACTIVE)
.withFailureDetails(new FailureDetails())
.withFailureDetails(null)
.withTimestamp(System.currentTimeMillis());
setStatus(currentStatus);
onStartDelegate();
@ -87,41 +87,41 @@ public class AlertsActionPublisher extends AbstractAlertPublisher {
return alertAction;
}
protected void setErrorStatus(Long attemptTime, Integer statusCode, String reason)
protected void setErrorStatus(Long updateTime, Integer statusCode, String reason)
throws IOException, InterruptedException {
if (!attemptTime.equals(alertAction.getStatusDetails().getFailureDetails().getLastFailedAt())) {
setStatus(AlertActionStatus.Status.FAILED, attemptTime, statusCode, reason, null);
}
alertRepository.deleteAlertActionPublisher(alert.getId(), alertAction);
setStatus(AlertActionStatus.Status.FAILED, updateTime, statusCode, reason, null);
AlertsPublisherManager.getInstance().deleteAlertActionPublisher(alert.getId(), alertAction);
throw new RuntimeException(reason);
}
protected void setAwaitingRetry(Long attemptTime, int statusCode, String reason) throws IOException {
if (!attemptTime.equals(alertAction.getStatusDetails().getFailureDetails().getLastFailedAt())) {
setStatus(
AlertActionStatus.Status.AWAITING_RETRY, attemptTime, statusCode, reason, attemptTime + currentBackoffTime);
}
protected void setAwaitingRetry(Long updateTime, int statusCode, String reason) throws IOException {
setStatus(AlertActionStatus.Status.AWAITING_RETRY, updateTime, statusCode, reason, updateTime + currentBackoffTime);
}
protected void setSuccessStatus(Long updateTime) throws IOException {
setStatus(AlertActionStatus.Status.ACTIVE, updateTime, 200, null, null);
}
protected void setStatus(
AlertActionStatus.Status status, Long attemptTime, Integer statusCode, String reason, Long timestamp)
AlertActionStatus.Status status, Long updateTime, Integer statusCode, String reason, Long nextAttemptTime)
throws IOException {
FailureDetails details =
alertAction.getStatusDetails().getFailureDetails() != null
? alertAction.getStatusDetails().getFailureDetails()
: new FailureDetails();
details
.withLastFailedAt(attemptTime)
.withLastFailedStatusCode(statusCode)
.withLastFailedReason(reason)
.withNextAttempt(timestamp);
FailureDetails details = null;
if (status != AlertActionStatus.Status.ACTIVE) {
details = new FailureDetails();
details
.withLastFailedAt(updateTime)
.withLastFailedStatusCode(statusCode)
.withLastFailedReason(reason)
.withNextAttempt(nextAttemptTime);
}
AlertActionStatus currentStatus =
alertRepository.setStatus(alert.getId(), alertAction.getId(), status, details).withTimestamp(attemptTime);
AlertsPublisherManager.getInstance().setStatus(alert.getId(), alertAction.getId(), status, details, updateTime);
alertAction.setStatusDetails(currentStatus);
}
protected void setStatus(AlertActionStatus status) throws IOException {
alertRepository.setStatus(alert.getId(), alertAction.getId(), status);
AlertsPublisherManager.getInstance().setStatus(alert.getId(), alertAction.getId(), status);
alertAction.setStatusDetails(status);
}
@ -152,11 +152,6 @@ public class AlertsActionPublisher extends AbstractAlertPublisher {
try {
LOG.info("Sending Alert {}:{}:{}", alert.getName(), alertAction.getStatusDetails().getStatus(), batch.size());
sendAlert(event);
// Successfully sent Alert, update Status
alertAction.getStatusDetails().getFailureDetails().setLastSuccessfulAt(System.currentTimeMillis());
if (alertAction.getStatusDetails().getStatus() != AlertActionStatus.Status.ACTIVE) {
setStatus(AlertActionStatus.Status.ACTIVE, System.currentTimeMillis(), null, null, null);
}
} catch (Exception ex) {
LOG.warn("Invalid Exception in Alert {}", alert.getName());
setErrorStatus(

View File

@ -0,0 +1,182 @@
package org.openmetadata.service.alerts;
import static org.openmetadata.schema.type.Relationship.USES;
import static org.openmetadata.service.Entity.ALERT;
import static org.openmetadata.service.Entity.ALERT_ACTION;
import com.lmax.disruptor.BatchEventProcessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.alerts.Alert;
import org.openmetadata.schema.entity.alerts.AlertAction;
import org.openmetadata.schema.entity.alerts.AlertActionStatus;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.FailureDetails;
import org.openmetadata.service.Entity;
import org.openmetadata.service.events.EventPubSub;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.util.JsonUtils;
@Slf4j
public class AlertsPublisherManager {
// Alert is mapped to different publisher
private static CollectionDAO daoCollection;
private static AlertsPublisherManager INSTANCE;
private static volatile boolean INITIALIZED = false;
// <Alert, <AlertAction, Publisher>>
private static final ConcurrentHashMap<UUID, Map<UUID, AlertsActionPublisher>> alertPublisherMap =
new ConcurrentHashMap<>();
public static void initialize(CollectionDAO dao) {
if (!INITIALIZED) {
daoCollection = dao;
INSTANCE = new AlertsPublisherManager();
INITIALIZED = true;
} else {
INITIALIZED = false;
LOG.info("Email Util is already initialized");
}
}
public static AlertsPublisherManager getInstance() {
return INSTANCE;
}
public List<AlertAction> getAllAlertActions(UUID id) {
List<AlertAction> alertActions = new ArrayList<>();
if (alertActions != null) {
for (AlertsActionPublisher publisher : alertPublisherMap.get(id).values()) {
alertActions.add(publisher.getAlertAction());
}
}
return alertActions;
}
public void addAlertActionPublishers(Alert alert) throws IOException {
EntityRepository<AlertAction> alertActionEntityRepository = Entity.getEntityRepository(ALERT_ACTION);
for (EntityReference alertActionRef : alert.getAlertActions()) {
AlertAction action =
alertActionEntityRepository.get(null, alertActionRef.getId(), alertActionEntityRepository.getFields("*"));
addAlertActionPublisher(alert, action);
}
}
public void addAlertActionPublisher(Alert alert, AlertAction alertAction) throws IOException {
if (Boolean.FALSE.equals(alertAction.getEnabled())) {
// Only add alert that is enabled for publishing events
setStatus(
alert.getId(), alertAction.getId(), AlertActionStatus.Status.DISABLED, null, System.currentTimeMillis());
alertAction.setStatusDetails(new AlertActionStatus().withStatus(AlertActionStatus.Status.DISABLED));
return;
}
// Activity Feed AlertAction Cannot be Created
if (alertAction.getAlertActionType() == AlertAction.AlertActionType.ACTIVITY_FEED) {
LOG.info("Activity Feed Alert Action cannot be created.");
return;
}
// Create AlertAction Publisher
AlertsActionPublisher publisher = AlertUtil.getAlertPublisher(alert, alertAction, daoCollection);
BatchEventProcessor<EventPubSub.ChangeEventHolder> processor = EventPubSub.addEventHandler(publisher);
publisher.setProcessor(processor);
LOG.info("Alert publisher started for {}", alert.getName());
Map<UUID, AlertsActionPublisher> alertsActionPublisherMap =
alertPublisherMap.get(alert.getId()) == null ? new HashMap<>() : alertPublisherMap.get(alert.getId());
alertsActionPublisherMap.put(alertAction.getId(), publisher);
alertPublisherMap.put(alert.getId(), alertsActionPublisherMap);
}
public AlertActionStatus setStatus(
UUID alertId, UUID alertActionId, AlertActionStatus.Status status, FailureDetails failureDetails, Long timeStamp)
throws IOException {
AlertActionStatus currentStatus =
new AlertActionStatus().withStatus(status).withFailureDetails(failureDetails).withTimestamp(timeStamp);
daoCollection
.entityExtensionTimeSeriesDao()
.insert(alertId.toString(), alertActionId.toString(), "alertActionStatus", JsonUtils.pojoToJson(currentStatus));
return currentStatus;
}
public void removeAlertStatus(UUID alertId, UUID alertActionId) {
daoCollection.entityExtensionTimeSeriesDao().delete(alertId.toString(), alertActionId.toString());
}
public void removeAllAlertStatus(UUID alertId) {
daoCollection.entityExtensionTimeSeriesDao().deleteAll(alertId.toString());
}
public void setStatus(UUID alertId, UUID alertActionId, AlertActionStatus status) throws IOException {
daoCollection
.entityExtensionTimeSeriesDao()
.insert(alertId.toString(), alertActionId.toString(), "alertActionStatus", JsonUtils.pojoToJson(status));
}
@SneakyThrows
public void updateAlertActionPublishers(Alert alert) {
// Delete existing alert action publisher and create with the updated Alert
// if some alerts are removed or added
deleteAlertAllPublishers(alert.getId());
addAlertActionPublishers(alert);
}
@SneakyThrows
public void updateAllAlertUsingAlertAction(AlertAction alertAction) {
List<CollectionDAO.EntityRelationshipRecord> records =
daoCollection.relationshipDAO().findFrom(alertAction.getId().toString(), ALERT_ACTION, USES.ordinal(), ALERT);
EntityRepository<Alert> alertEntityRepository = Entity.getEntityRepository(ALERT);
for (CollectionDAO.EntityRelationshipRecord record : records) {
deleteAlertAllPublishers(record.getId());
Alert alert = alertEntityRepository.get(null, record.getId(), alertEntityRepository.getFields("*"));
addAlertActionPublishers(alert);
}
}
@SneakyThrows
public void deleteAlertActionFromAllAlertPublisher(AlertAction alertAction) {
List<CollectionDAO.EntityRelationshipRecord> records =
daoCollection.relationshipDAO().findFrom(alertAction.getId().toString(), ALERT_ACTION, USES.ordinal(), ALERT);
for (CollectionDAO.EntityRelationshipRecord record : records) {
deleteAlertActionPublisher(record.getId(), alertAction);
}
}
public void deleteAlertActionPublisher(UUID alertId, AlertAction action) throws InterruptedException {
Map<UUID, AlertsActionPublisher> alertActionPublishers = alertPublisherMap.get(alertId);
if (alertActionPublishers != null) {
AlertsActionPublisher alertsActionPublisher = alertActionPublishers.get(action.getId());
if (alertsActionPublisher != null) {
alertsActionPublisher.getProcessor().halt();
alertsActionPublisher.awaitShutdown();
EventPubSub.removeProcessor(alertsActionPublisher.getProcessor());
LOG.info("Alert publisher deleted for {}", alertsActionPublisher.getAlert().getName());
alertActionPublishers.remove(action.getId());
removeAlertStatus(alertId, action.getId());
alertPublisherMap.put(alertId, alertActionPublishers);
}
}
}
public void deleteAlertAllPublishers(UUID alertId) throws InterruptedException {
Map<UUID, AlertsActionPublisher> alertPublishers = alertPublisherMap.get(alertId);
if (alertPublishers != null) {
for (AlertsActionPublisher publisher : alertPublishers.values()) {
publisher.getProcessor().halt();
publisher.awaitShutdown();
EventPubSub.removeProcessor(publisher.getProcessor());
LOG.info("Alert publisher deleted for {}", publisher.getAlert().getName());
}
alertPublisherMap.remove(alertId);
removeAllAlertStatus(alertId);
}
}
}

View File

@ -1,12 +1,14 @@
package org.openmetadata.service.alerts;
import static org.openmetadata.schema.type.Function.ParameterType.ALL_INDEX_ELASTIC_SEARCH;
import static org.openmetadata.schema.type.Function.ParameterType.NOT_REQUIRED;
import static org.openmetadata.schema.type.Function.ParameterType.READ_FROM_PARAM_CONTEXT;
import static org.openmetadata.schema.type.Function.ParameterType.SPECIFIC_INDEX_ELASTIC_SEARCH;
import static org.openmetadata.service.Entity.TEAM;
import static org.openmetadata.service.Entity.TEST_CASE;
import static org.openmetadata.service.Entity.USER;
import java.util.Set;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.EntityInterface;
@ -19,6 +21,7 @@ import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.service.security.policyevaluator.SubjectCache;
import org.openmetadata.service.util.ChangeEventParser;
@Slf4j
public class AlertsRuleEvaluator {
@ -29,7 +32,8 @@ public class AlertsRuleEvaluator {
matchAnyEntityId,
matchAnyEventType,
matchTestResult,
matchUpdatedBy
matchUpdatedBy,
matchAnyFieldChange
}
private final ChangeEvent changeEvent;
@ -101,7 +105,7 @@ public class AlertsRuleEvaluator {
}
EntityInterface entity = (EntityInterface) changeEvent.getEntity();
for (String name : entityNames) {
if (entity.getName().equals(name)) {
if (entity.getFullyQualifiedName().equals(name)) {
return true;
}
}
@ -192,4 +196,23 @@ public class AlertsRuleEvaluator {
}
return false;
}
@Function(
name = "matchAnyFieldChange",
input = "List of comma separated fields change",
description = "Returns true if the change event entity is updated by the mentioned users",
examples = {"matchAnyFieldChange('fieldName1', 'fieldName')"},
paramInputType = NOT_REQUIRED)
public boolean matchAnyFieldChange(String... fieldChangeUpdate) {
if (changeEvent == null || changeEvent.getEntity() == null) {
return false;
}
Set<String> fields = ChangeEventParser.getUpdatedField(changeEvent);
for (String name : fieldChangeUpdate) {
if (fields.contains(name)) {
return true;
}
}
return false;
}
}

View File

@ -3,6 +3,7 @@ package org.openmetadata.service.alerts.emailAlert;
import static org.openmetadata.service.Entity.TEAM;
import static org.openmetadata.service.Entity.USER;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -54,15 +55,17 @@ public class EmailAlertPublisher extends AlertsActionPublisher {
}
@Override
public void sendAlert(ChangeEvent event) {
public void sendAlert(ChangeEvent event) throws IOException, InterruptedException {
try {
Set<String> receivers = buildReceiversList(event);
EmailMessage emailMessage = ChangeEventParser.buildEmailMessage(event);
for (String email : receivers) {
EmailUtil.sendChangeEventMail(email, emailMessage);
}
setSuccessStatus(System.currentTimeMillis());
} catch (Exception e) {
LOG.error("Failed to publish event {} to email due to {} ", event, e.getMessage());
setErrorStatus(System.currentTimeMillis(), 500, e.getMessage());
throw new EventPublisherException(
String.format("Failed to publish event %s to email due to %s ", event, e.getMessage()));
}

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.alerts.generic;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.client.Client;
@ -17,6 +18,7 @@ import org.openmetadata.schema.type.Webhook;
import org.openmetadata.service.alerts.AlertsActionPublisher;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.resources.events.EventResource;
import org.openmetadata.service.security.SecurityUtil;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.RestUtil;
@ -60,7 +62,8 @@ public class GenericWebhookPublisher extends AlertsActionPublisher {
public void sendAlert(ChangeEvent event) throws EventPublisherException, IOException, InterruptedException {
long attemptTime = System.currentTimeMillis();
try {
String json = JsonUtils.pojoToJson(event);
EventResource.ChangeEventList list = new EventResource.ChangeEventList(List.of(event), null, null, 1);
String json = JsonUtils.pojoToJson(list);
Response response;
if (webhook.getSecretKey() != null && !webhook.getSecretKey().isEmpty()) {
String hmac = "sha256=" + CommonUtil.calculateHMAC(webhook.getSecretKey(), json);
@ -74,6 +77,7 @@ public class GenericWebhookPublisher extends AlertsActionPublisher {
alertAction.getStatusDetails().getStatus(),
batch.size(),
response.getStatusInfo());
// Successfully sent Alert, update Status
if (response.getStatus() >= 300 && response.getStatus() < 400) {
// 3xx response/redirection is not allowed for callback. Set the webhook state as in error
setErrorStatus(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase());
@ -82,12 +86,14 @@ public class GenericWebhookPublisher extends AlertsActionPublisher {
setNextBackOff();
setAwaitingRetry(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase());
Thread.sleep(currentBackoffTime);
} else if (response.getStatus() == 200) {
setSuccessStatus(System.currentTimeMillis());
}
} catch (Exception ex) {
Throwable cause = ex.getCause();
if (cause != null && cause.getClass() == UnknownHostException.class) {
LOG.warn("Invalid webhook {} endpoint {}", webhook.getName(), webhook.getEndpoint());
setErrorStatus(attemptTime, null, "UnknownHostException");
setErrorStatus(attemptTime, Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "UnknownHostException");
} else {
LOG.debug("Exception occurred while publishing webhook", ex);
}

View File

@ -62,6 +62,8 @@ public class MSTeamsWebhookPublisher extends AlertsActionPublisher {
setNextBackOff();
setAwaitingRetry(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase());
Thread.sleep(currentBackoffTime);
} else if (response.getStatus() == 200) {
setSuccessStatus(System.currentTimeMillis());
}
} catch (Exception e) {
LOG.error("Failed to publish event {} to msteams due to {} ", event, e.getMessage());

View File

@ -55,6 +55,7 @@ public class SlackWebhookEventPublisher extends AlertsActionPublisher {
try {
SlackMessage slackMessage = ChangeEventParser.buildSlackMessage(event);
Response response = target.post(javax.ws.rs.client.Entity.entity(slackMessage, MediaType.APPLICATION_JSON_TYPE));
// Successfully sent Alert, update Status
if (response.getStatus() >= 300 && response.getStatus() < 400) {
// 3xx response/redirection is not allowed for callback. Set the webhook state as in error
setErrorStatus(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase());
@ -63,6 +64,8 @@ public class SlackWebhookEventPublisher extends AlertsActionPublisher {
setNextBackOff();
setAwaitingRetry(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase());
Thread.sleep(currentBackoffTime);
} else if (response.getStatus() == 200) {
setSuccessStatus(System.currentTimeMillis());
}
} catch (Exception e) {
LOG.error("Failed to publish event {} to slack due to {} ", event, e.getMessage());

View File

@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.alerts.AlertAction;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.service.Entity;
import org.openmetadata.service.alerts.AlertsPublisherManager;
import org.openmetadata.service.resources.dqtests.TestDefinitionResource;
import org.openmetadata.service.util.EntityUtil;
@ -68,6 +69,7 @@ public class AlertActionRepository extends EntityRepository<AlertAction> {
recordChange("readTimeout", original.getReadTimeout(), updated.getReadTimeout());
recordChange("timeout", original.getTimeout(), updated.getTimeout());
recordChange("alertActionConfig", original.getAlertActionConfig(), updated.getAlertActionConfig());
AlertsPublisherManager.getInstance().updateAllAlertUsingAlertAction(updated);
}
}
}

View File

@ -16,26 +16,20 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.service.Entity.ALERT;
import static org.openmetadata.service.Entity.ALERT_ACTION;
import com.lmax.disruptor.BatchEventProcessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.alerts.Alert;
import org.openmetadata.schema.entity.alerts.AlertAction;
import org.openmetadata.schema.entity.alerts.AlertActionStatus;
import org.openmetadata.schema.entity.alerts.AlertFilterRule;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.FailureDetails;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
import org.openmetadata.service.alerts.AlertUtil;
import org.openmetadata.service.alerts.AlertsActionPublisher;
import org.openmetadata.service.events.EventPubSub;
import org.openmetadata.service.alerts.AlertsPublisherManager;
import org.openmetadata.service.resources.alerts.AlertResource;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.EntityUtil.Fields;
@ -47,10 +41,6 @@ public class AlertRepository extends EntityRepository<Alert> {
static final String ALERT_PATCH_FIELDS = "triggerConfig,filteringRules,alertActions";
static final String ALERT_UPDATE_FIELDS = "triggerConfig,filteringRules,alertActions";
// Alert is mapped to different publisher
private static final ConcurrentHashMap<UUID, List<AlertsActionPublisher>> alertPublisherMap =
new ConcurrentHashMap<>();
public AlertRepository(CollectionDAO dao) {
super(
AlertResource.COLLECTION_PATH,
@ -113,109 +103,6 @@ public class AlertRepository extends EntityRepository<Alert> {
return new AlertUpdater(original, updated, operation);
}
public void addAlertActionPublishers(Alert alert) throws IOException {
EntityRepository<AlertAction> alertActionEntityRepository = Entity.getEntityRepository(ALERT_ACTION);
for (EntityReference alertActionRef : alert.getAlertActions()) {
AlertAction action =
alertActionEntityRepository.get(null, alertActionRef.getId(), alertActionEntityRepository.getFields("*"));
addAlertActionPublisher(alert, action);
}
}
public void addAlertActionPublisher(Alert alert, AlertAction alertAction) throws IOException {
if (Boolean.FALSE.equals(alertAction.getEnabled())) {
// Only add alert that is enabled for publishing events
setStatus(alert.getId(), alertAction.getId(), AlertActionStatus.Status.DISABLED, null);
alertAction.setStatusDetails(new AlertActionStatus().withStatus(AlertActionStatus.Status.DISABLED));
return;
}
// Activity Feed AlertAction Cannot be Created
if (alertAction.getAlertActionType() == AlertAction.AlertActionType.ACTIVITY_FEED) {
LOG.info("Activity Feed Alert Action cannot be created.");
return;
}
// Create AlertAction Publisher
AlertsActionPublisher publisher = AlertUtil.getAlertPublisher(alert, alertAction, daoCollection);
BatchEventProcessor<EventPubSub.ChangeEventHolder> processor = EventPubSub.addEventHandler(publisher);
publisher.setProcessor(processor);
LOG.info("Alert publisher started for {}", alert.getName());
List<AlertsActionPublisher> listPublisher =
alertPublisherMap.get(alert.getId()) == null ? new ArrayList<>() : alertPublisherMap.get(alert.getId());
listPublisher.add(publisher);
alertPublisherMap.put(alert.getId(), listPublisher);
}
public AlertActionStatus setStatus(
UUID alertId, UUID alertActionId, AlertActionStatus.Status status, FailureDetails failureDetails)
throws IOException {
AlertActionStatus currentStatus = new AlertActionStatus().withStatus(status).withFailureDetails(failureDetails);
daoCollection
.entityExtensionTimeSeriesDao()
.insert(alertId.toString(), alertActionId.toString(), "alertActionStatus", JsonUtils.pojoToJson(currentStatus));
return currentStatus;
}
public void removeAlertStatus(UUID alertId, UUID alertActionId) {
daoCollection.entityExtensionTimeSeriesDao().delete(alertId.toString(), alertActionId.toString());
}
public void removeAllAlertStatus(UUID alertId) {
daoCollection.entityExtensionTimeSeriesDao().deleteAll(alertId.toString());
}
public void setStatus(UUID alertId, UUID alertActionId, AlertActionStatus status) throws IOException {
daoCollection
.entityExtensionTimeSeriesDao()
.insert(alertId.toString(), alertActionId.toString(), "alertActionStatus", JsonUtils.pojoToJson(status));
}
@SneakyThrows
public void updateAlertActionPublishers(Alert alert) {
// Delete existing alert action publisher and create with the updated Alert
// if some alerts are removed or added
deleteAlertAllPublishers(alert.getId());
addAlertActionPublishers(alert);
}
public void deleteAlertActionPublisher(UUID alertId, AlertAction action) throws InterruptedException {
List<AlertsActionPublisher> alertPublishers = alertPublisherMap.get(alertId);
if (alertPublishers != null) {
int position = -1;
for (int i = 0; i < alertPublishers.size(); i++) {
AlertsActionPublisher alertsActionPublisher = alertPublishers.get(i);
if (alertsActionPublisher.getAlertAction().getId().equals(action.getId())) {
alertsActionPublisher.getProcessor().halt();
alertsActionPublisher.awaitShutdown();
EventPubSub.removeProcessor(alertsActionPublisher.getProcessor());
LOG.info("Alert publisher deleted for {}", alertsActionPublisher.getAlert().getName());
position = i;
break;
}
}
if (position != -1) {
alertPublishers.remove(position);
removeAlertStatus(alertId, action.getId());
alertPublisherMap.put(alertId, alertPublishers);
}
}
}
public void deleteAlertAllPublishers(UUID alertId) throws InterruptedException {
List<AlertsActionPublisher> alertPublishers = alertPublisherMap.get(alertId);
if (alertPublishers != null) {
for (AlertsActionPublisher alertsActionPublisher : alertPublishers) {
alertsActionPublisher.getProcessor().halt();
alertsActionPublisher.awaitShutdown();
EventPubSub.removeProcessor(alertsActionPublisher.getProcessor());
LOG.info("Alert publisher deleted for {}", alertsActionPublisher.getAlert().getName());
}
alertPublisherMap.clear();
removeAllAlertStatus(alertId);
}
}
private List<EntityReference> getAlertActions(Alert entity) throws IOException {
List<CollectionDAO.EntityRelationshipRecord> testCases =
findTo(entity.getId(), ALERT, Relationship.USES, ALERT_ACTION);
@ -246,9 +133,7 @@ public class AlertRepository extends EntityRepository<Alert> {
new ArrayList<>(original.getAlertActions()),
new ArrayList<>(updated.getAlertActions()),
false);
if (!original.getAlertActions().equals(updated.getAlertActions())) {
updateAlertActionPublishers(updated);
}
AlertsPublisherManager.getInstance().updateAlertActionPublishers(updated);
}
}
}

View File

@ -52,6 +52,7 @@ import org.openmetadata.schema.entity.alerts.AlertAction;
import org.openmetadata.schema.type.EntityHistory;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.alerts.AlertsPublisherManager;
import org.openmetadata.service.jdbi3.AlertActionRepository;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.ListFilter;
@ -79,6 +80,8 @@ public class AlertActionResource extends EntityResource<AlertAction, AlertAction
public AlertActionResource(CollectionDAO dao, Authorizer authorizer) {
super(AlertAction.class, new AlertActionRepository(dao), authorizer);
// Initialize the Alerts Publisher Manager
AlertsPublisherManager.initialize(dao);
}
public static class AlertActionList extends ResultList<AlertAction> {
@ -353,7 +356,9 @@ public class AlertActionResource extends EntityResource<AlertAction, AlertAction
boolean hardDelete,
@Parameter(description = "alert Id", schema = @Schema(type = "UUID")) @PathParam("id") UUID id)
throws IOException {
return delete(uriInfo, securityContext, id, false, hardDelete);
Response response = delete(uriInfo, securityContext, id, false, hardDelete);
AlertsPublisherManager.getInstance().deleteAlertActionFromAllAlertPublisher((AlertAction) response.getEntity());
return response;
}
public AlertAction getAlertAction(CreateAlertAction create, String user) throws IOException {

View File

@ -65,6 +65,7 @@ import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.alerts.ActivityFeedAlertCache;
import org.openmetadata.service.alerts.AlertUtil;
import org.openmetadata.service.alerts.AlertsPublisherManager;
import org.openmetadata.service.jdbi3.AlertRepository;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
@ -136,7 +137,7 @@ public class AlertResource extends EntityResource<Alert, AlertRepository> {
List<Alert> alertList = JsonUtils.readObjects(listAllAlerts, Alert.class);
for (Alert alert : alertList) {
if (!alert.getName().equals(activityFeedAlert.getName())) {
dao.addAlertActionPublishers(alert);
AlertsPublisherManager.getInstance().addAlertActionPublishers(alert);
}
}
}
@ -267,12 +268,36 @@ public class AlertResource extends EntityResource<Alert, AlertRepository> {
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "alert Id", schema = @Schema(type = "UUID")) @PathParam("alertId") UUID alertId,
@Parameter(description = "alertAction Id", schema = @Schema(type = "UUID")) @PathParam("alertId")
@Parameter(description = "alertAction Id", schema = @Schema(type = "UUID")) @PathParam("actionId")
UUID alertActionId)
throws IOException {
return dao.getAlertActionStatus(alertId, alertActionId);
}
@GET
@Path("/allAlertAction/{alertId}")
@Valid
@Operation(
operationId = "getAllAlertActionForAlert",
summary = "Get all alert Action of an alert",
tags = "alerts",
description = "Get all alert Action of alert by given Id , and id of the alert it is bound to",
responses = {
@ApiResponse(
responseCode = "200",
description = "Entity events",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = AlertActionStatus.class))),
@ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found")
})
public List<AlertAction> getAllAlertActionForAlert(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "alert Id", schema = @Schema(type = "UUID")) @PathParam("alertId") UUID alertId)
throws IOException {
return AlertsPublisherManager.getInstance().getAllAlertActions(alertId);
}
@GET
@Path("/defaultTriggers")
@Operation(
@ -435,7 +460,7 @@ public class AlertResource extends EntityResource<Alert, AlertRepository> {
throws IOException {
Alert alert = getAlert(create, securityContext.getUserPrincipal().getName());
Response response = create(uriInfo, securityContext, alert);
dao.addAlertActionPublishers(alert);
AlertsPublisherManager.getInstance().addAlertActionPublishers(alert);
return response;
}
@ -457,7 +482,7 @@ public class AlertResource extends EntityResource<Alert, AlertRepository> {
throws IOException {
Alert alert = getAlert(create, securityContext.getUserPrincipal().getName());
Response response = createOrUpdate(uriInfo, securityContext, alert);
dao.updateAlertActionPublishers((Alert) response.getEntity());
AlertsPublisherManager.getInstance().updateAlertActionPublishers(alert);
return response;
}
@ -485,7 +510,7 @@ public class AlertResource extends EntityResource<Alert, AlertRepository> {
JsonPatch patch)
throws IOException {
Response response = patchInternal(uriInfo, securityContext, id, patch);
dao.updateAlertActionPublishers((Alert) response.getEntity());
AlertsPublisherManager.getInstance().updateAlertActionPublishers((Alert) response.getEntity());
return response;
}
@ -518,7 +543,7 @@ public class AlertResource extends EntityResource<Alert, AlertRepository> {
@Parameter(description = "alert Id", schema = @Schema(type = "UUID")) @PathParam("id") UUID id)
throws IOException, InterruptedException {
Response response = delete(uriInfo, securityContext, id, false, hardDelete);
dao.deleteAlertAllPublishers(id);
AlertsPublisherManager.getInstance().deleteAlertAllPublishers(id);
return response;
}

View File

@ -24,6 +24,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -619,4 +620,26 @@ public final class ChangeEventParser {
}
return diff;
}
public static Set<String> getUpdatedField(ChangeEvent event) {
Set<String> fields = new HashSet<>();
ChangeDescription description = event.getChangeDescription();
if (description != null) {
List<FieldChange> fieldChanges = new ArrayList<>();
fieldChanges.addAll(description.getFieldsAdded());
fieldChanges.addAll(description.getFieldsUpdated());
fieldChanges.addAll(description.getFieldsDeleted());
fieldChanges.forEach(
(field) -> {
String fieldName = field.getName();
if (fieldName.contains(".")) {
String[] tokens = fieldName.split("\\.");
fields.add(tokens[tokens.length - 1]);
} else {
fields.add(fieldName);
}
});
}
return fields;
}
}

View File

@ -48,6 +48,11 @@
"name": "DisableIngestionActivityFeedData",
"effect": "deny",
"condition": "matchUpdatedBy('ingestion-bot')"
},
{
"name": "DisableUsageSummaryFromActivityFeed",
"effect": "deny",
"condition": "matchAnyFieldChange('usageSummary')"
}
],
"provider" : "system"