From ad5bc23296c39e0523a748e1e3bc4d1c8ae45f12 Mon Sep 17 00:00:00 2001 From: appflowy Date: Tue, 5 Oct 2021 11:46:56 +0800 Subject: [PATCH] rename files --- backend/src/service/doc/edit/edit_actor.rs | 9 +++- backend/src/service/doc/edit/edit_doc.rs | 41 +++++++++-------- backend/src/service/doc/edit/open_handle.rs | 7 ++- backend/tests/document/edit.rs | 39 ++++++++++++---- backend/tests/document/helper.rs | 33 ++++++------- rust-lib/dart-ffi/src/lib.rs | 2 +- rust-lib/flowy-document/Flowy.toml | 2 +- rust-lib/flowy-document/src/lib.rs | 2 +- .../src/{observable => notify}/mod.rs | 0 .../src/{observable => notify}/observable.rs | 6 +-- .../doc/edit/{edit_actor.rs => doc_actor.rs} | 36 +++++++-------- .../src/services/doc/edit/edit_doc.rs | 46 +++++++++---------- .../src/services/doc/edit/message.rs | 2 +- .../src/services/doc/edit/mod.rs | 2 +- .../src/dart/stream_sender.rs | 10 ++-- rust-lib/flowy-observable/src/lib.rs | 8 ++-- rust-lib/flowy-user/Flowy.toml | 2 +- rust-lib/flowy-user/src/lib.rs | 2 +- .../src/{observable => notify}/mod.rs | 0 .../src/{observable => notify}/observable.rs | 6 ++- .../src/services/server/server_api.rs | 4 +- .../src/services/user/user_session.rs | 6 +-- rust-lib/flowy-workspace/Flowy.toml | 2 +- rust-lib/flowy-workspace/src/lib.rs | 2 +- .../src/{observable => notify}/mod.rs | 0 .../src/{observable => notify}/observable.rs | 6 +-- .../src/services/app_controller.rs | 10 ++-- .../src/services/server/middleware.rs | 6 ++- .../src/services/view_controller.rs | 12 +++-- .../src/services/workspace_controller.rs | 10 ++-- 30 files changed, 176 insertions(+), 137 deletions(-) rename rust-lib/flowy-document/src/{observable => notify}/mod.rs (100%) rename rust-lib/flowy-document/src/{observable => notify}/observable.rs (62%) rename rust-lib/flowy-document/src/services/doc/edit/{edit_actor.rs => doc_actor.rs} (85%) rename rust-lib/flowy-user/src/{observable => notify}/mod.rs (100%) rename rust-lib/flowy-user/src/{observable => notify}/observable.rs (72%) rename rust-lib/flowy-workspace/src/{observable => notify}/mod.rs (100%) rename rust-lib/flowy-workspace/src/{observable => notify}/observable.rs (80%) diff --git a/backend/src/service/doc/edit/edit_actor.rs b/backend/src/service/doc/edit/edit_actor.rs index 3c17858bf2..b19ddbf2b9 100644 --- a/backend/src/service/doc/edit/edit_actor.rs +++ b/backend/src/service/doc/edit/edit_actor.rs @@ -8,7 +8,7 @@ use flowy_document::protobuf::{Doc, Revision}; use flowy_net::errors::{internal_error, Result as DocResult, ServerError}; use futures::stream::StreamExt; use sqlx::PgPool; -use std::sync::Arc; +use std::sync::{atomic::Ordering::SeqCst, Arc}; use tokio::{ sync::{mpsc, oneshot}, task::spawn_blocking, @@ -35,6 +35,9 @@ pub enum EditMsg { DocumentJson { ret: oneshot::Sender>, }, + DocumentRevId { + ret: oneshot::Sender>, + }, NewDocUser { user: Arc, socket: Socket, @@ -97,6 +100,10 @@ impl EditDocActor { .map_err(internal_error); let _ = ret.send(json); }, + EditMsg::DocumentRevId { ret } => { + let edit_context = self.edit_doc.clone(); + let _ = ret.send(Ok(edit_context.rev_id.load(SeqCst))); + }, EditMsg::NewDocUser { user, socket, diff --git a/backend/src/service/doc/edit/edit_doc.rs b/backend/src/service/doc/edit/edit_doc.rs index 91c0f0ff16..14a8d069b2 100644 --- a/backend/src/service/doc/edit/edit_doc.rs +++ b/backend/src/service/doc/edit/edit_doc.rs @@ -32,8 +32,8 @@ use std::{ }; pub struct ServerEditDoc { - doc_id: String, - rev_id: AtomicI64, + pub doc_id: String, + pub rev_id: AtomicI64, document: Arc>, users: DashMap, } @@ -121,7 +121,7 @@ impl ServerEditDoc { // The client document is outdated. Transform the client revision delta and then // send the prime delta to the client. Client should compose the this prime // delta. - let cli_revision = self.transform_client_revision(&revision)?; + let cli_revision = self.transform_revision(&revision)?; let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision); user.socket.do_send(ws_cli_revision).map_err(internal_error)?; }, @@ -137,8 +137,16 @@ impl ServerEditDoc { Ok(()) } - fn transform_client_revision(&self, revision: &Revision) -> Result { - let (cli_prime, server_prime) = self.transform(&revision.delta_data).map_err(internal_error)?; + #[tracing::instrument(level = "debug", skip(self, revision))] + fn transform_revision(&self, revision: &Revision) -> Result { + let cli_delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?; + let (cli_prime, server_prime) = self + .document + .read() + .delta() + .transform(&cli_delta) + .map_err(internal_error)?; + let _ = self.compose_delta(server_prime)?; let cli_revision = self.mk_revision(revision.rev_id, cli_prime); Ok(cli_revision) @@ -159,19 +167,14 @@ impl ServerEditDoc { revision } - #[tracing::instrument(level = "debug", skip(self, delta_data))] - fn transform(&self, delta_data: &Vec) -> Result<(Delta, Delta), OTError> { - log::debug!("Document: {}", self.document.read().to_json()); - let doc_delta = self.document.read().delta().clone(); - let cli_delta = Delta::from_bytes(delta_data)?; - - log::debug!("Compose delta: {}", cli_delta); - let (cli_prime, server_prime) = doc_delta.transform(&cli_delta)?; - - Ok((cli_prime, server_prime)) - } - - #[tracing::instrument(level = "debug", skip(self), err)] + #[tracing::instrument( + level = "debug", + skip(self, delta), + fields( + delta = %delta.to_json(), + result, + ) + )] fn compose_delta(&self, delta: Delta) -> Result<(), ServerError> { // Opti: push each revision into queue and process it one by one. match self.document.try_write_for(Duration::from_millis(300)) { @@ -180,7 +183,7 @@ impl ServerEditDoc { }, Some(mut write_guard) => { let _ = write_guard.compose_delta(&delta).map_err(internal_error)?; - log::debug!("Document: {}", write_guard.to_json()); + tracing::Span::current().record("result", &write_guard.to_json().as_str()); }, } Ok(()) diff --git a/backend/src/service/doc/edit/open_handle.rs b/backend/src/service/doc/edit/open_handle.rs index 2693e962d3..34856cc4bc 100644 --- a/backend/src/service/doc/edit/open_handle.rs +++ b/backend/src/service/doc/edit/open_handle.rs @@ -18,7 +18,6 @@ impl DocHandle { let (sender, receiver) = mpsc::channel(100); let actor = EditDocActor::new(receiver, doc, pg_pool)?; tokio::task::spawn(actor.run()); - Ok(Self { sender }) } @@ -58,6 +57,12 @@ impl DocHandle { self.send(msg, rx).await? } + pub async fn rev_id(&self) -> DocResult { + let (ret, rx) = oneshot::channel(); + let msg = EditMsg::DocumentRevId { ret }; + self.send(msg, rx).await? + } + pub(crate) async fn send(&self, msg: EditMsg, rx: oneshot::Receiver) -> DocResult { let _ = self.sender.send(msg).await.map_err(internal_error)?; let result = rx.await?; diff --git a/backend/tests/document/edit.rs b/backend/tests/document/edit.rs index 809b80b825..aea9965627 100644 --- a/backend/tests/document/edit.rs +++ b/backend/tests/document/edit.rs @@ -1,5 +1,6 @@ use crate::document::helper::{DocScript, DocumentTest}; use flowy_document::services::doc::{Document, FlowyDoc}; +use flowy_ot::core::{Attribute, Interval}; #[rustfmt::skip] // ┌─────────┐ ┌─────────┐ @@ -16,15 +17,33 @@ use flowy_document::services::doc::{Document, FlowyDoc}; // └──────────────────────────┘ │ │ └──────────────────────┘ // │ │ #[actix_rt::test] -async fn delta_sync_after_ws_connection() { +async fn delta_sync_while_editing() { let test = DocumentTest::new().await; test.run_scripts(vec![ DocScript::ConnectWs, DocScript::OpenDoc, - DocScript::SendText(0, "abc"), - DocScript::SendText(3, "123"), + DocScript::InsertText(0, "abc"), + DocScript::InsertText(3, "123"), DocScript::AssertClient(r#"[{"insert":"abc123\n"}]"#), - DocScript::AssertServer(r#"[{"insert":"abc123\n"}]"#), + DocScript::AssertServer(r#"[{"insert":"abc123\n"}]"#, 2), + ]) + .await; +} + +#[actix_rt::test] +async fn delta_sync_while_editing_with_attribute() { + let test = DocumentTest::new().await; + test.run_scripts(vec![ + DocScript::ConnectWs, + DocScript::OpenDoc, + DocScript::InsertText(0, "abc"), + DocScript::FormatText(Interval::new(0, 3), Attribute::Bold(true)), + DocScript::AssertClient(r#"[{"insert":"abc","attributes":{"bold":true}},{"insert":"\n"}]"#), + DocScript::AssertServer(r#"[{"insert":"abc","attributes":{"bold":true}},{"insert":"\n"}]"#, 2), + DocScript::InsertText(3, "efg"), + DocScript::FormatText(Interval::new(3, 5), Attribute::Italic(true)), + DocScript::AssertClient(r#"[{"insert":"abc","attributes":{"bold":true}},{"insert":"ef","attributes":{"bold":true,"italic":true}},{"insert":"g","attributes":{"bold":true}},{"insert":"\n"}]"#), + DocScript::AssertServer(r#"[{"insert":"abc","attributes":{"bold":true}},{"insert":"ef","attributes":{"bold":true,"italic":true}},{"insert":"g","attributes":{"bold":true}},{"insert":"\n"}]"#, 4), ]) .await; } @@ -54,7 +73,7 @@ async fn delta_sync_with_http_request() { DocScript::SetServerDocument(json, 3), DocScript::OpenDoc, DocScript::AssertClient(r#"[{"insert":"123456\n"}]"#), - DocScript::AssertServer(r#"[{"insert":"123456\n"}]"#), + DocScript::AssertServer(r#"[{"insert":"123456\n"}]"#, 3), ]) .await; } @@ -116,10 +135,10 @@ async fn delta_sync_while_local_rev_less_than_server_rev() { test.run_scripts(vec![ DocScript::OpenDoc, DocScript::SetServerDocument(json, 3), - DocScript::SendText(0, "abc"), + DocScript::InsertText(0, "abc"), DocScript::ConnectWs, DocScript::AssertClient(r#"[{"insert":"abc\n123\n"}]"#), - DocScript::AssertServer(r#"[{"insert":"abc\n123\n"}]"#), + DocScript::AssertServer(r#"[{"insert":"abc\n123\n"}]"#, 4), ]) .await; } @@ -160,11 +179,11 @@ async fn delta_sync_while_local_rev_greater_than_server_rev() { DocScript::SetServerDocument(json, 1), DocScript::OpenDoc, DocScript::AssertClient(r#"[{"insert":"123\n"}]"#), - DocScript::SendText(3, "abc"), - DocScript::SendText(6, "efg"), + DocScript::InsertText(3, "abc"), + DocScript::InsertText(6, "efg"), DocScript::ConnectWs, DocScript::AssertClient(r#"[{"insert":"123abcefg\n"}]"#), - DocScript::AssertServer(r#"[{"insert":"123abcefg\n"}]"#), + DocScript::AssertServer(r#"[{"insert":"123abcefg\n"}]"#, 3), ]) .await; } diff --git a/backend/tests/document/helper.rs b/backend/tests/document/helper.rs index 6dfc1b5a1d..dd19a48a64 100644 --- a/backend/tests/document/helper.rs +++ b/backend/tests/document/helper.rs @@ -15,6 +15,7 @@ use flowy_user::services::user::UserSession; use crate::helper::{spawn_server, TestServer}; use flowy_document::protobuf::UpdateDocParams; +use flowy_ot::core::{Attribute, Interval}; use parking_lot::RwLock; use serde::__private::Formatter; @@ -25,27 +26,14 @@ pub struct DocumentTest { #[derive(Clone)] pub enum DocScript { ConnectWs, - SendText(usize, &'static str), + InsertText(usize, &'static str), + FormatText(Interval, Attribute), AssertClient(&'static str), - AssertServer(&'static str), + AssertServer(&'static str, i64), SetServerDocument(String, i64), // delta_json, rev_id OpenDoc, } -impl std::fmt::Display for DocScript { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let name = match self { - DocScript::ConnectWs => "ConnectWs", - DocScript::SendText(_, _) => "SendText", - DocScript::AssertClient(_) => "AssertClient", - DocScript::AssertServer(_) => "AssertServer", - DocScript::SetServerDocument(_, _) => "SetServerDocument", - DocScript::OpenDoc => "OpenDoc", - }; - f.write_str(&format!("******** {} *********", name)) - } -} - impl DocumentTest { pub async fn new() -> Self { let server = spawn_server().await; @@ -122,19 +110,28 @@ async fn run_scripts(context: Arc>, scripts: Vec { context.write().open_doc().await; }, - DocScript::SendText(index, s) => { + DocScript::InsertText(index, s) => { context.read().client_edit_context().insert(index, s).await.unwrap(); }, + DocScript::FormatText(interval, attribute) => { + context + .read() + .client_edit_context() + .format(interval, attribute) + .await + .unwrap(); + }, DocScript::AssertClient(s) => { sleep(Duration::from_millis(100)).await; let json = context.read().client_edit_context().doc_json().await.unwrap(); assert_eq(s, &json); }, - DocScript::AssertServer(s) => { + DocScript::AssertServer(s, rev_id) => { sleep(Duration::from_millis(100)).await; let pg_pool = context.read().pool.clone(); let doc_manager = context.read().doc_manager.clone(); let edit_doc = doc_manager.get(&doc_id, pg_pool).await.unwrap().unwrap(); + assert_eq!(edit_doc.rev_id().await.unwrap(), rev_id); let json = edit_doc.document_json().await.unwrap(); assert_eq(s, &json); }, diff --git a/rust-lib/dart-ffi/src/lib.rs b/rust-lib/dart-ffi/src/lib.rs index d111ba12e9..5880935fc9 100644 --- a/rust-lib/dart-ffi/src/lib.rs +++ b/rust-lib/dart-ffi/src/lib.rs @@ -61,7 +61,7 @@ pub extern "C" fn sync_command(input: *const u8, len: usize) -> *const u8 { #[no_mangle] pub extern "C" fn set_stream_port(port: i64) -> i32 { - flowy_observable::dart::RustStreamSender::set_port(port); + flowy_observable::dart::DartStreamSender::set_port(port); return 0; } diff --git a/rust-lib/flowy-document/Flowy.toml b/rust-lib/flowy-document/Flowy.toml index d0e50ebb1d..a80b9508a5 100644 --- a/rust-lib/flowy-document/Flowy.toml +++ b/rust-lib/flowy-document/Flowy.toml @@ -1,3 +1,3 @@ -proto_crates = ["src/entities", "src/event.rs", "src/errors.rs", "src/observable"] +proto_crates = ["src/entities", "src/event.rs", "src/errors.rs", "src/notify"] event_files = [] \ No newline at end of file diff --git a/rust-lib/flowy-document/src/lib.rs b/rust-lib/flowy-document/src/lib.rs index 470f52a80e..3656b825d0 100644 --- a/rust-lib/flowy-document/src/lib.rs +++ b/rust-lib/flowy-document/src/lib.rs @@ -1,7 +1,7 @@ pub mod entities; pub mod errors; pub mod module; -mod observable; +mod notify; pub mod protobuf; pub mod services; mod sql_tables; diff --git a/rust-lib/flowy-document/src/observable/mod.rs b/rust-lib/flowy-document/src/notify/mod.rs similarity index 100% rename from rust-lib/flowy-document/src/observable/mod.rs rename to rust-lib/flowy-document/src/notify/mod.rs diff --git a/rust-lib/flowy-document/src/observable/observable.rs b/rust-lib/flowy-document/src/notify/observable.rs similarity index 62% rename from rust-lib/flowy-document/src/observable/observable.rs rename to rust-lib/flowy-document/src/notify/observable.rs index e6d66802f4..f0fb6f92dc 100644 --- a/rust-lib/flowy-document/src/observable/observable.rs +++ b/rust-lib/flowy-document/src/notify/observable.rs @@ -1,5 +1,5 @@ use flowy_derive::ProtoBuf_Enum; -use flowy_observable::NotifyBuilder; +use flowy_observable::DartNotifyBuilder; const OBSERVABLE_CATEGORY: &'static str = "Doc"; #[derive(ProtoBuf_Enum, Debug)] pub(crate) enum DocObservable { @@ -11,6 +11,6 @@ impl std::convert::Into for DocObservable { } #[allow(dead_code)] -pub(crate) fn observable(id: &str, ty: DocObservable) -> NotifyBuilder { - NotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY) +pub(crate) fn dart_notify(id: &str, ty: DocObservable) -> DartNotifyBuilder { + DartNotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY) } diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_actor.rs b/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs similarity index 85% rename from rust-lib/flowy-document/src/services/doc/edit/edit_actor.rs rename to rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs index 944abe0e94..a35799bae9 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_actor.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs @@ -3,7 +3,7 @@ use crate::{ errors::{internal_error, DocResult}, services::doc::{ edit::{ - message::{EditMsg, TransformDeltas}, + message::{DocumentMsg, TransformDeltas}, DocId, }, Document, @@ -17,19 +17,19 @@ use futures::stream::StreamExt; use std::{convert::TryFrom, sync::Arc}; use tokio::sync::{mpsc, RwLock}; -pub struct DocumentEditActor { +pub struct DocumentActor { doc_id: DocId, document: Arc>, pool: Arc, - receiver: Option>, + receiver: Option>, } -impl DocumentEditActor { +impl DocumentActor { pub fn new( doc_id: &str, delta: Delta, pool: Arc, - receiver: mpsc::UnboundedReceiver, + receiver: mpsc::UnboundedReceiver, ) -> Self { let doc_id = doc_id.to_string(); let document = Arc::new(RwLock::new(Document::from_delta(delta))); @@ -61,13 +61,13 @@ impl DocumentEditActor { .await; } - async fn handle_message(&self, msg: EditMsg) -> DocResult<()> { + async fn handle_message(&self, msg: DocumentMsg) -> DocResult<()> { match msg { - EditMsg::Delta { delta, ret } => { + DocumentMsg::Delta { delta, ret } => { let result = self.compose_delta(delta).await; let _ = ret.send(result); }, - EditMsg::RemoteRevision { bytes, ret } => { + DocumentMsg::RemoteRevision { bytes, ret } => { let revision = Revision::try_from(bytes)?; let delta = Delta::from_bytes(&revision.delta_data)?; let rev_id: RevId = revision.rev_id.into(); @@ -79,15 +79,15 @@ impl DocumentEditActor { }; let _ = ret.send(Ok(transform_delta)); }, - EditMsg::Insert { index, data, ret } => { + DocumentMsg::Insert { index, data, ret } => { let delta = self.document.write().await.insert(index, data); let _ = ret.send(delta); }, - EditMsg::Delete { interval, ret } => { + DocumentMsg::Delete { interval, ret } => { let result = self.document.write().await.delete(interval); let _ = ret.send(result); }, - EditMsg::Format { + DocumentMsg::Format { interval, attribute, ret, @@ -95,29 +95,29 @@ impl DocumentEditActor { let result = self.document.write().await.format(interval, attribute); let _ = ret.send(result); }, - EditMsg::Replace { interval, data, ret } => { + DocumentMsg::Replace { interval, data, ret } => { let result = self.document.write().await.replace(interval, data); let _ = ret.send(result); }, - EditMsg::CanUndo { ret } => { + DocumentMsg::CanUndo { ret } => { let _ = ret.send(self.document.read().await.can_undo()); }, - EditMsg::CanRedo { ret } => { + DocumentMsg::CanRedo { ret } => { let _ = ret.send(self.document.read().await.can_redo()); }, - EditMsg::Undo { ret } => { + DocumentMsg::Undo { ret } => { let result = self.document.write().await.undo(); let _ = ret.send(result); }, - EditMsg::Redo { ret } => { + DocumentMsg::Redo { ret } => { let result = self.document.write().await.redo(); let _ = ret.send(result); }, - EditMsg::Doc { ret } => { + DocumentMsg::Doc { ret } => { let data = self.document.read().await.to_json(); let _ = ret.send(Ok(data)); }, - EditMsg::SaveDocument { rev_id, ret } => { + DocumentMsg::SaveDocument { rev_id, ret } => { let result = self.save_to_disk(rev_id).await; let _ = ret.send(result); }, diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs index e3bbb3c3e3..0aaa8019d7 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs @@ -8,8 +8,8 @@ use crate::{ services::{ doc::{ edit::{ - edit_actor::DocumentEditActor, - message::{EditMsg, TransformDeltas}, + doc_actor::DocumentActor, + message::{DocumentMsg, TransformDeltas}, model::NotifyOpenDocAction, }, revision::{DocRevision, RevisionCmd, RevisionManager, RevisionServer, RevisionStoreActor}, @@ -31,7 +31,7 @@ pub type DocId = String; pub struct ClientEditDoc { pub doc_id: DocId, rev_manager: Arc, - document: UnboundedSender, + document: UnboundedSender, ws: Arc, pool: Arc, user: Arc, @@ -64,7 +64,7 @@ impl ClientEditDoc { pub async fn insert(&self, index: usize, data: T) -> Result<(), DocError> { let (ret, rx) = oneshot::channel::>(); - let msg = EditMsg::Insert { + let msg = DocumentMsg::Insert { index, data: data.to_string(), ret, @@ -77,7 +77,7 @@ impl ClientEditDoc { pub async fn delete(&self, interval: Interval) -> Result<(), DocError> { let (ret, rx) = oneshot::channel::>(); - let msg = EditMsg::Delete { interval, ret }; + let msg = DocumentMsg::Delete { interval, ret }; let _ = self.document.send(msg); let delta_data = rx.await.map_err(internal_error)??.to_bytes(); let _ = self.mk_revision(&delta_data).await?; @@ -86,7 +86,7 @@ impl ClientEditDoc { pub async fn format(&self, interval: Interval, attribute: Attribute) -> Result<(), DocError> { let (ret, rx) = oneshot::channel::>(); - let msg = EditMsg::Format { + let msg = DocumentMsg::Format { interval, attribute, ret, @@ -99,7 +99,7 @@ impl ClientEditDoc { pub async fn replace(&mut self, interval: Interval, data: T) -> Result<(), DocError> { let (ret, rx) = oneshot::channel::>(); - let msg = EditMsg::Replace { + let msg = DocumentMsg::Replace { interval, data: data.to_string(), ret, @@ -112,35 +112,35 @@ impl ClientEditDoc { pub async fn can_undo(&self) -> bool { let (ret, rx) = oneshot::channel::(); - let msg = EditMsg::CanUndo { ret }; + let msg = DocumentMsg::CanUndo { ret }; let _ = self.document.send(msg); rx.await.unwrap_or(false) } pub async fn can_redo(&self) -> bool { let (ret, rx) = oneshot::channel::(); - let msg = EditMsg::CanRedo { ret }; + let msg = DocumentMsg::CanRedo { ret }; let _ = self.document.send(msg); rx.await.unwrap_or(false) } pub async fn undo(&self) -> Result { let (ret, rx) = oneshot::channel::>(); - let msg = EditMsg::Undo { ret }; + let msg = DocumentMsg::Undo { ret }; let _ = self.document.send(msg); rx.await.map_err(internal_error)? } pub async fn redo(&self) -> Result { let (ret, rx) = oneshot::channel::>(); - let msg = EditMsg::Redo { ret }; + let msg = DocumentMsg::Redo { ret }; let _ = self.document.send(msg); rx.await.map_err(internal_error)? } pub async fn doc(&self) -> DocResult { let (ret, rx) = oneshot::channel::>(); - let msg = EditMsg::Doc { ret }; + let msg = DocumentMsg::Doc { ret }; let _ = self.document.send(msg); let data = rx.await.map_err(internal_error)??; let rev_id = self.rev_manager.rev_id(); @@ -166,7 +166,7 @@ impl ClientEditDoc { pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), DocError> { let delta = Delta::from_bytes(&data)?; let (ret, rx) = oneshot::channel::>(); - let msg = EditMsg::Delta { delta, ret }; + let msg = DocumentMsg::Delta { delta, ret }; let _ = self.document.send(msg); let _ = rx.await.map_err(internal_error)??; @@ -177,7 +177,7 @@ impl ClientEditDoc { #[cfg(feature = "flowy_test")] pub async fn doc_json(&self) -> DocResult { let (ret, rx) = oneshot::channel::>(); - let msg = EditMsg::Doc { ret }; + let msg = DocumentMsg::Doc { ret }; let _ = self.document.send(msg); rx.await.map_err(internal_error)? } @@ -203,7 +203,7 @@ impl ClientEditDoc { async fn handle_push_rev(&self, bytes: Bytes) -> DocResult<()> { // Transform the revision let (ret, rx) = oneshot::channel::>(); - let _ = self.document.send(EditMsg::RemoteRevision { bytes, ret }); + let _ = self.document.send(DocumentMsg::RemoteRevision { bytes, ret }); let TransformDeltas { client_prime, server_prime, @@ -217,7 +217,7 @@ impl ClientEditDoc { // compose delta let (ret, rx) = oneshot::channel::>(); - let msg = EditMsg::Delta { + let msg = DocumentMsg::Delta { delta: client_prime.clone(), ret, }; @@ -246,7 +246,7 @@ impl ClientEditDoc { &self.doc_id, RevType::Remote, ); - self.ws.send(revision.into()); + let _ = self.ws.send(revision.into()); save_document(self.document.clone(), local_rev_id.into()).await; Ok(()) @@ -261,7 +261,7 @@ impl ClientEditDoc { WsDataType::PullRev => { let range = RevisionRange::try_from(bytes)?; let revision = self.rev_manager.construct_revisions(range).await?; - self.ws.send(revision.into()); + let _ = self.ws.send(revision.into()); }, WsDataType::NewDocUser => {}, WsDataType::Acked => { @@ -298,9 +298,9 @@ impl WsDocumentHandler for EditDocWsHandler { } } -async fn save_document(document: UnboundedSender, rev_id: RevId) -> DocResult<()> { +async fn save_document(document: UnboundedSender, rev_id: RevId) -> DocResult<()> { let (ret, rx) = oneshot::channel::>(); - let _ = document.send(EditMsg::SaveDocument { rev_id, ret }); + let _ = document.send(DocumentMsg::SaveDocument { rev_id, ret }); let result = rx.await.map_err(internal_error)?; result } @@ -316,9 +316,9 @@ fn spawn_rev_store_actor( sender } -fn spawn_doc_edit_actor(doc_id: &str, delta: Delta, pool: Arc) -> UnboundedSender { - let (sender, receiver) = mpsc::unbounded_channel::(); - let actor = DocumentEditActor::new(&doc_id, delta, pool.clone(), receiver); +fn spawn_doc_edit_actor(doc_id: &str, delta: Delta, pool: Arc) -> UnboundedSender { + let (sender, receiver) = mpsc::unbounded_channel::(); + let actor = DocumentActor::new(&doc_id, delta, pool.clone(), receiver); tokio::spawn(actor.run()); sender } diff --git a/rust-lib/flowy-document/src/services/doc/edit/message.rs b/rust-lib/flowy-document/src/services/doc/edit/message.rs index 80b2ff8aa2..22a7f58bcf 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/message.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/message.rs @@ -6,7 +6,7 @@ use bytes::Bytes; use tokio::sync::oneshot; pub type Ret = oneshot::Sender>; -pub enum EditMsg { +pub enum DocumentMsg { Delta { delta: Delta, ret: Ret<()>, diff --git a/rust-lib/flowy-document/src/services/doc/edit/mod.rs b/rust-lib/flowy-document/src/services/doc/edit/mod.rs index f5ba0015bb..fcfc991daa 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/mod.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/mod.rs @@ -1,4 +1,4 @@ -mod edit_actor; +mod doc_actor; mod edit_doc; mod message; mod model; diff --git a/rust-lib/flowy-observable/src/dart/stream_sender.rs b/rust-lib/flowy-observable/src/dart/stream_sender.rs index a6f289d1fa..4ffad8d7ed 100644 --- a/rust-lib/flowy-observable/src/dart/stream_sender.rs +++ b/rust-lib/flowy-observable/src/dart/stream_sender.rs @@ -4,15 +4,15 @@ use lazy_static::lazy_static; use std::{convert::TryInto, sync::RwLock}; lazy_static! { - static ref R2F_STREAM_SENDER: RwLock = RwLock::new(RustStreamSender::new()); + static ref DART_STREAM_SENDER: RwLock = RwLock::new(DartStreamSender::new()); } -pub struct RustStreamSender { +pub struct DartStreamSender { #[allow(dead_code)] isolate: Option, } -impl RustStreamSender { +impl DartStreamSender { fn new() -> Self { Self { isolate: None } } fn inner_set_port(&mut self, port: i64) { @@ -33,7 +33,7 @@ impl RustStreamSender { } pub fn set_port(port: i64) { - match R2F_STREAM_SENDER.write() { + match DART_STREAM_SENDER.write() { Ok(mut stream) => stream.inner_set_port(port), Err(e) => { let msg = format!("Get rust to flutter stream lock fail. {:?}", e); @@ -44,7 +44,7 @@ impl RustStreamSender { pub fn post(_observable_subject: ObservableSubject) -> Result<(), String> { #[cfg(feature = "dart")] - match R2F_STREAM_SENDER.read() { + match DART_STREAM_SENDER.read() { Ok(stream) => stream.inner_post(_observable_subject), Err(e) => Err(format!("Get rust to flutter stream lock fail. {:?}", e)), } diff --git a/rust-lib/flowy-observable/src/lib.rs b/rust-lib/flowy-observable/src/lib.rs index 4497f737a4..cd471519d4 100644 --- a/rust-lib/flowy-observable/src/lib.rs +++ b/rust-lib/flowy-observable/src/lib.rs @@ -4,10 +4,10 @@ pub mod dart; pub mod entities; mod protobuf; -use crate::{dart::RustStreamSender, entities::ObservableSubject}; +use crate::{dart::DartStreamSender, entities::ObservableSubject}; use flowy_dispatch::prelude::ToBytes; -pub struct NotifyBuilder { +pub struct DartNotifyBuilder { id: String, payload: Option, error: Option, @@ -15,7 +15,7 @@ pub struct NotifyBuilder { ty: i32, } -impl NotifyBuilder { +impl DartNotifyBuilder { pub fn new>(id: &str, ty: T, source: &str) -> Self { Self { id: id.to_owned(), @@ -73,7 +73,7 @@ impl NotifyBuilder { }; log::debug!("Notify {}", subject); - match RustStreamSender::post(subject) { + match DartStreamSender::post(subject) { Ok(_) => {}, Err(error) => log::error!("Send observable subject failed: {}", error), } diff --git a/rust-lib/flowy-user/Flowy.toml b/rust-lib/flowy-user/Flowy.toml index f66326779a..152585acd5 100644 --- a/rust-lib/flowy-user/Flowy.toml +++ b/rust-lib/flowy-user/Flowy.toml @@ -1,3 +1,3 @@ -proto_crates = ["src/entities", "src/event.rs", "src/errors.rs", "src/observable"] +proto_crates = ["src/entities", "src/event.rs", "src/errors.rs", "src/notify"] event_files = ["src/event.rs"] \ No newline at end of file diff --git a/rust-lib/flowy-user/src/lib.rs b/rust-lib/flowy-user/src/lib.rs index 760b68d502..a6fc15f7f4 100644 --- a/rust-lib/flowy-user/src/lib.rs +++ b/rust-lib/flowy-user/src/lib.rs @@ -5,7 +5,7 @@ pub mod entities; pub mod errors; pub mod event; pub mod module; -mod observable; +mod notify; pub mod protobuf; pub mod services; diff --git a/rust-lib/flowy-user/src/observable/mod.rs b/rust-lib/flowy-user/src/notify/mod.rs similarity index 100% rename from rust-lib/flowy-user/src/observable/mod.rs rename to rust-lib/flowy-user/src/notify/mod.rs diff --git a/rust-lib/flowy-user/src/observable/observable.rs b/rust-lib/flowy-user/src/notify/observable.rs similarity index 72% rename from rust-lib/flowy-user/src/observable/observable.rs rename to rust-lib/flowy-user/src/notify/observable.rs index b7de996237..65c9a42f6e 100644 --- a/rust-lib/flowy-user/src/observable/observable.rs +++ b/rust-lib/flowy-user/src/notify/observable.rs @@ -1,6 +1,6 @@ use flowy_derive::ProtoBuf_Enum; -use flowy_observable::NotifyBuilder; +use flowy_observable::DartNotifyBuilder; const OBSERVABLE_CATEGORY: &'static str = "User"; @@ -20,4 +20,6 @@ impl std::convert::Into for UserObservable { fn into(self) -> i32 { self as i32 } } -pub(crate) fn notify(id: &str, ty: UserObservable) -> NotifyBuilder { NotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY) } +pub(crate) fn dart_notify(id: &str, ty: UserObservable) -> DartNotifyBuilder { + DartNotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY) +} diff --git a/rust-lib/flowy-user/src/services/server/server_api.rs b/rust-lib/flowy-user/src/services/server/server_api.rs index f9a6d0bc42..c923a04186 100644 --- a/rust-lib/flowy-user/src/services/server/server_api.rs +++ b/rust-lib/flowy-user/src/services/server/server_api.rs @@ -52,7 +52,7 @@ impl UserServerAPI for UserServer { fn ws_addr(&self) -> String { self.config.ws_addr() } } -use crate::{errors::ErrorCode, observable::*}; +use crate::{errors::ErrorCode, notify::*}; use flowy_net::response::FlowyResponse; use lazy_static::lazy_static; use std::sync::Arc; @@ -70,7 +70,7 @@ impl ResponseMiddleware for Middleware { None => {}, Some(token) => { let error = UserError::new(ErrorCode::UserUnauthorized, ""); - notify(token, UserObservable::UserUnauthorized).error(error).send() + dart_notify(token, UserObservable::UserUnauthorized).error(error).send() }, } } diff --git a/rust-lib/flowy-user/src/services/user/user_session.rs b/rust-lib/flowy-user/src/services/user/user_session.rs index 79677d3b28..623fd71a7a 100644 --- a/rust-lib/flowy-user/src/services/user/user_session.rs +++ b/rust-lib/flowy-user/src/services/user/user_session.rs @@ -6,7 +6,7 @@ use crate::{ }; use crate::{ - observable::*, + notify::*, services::server::{construct_user_server, Server}, }; use flowy_database::{ @@ -194,12 +194,12 @@ impl UserSession { tokio::spawn(async move { match server.get_user(&token).await { Ok(profile) => { - notify(&token, UserObservable::UserProfileUpdated) + dart_notify(&token, UserObservable::UserProfileUpdated) .payload(profile) .send(); }, Err(e) => { - notify(&token, UserObservable::UserProfileUpdated).error(e).send(); + dart_notify(&token, UserObservable::UserProfileUpdated).error(e).send(); }, } }); diff --git a/rust-lib/flowy-workspace/Flowy.toml b/rust-lib/flowy-workspace/Flowy.toml index f66326779a..152585acd5 100644 --- a/rust-lib/flowy-workspace/Flowy.toml +++ b/rust-lib/flowy-workspace/Flowy.toml @@ -1,3 +1,3 @@ -proto_crates = ["src/entities", "src/event.rs", "src/errors.rs", "src/observable"] +proto_crates = ["src/entities", "src/event.rs", "src/errors.rs", "src/notify"] event_files = ["src/event.rs"] \ No newline at end of file diff --git a/rust-lib/flowy-workspace/src/lib.rs b/rust-lib/flowy-workspace/src/lib.rs index 661d499250..4277089aa5 100644 --- a/rust-lib/flowy-workspace/src/lib.rs +++ b/rust-lib/flowy-workspace/src/lib.rs @@ -1,5 +1,5 @@ mod handlers; -mod observable; +mod notify; mod services; mod sql_tables; diff --git a/rust-lib/flowy-workspace/src/observable/mod.rs b/rust-lib/flowy-workspace/src/notify/mod.rs similarity index 100% rename from rust-lib/flowy-workspace/src/observable/mod.rs rename to rust-lib/flowy-workspace/src/notify/mod.rs diff --git a/rust-lib/flowy-workspace/src/observable/observable.rs b/rust-lib/flowy-workspace/src/notify/observable.rs similarity index 80% rename from rust-lib/flowy-workspace/src/observable/observable.rs rename to rust-lib/flowy-workspace/src/notify/observable.rs index 7cfb25189e..8d77c416cd 100644 --- a/rust-lib/flowy-workspace/src/observable/observable.rs +++ b/rust-lib/flowy-workspace/src/notify/observable.rs @@ -1,5 +1,5 @@ use flowy_derive::ProtoBuf_Enum; -use flowy_observable::NotifyBuilder; +use flowy_observable::DartNotifyBuilder; const OBSERVABLE_CATEGORY: &'static str = "Workspace"; #[derive(ProtoBuf_Enum, Debug)] @@ -26,6 +26,6 @@ impl std::convert::Into for WorkspaceObservable { fn into(self) -> i32 { self as i32 } } -pub(crate) fn notify(id: &str, ty: WorkspaceObservable) -> NotifyBuilder { - NotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY) +pub(crate) fn dart_notify(id: &str, ty: WorkspaceObservable) -> DartNotifyBuilder { + DartNotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY) } diff --git a/rust-lib/flowy-workspace/src/services/app_controller.rs b/rust-lib/flowy-workspace/src/services/app_controller.rs index bfec2dc134..454f815dbc 100644 --- a/rust-lib/flowy-workspace/src/services/app_controller.rs +++ b/rust-lib/flowy-workspace/src/services/app_controller.rs @@ -2,7 +2,7 @@ use crate::{ entities::app::{App, CreateAppParams, *}, errors::*, module::{WorkspaceDatabase, WorkspaceUser}, - observable::*, + notify::*, services::{helper::spawn, server::Server}, sql_tables::app::{AppTable, AppTableChangeset, AppTableSql}, }; @@ -36,7 +36,7 @@ impl AppController { conn.immediate_transaction::<_, WorkspaceError, _>(|| { let _ = self.save_app(app.clone(), &*conn)?; let apps = self.read_local_apps(&app.workspace_id, &*conn)?; - notify(&app.workspace_id, WorkspaceObservable::WorkspaceCreateApp) + dart_notify(&app.workspace_id, WorkspaceObservable::WorkspaceCreateApp) .payload(apps) .send(); Ok(()) @@ -64,7 +64,7 @@ impl AppController { conn.immediate_transaction::<_, WorkspaceError, _>(|| { let app = self.sql.delete_app(app_id, &*conn)?; let apps = self.read_local_apps(&app.workspace_id, &*conn)?; - notify(&app.workspace_id, WorkspaceObservable::WorkspaceDeleteApp) + dart_notify(&app.workspace_id, WorkspaceObservable::WorkspaceDeleteApp) .payload(apps) .send(); Ok(()) @@ -87,7 +87,9 @@ impl AppController { conn.immediate_transaction::<_, WorkspaceError, _>(|| { let _ = self.sql.update_app(changeset, conn)?; let app: App = self.sql.read_app(&app_id, None, conn)?.into(); - notify(&app_id, WorkspaceObservable::AppUpdated).payload(app).send(); + dart_notify(&app_id, WorkspaceObservable::AppUpdated) + .payload(app) + .send(); Ok(()) })?; diff --git a/rust-lib/flowy-workspace/src/services/server/middleware.rs b/rust-lib/flowy-workspace/src/services/server/middleware.rs index 07d9eb6256..0fd3275635 100644 --- a/rust-lib/flowy-workspace/src/services/server/middleware.rs +++ b/rust-lib/flowy-workspace/src/services/server/middleware.rs @@ -6,7 +6,7 @@ lazy_static! { use crate::{ errors::{ErrorCode, WorkspaceError}, - observable::*, + notify::*, }; use flowy_net::{request::ResponseMiddleware, response::FlowyResponse}; @@ -21,7 +21,9 @@ impl ResponseMiddleware for WorkspaceMiddleware { None => {}, Some(token) => { let error = WorkspaceError::new(ErrorCode::UserUnauthorized, ""); - notify(token, WorkspaceObservable::UserUnauthorized).error(error).send() + dart_notify(token, WorkspaceObservable::UserUnauthorized) + .error(error) + .send() }, } } diff --git a/rust-lib/flowy-workspace/src/services/view_controller.rs b/rust-lib/flowy-workspace/src/services/view_controller.rs index e29e7e273b..f15ba79acb 100644 --- a/rust-lib/flowy-workspace/src/services/view_controller.rs +++ b/rust-lib/flowy-workspace/src/services/view_controller.rs @@ -2,7 +2,7 @@ use crate::{ entities::view::{CreateViewParams, UpdateViewParams, View}, errors::WorkspaceError, module::WorkspaceDatabase, - observable::notify, + notify::dart_notify, services::{helper::spawn, server::Server}, sql_tables::view::{ViewTable, ViewTableChangeset, ViewTableSql}, }; @@ -11,7 +11,7 @@ use crate::{ entities::view::{DeleteViewParams, QueryViewParams, RepeatedView}, errors::internal_error, module::WorkspaceUser, - observable::WorkspaceObservable, + notify::WorkspaceObservable, }; use flowy_database::SqliteConnection; use flowy_document::{ @@ -55,7 +55,7 @@ impl ViewController { .create(CreateDocParams::new(&view.id, params.data), conn)?; let repeated_view = self.read_local_views_belong_to(&view.belong_to_id, conn)?; - notify(&view.belong_to_id, WorkspaceObservable::AppCreateView) + dart_notify(&view.belong_to_id, WorkspaceObservable::AppCreateView) .payload(repeated_view) .send(); Ok(()) @@ -93,7 +93,7 @@ impl ViewController { let _ = self.document.delete(params.into(), conn)?; let repeated_view = self.read_local_views_belong_to(&view_table.belong_to_id, conn)?; - notify(&view_table.belong_to_id, WorkspaceObservable::AppDeleteView) + dart_notify(&view_table.belong_to_id, WorkspaceObservable::AppDeleteView) .payload(repeated_view) .send(); Ok(()) @@ -119,7 +119,9 @@ impl ViewController { conn.immediate_transaction::<_, WorkspaceError, _>(|| { let _ = self.sql.update_view(changeset, conn)?; let view: View = self.sql.read_view(&view_id, None, conn)?.into(); - notify(&view_id, WorkspaceObservable::ViewUpdated).payload(view).send(); + dart_notify(&view_id, WorkspaceObservable::ViewUpdated) + .payload(view) + .send(); Ok(()) })?; diff --git a/rust-lib/flowy-workspace/src/services/workspace_controller.rs b/rust-lib/flowy-workspace/src/services/workspace_controller.rs index b16bd4b40f..3019d258e8 100644 --- a/rust-lib/flowy-workspace/src/services/workspace_controller.rs +++ b/rust-lib/flowy-workspace/src/services/workspace_controller.rs @@ -5,7 +5,7 @@ use crate::{ }, errors::*, module::{WorkspaceDatabase, WorkspaceUser}, - observable::*, + notify::*, services::{helper::spawn, server::Server, AppController, ViewController}, sql_tables::workspace::{WorkspaceTable, WorkspaceTableChangeset, WorkspaceTableSql}, }; @@ -61,7 +61,7 @@ impl WorkspaceController { conn.immediate_transaction::<_, WorkspaceError, _>(|| { self.workspace_sql.create_workspace(workspace_table, conn)?; let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?; - notify(&token, WorkspaceObservable::UserCreateWorkspace) + dart_notify(&token, WorkspaceObservable::UserCreateWorkspace) .payload(repeated_workspace) .send(); @@ -80,7 +80,7 @@ impl WorkspaceController { let _ = self.workspace_sql.update_workspace(changeset, conn)?; let user_id = self.user.user_id()?; let workspace = self.read_local_workspace(workspace_id.clone(), &user_id, conn)?; - notify(&workspace_id, WorkspaceObservable::WorkspaceUpdated) + dart_notify(&workspace_id, WorkspaceObservable::WorkspaceUpdated) .payload(workspace) .send(); @@ -100,7 +100,7 @@ impl WorkspaceController { conn.immediate_transaction::<_, WorkspaceError, _>(|| { let _ = self.workspace_sql.delete_workspace(workspace_id, conn)?; let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?; - notify(&token, WorkspaceObservable::UserDeleteWorkspace) + dart_notify(&token, WorkspaceObservable::UserDeleteWorkspace) .payload(repeated_workspace) .send(); @@ -289,7 +289,7 @@ impl WorkspaceController { Ok(()) })?; - notify(&token, WorkspaceObservable::WorkspaceListUpdated) + dart_notify(&token, WorkspaceObservable::WorkspaceListUpdated) .payload(workspaces) .send(); Result::<(), WorkspaceError>::Ok(())