diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java index 3c6b205f8da..b231c7a87bf 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java @@ -163,7 +163,7 @@ public final class Entity { DAO_MAP.put(entity, dao); ENTITY_REPOSITORY_MAP.put(entity, entityRepository); EntityInterface.CANONICAL_ENTITY_NAME_MAP.put(entity.toLowerCase(Locale.ROOT), entity); - + EntityInterface.ENTITY_TYPE_TO_CLASS_MAP.put(entity.toLowerCase(Locale.ROOT), clazz); ENTITY_LIST.add(entity); Collections.sort(ENTITY_LIST); @@ -287,6 +287,10 @@ public final class Entity { return EntityInterface.CANONICAL_ENTITY_NAME_MAP.get(object.getClass().getSimpleName().toLowerCase(Locale.ROOT)); } + public static Class getEntityClassFromType(String entityType) { + return EntityInterface.ENTITY_TYPE_TO_CLASS_MAP.get(entityType); + } + /** * Get list of all the entity field names from JsonPropertyOrder annotation from generated java class from entity.json */ diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsPublisherManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsPublisherManager.java index 039e95974f2..c7ac909072a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsPublisherManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsPublisherManager.java @@ -1,11 +1,10 @@ package org.openmetadata.service.alerts; -import static org.openmetadata.schema.type.Relationship.CONTAINS; -import static org.openmetadata.service.Entity.ALERT; import static org.openmetadata.service.Entity.ALERT_ACTION; import com.lmax.disruptor.BatchEventProcessor; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -68,17 +67,6 @@ public class AlertsPublisherManager { } public void addAlertActionPublisher(Alert alert, AlertAction alertAction) { - if (Boolean.FALSE.equals(alertAction.getEnabled())) { - // Only add alert that is enabled for publishing events - AlertActionStatus status = - new AlertActionStatus() - .withStatus(AlertActionStatus.Status.DISABLED) - .withTimestamp(System.currentTimeMillis()) - .withFailureDetails(null); - alertAction.setStatusDetails(status); - return; - } - // Activity Feed AlertAction Cannot be Created if (alertAction.getAlertActionType() == AlertAction.AlertActionType.ACTIVITY_FEED) { LOG.info("Activity Feed Alert Action cannot be created."); @@ -86,9 +74,19 @@ public class AlertsPublisherManager { } // Create AlertAction Publisher AlertsActionPublisher publisher = AlertUtil.getAlertPublisher(alert, alertAction, daoCollection); - BatchEventProcessor processor = EventPubSub.addEventHandler(publisher); - publisher.setProcessor(processor); - LOG.info("Alert publisher started for {}", alert.getName()); + if (Boolean.TRUE.equals(alertAction.getEnabled())) { + BatchEventProcessor processor = EventPubSub.addEventHandler(publisher); + publisher.setProcessor(processor); + LOG.info("Alert publisher started for {}", alert.getName()); + } else { + // Only add alert that is enabled for publishing events + AlertActionStatus status = + new AlertActionStatus() + .withStatus(AlertActionStatus.Status.DISABLED) + .withTimestamp(System.currentTimeMillis()) + .withFailureDetails(null); + alertAction.setStatusDetails(status); + } Map alertsActionPublisherMap = alertPublisherMap.get(alert.getId()) == null ? new HashMap<>() : alertPublisherMap.get(alert.getId()); @@ -106,42 +104,56 @@ public class AlertsPublisherManager { @SneakyThrows public void updateAllAlertUsingAlertAction(AlertAction alertAction) { - List records = - daoCollection - .relationshipDAO() - .findFrom(alertAction.getId().toString(), ALERT_ACTION, CONTAINS.ordinal(), ALERT); - EntityRepository alertEntityRepository = Entity.getEntityRepository(ALERT); - for (CollectionDAO.EntityRelationshipRecord record : records) { - deleteAlertAllPublishers(record.getId()); - Alert alert = alertEntityRepository.get(null, record.getId(), alertEntityRepository.getFields("*")); - addAlertActionPublisher(alert, alertAction); + List publishers = getAlertPublisherFromAlertAction(alertAction.getId()); + // Avoid handling from DB + if (publishers.size() != 0) { + for (AlertsActionPublisher publisher : publishers) { + Alert alert = publisher.getAlert(); + AlertAction action = publisher.getAlertAction(); + deleteAlertAllPublishers(alert.getId()); + if (action.getId().equals(alertAction.getId())) { + addAlertActionPublisher(alert, alertAction); + } else { + addAlertActionPublisher(alert, action); + } + } } } + public List getAlertPublisherFromAlertAction(UUID alertActionId) { + List publisherManagers = new ArrayList<>(); + for (Map.Entry> alertValues : alertPublisherMap.entrySet()) { + if (alertValues.getValue().containsKey(alertActionId)) { + publisherManagers.add(alertValues.getValue().get(alertActionId)); + } + } + return publisherManagers; + } + @SneakyThrows public void deleteAlertActionFromAllAlertPublisher(AlertAction alertAction) { - List records = - daoCollection - .relationshipDAO() - .findFrom(alertAction.getId().toString(), ALERT_ACTION, CONTAINS.ordinal(), ALERT); - for (CollectionDAO.EntityRelationshipRecord record : records) { - deleteAlertActionPublisher(record.getId(), alertAction); + List publishers = getAlertPublisherFromAlertAction(alertAction.getId()); + // Avoid handling from DB + if (publishers.size() != 0) { + for (AlertsActionPublisher alertsActionPublisher : publishers) { + if (alertsActionPublisher != null) { + deleteProcessorFromPubSub(alertsActionPublisher); + UUID alertId = alertsActionPublisher.getAlert().getId(); + Map alertActionPublishersMap = alertPublisherMap.get(alertId); + alertActionPublishersMap.remove(alertAction.getId()); + alertPublisherMap.put(alertId, alertActionPublishersMap); + } + } } } - public void deleteAlertActionPublisher(UUID alertId, AlertAction action) throws InterruptedException { - Map alertActionPublishers = alertPublisherMap.get(alertId); - if (alertActionPublishers != null) { - AlertsActionPublisher alertsActionPublisher = alertActionPublishers.get(action.getId()); - if (alertsActionPublisher != null) { - alertsActionPublisher.getProcessor().halt(); - alertsActionPublisher.awaitShutdown(); - EventPubSub.removeProcessor(alertsActionPublisher.getProcessor()); - LOG.info("Alert publisher deleted for {}", alertsActionPublisher.getAlert().getName()); - - alertActionPublishers.remove(action.getId()); - alertPublisherMap.put(alertId, alertActionPublishers); - } + public void deleteProcessorFromPubSub(AlertsActionPublisher publisher) throws InterruptedException { + BatchEventProcessor processor = publisher.getProcessor(); + if (processor != null) { + processor.halt(); + publisher.awaitShutdown(); + EventPubSub.removeProcessor(publisher.getProcessor()); + LOG.info("Alert publisher deleted for {}", publisher.getAlert().getName()); } } @@ -149,10 +161,7 @@ public class AlertsPublisherManager { Map alertPublishers = alertPublisherMap.get(alertId); if (alertPublishers != null) { for (AlertsActionPublisher publisher : alertPublishers.values()) { - publisher.getProcessor().halt(); - publisher.awaitShutdown(); - EventPubSub.removeProcessor(publisher.getProcessor()); - LOG.info("Alert publisher deleted for {}", publisher.getAlert().getName()); + deleteProcessorFromPubSub(publisher); } alertPublisherMap.remove(alertId); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsRuleEvaluator.java b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsRuleEvaluator.java index 30a300b43c0..cd2391794a8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsRuleEvaluator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/alerts/AlertsRuleEvaluator.java @@ -8,6 +8,7 @@ import static org.openmetadata.service.Entity.TEAM; import static org.openmetadata.service.Entity.TEST_CASE; import static org.openmetadata.service.Entity.USER; +import java.io.IOException; import java.util.Set; import java.util.UUID; import lombok.extern.slf4j.Slf4j; @@ -20,8 +21,10 @@ import org.openmetadata.schema.tests.type.TestCaseStatus; import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.FieldChange; +import org.openmetadata.service.Entity; import org.openmetadata.service.security.policyevaluator.SubjectCache; import org.openmetadata.service.util.ChangeEventParser; +import org.openmetadata.service.util.JsonUtils; @Slf4j public class AlertsRuleEvaluator { @@ -67,11 +70,18 @@ public class AlertsRuleEvaluator { description = "Returns true if the change event entity being accessed has following owners from the List.", examples = {"matchAnyOwnerName('Owner1', 'Owner2')"}, paramInputType = SPECIFIC_INDEX_ELASTIC_SEARCH) - public boolean matchAnyOwnerName(String... ownerNameList) { + public boolean matchAnyOwnerName(String... ownerNameList) throws IOException { if (changeEvent == null || changeEvent.getEntity() == null) { return false; } - EntityInterface entity = (EntityInterface) changeEvent.getEntity(); + Class entityClass = Entity.getEntityClassFromType(changeEvent.getEntityType()); + EntityInterface entity; + if (changeEvent.getEntity() instanceof String) { + entity = JsonUtils.readValue((String) changeEvent.getEntity(), entityClass); + } else { + entity = JsonUtils.convertValue(changeEvent.getEntity(), entityClass); + } + EntityReference ownerReference = entity.getOwner(); if (ownerReference != null) { if (USER.equals(ownerReference.getType())) { diff --git a/openmetadata-service/src/main/resources/json/data/alerts/filterData.json b/openmetadata-service/src/main/resources/json/data/alerts/filterData.json index 5ec9294820e..74c39432761 100644 --- a/openmetadata-service/src/main/resources/json/data/alerts/filterData.json +++ b/openmetadata-service/src/main/resources/json/data/alerts/filterData.json @@ -275,6 +275,7 @@ "matchAnyOwnerName", "matchAnyEntityFqn", "matchAnyEventType", + "matchTestResult", "matchUpdatedBy" ] }, @@ -284,6 +285,7 @@ "matchAnyOwnerName", "matchAnyEntityFqn", "matchAnyEventType", + "matchTestResult", "matchUpdatedBy" ] }, diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/events/AlertResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/events/AlertResourceTest.java index 6a9ff48b621..f4db3b25db2 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/events/AlertResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/events/AlertResourceTest.java @@ -28,7 +28,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.http.client.HttpResponseException; import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; @@ -55,7 +54,6 @@ import org.openmetadata.service.util.TestUtils; @Slf4j @TestMethodOrder(MethodOrderer.OrderAnnotation.class) -@Disabled public class AlertResourceTest extends EntityResourceTest { public static final TriggerConfig ALL_EVENTS_FILTER = new TriggerConfig().withType(TriggerConfig.AlertTriggerType.ALL_DATA_ASSETS); @@ -100,8 +98,8 @@ public class AlertResourceTest extends EntityResourceTest { // For the DISABLED Action Publisher are not available so it will have no status AlertActionStatus status = - getStatus(alert.getId(), genericWebhookAction.getId(), Response.Status.NO_CONTENT.getStatusCode()); - assertNull(status); + getStatus(alert.getId(), genericWebhookAction.getId(), Response.Status.OK.getStatusCode()); + assertEquals(AlertActionStatus.Status.DISABLED, status.getStatus()); WebhookCallbackResource.EventDetails details = webhookCallbackResource.getEventDetails(webhookName); assertNull(details); // @@ -148,8 +146,8 @@ public class AlertResourceTest extends EntityResourceTest { TestUtils.UpdateType.MINOR_UPDATE, change); AlertActionStatus status3 = - getStatus(alert.getId(), genericWebhookAction.getId(), Response.Status.NO_CONTENT.getStatusCode()); - assertNull(status); + getStatus(alert.getId(), genericWebhookAction.getId(), Response.Status.OK.getStatusCode()); + assertEquals(AlertActionStatus.Status.DISABLED, status3.getStatus()); int iterations = 0; while (iterations < 10) { diff --git a/openmetadata-spec/src/main/java/org/openmetadata/schema/EntityInterface.java b/openmetadata-spec/src/main/java/org/openmetadata/schema/EntityInterface.java index 7b7cbb2ecf6..eb44efe5d0d 100644 --- a/openmetadata-spec/src/main/java/org/openmetadata/schema/EntityInterface.java +++ b/openmetadata-spec/src/main/java/org/openmetadata/schema/EntityInterface.java @@ -29,6 +29,7 @@ import org.openmetadata.schema.type.TagLabel; public interface EntityInterface { // Lower case entity name to canonical entity name map Map CANONICAL_ENTITY_NAME_MAP = new HashMap<>(); + Map> ENTITY_TYPE_TO_CLASS_MAP = new HashMap<>(); UUID getId();