diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 28b92efedb..ee2fd47d17 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -55,6 +55,8 @@ lazy_static = "1.4" tokio = { version = "1", features = ["full"] } parking_lot = "0.11" md5 = "0.7.0" +futures-core = { version = "0.3", default-features = false } +pin-project = "1.0.0" flowy-user = { path = "../rust-lib/flowy-user" } flowy-workspace = { path = "../rust-lib/flowy-workspace" } diff --git a/backend/src/application.rs b/backend/src/application.rs index fa05a3066f..ddd763cedf 100644 --- a/backend/src/application.rs +++ b/backend/src/application.rs @@ -133,7 +133,7 @@ fn user_scope() -> Scope { async fn init_app_context(configuration: &Settings) -> AppContext { let _ = crate::service::log::Builder::new("flowy") - .env_filter("Debug") + .env_filter("Trace") .build(); let pg_pool = get_connection_pool(&configuration.database) .await diff --git a/backend/src/service/doc/doc.rs b/backend/src/service/doc/doc.rs index 4ba0077f1e..c75d1283d8 100644 --- a/backend/src/service/doc/doc.rs +++ b/backend/src/service/doc/doc.rs @@ -5,7 +5,7 @@ use crate::{ }; use anyhow::Context; use flowy_document::protobuf::{CreateDocParams, Doc, QueryDocParams, UpdateDocParams}; -use flowy_net::{errors::ServerError, response::FlowyResponse}; +use flowy_net::errors::ServerError; use sqlx::{postgres::PgArguments, PgPool, Postgres}; use uuid::Uuid; @@ -50,10 +50,11 @@ pub(crate) async fn read_doc(pool: &PgPool, params: QueryDocParams) -> Result Result { +) -> Result<(), ServerError> { let doc_id = Uuid::parse_str(¶ms.doc_id)?; let mut transaction = pool .begin() @@ -77,7 +78,7 @@ pub(crate) async fn update_doc( .await .context("Failed to commit SQL transaction to update doc.")?; - Ok(FlowyResponse::success()) + Ok(()) } pub(crate) async fn delete_doc( diff --git a/backend/src/service/doc/edit_doc.rs b/backend/src/service/doc/edit_doc.rs new file mode 100644 index 0000000000..897e90a080 --- /dev/null +++ b/backend/src/service/doc/edit_doc.rs @@ -0,0 +1,59 @@ +use crate::service::doc::update_doc; +use actix_web::web::Data; +use flowy_document::{ + protobuf::{Doc, Revision, UpdateDocParams}, + services::doc::Document, +}; +use flowy_net::errors::{internal_error, ServerError}; +use flowy_ot::core::Delta; +use parking_lot::RwLock; +use sqlx::PgPool; +use std::{sync::Arc, time::Duration}; + +pub(crate) struct EditDoc { + doc_id: String, + document: Arc>, + pg_pool: Data, +} + +impl EditDoc { + pub(crate) fn new(doc: Doc, pg_pool: Data) -> Result { + let delta = Delta::from_bytes(doc.data).map_err(internal_error)?; + let document = Arc::new(RwLock::new(Document::from_delta(delta))); + Ok(Self { + doc_id: doc.id.clone(), + document, + pg_pool, + }) + } + + #[tracing::instrument(level = "debug", skip(self, revision))] + pub(crate) async fn apply_revision(&self, revision: Revision) -> Result<(), ServerError> { + let delta = Delta::from_bytes(revision.delta).map_err(internal_error)?; + match self.document.try_write_for(Duration::from_millis(300)) { + None => { + log::error!("Failed to acquire write lock of document"); + }, + Some(mut w) => { + let _ = w.apply_delta(delta).map_err(internal_error)?; + }, + } + + let md5 = format!("{:x}", md5::compute(self.document.read().to_json())); + if md5 != revision.md5 { + log::warn!("Document md5 not match") + } + + let mut params = UpdateDocParams::new(); + params.set_doc_id(self.doc_id.clone()); + params.set_data(self.document.read().to_bytes()); + match update_doc(self.pg_pool.get_ref(), params).await { + Ok(_) => {}, + Err(e) => { + log::error!("Save doc data failed: {:?}", e); + }, + } + + Ok(()) + } +} diff --git a/backend/src/service/doc/mod.rs b/backend/src/service/doc/mod.rs index 94e30cf5a5..24b044b195 100644 --- a/backend/src/service/doc/mod.rs +++ b/backend/src/service/doc/mod.rs @@ -1,4 +1,5 @@ mod doc; +mod edit_doc; pub mod router; mod sql_builder; pub mod ws_handler; diff --git a/backend/src/service/doc/router.rs b/backend/src/service/doc/router.rs index a22ecab835..08fd075c50 100644 --- a/backend/src/service/doc/router.rs +++ b/backend/src/service/doc/router.rs @@ -28,6 +28,6 @@ pub async fn update_handler( pool: Data, ) -> Result { let params: UpdateDocParams = parse_from_payload(payload).await?; - let response = update_doc(pool.get_ref(), params).await?; - Ok(response.into()) + let _ = update_doc(pool.get_ref(), params).await?; + Ok(FlowyResponse::success().into()) } diff --git a/backend/src/service/doc/ws_handler.rs b/backend/src/service/doc/ws_handler.rs index 9ba9d82e02..3c91456174 100644 --- a/backend/src/service/doc/ws_handler.rs +++ b/backend/src/service/doc/ws_handler.rs @@ -1,90 +1,34 @@ +use super::edit_doc::EditDoc; use crate::service::{doc::read_doc, util::parse_from_bytes, ws::WsBizHandler}; use actix_web::web::Data; use bytes::Bytes; -use dashmap::{mapref::one::Ref, DashMap}; use flowy_document::{ - protobuf::{Doc, QueryDocParams, Revision, WsDataType, WsDocumentData}, + protobuf::{QueryDocParams, Revision, WsDataType, WsDocumentData}, services::doc::Document, }; -use flowy_net::errors::{internal_error, ServerError}; -use flowy_ot::core::Delta; -use parking_lot::{RawRwLock, RwLock}; +use flowy_net::errors::ServerError; +use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; use protobuf::Message; use sqlx::PgPool; -use std::sync::Arc; - -#[rustfmt::skip] -// -// Frontend │ Backend -// -// ┌──────────┐ ┌──────────┐ │ ┌─────────┐ ┌───────────────┐ -// │ user 1 │───────▶│WsManager │───────────▶│ws_client│───────────▶│DocWsBizHandler│ -// └──────────┘ └──────────┘ │ └─────────┘ └───────────────┘ -// -// WsDocumentData────▶WsMessage ────▶ Message ─────▶WsMessage ─────▶WsDocumentData +use std::{collections::HashMap, sync::Arc}; pub struct DocWsBizHandler { inner: Arc, } -struct Inner { - pg_pool: Data, - edited_docs: DashMap>>, -} - impl DocWsBizHandler { pub fn new(pg_pool: Data) -> Self { Self { - inner: Arc::new(Inner { - edited_docs: DashMap::new(), - pg_pool, - }), + inner: Arc::new(Inner::new(pg_pool)), } } } -async fn handle_document_data(inner: Arc, data: Bytes) -> Result<(), ServerError> { - let document_data: WsDocumentData = parse_from_bytes(&data)?; - match document_data.ty { - WsDataType::Command => {}, - WsDataType::Delta => { - let revision: Revision = parse_from_bytes(&document_data.data).unwrap(); - let edited_doc = get_edit_doc(inner, &revision.doc_id).await?; - let _ = edited_doc.write().apply_revision(revision)?; - }, - } - - Ok(()) -} - -async fn get_edit_doc( - inner: Arc, - doc_id: &str, -) -> Result>, ServerError> { - let pg_pool = inner.pg_pool.clone(); - - if let Some(doc) = inner.edited_docs.get(doc_id) { - return Ok(doc.clone()); - } - - let params = QueryDocParams { - doc_id: doc_id.to_string(), - ..Default::default() - }; - - let doc = read_doc(pg_pool.get_ref(), params).await?; - let edited_doc = Arc::new(RwLock::new(EditedDoc::new(doc)?)); - inner - .edited_docs - .insert(doc_id.to_string(), edited_doc.clone()); - Ok(edited_doc) -} - impl WsBizHandler for DocWsBizHandler { fn receive_data(&self, data: Bytes) { let inner = self.inner.clone(); - actix_rt::spawn(async { - let result = handle_document_data(inner, data).await; + actix_rt::spawn(async move { + let result = inner.handle(data).await; match result { Ok(_) => {}, Err(e) => log::error!("WsBizHandler handle data error: {:?}", e), @@ -93,34 +37,53 @@ impl WsBizHandler for DocWsBizHandler { } } -struct EditedDoc { - doc_id: String, - document: Document, +struct Inner { + pg_pool: Data, + edit_docs: RwLock>>, } -impl EditedDoc { - fn new(doc: Doc) -> Result { - let delta = Delta::from_bytes(doc.data).map_err(internal_error)?; - let document = Document::from_delta(delta); - Ok(Self { - doc_id: doc.id.clone(), - document, - }) +impl Inner { + fn new(pg_pool: Data) -> Self { + Self { + pg_pool, + edit_docs: RwLock::new(HashMap::new()), + } } - fn apply_revision(&mut self, revision: Revision) -> Result<(), ServerError> { - let delta = Delta::from_bytes(revision.delta).map_err(internal_error)?; - let _ = self - .document - .apply_delta(delta.clone()) - .map_err(internal_error)?; + async fn handle(&self, data: Bytes) -> Result<(), ServerError> { + let document_data: WsDocumentData = parse_from_bytes(&data)?; - let json = self.document.to_json(); - let md5 = format!("{:x}", md5::compute(json)); - if md5 != revision.md5 { - log::error!("Document conflict after apply delta {}", delta) + match document_data.ty { + WsDataType::Command => {}, + WsDataType::Delta => { + let revision: Revision = parse_from_bytes(&document_data.data)?; + let edited_doc = self.get_edit_doc(&revision.doc_id).await?; + tokio::spawn(async move { + edited_doc.apply_revision(revision).await.unwrap(); + }); + }, } Ok(()) } + + async fn get_edit_doc(&self, doc_id: &str) -> Result, ServerError> { + // Opti: using lock free map instead? + let edit_docs = self.edit_docs.upgradable_read(); + if let Some(doc) = edit_docs.get(doc_id) { + return Ok(doc.clone()); + } else { + let mut edit_docs = RwLockUpgradableReadGuard::upgrade(edit_docs); + let pg_pool = self.pg_pool.clone(); + let params = QueryDocParams { + doc_id: doc_id.to_string(), + ..Default::default() + }; + + let doc = read_doc(pg_pool.get_ref(), params).await?; + let edit_doc = Arc::new(EditDoc::new(doc, self.pg_pool.clone())?); + edit_docs.insert(doc_id.to_string(), edit_doc.clone()); + Ok(edit_doc) + } + } } diff --git a/backend/src/service/log/mod.rs b/backend/src/service/log/mod.rs index 7e821832da..48b25ff8e8 100644 --- a/backend/src/service/log/mod.rs +++ b/backend/src/service/log/mod.rs @@ -28,7 +28,7 @@ impl Builder { let env_filter = EnvFilter::new(self.env_filter); let subscriber = tracing_subscriber::fmt() .with_target(true) - .with_max_level(tracing::Level::DEBUG) + .with_max_level(tracing::Level::TRACE) .with_writer(std::io::stderr) .with_thread_ids(false) .compact() diff --git a/rust-lib/flowy-document/src/services/doc/document/document.rs b/rust-lib/flowy-document/src/services/doc/document/document.rs index 12a04e5fbf..d195710710 100644 --- a/rust-lib/flowy-document/src/services/doc/document/document.rs +++ b/rust-lib/flowy-document/src/services/doc/document/document.rs @@ -59,9 +59,9 @@ impl Document { } pub fn apply_delta(&mut self, delta: Delta) -> Result<(), DocError> { - log::debug!("Apply delta: {}", delta); + log::trace!("Apply delta: {}", delta); let _ = self.add_delta(&delta)?; - log::debug!("Document: {}", self.to_json()); + log::trace!("Document: {}", self.to_json()); Ok(()) } diff --git a/rust-lib/flowy-infra/src/lib.rs b/rust-lib/flowy-infra/src/lib.rs index b5b46300ea..8117abcc39 100644 --- a/rust-lib/flowy-infra/src/lib.rs +++ b/rust-lib/flowy-infra/src/lib.rs @@ -9,6 +9,9 @@ pub mod future; pub mod kv; mod protobuf; +#[macro_use] +pub mod macros; + #[allow(dead_code)] pub fn uuid() -> String { uuid::Uuid::new_v4().to_string() } diff --git a/rust-lib/flowy-infra/src/macros.rs b/rust-lib/flowy-infra/src/macros.rs new file mode 100644 index 0000000000..49bf43e54c --- /dev/null +++ b/rust-lib/flowy-infra/src/macros.rs @@ -0,0 +1,8 @@ +#[macro_export] +macro_rules! dispatch_future { + ($fut:expr) => { + ClosureFuture { + fut: Box::pin(async move { $fut.await }), + } + }; +}