AppFlowy/backend/src/service/ws/ws_client.rs

148 lines
5.5 KiB
Rust
Raw Normal View History

2021-09-10 16:22:38 +08:00
use crate::{
config::{HEARTBEAT_INTERVAL, PING_TIMEOUT},
2021-09-23 16:10:24 +08:00
service::ws::{
2021-09-10 16:22:38 +08:00
entities::{Connect, Disconnect, SessionId},
ClientMessage,
MessageData,
WSServer,
},
};
2021-09-19 23:21:10 +08:00
use actix::*;
use actix_web_actors::{ws, ws::Message::Text};
use std::time::Instant;
2021-08-19 08:36:30 +08:00
2021-09-19 23:21:10 +08:00
// Frontend           │                    Backend
//
//                    │
// ┌──────────┐  WsMessage   ┌───────────┐   ClientMessage   ┌──────────┐
// │  user 1  │───────┼─────▶│ws_client_1│──────────────────▶│ws_server │
// └──────────┘              └───────────┘                   └──────────┘
//                    │                                           │
//               WsMessage                                        ▼
// ┌──────────┐       │      ┌───────────┐ ClientMessage        Group
// │  user 2  │◀─────────────│ws_client_2│◀───────┐       ┌───────────────┐
// └──────────┘       │      └───────────┘        │       │   ws_user_1   │
//                                                │       │               │
//                    │                           └───────│   ws_user_2   │
// ┌──────────┐              ┌───────────┐                │               │
// │  user 3  │───────┼─────▶│ws_client_3│                └───────────────┘
// └──────────┘              └───────────┘
//                    │
2021-08-19 14:08:24 +08:00
pub struct WSClient {
2021-09-19 23:21:10 +08:00
session_id: SessionId,
2021-08-19 08:36:30 +08:00
server: Addr<WSServer>,
hb: Instant,
}
2021-08-19 14:08:24 +08:00
impl WSClient {
2021-09-19 23:21:10 +08:00
pub fn new<T: Into<SessionId>>(session_id: T, server: Addr<WSServer>) -> Self {
2021-08-19 08:36:30 +08:00
Self {
2021-09-19 23:21:10 +08:00
session_id: session_id.into(),
2021-08-19 08:36:30 +08:00
hb: Instant::now(),
server,
}
}
fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
2021-09-19 23:21:10 +08:00
ctx.run_interval(HEARTBEAT_INTERVAL, |client, ctx| {
if Instant::now().duration_since(client.hb) > PING_TIMEOUT {
client.server.do_send(Disconnect {
sid: client.session_id.clone(),
2021-08-19 08:36:30 +08:00
});
ctx.stop();
2021-09-19 23:21:10 +08:00
} else {
ctx.ping(b"");
2021-08-19 08:36:30 +08:00
}
});
}
2021-08-19 14:08:24 +08:00
fn send(&self, data: MessageData) {
2021-09-19 23:21:10 +08:00
let msg = ClientMessage::new(self.session_id.clone(), data);
2021-08-19 14:08:24 +08:00
self.server.do_send(msg);
}
}
impl Actor for WSClient {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
2021-08-19 08:36:30 +08:00
self.hb(ctx);
let socket = ctx.address().recipient();
let connect = Connect {
socket,
2021-09-19 23:21:10 +08:00
sid: self.session_id.clone(),
2021-08-19 08:36:30 +08:00
};
self.server
.send(connect)
.into_actor(self)
2021-09-20 15:38:55 +08:00
.then(|res, _client, _ctx| {
2021-08-19 08:36:30 +08:00
match res {
2021-09-19 23:21:10 +08:00
Ok(Ok(_)) => log::trace!("Send connect message to server success"),
Ok(Err(e)) => log::error!("Send connect message to server failed: {:?}", e),
Err(e) => log::error!("Send connect message to server failed: {:?}", e),
2021-08-19 08:36:30 +08:00
}
fut::ready(())
})
.wait(ctx);
}
fn stopping(&mut self, _: &mut Self::Context) -> Running {
self.server.do_send(Disconnect {
2021-09-19 23:21:10 +08:00
sid: self.session_id.clone(),
2021-08-19 08:36:30 +08:00
});
Running::Stop
}
}
2021-08-19 14:08:24 +08:00
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
2021-08-19 08:36:30 +08:00
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => {
self.hb = Instant::now();
ctx.pong(&msg);
},
2021-09-22 14:42:14 +08:00
Ok(ws::Message::Pong(_msg)) => {
// log::debug!("Receive {} pong {:?}", &self.session_id, &msg);
2021-08-19 08:36:30 +08:00
self.hb = Instant::now();
},
Ok(ws::Message::Binary(bin)) => {
2021-09-19 23:21:10 +08:00
log::debug!(" Receive {} binary", &self.session_id);
2021-08-19 14:08:24 +08:00
self.send(MessageData::Binary(bin));
2021-08-19 08:36:30 +08:00
},
2021-09-19 23:21:10 +08:00
Ok(Text(_)) => {
log::warn!("Receive unexpected text message");
},
2021-08-19 08:36:30 +08:00
Ok(ws::Message::Close(reason)) => {
2021-09-19 23:21:10 +08:00
self.send(MessageData::Disconnect(self.session_id.clone()));
2021-08-19 08:36:30 +08:00
ctx.close(reason);
ctx.stop();
},
2021-09-19 23:21:10 +08:00
Ok(ws::Message::Continuation(_)) => {},
Ok(ws::Message::Nop) => {},
2021-08-19 08:36:30 +08:00
Err(e) => {
2021-09-19 23:21:10 +08:00
log::error!(
"[{}]: WebSocketStream protocol error {:?}",
self.session_id,
e
);
2021-08-19 08:36:30 +08:00
ctx.stop();
},
}
}
}
2021-08-19 14:08:24 +08:00
impl Handler<ClientMessage> for WSClient {
2021-08-19 08:36:30 +08:00
type Result = ();
2021-08-19 14:08:24 +08:00
fn handle(&mut self, msg: ClientMessage, ctx: &mut Self::Context) {
match msg.data {
MessageData::Binary(binary) => {
2021-08-19 08:36:30 +08:00
ctx.binary(binary);
},
2021-09-19 23:21:10 +08:00
MessageData::Connect(_) => {},
MessageData::Disconnect(_) => {},
2021-08-19 08:36:30 +08:00
}
}
}