chore: fix potential when open a workspace with retry will cause lock… (#6344)

* chore: fix potential when open a workspace with retry will cause lock issue

* chore: disable verbose log
This commit is contained in:
Nathan.fooo 2024-09-18 21:00:01 +08:00 committed by GitHub
parent 71e7f54367
commit f9f5ae04d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 78 additions and 26 deletions

View File

@ -2051,7 +2051,6 @@ dependencies = [
"strum",
"strum_macros 0.25.2",
"tokio",
"tokio-retry",
"tokio-util",
"tracing",
"url",

View File

@ -50,7 +50,6 @@ strum = "0.25"
strum_macros = "0.25"
validator = { workspace = true, features = ["derive"] }
tokio-util.workspace = true
tokio-retry = "0.3"
moka = { version = "0.12.8", features = ["future"] }
[dev-dependencies]

View File

@ -1,9 +1,6 @@
use anyhow::anyhow;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
use collab::lock::RwLock;
@ -20,6 +17,9 @@ use collab_database::workspace_database::{
use collab_entity::{CollabObject, CollabType, EncodedCollab};
use collab_plugins::local_storage::kv::KVTransactionDB;
use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::Mutex;
use tracing::{error, info, instrument, trace, warn};
@ -41,8 +41,6 @@ use crate::services::field::translate_type_option::translate::TranslateTypeOptio
use crate::services::field_settings::default_field_settings_by_layout_map;
use crate::services::share::csv::{CSVFormat, CSVImporter, ImportResult};
use tokio::sync::RwLock as TokioRwLock;
use tokio_retry::strategy::ExponentialBackoff;
use tokio_retry::RetryIf;
pub trait DatabaseUser: Send + Sync {
fn user_id(&self) -> Result<i64, FlowyError>;
@ -95,6 +93,7 @@ impl DatabaseManager {
self.removing_editor.lock().await.clear();
// 3. Clear the workspace database
if let Some(old_workspace_database) = self.workspace_database.swap(None) {
info!("Close the old workspace database");
let wdb = old_workspace_database.read().await;
wdb.close();
}
@ -148,6 +147,7 @@ impl DatabaseManager {
let lock = self.workspace_database()?;
let wdb = lock.read().await;
let database_collab = wdb.get_or_init_database(database_id).await?;
drop(wdb);
let lock_guard = database_collab.read().await;
Ok(lock_guard.get_inline_view_id())
}
@ -161,6 +161,7 @@ impl DatabaseManager {
items
}
#[instrument(level = "trace", skip_all, err)]
pub async fn update_database_indexing(
&self,
view_ids_by_database_id: HashMap<String, Vec<String>>,
@ -237,25 +238,11 @@ impl DatabaseManager {
return Ok(database_editor);
}
trace!("init database editor:{}", database_id);
trace!("[Database]: init database editor:{}", database_id);
// When the user opens the database from the left-side bar, it may fail because the workspace database
// hasn't finished syncing yet. In such cases, get_or_create_database will return None.
// The workaround is to add a retry mechanism to attempt fetching the database again.
let database = RetryIf::spawn(
ExponentialBackoff::from_millis(3).factor(1000).take(2),
|| async {
trace!("retry to open database:{}", database_id);
let database = workspace_database
.read()
.await
.get_or_init_database(database_id)
.await?;
Ok::<_, DatabaseError>(database)
},
|err: &DatabaseError| !err.is_no_required_data(),
)
.await?;
let database = open_database_with_retry(workspace_database, database_id).await?;
let editor = DatabaseEditor::new(
self.user.clone(),
database,
@ -273,11 +260,15 @@ impl DatabaseManager {
}
/// Open the database view
#[instrument(level = "trace", skip_all, err)]
pub async fn open_database_view<T: AsRef<str>>(&self, view_id: T) -> FlowyResult<()> {
let view_id = view_id.as_ref();
let lock = self.workspace_database()?;
let workspace_database = lock.read().await;
if let Some(database_id) = workspace_database.get_database_id_with_view_id(view_id) {
let result = workspace_database.get_database_id_with_view_id(view_id);
drop(workspace_database);
if let Some(database_id) = result {
let is_not_exist = self.editors.lock().await.get(&database_id).is_none();
if is_not_exist {
let _ = self.open_database(&database_id).await?;
@ -286,11 +277,14 @@ impl DatabaseManager {
Ok(())
}
#[instrument(level = "trace", skip_all, err)]
pub async fn close_database_view<T: AsRef<str>>(&self, view_id: T) -> FlowyResult<()> {
let view_id = view_id.as_ref();
let lock = self.workspace_database()?;
let workspace_database = lock.read().await;
let database_id = workspace_database.get_database_id_with_view_id(view_id);
drop(workspace_database);
if let Some(database_id) = database_id {
let mut editors = self.editors.lock().await;
let mut should_remove = false;
@ -383,6 +377,8 @@ impl DatabaseManager {
let lock = self.workspace_database()?;
let mut wdb = lock.write().await;
let database = wdb.create_database(create_database_params).await?;
drop(wdb);
let encoded_collab = database
.read()
.await
@ -398,6 +394,8 @@ impl DatabaseManager {
let lock = self.workspace_database()?;
let mut wdb = lock.write().await;
let database = wdb.create_database(params).await?;
drop(wdb);
Ok(database)
}
@ -411,8 +409,8 @@ impl DatabaseManager {
database_view_id: String,
database_parent_view_id: String,
) -> FlowyResult<()> {
let lock = self.workspace_database()?;
let mut wdb = lock.write().await;
let workspace_database = self.workspace_database()?;
let mut wdb = workspace_database.write().await;
let mut params = CreateViewParams::new(database_id.clone(), database_view_id, name, layout);
if let Ok(database) = wdb.get_or_init_database(&database_id).await {
let (field, layout_setting, field_settings_map) =
@ -1044,3 +1042,59 @@ impl DatabaseCollabPersistenceService for DatabasePersistenceImpl {
(vec![], row_ids)
}
}
async fn open_database_with_retry(
workspace_database: Arc<RwLock<WorkspaceDatabase>>,
database_id: &str,
) -> Result<Arc<RwLock<Database>>, DatabaseError> {
let max_retries = 3;
let retry_interval = Duration::from_secs(4);
for attempt in 1..=max_retries {
trace!(
"[Database]: attempt {} to open database:{}",
attempt,
database_id
);
let result = workspace_database
.try_read()
.map_err(|err| DatabaseError::Internal(anyhow!("workspace database lock fail: {}", err)))?
.get_or_init_database(database_id)
.await;
// Attempt to open the database
match result {
Ok(database) => return Ok(database),
Err(err) => {
if matches!(err, DatabaseError::RecordNotFound)
|| matches!(err, DatabaseError::NoRequiredData(_))
{
error!(
"[Database]: retry {} to open database:{}, error:{}",
attempt, database_id, err
);
if attempt < max_retries {
tokio::time::sleep(retry_interval).await;
} else {
error!(
"[Database]: exhausted retries to open database:{}, error:{}",
database_id, err
);
return Err(err);
}
} else {
error!(
"[Database]: stop retrying to open database:{}, error:{}",
database_id, err
);
return Err(err);
}
},
}
}
Err(DatabaseError::Internal(anyhow!(
"Exhausted retries to open database: {}",
database_id
)))
}