Alert Test Fix (#9640)

* Alert Test Fix

* Removed Db calls in case local publishers available

* Fix test

* Removed unrequired code

* Maintaing entityType to class map

* checkstyle
This commit is contained in:
Mohit Yadav 2023-01-10 12:56:10 +05:30 committed by GitHub
parent 8255e4685b
commit ea0e0e03e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 81 additions and 57 deletions

View File

@ -163,7 +163,7 @@ public final class Entity {
DAO_MAP.put(entity, dao); DAO_MAP.put(entity, dao);
ENTITY_REPOSITORY_MAP.put(entity, entityRepository); ENTITY_REPOSITORY_MAP.put(entity, entityRepository);
EntityInterface.CANONICAL_ENTITY_NAME_MAP.put(entity.toLowerCase(Locale.ROOT), entity); EntityInterface.CANONICAL_ENTITY_NAME_MAP.put(entity.toLowerCase(Locale.ROOT), entity);
EntityInterface.ENTITY_TYPE_TO_CLASS_MAP.put(entity.toLowerCase(Locale.ROOT), clazz);
ENTITY_LIST.add(entity); ENTITY_LIST.add(entity);
Collections.sort(ENTITY_LIST); Collections.sort(ENTITY_LIST);
@ -287,6 +287,10 @@ public final class Entity {
return EntityInterface.CANONICAL_ENTITY_NAME_MAP.get(object.getClass().getSimpleName().toLowerCase(Locale.ROOT)); return EntityInterface.CANONICAL_ENTITY_NAME_MAP.get(object.getClass().getSimpleName().toLowerCase(Locale.ROOT));
} }
public static Class<? extends EntityInterface> getEntityClassFromType(String entityType) {
return EntityInterface.ENTITY_TYPE_TO_CLASS_MAP.get(entityType);
}
/** /**
* Get list of all the entity field names from JsonPropertyOrder annotation from generated java class from entity.json * Get list of all the entity field names from JsonPropertyOrder annotation from generated java class from entity.json
*/ */

View File

@ -1,11 +1,10 @@
package org.openmetadata.service.alerts; package org.openmetadata.service.alerts;
import static org.openmetadata.schema.type.Relationship.CONTAINS;
import static org.openmetadata.service.Entity.ALERT;
import static org.openmetadata.service.Entity.ALERT_ACTION; import static org.openmetadata.service.Entity.ALERT_ACTION;
import com.lmax.disruptor.BatchEventProcessor; import com.lmax.disruptor.BatchEventProcessor;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -68,17 +67,6 @@ public class AlertsPublisherManager {
} }
public void addAlertActionPublisher(Alert alert, AlertAction alertAction) { public void addAlertActionPublisher(Alert alert, AlertAction alertAction) {
if (Boolean.FALSE.equals(alertAction.getEnabled())) {
// Only add alert that is enabled for publishing events
AlertActionStatus status =
new AlertActionStatus()
.withStatus(AlertActionStatus.Status.DISABLED)
.withTimestamp(System.currentTimeMillis())
.withFailureDetails(null);
alertAction.setStatusDetails(status);
return;
}
// Activity Feed AlertAction Cannot be Created // Activity Feed AlertAction Cannot be Created
if (alertAction.getAlertActionType() == AlertAction.AlertActionType.ACTIVITY_FEED) { if (alertAction.getAlertActionType() == AlertAction.AlertActionType.ACTIVITY_FEED) {
LOG.info("Activity Feed Alert Action cannot be created."); LOG.info("Activity Feed Alert Action cannot be created.");
@ -86,9 +74,19 @@ public class AlertsPublisherManager {
} }
// Create AlertAction Publisher // Create AlertAction Publisher
AlertsActionPublisher publisher = AlertUtil.getAlertPublisher(alert, alertAction, daoCollection); AlertsActionPublisher publisher = AlertUtil.getAlertPublisher(alert, alertAction, daoCollection);
if (Boolean.TRUE.equals(alertAction.getEnabled())) {
BatchEventProcessor<EventPubSub.ChangeEventHolder> processor = EventPubSub.addEventHandler(publisher); BatchEventProcessor<EventPubSub.ChangeEventHolder> processor = EventPubSub.addEventHandler(publisher);
publisher.setProcessor(processor); publisher.setProcessor(processor);
LOG.info("Alert publisher started for {}", alert.getName()); LOG.info("Alert publisher started for {}", alert.getName());
} else {
// Only add alert that is enabled for publishing events
AlertActionStatus status =
new AlertActionStatus()
.withStatus(AlertActionStatus.Status.DISABLED)
.withTimestamp(System.currentTimeMillis())
.withFailureDetails(null);
alertAction.setStatusDetails(status);
}
Map<UUID, AlertsActionPublisher> alertsActionPublisherMap = Map<UUID, AlertsActionPublisher> alertsActionPublisherMap =
alertPublisherMap.get(alert.getId()) == null ? new HashMap<>() : alertPublisherMap.get(alert.getId()); alertPublisherMap.get(alert.getId()) == null ? new HashMap<>() : alertPublisherMap.get(alert.getId());
@ -106,53 +104,64 @@ public class AlertsPublisherManager {
@SneakyThrows @SneakyThrows
public void updateAllAlertUsingAlertAction(AlertAction alertAction) { public void updateAllAlertUsingAlertAction(AlertAction alertAction) {
List<CollectionDAO.EntityRelationshipRecord> records = List<AlertsActionPublisher> publishers = getAlertPublisherFromAlertAction(alertAction.getId());
daoCollection // Avoid handling from DB
.relationshipDAO() if (publishers.size() != 0) {
.findFrom(alertAction.getId().toString(), ALERT_ACTION, CONTAINS.ordinal(), ALERT); for (AlertsActionPublisher publisher : publishers) {
EntityRepository<Alert> alertEntityRepository = Entity.getEntityRepository(ALERT); Alert alert = publisher.getAlert();
for (CollectionDAO.EntityRelationshipRecord record : records) { AlertAction action = publisher.getAlertAction();
deleteAlertAllPublishers(record.getId()); deleteAlertAllPublishers(alert.getId());
Alert alert = alertEntityRepository.get(null, record.getId(), alertEntityRepository.getFields("*")); if (action.getId().equals(alertAction.getId())) {
addAlertActionPublisher(alert, alertAction); addAlertActionPublisher(alert, alertAction);
} else {
addAlertActionPublisher(alert, action);
} }
} }
}
}
public List<AlertsActionPublisher> getAlertPublisherFromAlertAction(UUID alertActionId) {
List<AlertsActionPublisher> publisherManagers = new ArrayList<>();
for (Map.Entry<UUID, Map<UUID, AlertsActionPublisher>> alertValues : alertPublisherMap.entrySet()) {
if (alertValues.getValue().containsKey(alertActionId)) {
publisherManagers.add(alertValues.getValue().get(alertActionId));
}
}
return publisherManagers;
}
@SneakyThrows @SneakyThrows
public void deleteAlertActionFromAllAlertPublisher(AlertAction alertAction) { public void deleteAlertActionFromAllAlertPublisher(AlertAction alertAction) {
List<CollectionDAO.EntityRelationshipRecord> records = List<AlertsActionPublisher> publishers = getAlertPublisherFromAlertAction(alertAction.getId());
daoCollection // Avoid handling from DB
.relationshipDAO() if (publishers.size() != 0) {
.findFrom(alertAction.getId().toString(), ALERT_ACTION, CONTAINS.ordinal(), ALERT); for (AlertsActionPublisher alertsActionPublisher : publishers) {
for (CollectionDAO.EntityRelationshipRecord record : records) {
deleteAlertActionPublisher(record.getId(), alertAction);
}
}
public void deleteAlertActionPublisher(UUID alertId, AlertAction action) throws InterruptedException {
Map<UUID, AlertsActionPublisher> alertActionPublishers = alertPublisherMap.get(alertId);
if (alertActionPublishers != null) {
AlertsActionPublisher alertsActionPublisher = alertActionPublishers.get(action.getId());
if (alertsActionPublisher != null) { if (alertsActionPublisher != null) {
alertsActionPublisher.getProcessor().halt(); deleteProcessorFromPubSub(alertsActionPublisher);
alertsActionPublisher.awaitShutdown(); UUID alertId = alertsActionPublisher.getAlert().getId();
EventPubSub.removeProcessor(alertsActionPublisher.getProcessor()); Map<UUID, AlertsActionPublisher> alertActionPublishersMap = alertPublisherMap.get(alertId);
LOG.info("Alert publisher deleted for {}", alertsActionPublisher.getAlert().getName()); alertActionPublishersMap.remove(alertAction.getId());
alertPublisherMap.put(alertId, alertActionPublishersMap);
alertActionPublishers.remove(action.getId());
alertPublisherMap.put(alertId, alertActionPublishers);
} }
} }
} }
}
public void deleteProcessorFromPubSub(AlertsActionPublisher publisher) throws InterruptedException {
BatchEventProcessor<EventPubSub.ChangeEventHolder> processor = publisher.getProcessor();
if (processor != null) {
processor.halt();
publisher.awaitShutdown();
EventPubSub.removeProcessor(publisher.getProcessor());
LOG.info("Alert publisher deleted for {}", publisher.getAlert().getName());
}
}
public void deleteAlertAllPublishers(UUID alertId) throws InterruptedException { public void deleteAlertAllPublishers(UUID alertId) throws InterruptedException {
Map<UUID, AlertsActionPublisher> alertPublishers = alertPublisherMap.get(alertId); Map<UUID, AlertsActionPublisher> alertPublishers = alertPublisherMap.get(alertId);
if (alertPublishers != null) { if (alertPublishers != null) {
for (AlertsActionPublisher publisher : alertPublishers.values()) { for (AlertsActionPublisher publisher : alertPublishers.values()) {
publisher.getProcessor().halt(); deleteProcessorFromPubSub(publisher);
publisher.awaitShutdown();
EventPubSub.removeProcessor(publisher.getProcessor());
LOG.info("Alert publisher deleted for {}", publisher.getAlert().getName());
} }
alertPublisherMap.remove(alertId); alertPublisherMap.remove(alertId);
} }

View File

@ -8,6 +8,7 @@ import static org.openmetadata.service.Entity.TEAM;
import static org.openmetadata.service.Entity.TEST_CASE; import static org.openmetadata.service.Entity.TEST_CASE;
import static org.openmetadata.service.Entity.USER; import static org.openmetadata.service.Entity.USER;
import java.io.IOException;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -20,8 +21,10 @@ import org.openmetadata.schema.tests.type.TestCaseStatus;
import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.FieldChange; import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.service.Entity;
import org.openmetadata.service.security.policyevaluator.SubjectCache; import org.openmetadata.service.security.policyevaluator.SubjectCache;
import org.openmetadata.service.util.ChangeEventParser; import org.openmetadata.service.util.ChangeEventParser;
import org.openmetadata.service.util.JsonUtils;
@Slf4j @Slf4j
public class AlertsRuleEvaluator { public class AlertsRuleEvaluator {
@ -67,11 +70,18 @@ public class AlertsRuleEvaluator {
description = "Returns true if the change event entity being accessed has following owners from the List.", description = "Returns true if the change event entity being accessed has following owners from the List.",
examples = {"matchAnyOwnerName('Owner1', 'Owner2')"}, examples = {"matchAnyOwnerName('Owner1', 'Owner2')"},
paramInputType = SPECIFIC_INDEX_ELASTIC_SEARCH) paramInputType = SPECIFIC_INDEX_ELASTIC_SEARCH)
public boolean matchAnyOwnerName(String... ownerNameList) { public boolean matchAnyOwnerName(String... ownerNameList) throws IOException {
if (changeEvent == null || changeEvent.getEntity() == null) { if (changeEvent == null || changeEvent.getEntity() == null) {
return false; return false;
} }
EntityInterface entity = (EntityInterface) changeEvent.getEntity(); Class<? extends EntityInterface> entityClass = Entity.getEntityClassFromType(changeEvent.getEntityType());
EntityInterface entity;
if (changeEvent.getEntity() instanceof String) {
entity = JsonUtils.readValue((String) changeEvent.getEntity(), entityClass);
} else {
entity = JsonUtils.convertValue(changeEvent.getEntity(), entityClass);
}
EntityReference ownerReference = entity.getOwner(); EntityReference ownerReference = entity.getOwner();
if (ownerReference != null) { if (ownerReference != null) {
if (USER.equals(ownerReference.getType())) { if (USER.equals(ownerReference.getType())) {

View File

@ -275,6 +275,7 @@
"matchAnyOwnerName", "matchAnyOwnerName",
"matchAnyEntityFqn", "matchAnyEntityFqn",
"matchAnyEventType", "matchAnyEventType",
"matchTestResult",
"matchUpdatedBy" "matchUpdatedBy"
] ]
}, },
@ -284,6 +285,7 @@
"matchAnyOwnerName", "matchAnyOwnerName",
"matchAnyEntityFqn", "matchAnyEntityFqn",
"matchAnyEventType", "matchAnyEventType",
"matchTestResult",
"matchUpdatedBy" "matchUpdatedBy"
] ]
}, },

View File

@ -28,7 +28,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.HttpResponseException; import org.apache.http.client.HttpResponseException;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.TestInfo;
@ -55,7 +54,6 @@ import org.openmetadata.service.util.TestUtils;
@Slf4j @Slf4j
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@Disabled
public class AlertResourceTest extends EntityResourceTest<Alert, CreateAlert> { public class AlertResourceTest extends EntityResourceTest<Alert, CreateAlert> {
public static final TriggerConfig ALL_EVENTS_FILTER = public static final TriggerConfig ALL_EVENTS_FILTER =
new TriggerConfig().withType(TriggerConfig.AlertTriggerType.ALL_DATA_ASSETS); new TriggerConfig().withType(TriggerConfig.AlertTriggerType.ALL_DATA_ASSETS);
@ -100,8 +98,8 @@ public class AlertResourceTest extends EntityResourceTest<Alert, CreateAlert> {
// For the DISABLED Action Publisher are not available so it will have no status // For the DISABLED Action Publisher are not available so it will have no status
AlertActionStatus status = AlertActionStatus status =
getStatus(alert.getId(), genericWebhookAction.getId(), Response.Status.NO_CONTENT.getStatusCode()); getStatus(alert.getId(), genericWebhookAction.getId(), Response.Status.OK.getStatusCode());
assertNull(status); assertEquals(AlertActionStatus.Status.DISABLED, status.getStatus());
WebhookCallbackResource.EventDetails details = webhookCallbackResource.getEventDetails(webhookName); WebhookCallbackResource.EventDetails details = webhookCallbackResource.getEventDetails(webhookName);
assertNull(details); assertNull(details);
// //
@ -148,8 +146,8 @@ public class AlertResourceTest extends EntityResourceTest<Alert, CreateAlert> {
TestUtils.UpdateType.MINOR_UPDATE, TestUtils.UpdateType.MINOR_UPDATE,
change); change);
AlertActionStatus status3 = AlertActionStatus status3 =
getStatus(alert.getId(), genericWebhookAction.getId(), Response.Status.NO_CONTENT.getStatusCode()); getStatus(alert.getId(), genericWebhookAction.getId(), Response.Status.OK.getStatusCode());
assertNull(status); assertEquals(AlertActionStatus.Status.DISABLED, status3.getStatus());
int iterations = 0; int iterations = 0;
while (iterations < 10) { while (iterations < 10) {

View File

@ -29,6 +29,7 @@ import org.openmetadata.schema.type.TagLabel;
public interface EntityInterface { public interface EntityInterface {
// Lower case entity name to canonical entity name map // Lower case entity name to canonical entity name map
Map<String, String> CANONICAL_ENTITY_NAME_MAP = new HashMap<>(); Map<String, String> CANONICAL_ENTITY_NAME_MAP = new HashMap<>();
Map<String, Class<? extends EntityInterface>> ENTITY_TYPE_TO_CLASS_MAP = new HashMap<>();
UUID getId(); UUID getId();