diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java index 9a33e62be7b..60786c263ab 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/events/ChangeEventHandler.java @@ -22,6 +22,7 @@ import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -182,7 +183,12 @@ public class ChangeEventHandler implements EventHandler { }); break; case Announcement: - default: + AnnouncementDetails announcementDetails = thread.getAnnouncement(); + Long currentTimestamp = Instant.now().getEpochSecond(); + if (announcementDetails.getStartTime() <= currentTimestamp + && currentTimestamp <= announcementDetails.getEndTime()) { + WebSocketManager.getInstance().broadCastMessageToAll(WebSocketManager.ANNOUNCEMENT_CHANNEL, jsonThread); + } break; } } catch (JsonProcessingException e) { diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/socket/WebSocketManager.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/socket/WebSocketManager.java index 2ebcbb05138..38edb0c757d 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/socket/WebSocketManager.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/socket/WebSocketManager.java @@ -21,6 +21,7 @@ public class WebSocketManager { public static final String FEED_BROADCAST_CHANNEL = "activityFeed"; public static final String TASK_BROADCAST_CHANNEL = "taskChannel"; public static final String MENTION_CHANNEL = "mentionChannel"; + public static final String ANNOUNCEMENT_CHANNEL = "announcementChannel"; private final Map> activityFeedEndpoints = new ConcurrentHashMap<>(); private WebSocketManager(EngineIoServerOptions eiOptions) { @@ -54,11 +55,15 @@ public class WebSocketManager { allUserConnection.remove(socket.getId()); activityFeedEndpoints.put(id, allUserConnection); }); + + // On Socket Connection Error socket.on( "connect_error", args1 -> LOG.error( "Connection ERROR for user:{} with Remote Address:{} disconnected", userId, remoteAddress)); + + // On Socket Connection Failure socket.on( "connect_failed", args1 ->