Add notification for Glossary Approval Task (#14438)

This commit is contained in:
Mohit Yadav 2023-12-19 14:54:54 +05:30 committed by GitHub
parent 21909802b3
commit a469d3bbe7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 24 deletions

View File

@ -90,6 +90,7 @@ import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.EntityUtil.Fields; import org.openmetadata.service.util.EntityUtil.Fields;
import org.openmetadata.service.util.FullyQualifiedName; import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.NotificationHandler;
@Slf4j @Slf4j
public class GlossaryTermRepository extends EntityRepository<GlossaryTerm> { public class GlossaryTermRepository extends EntityRepository<GlossaryTerm> {
@ -653,6 +654,9 @@ public class GlossaryTermRepository extends EntityRepository<GlossaryTerm> {
.withUpdatedAt(System.currentTimeMillis()); .withUpdatedAt(System.currentTimeMillis());
FeedRepository feedRepository = Entity.getFeedRepository(); FeedRepository feedRepository = Entity.getFeedRepository();
feedRepository.create(thread); feedRepository.create(thread);
// Send WebSocket Notification
NotificationHandler.handleTaskNotification(thread);
} }
private void closeApprovalTask(GlossaryTerm entity, String comment) { private void closeApprovalTask(GlossaryTerm entity, String comment) {

View File

@ -17,13 +17,12 @@ import static org.openmetadata.service.Entity.TEAM;
import static org.openmetadata.service.Entity.USER; import static org.openmetadata.service.Entity.USER;
import static org.openmetadata.service.util.EmailUtil.getSmtpSettings; import static org.openmetadata.service.util.EmailUtil.getSmtpSettings;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import freemarker.template.TemplateException; import freemarker.template.TemplateException;
import java.io.IOException; import java.io.IOException;
import java.time.Instant; import java.time.Instant;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -45,11 +44,9 @@ import org.openmetadata.service.socket.WebSocketManager;
@Slf4j @Slf4j
public class NotificationHandler { public class NotificationHandler {
private final ObjectMapper mapper;
private final ExecutorService threadScheduler; private final ExecutorService threadScheduler;
public NotificationHandler() { public NotificationHandler() {
this.mapper = new ObjectMapper();
this.threadScheduler = Executors.newFixedThreadPool(1); this.threadScheduler = Executors.newFixedThreadPool(1);
} }
@ -57,33 +54,29 @@ public class NotificationHandler {
threadScheduler.submit( threadScheduler.submit(
() -> { () -> {
try { try {
CollectionDAO collectionDAO = Entity.getCollectionDAO(); handleNotifications(responseContext);
handleNotifications(responseContext, collectionDAO);
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("[NotificationHandler] Failed to use mapper in converting to Json", ex); LOG.error("[NotificationHandler] Failed to use mapper in converting to Json", ex);
} }
}); });
} }
private void handleNotifications( private void handleNotifications(ContainerResponseContext responseContext) {
ContainerResponseContext responseContext, CollectionDAO collectionDAO)
throws JsonProcessingException {
int responseCode = responseContext.getStatus(); int responseCode = responseContext.getStatus();
if (responseCode == Response.Status.CREATED.getStatusCode() if (responseCode == Response.Status.CREATED.getStatusCode()
&& responseContext.getEntity() != null && responseContext.getEntity() != null
&& responseContext.getEntity().getClass().equals(Thread.class)) { && responseContext.getEntity().getClass().equals(Thread.class)) {
Thread thread = (Thread) responseContext.getEntity(); Thread thread = (Thread) responseContext.getEntity();
switch (thread.getType()) { switch (thread.getType()) {
case Task -> handleTaskNotification(thread, collectionDAO); case Task -> handleTaskNotification(thread);
case Conversation -> handleConversationNotification(thread, collectionDAO); case Conversation -> handleConversationNotification(thread);
case Announcement -> handleAnnouncementNotification(thread); case Announcement -> handleAnnouncementNotification(thread);
} }
} }
} }
private void handleTaskNotification(Thread thread, CollectionDAO collectionDAO) public static void handleTaskNotification(Thread thread) {
throws JsonProcessingException { String jsonThread = JsonUtils.pojoToJson(thread);
String jsonThread = mapper.writeValueAsString(thread);
if (thread.getPostsCount() == 0) { if (thread.getPostsCount() == 0) {
List<EntityReference> assignees = thread.getTask().getAssignees(); List<EntityReference> assignees = thread.getTask().getAssignees();
HashSet<UUID> receiversList = new HashSet<>(); HashSet<UUID> receiversList = new HashSet<>();
@ -94,7 +87,7 @@ public class NotificationHandler {
} else if (Entity.TEAM.equals(e.getType())) { } else if (Entity.TEAM.equals(e.getType())) {
// fetch all that are there in the team // fetch all that are there in the team
List<CollectionDAO.EntityRelationshipRecord> records = List<CollectionDAO.EntityRelationshipRecord> records =
collectionDAO Entity.getCollectionDAO()
.relationshipDAO() .relationshipDAO()
.findTo(e.getId(), TEAM, Relationship.HAS.ordinal(), Entity.USER); .findTo(e.getId(), TEAM, Relationship.HAS.ordinal(), Entity.USER);
records.forEach(eRecord -> receiversList.add(eRecord.getId())); records.forEach(eRecord -> receiversList.add(eRecord.getId()));
@ -111,8 +104,8 @@ public class NotificationHandler {
} }
} }
private void handleAnnouncementNotification(Thread thread) throws JsonProcessingException { private void handleAnnouncementNotification(Thread thread) {
String jsonThread = mapper.writeValueAsString(thread); String jsonThread = JsonUtils.pojoToJson(thread);
AnnouncementDetails announcementDetails = thread.getAnnouncement(); AnnouncementDetails announcementDetails = thread.getAnnouncement();
Long currentTimestamp = Instant.now().getEpochSecond(); Long currentTimestamp = Instant.now().getEpochSecond();
if (announcementDetails.getStartTime() <= currentTimestamp if (announcementDetails.getStartTime() <= currentTimestamp
@ -122,9 +115,8 @@ public class NotificationHandler {
} }
} }
private void handleConversationNotification(Thread thread, CollectionDAO collectionDAO) private void handleConversationNotification(Thread thread) {
throws JsonProcessingException { String jsonThread = JsonUtils.pojoToJson(thread);
String jsonThread = mapper.writeValueAsString(thread);
WebSocketManager.getInstance() WebSocketManager.getInstance()
.broadCastMessageToAll(WebSocketManager.FEED_BROADCAST_CHANNEL, jsonThread); .broadCastMessageToAll(WebSocketManager.FEED_BROADCAST_CHANNEL, jsonThread);
List<MessageParser.EntityLink> mentions; List<MessageParser.EntityLink> mentions;
@ -138,14 +130,14 @@ public class NotificationHandler {
entityLink -> { entityLink -> {
String fqn = entityLink.getEntityFQN(); String fqn = entityLink.getEntityFQN();
if (USER.equals(entityLink.getEntityType())) { if (USER.equals(entityLink.getEntityType())) {
User user = collectionDAO.userDAO().findEntityByName(fqn); User user = Entity.getCollectionDAO().userDAO().findEntityByName(fqn);
WebSocketManager.getInstance() WebSocketManager.getInstance()
.sendToOne(user.getId(), WebSocketManager.MENTION_CHANNEL, jsonThread); .sendToOne(user.getId(), WebSocketManager.MENTION_CHANNEL, jsonThread);
} else if (TEAM.equals(entityLink.getEntityType())) { } else if (TEAM.equals(entityLink.getEntityType())) {
Team team = collectionDAO.teamDAO().findEntityByName(fqn); Team team = Entity.getCollectionDAO().teamDAO().findEntityByName(fqn);
// fetch all that are there in the team // fetch all that are there in the team
List<CollectionDAO.EntityRelationshipRecord> records = List<CollectionDAO.EntityRelationshipRecord> records =
collectionDAO Entity.getCollectionDAO()
.relationshipDAO() .relationshipDAO()
.findTo(team.getId(), TEAM, Relationship.HAS.ordinal(), USER); .findTo(team.getId(), TEAM, Relationship.HAS.ordinal(), USER);
// Notify on WebSocket for Realtime // Notify on WebSocket for Realtime
@ -155,7 +147,7 @@ public class NotificationHandler {
}); });
} }
private void handleEmailNotifications(HashSet<UUID> userList, Thread thread) { public static void handleEmailNotifications(Set<UUID> userList, Thread thread) {
UserRepository repository = (UserRepository) Entity.getEntityRepository(USER); UserRepository repository = (UserRepository) Entity.getEntityRepository(USER);
userList.forEach( userList.forEach(
id -> { id -> {