diff --git a/frontend/rust-lib/Cargo.lock b/frontend/rust-lib/Cargo.lock index d401b58a7f..fffaee2340 100644 --- a/frontend/rust-lib/Cargo.lock +++ b/frontend/rust-lib/Cargo.lock @@ -2051,7 +2051,6 @@ dependencies = [ "strum", "strum_macros 0.25.2", "tokio", - "tokio-retry", "tokio-util", "tracing", "url", diff --git a/frontend/rust-lib/flowy-database2/Cargo.toml b/frontend/rust-lib/flowy-database2/Cargo.toml index 9c97bf39aa..0373335c85 100644 --- a/frontend/rust-lib/flowy-database2/Cargo.toml +++ b/frontend/rust-lib/flowy-database2/Cargo.toml @@ -50,7 +50,6 @@ strum = "0.25" strum_macros = "0.25" validator = { workspace = true, features = ["derive"] } tokio-util.workspace = true -tokio-retry = "0.3" moka = { version = "0.12.8", features = ["future"] } [dev-dependencies] diff --git a/frontend/rust-lib/flowy-database2/src/manager.rs b/frontend/rust-lib/flowy-database2/src/manager.rs index 18bec2c4f8..8311e2acc4 100644 --- a/frontend/rust-lib/flowy-database2/src/manager.rs +++ b/frontend/rust-lib/flowy-database2/src/manager.rs @@ -1,9 +1,6 @@ use anyhow::anyhow; use arc_swap::ArcSwapOption; use async_trait::async_trait; -use std::collections::HashMap; -use std::sync::{Arc, Weak}; - use collab::core::collab::DataSource; use collab::core::origin::CollabOrigin; use collab::lock::RwLock; @@ -20,6 +17,9 @@ use collab_database::workspace_database::{ use collab_entity::{CollabObject, CollabType, EncodedCollab}; use collab_plugins::local_storage::kv::KVTransactionDB; use rayon::prelude::*; +use std::collections::HashMap; +use std::sync::{Arc, Weak}; +use std::time::Duration; use tokio::sync::Mutex; use tracing::{error, info, instrument, trace, warn}; @@ -41,8 +41,6 @@ use crate::services::field::translate_type_option::translate::TranslateTypeOptio use crate::services::field_settings::default_field_settings_by_layout_map; use crate::services::share::csv::{CSVFormat, CSVImporter, ImportResult}; use tokio::sync::RwLock as TokioRwLock; -use tokio_retry::strategy::ExponentialBackoff; -use tokio_retry::RetryIf; pub trait DatabaseUser: Send + Sync { fn user_id(&self) -> Result; @@ -95,6 +93,7 @@ impl DatabaseManager { self.removing_editor.lock().await.clear(); // 3. Clear the workspace database if let Some(old_workspace_database) = self.workspace_database.swap(None) { + info!("Close the old workspace database"); let wdb = old_workspace_database.read().await; wdb.close(); } @@ -148,6 +147,7 @@ impl DatabaseManager { let lock = self.workspace_database()?; let wdb = lock.read().await; let database_collab = wdb.get_or_init_database(database_id).await?; + drop(wdb); let lock_guard = database_collab.read().await; Ok(lock_guard.get_inline_view_id()) } @@ -161,6 +161,7 @@ impl DatabaseManager { items } + #[instrument(level = "trace", skip_all, err)] pub async fn update_database_indexing( &self, view_ids_by_database_id: HashMap>, @@ -237,25 +238,11 @@ impl DatabaseManager { return Ok(database_editor); } - trace!("init database editor:{}", database_id); + trace!("[Database]: init database editor:{}", database_id); // When the user opens the database from the left-side bar, it may fail because the workspace database // hasn't finished syncing yet. In such cases, get_or_create_database will return None. // The workaround is to add a retry mechanism to attempt fetching the database again. - let database = RetryIf::spawn( - ExponentialBackoff::from_millis(3).factor(1000).take(2), - || async { - trace!("retry to open database:{}", database_id); - let database = workspace_database - .read() - .await - .get_or_init_database(database_id) - .await?; - Ok::<_, DatabaseError>(database) - }, - |err: &DatabaseError| !err.is_no_required_data(), - ) - .await?; - + let database = open_database_with_retry(workspace_database, database_id).await?; let editor = DatabaseEditor::new( self.user.clone(), database, @@ -273,11 +260,15 @@ impl DatabaseManager { } /// Open the database view + #[instrument(level = "trace", skip_all, err)] pub async fn open_database_view>(&self, view_id: T) -> FlowyResult<()> { let view_id = view_id.as_ref(); let lock = self.workspace_database()?; let workspace_database = lock.read().await; - if let Some(database_id) = workspace_database.get_database_id_with_view_id(view_id) { + let result = workspace_database.get_database_id_with_view_id(view_id); + drop(workspace_database); + + if let Some(database_id) = result { let is_not_exist = self.editors.lock().await.get(&database_id).is_none(); if is_not_exist { let _ = self.open_database(&database_id).await?; @@ -286,11 +277,14 @@ impl DatabaseManager { Ok(()) } + #[instrument(level = "trace", skip_all, err)] pub async fn close_database_view>(&self, view_id: T) -> FlowyResult<()> { let view_id = view_id.as_ref(); let lock = self.workspace_database()?; let workspace_database = lock.read().await; let database_id = workspace_database.get_database_id_with_view_id(view_id); + drop(workspace_database); + if let Some(database_id) = database_id { let mut editors = self.editors.lock().await; let mut should_remove = false; @@ -383,6 +377,8 @@ impl DatabaseManager { let lock = self.workspace_database()?; let mut wdb = lock.write().await; let database = wdb.create_database(create_database_params).await?; + drop(wdb); + let encoded_collab = database .read() .await @@ -398,6 +394,8 @@ impl DatabaseManager { let lock = self.workspace_database()?; let mut wdb = lock.write().await; let database = wdb.create_database(params).await?; + drop(wdb); + Ok(database) } @@ -411,8 +409,8 @@ impl DatabaseManager { database_view_id: String, database_parent_view_id: String, ) -> FlowyResult<()> { - let lock = self.workspace_database()?; - let mut wdb = lock.write().await; + let workspace_database = self.workspace_database()?; + let mut wdb = workspace_database.write().await; let mut params = CreateViewParams::new(database_id.clone(), database_view_id, name, layout); if let Ok(database) = wdb.get_or_init_database(&database_id).await { let (field, layout_setting, field_settings_map) = @@ -1044,3 +1042,59 @@ impl DatabaseCollabPersistenceService for DatabasePersistenceImpl { (vec![], row_ids) } } +async fn open_database_with_retry( + workspace_database: Arc>, + database_id: &str, +) -> Result>, DatabaseError> { + let max_retries = 3; + let retry_interval = Duration::from_secs(4); + for attempt in 1..=max_retries { + trace!( + "[Database]: attempt {} to open database:{}", + attempt, + database_id + ); + + let result = workspace_database + .try_read() + .map_err(|err| DatabaseError::Internal(anyhow!("workspace database lock fail: {}", err)))? + .get_or_init_database(database_id) + .await; + + // Attempt to open the database + match result { + Ok(database) => return Ok(database), + Err(err) => { + if matches!(err, DatabaseError::RecordNotFound) + || matches!(err, DatabaseError::NoRequiredData(_)) + { + error!( + "[Database]: retry {} to open database:{}, error:{}", + attempt, database_id, err + ); + + if attempt < max_retries { + tokio::time::sleep(retry_interval).await; + } else { + error!( + "[Database]: exhausted retries to open database:{}, error:{}", + database_id, err + ); + return Err(err); + } + } else { + error!( + "[Database]: stop retrying to open database:{}, error:{}", + database_id, err + ); + return Err(err); + } + }, + } + } + + Err(DatabaseError::Internal(anyhow!( + "Exhausted retries to open database: {}", + database_id + ))) +}