Fix Alert Intermittent Issues for some entities (#12189)

* Fix Alert Intermittent Issues for some entities

* Fix Failing test for disabled publisher

* Fix Failing test due to deleted entity

* Fix Tests

---------

Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com>
This commit is contained in:
Mohit Yadav 2023-06-30 07:30:54 +05:30 committed by GitHub
parent 41f6945cfc
commit 6410fcbea4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 78 additions and 60 deletions

View File

@ -159,7 +159,7 @@ echo "✔running reindexing"
curl 'http://localhost:8585/api/v1/search/reindex' \
-H 'Authorization: Bearer eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg' \
-H 'Content-Type: application/json' \
--data-raw '{"recreateIndex":true,"entities":["table","topic","dashboard","pipeline","mlmodel","user","team","glossaryTerm","tag","entityReportData","webAnalyticEntityViewReportData","webAnalyticUserActivityReportData","container","query"],"batchSize":10,"searchIndexMappingLanguage":"EN","runMode":"batch","publisherType":"elasticSearch"}' \
--data-raw '{"recreateIndex":true,"entities":["table","topic","dashboard","pipeline","mlmodel","user","team","glossaryTerm","tag","entityReportData","webAnalyticEntityViewReportData","webAnalyticUserActivityReportData","container","query", "testCase"],"batchSize":10,"searchIndexMappingLanguage":"EN","runMode":"batch","publisherType":"elasticSearch"}' \
--compressed
sleep 60 # Sleep for 60 seconds to make sure the elasticsearch reindexing from UI finishes
tput setaf 2

View File

@ -14,6 +14,7 @@
package org.openmetadata.service.events;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity;
import static org.openmetadata.service.formatter.util.FormatterUtil.getChangeEventFromResponseContext;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -84,11 +85,11 @@ public class ChangeEventHandler implements EventHandler {
for (Thread thread : listOrEmpty(FeedUtils.getThreads(changeEvent, loggedInUserName))) {
// Don't create a thread if there is no message
if (thread.getMessage() != null && !thread.getMessage().isEmpty()) {
feedDao.create(thread);
feedDao.create(thread, responseContext);
String jsonThread = mapper.writeValueAsString(thread);
WebSocketManager.getInstance().broadCastMessageToAll(WebSocketManager.FEED_BROADCAST_CHANNEL, jsonThread);
if (changeEvent.getEventType().equals(EventType.ENTITY_DELETED)) {
deleteAllConversationsRelatedToEntity((EntityInterface) changeEvent.getEntity());
deleteAllConversationsRelatedToEntity(getEntity(changeEvent));
}
}
}

View File

@ -241,12 +241,16 @@ public class AlertsRuleEvaluator {
public static EntityInterface getEntity(ChangeEvent event) throws IOException {
Class<? extends EntityInterface> entityClass = Entity.getEntityClassFromType(event.getEntityType());
EntityInterface entity;
if (event.getEntity() instanceof String) {
entity = JsonUtils.readValue((String) event.getEntity(), entityClass);
} else {
entity = JsonUtils.convertValue(event.getEntity(), entityClass);
if (entityClass != null) {
EntityInterface entity;
if (event.getEntity() instanceof String) {
entity = JsonUtils.readValue((String) event.getEntity(), entityClass);
} else {
entity = JsonUtils.convertValue(event.getEntity(), entityClass);
}
return entity;
}
return entity;
throw new IllegalArgumentException(
String.format("Change Event Data Asset is not an entity %s", JsonUtils.pojoToJson(event.getEntity())));
}
}

View File

@ -79,21 +79,8 @@ public class SubscriptionPublisher extends AbstractAlertPublisher {
return eventSubscription;
}
public synchronized void updateEventSubscription(EventSubscription updatedEventSub) {
currentBackoffTime = BACKOFF_NORMAL;
eventSubscription.setDescription(updatedEventSub.getDescription());
eventSubscription.setTimeout(updatedEventSub.getTimeout());
eventSubscription.setBatchSize(updatedEventSub.getBatchSize());
eventSubscription.setFilteringRules(updatedEventSub.getFilteringRules());
eventSubscription.setSubscriptionType(updatedEventSub.getSubscriptionType());
eventSubscription.setSubscriptionConfig(updatedEventSub.getSubscriptionConfig());
}
public synchronized void setErrorStatus(Long attemptTime, Integer statusCode, String reason)
throws InterruptedException {
SubscriptionStatus status = setStatus(FAILED, attemptTime, statusCode, reason, null);
eventSubscriptionRepository.removeProcessorForEventSubscription(eventSubscription.getId(), status);
throw new RuntimeException(reason);
public synchronized void setErrorStatus(Long attemptTime, Integer statusCode, String reason) {
setStatus(FAILED, attemptTime, statusCode, reason, null);
}
public synchronized void setAwaitingRetry(Long attemptTime, int statusCode, String reason) {

View File

@ -14,9 +14,11 @@
package org.openmetadata.service.events.subscription.email;
import static org.openmetadata.schema.api.events.CreateEventSubscription.SubscriptionType.EMAIL;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity;
import static org.openmetadata.service.util.SubscriptionUtil.buildReceiversListFromActions;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
@ -61,7 +63,7 @@ public class EmailPublisher extends SubscriptionPublisher {
}
@Override
public void sendAlert(EventResource.EventList list) throws InterruptedException, JsonProcessingException {
public void sendAlert(EventResource.EventList list) throws JsonProcessingException {
for (ChangeEvent event : list.getData()) {
try {
Set<String> receivers = buildReceiversList(event);
@ -79,10 +81,10 @@ public class EmailPublisher extends SubscriptionPublisher {
}
}
private Set<String> buildReceiversList(ChangeEvent changeEvent) {
private Set<String> buildReceiversList(ChangeEvent changeEvent) throws IOException {
Set<String> receiverList =
emailAlertConfig.getReceivers() == null ? new HashSet<>() : emailAlertConfig.getReceivers();
EntityInterface entityInterface = (EntityInterface) changeEvent.getEntity();
EntityInterface entityInterface = getEntity(changeEvent);
receiverList.addAll(
buildReceiversListFromActions(
emailAlertConfig, EMAIL, daoCollection, entityInterface.getId(), changeEvent.getEntityType()));

View File

@ -13,12 +13,13 @@
package org.openmetadata.service.formatter.decorators;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity;
import static org.openmetadata.service.formatter.util.FormatterUtil.getFormattedMessages;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.ChangeEventConfig;
import org.openmetadata.service.Entity;
@ -63,7 +64,7 @@ public class EmailMessageDecorator implements MessageDecorator<EmailMessage> {
}
@Override
public EmailMessage buildMessage(ChangeEvent event) {
public EmailMessage buildMessage(ChangeEvent event) throws IOException {
EmailMessage emailMessage = new EmailMessage();
emailMessage.setUserName(event.getUserName());
if (event.getEntity() != null) {
@ -75,7 +76,7 @@ public class EmailMessageDecorator implements MessageDecorator<EmailMessage> {
}
}
Map<MessageParser.EntityLink, String> messages =
getFormattedMessages(this, event.getChangeDescription(), (EntityInterface) event.getEntity());
getFormattedMessages(this, event.getChangeDescription(), getEntity(event));
List<String> changeMessage = new ArrayList<>();
for (Map.Entry<MessageParser.EntityLink, String> entry : messages.entrySet()) {
changeMessage.add(entry.getValue());

View File

@ -13,12 +13,13 @@
package org.openmetadata.service.formatter.decorators;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity;
import static org.openmetadata.service.formatter.util.FormatterUtil.getFormattedMessages;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.ChangeEventConfig;
import org.openmetadata.service.events.subscription.gchat.GChatMessage;
@ -64,7 +65,7 @@ public class GChatMessageDecorator implements MessageDecorator<GChatMessage> {
}
@Override
public GChatMessage buildMessage(ChangeEvent event) {
public GChatMessage buildMessage(ChangeEvent event) throws IOException {
GChatMessage gChatMessage = new GChatMessage();
GChatMessage.CardsV2 cardsV2 = new GChatMessage.CardsV2();
GChatMessage.Card card = new GChatMessage.Card();
@ -80,16 +81,12 @@ public class GChatMessageDecorator implements MessageDecorator<GChatMessage> {
gChatMessage.setText(headerText);
GChatMessage.CardHeader cardHeader = new GChatMessage.CardHeader();
String cardHeaderText =
String.format(
headerTemplate,
event.getUserName(),
event.getEntityType(),
((EntityInterface) event.getEntity()).getName());
String.format(headerTemplate, event.getUserName(), event.getEntityType(), (getEntity(event)).getName());
cardHeader.setTitle(cardHeaderText);
card.setHeader(cardHeader);
}
Map<MessageParser.EntityLink, String> messages =
getFormattedMessages(this, event.getChangeDescription(), (EntityInterface) event.getEntity());
getFormattedMessages(this, event.getChangeDescription(), getEntity(event));
List<GChatMessage.Widget> widgets = new ArrayList<>();
for (Map.Entry<MessageParser.EntityLink, String> entry : messages.entrySet()) {
GChatMessage.Widget widget = new GChatMessage.Widget();

View File

@ -13,12 +13,13 @@
package org.openmetadata.service.formatter.decorators;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity;
import static org.openmetadata.service.formatter.util.FormatterUtil.getFormattedMessages;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.ChangeEventConfig;
import org.openmetadata.service.events.subscription.msteams.TeamsMessage;
@ -62,7 +63,7 @@ public class MSTeamsMessageDecorator implements MessageDecorator<TeamsMessage> {
}
@Override
public TeamsMessage buildMessage(ChangeEvent event) {
public TeamsMessage buildMessage(ChangeEvent event) throws IOException {
TeamsMessage teamsMessage = new TeamsMessage();
teamsMessage.setSummary("Change Event From OMD");
TeamsMessage.Section teamsSections = new TeamsMessage.Section();
@ -76,7 +77,7 @@ public class MSTeamsMessageDecorator implements MessageDecorator<TeamsMessage> {
teamsSections.setActivityTitle(headerText);
}
Map<MessageParser.EntityLink, String> messages =
getFormattedMessages(this, event.getChangeDescription(), (EntityInterface) event.getEntity());
getFormattedMessages(this, event.getChangeDescription(), getEntity(event));
List<TeamsMessage.Section> attachmentList = new ArrayList<>();
for (Map.Entry<MessageParser.EntityLink, String> entry : messages.entrySet()) {
TeamsMessage.Section section = new TeamsMessage.Section();

View File

@ -13,6 +13,7 @@
package org.openmetadata.service.formatter.decorators;
import java.io.IOException;
import java.util.LinkedList;
import org.apache.commons.lang3.StringUtils;
import org.bitbucket.cowwoc.diffmatchpatch.DiffMatchPatch;
@ -41,7 +42,7 @@ public interface MessageDecorator<T> {
return "<!remove>";
}
T buildMessage(ChangeEvent event);
T buildMessage(ChangeEvent event) throws IOException;
default String getPlaintextDiff(String oldValue, String newValue) {
// create a configured DiffRowGenerator

View File

@ -13,12 +13,13 @@
package org.openmetadata.service.formatter.decorators;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity;
import static org.openmetadata.service.formatter.util.FormatterUtil.getFormattedMessages;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.ChangeEventConfig;
@ -66,7 +67,7 @@ public class SlackMessageDecorator implements MessageDecorator<SlackMessage> {
}
@Override
public SlackMessage buildMessage(ChangeEvent event) {
public SlackMessage buildMessage(ChangeEvent event) throws IOException {
SlackMessage slackMessage = new SlackMessage();
slackMessage.setUsername(event.getUserName());
if (event.getEntity() != null) {
@ -92,7 +93,7 @@ public class SlackMessageDecorator implements MessageDecorator<SlackMessage> {
slackMessage.setText(headerText);
}
Map<MessageParser.EntityLink, String> messages =
getFormattedMessages(this, event.getChangeDescription(), (EntityInterface) event.getEntity());
getFormattedMessages(this, event.getChangeDescription(), getEntity(event));
List<SlackAttachment> attachmentList = new ArrayList<>();
for (Map.Entry<MessageParser.EntityLink, String> entry : messages.entrySet()) {
SlackAttachment attachment = new SlackAttachment();

View File

@ -144,15 +144,8 @@ public class EventSubscriptionRepository extends EntityRepository<EventSubscript
}
// Update the existing publisher
SubscriptionStatus.Status status = previousPublisher.getEventSubscription().getStatusDetails().getStatus();
previousPublisher.updateEventSubscription(eventSubscription);
if (status != SubscriptionStatus.Status.ACTIVE && status != SubscriptionStatus.Status.AWAITING_RETRY) {
// Restart the previously stopped publisher (in states notStarted, error, retryLimitReached)
BatchEventProcessor<EventPubSub.ChangeEventHolder> processor =
EventPubSub.addEventHandler(previousPublisher);
previousPublisher.setProcessor(processor);
LOG.info("Webhook publisher restarted for {}", eventSubscription.getName());
}
deleteEventSubscriptionPublisher(eventSubscription);
addSubscriptionPublisher(eventSubscription);
} else {
// Remove the webhook publisher
removeProcessorForEventSubscription(
@ -184,7 +177,7 @@ public class EventSubscriptionRepository extends EntityRepository<EventSubscript
switch (deletedEntity.getAlertType()) {
case CHANGE_EVENT:
SubscriptionPublisher publisher = subscriptionPublisherMap.remove(deletedEntity.getId());
if (publisher != null) {
if (publisher != null && publisher.getProcessor() != null) {
publisher.getProcessor().halt();
publisher.awaitShutdown();
EventPubSub.removeProcessor(publisher.getProcessor());

View File

@ -42,6 +42,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.json.JsonPatch;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
@ -58,6 +59,7 @@ import org.openmetadata.schema.api.feed.ThreadCount;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.type.AnnouncementDetails;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.Post;
@ -129,10 +131,13 @@ public class FeedRepository {
field = "id";
}
EntityInterface aboutEntity = Entity.getEntity(about, field, ALL);
;
thread.withEntityId(aboutEntity.getId()); // Add entity id to thread
EntityReference entityOwner = aboutEntity.getOwner();
return createThread(thread, about, aboutEntity.getOwner());
}
@Transaction
private Thread createThread(Thread thread, EntityLink about, EntityReference entityOwner)
throws JsonProcessingException {
// Validate user creating thread
User createdByUser = SubjectCache.getInstance().getUser(thread.getCreatedBy());
@ -147,7 +152,7 @@ public class FeedRepository {
List<String> announcements =
dao.feedDAO()
.listAnnouncementBetween(thread.getId().toString(), thread.getEntityId().toString(), startTime, endTime);
if (announcements.size() > 0) {
if (!announcements.isEmpty()) {
// There is already an announcement that overlaps the new one
throw new IllegalArgumentException(ANNOUNCEMENT_OVERLAP);
}
@ -183,6 +188,28 @@ public class FeedRepository {
return thread;
}
@Transaction
public Thread create(Thread thread, ContainerResponseContext responseContext) throws IOException {
// Validate about data entity is valid and get the owner for that entity
EntityInterface entity;
// In case of ENTITY_FIELDS_CHANGED entity from responseContext will be a ChangeEvent
if (responseContext.getEntity() instanceof ChangeEvent) {
ChangeEvent change = (ChangeEvent) responseContext.getEntity();
entity = (EntityInterface) change.getEntity();
} else {
entity = (EntityInterface) responseContext.getEntity();
}
EntityReference owner = null;
try {
owner = Entity.getOwner(entity.getEntityReference());
} catch (Exception ignored) {
// Either deleted or owner field not available
}
EntityLink about = EntityLink.parse(thread.getAbout());
thread.withEntityId(entity.getId()); // Add entity id to thread
return createThread(thread, about, owner);
}
public Thread get(String id) throws IOException {
Thread thread = EntityUtil.validate(id, dao.feedDAO().findById(id), Thread.class);
sortPosts(thread);

View File

@ -15,7 +15,9 @@ package org.openmetadata.service.util;
import static org.openmetadata.service.Entity.TEAM;
import static org.openmetadata.service.Entity.USER;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity;
import java.io.IOException;
import java.time.Period;
import java.time.ZoneId;
import java.util.ArrayList;
@ -200,8 +202,9 @@ public class SubscriptionUtil {
CreateEventSubscription.SubscriptionType type,
Client client,
CollectionDAO daoCollection,
ChangeEvent event) {
EntityInterface entityInterface = (EntityInterface) event.getEntity();
ChangeEvent event)
throws IOException {
EntityInterface entityInterface = getEntity(event);
List<Invocation.Builder> targets = new ArrayList<>();
Set<String> receiversUrls =
buildReceiversListFromActions(action, type, daoCollection, entityInterface.getId(), event.getEntityType());