[Backend][Task notification] (#5717)

* [Backend][WebSocket] Add taskNotifications #5638[WIP for team assigned a task]

* [Backend][WebSocket] Reformat sources

* Update catalog-rest-service/src/main/java/org/openmetadata/catalog/socket/WebSocketManager.java

Co-authored-by: Vivek Ratnavel Subramanian <vivekratnavel90@gmail.com>

* [Backend][WebSocket] Task Notification to all Users when added to a team

* [Backend][WebSocket] Remove Commented part

* [Backend][TaskNotification] reformat source

* [Backend][taskNotifications]] Allow multiple connections for one user

* [Backend][taskNotifications]] reformat

Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com>
Co-authored-by: Vivek Ratnavel Subramanian <vivekratnavel90@gmail.com>
This commit is contained in:
mohitdeuex 2022-06-28 23:51:16 +05:30 committed by GitHub
parent e026d625d6
commit 3132b6f28c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -5,9 +5,7 @@ import io.socket.engineio.server.EngineIoServerOptions;
import io.socket.socketio.server.SocketIoNamespace; import io.socket.socketio.server.SocketIoNamespace;
import io.socket.socketio.server.SocketIoServer; import io.socket.socketio.server.SocketIoServer;
import io.socket.socketio.server.SocketIoSocket; import io.socket.socketio.server.SocketIoSocket;
import java.util.List; import java.util.*;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -19,8 +17,7 @@ public class WebSocketManager {
private final SocketIoServer mSocketIoServer; private final SocketIoServer mSocketIoServer;
public static final String feedBroadcastChannel = "activityFeed"; public static final String feedBroadcastChannel = "activityFeed";
public static final String taskBroadcastChannel = "taskChannel"; public static final String taskBroadcastChannel = "taskChannel";
private final Map<UUID, Map<String, SocketIoSocket>> activityFeedEndpoints = new ConcurrentHashMap<>();
private final Map<UUID, SocketIoSocket> activityFeedEndpoints = new ConcurrentHashMap<>();
private WebSocketManager(EngineIoServerOptions eiOptions) { private WebSocketManager(EngineIoServerOptions eiOptions) {
mEngineIoServer = new EngineIoServer(eiOptions); mEngineIoServer = new EngineIoServer(eiOptions);
@ -50,14 +47,22 @@ public class WebSocketManager {
"disconnect", "disconnect",
args1 -> { args1 -> {
LOG.info( LOG.info(
"Client :" "Client from:"
+ userId + userId
+ "with Remote Address :" + "with Remote Address :"
+ socket.getInitialHeaders().get("RemoteAddress") + socket.getInitialHeaders().get("RemoteAddress")
+ " disconnected."); + " disconnected.");
activityFeedEndpoints.remove(UUID.fromString(userId)); UUID id = UUID.fromString(userId);
Map<String, SocketIoSocket> allUserConnection = activityFeedEndpoints.get(id);
allUserConnection.remove(socket.getId());
activityFeedEndpoints.put(id, allUserConnection);
}); });
activityFeedEndpoints.put(UUID.fromString(userId), socket); UUID id = UUID.fromString(userId);
Map<String, SocketIoSocket> userSocketConnections;
userSocketConnections =
activityFeedEndpoints.containsKey(id) ? activityFeedEndpoints.get(id) : new HashMap<>();
userSocketConnections.put(socket.getId(), socket);
activityFeedEndpoints.put(id, userSocketConnections);
} }
}); });
} }
@ -74,26 +79,27 @@ public class WebSocketManager {
return mEngineIoServer; return mEngineIoServer;
} }
public Map<UUID, SocketIoSocket> getActivityFeedEndpoints() { public Map<UUID, Map<String, SocketIoSocket>> getActivityFeedEndpoints() {
return activityFeedEndpoints; return activityFeedEndpoints;
} }
public void broadCastMessageToAll(String event, String message) { public void broadCastMessageToAll(String event, String message) {
activityFeedEndpoints.forEach((key, value) -> value.send(event, message)); activityFeedEndpoints.forEach(
(key, value) -> {
value.forEach((key1, value1) -> value1.send(event, message));
});
} }
public void sendToOne(UUID receiver, String event, String message) { public void sendToOne(UUID receiver, String event, String message) {
if (activityFeedEndpoints.containsKey(receiver)) { if (activityFeedEndpoints.containsKey(receiver)) {
activityFeedEndpoints.get(receiver).send(event, message); activityFeedEndpoints.get(receiver).forEach((key, value) -> value.send(event, message));
} }
} }
public void sendToManyWithUUID(List<UUID> receivers, String event, String message) { public void sendToManyWithUUID(List<UUID> receivers, String event, String message) {
receivers.forEach( receivers.forEach(
(e) -> { (e) -> {
if (activityFeedEndpoints.containsKey(e)) { sendToOne(e, event, message);
activityFeedEndpoints.get(e).send(event, message);
}
}); });
} }
@ -101,9 +107,7 @@ public class WebSocketManager {
receivers.forEach( receivers.forEach(
(e) -> { (e) -> {
UUID key = UUID.fromString(e); UUID key = UUID.fromString(e);
if (activityFeedEndpoints.containsKey(key)) { sendToOne(key, event, message);
activityFeedEndpoints.get(key).send(event, message);
}
}); });
} }