From e7cc11cc4474287f62f59d227ba169a18ebdf8e7 Mon Sep 17 00:00:00 2001 From: appflowy Date: Fri, 3 Dec 2021 22:40:56 +0800 Subject: [PATCH] rename some structs --- backend/src/context.rs | 2 +- backend/src/service/doc/edit/edit_actor.rs | 6 +- .../doc/edit/{edit_doc.rs => editor.rs} | 4 +- backend/src/service/doc/edit/mod.rs | 8 +- backend/src/service/doc/edit/open_handle.rs | 70 ------------------ .../src/service/doc/{doc.rs => manager.rs} | 74 +++++++++++++++++-- backend/src/service/doc/mod.rs | 2 +- backend/src/service/doc/ws_actor.rs | 18 ++--- backend/src/service/view/router.rs | 2 +- backend/tests/document/helper.rs | 2 +- .../rust-lib/flowy-document/src/module.rs | 4 +- .../flowy-document/src/services/cache.rs | 12 +-- .../src/services/doc/doc_controller.rs | 12 ++- .../src/services/doc/edit/doc_actor.rs | 44 +++++------ .../doc/edit/{edit_doc.rs => editor.rs} | 70 +++++++++--------- .../src/services/doc/edit/mod.rs | 4 +- .../src/services/doc/revision/model.rs | 1 - 17 files changed, 163 insertions(+), 172 deletions(-) rename backend/src/service/doc/edit/{edit_doc.rs => editor.rs} (99%) delete mode 100644 backend/src/service/doc/edit/open_handle.rs rename backend/src/service/doc/{doc.rs => manager.rs} (50%) rename frontend/rust-lib/flowy-document/src/services/doc/edit/{edit_doc.rs => editor.rs} (84%) diff --git a/backend/src/context.rs b/backend/src/context.rs index e70abb6cf7..7b01cc556b 100644 --- a/backend/src/context.rs +++ b/backend/src/context.rs @@ -1,5 +1,5 @@ use crate::service::{ - doc::doc::DocBiz, + doc::manager::DocBiz, ws::{WsBizHandlers, WsServer}, }; use actix::Addr; diff --git a/backend/src/service/doc/edit/edit_actor.rs b/backend/src/service/doc/edit/edit_actor.rs index 19d31ce18c..c883c0ab70 100644 --- a/backend/src/service/doc/edit/edit_actor.rs +++ b/backend/src/service/doc/edit/edit_actor.rs @@ -1,5 +1,5 @@ use crate::service::{ - doc::edit::ServerEditDoc, + doc::edit::ServerDocEditor, ws::{entities::Socket, WsUser}, }; use actix_web::web::Data; @@ -48,13 +48,13 @@ pub enum EditMsg { pub struct EditDocActor { receiver: Option>, - edit_doc: Arc, + edit_doc: Arc, pg_pool: Data, } impl EditDocActor { pub fn new(receiver: mpsc::Receiver, doc: Doc, pg_pool: Data) -> Result { - let edit_doc = Arc::new(ServerEditDoc::new(doc)?); + let edit_doc = Arc::new(ServerDocEditor::new(doc)?); Ok(Self { receiver: Some(receiver), edit_doc, diff --git a/backend/src/service/doc/edit/edit_doc.rs b/backend/src/service/doc/edit/editor.rs similarity index 99% rename from backend/src/service/doc/edit/edit_doc.rs rename to backend/src/service/doc/edit/editor.rs index 3756668792..a10c0e87e4 100644 --- a/backend/src/service/doc/edit/edit_doc.rs +++ b/backend/src/service/doc/edit/editor.rs @@ -24,14 +24,14 @@ use std::{ time::Duration, }; -pub struct ServerEditDoc { +pub struct ServerDocEditor { pub doc_id: String, pub rev_id: AtomicI64, document: Arc>, users: DashMap, } -impl ServerEditDoc { +impl ServerDocEditor { pub fn new(doc: Doc) -> Result { let delta = Delta::from_bytes(&doc.data).map_err(internal_error)?; let document = Arc::new(RwLock::new(Document::from_delta(delta))); diff --git a/backend/src/service/doc/edit/mod.rs b/backend/src/service/doc/edit/mod.rs index d55861229f..9012184ffc 100644 --- a/backend/src/service/doc/edit/mod.rs +++ b/backend/src/service/doc/edit/mod.rs @@ -1,7 +1,5 @@ -mod edit_actor; -mod edit_doc; -mod open_handle; +pub(crate) mod edit_actor; +mod editor; pub use edit_actor::*; -pub use edit_doc::*; -pub use open_handle::*; +pub use editor::*; diff --git a/backend/src/service/doc/edit/open_handle.rs b/backend/src/service/doc/edit/open_handle.rs deleted file mode 100644 index 006811daee..0000000000 --- a/backend/src/service/doc/edit/open_handle.rs +++ /dev/null @@ -1,70 +0,0 @@ -use crate::service::{ - doc::edit::edit_actor::{EditDocActor, EditMsg}, - ws::{entities::Socket, WsUser}, -}; -use actix_web::web::Data; -use backend_service::errors::{internal_error, Result as DocResult, ServerError}; -use flowy_document_infra::protobuf::{Doc, Revision}; -use sqlx::PgPool; -use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; - -pub struct DocHandle { - pub sender: mpsc::Sender, -} - -impl DocHandle { - pub fn new(doc: Doc, pg_pool: Data) -> Result { - let (sender, receiver) = mpsc::channel(100); - let actor = EditDocActor::new(receiver, doc, pg_pool)?; - tokio::task::spawn(actor.run()); - Ok(Self { sender }) - } - - pub async fn handle_new_user(&self, user: Arc, rev_id: i64, socket: Socket) -> Result<(), ServerError> { - let (ret, rx) = oneshot::channel(); - let msg = EditMsg::NewDocUser { - user, - socket, - rev_id, - ret, - }; - let _ = self.send(msg, rx).await?; - Ok(()) - } - - pub async fn apply_revision( - &self, - user: Arc, - socket: Socket, - revision: Revision, - ) -> Result<(), ServerError> { - let (ret, rx) = oneshot::channel(); - let msg = EditMsg::Revision { - user, - socket, - revision, - ret, - }; - let _ = self.send(msg, rx).await?; - Ok(()) - } - - pub async fn document_json(&self) -> DocResult { - let (ret, rx) = oneshot::channel(); - let msg = EditMsg::DocumentJson { ret }; - self.send(msg, rx).await? - } - - pub async fn rev_id(&self) -> DocResult { - let (ret, rx) = oneshot::channel(); - let msg = EditMsg::DocumentRevId { ret }; - self.send(msg, rx).await? - } - - pub(crate) async fn send(&self, msg: EditMsg, rx: oneshot::Receiver) -> DocResult { - let _ = self.sender.send(msg).await.map_err(internal_error)?; - let result = rx.await?; - Ok(result) - } -} diff --git a/backend/src/service/doc/doc.rs b/backend/src/service/doc/manager.rs similarity index 50% rename from backend/src/service/doc/doc.rs rename to backend/src/service/doc/manager.rs index 2b749b585e..3d66848f01 100644 --- a/backend/src/service/doc/doc.rs +++ b/backend/src/service/doc/manager.rs @@ -1,15 +1,15 @@ use crate::service::{ doc::{ - edit::DocHandle, + edit::edit_actor::{EditDocActor, EditMsg}, read_doc, ws_actor::{DocWsActor, DocWsMsg}, }, - ws::{WsBizHandler, WsClientData}, + ws::{entities::Socket, WsBizHandler, WsClientData, WsUser}, }; use actix_web::web::Data; -use backend_service::errors::{internal_error, ServerError}; +use backend_service::errors::{internal_error, Result as DocResult, ServerError}; use dashmap::DashMap; -use flowy_document_infra::protobuf::DocIdentifier; +use flowy_document_infra::protobuf::{Doc, DocIdentifier, Revision}; use sqlx::PgPool; use std::sync::Arc; use tokio::{ @@ -58,7 +58,7 @@ impl WsBizHandler for DocBiz { } pub struct DocManager { - docs_map: DashMap>, + docs_map: DashMap>, } impl std::default::Default for DocManager { @@ -72,7 +72,7 @@ impl std::default::Default for DocManager { impl DocManager { pub fn new() -> Self { DocManager::default() } - pub async fn get(&self, doc_id: &str, pg_pool: Data) -> Result>, ServerError> { + pub async fn get(&self, doc_id: &str, pg_pool: Data) -> Result>, ServerError> { match self.docs_map.get(doc_id) { None => { let params = DocIdentifier { @@ -80,7 +80,7 @@ impl DocManager { ..Default::default() }; let doc = read_doc(pg_pool.get_ref(), params).await?; - let handle = spawn_blocking(|| DocHandle::new(doc, pg_pool)) + let handle = spawn_blocking(|| DocOpenHandle::new(doc, pg_pool)) .await .map_err(internal_error)?; let handle = Arc::new(handle?); @@ -91,3 +91,63 @@ impl DocManager { } } } + +pub struct DocOpenHandle { + pub sender: mpsc::Sender, +} + +impl DocOpenHandle { + pub fn new(doc: Doc, pg_pool: Data) -> Result { + let (sender, receiver) = mpsc::channel(100); + let actor = EditDocActor::new(receiver, doc, pg_pool)?; + tokio::task::spawn(actor.run()); + Ok(Self { sender }) + } + + pub async fn add_user(&self, user: Arc, rev_id: i64, socket: Socket) -> Result<(), ServerError> { + let (ret, rx) = oneshot::channel(); + let msg = EditMsg::NewDocUser { + user, + socket, + rev_id, + ret, + }; + let _ = self.send(msg, rx).await?; + Ok(()) + } + + pub async fn apply_revision( + &self, + user: Arc, + socket: Socket, + revision: Revision, + ) -> Result<(), ServerError> { + let (ret, rx) = oneshot::channel(); + let msg = EditMsg::Revision { + user, + socket, + revision, + ret, + }; + let _ = self.send(msg, rx).await?; + Ok(()) + } + + pub async fn document_json(&self) -> DocResult { + let (ret, rx) = oneshot::channel(); + let msg = EditMsg::DocumentJson { ret }; + self.send(msg, rx).await? + } + + pub async fn rev_id(&self) -> DocResult { + let (ret, rx) = oneshot::channel(); + let msg = EditMsg::DocumentRevId { ret }; + self.send(msg, rx).await? + } + + pub(crate) async fn send(&self, msg: EditMsg, rx: oneshot::Receiver) -> DocResult { + let _ = self.sender.send(msg).await.map_err(internal_error)?; + let result = rx.await?; + Ok(result) + } +} diff --git a/backend/src/service/doc/mod.rs b/backend/src/service/doc/mod.rs index 2ef59629ac..6e0a2c9694 100644 --- a/backend/src/service/doc/mod.rs +++ b/backend/src/service/doc/mod.rs @@ -3,7 +3,7 @@ pub(crate) use crud::*; pub use router::*; pub mod crud; -pub mod doc; mod edit; +pub mod manager; pub mod router; mod ws_actor; diff --git a/backend/src/service/doc/ws_actor.rs b/backend/src/service/doc/ws_actor.rs index ef18fc08c4..7f08651550 100644 --- a/backend/src/service/doc/ws_actor.rs +++ b/backend/src/service/doc/ws_actor.rs @@ -1,5 +1,5 @@ use crate::service::{ - doc::{doc::DocManager, edit::DocHandle}, + doc::manager::{DocManager, DocOpenHandle}, util::{md5, parse_from_bytes}, ws::{entities::Socket, WsClientData, WsUser}, }; @@ -73,14 +73,14 @@ impl DocWsActor { match document_data.ty { WsDataType::Acked => Ok(()), - WsDataType::PushRev => self.handle_push_rev(user, socket, data, pool).await, - WsDataType::NewDocUser => self.handle_new_doc_user(user, socket, data, pool).await, + WsDataType::PushRev => self.apply_pushed_rev(user, socket, data, pool).await, + WsDataType::NewDocUser => self.add_doc_user(user, socket, data, pool).await, WsDataType::PullRev => Ok(()), WsDataType::Conflict => Ok(()), } } - async fn handle_new_doc_user( + async fn add_doc_user( &self, user: Arc, socket: Socket, @@ -93,13 +93,13 @@ impl DocWsActor { }) .await .map_err(internal_error)??; - if let Some(handle) = self.doc_handle(&doc_user.doc_id, pool).await { - handle.handle_new_user(user, doc_user.rev_id, socket).await?; + if let Some(handle) = self.find_doc_handle(&doc_user.doc_id, pool).await { + handle.add_user(user, doc_user.rev_id, socket).await?; } Ok(()) } - async fn handle_push_rev( + async fn apply_pushed_rev( &self, user: Arc, socket: Socket, @@ -113,13 +113,13 @@ impl DocWsActor { }) .await .map_err(internal_error)??; - if let Some(handle) = self.doc_handle(&revision.doc_id, pool).await { + if let Some(handle) = self.find_doc_handle(&revision.doc_id, pool).await { handle.apply_revision(user, socket, revision).await?; } Ok(()) } - async fn doc_handle(&self, doc_id: &str, pool: Data) -> Option> { + async fn find_doc_handle(&self, doc_id: &str, pool: Data) -> Option> { match self.doc_manager.get(doc_id, pool).await { Ok(Some(edit_doc)) => Some(edit_doc), Ok(None) => { diff --git a/backend/src/service/view/router.rs b/backend/src/service/view/router.rs index 5a6571e74f..30f3066f92 100644 --- a/backend/src/service/view/router.rs +++ b/backend/src/service/view/router.rs @@ -1,5 +1,5 @@ use crate::service::{ - doc::doc::DocBiz, + doc::manager::DocBiz, user::LoggedUser, util::parse_from_payload, view::{create_view, delete_view, read_view, sql_builder::check_view_ids, update_view}, diff --git a/backend/tests/document/helper.rs b/backend/tests/document/helper.rs index e625f7c9a6..bcb6d3120e 100644 --- a/backend/tests/document/helper.rs +++ b/backend/tests/document/helper.rs @@ -1,7 +1,7 @@ use actix_web::web::Data; use backend::service::doc::{crud::update_doc, doc::DocManager}; use backend_service::config::ServerConfig; -use flowy_document::services::doc::ClientEditDoc as ClientEditDocContext; +use flowy_document::services::doc::ClientDocEditor as ClientEditDocContext; use flowy_test::{workspace::ViewTest, FlowyTest}; use flowy_user::services::user::UserSession; use futures_util::{stream, stream::StreamExt}; diff --git a/frontend/rust-lib/flowy-document/src/module.rs b/frontend/rust-lib/flowy-document/src/module.rs index 91bd0e8501..6fdb91e306 100644 --- a/frontend/rust-lib/flowy-document/src/module.rs +++ b/frontend/rust-lib/flowy-document/src/module.rs @@ -1,7 +1,7 @@ use crate::{ errors::DocError, services::{ - doc::{doc_controller::DocController, ClientEditDoc}, + doc::{doc_controller::DocController, ClientDocEditor}, server::construct_doc_server, ws::WsDocumentManager, }, @@ -44,7 +44,7 @@ impl FlowyDocument { Ok(()) } - pub async fn open(&self, params: DocIdentifier) -> Result, DocError> { + pub async fn open(&self, params: DocIdentifier) -> Result, DocError> { let edit_context = self.doc_ctrl.open(params, self.user.db_pool()?).await?; Ok(edit_context) } diff --git a/frontend/rust-lib/flowy-document/src/services/cache.rs b/frontend/rust-lib/flowy-document/src/services/cache.rs index 8b0ac76f01..6b633dc338 100644 --- a/frontend/rust-lib/flowy-document/src/services/cache.rs +++ b/frontend/rust-lib/flowy-document/src/services/cache.rs @@ -4,25 +4,25 @@ use dashmap::DashMap; use crate::{ errors::DocError, - services::doc::{ClientEditDoc, DocId}, + services::doc::{ClientDocEditor, DocId}, }; pub(crate) struct DocCache { - inner: DashMap>, + inner: DashMap>, } impl DocCache { pub(crate) fn new() -> Self { Self { inner: DashMap::new() } } #[allow(dead_code)] - pub(crate) fn all_docs(&self) -> Vec> { + pub(crate) fn all_docs(&self) -> Vec> { self.inner .iter() .map(|kv| kv.value().clone()) - .collect::>>() + .collect::>>() } - pub(crate) fn set(&self, doc: Arc) { + pub(crate) fn set(&self, doc: Arc) { let doc_id = doc.doc_id.clone(); if self.inner.contains_key(&doc_id) { log::warn!("Doc:{} already exists in cache", &doc_id); @@ -32,7 +32,7 @@ impl DocCache { pub(crate) fn contains(&self, doc_id: &str) -> bool { self.inner.get(doc_id).is_some() } - pub(crate) fn get(&self, doc_id: &str) -> Result, DocError> { + pub(crate) fn get(&self, doc_id: &str) -> Result, DocError> { if !self.contains(&doc_id) { return Err(doc_not_found()); } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/doc_controller.rs b/frontend/rust-lib/flowy-document/src/services/doc/doc_controller.rs index aaf42bc451..e7d33fd186 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/doc_controller.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/doc_controller.rs @@ -4,7 +4,7 @@ use crate::{ services::{ cache::DocCache, doc::{ - edit::{ClientEditDoc, EditDocWsHandler}, + edit::{ClientDocEditor, EditDocWsHandler}, revision::RevisionServer, }, server::Server, @@ -45,7 +45,7 @@ impl DocController { &self, params: DocIdentifier, pool: Arc, - ) -> Result, DocError> { + ) -> Result, DocError> { if !self.cache.contains(¶ms.doc_id) { let edit_ctx = self.make_edit_context(¶ms.doc_id, pool.clone()).await?; return Ok(edit_ctx); @@ -91,7 +91,11 @@ impl DocController { } impl DocController { - async fn make_edit_context(&self, doc_id: &str, pool: Arc) -> Result, DocError> { + async fn make_edit_context( + &self, + doc_id: &str, + pool: Arc, + ) -> Result, DocError> { // Opti: require upgradable_read lock and then upgrade to write lock using // RwLockUpgradableReadGuard::upgrade(xx) of ws // let doc = self.read_doc(doc_id, pool.clone()).await?; @@ -103,7 +107,7 @@ impl DocController { server: self.server.clone(), }); - let edit_ctx = Arc::new(ClientEditDoc::new(doc_id, pool, ws, server, user).await?); + let edit_ctx = Arc::new(ClientDocEditor::new(doc_id, pool, ws, server, user).await?); let ws_handler = Arc::new(EditDocWsHandler(edit_ctx.clone())); self.ws_manager.register_handler(doc_id, ws_handler); self.cache.set(edit_ctx.clone()); diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs index 1ce354afe1..0584dabc1b 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs @@ -10,14 +10,14 @@ use lib_ot::core::{Attribute, Delta, Interval, OperationTransformable}; use std::{convert::TryFrom, sync::Arc}; use tokio::sync::{mpsc, oneshot, RwLock}; -pub struct DocumentActor { +pub(crate) struct EditCommandQueue { doc_id: String, document: Arc>, - receiver: Option>, + receiver: Option>, } -impl DocumentActor { - pub fn new(doc_id: &str, delta: Delta, receiver: mpsc::UnboundedReceiver) -> Self { +impl EditCommandQueue { + pub(crate) fn new(doc_id: &str, delta: Delta, receiver: mpsc::UnboundedReceiver) -> Self { let document = Arc::new(RwLock::new(Document::from_delta(delta))); Self { doc_id: doc_id.to_owned(), @@ -26,7 +26,7 @@ impl DocumentActor { } } - pub async fn run(mut self) { + pub(crate) async fn run(mut self) { let mut receiver = self.receiver.take().expect("Should only call once"); let stream = stream! { loop { @@ -46,13 +46,13 @@ impl DocumentActor { .await; } - async fn handle_message(&self, msg: DocumentMsg) -> Result<(), DocumentError> { + async fn handle_message(&self, msg: EditCommand) -> Result<(), DocumentError> { match msg { - DocumentMsg::Delta { delta, ret } => { + EditCommand::ComposeDelta { delta, ret } => { let result = self.composed_delta(delta).await; let _ = ret.send(result); }, - DocumentMsg::RemoteRevision { bytes, ret } => { + EditCommand::RemoteRevision { bytes, ret } => { let revision = Revision::try_from(bytes)?; let delta = Delta::from_bytes(&revision.delta_data)?; let rev_id: RevId = revision.rev_id.into(); @@ -64,15 +64,15 @@ impl DocumentActor { }; let _ = ret.send(Ok(transform_delta)); }, - DocumentMsg::Insert { index, data, ret } => { + EditCommand::Insert { index, data, ret } => { let delta = self.document.write().await.insert(index, data); let _ = ret.send(delta); }, - DocumentMsg::Delete { interval, ret } => { + EditCommand::Delete { interval, ret } => { let result = self.document.write().await.delete(interval); let _ = ret.send(result); }, - DocumentMsg::Format { + EditCommand::Format { interval, attribute, ret, @@ -80,25 +80,25 @@ impl DocumentActor { let result = self.document.write().await.format(interval, attribute); let _ = ret.send(result); }, - DocumentMsg::Replace { interval, data, ret } => { + EditCommand::Replace { interval, data, ret } => { let result = self.document.write().await.replace(interval, data); let _ = ret.send(result); }, - DocumentMsg::CanUndo { ret } => { + EditCommand::CanUndo { ret } => { let _ = ret.send(self.document.read().await.can_undo()); }, - DocumentMsg::CanRedo { ret } => { + EditCommand::CanRedo { ret } => { let _ = ret.send(self.document.read().await.can_redo()); }, - DocumentMsg::Undo { ret } => { + EditCommand::Undo { ret } => { let result = self.document.write().await.undo(); let _ = ret.send(result); }, - DocumentMsg::Redo { ret } => { + EditCommand::Redo { ret } => { let result = self.document.write().await.redo(); let _ = ret.send(result); }, - DocumentMsg::Doc { ret } => { + EditCommand::ReadDoc { ret } => { let data = self.document.read().await.to_json(); let _ = ret.send(Ok(data)); }, @@ -122,9 +122,9 @@ impl DocumentActor { } } -pub type Ret = oneshot::Sender>; -pub enum DocumentMsg { - Delta { +pub(crate) type Ret = oneshot::Sender>; +pub(crate) enum EditCommand { + ComposeDelta { delta: Delta, ret: Ret<()>, }, @@ -164,12 +164,12 @@ pub enum DocumentMsg { Redo { ret: Ret, }, - Doc { + ReadDoc { ret: Ret, }, } -pub struct TransformDeltas { +pub(crate) struct TransformDeltas { pub client_prime: Delta, pub server_prime: Delta, pub server_rev_id: RevId, diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs similarity index 84% rename from frontend/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs rename to frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs index ae36102640..910e223d4c 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs @@ -2,7 +2,7 @@ use crate::{ errors::{internal_error, DocError, DocResult}, module::DocumentUser, services::{ - doc::{DocumentActor, DocumentMsg, OpenDocAction, RevisionManager, RevisionServer, TransformDeltas}, + doc::{EditCommand, EditCommandQueue, OpenDocAction, RevisionManager, RevisionServer, TransformDeltas}, ws::{DocumentWebSocket, WsDocumentHandler}, }, }; @@ -24,15 +24,15 @@ use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; pub type DocId = String; -pub struct ClientEditDoc { +pub struct ClientDocEditor { pub doc_id: DocId, rev_manager: Arc, - document: UnboundedSender, + edit_tx: UnboundedSender, ws: Arc, user: Arc, } -impl ClientEditDoc { +impl ClientDocEditor { pub(crate) async fn new( doc_id: &str, pool: Arc, @@ -45,13 +45,13 @@ impl ClientEditDoc { spawn_rev_receiver(receiver, ws.clone()); let delta = rev_manager.load_document().await?; - let document = spawn_doc_edit_actor(doc_id, delta, pool.clone()); + let edit_queue_tx = spawn_edit_queue(doc_id, delta, pool.clone()); let doc_id = doc_id.to_string(); let rev_manager = Arc::new(rev_manager); let edit_doc = Self { doc_id, rev_manager, - document, + edit_tx: edit_queue_tx, ws, user, }; @@ -61,12 +61,12 @@ impl ClientEditDoc { pub async fn insert(&self, index: usize, data: T) -> Result<(), DocError> { let (ret, rx) = oneshot::channel::>(); - let msg = DocumentMsg::Insert { + let msg = EditCommand::Insert { index, data: data.to_string(), ret, }; - let _ = self.document.send(msg); + let _ = self.edit_tx.send(msg); let delta = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta).await?; Ok(()) @@ -74,8 +74,8 @@ impl ClientEditDoc { pub async fn delete(&self, interval: Interval) -> Result<(), DocError> { let (ret, rx) = oneshot::channel::>(); - let msg = DocumentMsg::Delete { interval, ret }; - let _ = self.document.send(msg); + let msg = EditCommand::Delete { interval, ret }; + let _ = self.edit_tx.send(msg); let delta = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta).await?; Ok(()) @@ -83,12 +83,12 @@ impl ClientEditDoc { pub async fn format(&self, interval: Interval, attribute: Attribute) -> Result<(), DocError> { let (ret, rx) = oneshot::channel::>(); - let msg = DocumentMsg::Format { + let msg = EditCommand::Format { interval, attribute, ret, }; - let _ = self.document.send(msg); + let _ = self.edit_tx.send(msg); let delta = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta).await?; Ok(()) @@ -96,12 +96,12 @@ impl ClientEditDoc { pub async fn replace(&mut self, interval: Interval, data: T) -> Result<(), DocError> { let (ret, rx) = oneshot::channel::>(); - let msg = DocumentMsg::Replace { + let msg = EditCommand::Replace { interval, data: data.to_string(), ret, }; - let _ = self.document.send(msg); + let _ = self.edit_tx.send(msg); let delta = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta).await?; Ok(()) @@ -109,38 +109,38 @@ impl ClientEditDoc { pub async fn can_undo(&self) -> bool { let (ret, rx) = oneshot::channel::(); - let msg = DocumentMsg::CanUndo { ret }; - let _ = self.document.send(msg); + let msg = EditCommand::CanUndo { ret }; + let _ = self.edit_tx.send(msg); rx.await.unwrap_or(false) } pub async fn can_redo(&self) -> bool { let (ret, rx) = oneshot::channel::(); - let msg = DocumentMsg::CanRedo { ret }; - let _ = self.document.send(msg); + let msg = EditCommand::CanRedo { ret }; + let _ = self.edit_tx.send(msg); rx.await.unwrap_or(false) } pub async fn undo(&self) -> Result { let (ret, rx) = oneshot::channel::>(); - let msg = DocumentMsg::Undo { ret }; - let _ = self.document.send(msg); + let msg = EditCommand::Undo { ret }; + let _ = self.edit_tx.send(msg); let r = rx.await.map_err(internal_error)??; Ok(r) } pub async fn redo(&self) -> Result { let (ret, rx) = oneshot::channel::>(); - let msg = DocumentMsg::Redo { ret }; - let _ = self.document.send(msg); + let msg = EditCommand::Redo { ret }; + let _ = self.edit_tx.send(msg); let r = rx.await.map_err(internal_error)??; Ok(r) } pub async fn delta(&self) -> DocResult { let (ret, rx) = oneshot::channel::>(); - let msg = DocumentMsg::Doc { ret }; - let _ = self.document.send(msg); + let msg = EditCommand::ReadDoc { ret }; + let _ = self.edit_tx.send(msg); let data = rx.await.map_err(internal_error)??; Ok(DocDelta { @@ -162,11 +162,11 @@ impl ClientEditDoc { pub(crate) async fn composing_local_delta(&self, data: Bytes) -> Result<(), DocError> { let delta = Delta::from_bytes(&data)?; let (ret, rx) = oneshot::channel::>(); - let msg = DocumentMsg::Delta { + let msg = EditCommand::ComposeDelta { delta: delta.clone(), ret, }; - let _ = self.document.send(msg); + let _ = self.edit_tx.send(msg); let _ = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta).await?; @@ -176,8 +176,8 @@ impl ClientEditDoc { #[cfg(feature = "flowy_test")] pub async fn doc_json(&self) -> DocResult { let (ret, rx) = oneshot::channel::>(); - let msg = DocumentMsg::Doc { ret }; - let _ = self.document.send(msg); + let msg = EditCommand::ReadDoc { ret }; + let _ = self.edit_tx.send(msg); let s = rx.await.map_err(internal_error)??; Ok(s) } @@ -208,7 +208,7 @@ impl ClientEditDoc { async fn handle_push_rev(&self, bytes: Bytes) -> DocResult<()> { // Transform the revision let (ret, rx) = oneshot::channel::>(); - let _ = self.document.send(DocumentMsg::RemoteRevision { bytes, ret }); + let _ = self.edit_tx.send(EditCommand::RemoteRevision { bytes, ret }); let TransformDeltas { client_prime, server_prime, @@ -222,11 +222,11 @@ impl ClientEditDoc { // compose delta let (ret, rx) = oneshot::channel::>(); - let msg = DocumentMsg::Delta { + let msg = EditCommand::ComposeDelta { delta: client_prime.clone(), ret, }; - let _ = self.document.send(msg); + let _ = self.edit_tx.send(msg); let _ = rx.await.map_err(internal_error)??; // update rev id @@ -278,7 +278,7 @@ impl ClientEditDoc { } } -pub struct EditDocWsHandler(pub Arc); +pub struct EditDocWsHandler(pub Arc); impl WsDocumentHandler for EditDocWsHandler { fn receive(&self, doc_data: WsDocumentData) { @@ -313,9 +313,9 @@ fn spawn_rev_receiver(mut receiver: mpsc::UnboundedReceiver, ws: Arc) -> UnboundedSender { - let (sender, receiver) = mpsc::unbounded_channel::(); - let actor = DocumentActor::new(doc_id, delta, receiver); +fn spawn_edit_queue(doc_id: &str, delta: Delta, _pool: Arc) -> UnboundedSender { + let (sender, receiver) = mpsc::unbounded_channel::(); + let actor = EditCommandQueue::new(doc_id, delta, receiver); tokio::spawn(actor.run()); sender } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs index 1bb0e5f25e..e86845b81e 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs @@ -1,7 +1,7 @@ mod doc_actor; -mod edit_doc; +mod editor; mod model; pub(crate) use doc_actor::*; -pub use edit_doc::*; +pub use editor::*; pub(crate) use model::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/model.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/model.rs index dd32d64005..2922a04383 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/model.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/model.rs @@ -5,7 +5,6 @@ use crate::{ use flowy_database::ConnectionPool; use flowy_document_infra::entities::doc::{Revision, RevisionRange}; use lib_infra::future::ResultFuture; - use std::sync::Arc; use tokio::sync::broadcast;