use crate::{ config::{HEARTBEAT_INTERVAL, PING_TIMEOUT}, service::ws::{ entities::{Connect, Disconnect, SessionId}, ClientMessage, MessageData, WSServer, WsBizHandler, WsBizHandlers, }, }; use actix::{fut::wrap_future, *}; use actix_web::web::Data; use actix_web_actors::{ws, ws::Message::Text}; use bytes::Bytes; use flowy_ws::{WsMessage, WsSource}; use std::{convert::TryFrom, pin::Pin, time::Instant}; use tokio::sync::RwLock; pub struct WSClient { session_id: SessionId, server: Addr, biz_handlers: Data, hb: Instant, } impl WSClient { pub fn new>( session_id: T, server: Addr, biz_handlers: Data, ) -> Self { Self { session_id: session_id.into(), server, biz_handlers, hb: Instant::now(), } } fn hb(&self, ctx: &mut ws::WebsocketContext) { ctx.run_interval(HEARTBEAT_INTERVAL, |client, ctx| { if Instant::now().duration_since(client.hb) > PING_TIMEOUT { client.server.do_send(Disconnect { sid: client.session_id.clone(), }); ctx.stop(); } else { ctx.ping(b""); } }); } fn send(&self, data: MessageData) { let msg = ClientMessage::new(self.session_id.clone(), data); self.server.do_send(msg); } } async fn handle_binary_message(biz_handlers: Data, bytes: Bytes) { let message: WsMessage = WsMessage::try_from(bytes).unwrap(); match biz_handlers.get(&message.source) { None => { log::error!("Can't find the handler for {:?}", message.source); }, Some(handler) => handler .write() .await .receive_data(Bytes::from(message.data)), } } impl StreamHandler> for WSClient { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { Ok(ws::Message::Ping(msg)) => { self.hb = Instant::now(); ctx.pong(&msg); }, Ok(ws::Message::Pong(_msg)) => { // log::debug!("Receive {} pong {:?}", &self.session_id, &msg); self.hb = Instant::now(); }, Ok(ws::Message::Binary(bytes)) => { log::debug!(" Receive {} binary", &self.session_id); let biz_handlers = self.biz_handlers.clone(); ctx.spawn(wrap_future(handle_binary_message(biz_handlers, bytes))); }, Ok(Text(_)) => { log::warn!("Receive unexpected text message"); }, Ok(ws::Message::Close(reason)) => { self.send(MessageData::Disconnect(self.session_id.clone())); ctx.close(reason); ctx.stop(); }, Ok(ws::Message::Continuation(_)) => {}, Ok(ws::Message::Nop) => {}, Err(e) => { log::error!( "[{}]: WebSocketStream protocol error {:?}", self.session_id, e ); ctx.stop(); }, } } } impl Handler for WSClient { type Result = (); fn handle(&mut self, msg: ClientMessage, ctx: &mut Self::Context) { match msg.data { MessageData::Binary(binary) => { ctx.binary(binary); }, MessageData::Connect(_) => {}, MessageData::Disconnect(_) => {}, } } } impl Actor for WSClient { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { self.hb(ctx); let socket = ctx.address().recipient(); let connect = Connect { socket, sid: self.session_id.clone(), }; self.server .send(connect) .into_actor(self) .then(|res, _client, _ctx| { match res { Ok(Ok(_)) => log::trace!("Send connect message to server success"), Ok(Err(e)) => log::error!("Send connect message to server failed: {:?}", e), Err(e) => log::error!("Send connect message to server failed: {:?}", e), } fut::ready(()) }) .wait(ctx); } fn stopping(&mut self, _: &mut Self::Context) -> Running { self.server.do_send(Disconnect { sid: self.session_id.clone(), }); Running::Stop } }