diff --git a/backend/Cargo.lock b/backend/Cargo.lock index b71b82288e..2c55dcc580 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1111,6 +1111,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "dissimilar" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31ad93652f40969dead8d4bf897a41e9462095152eb21c56e5830537e41179dd" + [[package]] name = "dotenv" version = "0.15.0" @@ -1227,6 +1233,8 @@ dependencies = [ "bytes", "chrono", "dashmap", + "dissimilar", + "flowy-core-data-model", "flowy-derive", "futures", "lib-infra", @@ -1236,6 +1244,7 @@ dependencies = [ "parking_lot", "protobuf", "serde", + "serde_json", "strum", "strum_macros", "tokio", @@ -1289,10 +1298,11 @@ dependencies = [ "chrono", "derive_more", "error-code", - "flowy-collaboration", "flowy-derive", "log", "protobuf", + "serde", + "serde_json", "strum", "strum_macros", "unicode-segmentation", diff --git a/backend/src/services/folder/ws_actor.rs b/backend/src/services/folder/ws_actor.rs index a76bb11bb4..a5919448bb 100644 --- a/backend/src/services/folder/ws_actor.rs +++ b/backend/src/services/folder/ws_actor.rs @@ -11,7 +11,6 @@ use flowy_collaboration::{ ClientRevisionWSData as ClientRevisionWSDataPB, ClientRevisionWSDataType as ClientRevisionWSDataTypePB, }, - server_document::ServerDocumentManager, synchronizer::{RevisionSyncResponse, RevisionUser}, }; use futures::stream::StreamExt; diff --git a/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs b/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs index a2556d7d57..f20f015419 100644 --- a/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs +++ b/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs @@ -1,4 +1,4 @@ -mod version_1; +pub mod version_1; mod version_2; use std::sync::Arc; diff --git a/frontend/rust-lib/flowy-core/src/services/view/controller.rs b/frontend/rust-lib/flowy-core/src/services/view/controller.rs index c58e0818b6..8ab5f3820d 100644 --- a/frontend/rust-lib/flowy-core/src/services/view/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/view/controller.rs @@ -4,6 +4,7 @@ use flowy_collaboration::entities::{ revision::{RepeatedRevision, Revision}, }; +use flowy_collaboration::client_document::default::initial_delta_string; use futures::{FutureExt, StreamExt}; use std::{collections::HashSet, sync::Arc}; @@ -61,7 +62,13 @@ impl ViewController { #[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name), err)] pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result { - let delta_data = Bytes::from(params.view_data.clone()); + let view_data = if params.view_data.is_empty() { + initial_delta_string() + } else { + params.view_data.clone() + }; + + let delta_data = Bytes::from(view_data); let user_id = self.user.user_id()?; let repeated_revision: RepeatedRevision = Revision::initial_revision(&user_id, ¶ms.view_id, delta_data).into(); @@ -110,22 +117,20 @@ impl ViewController { }) } - #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)] - pub(crate) async fn open_view(&self, params: DocumentId) -> Result { - let doc_id = params.doc_id.clone(); - let editor = self.document_ctx.controller.open_document(¶ms.doc_id).await?; - - KV::set_str(LATEST_VIEW_ID, doc_id.clone()); + #[tracing::instrument(level = "debug", skip(self), err)] + pub(crate) async fn open_view(&self, doc_id: &str) -> Result { + let editor = self.document_ctx.controller.open_document(doc_id).await?; + KV::set_str(LATEST_VIEW_ID, doc_id.to_owned()); let document_json = editor.document_json().await?; Ok(DocumentDelta { - doc_id, + doc_id: doc_id.to_string(), delta_json: document_json, }) } - #[tracing::instrument(level = "debug", skip(self, params), err)] - pub(crate) async fn close_view(&self, params: DocumentId) -> Result<(), FlowyError> { - let _ = self.document_ctx.controller.close_document(¶ms.doc_id)?; + #[tracing::instrument(level = "debug", skip(self), err)] + pub(crate) async fn close_view(&self, doc_id: &str) -> Result<(), FlowyError> { + let _ = self.document_ctx.controller.close_document(doc_id)?; Ok(()) } @@ -140,13 +145,13 @@ impl ViewController { Ok(()) } - #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)] - pub(crate) async fn duplicate_view(&self, params: DocumentId) -> Result<(), FlowyError> { + #[tracing::instrument(level = "debug", skip(self), err)] + pub(crate) async fn duplicate_view(&self, doc_id: &str) -> Result<(), FlowyError> { let view = self .persistence - .begin_transaction(|transaction| transaction.read_view(¶ms.doc_id))?; + .begin_transaction(|transaction| transaction.read_view(doc_id))?; - let editor = self.document_ctx.controller.open_document(¶ms.doc_id).await?; + let editor = self.document_ctx.controller.open_document(doc_id).await?; let document_json = editor.document_json().await?; let duplicate_params = CreateViewParams { belong_to_id: view.belong_to_id.clone(), diff --git a/frontend/rust-lib/flowy-core/src/services/view/event_handler.rs b/frontend/rust-lib/flowy-core/src/services/view/event_handler.rs index fe58e14548..f3acad59bc 100644 --- a/frontend/rust-lib/flowy-core/src/services/view/event_handler.rs +++ b/frontend/rust-lib/flowy-core/src/services/view/event_handler.rs @@ -84,7 +84,7 @@ pub(crate) async fn open_view_handler( controller: Unit>, ) -> DataResult { let params: ViewId = data.into_inner().try_into()?; - let doc = controller.open_view(params.into()).await?; + let doc = controller.open_view(¶ms.view_id).await?; data_result(doc) } @@ -93,7 +93,7 @@ pub(crate) async fn close_view_handler( controller: Unit>, ) -> Result<(), FlowyError> { let params: ViewId = data.into_inner().try_into()?; - let _ = controller.close_view(params.into()).await?; + let _ = controller.close_view(¶ms.view_id).await?; Ok(()) } @@ -103,7 +103,7 @@ pub(crate) async fn duplicate_view_handler( controller: Unit>, ) -> Result<(), FlowyError> { let params: ViewId = data.into_inner().try_into()?; - let _ = controller.duplicate_view(params.into()).await?; + let _ = controller.duplicate_view(¶ms.view_id).await?; Ok(()) } diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket.rs b/frontend/rust-lib/flowy-document/src/core/web_socket.rs index 781f2ea2b5..e8317b19cb 100644 --- a/frontend/rust-lib/flowy-document/src/core/web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket.rs @@ -132,14 +132,20 @@ impl RevisionWSSinkDataProvider for DocumentWSSinkDataProviderAdapter { async fn transform_pushed_revisions( revisions: Vec, - edit_cmd: &EditorCommandSender, + edit_cmd_tx: &EditorCommandSender, ) -> FlowyResult { let (ret, rx) = oneshot::channel::>(); - let _ = edit_cmd.send(EditorCommand::TransformRevision { revisions, ret }); - Ok(rx.await.map_err(internal_error)??) + edit_cmd_tx + .send(EditorCommand::TransformRevision { revisions, ret }) + .await + .map_err(internal_error)?; + let transform_delta = rx + .await + .map_err(|e| FlowyError::internal().context(format!("transform_pushed_revisions failed: {}", e)))??; + Ok(transform_delta) } -#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))] +#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes), err)] pub(crate) async fn handle_remote_revision( edit_cmd_tx: EditorCommandSender, rev_manager: Arc, @@ -173,23 +179,33 @@ pub(crate) async fn handle_remote_revision( // The server_prime is None means the client local revisions conflict with the // server, and it needs to override the client delta. let (ret, rx) = oneshot::channel(); - let _ = edit_cmd_tx.send(EditorCommand::OverrideDelta { - revisions, - delta: client_prime, - ret, - }); - let _ = rx.await.map_err(internal_error)??; + let _ = edit_cmd_tx + .send(EditorCommand::OverrideDelta { + revisions, + delta: client_prime, + ret, + }) + .await; + let _ = rx.await.map_err(|e| { + FlowyError::internal().context(format!("handle EditorCommand::OverrideDelta failed: {}", e)) + })??; Ok(None) }, Some(server_prime) => { let (ret, rx) = oneshot::channel(); - let _ = edit_cmd_tx.send(EditorCommand::ComposeRemoteDelta { - revisions, - client_delta: client_prime, - server_delta: server_prime, - ret, - }); - Ok(rx.await.map_err(internal_error)??) + edit_cmd_tx + .send(EditorCommand::ComposeRemoteDelta { + revisions, + client_delta: client_prime, + server_delta: server_prime, + ret, + }) + .await + .map_err(internal_error)?; + let result = rx.await.map_err(|e| { + FlowyError::internal().context(format!("handle EditorCommand::ComposeRemoteDelta failed: {}", e)) + })??; + Ok(result) }, } } diff --git a/frontend/rust-lib/flowy-document/tests/editor/serde_test.rs b/frontend/rust-lib/flowy-document/tests/editor/serde_test.rs index e5b31a194f..89e9e10c7b 100644 --- a/frontend/rust-lib/flowy-document/tests/editor/serde_test.rs +++ b/frontend/rust-lib/flowy-document/tests/editor/serde_test.rs @@ -3,6 +3,7 @@ use lib_ot::{ core::*, rich_text::{AttributeBuilder, RichTextAttribute, RichTextAttributeValue, RichTextDelta}, }; +use lib_ot::rich_text::RichTextOperation; #[test] fn operation_insert_serialize_test() { diff --git a/frontend/rust-lib/flowy-net/src/local_server/persistence.rs b/frontend/rust-lib/flowy-net/src/local_server/persistence.rs index 8784cdf31c..4812b6cb89 100644 --- a/frontend/rust-lib/flowy-net/src/local_server/persistence.rs +++ b/frontend/rust-lib/flowy-net/src/local_server/persistence.rs @@ -12,6 +12,8 @@ use std::{ sync::Arc, }; +// For the moment, we use memory to cache the data, it will be implemented with +// other storage. Like the Firestore,Dropbox.etc. pub trait RevisionCloudStorage: Send + Sync { fn set_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; fn get_revisions( @@ -28,8 +30,6 @@ pub trait RevisionCloudStorage: Send + Sync { } pub(crate) struct LocalDocumentCloudPersistence { - // For the moment, we use memory to cache the data, it will be implemented with other storage. - // Like the Firestore,Dropbox.etc. storage: Arc, } diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index 4310b37b38..34920ad1ff 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -74,6 +74,7 @@ fn crate_log_filter(level: String) -> String { filters.push(format!("lib_ot={}", level)); filters.push(format!("lib_ws={}", level)); filters.push(format!("lib_infra={}", level)); + filters.push(format!("flowy_sync={}", level)); filters.join(",") } diff --git a/frontend/rust-lib/flowy-sync/src/ws_manager.rs b/frontend/rust-lib/flowy-sync/src/ws_manager.rs index 943bb1db11..48f2af4a9a 100644 --- a/frontend/rust-lib/flowy-sync/src/ws_manager.rs +++ b/frontend/rust-lib/flowy-sync/src/ws_manager.rs @@ -147,13 +147,13 @@ impl RevisionWSStream { yield msg }, None => { - tracing::debug!("[RevisionWSStream:{}] loop exit", object_id); + tracing::debug!("[RevisionWSStream]:{} loop exit", object_id); break; }, } }, _ = stop_rx.recv() => { - tracing::debug!("[RevisionWSStream:{}] loop exit", object_id); + tracing::debug!("[RevisionWSStream]:{} loop exit", object_id); break }, }; @@ -164,7 +164,7 @@ impl RevisionWSStream { .for_each(|msg| async { match self.handle_message(msg).await { Ok(_) => {}, - Err(e) => tracing::error!("[RevisionWSStream:{}] error: {}", self.object_id, e), + Err(e) => tracing::error!("[RevisionWSStream]:{} error: {}", self.object_id, e), } }) .await; diff --git a/shared-lib/Cargo.lock b/shared-lib/Cargo.lock index 9656cec42f..3c5a43e123 100644 --- a/shared-lib/Cargo.lock +++ b/shared-lib/Cargo.lock @@ -530,6 +530,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "dissimilar" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31ad93652f40969dead8d4bf897a41e9462095152eb21c56e5830537e41179dd" + [[package]] name = "either" version = "1.6.1" @@ -631,6 +637,8 @@ dependencies = [ "bytes", "chrono", "dashmap", + "dissimilar", + "flowy-core-data-model", "flowy-derive", "futures", "lib-infra", @@ -640,6 +648,7 @@ dependencies = [ "parking_lot", "protobuf", "serde", + "serde_json", "strum", "strum_macros", "tokio", @@ -655,10 +664,11 @@ dependencies = [ "chrono", "derive_more", "error-code", - "flowy-collaboration", "flowy-derive", "log", "protobuf", + "serde", + "serde_json", "strum", "strum_macros", "unicode-segmentation", diff --git a/shared-lib/flowy-collaboration/Cargo.toml b/shared-lib/flowy-collaboration/Cargo.toml index 9962b7c031..7ff7589521 100644 --- a/shared-lib/flowy-collaboration/Cargo.toml +++ b/shared-lib/flowy-collaboration/Cargo.toml @@ -9,12 +9,15 @@ edition = "2018" lib-ot = { path = "../lib-ot" } lib-infra = { path = "../lib-infra" } flowy-derive = { path = "../flowy-derive" } +flowy-core-data-model = { path = "../flowy-core-data-model" } protobuf = {version = "2.18.0"} bytes = "1.0" log = "0.4.14" md5 = "0.7.0" tokio = { version = "1", features = ["full"] } -serde = { version = "1.0", features = ["derive"] } +serde = { version = "1.0", features = ["derive", "rc"] } +serde_json = {version = "1.0"} +dissimilar = "1.0" tracing = { version = "0.1", features = ["log"] } url = "2.2" strum = "0.21" diff --git a/shared-lib/flowy-collaboration/src/client_document/extensions/helper.rs b/shared-lib/flowy-collaboration/src/client_document/extensions/helper.rs index d800595ffb..2fe60c570b 100644 --- a/shared-lib/flowy-collaboration/src/client_document/extensions/helper.rs +++ b/shared-lib/flowy-collaboration/src/client_document/extensions/helper.rs @@ -1,8 +1,5 @@ use crate::util::find_newline; -use lib_ot::{ - core::RichTextOperation, - rich_text::{plain_attributes, AttributeScope, RichTextAttribute, RichTextDelta}, -}; +use lib_ot::rich_text::{plain_attributes, AttributeScope, RichTextAttribute, RichTextDelta, RichTextOperation}; pub(crate) fn line_break( op: &RichTextOperation, diff --git a/shared-lib/flowy-collaboration/src/folder/folder_data.rs b/shared-lib/flowy-collaboration/src/folder/folder_data.rs new file mode 100644 index 0000000000..189b2869da --- /dev/null +++ b/shared-lib/flowy-collaboration/src/folder/folder_data.rs @@ -0,0 +1,132 @@ +use dissimilar::*; +use flowy_core_data_model::entities::{ + app::{App, RepeatedApp}, + trash::{RepeatedTrash, Trash}, + view::{RepeatedView, View}, + workspace::{RepeatedWorkspace, Workspace}, +}; +use lib_ot::core::{ + Delta, + FlowyStr, + Operation, + Operation::Retain, + PlainDeltaBuilder, + PlainTextAttributes, + PlainTextOpBuilder, +}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct RootFolder { + workspaces: Vec>, + trash: Vec, +} + +impl RootFolder { + pub fn add_workspace(&mut self, workspace: Workspace) -> Option> { + let workspace = Arc::new(workspace); + if self.workspaces.contains(&workspace) { + tracing::warn!("Duplicate workspace"); + return None; + } + + let old = WorkspacesJson::new(self.workspaces.clone()).to_json().unwrap(); + self.workspaces.push(workspace); + let new = WorkspacesJson::new(self.workspaces.clone()).to_json().unwrap(); + Some(cal_diff(old, new)) + } + + pub fn update_workspace(&mut self, workspace_id: &str, name: Option, desc: Option) { + if let Some(mut workspace) = self + .workspaces + .iter_mut() + .find(|workspace| workspace.id == workspace_id) + { + let m_workspace = Arc::make_mut(&mut workspace); + if let Some(name) = name { + m_workspace.name = name; + } + + if let Some(desc) = desc { + m_workspace.desc = desc; + } + } + } + + pub fn delete_workspace(&mut self, workspace_id: &str) { self.workspaces.retain(|w| w.id != workspace_id) } +} + +fn cal_diff(old: String, new: String) -> Delta { + let mut chunks = dissimilar::diff(&old, &new); + let mut delta_builder = PlainDeltaBuilder::new(); + for chunk in &chunks { + match chunk { + Chunk::Equal(s) => { + delta_builder = delta_builder.retain(FlowyStr::from(*s).utf16_size()); + }, + Chunk::Delete(s) => { + delta_builder = delta_builder.delete(FlowyStr::from(*s).utf16_size()); + }, + Chunk::Insert(s) => { + delta_builder = delta_builder.insert(*s); + }, + } + } + delta_builder.build() +} + +#[derive(Serialize, Deserialize)] +struct WorkspacesJson { + workspaces: Vec>, +} + +impl WorkspacesJson { + fn new(workspaces: Vec>) -> Self { Self { workspaces } } + + fn to_json(self) -> Result { + serde_json::to_string(&self).map_err(|e| format!("format workspaces failed: {}", e)) + } +} + +#[cfg(test)] +mod tests { + use crate::folder::folder_data::RootFolder; + use chrono::Utc; + use flowy_core_data_model::{entities::prelude::Workspace, user_default}; + use std::{borrow::Cow, sync::Arc}; + + #[test] + fn folder_add_workspace_serde_test() { + let mut folder = RootFolder { + workspaces: vec![], + trash: vec![], + }; + + let time = Utc::now(); + let workspace_1 = user_default::create_default_workspace(time); + let delta_1 = folder.add_workspace(workspace_1).unwrap(); + println!("{}", delta_1); + + let workspace_2 = user_default::create_default_workspace(time); + let delta_2 = folder.add_workspace(workspace_2).unwrap(); + println!("{}", delta_2); + } + + #[test] + fn serial_folder_test() { + let time = Utc::now(); + let workspace = user_default::create_default_workspace(time); + let id = workspace.id.clone(); + let mut folder = RootFolder { + workspaces: vec![Arc::new(workspace)], + trash: vec![], + }; + + let mut cloned = folder.clone(); + cloned.update_workspace(&id, Some("123".to_owned()), None); + + println!("{}", serde_json::to_string(&folder).unwrap()); + println!("{}", serde_json::to_string(&cloned).unwrap()); + } +} diff --git a/shared-lib/flowy-collaboration/src/folder/folder_manager.rs b/shared-lib/flowy-collaboration/src/folder/folder_manager.rs new file mode 100644 index 0000000000..64c6bb36a1 --- /dev/null +++ b/shared-lib/flowy-collaboration/src/folder/folder_manager.rs @@ -0,0 +1,5 @@ +use lib_infra::future::BoxResultFuture; + +pub trait FolderCloudPersistence: Send + Sync { + // fn read_folder(&self) -> BoxResultFuture<> +} diff --git a/shared-lib/flowy-collaboration/src/folder/mod.rs b/shared-lib/flowy-collaboration/src/folder/mod.rs new file mode 100644 index 0000000000..1b0b578491 --- /dev/null +++ b/shared-lib/flowy-collaboration/src/folder/mod.rs @@ -0,0 +1,2 @@ +mod folder_data; +mod folder_manager; diff --git a/shared-lib/flowy-collaboration/src/lib.rs b/shared-lib/flowy-collaboration/src/lib.rs index a043dfcd26..84a5320739 100644 --- a/shared-lib/flowy-collaboration/src/lib.rs +++ b/shared-lib/flowy-collaboration/src/lib.rs @@ -1,6 +1,7 @@ pub mod client_document; pub mod entities; pub mod errors; +pub mod folder; pub mod protobuf; pub mod server_document; pub mod synchronizer; diff --git a/shared-lib/flowy-collaboration/src/server_document/document_manager.rs b/shared-lib/flowy-collaboration/src/server_document/document_manager.rs index 3ed154b48e..35fc1a777b 100644 --- a/shared-lib/flowy-collaboration/src/server_document/document_manager.rs +++ b/shared-lib/flowy-collaboration/src/server_document/document_manager.rs @@ -176,28 +176,6 @@ struct OpenDocHandle { users: DashMap>, } -impl RevisionSyncPersistence for Arc { - fn read_revisions( - &self, - object_id: &str, - rev_ids: Option>, - ) -> BoxResultFuture, CollaborateError> { - (**self).read_revisions(object_id, rev_ids) - } - - fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { - (**self).save_revisions(repeated_revision) - } - - fn reset_object( - &self, - object_id: &str, - repeated_revision: RepeatedRevisionPB, - ) -> BoxResultFuture<(), CollaborateError> { - (**self).reset_document(object_id, repeated_revision) - } -} - impl OpenDocHandle { fn new(doc: DocumentInfo, persistence: Arc) -> Result { let doc_id = doc.doc_id.clone(); @@ -263,6 +241,28 @@ impl std::ops::Drop for OpenDocHandle { } } +impl RevisionSyncPersistence for Arc { + fn read_revisions( + &self, + object_id: &str, + rev_ids: Option>, + ) -> BoxResultFuture, CollaborateError> { + (**self).read_revisions(object_id, rev_ids) + } + + fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { + (**self).save_revisions(repeated_revision) + } + + fn reset_object( + &self, + object_id: &str, + repeated_revision: RepeatedRevisionPB, + ) -> BoxResultFuture<(), CollaborateError> { + (**self).reset_document(object_id, repeated_revision) + } +} + // #[derive(Debug)] enum DocumentCommand { ApplyRevisions { diff --git a/shared-lib/flowy-collaboration/src/server_document/document_pad.rs b/shared-lib/flowy-collaboration/src/server_document/document_pad.rs index 984d7f1850..40b6c42d73 100644 --- a/shared-lib/flowy-collaboration/src/server_document/document_pad.rs +++ b/shared-lib/flowy-collaboration/src/server_document/document_pad.rs @@ -17,8 +17,6 @@ impl ServerDocument { let doc_id = doc_id.to_owned(); ServerDocument { doc_id, delta } } - - pub fn is_empty(&self) -> bool { self.delta == C::initial_delta() } } impl RevisionSyncObject for ServerDocument { diff --git a/shared-lib/flowy-collaboration/src/server_document/revision_sync.rs b/shared-lib/flowy-collaboration/src/server_document/revision_sync.rs deleted file mode 100644 index 4ed469d479..0000000000 --- a/shared-lib/flowy-collaboration/src/server_document/revision_sync.rs +++ /dev/null @@ -1,243 +0,0 @@ -use crate::{ - entities::{ - revision::RevisionRange, - ws_data::{ServerRevisionWSData, ServerRevisionWSDataBuilder}, - }, - errors::CollaborateError, - protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, - server_document::{document_pad::ServerDocument, DocumentCloudPersistence}, - util::*, -}; -use lib_infra::future::BoxResultFuture; -use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta}; -use parking_lot::RwLock; -use std::{ - cmp::Ordering, - fmt::Debug, - sync::{ - atomic::{AtomicI64, Ordering::SeqCst}, - Arc, - }, - time::Duration, -}; - -pub trait RevisionUser: Send + Sync + Debug { - fn user_id(&self) -> String; - fn receive(&self, resp: RevisionSyncResponse); -} - -pub trait RevisionSyncObject { - type SyncObject; - - fn read_revisions(&self, rev_ids: Option>) -> BoxResultFuture, CollaborateError>; - - fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; - - fn reset_object(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; -} - -pub enum RevisionSyncResponse { - Pull(ServerRevisionWSData), - Push(ServerRevisionWSData), - Ack(ServerRevisionWSData), -} - -pub struct RevisionSynchronizer { - pub doc_id: String, - pub rev_id: AtomicI64, - document: Arc>, - persistence: Arc, -} - -impl RevisionSynchronizer { - pub fn new( - doc_id: &str, - rev_id: i64, - document: ServerDocument, - persistence: Arc, - ) -> RevisionSynchronizer { - let document = Arc::new(RwLock::new(document)); - RevisionSynchronizer { - doc_id: doc_id.to_string(), - rev_id: AtomicI64::new(rev_id), - document, - persistence, - } - } - - #[tracing::instrument(level = "debug", skip(self, user, repeated_revision), err)] - pub async fn sync_revisions( - &self, - user: Arc, - repeated_revision: RepeatedRevisionPB, - ) -> Result<(), CollaborateError> { - let doc_id = self.doc_id.clone(); - if repeated_revision.get_items().is_empty() { - // Return all the revisions to client - let revisions = self.persistence.read_revisions(&doc_id, None).await?; - let repeated_revision = repeated_revision_from_revision_pbs(revisions)?; - let data = ServerRevisionWSDataBuilder::build_push_message(&doc_id, repeated_revision); - user.receive(RevisionSyncResponse::Push(data)); - return Ok(()); - } - - let server_base_rev_id = self.rev_id.load(SeqCst); - let first_revision = repeated_revision.get_items().first().unwrap().clone(); - if self.is_applied_before(&first_revision, &self.persistence).await { - // Server has received this revision before, so ignore the following revisions - return Ok(()); - } - - match server_base_rev_id.cmp(&first_revision.rev_id) { - Ordering::Less => { - let server_rev_id = next(server_base_rev_id); - if server_base_rev_id == first_revision.base_rev_id || server_rev_id == first_revision.rev_id { - // The rev is in the right order, just compose it. - for revision in repeated_revision.get_items() { - let _ = self.compose_revision(revision)?; - } - let _ = self.persistence.save_revisions(repeated_revision).await?; - } else { - // The server document is outdated, pull the missing revision from the client. - let range = RevisionRange { - object_id: self.doc_id.clone(), - start: server_rev_id, - end: first_revision.rev_id, - }; - let msg = ServerRevisionWSDataBuilder::build_pull_message(&self.doc_id, range); - user.receive(RevisionSyncResponse::Pull(msg)); - } - }, - Ordering::Equal => { - // Do nothing - tracing::warn!("Applied revision rev_id is the same as cur_rev_id"); - }, - Ordering::Greater => { - // The client document is outdated. Transform the client revision delta and then - // send the prime delta to the client. Client should compose the this prime - // delta. - let from_rev_id = first_revision.rev_id; - let to_rev_id = server_base_rev_id; - let _ = self.push_revisions_to_user(user, from_rev_id, to_rev_id).await; - }, - } - Ok(()) - } - - #[tracing::instrument(level = "trace", skip(self, user), fields(server_rev_id), err)] - pub async fn pong(&self, user: Arc, client_rev_id: i64) -> Result<(), CollaborateError> { - let doc_id = self.doc_id.clone(); - let server_rev_id = self.rev_id(); - tracing::Span::current().record("server_rev_id", &server_rev_id); - - match server_rev_id.cmp(&client_rev_id) { - Ordering::Less => { - tracing::error!("Client should not send ping and the server should pull the revisions from the client") - }, - Ordering::Equal => tracing::trace!("{} is up to date.", doc_id), - Ordering::Greater => { - // The client document is outdated. Transform the client revision delta and then - // send the prime delta to the client. Client should compose the this prime - // delta. - let from_rev_id = client_rev_id; - let to_rev_id = server_rev_id; - tracing::trace!("Push revisions to user"); - let _ = self.push_revisions_to_user(user, from_rev_id, to_rev_id).await; - }, - } - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self, repeated_revision), fields(doc_id), err)] - pub async fn reset(&self, repeated_revision: RepeatedRevisionPB) -> Result<(), CollaborateError> { - let doc_id = self.doc_id.clone(); - tracing::Span::current().record("doc_id", &doc_id.as_str()); - let revisions: Vec = repeated_revision.get_items().to_vec(); - let (_, rev_id) = pair_rev_id_from_revision_pbs(&revisions); - let delta = make_delta_from_revision_pb(revisions)?; - - let _ = self.persistence.reset_document(&doc_id, repeated_revision).await?; - *self.document.write() = ServerDocument::from_delta(delta); - let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id)); - Ok(()) - } - - pub fn doc_json(&self) -> String { self.document.read().to_json() } - - fn compose_revision(&self, revision: &RevisionPB) -> Result<(), CollaborateError> { - let delta = RichTextDelta::from_bytes(&revision.delta_data)?; - let _ = self.compose_delta(delta)?; - let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id)); - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self, revision))] - fn transform_revision(&self, revision: &RevisionPB) -> Result<(RichTextDelta, RichTextDelta), CollaborateError> { - let cli_delta = RichTextDelta::from_bytes(&revision.delta_data)?; - let result = self.document.read().delta().transform(&cli_delta)?; - Ok(result) - } - - fn compose_delta(&self, delta: RichTextDelta) -> Result<(), CollaborateError> { - if delta.is_empty() { - log::warn!("Composed delta is empty"); - } - - match self.document.try_write_for(Duration::from_millis(300)) { - None => log::error!("Failed to acquire write lock of document"), - Some(mut write_guard) => { - let _ = write_guard.compose_delta(delta); - }, - } - Ok(()) - } - - pub(crate) fn rev_id(&self) -> i64 { self.rev_id.load(SeqCst) } - - async fn is_applied_before( - &self, - new_revision: &RevisionPB, - persistence: &Arc, - ) -> bool { - let rev_ids = Some(vec![new_revision.rev_id]); - if let Ok(revisions) = persistence.read_revisions(&self.doc_id, rev_ids).await { - if let Some(revision) = revisions.first() { - if revision.md5 == new_revision.md5 { - return true; - } - } - }; - - false - } - - async fn push_revisions_to_user(&self, user: Arc, from: i64, to: i64) { - let rev_ids: Vec = (from..=to).collect(); - let revisions = match self.persistence.read_revisions(&self.doc_id, Some(rev_ids)).await { - Ok(revisions) => { - assert_eq!( - revisions.is_empty(), - false, - "revisions should not be empty if the doc exists" - ); - revisions - }, - Err(e) => { - tracing::error!("{}", e); - vec![] - }, - }; - - tracing::debug!("Push revision: {} -> {} to client", from, to); - match repeated_revision_from_revision_pbs(revisions) { - Ok(repeated_revision) => { - let data = ServerRevisionWSDataBuilder::build_push_message(&self.doc_id, repeated_revision); - user.receive(RevisionSyncResponse::Push(data)); - }, - Err(e) => tracing::error!("{}", e), - } - } -} - -#[inline] -fn next(rev_id: i64) -> i64 { rev_id + 1 } diff --git a/shared-lib/flowy-collaboration/src/synchronizer.rs b/shared-lib/flowy-collaboration/src/synchronizer.rs index 855833c53d..21e3c44158 100644 --- a/shared-lib/flowy-collaboration/src/synchronizer.rs +++ b/shared-lib/flowy-collaboration/src/synchronizer.rs @@ -8,7 +8,7 @@ use crate::{ util::*, }; use lib_infra::future::BoxResultFuture; -use lib_ot::core::{Attributes, Delta, OperationTransformable}; +use lib_ot::core::{Attributes, Delta}; use parking_lot::RwLock; use serde::de::DeserializeOwned; use std::{ diff --git a/shared-lib/flowy-core-data-model/Cargo.toml b/shared-lib/flowy-core-data-model/Cargo.toml index 3b0b97b052..e6f3b67b26 100644 --- a/shared-lib/flowy-core-data-model/Cargo.toml +++ b/shared-lib/flowy-core-data-model/Cargo.toml @@ -14,10 +14,11 @@ strum = "0.21" strum_macros = "0.21" derive_more = {version = "0.99", features = ["display"]} log = "0.4.14" -flowy-collaboration = { path = "../flowy-collaboration" } uuid = { version = "0.8", features = ["serde", "v4"] } chrono = { version = "0.4" } error-code = { path = "../error-code" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" [features] default = [] diff --git a/shared-lib/flowy-core-data-model/src/entities/app.rs b/shared-lib/flowy-core-data-model/src/entities/app.rs index 211de61618..956cbefc78 100644 --- a/shared-lib/flowy-core-data-model/src/entities/app.rs +++ b/shared-lib/flowy-core-data-model/src/entities/app.rs @@ -8,9 +8,10 @@ use crate::{ }, }; use flowy_derive::ProtoBuf; +use serde::{Deserialize, Serialize}; use std::convert::TryInto; -#[derive(PartialEq, ProtoBuf, Default, Debug, Clone)] +#[derive(PartialEq, ProtoBuf, Default, Debug, Clone, Serialize, Deserialize)] pub struct App { #[pb(index = 1)] pub id: String, @@ -41,7 +42,8 @@ impl App { pub fn take_belongings(&mut self) -> RepeatedView { std::mem::take(&mut self.belongings) } } -#[derive(PartialEq, Debug, Default, ProtoBuf, Clone)] +#[derive(PartialEq, Debug, Default, ProtoBuf, Clone, Serialize, Deserialize)] +#[serde(transparent)] pub struct RepeatedApp { #[pb(index = 1)] pub items: Vec, diff --git a/shared-lib/flowy-core-data-model/src/entities/trash.rs b/shared-lib/flowy-core-data-model/src/entities/trash.rs index 2321464348..5fe5d11c9e 100644 --- a/shared-lib/flowy-core-data-model/src/entities/trash.rs +++ b/shared-lib/flowy-core-data-model/src/entities/trash.rs @@ -1,8 +1,47 @@ use crate::{entities::app::App, impl_def_and_def_mut}; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; +use serde::{Deserialize, Serialize}; use std::fmt::Formatter; -#[derive(PartialEq, Debug, ProtoBuf_Enum, Clone)] +#[derive(PartialEq, ProtoBuf, Default, Debug, Clone, Serialize, Deserialize)] +pub struct Trash { + #[pb(index = 1)] + pub id: String, + + #[pb(index = 2)] + pub name: String, + + #[pb(index = 3)] + pub modified_time: i64, + + #[pb(index = 4)] + pub create_time: i64, + + #[pb(index = 5)] + pub ty: TrashType, +} + +#[derive(PartialEq, Debug, Default, ProtoBuf, Clone)] +pub struct RepeatedTrash { + #[pb(index = 1)] + pub items: Vec, +} + +impl_def_and_def_mut!(RepeatedTrash, Trash); + +impl std::convert::From for Trash { + fn from(app: App) -> Self { + Trash { + id: app.id, + name: app.name, + modified_time: app.modified_time, + create_time: app.create_time, + ty: TrashType::App, + } + } +} + +#[derive(PartialEq, Debug, ProtoBuf_Enum, Clone, Serialize, Deserialize)] pub enum TrashType { Unknown = 0, View = 1, @@ -97,41 +136,3 @@ impl std::convert::From<&Trash> for TrashId { impl std::fmt::Display for TrashId { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str(&format!("{:?}:{}", self.ty, self.id)) } } - -#[derive(PartialEq, ProtoBuf, Default, Debug, Clone)] -pub struct Trash { - #[pb(index = 1)] - pub id: String, - - #[pb(index = 2)] - pub name: String, - - #[pb(index = 3)] - pub modified_time: i64, - - #[pb(index = 4)] - pub create_time: i64, - - #[pb(index = 5)] - pub ty: TrashType, -} - -#[derive(PartialEq, Debug, Default, ProtoBuf, Clone)] -pub struct RepeatedTrash { - #[pb(index = 1)] - pub items: Vec, -} - -impl_def_and_def_mut!(RepeatedTrash, Trash); - -impl std::convert::From for Trash { - fn from(app: App) -> Self { - Trash { - id: app.id, - name: app.name, - modified_time: app.modified_time, - create_time: app.create_time, - ty: TrashType::App, - } - } -} diff --git a/shared-lib/flowy-core-data-model/src/entities/view.rs b/shared-lib/flowy-core-data-model/src/entities/view.rs index 46a6e334b2..3637d298a6 100644 --- a/shared-lib/flowy-core-data-model/src/entities/view.rs +++ b/shared-lib/flowy-core-data-model/src/entities/view.rs @@ -7,11 +7,11 @@ use crate::{ view::{ViewDesc, ViewIdentify, ViewName, ViewThumbnail}, }, }; -use flowy_collaboration::{client_document::default::initial_delta_string, entities::doc::DocumentId}; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; +use serde::{Deserialize, Serialize}; use std::convert::TryInto; -#[derive(PartialEq, ProtoBuf, Default, Debug, Clone)] +#[derive(PartialEq, ProtoBuf, Default, Debug, Clone, Serialize, Deserialize)] pub struct View { #[pb(index = 1)] pub id: String, @@ -41,7 +41,8 @@ pub struct View { pub create_time: i64, } -#[derive(PartialEq, Debug, Default, ProtoBuf, Clone)] +#[derive(PartialEq, Debug, Default, ProtoBuf, Clone, Serialize, Deserialize)] +#[serde(transparent)] pub struct RepeatedView { #[pb(index = 1)] pub items: Vec, @@ -61,7 +62,7 @@ impl std::convert::From for Trash { } } -#[derive(PartialEq, Debug, ProtoBuf_Enum, Clone)] +#[derive(PartialEq, Debug, ProtoBuf_Enum, Clone, Serialize, Deserialize)] pub enum ViewType { Blank = 0, Doc = 1, @@ -155,7 +156,7 @@ impl TryInto for CreateViewRequest { fn try_into(self) -> Result { let name = ViewName::parse(self.name)?.0; let belong_to_id = AppIdentify::parse(self.belong_to_id)?.0; - let view_data = initial_delta_string(); + let view_data = "".to_string(); let view_id = uuid::Uuid::new_v4().to_string(); let thumbnail = match self.thumbnail { None => "".to_string(), @@ -190,14 +191,6 @@ impl std::convert::From for ViewId { fn from(view_id: String) -> Self { ViewId { view_id } } } -impl std::convert::From for DocumentId { - fn from(identifier: ViewId) -> Self { - DocumentId { - doc_id: identifier.view_id, - } - } -} - impl TryInto for QueryViewRequest { type Error = ErrorCode; fn try_into(self) -> Result { diff --git a/shared-lib/flowy-core-data-model/src/entities/workspace.rs b/shared-lib/flowy-core-data-model/src/entities/workspace.rs index 1d25a25801..5203f38345 100644 --- a/shared-lib/flowy-core-data-model/src/entities/workspace.rs +++ b/shared-lib/flowy-core-data-model/src/entities/workspace.rs @@ -5,9 +5,10 @@ use crate::{ parser::workspace::{WorkspaceDesc, WorkspaceIdentify, WorkspaceName}, }; use flowy_derive::ProtoBuf; +use serde::{Deserialize, Serialize}; use std::convert::TryInto; -#[derive(PartialEq, ProtoBuf, Default, Debug, Clone)] +#[derive(PartialEq, ProtoBuf, Default, Debug, Clone, Serialize, Deserialize)] pub struct Workspace { #[pb(index = 1)] pub id: String, diff --git a/shared-lib/lib-ot/src/core/delta/builder.rs b/shared-lib/lib-ot/src/core/delta/builder.rs index 03fcc964ca..8767541297 100644 --- a/shared-lib/lib-ot/src/core/delta/builder.rs +++ b/shared-lib/lib-ot/src/core/delta/builder.rs @@ -1,4 +1,6 @@ -use crate::core::{trim, Attributes, Delta}; +use crate::core::{trim, Attributes, Delta, PlainTextAttributes}; + +pub type PlainDeltaBuilder = DeltaBuilder; pub struct DeltaBuilder { delta: Delta, diff --git a/shared-lib/lib-ot/src/core/delta/delta.rs b/shared-lib/lib-ot/src/core/delta/delta.rs index f0b5aa4478..ead1f23ffd 100644 --- a/shared-lib/lib-ot/src/core/delta/delta.rs +++ b/shared-lib/lib-ot/src/core/delta/delta.rs @@ -13,7 +13,7 @@ use std::{ str::FromStr, }; -// TODO: optimize the memory usage with Arc_mut or Cow +// TODO: optimize the memory usage with Arc::make_mut or Cow #[derive(Clone, Debug, PartialEq, Eq)] pub struct Delta { pub ops: Vec>, diff --git a/shared-lib/lib-ot/src/core/operation/builder.rs b/shared-lib/lib-ot/src/core/operation/builder.rs index efd63c2bff..fb64d07f61 100644 --- a/shared-lib/lib-ot/src/core/operation/builder.rs +++ b/shared-lib/lib-ot/src/core/operation/builder.rs @@ -1,9 +1,10 @@ use crate::{ - core::{Attributes, Operation}, + core::{Attributes, Operation, PlainTextAttributes}, rich_text::RichTextAttributes, }; pub type RichTextOpBuilder = OpBuilder; +pub type PlainTextOpBuilder = OpBuilder; pub struct OpBuilder { ty: Operation, diff --git a/shared-lib/lib-ot/src/core/operation/operation.rs b/shared-lib/lib-ot/src/core/operation/operation.rs index 886ed199f9..8cc81784d7 100644 --- a/shared-lib/lib-ot/src/core/operation/operation.rs +++ b/shared-lib/lib-ot/src/core/operation/operation.rs @@ -1,12 +1,13 @@ use crate::{ core::{FlowyStr, Interval, OpBuilder, OperationTransformable}, + errors::OTError, rich_text::{RichTextAttribute, RichTextAttributes}, }; use serde::__private::Formatter; use std::{ cmp::min, fmt, - fmt::Debug, + fmt::{Debug, Display}, ops::{Deref, DerefMut}, }; @@ -19,13 +20,6 @@ pub trait Attributes: fmt::Display + Eq + PartialEq + Default + Clone + Debug + fn extend_other(&mut self, other: Self); } -pub type RichTextOperation = Operation; -impl RichTextOperation { - pub fn contain_attribute(&self, attribute: &RichTextAttribute) -> bool { - self.get_attributes().contains_key(&attribute.key) - } -} - #[derive(Debug, Clone, Eq, PartialEq)] pub enum Operation { Delete(usize), @@ -328,3 +322,25 @@ where } } } + +#[derive(Debug, Clone, Eq, PartialEq, Default)] +pub struct PlainTextAttributes(); +impl fmt::Display for PlainTextAttributes { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("PlainTextAttributes") } +} + +impl Attributes for PlainTextAttributes { + fn is_empty(&self) -> bool { true } + + fn remove_empty(&mut self) {} + + fn extend_other(&mut self, _other: Self) {} +} + +impl OperationTransformable for PlainTextAttributes { + fn compose(&self, _other: &Self) -> Result { Ok(self.clone()) } + + fn transform(&self, other: &Self) -> Result<(Self, Self), OTError> { Ok((self.clone(), other.clone())) } + + fn invert(&self, _other: &Self) -> Self { self.clone() } +} diff --git a/shared-lib/lib-ot/src/rich_text/attributes.rs b/shared-lib/lib-ot/src/rich_text/attributes.rs index 5af66a817a..43589835b5 100644 --- a/shared-lib/lib-ot/src/rich_text/attributes.rs +++ b/shared-lib/lib-ot/src/rich_text/attributes.rs @@ -1,7 +1,7 @@ #![allow(non_snake_case)] use crate::{ block_attribute, - core::{Attributes, OperationTransformable, RichTextOperation}, + core::{Attributes, Operation, OperationTransformable}, errors::OTError, ignore_attribute, inline_attribute, @@ -16,6 +16,13 @@ use std::{ }; use strum_macros::Display; +pub type RichTextOperation = Operation; +impl RichTextOperation { + pub fn contain_attribute(&self, attribute: &RichTextAttribute) -> bool { + self.get_attributes().contains_key(&attribute.key) + } +} + #[derive(Debug, Clone, Eq, PartialEq)] pub struct RichTextAttributes { pub(crate) inner: HashMap,