Added announcement channel (#6604)

* Added announcement channel

* Added announcement channel

* Addressing comments
This commit is contained in:
Parth Panchal 2022-08-09 10:43:08 +05:30 committed by GitHub
parent 9012bf7f06
commit 555962dfd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 1 deletions

View File

@ -22,6 +22,7 @@ import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -182,7 +183,12 @@ public class ChangeEventHandler implements EventHandler {
});
break;
case Announcement:
default:
AnnouncementDetails announcementDetails = thread.getAnnouncement();
Long currentTimestamp = Instant.now().getEpochSecond();
if (announcementDetails.getStartTime() <= currentTimestamp
&& currentTimestamp <= announcementDetails.getEndTime()) {
WebSocketManager.getInstance().broadCastMessageToAll(WebSocketManager.ANNOUNCEMENT_CHANNEL, jsonThread);
}
break;
}
} catch (JsonProcessingException e) {

View File

@ -21,6 +21,7 @@ public class WebSocketManager {
public static final String FEED_BROADCAST_CHANNEL = "activityFeed";
public static final String TASK_BROADCAST_CHANNEL = "taskChannel";
public static final String MENTION_CHANNEL = "mentionChannel";
public static final String ANNOUNCEMENT_CHANNEL = "announcementChannel";
private final Map<UUID, Map<String, SocketIoSocket>> activityFeedEndpoints = new ConcurrentHashMap<>();
private WebSocketManager(EngineIoServerOptions eiOptions) {
@ -54,11 +55,15 @@ public class WebSocketManager {
allUserConnection.remove(socket.getId());
activityFeedEndpoints.put(id, allUserConnection);
});
// On Socket Connection Error
socket.on(
"connect_error",
args1 ->
LOG.error(
"Connection ERROR for user:{} with Remote Address:{} disconnected", userId, remoteAddress));
// On Socket Connection Failure
socket.on(
"connect_failed",
args1 ->