149 lines
4.5 KiB
Rust
Raw Normal View History

2021-12-06 21:47:21 +08:00
use crate::{
services::{
2021-12-13 13:55:44 +08:00
doc::editor::ServerDocUser,
2021-12-06 21:47:21 +08:00
util::{md5, parse_from_bytes},
},
web_socket::{entities::Socket, WsClientData, WsUser},
2021-09-30 17:24:02 +08:00
};
use actix_rt::task::spawn_blocking;
use actix_web::web::Data;
use async_stream::stream;
2021-11-20 08:35:04 +08:00
use backend_service::errors::{internal_error, Result as DocResult, ServerError};
use flowy_collaboration::{
2021-12-12 16:02:58 +08:00
core::sync::{OpenDocHandle, ServerDocManager},
2021-12-13 13:55:44 +08:00
protobuf::{NewDocUser, WsDataType, WsDocumentData},
};
use futures::stream::StreamExt;
2021-12-07 22:32:34 +08:00
use lib_ot::protobuf::Revision;
use sqlx::PgPool;
use std::{convert::TryInto, sync::Arc};
use tokio::sync::{mpsc, oneshot};
pub enum DocWsMsg {
ClientData {
2021-09-30 17:24:02 +08:00
client_data: WsClientData,
pool: Data<PgPool>,
ret: oneshot::Sender<DocResult<()>>,
},
}
pub struct DocWsActor {
receiver: Option<mpsc::Receiver<DocWsMsg>>,
2021-12-12 16:02:58 +08:00
doc_manager: Arc<ServerDocManager>,
}
impl DocWsActor {
2021-12-12 16:02:58 +08:00
pub fn new(receiver: mpsc::Receiver<DocWsMsg>, manager: Arc<ServerDocManager>) -> Self {
Self {
receiver: Some(receiver),
doc_manager: manager,
}
}
pub async fn run(mut self) {
let mut receiver = self
.receiver
.take()
.expect("DocActor's receiver should only take one time");
let stream = stream! {
loop {
match receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
stream.for_each(|msg| self.handle_message(msg)).await;
}
async fn handle_message(&self, msg: DocWsMsg) {
match msg {
2021-09-30 17:24:02 +08:00
DocWsMsg::ClientData { client_data, pool, ret } => {
let _ = ret.send(self.handle_client_data(client_data, pool).await);
},
}
}
2021-09-30 17:24:02 +08:00
async fn handle_client_data(&self, client_data: WsClientData, pool: Data<PgPool>) -> DocResult<()> {
let WsClientData { user, socket, data } = client_data;
let document_data = spawn_blocking(move || {
2021-09-30 17:24:02 +08:00
let document_data: WsDocumentData = parse_from_bytes(&data)?;
DocResult::Ok(document_data)
})
.await
.map_err(internal_error)??;
let data = document_data.data;
match document_data.ty {
2021-09-30 17:24:02 +08:00
WsDataType::Acked => Ok(()),
2021-12-03 22:40:56 +08:00
WsDataType::PushRev => self.apply_pushed_rev(user, socket, data, pool).await,
WsDataType::NewDocUser => self.add_doc_user(user, socket, data, pool).await,
2021-09-30 17:24:02 +08:00
WsDataType::PullRev => Ok(()),
WsDataType::Conflict => Ok(()),
}
}
2021-12-03 22:40:56 +08:00
async fn add_doc_user(
2021-10-03 21:29:56 +08:00
&self,
user: Arc<WsUser>,
socket: Socket,
data: Vec<u8>,
pg_pool: Data<PgPool>,
2021-10-03 21:29:56 +08:00
) -> DocResult<()> {
let doc_user = spawn_blocking(move || {
let user: NewDocUser = parse_from_bytes(&data)?;
DocResult::Ok(user)
})
.await
.map_err(internal_error)??;
if let Some(handle) = self.get_doc_handle(&doc_user.doc_id, pg_pool.clone()).await {
2021-12-12 16:02:58 +08:00
let user = Arc::new(ServerDocUser { user, socket, pg_pool });
handle.add_user(user, doc_user.rev_id).await.map_err(internal_error)?;
2021-10-03 21:29:56 +08:00
}
Ok(())
}
2021-12-03 22:40:56 +08:00
async fn apply_pushed_rev(
&self,
2021-09-30 17:24:02 +08:00
user: Arc<WsUser>,
socket: Socket,
data: Vec<u8>,
pg_pool: Data<PgPool>,
) -> DocResult<()> {
let mut revision = spawn_blocking(move || {
let revision: Revision = parse_from_bytes(&data)?;
2021-09-30 17:24:02 +08:00
let _ = verify_md5(&revision)?;
DocResult::Ok(revision)
})
.await
.map_err(internal_error)??;
if let Some(handle) = self.get_doc_handle(&revision.doc_id, pg_pool.clone()).await {
2021-12-12 16:02:58 +08:00
let user = Arc::new(ServerDocUser { user, socket, pg_pool });
let revision = (&mut revision).try_into().map_err(internal_error).unwrap();
handle.apply_revision(user, revision).await.map_err(internal_error)?;
2021-10-03 21:29:56 +08:00
}
Ok(())
}
2021-12-13 13:55:44 +08:00
async fn get_doc_handle(&self, doc_id: &str, _pg_pool: Data<PgPool>) -> Option<Arc<OpenDocHandle>> {
match self.doc_manager.get(doc_id).await {
Ok(Some(edit_doc)) => Some(edit_doc),
Ok(None) => None,
Err(e) => {
log::error!("{}", e);
None
},
}
}
}
2021-09-30 17:24:02 +08:00
fn verify_md5(revision: &Revision) -> DocResult<()> {
if md5(&revision.delta_data) != revision.md5 {
return Err(ServerError::internal().context("Revision md5 not match"));
}
Ok(())
}