diff --git a/frontend/appflowy_flutter/macos/Podfile.lock b/frontend/appflowy_flutter/macos/Podfile.lock index f4bedfef79..9c949dd02f 100644 --- a/frontend/appflowy_flutter/macos/Podfile.lock +++ b/frontend/appflowy_flutter/macos/Podfile.lock @@ -171,7 +171,7 @@ SPEC CHECKSUMS: super_native_extensions: 85efee3a7495b46b04befcfc86ed12069264ebf3 url_launcher_macos: c82c93949963e55b228a30115bd219499a6fe404 webview_flutter_wkwebview: 0982481e3d9c78fd5c6f62a002fcd24fc791f1e4 - window_manager: 3a1844359a6295ab1e47659b1a777e36773cd6e8 + window_manager: 990c8e348c4da2a93b81da638245d40554ec9436 PODFILE CHECKSUM: 0532f3f001ca3110b8be345d6491fff690e95823 diff --git a/frontend/rust-lib/event-integration-test/src/lib.rs b/frontend/rust-lib/event-integration-test/src/lib.rs index ff0a3847df..127a356284 100644 --- a/frontend/rust-lib/event-integration-test/src/lib.rs +++ b/frontend/rust-lib/event-integration-test/src/lib.rs @@ -100,7 +100,7 @@ impl EventIntegrationTest { Self::new_with_config(config).await } - pub fn skip_clean(&mut self) { + pub fn skip_auto_remove_temp_dir(&mut self) { self.cleaner.should_clean.store(false, Ordering::Release); } diff --git a/frontend/rust-lib/event-integration-test/src/user_event.rs b/frontend/rust-lib/event-integration-test/src/user_event.rs index 821c3c9a1d..11a653b723 100644 --- a/frontend/rust-lib/event-integration-test/src/user_event.rs +++ b/frontend/rust-lib/event-integration-test/src/user_event.rs @@ -60,7 +60,7 @@ impl EventIntegrationTest { pub async fn sign_up_as_anon(&self) -> SignUpContext { let password = login_password(); - let email = unique_email(); + let email = "anon@appflowy.io".to_string(); let payload = SignUpPayloadPB { email, name: "appflowy".to_string(), diff --git a/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs b/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs index 7d8ecc9680..f540b699f5 100644 --- a/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs +++ b/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs @@ -39,7 +39,7 @@ async fn af_cloud_upload_big_file_test() { // Simulate a restart let config = test.config.clone(); - test.skip_clean(); + test.skip_auto_remove_temp_dir(); drop(test); tokio::time::sleep(Duration::from_secs(3)).await; diff --git a/frontend/rust-lib/event-integration-test/tests/user/af_cloud_test/auth_test.rs b/frontend/rust-lib/event-integration-test/tests/user/af_cloud_test/auth_test.rs index eaec8f7540..47757f56f7 100644 --- a/frontend/rust-lib/event-integration-test/tests/user/af_cloud_test/auth_test.rs +++ b/frontend/rust-lib/event-integration-test/tests/user/af_cloud_test/auth_test.rs @@ -12,3 +12,34 @@ async fn af_cloud_sign_up_test() { let user = test.af_cloud_sign_in_with_email(&email).await.unwrap(); assert_eq!(user.email, email); } + +#[tokio::test] +async fn af_cloud_sign_up_then_switch_to_anon_test() { + // user_localhost_af_cloud_with_nginx().await; + use_localhost_af_cloud().await; + let mut test = EventIntegrationTest::new().await; + test.skip_auto_remove_temp_dir(); + + let email = generate_test_email(); + let user = test.af_cloud_sign_in_with_email(&email).await.unwrap(); + assert_eq!(user.email, email); + test.sign_out().await; + let config = test.config.clone(); + drop(test); + + let mut test = EventIntegrationTest::new_with_config(config.clone()).await; + test.skip_auto_remove_temp_dir(); + test.sign_up_as_anon().await; + drop(test); + + let mut test = EventIntegrationTest::new_with_config(config.clone()).await; + test.skip_auto_remove_temp_dir(); + let user = test.af_cloud_sign_in_with_email(&email).await.unwrap(); + assert_eq!(user.email, email); + test.sign_out().await; + drop(test); + + let mut test = EventIntegrationTest::new_with_config(config).await; + test.skip_auto_remove_temp_dir(); + test.sign_up_as_anon().await; +} diff --git a/frontend/rust-lib/flowy-ai/src/ai_manager.rs b/frontend/rust-lib/flowy-ai/src/ai_manager.rs index fd931bd73f..06b3adaeea 100644 --- a/frontend/rust-lib/flowy-ai/src/ai_manager.rs +++ b/frontend/rust-lib/flowy-ai/src/ai_manager.rs @@ -69,6 +69,11 @@ pub struct AIManager { pub store_preferences: Arc, server_models: Arc>, } +impl Drop for AIManager { + fn drop(&mut self) { + tracing::trace!("[Drop] drop ai manager"); + } +} impl AIManager { pub fn new( diff --git a/frontend/rust-lib/flowy-core/src/deps_resolve/database_deps.rs b/frontend/rust-lib/flowy-core/src/deps_resolve/database_deps.rs index 1bd3223946..dd68dcdd49 100644 --- a/frontend/rust-lib/flowy-core/src/deps_resolve/database_deps.rs +++ b/frontend/rust-lib/flowy-core/src/deps_resolve/database_deps.rs @@ -21,7 +21,7 @@ impl DatabaseDepsResolver { pub async fn resolve( authenticate_user: Weak, task_scheduler: Arc>, - collab_builder: Arc, + collab_builder: Weak, cloud_service: Arc, ai_service: Arc, ai_manager: Arc, diff --git a/frontend/rust-lib/flowy-core/src/deps_resolve/document_deps.rs b/frontend/rust-lib/flowy-core/src/deps_resolve/document_deps.rs index 3527bc42d6..6f1f53ebfd 100644 --- a/frontend/rust-lib/flowy-core/src/deps_resolve/document_deps.rs +++ b/frontend/rust-lib/flowy-core/src/deps_resolve/document_deps.rs @@ -1,7 +1,6 @@ use crate::deps_resolve::CollabSnapshotSql; use collab_integrate::collab_builder::AppFlowyCollabBuilder; use collab_integrate::CollabKVDB; -use flowy_database2::DatabaseManager; use flowy_document::entities::{DocumentSnapshotData, DocumentSnapshotMeta}; use flowy_document::manager::{DocumentManager, DocumentSnapshotService, DocumentUserService}; use flowy_document_pub::cloud::DocumentCloudService; @@ -15,8 +14,7 @@ pub struct DocumentDepsResolver(); impl DocumentDepsResolver { pub fn resolve( authenticate_user: Weak, - _database_manager: &Arc, - collab_builder: Arc, + collab_builder: Weak, cloud_service: Arc, storage_service: Weak, ) -> Arc { diff --git a/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/folder_deps_chat_impl.rs b/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/folder_deps_chat_impl.rs index e2791827ee..6aa51b45ae 100644 --- a/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/folder_deps_chat_impl.rs +++ b/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/folder_deps_chat_impl.rs @@ -7,10 +7,16 @@ use flowy_folder::entities::CreateViewParams; use flowy_folder::share::ImportType; use flowy_folder::view_operation::{FolderOperationHandler, ImportedData}; use lib_infra::async_trait::async_trait; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use uuid::Uuid; -pub struct ChatFolderOperation(pub Arc); +pub struct ChatFolderOperation(pub Weak); + +impl ChatFolderOperation { + fn ai_manager(&self) -> Result, FlowyError> { + self.0.upgrade().ok_or_else(FlowyError::ref_drop) + } +} #[async_trait] impl FolderOperationHandler for ChatFolderOperation { @@ -19,15 +25,15 @@ impl FolderOperationHandler for ChatFolderOperation { } async fn open_view(&self, view_id: &Uuid) -> Result<(), FlowyError> { - self.0.open_chat(view_id).await + self.ai_manager()?.open_chat(view_id).await } async fn close_view(&self, view_id: &Uuid) -> Result<(), FlowyError> { - self.0.close_chat(view_id).await + self.ai_manager()?.close_chat(view_id).await } async fn delete_view(&self, view_id: &Uuid) -> Result<(), FlowyError> { - self.0.delete_chat(view_id).await + self.ai_manager()?.delete_chat(view_id).await } async fn duplicate_view(&self, _view_id: &Uuid) -> Result { @@ -51,7 +57,7 @@ impl FolderOperationHandler for ChatFolderOperation { _layout: ViewLayout, ) -> Result<(), FlowyError> { self - .0 + .ai_manager()? .create_chat(&user_id, parent_view_id, view_id) .await?; Ok(()) diff --git a/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/folder_deps_database_impl.rs b/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/folder_deps_database_impl.rs index edc40c6d5b..876be714bf 100644 --- a/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/folder_deps_database_impl.rs +++ b/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/folder_deps_database_impl.rs @@ -19,21 +19,26 @@ use flowy_user::services::data_import::{load_collab_by_object_id, load_collab_by use lib_infra::async_trait::async_trait; use std::collections::HashMap; use std::path::Path; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use uuid::Uuid; -pub struct DatabaseFolderOperation(pub Arc); +pub struct DatabaseFolderOperation(pub Weak); +impl DatabaseFolderOperation { + fn database_manager(&self) -> Result, FlowyError> { + self.0.upgrade().ok_or_else(FlowyError::ref_drop) + } +} #[async_trait] impl FolderOperationHandler for DatabaseFolderOperation { async fn open_view(&self, view_id: &Uuid) -> Result<(), FlowyError> { - self.0.open_database_view(view_id).await?; + self.database_manager()?.open_database_view(view_id).await?; Ok(()) } async fn close_view(&self, view_id: &Uuid) -> Result<(), FlowyError> { self - .0 + .database_manager()? .close_database_view(view_id.to_string().as_str()) .await?; Ok(()) @@ -41,7 +46,7 @@ impl FolderOperationHandler for DatabaseFolderOperation { async fn delete_view(&self, view_id: &Uuid) -> Result<(), FlowyError> { match self - .0 + .database_manager()? .delete_database_view(view_id.to_string().as_str()) .await { @@ -58,17 +63,18 @@ impl FolderOperationHandler for DatabaseFolderOperation { ) -> Result { let workspace_id = _user.workspace_id()?; let view_id_str = view_id.to_string(); + let database_manager = self.database_manager()?; // get the collab_object_id for the database. // // the collab object_id for the database is not the view_id, // we should use the view_id to get the database_id - let oid = self.0.get_database_id_with_view_id(&view_id_str).await?; - let row_oids = self - .0 + let oid = database_manager + .get_database_id_with_view_id(&view_id_str) + .await?; + let row_oids = database_manager .get_database_row_ids_with_view_id(&view_id_str) .await?; - let row_metas = self - .0 + let row_metas = database_manager .get_database_row_metas_with_view_id(view_id, row_oids.clone()) .await?; let row_document_ids = row_metas @@ -79,7 +85,7 @@ impl FolderOperationHandler for DatabaseFolderOperation { .into_iter() .map(|oid| oid.into_inner()) .collect::>(); - let database_metas = self.0.get_all_databases_meta().await; + let database_metas = database_manager.get_all_databases_meta().await; let uid = _user .user_id() @@ -178,14 +184,14 @@ impl FolderOperationHandler for DatabaseFolderOperation { let duplicated_view_id = String::from_utf8(data.to_vec()).map_err(|_| FlowyError::invalid_data())?; let encoded_collab = self - .0 + .database_manager()? .duplicate_database(&duplicated_view_id, ¶ms.view_id.to_string()) .await?; Ok(Some(encoded_collab)) }, ViewData::Data(data) => { let encoded_collab = self - .0 + .database_manager()? .create_database_with_data(¶ms.view_id.to_string(), data.to_vec()) .await?; Ok(Some(encoded_collab)) @@ -207,7 +213,7 @@ impl FolderOperationHandler for DatabaseFolderOperation { let database_view_id = params.view_id.to_string(); let database_parent_view_id = params.parent_view_id.to_string(); self - .0 + .database_manager()? .create_linked_view( name, layout.into(), @@ -245,7 +251,7 @@ impl FolderOperationHandler for DatabaseFolderOperation { ); }, }; - let result = self.0.import_database(data).await; + let result = self.database_manager()?.import_database(data).await; match result { Ok(_) => Ok(()), Err(err) => { @@ -276,7 +282,7 @@ impl FolderOperationHandler for DatabaseFolderOperation { }) .await??; let result = self - .0 + .database_manager()? .import_csv(view_id.to_string(), content, format) .await?; Ok( @@ -309,7 +315,7 @@ impl FolderOperationHandler for DatabaseFolderOperation { let content = String::from_utf8(data).map_err(|e| FlowyError::invalid_data().with_context(e))?; let _ = self - .0 + .database_manager()? .import_csv(view_id.to_string(), content, CSVFormat::Original) .await?; Ok(()) @@ -327,7 +333,7 @@ impl FolderOperationHandler for DatabaseFolderOperation { if old.layout != new.layout { self - .0 + .database_manager()? .update_database_layout(&new.id, database_layout) .await?; Ok(()) diff --git a/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/folder_deps_doc_impl.rs b/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/folder_deps_doc_impl.rs index a843a8eb1f..adabdfa2a2 100644 --- a/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/folder_deps_doc_impl.rs +++ b/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/folder_deps_doc_impl.rs @@ -18,11 +18,17 @@ use lib_dispatch::prelude::ToBytes; use lib_infra::async_trait::async_trait; use std::convert::TryFrom; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use tokio::sync::RwLock; use uuid::Uuid; -pub struct DocumentFolderOperation(pub Arc); +pub struct DocumentFolderOperation(pub Weak); + +impl DocumentFolderOperation { + fn document_manager(&self) -> Result, FlowyError> { + self.0.upgrade().ok_or_else(FlowyError::ref_drop) + } +} #[async_trait] impl FolderOperationHandler for DocumentFolderOperation { fn name(&self) -> &str { @@ -34,7 +40,7 @@ impl FolderOperationHandler for DocumentFolderOperation { uid: i64, workspace_view_builder: Arc>, ) -> Result<(), FlowyError> { - let manager = self.0.clone(); + let manager = self.document_manager()?; let mut write_guard = workspace_view_builder.write().await; // Create a view named "Getting started" with an icon ⭐️ and the built-in README data. @@ -60,18 +66,18 @@ impl FolderOperationHandler for DocumentFolderOperation { } async fn open_view(&self, view_id: &Uuid) -> Result<(), FlowyError> { - self.0.open_document(view_id).await?; + self.document_manager()?.open_document(view_id).await?; Ok(()) } /// Close the document view. async fn close_view(&self, view_id: &Uuid) -> Result<(), FlowyError> { - self.0.close_document(view_id).await?; + self.document_manager()?.close_document(view_id).await?; Ok(()) } async fn delete_view(&self, view_id: &Uuid) -> Result<(), FlowyError> { - match self.0.delete_document(view_id).await { + match self.document_manager()?.delete_document(view_id).await { Ok(_) => tracing::trace!("Delete document: {}", view_id), Err(e) => tracing::error!("🔴delete document failed: {}", e), } @@ -79,7 +85,11 @@ impl FolderOperationHandler for DocumentFolderOperation { } async fn duplicate_view(&self, view_id: &Uuid) -> Result { - let data: DocumentDataPB = self.0.get_document_data(view_id).await?.into(); + let data: DocumentDataPB = self + .document_manager()? + .get_document_data(view_id) + .await? + .into(); let data_bytes = data.into_bytes().map_err(|_| FlowyError::invalid_data())?; Ok(data_bytes) } @@ -107,7 +117,7 @@ impl FolderOperationHandler for DocumentFolderOperation { ViewData::Empty => None, }; let encoded_collab = self - .0 + .document_manager()? .create_document(user_id, ¶ms.view_id, data.map(|d| d.into())) .await?; Ok(Some(encoded_collab)) @@ -123,7 +133,11 @@ impl FolderOperationHandler for DocumentFolderOperation { layout: ViewLayout, ) -> Result<(), FlowyError> { debug_assert_eq!(layout, ViewLayout::Document); - match self.0.create_document(user_id, view_id, None).await { + match self + .document_manager()? + .create_document(user_id, view_id, None) + .await + { Ok(_) => Ok(()), Err(err) => { if err.is_already_exists() { @@ -145,7 +159,7 @@ impl FolderOperationHandler for DocumentFolderOperation { ) -> Result, FlowyError> { let data = DocumentDataPB::try_from(Bytes::from(bytes))?; let encoded_collab = self - .0 + .document_manager()? .create_document(uid, view_id, Some(data.into())) .await?; Ok(vec![( diff --git a/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/mod.rs b/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/mod.rs index 02b26e71b6..c677110923 100644 --- a/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/mod.rs +++ b/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps/mod.rs @@ -34,7 +34,7 @@ impl FolderDepsResolver { pub async fn resolve( authenticate_user: Weak, collab_builder: Arc, - server_provider: Arc, + server_provider: Weak, folder_indexer: Arc, store_preferences: Arc, ) -> Arc { @@ -57,9 +57,9 @@ impl FolderDepsResolver { pub fn register_handlers( folder_manager: &Arc, - document_manager: Arc, - database_manager: Arc, - chat_manager: Arc, + document_manager: Weak, + database_manager: Weak, + chat_manager: Weak, ) { let document_folder_operation = Arc::new(DocumentFolderOperation(document_manager)); folder_manager.register_operation_handler(ViewLayout::Document, document_folder_operation); diff --git a/frontend/rust-lib/flowy-core/src/deps_resolve/user_deps.rs b/frontend/rust-lib/flowy-core/src/deps_resolve/user_deps.rs index 73c2844a23..482b46b76c 100644 --- a/frontend/rust-lib/flowy-core/src/deps_resolve/user_deps.rs +++ b/frontend/rust-lib/flowy-core/src/deps_resolve/user_deps.rs @@ -2,7 +2,7 @@ use crate::server_layer::ServerProvider; use collab_folder::hierarchy_builder::ParentChildViews; use collab_integrate::collab_builder::AppFlowyCollabBuilder; use flowy_database2::DatabaseManager; -use flowy_error::FlowyResult; +use flowy_error::{FlowyError, FlowyResult}; use flowy_folder::manager::FolderManager; use flowy_folder_pub::entities::ImportFrom; use flowy_sqlite::kv::KVStorePreferences; @@ -11,7 +11,7 @@ use flowy_user::user_manager::UserManager; use flowy_user_pub::workspace_service::UserWorkspaceService; use lib_infra::async_trait::async_trait; use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use tracing::info; use uuid::Uuid; @@ -20,11 +20,11 @@ pub struct UserDepsResolver(); impl UserDepsResolver { pub async fn resolve( authenticate_user: Arc, - collab_builder: Arc, - server_provider: Arc, + collab_builder: Weak, + server_provider: Weak, store_preference: Arc, - database_manager: Arc, - folder_manager: Arc, + database_manager: Weak, + folder_manager: Weak, ) -> Arc { let workspace_service_impl = Arc::new(UserWorkspaceServiceImpl { database_manager, @@ -33,7 +33,7 @@ impl UserDepsResolver { UserManager::new( server_provider, store_preference, - Arc::downgrade(&collab_builder), + collab_builder, authenticate_user, workspace_service_impl, ) @@ -41,8 +41,8 @@ impl UserDepsResolver { } pub struct UserWorkspaceServiceImpl { - pub database_manager: Arc, - pub folder_manager: Arc, + pub database_manager: Weak, + pub folder_manager: Weak, } #[async_trait] @@ -58,12 +58,16 @@ impl UserWorkspaceService for UserWorkspaceServiceImpl { ImportFrom::AnonUser => { self .folder_manager + .upgrade() + .ok_or_else(FlowyError::ref_drop)? .insert_views_as_spaces(views, orphan_views) .await?; }, ImportFrom::AppFlowyDataFolder => { self .folder_manager + .upgrade() + .ok_or_else(FlowyError::ref_drop)? .insert_views_with_parent(views, orphan_views, parent_view_id) .await?; }, @@ -77,6 +81,8 @@ impl UserWorkspaceService for UserWorkspaceServiceImpl { ) -> FlowyResult<()> { self .database_manager + .upgrade() + .ok_or_else(FlowyError::ref_drop)? .update_database_indexing(ids_by_database_id) .await?; Ok(()) @@ -87,6 +93,8 @@ impl UserWorkspaceService for UserWorkspaceServiceImpl { // Log the error and continue if let Err(err) = self .folder_manager + .upgrade() + .ok_or_else(FlowyError::ref_drop)? .remove_indices_for_workspace(workspace_id) .await { diff --git a/frontend/rust-lib/flowy-core/src/lib.rs b/frontend/rust-lib/flowy-core/src/lib.rs index c2800bd73b..fe7e69649c 100644 --- a/frontend/rust-lib/flowy-core/src/lib.rs +++ b/frontend/rust-lib/flowy-core/src/lib.rs @@ -67,6 +67,13 @@ pub struct AppFlowyCore { pub search_manager: Arc, pub ai_manager: Arc, pub storage_manager: Arc, + pub collab_builder: Arc, +} + +impl Drop for AppFlowyCore { + fn drop(&mut self) { + tracing::trace!("[Drop] drop appflowy core"); + } } impl AppFlowyCore { @@ -174,7 +181,7 @@ impl AppFlowyCore { let folder_manager = FolderDepsResolver::resolve( Arc::downgrade(&authenticate_user), collab_builder.clone(), - server_provider.clone(), + Arc::downgrade(&server_provider), folder_indexer.clone(), store_preference.clone(), ) @@ -198,7 +205,7 @@ impl AppFlowyCore { let database_manager = DatabaseDepsResolver::resolve( Arc::downgrade(&authenticate_user), task_dispatcher.clone(), - collab_builder.clone(), + Arc::downgrade(&collab_builder), server_provider.clone(), server_provider.clone(), ai_manager.clone(), @@ -207,19 +214,18 @@ impl AppFlowyCore { let document_manager = DocumentDepsResolver::resolve( Arc::downgrade(&authenticate_user), - &database_manager, - collab_builder.clone(), + Arc::downgrade(&collab_builder), server_provider.clone(), Arc::downgrade(&storage_manager.storage_service), ); let user_manager = UserDepsResolver::resolve( authenticate_user.clone(), - collab_builder.clone(), - server_provider.clone(), + Arc::downgrade(&collab_builder), + Arc::downgrade(&server_provider), store_preference.clone(), - database_manager.clone(), - folder_manager.clone(), + Arc::downgrade(&database_manager), + Arc::downgrade(&folder_manager), ) .await; @@ -233,9 +239,9 @@ impl AppFlowyCore { // Register the folder operation handlers register_handlers( &folder_manager, - document_manager.clone(), - database_manager.clone(), - ai_manager.clone(), + Arc::downgrade(&document_manager), + Arc::downgrade(&database_manager), + Arc::downgrade(&ai_manager), ); ( @@ -253,14 +259,14 @@ impl AppFlowyCore { .await; let user_status_callback = UserStatusCallbackImpl { - user_manager: user_manager.clone(), - collab_builder, - folder_manager: folder_manager.clone(), - database_manager: database_manager.clone(), - document_manager: document_manager.clone(), - server_provider: server_provider.clone(), - storage_manager: storage_manager.clone(), - ai_manager: ai_manager.clone(), + user_manager: Arc::downgrade(&user_manager), + collab_builder: Arc::downgrade(&collab_builder), + folder_manager: Arc::downgrade(&folder_manager), + database_manager: Arc::downgrade(&database_manager), + document_manager: Arc::downgrade(&document_manager), + server_provider: Arc::downgrade(&server_provider), + storage_manager: Arc::downgrade(&storage_manager), + ai_manager: Arc::downgrade(&ai_manager), runtime: runtime.clone(), }; @@ -268,16 +274,13 @@ impl AppFlowyCore { database_manager: Arc::downgrade(&database_manager), document_manager: Arc::downgrade(&document_manager), }; - - let cloned_user_manager = Arc::downgrade(&user_manager); - if let Some(user_manager) = cloned_user_manager.upgrade() { - if let Err(err) = user_manager - .init_with_callback(user_status_callback, collab_interact_impl) - .await - { - error!("Init user failed: {}", err) - } + if let Err(err) = user_manager + .init_with_callback(user_status_callback, collab_interact_impl) + .await + { + error!("Init user failed: {}", err) } + #[allow(clippy::arc_with_non_send_sync)] let event_dispatcher = Arc::new(AFPluginDispatcher::new( runtime, @@ -305,6 +308,7 @@ impl AppFlowyCore { search_manager, ai_manager, storage_manager, + collab_builder, } } diff --git a/frontend/rust-lib/flowy-core/src/user_state_callback.rs b/frontend/rust-lib/flowy-core/src/user_state_callback.rs index 1a1762f651..4e2458e011 100644 --- a/frontend/rust-lib/flowy-core/src/user_state_callback.rs +++ b/frontend/rust-lib/flowy-core/src/user_state_callback.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Weak}; use anyhow::Context; use client_api::entity::billing_dto::SubscriptionPlan; @@ -24,20 +24,63 @@ use lib_infra::async_trait::async_trait; use uuid::Uuid; pub(crate) struct UserStatusCallbackImpl { - pub(crate) user_manager: Arc, - pub(crate) collab_builder: Arc, - pub(crate) folder_manager: Arc, - pub(crate) database_manager: Arc, - pub(crate) document_manager: Arc, - pub(crate) server_provider: Arc, - pub(crate) storage_manager: Arc, - pub(crate) ai_manager: Arc, + pub(crate) user_manager: Weak, + pub(crate) collab_builder: Weak, + pub(crate) folder_manager: Weak, + pub(crate) database_manager: Weak, + pub(crate) document_manager: Weak, + pub(crate) server_provider: Weak, + pub(crate) storage_manager: Weak, + pub(crate) ai_manager: Weak, // By default, all callback will run on the caller thread. If you don't want to block the caller // thread, you can use runtime to spawn a new task. pub(crate) runtime: Arc, } impl UserStatusCallbackImpl { + fn user_manager(&self) -> Result, FlowyError> { + self.user_manager.upgrade().ok_or_else(FlowyError::ref_drop) + } + + fn folder_manager(&self) -> Result, FlowyError> { + self + .folder_manager + .upgrade() + .ok_or_else(FlowyError::ref_drop) + } + + fn database_manager(&self) -> Result, FlowyError> { + self + .database_manager + .upgrade() + .ok_or_else(FlowyError::ref_drop) + } + + fn document_manager(&self) -> Result, FlowyError> { + self + .document_manager + .upgrade() + .ok_or_else(FlowyError::ref_drop) + } + + fn server_provider(&self) -> Result, FlowyError> { + self + .server_provider + .upgrade() + .ok_or_else(FlowyError::ref_drop) + } + + fn storage_manager(&self) -> Result, FlowyError> { + self + .storage_manager + .upgrade() + .ok_or_else(FlowyError::ref_drop) + } + + fn ai_manager(&self) -> Result, FlowyError> { + self.ai_manager.upgrade().ok_or_else(FlowyError::ref_drop) + } + async fn folder_init_data_source( &self, user_id: i64, @@ -50,8 +93,8 @@ impl UserStatusCallbackImpl { }); } let doc_state_result = self - .folder_manager - .cloud_service + .folder_manager()? + .cloud_service()? .get_folder_doc_state(workspace_id, user_id, CollabType::Folder, workspace_id) .await; resolve_data_source(auth_type, doc_state_result) @@ -64,7 +107,7 @@ impl UserStatusCallbackImpl { object_id: &Uuid, ) -> FlowyResult { let db = self - .user_manager + .user_manager()? .get_collab_db(user_id)? .upgrade() .ok_or_else(|| FlowyError::internal().with_context("Collab db is not initialized"))?; @@ -87,17 +130,17 @@ impl UserStatusCallback for UserStatusCallbackImpl { ) -> FlowyResult<()> { if let Some(cloud_config) = cloud_config { self - .server_provider + .server_provider()? .set_enable_sync(user_id, cloud_config.enable_sync); if cloud_config.enable_encrypt { self - .server_provider + .server_provider()? .set_encrypt_secret(cloud_config.encrypt_secret.clone()); } } self - .folder_manager + .folder_manager()? .initialize( user_id, workspace_id, @@ -107,12 +150,12 @@ impl UserStatusCallback for UserStatusCallbackImpl { ) .await?; self - .database_manager + .database_manager()? .initialize(user_id, auth_type == &AuthType::Local) .await?; - self.document_manager.initialize(user_id).await?; + self.document_manager()?.initialize(user_id).await?; - let cloned_ai_manager = self.ai_manager.clone(); + let cloned_ai_manager = self.ai_manager()?; let workspace_id = *workspace_id; self.runtime.spawn(async move { if let Err(err) = cloned_ai_manager @@ -142,20 +185,20 @@ impl UserStatusCallback for UserStatusCallbackImpl { .folder_init_data_source(user_id, workspace_id, auth_type) .await?; self - .folder_manager + .folder_manager()? .initialize_after_sign_in(user_id, data_source) .await?; self - .database_manager + .database_manager()? .initialize_after_sign_in(user_id, auth_type.is_local()) .await?; self - .document_manager + .document_manager()? .initialize_after_sign_in(user_id) .await?; self - .ai_manager + .ai_manager()? .initialize_after_sign_in(workspace_id) .await?; @@ -182,7 +225,7 @@ impl UserStatusCallback for UserStatusCallbackImpl { .await?; self - .folder_manager + .folder_manager()? .initialize_after_sign_up( user_profile.uid, &user_profile.token, @@ -194,26 +237,26 @@ impl UserStatusCallback for UserStatusCallbackImpl { .context("FolderManager error")?; self - .database_manager + .database_manager()? .initialize_after_sign_up(user_profile.uid, auth_type.is_local()) .await .context("DatabaseManager error")?; self - .document_manager + .document_manager()? .initialize_after_sign_up(user_profile.uid) .await .context("DocumentManager error")?; self - .ai_manager + .ai_manager()? .initialize_after_sign_up(workspace_id) .await?; Ok(()) } async fn on_token_expired(&self, _token: &str, user_id: i64) -> FlowyResult<()> { - self.folder_manager.clear(user_id).await; + self.folder_manager()?.clear(user_id).await; Ok(()) } @@ -229,23 +272,23 @@ impl UserStatusCallback for UserStatusCallbackImpl { .await?; self - .folder_manager + .folder_manager()? .initialize_after_open_workspace(user_id, data_source) .await?; self - .database_manager + .database_manager()? .initialize_after_open_workspace(user_id, auth_type.is_local()) .await?; self - .document_manager + .document_manager()? .initialize_after_open_workspace(user_id) .await?; self - .ai_manager + .ai_manager()? .initialize_after_open_workspace(workspace_id) .await?; self - .storage_manager + .storage_manager()? .initialize_after_open_workspace(workspace_id) .await; Ok(()) @@ -253,8 +296,13 @@ impl UserStatusCallback for UserStatusCallbackImpl { fn on_network_status_changed(&self, reachable: bool) { info!("Notify did update network: reachable: {}", reachable); - self.collab_builder.update_network(reachable); - self.storage_manager.update_network_reachable(reachable); + if let Some(collab_builder) = self.collab_builder.upgrade() { + collab_builder.update_network(reachable); + } + + if let Ok(storage) = self.storage_manager() { + storage.update_network_reachable(reachable); + } } fn on_subscription_plans_updated(&self, plans: Vec) { @@ -266,15 +314,19 @@ impl UserStatusCallback for UserStatusCallbackImpl { } } if storage_plan_changed { - self.storage_manager.enable_storage_write_access(); + if let Ok(storage) = self.storage_manager() { + storage.enable_storage_write_access(); + } } } fn on_storage_permission_updated(&self, can_write: bool) { - if can_write { - self.storage_manager.enable_storage_write_access(); - } else { - self.storage_manager.disable_storage_write_access(); + if let Ok(storage) = self.storage_manager() { + if can_write { + storage.enable_storage_write_access(); + } else { + storage.disable_storage_write_access(); + } } } } diff --git a/frontend/rust-lib/flowy-database2/src/manager.rs b/frontend/rust-lib/flowy-database2/src/manager.rs index 666d2f8eaf..d79a32ff2f 100644 --- a/frontend/rust-lib/flowy-database2/src/manager.rs +++ b/frontend/rust-lib/flowy-database2/src/manager.rs @@ -59,16 +59,22 @@ pub struct DatabaseManager { task_scheduler: Arc>, pub(crate) editors: Mutex, removing_editor: Arc>>>, - collab_builder: Arc, + collab_builder: Weak, cloud_service: Arc, ai_service: Arc, } +impl Drop for DatabaseManager { + fn drop(&mut self) { + tracing::trace!("[Drop] drop database manager"); + } +} + impl DatabaseManager { pub fn new( database_user: Arc, task_scheduler: Arc>, - collab_builder: Arc, + collab_builder: Weak, cloud_service: Arc, ai_service: Arc, ) -> Self { @@ -84,6 +90,10 @@ impl DatabaseManager { } } + fn collab_builder(&self) -> FlowyResult> { + self.collab_builder.upgrade().ok_or(FlowyError::ref_drop()) + } + /// When initialize with new workspace, all the resources will be cleared. pub async fn initialize(&self, uid: i64, is_local_user: bool) -> FlowyResult<()> { // 1. Clear all existing tasks @@ -119,7 +129,7 @@ impl DatabaseManager { .await?; let collab_object = collab_service .build_collab_object(&workspace_database_object_id, CollabType::WorkspaceDatabase)?; - let workspace_database = self.collab_builder.create_workspace_database_manager( + let workspace_database = self.collab_builder()?.create_workspace_database_manager( collab_object, workspace_database_collab, collab_db, @@ -281,11 +291,12 @@ impl DatabaseManager { // hasn't finished syncing yet. In such cases, get_or_create_database will return None. // The workaround is to add a retry mechanism to attempt fetching the database again. let database = open_database_with_retry(workspace_database, database_id).await?; + let collab_builder = self.collab_builder()?; let editor = DatabaseEditor::new( self.user.clone(), database, self.task_scheduler.clone(), - self.collab_builder.clone(), + collab_builder, ) .await?; @@ -709,7 +720,7 @@ impl DatabaseManager { struct WorkspaceDatabaseCollabServiceImpl { is_local_user: bool, user: Arc, - collab_builder: Arc, + collab_builder: Weak, persistence: Arc, cloud_service: Arc, } @@ -718,7 +729,7 @@ impl WorkspaceDatabaseCollabServiceImpl { fn new( is_local_user: bool, user: Arc, - collab_builder: Arc, + collab_builder: Weak, cloud_service: Arc, ) -> Self { let persistence = DatabasePersistenceImpl { user: user.clone() }; @@ -731,6 +742,13 @@ impl WorkspaceDatabaseCollabServiceImpl { } } + fn collab_builder(&self) -> Result, DatabaseError> { + self + .collab_builder + .upgrade() + .ok_or_else(|| DatabaseError::Internal(anyhow!("Collab builder is not initialized"))) + } + async fn get_encode_collab( &self, object_id: &Uuid, @@ -797,7 +815,7 @@ impl WorkspaceDatabaseCollabServiceImpl { .workspace_id() .map_err(|err| DatabaseError::Internal(err.into()))?; let object = self - .collab_builder + .collab_builder()? .collab_object(&workspace_id, uid, object_id, object_type) .map_err(|err| DatabaseError::Internal(anyhow!("Failed to build collab object: {}", err)))?; Ok(object) @@ -906,7 +924,7 @@ impl DatabaseCollabService for WorkspaceDatabaseCollabServiceImpl { let collab_db = self.collab_db()?; let collab = self - .collab_builder + .collab_builder()? .build_collab(&object, &collab_db, data_source) .await?; Ok(collab) diff --git a/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs b/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs index 227b96df4f..e9c8d3afdd 100644 --- a/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs +++ b/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs @@ -65,7 +65,7 @@ pub struct DatabaseEditor { pub(crate) database_views: Arc, #[allow(dead_code)] user: Arc, - collab_builder: Arc, + collab_builder: Weak, is_loading_rows: ArcSwapOption>, opening_ret_txs: Arc>>, #[allow(dead_code)] @@ -138,7 +138,7 @@ impl DatabaseEditor { database, cell_cache, database_views, - collab_builder, + collab_builder: Arc::downgrade(&collab_builder), is_loading_rows: Default::default(), opening_ret_txs: Arc::new(Default::default()), database_cancellation, @@ -150,6 +150,13 @@ impl DatabaseEditor { Ok(this) } + pub fn collab_builder(&self) -> FlowyResult> { + self + .collab_builder + .upgrade() + .ok_or_else(FlowyError::ref_drop) + } + pub async fn close_view(&self, view_id: &str) { self.database_views.remove_view(view_id).await; } @@ -795,6 +802,7 @@ impl DatabaseEditor { } debug!("[Database]: Init database row: {}", row_id); + let collab_builder = self.collab_builder()?; let database_row = self .database .read() @@ -810,14 +818,14 @@ impl DatabaseEditor { if !is_finalized { trace!("[Database]: finalize database row: {}", row_id); let row_id = Uuid::from_str(row_id.as_str())?; - let collab_object = self.collab_builder.collab_object( + let collab_object = collab_builder.collab_object( &self.user.workspace_id()?, self.user.user_id()?, &row_id, CollabType::DatabaseRow, )?; - if let Err(err) = self.collab_builder.finalize( + if let Err(err) = collab_builder.finalize( collab_object, CollabBuilderConfig::default(), database_row.clone(), diff --git a/frontend/rust-lib/flowy-document/src/manager.rs b/frontend/rust-lib/flowy-document/src/manager.rs index 9c6a383bae..bfb9f885ce 100644 --- a/frontend/rust-lib/flowy-document/src/manager.rs +++ b/frontend/rust-lib/flowy-document/src/manager.rs @@ -53,7 +53,7 @@ pub trait DocumentSnapshotService: Send + Sync { pub struct DocumentManager { pub user_service: Arc, - collab_builder: Arc, + collab_builder: Weak, documents: Arc>>>, removing_documents: Arc>>>, cloud_service: Arc, @@ -61,10 +61,16 @@ pub struct DocumentManager { snapshot_service: Arc, } +impl Drop for DocumentManager { + fn drop(&mut self) { + tracing::trace!("[Drop] drop document manager"); + } +} + impl DocumentManager { pub fn new( user_service: Arc, - collab_builder: Arc, + collab_builder: Weak, cloud_service: Arc, storage_service: Weak, snapshot_service: Arc, @@ -80,6 +86,13 @@ impl DocumentManager { } } + fn collab_builder(&self) -> FlowyResult> { + self + .collab_builder + .upgrade() + .ok_or_else(FlowyError::ref_drop) + } + /// Get the encoded collab of the document. pub async fn get_encoded_collab_with_view_id(&self, doc_id: &Uuid) -> FlowyResult { let uid = self.user_service.user_id()?; @@ -190,10 +203,10 @@ impl DocumentManager { let workspace_id = self.user_service.workspace_id()?; let collab_object = self - .collab_builder + .collab_builder()? .collab_object(&workspace_id, uid, doc_id, CollabType::Document)?; let document = self - .collab_builder + .collab_builder()? .create_document( collab_object, data_source, diff --git a/frontend/rust-lib/flowy-document/tests/document/util.rs b/frontend/rust-lib/flowy-document/tests/document/util.rs index 231bb3852e..7315bd1753 100644 --- a/frontend/rust-lib/flowy-document/tests/document/util.rs +++ b/frontend/rust-lib/flowy-document/tests/document/util.rs @@ -25,6 +25,8 @@ use tracing_subscriber::{fmt::Subscriber, util::SubscriberInitExt, EnvFilter}; use uuid::Uuid; pub struct DocumentTest { + #[allow(dead_code)] + builder: Arc, inner: DocumentManager, } @@ -44,12 +46,15 @@ impl DocumentTest { let manager = DocumentManager::new( Arc::new(user), - builder, + Arc::downgrade(&builder), cloud_service, Arc::downgrade(&file_storage), document_snapshot, ); - Self { inner: manager } + Self { + inner: manager, + builder, + } } } diff --git a/frontend/rust-lib/flowy-error/src/code.rs b/frontend/rust-lib/flowy-error/src/code.rs index 4112883e61..b64da1c35f 100644 --- a/frontend/rust-lib/flowy-error/src/code.rs +++ b/frontend/rust-lib/flowy-error/src/code.rs @@ -383,6 +383,9 @@ pub enum ErrorCode { #[error("User not login")] UserNotLogin = 131, + + #[error("Reference resource is not available")] + WeakRefDrop = 132, } impl ErrorCode { diff --git a/frontend/rust-lib/flowy-error/src/errors.rs b/frontend/rust-lib/flowy-error/src/errors.rs index a9a2b6fa2b..f76a7d4dda 100644 --- a/frontend/rust-lib/flowy-error/src/errors.rs +++ b/frontend/rust-lib/flowy-error/src/errors.rs @@ -162,6 +162,7 @@ impl FlowyError { static_flowy_error!(local_ai_not_ready, ErrorCode::LocalAINotReady); static_flowy_error!(local_ai_disabled, ErrorCode::LocalAIDisabled); static_flowy_error!(user_not_login, ErrorCode::UserNotLogin); + static_flowy_error!(ref_drop, ErrorCode::WeakRefDrop); } impl std::convert::From for FlowyError { diff --git a/frontend/rust-lib/flowy-folder/src/manager.rs b/frontend/rust-lib/flowy-folder/src/manager.rs index 37533ae500..50c5ced0f2 100644 --- a/frontend/rust-lib/flowy-folder/src/manager.rs +++ b/frontend/rust-lib/flowy-folder/src/manager.rs @@ -63,16 +63,22 @@ pub struct FolderManager { pub(crate) collab_builder: Arc, pub(crate) user: Arc, pub(crate) operation_handlers: FolderOperationHandlers, - pub cloud_service: Arc, + pub cloud_service: Weak, pub(crate) folder_indexer: Arc, pub(crate) store_preferences: Arc, } +impl Drop for FolderManager { + fn drop(&mut self) { + tracing::trace!("[Drop] drop folder manager"); + } +} + impl FolderManager { pub fn new( user: Arc, collab_builder: Arc, - cloud_service: Arc, + cloud_service: Weak, folder_indexer: Arc, store_preferences: Arc, ) -> FlowyResult { @@ -89,6 +95,13 @@ impl FolderManager { Ok(manager) } + pub fn cloud_service(&self) -> FlowyResult> { + self + .cloud_service + .upgrade() + .ok_or_else(FlowyError::ref_drop) + } + pub fn register_operation_handler( &self, layout: ViewLayout, @@ -317,7 +330,7 @@ impl FolderManager { // The folder updates should not be empty, as the folder data is stored // when the user signs up for the first time. let result = self - .cloud_service + .cloud_service()? .get_folder_doc_state(workspace_id, user_id, CollabType::Folder, workspace_id) .await .map_err(FlowyError::from); @@ -326,7 +339,7 @@ impl FolderManager { Ok(folder_doc_state) => { info!( "Get folder updates via {}, doc state len: {}", - self.cloud_service.service_name(), + self.cloud_service()?.service_name(), folder_doc_state.len() ); self @@ -1202,7 +1215,7 @@ impl FolderManager { // Sync the view to the cloud if sync_after_create { self - .cloud_service + .cloud_service()? .batch_create_folder_collab_objects(&workspace_id, objects) .await?; } @@ -1355,7 +1368,7 @@ impl FolderManager { let workspace_id = self.user.workspace_id()?; self - .cloud_service + .cloud_service()? .publish_view(&workspace_id, payload) .await?; Ok(()) @@ -1366,7 +1379,7 @@ impl FolderManager { pub async fn unpublish_views(&self, view_ids: Vec) -> FlowyResult<()> { let workspace_id = self.user.workspace_id()?; self - .cloud_service + .cloud_service()? .unpublish_views(&workspace_id, view_ids) .await?; Ok(()) @@ -1376,7 +1389,7 @@ impl FolderManager { /// The publish info contains the namespace and publish_name of the view. #[tracing::instrument(level = "debug", skip(self))] pub async fn get_publish_info(&self, view_id: &Uuid) -> FlowyResult { - let publish_info = self.cloud_service.get_publish_info(view_id).await?; + let publish_info = self.cloud_service()?.get_publish_info(view_id).await?; Ok(publish_info) } @@ -1385,7 +1398,7 @@ impl FolderManager { pub async fn set_publish_name(&self, view_id: Uuid, new_name: String) -> FlowyResult<()> { let workspace_id = self.user.workspace_id()?; self - .cloud_service + .cloud_service()? .set_publish_name(&workspace_id, view_id, new_name) .await?; Ok(()) @@ -1397,7 +1410,7 @@ impl FolderManager { pub async fn set_publish_namespace(&self, new_namespace: String) -> FlowyResult<()> { let workspace_id = self.user.workspace_id()?; self - .cloud_service + .cloud_service()? .set_publish_namespace(&workspace_id, new_namespace) .await?; Ok(()) @@ -1408,7 +1421,7 @@ impl FolderManager { pub async fn get_publish_namespace(&self) -> FlowyResult { let workspace_id = self.user.workspace_id()?; let namespace = self - .cloud_service + .cloud_service()? .get_publish_namespace(&workspace_id) .await?; Ok(namespace) @@ -1419,7 +1432,7 @@ impl FolderManager { pub async fn list_published_views(&self) -> FlowyResult> { let workspace_id = self.user.workspace_id()?; let published_views = self - .cloud_service + .cloud_service()? .list_published_views(&workspace_id) .await?; Ok(published_views) @@ -1429,7 +1442,7 @@ impl FolderManager { pub async fn get_default_published_view_info(&self) -> FlowyResult { let workspace_id = self.user.workspace_id()?; let default_published_view_info = self - .cloud_service + .cloud_service()? .get_default_published_view_info(&workspace_id) .await?; Ok(default_published_view_info) @@ -1439,7 +1452,7 @@ impl FolderManager { pub async fn set_default_published_view(&self, view_id: uuid::Uuid) -> FlowyResult<()> { let workspace_id = self.user.workspace_id()?; self - .cloud_service + .cloud_service()? .set_default_published_view(&workspace_id, view_id) .await?; Ok(()) @@ -1449,7 +1462,7 @@ impl FolderManager { pub async fn remove_default_published_view(&self) -> FlowyResult<()> { let workspace_id = self.user.workspace_id()?; self - .cloud_service + .cloud_service()? .remove_default_published_view(&workspace_id) .await?; Ok(()) @@ -1779,7 +1792,7 @@ impl FolderManager { } pub(crate) async fn import_zip_file(&self, zip_file_path: &str) -> FlowyResult<()> { - self.cloud_service.import_zip(zip_file_path).await?; + self.cloud_service()?.import_zip(zip_file_path).await?; Ok(()) } @@ -1809,7 +1822,7 @@ impl FolderManager { info!("Syncing the imported {} collab to the cloud", objects.len()); self - .cloud_service + .cloud_service()? .batch_create_folder_collab_objects(&workspace_id, objects) .await?; @@ -1939,7 +1952,7 @@ impl FolderManager { limit: usize, ) -> FlowyResult> { let snapshots = self - .cloud_service + .cloud_service()? .get_folder_snapshots(workspace_id, limit) .await? .into_iter() diff --git a/frontend/rust-lib/flowy-folder/src/manager_init.rs b/frontend/rust-lib/flowy-folder/src/manager_init.rs index c581031f54..25f0b4989e 100644 --- a/frontend/rust-lib/flowy-folder/src/manager_init.rs +++ b/frontend/rust-lib/flowy-folder/src/manager_init.rs @@ -73,7 +73,7 @@ impl FolderManager { // 3. If the folder doesn't exist and create_if_not_exist is false, try to fetch the folder data from cloud/ // This will happen user can't fetch the folder data when the user sign in. let doc_state = self - .cloud_service + .cloud_service()? .get_folder_doc_state(workspace_id, uid, CollabType::Folder, workspace_id) .await?; diff --git a/frontend/rust-lib/flowy-search/src/services/manager.rs b/frontend/rust-lib/flowy-search/src/services/manager.rs index a71449d5d2..f39c73dc4c 100644 --- a/frontend/rust-lib/flowy-search/src/services/manager.rs +++ b/frontend/rust-lib/flowy-search/src/services/manager.rs @@ -121,6 +121,12 @@ impl SearchManager { } } +impl Drop for SearchManager { + fn drop(&mut self) { + tracing::trace!("[Drop] drop search manager"); + } +} + async fn is_current_search( current_search: &Arc>>, search_id: &str, diff --git a/frontend/rust-lib/flowy-user/src/event_handler.rs b/frontend/rust-lib/flowy-user/src/event_handler.rs index ee6efb1006..f48c09c45a 100644 --- a/frontend/rust-lib/flowy-user/src/event_handler.rs +++ b/frontend/rust-lib/flowy-user/src/event_handler.rs @@ -382,10 +382,9 @@ pub async fn set_cloud_config_handler( let mut config = get_cloud_config(session.user_id, &store_preferences) .ok_or(FlowyError::internal().with_context("Can't find any cloud config"))?; + let cloud_service = manager.cloud_service()?; if let Some(enable_sync) = update.enable_sync { - manager - .cloud_service - .set_enable_sync(session.user_id, enable_sync); + cloud_service.set_enable_sync(session.user_id, enable_sync); config.enable_sync = enable_sync; } @@ -395,7 +394,7 @@ pub async fn set_cloud_config_handler( enable_sync: config.enable_sync, enable_encrypt: config.enable_encrypt, encrypt_secret: config.encrypt_secret, - server_url: manager.cloud_service.service_url(), + server_url: cloud_service.service_url(), }; send_notification( @@ -416,13 +415,14 @@ pub async fn get_cloud_config_handler( let manager = upgrade_manager(manager)?; let session = manager.get_session()?; let store_preferences = upgrade_store_preferences(store_preferences)?; + let cloud_service = manager.cloud_service()?; // Generate the default config if the config is not exist let config = get_or_create_cloud_config(session.user_id, &store_preferences); data_result_ok(CloudSettingPB { enable_sync: config.enable_sync, enable_encrypt: config.enable_encrypt, encrypt_secret: config.encrypt_secret, - server_url: manager.cloud_service.service_url(), + server_url: cloud_service.service_url(), }) } @@ -476,7 +476,7 @@ pub async fn update_network_state_handler( ) -> Result<(), FlowyError> { let manager = upgrade_manager(manager)?; let reachable = data.into_inner().ty.is_reachable(); - manager.cloud_service.set_network_reachable(reachable); + manager.cloud_service()?.set_network_reachable(reachable); manager .user_status_callback .read() diff --git a/frontend/rust-lib/flowy-user/src/migrations/anon_user_workspace.rs b/frontend/rust-lib/flowy-user/src/migrations/anon_user_workspace.rs index 01729906a1..8eac0d47a1 100644 --- a/frontend/rust-lib/flowy-user/src/migrations/anon_user_workspace.rs +++ b/frontend/rust-lib/flowy-user/src/migrations/anon_user_workspace.rs @@ -1,6 +1,6 @@ use diesel::SqliteConnection; use semver::Version; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use tracing::instrument; use collab_integrate::CollabKVDB; @@ -35,7 +35,7 @@ impl UserDataMigration for AnonUserWorkspaceTableMigration { fn run( &self, user: &Session, - _collab_db: &Arc, + _collab_db: &Weak, user_auth_type: &AuthType, db: &mut SqliteConnection, store_preferences: &Arc, diff --git a/frontend/rust-lib/flowy-user/src/migrations/doc_key_with_workspace.rs b/frontend/rust-lib/flowy-user/src/migrations/doc_key_with_workspace.rs index 4d8f304454..66eac029f2 100644 --- a/frontend/rust-lib/flowy-user/src/migrations/doc_key_with_workspace.rs +++ b/frontend/rust-lib/flowy-user/src/migrations/doc_key_with_workspace.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Weak}; use collab_plugins::local_storage::kv::doc::migrate_old_keys; use collab_plugins::local_storage::kv::KVTransactionDB; @@ -7,7 +7,7 @@ use semver::Version; use tracing::{instrument, trace}; use collab_integrate::CollabKVDB; -use flowy_error::FlowyResult; +use flowy_error::{FlowyError, FlowyResult}; use flowy_sqlite::kv::KVStorePreferences; use flowy_user_pub::entities::AuthType; @@ -40,11 +40,14 @@ impl UserDataMigration for CollabDocKeyWithWorkspaceIdMigration { fn run( &self, user: &Session, - collab_db: &Arc, + collab_db: &Weak, _user_auth_type: &AuthType, _db: &mut SqliteConnection, _store_preferences: &Arc, ) -> FlowyResult<()> { + let collab_db = collab_db + .upgrade() + .ok_or_else(|| FlowyError::internal().with_context("Failed to upgrade DB object"))?; trace!("migrate key with workspace id:{}", user.workspace_id); collab_db.with_write_txn(|txn| { migrate_old_keys(txn, &user.workspace_id)?; diff --git a/frontend/rust-lib/flowy-user/src/migrations/document_empty_content.rs b/frontend/rust-lib/flowy-user/src/migrations/document_empty_content.rs index 26ead8b2a9..3aa3aa485f 100644 --- a/frontend/rust-lib/flowy-user/src/migrations/document_empty_content.rs +++ b/frontend/rust-lib/flowy-user/src/migrations/document_empty_content.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Weak}; use collab::core::origin::{CollabClient, CollabOrigin}; use collab::preclude::Collab; @@ -42,7 +42,7 @@ impl UserDataMigration for HistoricalEmptyDocumentMigration { fn run( &self, user: &Session, - collab_db: &Arc, + collab_db: &Weak, user_auth_type: &AuthType, _db: &mut SqliteConnection, _store_preferences: &Arc, @@ -53,6 +53,9 @@ impl UserDataMigration for HistoricalEmptyDocumentMigration { if !matches!(user_auth_type, AuthType::Local) { return Ok(()); } + let collab_db = collab_db + .upgrade() + .ok_or_else(|| FlowyError::internal().with_context("Failed to upgrade DB object"))?; collab_db.with_write_txn(|write_txn| { let origin = CollabOrigin::Client(CollabClient::new(user.user_id, "phantom")); let folder_collab = match load_collab( diff --git a/frontend/rust-lib/flowy-user/src/migrations/migration.rs b/frontend/rust-lib/flowy-user/src/migrations/migration.rs index 563f36b952..691a1970b5 100644 --- a/frontend/rust-lib/flowy-user/src/migrations/migration.rs +++ b/frontend/rust-lib/flowy-user/src/migrations/migration.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Weak}; use chrono::NaiveDateTime; use collab_integrate::CollabKVDB; @@ -18,7 +18,7 @@ pub const FIRST_TIME_INSTALL_VERSION: &str = "first_install_version"; pub struct UserLocalDataMigration { session: Session, - collab_db: Arc, + collab_db: Weak, sqlite_pool: Arc, kv: Arc, } @@ -26,7 +26,7 @@ pub struct UserLocalDataMigration { impl UserLocalDataMigration { pub fn new( session: Session, - collab_db: Arc, + collab_db: Weak, sqlite_pool: Arc, kv: Arc, ) -> Self { @@ -103,7 +103,7 @@ pub trait UserDataMigration { fn run( &self, user: &Session, - collab_db: &Arc, + collab_db: &Weak, user_auth_type: &AuthType, db: &mut SqliteConnection, store_preferences: &Arc, diff --git a/frontend/rust-lib/flowy-user/src/migrations/workspace_and_favorite_v1.rs b/frontend/rust-lib/flowy-user/src/migrations/workspace_and_favorite_v1.rs index eee4a114b8..92a5f5f48e 100644 --- a/frontend/rust-lib/flowy-user/src/migrations/workspace_and_favorite_v1.rs +++ b/frontend/rust-lib/flowy-user/src/migrations/workspace_and_favorite_v1.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Weak}; use collab_folder::Folder; use collab_plugins::local_storage::kv::{KVTransactionDB, PersistenceError}; @@ -7,7 +7,7 @@ use semver::Version; use tracing::instrument; use collab_integrate::{CollabKVAction, CollabKVDB}; -use flowy_error::FlowyResult; +use flowy_error::{FlowyError, FlowyResult}; use flowy_sqlite::kv::KVStorePreferences; use flowy_user_pub::entities::AuthType; @@ -40,11 +40,14 @@ impl UserDataMigration for FavoriteV1AndWorkspaceArrayMigration { fn run( &self, user: &Session, - collab_db: &Arc, + collab_db: &Weak, _user_auth_type: &AuthType, _db: &mut SqliteConnection, _store_preferences: &Arc, ) -> FlowyResult<()> { + let collab_db = collab_db + .upgrade() + .ok_or_else(|| FlowyError::internal().with_context("Failed to upgrade DB object"))?; collab_db.with_write_txn(|write_txn| { if let Ok(collab) = load_collab( user.user_id, diff --git a/frontend/rust-lib/flowy-user/src/migrations/workspace_trash_v1.rs b/frontend/rust-lib/flowy-user/src/migrations/workspace_trash_v1.rs index 38665f3c41..5e56cdfecd 100644 --- a/frontend/rust-lib/flowy-user/src/migrations/workspace_trash_v1.rs +++ b/frontend/rust-lib/flowy-user/src/migrations/workspace_trash_v1.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Weak}; use collab_folder::Folder; use collab_plugins::local_storage::kv::{KVTransactionDB, PersistenceError}; @@ -7,7 +7,7 @@ use semver::Version; use tracing::instrument; use collab_integrate::{CollabKVAction, CollabKVDB}; -use flowy_error::FlowyResult; +use flowy_error::{FlowyError, FlowyResult}; use flowy_sqlite::kv::KVStorePreferences; use flowy_user_pub::entities::AuthType; @@ -38,11 +38,14 @@ impl UserDataMigration for WorkspaceTrashMapToSectionMigration { fn run( &self, user: &Session, - collab_db: &Arc, + collab_db: &Weak, _user_auth_type: &AuthType, _db: &mut SqliteConnection, _store_preferences: &Arc, ) -> FlowyResult<()> { + let collab_db = collab_db + .upgrade() + .ok_or_else(|| FlowyError::internal().with_context("Failed to upgrade DB object"))?; collab_db.with_write_txn(|write_txn| { if let Ok(collab) = load_collab( user.user_id, diff --git a/frontend/rust-lib/flowy-user/src/services/authenticate_user.rs b/frontend/rust-lib/flowy-user/src/services/authenticate_user.rs index 881f4dfdf7..3b7428642d 100644 --- a/frontend/rust-lib/flowy-user/src/services/authenticate_user.rs +++ b/frontend/rust-lib/flowy-user/src/services/authenticate_user.rs @@ -26,6 +26,15 @@ pub struct AuthenticateUser { session: ArcSwapOption, } +impl Drop for AuthenticateUser { + fn drop(&mut self) { + tracing::trace!( + "[Drop ]Drop AuthenticateUser: {:?}", + self.session.load_full().map(|s| s.user_id) + ); + } +} + impl AuthenticateUser { pub fn new(user_config: UserConfig, store_preferences: Arc) -> Self { let user_paths = UserPaths::new(user_config.storage_path.clone()); @@ -71,10 +80,7 @@ impl AuthenticateUser { } pub fn get_collab_db(&self, uid: i64) -> FlowyResult> { - self - .database - .get_collab_db(uid) - .map(|collab_db| Arc::downgrade(&collab_db)) + self.database.get_collab_db(uid) } pub fn get_sqlite_connection(&self, uid: i64) -> FlowyResult { @@ -104,7 +110,11 @@ impl AuthenticateUser { pub fn is_collab_on_disk(&self, uid: i64, object_id: &str) -> FlowyResult { let session = self.get_session()?; - let collab_db = self.database.get_collab_db(uid)?; + let collab_db = self + .database + .get_collab_db(uid)? + .upgrade() + .ok_or_else(|| FlowyError::internal().with_context("Collab db is not initialized"))?; let read_txn = collab_db.read_txn(); Ok(read_txn.is_exist(uid, session.workspace_id.as_str(), object_id)) } diff --git a/frontend/rust-lib/flowy-user/src/services/data_import/appflowy_data_import.rs b/frontend/rust-lib/flowy-user/src/services/data_import/appflowy_data_import.rs index f2e48834c4..9b42436209 100644 --- a/frontend/rust-lib/flowy-user/src/services/data_import/appflowy_data_import.rs +++ b/frontend/rust-lib/flowy-user/src/services/data_import/appflowy_data_import.rs @@ -47,7 +47,7 @@ use uuid::Uuid; pub(crate) struct ImportedFolder { pub imported_session: Session, - pub imported_collab_db: Arc, + pub imported_collab_db: Weak, pub container_name: Option, pub parent_view_id: Option, pub source: ImportedSource, @@ -126,7 +126,7 @@ pub(crate) fn prepare_import( run_data_migration( &imported_session, &imported_user_auth_type, - imported_collab_db.clone(), + Arc::downgrade(&imported_collab_db), imported_sqlite_db.get_pool(), other_store_preferences.clone(), app_version, @@ -134,7 +134,7 @@ pub(crate) fn prepare_import( Ok(ImportedFolder { imported_session, - imported_collab_db, + imported_collab_db: Arc::downgrade(&imported_collab_db), container_name: None, parent_view_id, source: ImportedSource::ExternalFolder, @@ -174,7 +174,10 @@ pub(crate) fn generate_import_data( let imported_workspace_id = imported_folder.imported_session.workspace_id.clone(); let imported_session = imported_folder.imported_session.clone(); let imported_workspace_database_id = imported_folder.workspace_database_id.clone(); - let imported_collab_db = imported_folder.imported_collab_db.clone(); + let imported_collab_db = imported_folder + .imported_collab_db + .upgrade() + .ok_or_else(|| FlowyError::internal().with_context("Failed to upgrade DB object"))?; let imported_container_view_name = imported_folder.container_name.clone(); let mut database_view_ids_by_database_id: HashMap> = HashMap::new(); diff --git a/frontend/rust-lib/flowy-user/src/services/db.rs b/frontend/rust-lib/flowy-user/src/services/db.rs index 15126558d7..a5fdc826ac 100644 --- a/frontend/rust-lib/flowy-user/src/services/db.rs +++ b/frontend/rust-lib/flowy-user/src/services/db.rs @@ -1,6 +1,3 @@ -use std::path::{Path, PathBuf}; -use std::{fs, io, sync::Arc}; - use chrono::{Days, Local}; use collab_integrate::{CollabKVAction, CollabKVDB, PersistenceError}; use collab_plugins::local_storage::kv::KVTransactionDB; @@ -12,6 +9,9 @@ use flowy_sqlite::{DBConnection, Database}; use flowy_user_pub::entities::UserProfile; use flowy_user_pub::sql::select_user_profile; use lib_infra::file_util::{unzip_and_replace, zip_folder}; +use std::path::{Path, PathBuf}; +use std::sync::Weak; +use std::{fs, io, sync::Arc}; use tracing::{error, event, info, instrument}; pub trait UserDBPath: Send + Sync + 'static { @@ -97,9 +97,9 @@ impl UserDB { Ok(pool) } - pub(crate) fn get_collab_db(&self, user_id: i64) -> Result, FlowyError> { + pub(crate) fn get_collab_db(&self, user_id: i64) -> Result, FlowyError> { let collab_db = self.open_collab_db(self.paths.collab_db_path(user_id), user_id)?; - Ok(collab_db) + Ok(Arc::downgrade(&collab_db)) } pub fn open_sqlite_db( @@ -340,7 +340,7 @@ pub(crate) fn validate_collab_db( read_txn.is_exist(uid, workspace_id, workspace_id) }, Err(err) => { - error!("open collab db error, {:?}", err); + error!("open collab db error when validate collab, {:?}", err); !matches!( err, PersistenceError::RocksdbCorruption(_) | PersistenceError::RocksdbRepairFail(_) diff --git a/frontend/rust-lib/flowy-user/src/user_manager/manager.rs b/frontend/rust-lib/flowy-user/src/user_manager/manager.rs index 057519deaf..3ef9d7c3fc 100644 --- a/frontend/rust-lib/flowy-user/src/user_manager/manager.rs +++ b/frontend/rust-lib/flowy-user/src/user_manager/manager.rs @@ -44,7 +44,7 @@ use flowy_user_pub::session::Session; use flowy_user_pub::sql::*; pub struct UserManager { - pub(crate) cloud_service: Arc, + pub(crate) cloud_service: Weak, pub(crate) store_preferences: Arc, pub(crate) user_awareness: Arc>>, pub(crate) user_status_callback: RwLock>, @@ -56,9 +56,15 @@ pub struct UserManager { pub(crate) is_loading_awareness: Arc>, } +impl Drop for UserManager { + fn drop(&mut self) { + tracing::trace!("[Drop] drop user manager"); + } +} + impl UserManager { pub fn new( - cloud_services: Arc, + cloud_services: Weak, store_preferences: Arc, collab_builder: Weak, authenticate_user: Arc, @@ -82,7 +88,12 @@ impl UserManager { }); let weak_user_manager = Arc::downgrade(&user_manager); - if let Ok(user_service) = user_manager.cloud_service.get_user_service() { + if let Ok(user_service) = user_manager + .cloud_service + .upgrade() + .ok_or_else(FlowyError::ref_drop) + .and_then(|v| v.get_user_service()) + { if let Some(mut rx) = user_service.subscribe_user_update() { tokio::spawn(async move { while let Some(update) = rx.recv().await { @@ -99,6 +110,13 @@ impl UserManager { user_manager } + pub fn cloud_service(&self) -> FlowyResult> { + self + .cloud_service + .upgrade() + .ok_or_else(FlowyError::ref_drop) + } + pub fn close_db(&self) { if let Err(err) = self.authenticate_user.close_db() { error!("Close db failed: {:?}", err); @@ -125,6 +143,7 @@ impl UserManager { let user_status_callback = Arc::new(user_status_callback); *self.user_status_callback.write().await = user_status_callback.clone(); *self.collab_interact.write().await = Arc::new(collab_interact); + let cloud_service = self.cloud_service()?; if let Ok(session) = self.get_session() { info!( @@ -137,9 +156,7 @@ impl UserManager { let uid = session.user_id; let token = self.token_from_auth_type(&auth_type)?; - self - .cloud_service - .set_server_auth_type(&auth_type, token.clone())?; + cloud_service.set_server_auth_type(&auth_type, token.clone())?; event!( tracing::Level::INFO, @@ -157,12 +174,12 @@ impl UserManager { if auth_type.is_appflowy_cloud() { let local_token = token.unwrap_or_default(); // Subscribe the token state - let weak_cloud_services = Arc::downgrade(&self.cloud_service); + let weak_cloud_services = self.cloud_service.clone(); let weak_authenticate_user = Arc::downgrade(&self.authenticate_user); let weak_pool = Arc::downgrade(&self.db_pool(uid)?); let workspace_id = session.workspace_id.clone(); let cloned_session = session.clone(); - if let Some(mut token_state_rx) = self.cloud_service.subscribe_token_state() { + if let Some(mut token_state_rx) = cloud_service.subscribe_token_state() { event!(tracing::Level::DEBUG, "Listen token state change"); let user_uid = uid; tokio::spawn(async move { @@ -305,11 +322,7 @@ impl UserManager { } pub fn get_collab_db(&self, uid: i64) -> Result, FlowyError> { - self - .authenticate_user - .database - .get_collab_db(uid) - .map(|collab_db| Arc::downgrade(&collab_db)) + self.authenticate_user.database.get_collab_db(uid) } #[cfg(debug_assertions)] @@ -331,10 +344,10 @@ impl UserManager { params: SignInParams, auth_type: AuthType, ) -> Result { - self.cloud_service.set_server_auth_type(&auth_type, None)?; + let cloud_service = self.cloud_service()?; + cloud_service.set_server_auth_type(&auth_type, None)?; - let response: AuthResponse = self - .cloud_service + let response: AuthResponse = cloud_service .get_user_service()? .sign_in(BoxAny::new(params)) .await?; @@ -380,11 +393,12 @@ impl UserManager { auth_type: AuthType, params: BoxAny, ) -> Result { - self.cloud_service.set_server_auth_type(&auth_type, None)?; + let cloud_service = self.cloud_service()?; + cloud_service.set_server_auth_type(&auth_type, None)?; // sign out the current user if there is one let migration_user = self.get_migration_user(&auth_type).await; - let auth_service = self.cloud_service.get_user_service()?; + let auth_service = cloud_service.get_user_service()?; let response: AuthResponse = auth_service.sign_up(params).await?; let new_user_profile = UserProfile::from((&response, &auth_type)); self @@ -462,7 +476,7 @@ impl UserManager { pub async fn sign_out(&self) -> Result<(), FlowyError> { if let Ok(session) = self.get_session() { sign_out( - &self.cloud_service, + &self.cloud_service()?, &session, &self.authenticate_user, self.db_connection(session.user_id)?, @@ -475,7 +489,7 @@ impl UserManager { #[tracing::instrument(level = "info", skip(self))] pub async fn delete_account(&self) -> Result<(), FlowyError> { self - .cloud_service + .cloud_service()? .get_user_service()? .delete_account() .await?; @@ -502,7 +516,7 @@ impl UserManager { changeset, )?; self - .cloud_service + .cloud_service()? .get_user_service()? .update_user(params) .await?; @@ -560,7 +574,7 @@ impl UserManager { let uid = old_user_profile.uid; let result: Result = self - .cloud_service + .cloud_service()? .get_user_service()? .get_user_profile(uid, workspace_id) .await; @@ -645,7 +659,7 @@ impl UserManager { } pub async fn receive_realtime_event(&self, json: Value) { - if let Ok(user_service) = self.cloud_service.get_user_service() { + if let Ok(user_service) = self.cloud_service().and_then(|v| v.get_user_service()) { user_service.receive_realtime_event(json) } } @@ -656,11 +670,10 @@ impl UserManager { authenticator: &AuthType, email: &str, ) -> Result { - self - .cloud_service - .set_server_auth_type(authenticator, None)?; + let cloud_service = self.cloud_service()?; + cloud_service.set_server_auth_type(authenticator, None)?; - let auth_service = self.cloud_service.get_user_service()?; + let auth_service = cloud_service.get_user_service()?; let url = auth_service.generate_sign_in_url_with_email(email).await?; Ok(url) } @@ -672,9 +685,9 @@ impl UserManager { password: &str, ) -> Result { self - .cloud_service + .cloud_service()? .set_server_auth_type(&AuthType::AppFlowyCloud, None)?; - let auth_service = self.cloud_service.get_user_service()?; + let auth_service = self.cloud_service()?.get_user_service()?; let response = auth_service.sign_in_with_password(email, password).await?; Ok(response) } @@ -686,9 +699,9 @@ impl UserManager { redirect_to: &str, ) -> Result<(), FlowyError> { self - .cloud_service + .cloud_service()? .set_server_auth_type(&AuthType::AppFlowyCloud, None)?; - let auth_service = self.cloud_service.get_user_service()?; + let auth_service = self.cloud_service()?.get_user_service()?; auth_service .sign_in_with_magic_link(email, redirect_to) .await?; @@ -702,9 +715,9 @@ impl UserManager { passcode: &str, ) -> Result { self - .cloud_service + .cloud_service()? .set_server_auth_type(&AuthType::AppFlowyCloud, None)?; - let auth_service = self.cloud_service.get_user_service()?; + let auth_service = self.cloud_service()?.get_user_service()?; let response = auth_service.sign_in_with_passcode(email, passcode).await?; Ok(response) } @@ -715,9 +728,9 @@ impl UserManager { oauth_provider: &str, ) -> Result { self - .cloud_service + .cloud_service()? .set_server_auth_type(&AuthType::AppFlowyCloud, None)?; - let auth_service = self.cloud_service.get_user_service()?; + let auth_service = self.cloud_service()?.get_user_service()?; let url = auth_service .generate_oauth_url_with_provider(oauth_provider) .await?; @@ -855,7 +868,7 @@ fn mark_all_migrations_as_applied(sqlite_pool: &Arc) { pub(crate) fn run_data_migration( session: &Session, user_auth_type: &AuthType, - collab_db: Arc, + collab_db: Weak, sqlite_pool: Arc, kv: Arc, app_version: &Version, @@ -885,7 +898,13 @@ pub async fn sign_out( authenticate_user: &AuthenticateUser, conn: DBConnection, ) -> Result<(), FlowyError> { + info!("[Sign out] Sign out user: {}", session.user_id); let _ = remove_user_token(session.user_id, conn); + + info!( + "[Sign out] Close user related database: {}", + session.user_id + ); authenticate_user.database.close(session.user_id)?; authenticate_user.set_session(None)?; diff --git a/frontend/rust-lib/flowy-user/src/user_manager/manager_user_awareness.rs b/frontend/rust-lib/flowy-user/src/user_manager/manager_user_awareness.rs index 1edf4f393b..0dd2b5a02c 100644 --- a/frontend/rust-lib/flowy-user/src/user_manager/manager_user_awareness.rs +++ b/frontend/rust-lib/flowy-user/src/user_manager/manager_user_awareness.rs @@ -215,7 +215,7 @@ impl UserManager { let collab_db = self.get_collab_db(session.user_id)?; let weak_builder = self.collab_builder.clone(); let user_awareness = Arc::downgrade(&self.user_awareness); - let cloud_services = self.cloud_service.clone(); + let cloud_services = self.cloud_service()?; let authenticate_user = self.authenticate_user.clone(); let is_loading_awareness = self.is_loading_awareness.clone(); diff --git a/frontend/rust-lib/flowy-user/src/user_manager/manager_user_workspace.rs b/frontend/rust-lib/flowy-user/src/user_manager/manager_user_workspace.rs index af5b176106..862c4ca15e 100644 --- a/frontend/rust-lib/flowy-user/src/user_manager/manager_user_workspace.rs +++ b/frontend/rust-lib/flowy-user/src/user_manager/manager_user_workspace.rs @@ -3,7 +3,7 @@ use client_api::entity::billing_dto::{RecurringInterval, SubscriptionPlanDetail} use client_api::entity::billing_dto::{SubscriptionPlan, WorkspaceUsageAndLimit}; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use crate::entities::{ RepeatedUserWorkspacePB, SubscribeWorkspacePB, SuccessWorkspaceSubscriptionPB, @@ -40,7 +40,9 @@ impl UserManager { let user_collab_db = self .authenticate_user .database - .get_collab_db(current_session.user_id)?; + .get_collab_db(current_session.user_id)? + .upgrade() + .ok_or_else(|| FlowyError::internal().with_context("Collab db not found"))?; let cloned_current_session = current_session.clone(); let import_data = tokio::task::spawn_blocking(move || { @@ -111,7 +113,7 @@ impl UserManager { let user_id = current_session.user_id; let workspace_id = Uuid::parse_str(¤t_session.workspace_id)?; let weak_user_collab_db = Arc::downgrade(&user_collab_db); - let weak_user_cloud_service = self.cloud_service.get_user_service()?; + let weak_user_cloud_service = self.cloud_service()?.get_user_service()?; match upload_collab_objects_data( user_id, weak_user_collab_db, @@ -139,7 +141,7 @@ impl UserManager { pub async fn migration_anon_user_on_appflowy_cloud_sign_up( &self, old_user: &AnonUser, - old_collab_db: &Arc, + old_collab_db: &Weak, ) -> FlowyResult<()> { let import_context = ImportedFolder { imported_session: old_user.session.as_ref().clone(), @@ -158,13 +160,14 @@ impl UserManager { info!("open workspace: {}, auth type:{}", workspace_id, auth_type); let workspace_id_str = workspace_id.to_string(); let token = self.token_from_auth_type(&auth_type)?; - self.cloud_service.set_server_auth_type(&auth_type, token)?; + let cloud_service = self.cloud_service()?; + cloud_service.set_server_auth_type(&auth_type, token)?; let uid = self.user_id()?; let profile = self .get_user_profile_from_disk(uid, &workspace_id_str) .await?; - if let Err(err) = self.cloud_service.set_token(&profile.token) { + if let Err(err) = cloud_service.set_token(&profile.token) { error!("Set token failed: {}", err); } @@ -174,7 +177,7 @@ impl UserManager { if err.is_record_not_found() { sync_workspace( workspace_id, - self.cloud_service.get_user_service()?, + cloud_service.get_user_service()?, uid, auth_type, self.db_pool(uid)?, @@ -187,7 +190,7 @@ impl UserManager { Ok(row) => { let user_workspace = UserWorkspace::from(row); let workspace_id = *workspace_id; - let user_service = self.cloud_service.get_user_service()?; + let user_service = cloud_service.get_user_service()?; let pool = self.db_pool(uid)?; tokio::spawn(async move { let _ = sync_workspace(&workspace_id, user_service, uid, auth_type, pool).await; @@ -231,10 +234,11 @@ impl UserManager { auth_type: AuthType, ) -> FlowyResult { let token = self.token_from_auth_type(&auth_type)?; - self.cloud_service.set_server_auth_type(&auth_type, token)?; + let cloud_service = self.cloud_service()?; + cloud_service.set_server_auth_type(&auth_type, token)?; let new_workspace = self - .cloud_service + .cloud_service()? .get_user_service()? .create_workspace(workspace_name) .await?; @@ -257,7 +261,7 @@ impl UserManager { changeset: UserWorkspaceChangeset, ) -> FlowyResult<()> { self - .cloud_service + .cloud_service()? .get_user_service()? .patch_workspace(workspace_id, changeset.name.clone(), changeset.icon.clone()) .await?; @@ -280,7 +284,7 @@ impl UserManager { pub async fn leave_workspace(&self, workspace_id: &Uuid) -> FlowyResult<()> { info!("leave workspace: {}", workspace_id); self - .cloud_service + .cloud_service()? .get_user_service()? .leave_workspace(workspace_id) .await?; @@ -300,7 +304,7 @@ impl UserManager { pub async fn delete_workspace(&self, workspace_id: &Uuid) -> FlowyResult<()> { info!("delete workspace: {}", workspace_id); self - .cloud_service + .cloud_service()? .get_user_service()? .delete_workspace(workspace_id) .await?; @@ -323,7 +327,7 @@ impl UserManager { role: Role, ) -> FlowyResult<()> { self - .cloud_service + .cloud_service()? .get_user_service()? .invite_workspace_member(invitee_email, workspace_id, role) .await?; @@ -333,7 +337,7 @@ impl UserManager { pub async fn list_pending_workspace_invitations(&self) -> FlowyResult> { let status = Some(WorkspaceInvitationStatus::Pending); let invitations = self - .cloud_service + .cloud_service()? .get_user_service()? .list_workspace_invitations(status) .await?; @@ -342,7 +346,7 @@ impl UserManager { pub async fn accept_workspace_invitation(&self, invite_id: String) -> FlowyResult<()> { self - .cloud_service + .cloud_service()? .get_user_service()? .accept_workspace_invitations(invite_id) .await?; @@ -355,7 +359,7 @@ impl UserManager { workspace_id: Uuid, ) -> FlowyResult<()> { self - .cloud_service + .cloud_service()? .get_user_service()? .remove_workspace_member(user_email, workspace_id) .await?; @@ -367,7 +371,7 @@ impl UserManager { workspace_id: Uuid, ) -> FlowyResult> { let members = self - .cloud_service + .cloud_service()? .get_user_service()? .get_workspace_members(workspace_id) .await?; @@ -380,7 +384,7 @@ impl UserManager { uid: i64, ) -> FlowyResult { let member = self - .cloud_service + .cloud_service()? .get_user_service()? .get_workspace_member(&workspace_id, uid) .await?; @@ -394,7 +398,7 @@ impl UserManager { role: Role, ) -> FlowyResult<()> { self - .cloud_service + .cloud_service()? .get_user_service()? .update_workspace_member(user_email, workspace_id, role) .await?; @@ -420,7 +424,10 @@ impl UserManager { let local_workspaces = select_all_user_workspace(uid, &mut conn)?; // 2) If both cloud service and pool are available, fire off a background sync - if let (Ok(service), Ok(pool)) = (self.cloud_service.get_user_service(), self.db_pool(uid)) { + if let (Ok(service), Ok(pool)) = ( + self.cloud_service().and_then(|v| v.get_user_service()), + self.db_pool(uid), + ) { // capture only what we need let auth_copy = auth_type; @@ -477,7 +484,7 @@ impl UserManager { ) -> FlowyResult { let workspace_id = Uuid::from_str(&workspace_subscription.workspace_id)?; let payment_link = self - .cloud_service + .cloud_service()? .get_user_service()? .subscribe_workspace( workspace_id, @@ -497,7 +504,7 @@ impl UserManager { ) -> FlowyResult { let workspace_id = Uuid::from_str(&workspace_id)?; let subscriptions = self - .cloud_service + .cloud_service()? .get_user_service()? .get_workspace_subscription_one(&workspace_id) .await?; @@ -513,7 +520,7 @@ impl UserManager { reason: Option, ) -> FlowyResult<()> { self - .cloud_service + .cloud_service()? .get_user_service()? .cancel_workspace_subscription(workspace_id, plan, reason) .await?; @@ -528,7 +535,7 @@ impl UserManager { recurring_interval: RecurringInterval, ) -> FlowyResult<()> { self - .cloud_service + .cloud_service()? .get_user_service()? .update_workspace_subscription_payment_period(workspace_id, plan, recurring_interval) .await?; @@ -538,7 +545,7 @@ impl UserManager { #[instrument(level = "info", skip(self), err)] pub async fn get_subscription_plan_details(&self) -> FlowyResult> { let plan_details = self - .cloud_service + .cloud_service()? .get_user_service()? .get_subscription_plan_details() .await?; @@ -551,7 +558,7 @@ impl UserManager { workspace_id: &Uuid, ) -> FlowyResult { let workspace_usage = self - .cloud_service + .cloud_service()? .get_user_service()? .get_workspace_usage(workspace_id) .await?; @@ -577,7 +584,7 @@ impl UserManager { #[instrument(level = "info", skip(self), err)] pub async fn get_billing_portal_url(&self) -> FlowyResult { let url = self - .cloud_service + .cloud_service()? .get_user_service()? .get_billing_portal_url() .await?; @@ -589,7 +596,7 @@ impl UserManager { updated_settings: UpdateUserWorkspaceSettingPB, ) -> FlowyResult<()> { let workspace_id = Uuid::from_str(&updated_settings.workspace_id)?; - let cloud_service = self.cloud_service.get_user_service()?; + let cloud_service = self.cloud_service()?.get_user_service()?; let settings = cloud_service .update_workspace_setting(&workspace_id, updated_settings.clone().into()) .await?; @@ -629,7 +636,7 @@ impl UserManager { // Spawn a task to sync remote settings using the helper let pool = self.db_pool(uid)?; - let cloud_service = self.cloud_service.clone(); + let cloud_service = self.cloud_service()?; tokio::spawn(async move { let _ = sync_workspace_settings(cloud_service, workspace_id, old_pb, uid, pool).await; }); @@ -638,7 +645,7 @@ impl UserManager { Err(err) => { if err.is_record_not_found() { trace!("No workspace settings found, fetch from remote"); - let service = self.cloud_service.get_user_service()?; + let service = self.cloud_service()?.get_user_service()?; let settings = service.get_workspace_setting(workspace_id).await?; let pb = WorkspaceSettingsPB::from(&settings); let mut conn = self.db_connection(uid)?; @@ -691,7 +698,7 @@ impl UserManager { ) -> FlowyResult { trace!("get workspace member info from remote: {}", workspace_id); let member = self - .cloud_service + .cloud_service()? .get_user_service()? .get_workspace_member(workspace_id, uid) .await?; @@ -721,7 +728,7 @@ impl UserManager { let plans = PeriodicallyCheckBillingState::new( workspace_id, success.plan.map(SubscriptionPlan::from), - Arc::downgrade(&self.cloud_service), + self.cloud_service.clone(), Arc::downgrade(&self.authenticate_user), ) .start()