diff --git a/openmetadata-clients/openmetadata-java-client/src/main/java/org/openmetadata/client/gateway/OpenMetadata.java b/openmetadata-clients/openmetadata-java-client/src/main/java/org/openmetadata/client/gateway/OpenMetadata.java index b120a476131..5bb6dd23e68 100644 --- a/openmetadata-clients/openmetadata-java-client/src/main/java/org/openmetadata/client/gateway/OpenMetadata.java +++ b/openmetadata-clients/openmetadata-java-client/src/main/java/org/openmetadata/client/gateway/OpenMetadata.java @@ -15,7 +15,6 @@ package org.openmetadata.client.gateway; import com.fasterxml.jackson.annotation.JsonInclude; import feign.Feign; -import feign.RequestInterceptor; import feign.form.FormEncoder; import feign.jackson.JacksonDecoder; import feign.jackson.JacksonEncoder; @@ -83,13 +82,6 @@ public class OpenMetadata { apiClient.addAuthorization(REQUEST_INTERCEPTOR_KEY, newInterceptor); } - public void addRequestInterceptor(String requestInterceptorKey, RequestInterceptor interceptor) { - if (apiClient.getApiAuthorizations().containsKey(requestInterceptorKey)) { - LOG.info("Interceptor with this key already exists"); - } - apiClient.addAuthorization(requestInterceptorKey, interceptor); - } - public void validateVersion() { String[] clientVersion = getClientVersion(); String[] serverVersion = getServerVersion(); diff --git a/openmetadata-clients/openmetadata-java-client/src/main/java/org/openmetadata/client/listUtils/ListUtils.java b/openmetadata-clients/openmetadata-java-client/src/main/java/org/openmetadata/client/listUtils/ListUtils.java index 11af036630d..b6a94f128fd 100644 --- a/openmetadata-clients/openmetadata-java-client/src/main/java/org/openmetadata/client/listUtils/ListUtils.java +++ b/openmetadata-clients/openmetadata-java-client/src/main/java/org/openmetadata/client/listUtils/ListUtils.java @@ -1,4 +1,4 @@ -package org.openmetadata.client.listutils; +package org.openmetadata.client.listUtils; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertUtil.java index dc5aea001eb..2c6133a1be9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertUtil.java @@ -122,14 +122,18 @@ public class AlertUtil { } public static boolean evaluateAlertConditions(ChangeEvent changeEvent, List alertFilterRules) { - boolean result; - String completeCondition = buildCompleteCondition(alertFilterRules); - AlertsRuleEvaluator ruleEvaluator = new AlertsRuleEvaluator(changeEvent); - StandardEvaluationContext evaluationContext = new StandardEvaluationContext(ruleEvaluator); - Expression expression = parseExpression(completeCondition); - result = Boolean.TRUE.equals(expression.getValue(evaluationContext, Boolean.class)); - LOG.debug("Alert evaluated as Result : {}", result); - return result; + if (alertFilterRules.size() > 0) { + boolean result; + String completeCondition = buildCompleteCondition(alertFilterRules); + AlertsRuleEvaluator ruleEvaluator = new AlertsRuleEvaluator(changeEvent); + StandardEvaluationContext evaluationContext = new StandardEvaluationContext(ruleEvaluator); + Expression expression = parseExpression(completeCondition); + result = Boolean.TRUE.equals(expression.getValue(evaluationContext, Boolean.class)); + LOG.debug("Alert evaluated as Result : {}", result); + return result; + } else { + return true; + } } public static String buildCompleteCondition(List alertFilterRules) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsActionPublisher.java b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsActionPublisher.java index f4ca20f1762..713ad38f26b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsActionPublisher.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsActionPublisher.java @@ -64,7 +64,7 @@ public class AlertsActionPublisher extends AbstractAlertPublisher { AlertActionStatus currentStatus = new AlertActionStatus() .withStatus(AlertActionStatus.Status.ACTIVE) - .withFailureDetails(new FailureDetails()) + .withFailureDetails(null) .withTimestamp(System.currentTimeMillis()); setStatus(currentStatus); onStartDelegate(); @@ -87,41 +87,41 @@ public class AlertsActionPublisher extends AbstractAlertPublisher { return alertAction; } - protected void setErrorStatus(Long attemptTime, Integer statusCode, String reason) + protected void setErrorStatus(Long updateTime, Integer statusCode, String reason) throws IOException, InterruptedException { - if (!attemptTime.equals(alertAction.getStatusDetails().getFailureDetails().getLastFailedAt())) { - setStatus(AlertActionStatus.Status.FAILED, attemptTime, statusCode, reason, null); - } - alertRepository.deleteAlertActionPublisher(alert.getId(), alertAction); + setStatus(AlertActionStatus.Status.FAILED, updateTime, statusCode, reason, null); + AlertsPublisherManager.getInstance().deleteAlertActionPublisher(alert.getId(), alertAction); throw new RuntimeException(reason); } - protected void setAwaitingRetry(Long attemptTime, int statusCode, String reason) throws IOException { - if (!attemptTime.equals(alertAction.getStatusDetails().getFailureDetails().getLastFailedAt())) { - setStatus( - AlertActionStatus.Status.AWAITING_RETRY, attemptTime, statusCode, reason, attemptTime + currentBackoffTime); - } + protected void setAwaitingRetry(Long updateTime, int statusCode, String reason) throws IOException { + setStatus(AlertActionStatus.Status.AWAITING_RETRY, updateTime, statusCode, reason, updateTime + currentBackoffTime); + } + + protected void setSuccessStatus(Long updateTime) throws IOException { + setStatus(AlertActionStatus.Status.ACTIVE, updateTime, 200, null, null); } protected void setStatus( - AlertActionStatus.Status status, Long attemptTime, Integer statusCode, String reason, Long timestamp) + AlertActionStatus.Status status, Long updateTime, Integer statusCode, String reason, Long nextAttemptTime) throws IOException { - FailureDetails details = - alertAction.getStatusDetails().getFailureDetails() != null - ? alertAction.getStatusDetails().getFailureDetails() - : new FailureDetails(); - details - .withLastFailedAt(attemptTime) - .withLastFailedStatusCode(statusCode) - .withLastFailedReason(reason) - .withNextAttempt(timestamp); + FailureDetails details = null; + if (status != AlertActionStatus.Status.ACTIVE) { + details = new FailureDetails(); + details + .withLastFailedAt(updateTime) + .withLastFailedStatusCode(statusCode) + .withLastFailedReason(reason) + .withNextAttempt(nextAttemptTime); + } + AlertActionStatus currentStatus = - alertRepository.setStatus(alert.getId(), alertAction.getId(), status, details).withTimestamp(attemptTime); + AlertsPublisherManager.getInstance().setStatus(alert.getId(), alertAction.getId(), status, details, updateTime); alertAction.setStatusDetails(currentStatus); } protected void setStatus(AlertActionStatus status) throws IOException { - alertRepository.setStatus(alert.getId(), alertAction.getId(), status); + AlertsPublisherManager.getInstance().setStatus(alert.getId(), alertAction.getId(), status); alertAction.setStatusDetails(status); } @@ -152,11 +152,6 @@ public class AlertsActionPublisher extends AbstractAlertPublisher { try { LOG.info("Sending Alert {}:{}:{}", alert.getName(), alertAction.getStatusDetails().getStatus(), batch.size()); sendAlert(event); - // Successfully sent Alert, update Status - alertAction.getStatusDetails().getFailureDetails().setLastSuccessfulAt(System.currentTimeMillis()); - if (alertAction.getStatusDetails().getStatus() != AlertActionStatus.Status.ACTIVE) { - setStatus(AlertActionStatus.Status.ACTIVE, System.currentTimeMillis(), null, null, null); - } } catch (Exception ex) { LOG.warn("Invalid Exception in Alert {}", alert.getName()); setErrorStatus( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsPublisherManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsPublisherManager.java new file mode 100644 index 00000000000..32133e20c3d --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsPublisherManager.java @@ -0,0 +1,182 @@ +package org.openmetadata.service.alerts; + +import static org.openmetadata.schema.type.Relationship.USES; +import static org.openmetadata.service.Entity.ALERT; +import static org.openmetadata.service.Entity.ALERT_ACTION; + +import com.lmax.disruptor.BatchEventProcessor; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.entity.alerts.Alert; +import org.openmetadata.schema.entity.alerts.AlertAction; +import org.openmetadata.schema.entity.alerts.AlertActionStatus; +import org.openmetadata.schema.type.EntityReference; +import org.openmetadata.schema.type.FailureDetails; +import org.openmetadata.service.Entity; +import org.openmetadata.service.events.EventPubSub; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.EntityRepository; +import org.openmetadata.service.util.JsonUtils; + +@Slf4j +public class AlertsPublisherManager { + // Alert is mapped to different publisher + private static CollectionDAO daoCollection; + private static AlertsPublisherManager INSTANCE; + private static volatile boolean INITIALIZED = false; + // > + private static final ConcurrentHashMap> alertPublisherMap = + new ConcurrentHashMap<>(); + + public static void initialize(CollectionDAO dao) { + if (!INITIALIZED) { + daoCollection = dao; + INSTANCE = new AlertsPublisherManager(); + INITIALIZED = true; + } else { + INITIALIZED = false; + LOG.info("Email Util is already initialized"); + } + } + + public static AlertsPublisherManager getInstance() { + return INSTANCE; + } + + public List getAllAlertActions(UUID id) { + List alertActions = new ArrayList<>(); + if (alertActions != null) { + for (AlertsActionPublisher publisher : alertPublisherMap.get(id).values()) { + alertActions.add(publisher.getAlertAction()); + } + } + return alertActions; + } + + public void addAlertActionPublishers(Alert alert) throws IOException { + EntityRepository alertActionEntityRepository = Entity.getEntityRepository(ALERT_ACTION); + for (EntityReference alertActionRef : alert.getAlertActions()) { + AlertAction action = + alertActionEntityRepository.get(null, alertActionRef.getId(), alertActionEntityRepository.getFields("*")); + addAlertActionPublisher(alert, action); + } + } + + public void addAlertActionPublisher(Alert alert, AlertAction alertAction) throws IOException { + if (Boolean.FALSE.equals(alertAction.getEnabled())) { + // Only add alert that is enabled for publishing events + setStatus( + alert.getId(), alertAction.getId(), AlertActionStatus.Status.DISABLED, null, System.currentTimeMillis()); + alertAction.setStatusDetails(new AlertActionStatus().withStatus(AlertActionStatus.Status.DISABLED)); + return; + } + + // Activity Feed AlertAction Cannot be Created + if (alertAction.getAlertActionType() == AlertAction.AlertActionType.ACTIVITY_FEED) { + LOG.info("Activity Feed Alert Action cannot be created."); + return; + } + // Create AlertAction Publisher + AlertsActionPublisher publisher = AlertUtil.getAlertPublisher(alert, alertAction, daoCollection); + BatchEventProcessor processor = EventPubSub.addEventHandler(publisher); + publisher.setProcessor(processor); + LOG.info("Alert publisher started for {}", alert.getName()); + + Map alertsActionPublisherMap = + alertPublisherMap.get(alert.getId()) == null ? new HashMap<>() : alertPublisherMap.get(alert.getId()); + alertsActionPublisherMap.put(alertAction.getId(), publisher); + alertPublisherMap.put(alert.getId(), alertsActionPublisherMap); + } + + public AlertActionStatus setStatus( + UUID alertId, UUID alertActionId, AlertActionStatus.Status status, FailureDetails failureDetails, Long timeStamp) + throws IOException { + AlertActionStatus currentStatus = + new AlertActionStatus().withStatus(status).withFailureDetails(failureDetails).withTimestamp(timeStamp); + daoCollection + .entityExtensionTimeSeriesDao() + .insert(alertId.toString(), alertActionId.toString(), "alertActionStatus", JsonUtils.pojoToJson(currentStatus)); + return currentStatus; + } + + public void removeAlertStatus(UUID alertId, UUID alertActionId) { + daoCollection.entityExtensionTimeSeriesDao().delete(alertId.toString(), alertActionId.toString()); + } + + public void removeAllAlertStatus(UUID alertId) { + daoCollection.entityExtensionTimeSeriesDao().deleteAll(alertId.toString()); + } + + public void setStatus(UUID alertId, UUID alertActionId, AlertActionStatus status) throws IOException { + daoCollection + .entityExtensionTimeSeriesDao() + .insert(alertId.toString(), alertActionId.toString(), "alertActionStatus", JsonUtils.pojoToJson(status)); + } + + @SneakyThrows + public void updateAlertActionPublishers(Alert alert) { + // Delete existing alert action publisher and create with the updated Alert + // if some alerts are removed or added + deleteAlertAllPublishers(alert.getId()); + addAlertActionPublishers(alert); + } + + @SneakyThrows + public void updateAllAlertUsingAlertAction(AlertAction alertAction) { + List records = + daoCollection.relationshipDAO().findFrom(alertAction.getId().toString(), ALERT_ACTION, USES.ordinal(), ALERT); + EntityRepository alertEntityRepository = Entity.getEntityRepository(ALERT); + for (CollectionDAO.EntityRelationshipRecord record : records) { + deleteAlertAllPublishers(record.getId()); + Alert alert = alertEntityRepository.get(null, record.getId(), alertEntityRepository.getFields("*")); + addAlertActionPublishers(alert); + } + } + + @SneakyThrows + public void deleteAlertActionFromAllAlertPublisher(AlertAction alertAction) { + List records = + daoCollection.relationshipDAO().findFrom(alertAction.getId().toString(), ALERT_ACTION, USES.ordinal(), ALERT); + for (CollectionDAO.EntityRelationshipRecord record : records) { + deleteAlertActionPublisher(record.getId(), alertAction); + } + } + + public void deleteAlertActionPublisher(UUID alertId, AlertAction action) throws InterruptedException { + Map alertActionPublishers = alertPublisherMap.get(alertId); + if (alertActionPublishers != null) { + AlertsActionPublisher alertsActionPublisher = alertActionPublishers.get(action.getId()); + if (alertsActionPublisher != null) { + alertsActionPublisher.getProcessor().halt(); + alertsActionPublisher.awaitShutdown(); + EventPubSub.removeProcessor(alertsActionPublisher.getProcessor()); + LOG.info("Alert publisher deleted for {}", alertsActionPublisher.getAlert().getName()); + + alertActionPublishers.remove(action.getId()); + removeAlertStatus(alertId, action.getId()); + alertPublisherMap.put(alertId, alertActionPublishers); + } + } + } + + public void deleteAlertAllPublishers(UUID alertId) throws InterruptedException { + Map alertPublishers = alertPublisherMap.get(alertId); + if (alertPublishers != null) { + for (AlertsActionPublisher publisher : alertPublishers.values()) { + publisher.getProcessor().halt(); + publisher.awaitShutdown(); + EventPubSub.removeProcessor(publisher.getProcessor()); + LOG.info("Alert publisher deleted for {}", publisher.getAlert().getName()); + } + alertPublisherMap.remove(alertId); + removeAllAlertStatus(alertId); + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsRuleEvaluator.java b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsRuleEvaluator.java index 507d6b655f3..4fbca68be22 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsRuleEvaluator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsRuleEvaluator.java @@ -1,12 +1,14 @@ package org.openmetadata.service.alerts; import static org.openmetadata.schema.type.Function.ParameterType.ALL_INDEX_ELASTIC_SEARCH; +import static org.openmetadata.schema.type.Function.ParameterType.NOT_REQUIRED; import static org.openmetadata.schema.type.Function.ParameterType.READ_FROM_PARAM_CONTEXT; import static org.openmetadata.schema.type.Function.ParameterType.SPECIFIC_INDEX_ELASTIC_SEARCH; import static org.openmetadata.service.Entity.TEAM; import static org.openmetadata.service.Entity.TEST_CASE; import static org.openmetadata.service.Entity.USER; +import java.util.Set; import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.EntityInterface; @@ -19,6 +21,7 @@ import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.FieldChange; import org.openmetadata.service.security.policyevaluator.SubjectCache; +import org.openmetadata.service.util.ChangeEventParser; @Slf4j public class AlertsRuleEvaluator { @@ -29,7 +32,8 @@ public class AlertsRuleEvaluator { matchAnyEntityId, matchAnyEventType, matchTestResult, - matchUpdatedBy + matchUpdatedBy, + matchAnyFieldChange } private final ChangeEvent changeEvent; @@ -101,7 +105,7 @@ public class AlertsRuleEvaluator { } EntityInterface entity = (EntityInterface) changeEvent.getEntity(); for (String name : entityNames) { - if (entity.getName().equals(name)) { + if (entity.getFullyQualifiedName().equals(name)) { return true; } } @@ -192,4 +196,23 @@ public class AlertsRuleEvaluator { } return false; } + + @Function( + name = "matchAnyFieldChange", + input = "List of comma separated fields change", + description = "Returns true if the change event entity is updated by the mentioned users", + examples = {"matchAnyFieldChange('fieldName1', 'fieldName')"}, + paramInputType = NOT_REQUIRED) + public boolean matchAnyFieldChange(String... fieldChangeUpdate) { + if (changeEvent == null || changeEvent.getEntity() == null) { + return false; + } + Set fields = ChangeEventParser.getUpdatedField(changeEvent); + for (String name : fieldChangeUpdate) { + if (fields.contains(name)) { + return true; + } + } + return false; + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/emailAlert/EmailAlertPublisher.java b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/emailAlert/EmailAlertPublisher.java index ff7c2bd39bd..1b9e085aa4b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/emailAlert/EmailAlertPublisher.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/emailAlert/EmailAlertPublisher.java @@ -3,6 +3,7 @@ package org.openmetadata.service.alerts.emailAlert; import static org.openmetadata.service.Entity.TEAM; import static org.openmetadata.service.Entity.USER; +import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -54,15 +55,17 @@ public class EmailAlertPublisher extends AlertsActionPublisher { } @Override - public void sendAlert(ChangeEvent event) { + public void sendAlert(ChangeEvent event) throws IOException, InterruptedException { try { Set receivers = buildReceiversList(event); EmailMessage emailMessage = ChangeEventParser.buildEmailMessage(event); for (String email : receivers) { EmailUtil.sendChangeEventMail(email, emailMessage); } + setSuccessStatus(System.currentTimeMillis()); } catch (Exception e) { LOG.error("Failed to publish event {} to email due to {} ", event, e.getMessage()); + setErrorStatus(System.currentTimeMillis(), 500, e.getMessage()); throw new EventPublisherException( String.format("Failed to publish event %s to email due to %s ", event, e.getMessage())); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/generic/GenericWebhookPublisher.java b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/generic/GenericWebhookPublisher.java index e2ce061ae64..0e1a2ea4f57 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/generic/GenericWebhookPublisher.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/generic/GenericWebhookPublisher.java @@ -2,6 +2,7 @@ package org.openmetadata.service.alerts.generic; import java.io.IOException; import java.net.UnknownHostException; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Client; @@ -17,6 +18,7 @@ import org.openmetadata.schema.type.Webhook; import org.openmetadata.service.alerts.AlertsActionPublisher; import org.openmetadata.service.events.errors.EventPublisherException; import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.resources.events.EventResource; import org.openmetadata.service.security.SecurityUtil; import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.RestUtil; @@ -60,7 +62,8 @@ public class GenericWebhookPublisher extends AlertsActionPublisher { public void sendAlert(ChangeEvent event) throws EventPublisherException, IOException, InterruptedException { long attemptTime = System.currentTimeMillis(); try { - String json = JsonUtils.pojoToJson(event); + EventResource.ChangeEventList list = new EventResource.ChangeEventList(List.of(event), null, null, 1); + String json = JsonUtils.pojoToJson(list); Response response; if (webhook.getSecretKey() != null && !webhook.getSecretKey().isEmpty()) { String hmac = "sha256=" + CommonUtil.calculateHMAC(webhook.getSecretKey(), json); @@ -74,6 +77,7 @@ public class GenericWebhookPublisher extends AlertsActionPublisher { alertAction.getStatusDetails().getStatus(), batch.size(), response.getStatusInfo()); + // Successfully sent Alert, update Status if (response.getStatus() >= 300 && response.getStatus() < 400) { // 3xx response/redirection is not allowed for callback. Set the webhook state as in error setErrorStatus(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase()); @@ -82,12 +86,14 @@ public class GenericWebhookPublisher extends AlertsActionPublisher { setNextBackOff(); setAwaitingRetry(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase()); Thread.sleep(currentBackoffTime); + } else if (response.getStatus() == 200) { + setSuccessStatus(System.currentTimeMillis()); } } catch (Exception ex) { Throwable cause = ex.getCause(); if (cause != null && cause.getClass() == UnknownHostException.class) { LOG.warn("Invalid webhook {} endpoint {}", webhook.getName(), webhook.getEndpoint()); - setErrorStatus(attemptTime, null, "UnknownHostException"); + setErrorStatus(attemptTime, Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "UnknownHostException"); } else { LOG.debug("Exception occurred while publishing webhook", ex); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/msteams/MSTeamsWebhookPublisher.java b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/msteams/MSTeamsWebhookPublisher.java index 22d5f658c08..a0c52ca46e6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/msteams/MSTeamsWebhookPublisher.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/msteams/MSTeamsWebhookPublisher.java @@ -62,6 +62,8 @@ public class MSTeamsWebhookPublisher extends AlertsActionPublisher { setNextBackOff(); setAwaitingRetry(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase()); Thread.sleep(currentBackoffTime); + } else if (response.getStatus() == 200) { + setSuccessStatus(System.currentTimeMillis()); } } catch (Exception e) { LOG.error("Failed to publish event {} to msteams due to {} ", event, e.getMessage()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/slack/SlackWebhookEventPublisher.java b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/slack/SlackWebhookEventPublisher.java index 5ffdda41f9a..47cde1b9b57 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/slack/SlackWebhookEventPublisher.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/slack/SlackWebhookEventPublisher.java @@ -55,6 +55,7 @@ public class SlackWebhookEventPublisher extends AlertsActionPublisher { try { SlackMessage slackMessage = ChangeEventParser.buildSlackMessage(event); Response response = target.post(javax.ws.rs.client.Entity.entity(slackMessage, MediaType.APPLICATION_JSON_TYPE)); + // Successfully sent Alert, update Status if (response.getStatus() >= 300 && response.getStatus() < 400) { // 3xx response/redirection is not allowed for callback. Set the webhook state as in error setErrorStatus(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase()); @@ -63,6 +64,8 @@ public class SlackWebhookEventPublisher extends AlertsActionPublisher { setNextBackOff(); setAwaitingRetry(attemptTime, response.getStatus(), response.getStatusInfo().getReasonPhrase()); Thread.sleep(currentBackoffTime); + } else if (response.getStatus() == 200) { + setSuccessStatus(System.currentTimeMillis()); } } catch (Exception e) { LOG.error("Failed to publish event {} to slack due to {} ", event, e.getMessage()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AlertActionRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AlertActionRepository.java index 100c4ec59ae..94a48b44757 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AlertActionRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AlertActionRepository.java @@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.entity.alerts.AlertAction; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.service.Entity; +import org.openmetadata.service.alerts.AlertsPublisherManager; import org.openmetadata.service.resources.dqtests.TestDefinitionResource; import org.openmetadata.service.util.EntityUtil; @@ -68,6 +69,7 @@ public class AlertActionRepository extends EntityRepository { recordChange("readTimeout", original.getReadTimeout(), updated.getReadTimeout()); recordChange("timeout", original.getTimeout(), updated.getTimeout()); recordChange("alertActionConfig", original.getAlertActionConfig(), updated.getAlertActionConfig()); + AlertsPublisherManager.getInstance().updateAllAlertUsingAlertAction(updated); } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AlertRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AlertRepository.java index 33c9a987153..0df28244617 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AlertRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AlertRepository.java @@ -16,26 +16,20 @@ package org.openmetadata.service.jdbi3; import static org.openmetadata.service.Entity.ALERT; import static org.openmetadata.service.Entity.ALERT_ACTION; -import com.lmax.disruptor.BatchEventProcessor; import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.entity.alerts.Alert; -import org.openmetadata.schema.entity.alerts.AlertAction; import org.openmetadata.schema.entity.alerts.AlertActionStatus; import org.openmetadata.schema.entity.alerts.AlertFilterRule; import org.openmetadata.schema.type.EntityReference; -import org.openmetadata.schema.type.FailureDetails; import org.openmetadata.schema.type.Relationship; import org.openmetadata.service.Entity; import org.openmetadata.service.alerts.AlertUtil; -import org.openmetadata.service.alerts.AlertsActionPublisher; -import org.openmetadata.service.events.EventPubSub; +import org.openmetadata.service.alerts.AlertsPublisherManager; import org.openmetadata.service.resources.alerts.AlertResource; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.EntityUtil.Fields; @@ -47,10 +41,6 @@ public class AlertRepository extends EntityRepository { static final String ALERT_PATCH_FIELDS = "triggerConfig,filteringRules,alertActions"; static final String ALERT_UPDATE_FIELDS = "triggerConfig,filteringRules,alertActions"; - // Alert is mapped to different publisher - private static final ConcurrentHashMap> alertPublisherMap = - new ConcurrentHashMap<>(); - public AlertRepository(CollectionDAO dao) { super( AlertResource.COLLECTION_PATH, @@ -113,109 +103,6 @@ public class AlertRepository extends EntityRepository { return new AlertUpdater(original, updated, operation); } - public void addAlertActionPublishers(Alert alert) throws IOException { - EntityRepository alertActionEntityRepository = Entity.getEntityRepository(ALERT_ACTION); - for (EntityReference alertActionRef : alert.getAlertActions()) { - AlertAction action = - alertActionEntityRepository.get(null, alertActionRef.getId(), alertActionEntityRepository.getFields("*")); - addAlertActionPublisher(alert, action); - } - } - - public void addAlertActionPublisher(Alert alert, AlertAction alertAction) throws IOException { - if (Boolean.FALSE.equals(alertAction.getEnabled())) { - // Only add alert that is enabled for publishing events - setStatus(alert.getId(), alertAction.getId(), AlertActionStatus.Status.DISABLED, null); - alertAction.setStatusDetails(new AlertActionStatus().withStatus(AlertActionStatus.Status.DISABLED)); - return; - } - - // Activity Feed AlertAction Cannot be Created - if (alertAction.getAlertActionType() == AlertAction.AlertActionType.ACTIVITY_FEED) { - LOG.info("Activity Feed Alert Action cannot be created."); - return; - } - // Create AlertAction Publisher - AlertsActionPublisher publisher = AlertUtil.getAlertPublisher(alert, alertAction, daoCollection); - BatchEventProcessor processor = EventPubSub.addEventHandler(publisher); - publisher.setProcessor(processor); - LOG.info("Alert publisher started for {}", alert.getName()); - - List listPublisher = - alertPublisherMap.get(alert.getId()) == null ? new ArrayList<>() : alertPublisherMap.get(alert.getId()); - listPublisher.add(publisher); - alertPublisherMap.put(alert.getId(), listPublisher); - } - - public AlertActionStatus setStatus( - UUID alertId, UUID alertActionId, AlertActionStatus.Status status, FailureDetails failureDetails) - throws IOException { - AlertActionStatus currentStatus = new AlertActionStatus().withStatus(status).withFailureDetails(failureDetails); - daoCollection - .entityExtensionTimeSeriesDao() - .insert(alertId.toString(), alertActionId.toString(), "alertActionStatus", JsonUtils.pojoToJson(currentStatus)); - return currentStatus; - } - - public void removeAlertStatus(UUID alertId, UUID alertActionId) { - daoCollection.entityExtensionTimeSeriesDao().delete(alertId.toString(), alertActionId.toString()); - } - - public void removeAllAlertStatus(UUID alertId) { - daoCollection.entityExtensionTimeSeriesDao().deleteAll(alertId.toString()); - } - - public void setStatus(UUID alertId, UUID alertActionId, AlertActionStatus status) throws IOException { - daoCollection - .entityExtensionTimeSeriesDao() - .insert(alertId.toString(), alertActionId.toString(), "alertActionStatus", JsonUtils.pojoToJson(status)); - } - - @SneakyThrows - public void updateAlertActionPublishers(Alert alert) { - // Delete existing alert action publisher and create with the updated Alert - // if some alerts are removed or added - deleteAlertAllPublishers(alert.getId()); - addAlertActionPublishers(alert); - } - - public void deleteAlertActionPublisher(UUID alertId, AlertAction action) throws InterruptedException { - List alertPublishers = alertPublisherMap.get(alertId); - if (alertPublishers != null) { - int position = -1; - for (int i = 0; i < alertPublishers.size(); i++) { - AlertsActionPublisher alertsActionPublisher = alertPublishers.get(i); - if (alertsActionPublisher.getAlertAction().getId().equals(action.getId())) { - alertsActionPublisher.getProcessor().halt(); - alertsActionPublisher.awaitShutdown(); - EventPubSub.removeProcessor(alertsActionPublisher.getProcessor()); - LOG.info("Alert publisher deleted for {}", alertsActionPublisher.getAlert().getName()); - position = i; - break; - } - } - if (position != -1) { - alertPublishers.remove(position); - removeAlertStatus(alertId, action.getId()); - alertPublisherMap.put(alertId, alertPublishers); - } - } - } - - public void deleteAlertAllPublishers(UUID alertId) throws InterruptedException { - List alertPublishers = alertPublisherMap.get(alertId); - if (alertPublishers != null) { - for (AlertsActionPublisher alertsActionPublisher : alertPublishers) { - alertsActionPublisher.getProcessor().halt(); - alertsActionPublisher.awaitShutdown(); - EventPubSub.removeProcessor(alertsActionPublisher.getProcessor()); - LOG.info("Alert publisher deleted for {}", alertsActionPublisher.getAlert().getName()); - } - alertPublisherMap.clear(); - removeAllAlertStatus(alertId); - } - } - private List getAlertActions(Alert entity) throws IOException { List testCases = findTo(entity.getId(), ALERT, Relationship.USES, ALERT_ACTION); @@ -246,9 +133,7 @@ public class AlertRepository extends EntityRepository { new ArrayList<>(original.getAlertActions()), new ArrayList<>(updated.getAlertActions()), false); - if (!original.getAlertActions().equals(updated.getAlertActions())) { - updateAlertActionPublishers(updated); - } + AlertsPublisherManager.getInstance().updateAlertActionPublishers(updated); } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/alerts/AlertActionResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/alerts/AlertActionResource.java index 17b5c40b30e..71e5528d33d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/alerts/AlertActionResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/alerts/AlertActionResource.java @@ -52,6 +52,7 @@ import org.openmetadata.schema.entity.alerts.AlertAction; import org.openmetadata.schema.type.EntityHistory; import org.openmetadata.schema.type.Include; import org.openmetadata.service.OpenMetadataApplicationConfig; +import org.openmetadata.service.alerts.AlertsPublisherManager; import org.openmetadata.service.jdbi3.AlertActionRepository; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.ListFilter; @@ -79,6 +80,8 @@ public class AlertActionResource extends EntityResource { @@ -353,7 +356,9 @@ public class AlertActionResource extends EntityResource { List alertList = JsonUtils.readObjects(listAllAlerts, Alert.class); for (Alert alert : alertList) { if (!alert.getName().equals(activityFeedAlert.getName())) { - dao.addAlertActionPublishers(alert); + AlertsPublisherManager.getInstance().addAlertActionPublishers(alert); } } } @@ -267,12 +268,36 @@ public class AlertResource extends EntityResource { @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Parameter(description = "alert Id", schema = @Schema(type = "UUID")) @PathParam("alertId") UUID alertId, - @Parameter(description = "alertAction Id", schema = @Schema(type = "UUID")) @PathParam("alertId") + @Parameter(description = "alertAction Id", schema = @Schema(type = "UUID")) @PathParam("actionId") UUID alertActionId) throws IOException { return dao.getAlertActionStatus(alertId, alertActionId); } + @GET + @Path("/allAlertAction/{alertId}") + @Valid + @Operation( + operationId = "getAllAlertActionForAlert", + summary = "Get all alert Action of an alert", + tags = "alerts", + description = "Get all alert Action of alert by given Id , and id of the alert it is bound to", + responses = { + @ApiResponse( + responseCode = "200", + description = "Entity events", + content = + @Content(mediaType = "application/json", schema = @Schema(implementation = AlertActionStatus.class))), + @ApiResponse(responseCode = "404", description = "Entity for instance {id} is not found") + }) + public List getAllAlertActionForAlert( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "alert Id", schema = @Schema(type = "UUID")) @PathParam("alertId") UUID alertId) + throws IOException { + return AlertsPublisherManager.getInstance().getAllAlertActions(alertId); + } + @GET @Path("/defaultTriggers") @Operation( @@ -435,7 +460,7 @@ public class AlertResource extends EntityResource { throws IOException { Alert alert = getAlert(create, securityContext.getUserPrincipal().getName()); Response response = create(uriInfo, securityContext, alert); - dao.addAlertActionPublishers(alert); + AlertsPublisherManager.getInstance().addAlertActionPublishers(alert); return response; } @@ -457,7 +482,7 @@ public class AlertResource extends EntityResource { throws IOException { Alert alert = getAlert(create, securityContext.getUserPrincipal().getName()); Response response = createOrUpdate(uriInfo, securityContext, alert); - dao.updateAlertActionPublishers((Alert) response.getEntity()); + AlertsPublisherManager.getInstance().updateAlertActionPublishers(alert); return response; } @@ -485,7 +510,7 @@ public class AlertResource extends EntityResource { JsonPatch patch) throws IOException { Response response = patchInternal(uriInfo, securityContext, id, patch); - dao.updateAlertActionPublishers((Alert) response.getEntity()); + AlertsPublisherManager.getInstance().updateAlertActionPublishers((Alert) response.getEntity()); return response; } @@ -518,7 +543,7 @@ public class AlertResource extends EntityResource { @Parameter(description = "alert Id", schema = @Schema(type = "UUID")) @PathParam("id") UUID id) throws IOException, InterruptedException { Response response = delete(uriInfo, securityContext, id, false, hardDelete); - dao.deleteAlertAllPublishers(id); + AlertsPublisherManager.getInstance().deleteAlertAllPublishers(id); return response; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/ChangeEventParser.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/ChangeEventParser.java index 1d484476c2c..fd0fe5c6a67 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/ChangeEventParser.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/ChangeEventParser.java @@ -24,6 +24,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -619,4 +620,26 @@ public final class ChangeEventParser { } return diff; } + + public static Set getUpdatedField(ChangeEvent event) { + Set fields = new HashSet<>(); + ChangeDescription description = event.getChangeDescription(); + if (description != null) { + List fieldChanges = new ArrayList<>(); + fieldChanges.addAll(description.getFieldsAdded()); + fieldChanges.addAll(description.getFieldsUpdated()); + fieldChanges.addAll(description.getFieldsDeleted()); + fieldChanges.forEach( + (field) -> { + String fieldName = field.getName(); + if (fieldName.contains(".")) { + String[] tokens = fieldName.split("\\."); + fields.add(tokens[tokens.length - 1]); + } else { + fields.add(fieldName); + } + }); + } + return fields; + } } diff --git a/openmetadata-service/src/main/resources/json/data/alerts/alertsData.json b/openmetadata-service/src/main/resources/json/data/alerts/alertsData.json index 69637317196..ac3ca61d7c5 100644 --- a/openmetadata-service/src/main/resources/json/data/alerts/alertsData.json +++ b/openmetadata-service/src/main/resources/json/data/alerts/alertsData.json @@ -48,6 +48,11 @@ "name": "DisableIngestionActivityFeedData", "effect": "deny", "condition": "matchUpdatedBy('ingestion-bot')" + }, + { + "name": "DisableUsageSummaryFromActivityFeed", + "effect": "deny", + "condition": "matchAnyFieldChange('usageSummary')" } ], "provider" : "system"