diff --git a/frontend/app_flowy/lib/workspace/application/doc/doc_bloc.dart b/frontend/app_flowy/lib/workspace/application/doc/doc_bloc.dart index 72e903976f..d97ba433ce 100644 --- a/frontend/app_flowy/lib/workspace/application/doc/doc_bloc.dart +++ b/frontend/app_flowy/lib/workspace/application/doc/doc_bloc.dart @@ -86,7 +86,7 @@ class DocBloc extends Bloc { final result = await docManager.readDoc(); yield result.fold( (doc) { - document = _decodeJsonToDocument(doc.text); + document = _decodeJsonToDocument(doc.deltaJson); _subscription = document.changes.listen((event) { final delta = event.item2; final documentDelta = document.toDelta(); @@ -113,7 +113,7 @@ class DocBloc extends Bloc { result.fold((rustDoc) { // final json = utf8.decode(doc.data); - final rustDelta = Delta.fromJson(jsonDecode(rustDoc.text)); + final rustDelta = Delta.fromJson(jsonDecode(rustDoc.deltaJson)); if (documentDelta != rustDelta) { Log.error("Receive : $rustDelta"); Log.error("Expected : $documentDelta"); diff --git a/frontend/rust-lib/flowy-core/src/services/view/controller.rs b/frontend/rust-lib/flowy-core/src/services/view/controller.rs index 53fefa9011..d2342b52ac 100644 --- a/frontend/rust-lib/flowy-core/src/services/view/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/view/controller.rs @@ -1,4 +1,9 @@ -use flowy_collaboration::entities::doc::{DocumentDelta, DocumentId}; +use bytes::Bytes; +use flowy_collaboration::entities::{ + doc::{DocumentDelta, DocumentId}, + prelude::Revision, + revision::RepeatedRevision, +}; use flowy_database::SqliteConnection; use futures::{FutureExt, StreamExt}; use std::{collections::HashSet, sync::Arc}; @@ -57,25 +62,35 @@ impl ViewController { } #[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name), err)] - pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result { + pub(crate) async fn create_view_from_params(&self, mut params: CreateViewParams) -> Result { + let delta_data = Bytes::from(params.take_view_data()); + let user_id = self.user.user_id()?; + let repeated_revision: RepeatedRevision = + Revision::initial_revision(&user_id, ¶ms.view_id, delta_data).into(); + let _ = self + .document_ctx + .controller + .save_document(¶ms.view_id, repeated_revision) + .await?; let view = self.create_view_on_server(params).await?; - let view = self.create_view_on_local(view).await?; + let _ = self.create_view_on_local(view.clone()).await?; Ok(view) } - pub(crate) async fn create_view_on_local(&self, view: View) -> Result { + pub(crate) async fn create_view_on_local(&self, view: View) -> Result<(), FlowyError> { let conn = &*self.database.db_connection()?; let trash_can = self.trash_controller.clone(); conn.immediate_transaction::<_, FlowyError, _>(|| { - let _ = self.save_view(view.clone(), conn)?; - let _ = notify_views_changed(&view.belong_to_id, trash_can, &conn)?; + let belong_to_id = view.belong_to_id.clone(); + let _ = self.save_view(view, conn)?; + let _ = notify_views_changed(&belong_to_id, trash_can, &conn)?; Ok(()) })?; - Ok(view) + Ok(()) } pub(crate) fn save_view(&self, view: View, conn: &SqliteConnection) -> Result<(), FlowyError> { @@ -115,8 +130,7 @@ impl ViewController { #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)] pub(crate) async fn open_view(&self, params: DocumentId) -> Result { let doc_id = params.doc_id.clone(); - let db_pool = self.database.db_pool()?; - let editor = self.document_ctx.controller.open(¶ms.doc_id, db_pool).await?; + let editor = self.document_ctx.controller.open(¶ms.doc_id).await?; KV::set_str(LATEST_VIEW_ID, doc_id.clone()); let document_json = editor.document_json().await?; @@ -146,11 +160,7 @@ impl ViewController { #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)] pub(crate) async fn duplicate_view(&self, params: DocumentId) -> Result<(), FlowyError> { let view: View = ViewTableSql::read_view(¶ms.doc_id, &*self.database.db_connection()?)?.into(); - let editor = self - .document_ctx - .controller - .open(¶ms.doc_id, self.database.db_pool()?) - .await?; + let editor = self.document_ctx.controller.open(¶ms.doc_id).await?; let document_json = editor.document_json().await?; let duplicate_params = CreateViewParams { belong_to_id: view.belong_to_id.clone(), @@ -168,11 +178,7 @@ impl ViewController { #[tracing::instrument(level = "debug", skip(self, params), err)] pub(crate) async fn export_doc(&self, params: ExportParams) -> Result { - let editor = self - .document_ctx - .controller - .open(¶ms.doc_id, self.database.db_pool()?) - .await?; + let editor = self.document_ctx.controller.open(¶ms.doc_id).await?; let delta_json = editor.document_json().await?; Ok(ExportData { data: delta_json, @@ -211,12 +217,7 @@ impl ViewController { } pub(crate) async fn receive_document_delta(&self, params: DocumentDelta) -> Result { - let db_pool = self.document_ctx.user.db_pool()?; - let doc = self - .document_ctx - .controller - .apply_document_delta(params, db_pool) - .await?; + let doc = self.document_ctx.controller.apply_document_delta(params).await?; Ok(doc) } @@ -263,7 +264,7 @@ impl ViewController { let token = self.user.token()?; let server = self.server.clone(); let pool = self.database.db_pool()?; - // Opti: retry? + // TODO: Retry with RetryAction? tokio::spawn(async move { match server.read_view(&token, params).await { Ok(Some(view)) => match pool.get() { diff --git a/frontend/rust-lib/flowy-core/tests/workspace/app_test.rs b/frontend/rust-lib/flowy-core/tests/workspace/app_test.rs index 3aa3fa9558..442ca307a1 100644 --- a/frontend/rust-lib/flowy-core/tests/workspace/app_test.rs +++ b/frontend/rust-lib/flowy-core/tests/workspace/app_test.rs @@ -76,21 +76,3 @@ async fn app_create_with_view() { assert_eq!(view_from_db.belongings[0], view_a); assert_eq!(view_from_db.belongings[1], view_b); } - -// #[tokio::test] -// async fn app_set_trash_flag() { -// let test = AppTest::new().await; -// test.delete().await; -// -// let query = QueryAppRequest::new(&test.app.id).trash(); -// let _ = read_app(&test.sdk, query); -// } -// -// #[tokio::test] -// #[should_panic] -// async fn app_set_trash_flag_2() { -// let test = AppTest::new().await; -// test.move_app_to_trash().await; -// let query = QueryAppRequest::new(&test.app.id); -// let _ = read_app(&test.sdk, query); -// } diff --git a/frontend/rust-lib/flowy-document/src/services/controller.rs b/frontend/rust-lib/flowy-document/src/services/controller.rs index a4aaa59b12..d38c2e35af 100644 --- a/frontend/rust-lib/flowy-document/src/services/controller.rs +++ b/frontend/rust-lib/flowy-document/src/services/controller.rs @@ -14,7 +14,10 @@ use crate::{ }; use bytes::Bytes; use dashmap::DashMap; -use flowy_collaboration::entities::doc::{DocumentDelta, DocumentId, DocumentInfo}; +use flowy_collaboration::entities::{ + doc::{DocumentDelta, DocumentId, DocumentInfo}, + revision::RepeatedRevision, +}; use flowy_database::ConnectionPool; use flowy_error::FlowyResult; use lib_infra::future::FutureResult; @@ -52,19 +55,11 @@ impl DocumentController { Ok(()) } - #[tracing::instrument(level = "debug", skip(self, doc_id, pool), fields(doc_id), err)] - pub async fn open>( - &self, - doc_id: T, - pool: Arc, - ) -> Result, FlowyError> { + #[tracing::instrument(level = "debug", skip(self, doc_id), fields(doc_id), err)] + pub async fn open>(&self, doc_id: T) -> Result, FlowyError> { let doc_id = doc_id.as_ref(); tracing::Span::current().record("doc_id", &doc_id); - if !self.open_cache.contains(doc_id) { - let editor = self.make_editor(doc_id, pool.clone()).await?; - return Ok(editor); - } - self.open_cache.get(doc_id) + self.get_editor(doc_id).await } #[tracing::instrument(level = "debug", skip(self, doc_id), fields(doc_id), err)] @@ -85,17 +80,9 @@ impl DocumentController { Ok(()) } - #[tracing::instrument(level = "debug", skip(self, delta, db_pool), fields(doc_id = %delta.doc_id), err)] - pub async fn apply_document_delta( - &self, - delta: DocumentDelta, - db_pool: Arc, - ) -> Result { - if !self.open_cache.contains(&delta.doc_id) { - let _ = self.open(&delta.doc_id, db_pool).await?; - } - - let editor = self.open_cache.get(&delta.doc_id)?; + #[tracing::instrument(level = "debug", skip(self, delta), fields(doc_id = %delta.doc_id), err)] + pub async fn apply_document_delta(&self, delta: DocumentDelta) -> Result { + let editor = self.get_editor(&delta.doc_id).await?; let _ = editor.compose_local_delta(Bytes::from(delta.delta_json)).await?; let document_json = editor.document_json().await?; Ok(DocumentDelta { @@ -104,7 +91,23 @@ impl DocumentController { }) } - pub async fn save_document_delta(&self, delta: DocumentDelta) {} + pub async fn save_document>(&self, doc_id: T, revisions: RepeatedRevision) -> FlowyResult<()> { + let doc_id = doc_id.as_ref().to_owned(); + let db_pool = self.user.db_pool()?; + let rev_manager = self.make_rev_manager(&doc_id, db_pool)?; + let _ = rev_manager.reset_document(revisions).await?; + Ok(()) + } + + async fn get_editor(&self, doc_id: &str) -> FlowyResult> { + match self.open_cache.get(doc_id) { + None => { + let db_pool = self.user.db_pool()?; + self.make_editor(&doc_id, db_pool).await + }, + Some(editor) => Ok(editor), + } + } } impl DocumentController { @@ -173,28 +176,23 @@ impl OpenDocCache { pub(crate) fn contains(&self, doc_id: &str) -> bool { self.inner.get(doc_id).is_some() } - pub(crate) fn get(&self, doc_id: &str) -> Result, FlowyError> { + pub(crate) fn get(&self, doc_id: &str) -> Option> { if !self.contains(&doc_id) { - return Err(doc_not_found()); + return None; } let opened_doc = self.inner.get(doc_id).unwrap(); - Ok(opened_doc.clone()) + Some(opened_doc.clone()) } pub(crate) fn remove(&self, id: &str) { let doc_id = id.to_string(); - match self.get(id) { - Ok(editor) => editor.stop(), - Err(e) => log::error!("{}", e), + if let Some(editor) = self.get(id) { + editor.stop() } self.inner.remove(&doc_id); } } -fn doc_not_found() -> FlowyError { - FlowyError::record_not_found().context("Doc is close or you should call open first") -} - #[tracing::instrument(level = "debug", skip(state_receiver, receivers))] fn listen_ws_state_changed(mut state_receiver: WSStateReceiver, receivers: Arc) { tokio::spawn(async move { diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs index 1523b3b41f..241a6a622c 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs @@ -4,10 +4,10 @@ use crate::{ disk::{Persistence, RevisionDiskCache}, memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate}, }, - sql_tables::{RevChangeset, RevTableState}, + sql_tables::{RevTableState, RevisionChangeset}, }; use dashmap::DashMap; -use flowy_collaboration::entities::revision::{RevState, Revision, RevisionRange}; +use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionState}; use flowy_database::ConnectionPool; use flowy_error::{internal_error, FlowyResult}; use lib_infra::future::FutureResult; @@ -21,11 +21,9 @@ use std::{ }; use tokio::{sync::RwLock, task::spawn_blocking}; -type DocRevisionDiskCache = dyn RevisionDiskCache; - pub struct RevisionCache { doc_id: String, - pub disk_cache: Arc, + disk_cache: Arc>, memory_cache: Arc, sync_seq: Arc, latest_rev_id: AtomicI64, @@ -46,6 +44,29 @@ impl RevisionCache { } } + pub fn read_revisions(&self, doc_id: &str) -> FlowyResult> { + self.disk_cache.read_revisions(doc_id, None) + } + + #[tracing::instrument(level = "debug", skip(self, doc_id, revisions))] + pub fn reset_document(&self, doc_id: &str, revisions: Vec) -> FlowyResult<()> { + let disk_cache = self.disk_cache.clone(); + let conn = disk_cache.db_pool().get().map_err(internal_error)?; + let records = revisions + .into_iter() + .map(|revision| RevisionRecord { + revision, + state: RevisionState::StateLocal, + }) + .collect::>(); + + conn.immediate_transaction::<_, FlowyError, _>(|| { + let _ = disk_cache.delete_revisions(doc_id, None, &*conn)?; + let _ = disk_cache.write_revisions(records, &*conn)?; + Ok(()) + }) + } + #[tracing::instrument(level = "debug", skip(self, revision))] pub async fn add_local_revision(&self, revision: Revision) -> FlowyResult<()> { if self.memory_cache.contains(&revision.rev_id) { @@ -54,7 +75,7 @@ impl RevisionCache { let rev_id = revision.rev_id; let record = RevisionRecord { revision, - state: RevState::StateLocal, + state: RevisionState::StateLocal, }; let _ = self.memory_cache.add_revision(&record).await; self.sync_seq.add_revision(record).await?; @@ -70,7 +91,7 @@ impl RevisionCache { let rev_id = revision.rev_id; let record = RevisionRecord { revision, - state: RevState::Ack, + state: RevisionState::Ack, }; self.memory_cache.add_revision(&record).await; let _ = self.latest_rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id)); @@ -91,11 +112,13 @@ impl RevisionCache { pub async fn get_revision(&self, rev_id: i64) -> Option { match self.memory_cache.get_revision(&rev_id).await { - None => match self.disk_cache.read_revision(&self.doc_id, rev_id) { - Ok(Some(revision)) => Some(revision), - Ok(None) => { - tracing::warn!("Can't find revision in {} with rev_id: {}", &self.doc_id, rev_id); - None + None => match self.disk_cache.read_revisions(&self.doc_id, Some(vec![rev_id])) { + Ok(mut records) => { + if records.is_empty() { + tracing::warn!("Can't find revision in {} with rev_id: {}", &self.doc_id, rev_id); + } + assert_eq!(records.len(), 1); + records.pop() }, Err(e) => { tracing::error!("{}", e); @@ -112,7 +135,7 @@ impl RevisionCache { if records.len() != range_len { let disk_cache = self.disk_cache.clone(); let doc_id = self.doc_id.clone(); - records = spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range)) + records = spawn_blocking(move || disk_cache.read_revisions_with_range(&doc_id, &range)) .await .map_err(internal_error)??; @@ -134,9 +157,13 @@ impl RevisionCache { match sync_seq.next_sync_revision().await { None => match sync_seq.next_sync_rev_id().await { None => Ok(None), - Some(rev_id) => match disk_cache.read_revision(&doc_id, rev_id)? { - None => Ok(None), - Some(record) => Ok(Some(record.revision)), + Some(rev_id) => { + let records = disk_cache.read_revisions(&doc_id, Some(vec![rev_id]))?; + let mut revisions = records + .into_iter() + .map(|record| record.revision) + .collect::>(); + Ok(revisions.pop()) }, }, Some((_, record)) => Ok(Some(record.revision)), @@ -146,10 +173,13 @@ impl RevisionCache { } impl RevisionMemoryCacheDelegate for Arc { - fn receive_checkpoint(&self, records: Vec) -> FlowyResult<()> { self.create_revisions(records) } + fn receive_checkpoint(&self, records: Vec) -> FlowyResult<()> { + let conn = &*self.pool.get().map_err(internal_error)?; + self.write_revisions(records, &conn) + } fn receive_ack(&self, doc_id: &str, rev_id: i64) { - let changeset = RevChangeset { + let changeset = RevisionChangeset { doc_id: doc_id.to_string(), rev_id: rev_id.into(), state: RevTableState::Acked, @@ -164,11 +194,11 @@ impl RevisionMemoryCacheDelegate for Arc { #[derive(Clone)] pub struct RevisionRecord { pub revision: Revision, - pub state: RevState, + pub state: RevisionState, } impl RevisionRecord { - pub fn ack(&mut self) { self.state = RevState::Ack; } + pub fn ack(&mut self) { self.state = RevisionState::Ack; } } struct RevisionSyncSeq { diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs index ec6a117e41..fb16b397d8 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs @@ -1,6 +1,7 @@ use crate::services::doc::revision::RevisionRecord; -use crate::sql_tables::{RevChangeset, RevTableSql}; +use crate::sql_tables::{RevTableSql, RevisionChangeset}; +use diesel::SqliteConnection; use flowy_collaboration::entities::revision::RevisionRange; use flowy_database::ConnectionPool; use flowy_error::{internal_error, FlowyError, FlowyResult}; @@ -8,11 +9,22 @@ use std::{fmt::Debug, sync::Arc}; pub trait RevisionDiskCache: Sync + Send { type Error: Debug; - fn create_revisions(&self, revisions: Vec) -> Result<(), Self::Error>; - fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result, Self::Error>; - fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result, Self::Error>; - fn read_revisions(&self, doc_id: &str) -> Result, Self::Error>; - fn update_revisions(&self, changesets: Vec) -> FlowyResult<()>; + fn write_revisions(&self, revisions: Vec, conn: &SqliteConnection) -> Result<(), Self::Error>; + fn read_revisions(&self, doc_id: &str, rev_ids: Option>) -> Result, Self::Error>; + fn read_revisions_with_range( + &self, + doc_id: &str, + range: &RevisionRange, + ) -> Result, Self::Error>; + fn update_revisions(&self, changesets: Vec) -> FlowyResult<()>; + fn delete_revisions( + &self, + doc_id: &str, + rev_ids: Option>, + conn: &SqliteConnection, + ) -> Result<(), Self::Error>; + + fn db_pool(&self) -> Arc; } pub(crate) struct Persistence { @@ -23,33 +35,28 @@ pub(crate) struct Persistence { impl RevisionDiskCache for Persistence { type Error = FlowyError; - fn create_revisions(&self, revisions: Vec) -> Result<(), Self::Error> { - let conn = &*self.pool.get().map_err(internal_error)?; - conn.immediate_transaction::<_, FlowyError, _>(|| { - let _ = RevTableSql::create_rev_table(revisions, conn)?; - Ok(()) - }) + fn write_revisions(&self, revisions: Vec, conn: &SqliteConnection) -> Result<(), Self::Error> { + let _ = RevTableSql::create_rev_table(revisions, conn)?; + Ok(()) } - fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result, Self::Error> { - let conn = &*self.pool.get().map_err(internal_error).unwrap(); + fn read_revisions(&self, doc_id: &str, rev_ids: Option>) -> Result, Self::Error> { + let conn = self.pool.get().map_err(internal_error)?; + let records = RevTableSql::read_rev_tables(&self.user_id, doc_id, rev_ids, &*conn)?; + Ok(records) + } + + fn read_revisions_with_range( + &self, + doc_id: &str, + range: &RevisionRange, + ) -> Result, Self::Error> { + let conn = &*self.pool.get().map_err(internal_error)?; let revisions = RevTableSql::read_rev_tables_with_range(&self.user_id, doc_id, range.clone(), conn)?; Ok(revisions) } - fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result, Self::Error> { - let conn = self.pool.get().map_err(internal_error)?; - let some = RevTableSql::read_rev_table(&self.user_id, doc_id, &rev_id, &*conn)?; - Ok(some) - } - - fn read_revisions(&self, doc_id: &str) -> Result, Self::Error> { - let conn = self.pool.get().map_err(internal_error)?; - let some = RevTableSql::read_rev_tables(&self.user_id, doc_id, &*conn)?; - Ok(some) - } - - fn update_revisions(&self, changesets: Vec) -> FlowyResult<()> { + fn update_revisions(&self, changesets: Vec) -> FlowyResult<()> { let conn = &*self.pool.get().map_err(internal_error)?; let _ = conn.immediate_transaction::<_, FlowyError, _>(|| { for changeset in changesets { @@ -59,6 +66,18 @@ impl RevisionDiskCache for Persistence { })?; Ok(()) } + + fn delete_revisions( + &self, + doc_id: &str, + rev_ids: Option>, + conn: &SqliteConnection, + ) -> Result<(), Self::Error> { + let _ = RevTableSql::delete_rev_tables(doc_id, rev_ids, conn)?; + Ok(()) + } + + fn db_pool(&self) -> Arc { self.pool.clone() } } impl Persistence { diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs index 15152c8f09..98b68495a1 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -3,7 +3,7 @@ use bytes::Bytes; use flowy_collaboration::{ entities::{ doc::DocumentInfo, - revision::{RevState, RevType, Revision, RevisionRange}, + revision::{RepeatedRevision, RevType, Revision, RevisionRange, RevisionState}, }, util::{md5, RevIdCounter}, }; @@ -51,6 +51,11 @@ impl RevisionManager { Ok(doc.delta()?) } + #[tracing::instrument(level = "debug", skip(self, revisions), err)] + pub async fn reset_document(&self, revisions: RepeatedRevision) -> FlowyResult<()> { + self.cache.reset_document(&self.doc_id, revisions.into_inner()) + } + pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> { assert_eq!(revision.ty, RevType::Remote); self.rev_id_counter.set(revision.rev_id); @@ -108,7 +113,7 @@ struct RevisionLoader { impl RevisionLoader { async fn load(&self) -> Result, FlowyError> { - let records = self.cache.disk_cache.read_revisions(&self.doc_id)?; + let records = self.cache.read_revisions(&self.doc_id)?; let revisions: Vec; if records.is_empty() { let doc = self.server.fetch_document(&self.doc_id).await?; @@ -128,11 +133,11 @@ impl RevisionLoader { } else { for record in &records { match record.state { - RevState::StateLocal => match self.cache.add_local_revision(record.revision.clone()).await { + RevisionState::StateLocal => match self.cache.add_local_revision(record.revision.clone()).await { Ok(_) => {}, Err(e) => tracing::error!("{}", e), }, - RevState::Ack => {}, + RevisionState::Ack => {}, } } revisions = records.into_iter().map(|record| record.revision).collect::<_>(); diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs index 9f98133a68..1273103b32 100644 --- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs +++ b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs @@ -1,7 +1,7 @@ use crate::{ errors::FlowyError, services::doc::revision::RevisionRecord, - sql_tables::{doc::RevTable, mk_revision_record_from_table, RevChangeset, RevTableState, RevTableType}, + sql_tables::{doc::RevTable, mk_revision_record_from_table, RevTableState, RevTableType, RevisionChangeset}, }; use diesel::update; use flowy_collaboration::entities::revision::RevisionRange; @@ -32,7 +32,7 @@ impl RevTableSql { Ok(()) } - pub(crate) fn update_rev_table(changeset: RevChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> { + pub(crate) fn update_rev_table(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> { let filter = dsl::rev_table .filter(dsl::rev_id.eq(changeset.rev_id.as_ref())) .filter(dsl::doc_id.eq(changeset.doc_id)); @@ -44,36 +44,20 @@ impl RevTableSql { pub(crate) fn read_rev_tables( user_id: &str, doc_id: &str, + rev_ids: Option>, conn: &SqliteConnection, ) -> Result, FlowyError> { - let filter = dsl::rev_table - .filter(dsl::doc_id.eq(doc_id)) - .order(dsl::rev_id.asc()) - .into_boxed(); - let rev_tables = filter.load::(conn)?; - let revisions = rev_tables - .into_iter() - .map(|table| mk_revision_record_from_table(user_id, table)) - .collect::>(); - Ok(revisions) - } - - pub(crate) fn read_rev_table( - user_id: &str, - doc_id: &str, - revision_id: &i64, - conn: &SqliteConnection, - ) -> Result, FlowyError> { - let filter = dsl::rev_table - .filter(dsl::doc_id.eq(doc_id)) - .filter(dsl::rev_id.eq(revision_id)); - let result = filter.first::(conn); - - if Err(diesel::NotFound) == result { - Ok(None) - } else { - Ok(Some(mk_revision_record_from_table(user_id, result?))) + let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(doc_id)).into_boxed(); + if let Some(rev_ids) = rev_ids { + sql = sql.filter(dsl::rev_id.eq_any(rev_ids)); } + let rows = sql.order(dsl::rev_id.asc()).load::(conn)?; + let records = rows + .into_iter() + .map(|row| mk_revision_record_from_table(user_id, row)) + .collect::>(); + + Ok(records) } pub(crate) fn read_rev_tables_with_range( @@ -96,13 +80,18 @@ impl RevTableSql { Ok(revisions) } - #[allow(dead_code)] - pub(crate) fn delete_rev_table(doc_id_s: &str, rev_id_s: i64, conn: &SqliteConnection) -> Result<(), FlowyError> { - let filter = dsl::rev_table - .filter(dsl::rev_id.eq(rev_id_s)) - .filter(dsl::doc_id.eq(doc_id_s)); - let affected_row = diesel::delete(filter).execute(conn)?; - debug_assert_eq!(affected_row, 1); + pub(crate) fn delete_rev_tables( + doc_id: &str, + rev_ids: Option>, + conn: &SqliteConnection, + ) -> Result<(), FlowyError> { + let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(doc_id)).into_boxed(); + if let Some(rev_ids) = rev_ids { + sql = sql.filter(dsl::rev_id.eq_any(rev_ids)); + } + + let affected_row = sql.execute(conn)?; + tracing::debug!("Delete {} revision rows", affected_row); Ok(()) } } diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs index 92e9e39137..f5989ca1cc 100644 --- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs +++ b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs @@ -1,7 +1,7 @@ use crate::services::doc::revision::RevisionRecord; use diesel::sql_types::Integer; use flowy_collaboration::{ - entities::revision::{RevId, RevState, RevType, Revision}, + entities::revision::{RevId, RevType, Revision, RevisionState}, util::md5, }; use flowy_database::schema::rev_table; @@ -48,20 +48,20 @@ impl RevTableState { } impl_sql_integer_expression!(RevTableState); -impl std::convert::From for RevState { +impl std::convert::From for RevisionState { fn from(s: RevTableState) -> Self { match s { - RevTableState::Local => RevState::StateLocal, - RevTableState::Acked => RevState::Ack, + RevTableState::Local => RevisionState::StateLocal, + RevTableState::Acked => RevisionState::Ack, } } } -impl std::convert::From for RevTableState { - fn from(s: RevState) -> Self { +impl std::convert::From for RevTableState { + fn from(s: RevisionState) -> Self { match s { - RevState::StateLocal => RevTableState::Local, - RevState::Ack => RevTableState::Acked, + RevisionState::StateLocal => RevTableState::Local, + RevisionState::Ack => RevTableState::Acked, } } } @@ -130,7 +130,7 @@ impl RevTableType { } impl_sql_integer_expression!(RevTableType); -pub struct RevChangeset { +pub struct RevisionChangeset { pub(crate) doc_id: String, pub(crate) rev_id: RevId, pub(crate) state: RevTableState, diff --git a/frontend/rust-lib/flowy-test/src/doc_script.rs b/frontend/rust-lib/flowy-test/src/doc_script.rs index 0a4929442d..11155e37fa 100644 --- a/frontend/rust-lib/flowy-test/src/doc_script.rs +++ b/frontend/rust-lib/flowy-test/src/doc_script.rs @@ -1,5 +1,5 @@ use crate::{helper::ViewTest, FlowySDKTest}; -use flowy_collaboration::entities::revision::RevState; +use flowy_collaboration::entities::revision::RevisionState; use flowy_document::services::doc::{edit::ClientDocumentEditor, SYNC_INTERVAL_IN_MILLIS}; use lib_ot::{core::Interval, rich_text::RichTextDelta}; use std::sync::Arc; @@ -12,7 +12,7 @@ pub enum EditorScript { Delete(Interval), Replace(Interval, &'static str), - AssertRevisionState(i64, RevState), + AssertRevisionState(i64, RevisionState), AssertNextRevId(Option), AssertCurrentRevId(i64), AssertJson(&'static str), @@ -30,8 +30,7 @@ impl EditorTest { let sdk = FlowySDKTest::setup(); let _ = sdk.init_user().await; let test = ViewTest::new(&sdk).await; - let db_pool = sdk.user_session.db_pool().unwrap(); - let editor = sdk.document_ctx.controller.open(&test.view.id, db_pool).await.unwrap(); + let editor = sdk.document_ctx.controller.open(&test.view.id).await.unwrap(); Self { sdk, editor } } diff --git a/frontend/rust-lib/flowy-test/tests/revision_test.rs b/frontend/rust-lib/flowy-test/tests/revision_test.rs index 563dd0454d..49b2e47236 100644 --- a/frontend/rust-lib/flowy-test/tests/revision_test.rs +++ b/frontend/rust-lib/flowy-test/tests/revision_test.rs @@ -1,4 +1,4 @@ -use flowy_collaboration::entities::revision::RevState; +use flowy_collaboration::entities::revision::RevisionState; use flowy_test::doc_script::{EditorScript::*, *}; #[tokio::test] @@ -22,8 +22,8 @@ async fn doc_sync_retry_ws_conn() { InsertText("3", 2), StartWs, WaitSyncFinished, - AssertRevisionState(2, RevState::Ack), - AssertRevisionState(3, RevState::Ack), + AssertRevisionState(2, RevisionState::Ack), + AssertRevisionState(3, RevisionState::Ack), AssertNextRevId(None), AssertJson(r#"[{"insert":"123\n"}]"#), ]; diff --git a/shared-lib/flowy-collaboration/src/entities/revision.rs b/shared-lib/flowy-collaboration/src/entities/revision.rs index 30089acd32..f418911b1f 100644 --- a/shared-lib/flowy-collaboration/src/entities/revision.rs +++ b/shared-lib/flowy-collaboration/src/entities/revision.rs @@ -39,9 +39,23 @@ impl Revision { pub fn pair_rev_id(&self) -> (i64, i64) { (self.base_rev_id, self.rev_id) } - #[allow(dead_code)] pub fn is_initial(&self) -> bool { self.rev_id == 0 } + pub fn initial_revision(user_id: &str, doc_id: &str, delta_data: Bytes) -> Self { + let user_id = user_id.to_owned(); + let doc_id = doc_id.to_owned(); + let md5 = md5(&delta_data); + Self { + base_rev_id: 0, + rev_id: 0, + delta_data: delta_data.to_vec(), + md5, + doc_id, + ty: RevType::Local, + user_id, + } + } + pub fn new( doc_id: &str, base_rev_id: i64, @@ -51,11 +65,11 @@ impl Revision { user_id: &str, md5: String, ) -> Revision { + let user_id = user_id.to_owned(); let doc_id = doc_id.to_owned(); let delta_data = delta_data.to_vec(); let base_rev_id = base_rev_id; let rev_id = rev_id; - let user_id = user_id.to_owned(); if base_rev_id != 0 { debug_assert!(base_rev_id != rev_id); @@ -73,6 +87,10 @@ impl Revision { } } +impl std::convert::From for RepeatedRevision { + fn from(revision: Revision) -> Self { RepeatedRevision { items: vec![revision] } } +} + impl std::fmt::Debug for Revision { fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { let _ = f.write_fmt(format_args!("doc_id {}, ", self.doc_id))?; @@ -142,6 +160,8 @@ impl std::fmt::Display for RevId { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!("{}", self.value)) } } +// Deprecated +// TODO: remove RevType #[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)] pub enum RevType { Local = 0, @@ -193,7 +213,7 @@ pub fn md5>(data: T) -> String { } #[derive(Debug, Clone, Eq, PartialEq)] -pub enum RevState { +pub enum RevisionState { StateLocal = 0, Ack = 1, } diff --git a/shared-lib/flowy-core-data-model/src/entities/view/view_create.rs b/shared-lib/flowy-core-data-model/src/entities/view/view_create.rs index 819f6a5cb9..ee85b476ad 100644 --- a/shared-lib/flowy-core-data-model/src/entities/view/view_create.rs +++ b/shared-lib/flowy-core-data-model/src/entities/view/view_create.rs @@ -69,6 +69,7 @@ pub struct CreateViewParams { #[pb(index = 5)] pub view_type: ViewType, + // ViewType::Doc -> Delta string #[pb(index = 6)] pub view_data: String, @@ -96,6 +97,8 @@ impl CreateViewParams { view_id, } } + + pub fn take_view_data(&mut self) -> String { ::std::mem::replace(&mut self.view_data, String::new()) } } impl TryInto for CreateViewRequest {