diff --git a/frontend/rust-lib/Cargo.lock b/frontend/rust-lib/Cargo.lock index ac47524944..2e85c57326 100644 --- a/frontend/rust-lib/Cargo.lock +++ b/frontend/rust-lib/Cargo.lock @@ -2609,6 +2609,7 @@ name = "flowy-core" version = "0.1.0" dependencies = [ "af-local-ai", + "af-plugin", "anyhow", "arc-swap", "base64 0.21.5", @@ -2966,6 +2967,7 @@ dependencies = [ "collab-plugins", "collab-user", "dotenv", + "flowy-ai", "flowy-ai-pub", "flowy-database-pub", "flowy-document-pub", diff --git a/frontend/rust-lib/Cargo.toml b/frontend/rust-lib/Cargo.toml index 581715985f..6e93b13a19 100644 --- a/frontend/rust-lib/Cargo.toml +++ b/frontend/rust-lib/Cargo.toml @@ -99,6 +99,8 @@ zip = "2.2.0" dashmap = "6.0.1" derive_builder = "0.20.2" tantivy = { version = "0.24.0" } +af-plugin = { version = "0.1" } +af-local-ai = { version = "0.1" } # Please using the following command to update the revision id # Current directory: frontend diff --git a/frontend/rust-lib/flowy-ai/Cargo.toml b/frontend/rust-lib/flowy-ai/Cargo.toml index a31d42da61..3a6aaf5898 100644 --- a/frontend/rust-lib/flowy-ai/Cargo.toml +++ b/frontend/rust-lib/flowy-ai/Cargo.toml @@ -35,8 +35,8 @@ serde_json = { workspace = true } anyhow = "1.0.86" tokio-stream = "0.1.15" tokio-util = { workspace = true, features = ["full"] } -af-local-ai = { version = "0.1.0" } -af-plugin = { version = "0.1.0" } +af-local-ai = { workspace = true } +af-plugin = { workspace = true } reqwest = { version = "0.11.27", features = ["json"] } sha2 = "0.10.7" base64 = "0.21.5" diff --git a/frontend/rust-lib/flowy-ai/src/lib.rs b/frontend/rust-lib/flowy-ai/src/lib.rs index 6ab100fd6e..ccd3920e0d 100644 --- a/frontend/rust-lib/flowy-ai/src/lib.rs +++ b/frontend/rust-lib/flowy-ai/src/lib.rs @@ -5,7 +5,7 @@ pub mod ai_manager; mod chat; mod completion; pub mod entities; -mod local_ai; +pub mod local_ai; // #[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))] // pub mod mcp; diff --git a/frontend/rust-lib/flowy-ai/src/local_ai/controller.rs b/frontend/rust-lib/flowy-ai/src/local_ai/controller.rs index ac44e9ad55..494e11d073 100644 --- a/frontend/rust-lib/flowy-ai/src/local_ai/controller.rs +++ b/frontend/rust-lib/flowy-ai/src/local_ai/controller.rs @@ -24,10 +24,10 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use std::ops::Deref; use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use tokio::select; use tokio_stream::StreamExt; -use tracing::{debug, error, info, instrument}; +use tracing::{debug, error, info, instrument, warn}; use uuid::Uuid; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -53,7 +53,7 @@ pub struct LocalAIController { ai_plugin: Arc, resource: Arc, current_chat_id: ArcSwapOption, - store_preferences: Arc, + store_preferences: Weak, user_service: Arc, #[allow(dead_code)] cloud_service: Arc, @@ -70,7 +70,7 @@ impl Deref for LocalAIController { impl LocalAIController { pub fn new( plugin_manager: Arc, - store_preferences: Arc, + store_preferences: Weak, user_service: Arc, cloud_service: Arc, ) -> Self { @@ -94,7 +94,7 @@ impl LocalAIController { let mut running_state_rx = local_ai.subscribe_running_state(); let cloned_llm_res = Arc::clone(&local_ai_resource); - let cloned_store_preferences = Arc::clone(&store_preferences); + let cloned_store_preferences = store_preferences.clone(); let cloned_local_ai = Arc::clone(&local_ai); let cloned_user_service = Arc::clone(&user_service); @@ -110,44 +110,47 @@ impl LocalAIController { info!("[AI Plugin] state: {:?}", state); // Read whether plugin is enabled from store; default to true - let enabled = cloned_store_preferences.get_bool(&key).unwrap_or(true); + if let Some(store_preferences) = cloned_store_preferences.upgrade() { + let enabled = store_preferences.get_bool(&key).unwrap_or(true); + // Only check resource status if the plugin isn’t in "UnexpectedStop" and is enabled + let (plugin_downloaded, lack_of_resource) = + if !matches!(state, RunningState::UnexpectedStop { .. }) && enabled { + // Possibly check plugin readiness and resource concurrency in parallel, + // but here we do it sequentially for clarity. + let downloaded = is_plugin_ready(); + let resource_lack = cloned_llm_res.get_lack_of_resource().await; + (downloaded, resource_lack) + } else { + (false, None) + }; - // Only check resource status if the plugin isn’t in "UnexpectedStop" and is enabled - let (plugin_downloaded, lack_of_resource) = - if !matches!(state, RunningState::UnexpectedStop { .. }) && enabled { - // Possibly check plugin readiness and resource concurrency in parallel, - // but here we do it sequentially for clarity. - let downloaded = is_plugin_ready(); - let resource_lack = cloned_llm_res.get_lack_of_resource().await; - (downloaded, resource_lack) + // If plugin is running, retrieve version + let plugin_version = if matches!(state, RunningState::Running { .. }) { + match cloned_local_ai.plugin_info().await { + Ok(info) => Some(info.version), + Err(_) => None, + } } else { - (false, None) + None }; - // If plugin is running, retrieve version - let plugin_version = if matches!(state, RunningState::Running { .. }) { - match cloned_local_ai.plugin_info().await { - Ok(info) => Some(info.version), - Err(_) => None, - } + // Broadcast the new local AI state + let new_state = RunningStatePB::from(state); + chat_notification_builder( + APPFLOWY_AI_NOTIFICATION_KEY, + ChatNotification::UpdateLocalAIState, + ) + .payload(LocalAIPB { + enabled, + plugin_downloaded, + lack_of_resource, + state: new_state, + plugin_version, + }) + .send(); } else { - None - }; - - // Broadcast the new local AI state - let new_state = RunningStatePB::from(state); - chat_notification_builder( - APPFLOWY_AI_NOTIFICATION_KEY, - ChatNotification::UpdateLocalAIState, - ) - .payload(LocalAIPB { - enabled, - plugin_downloaded, - lack_of_resource, - state: new_state, - plugin_version, - }) - .send(); + warn!("[AI Plugin] store preferences is dropped"); + } } }); @@ -207,6 +210,13 @@ impl LocalAIController { Ok(()) } + fn upgrade_store_preferences(&self) -> FlowyResult> { + self + .store_preferences + .upgrade() + .ok_or_else(|| FlowyError::internal().with_context("Store preferences is dropped")) + } + /// Indicate whether the local AI plugin is running. pub fn is_running(&self) -> bool { if !self.is_enabled() { @@ -228,7 +238,10 @@ impl LocalAIController { .workspace_id() .map(|workspace_id| local_ai_enabled_key(&workspace_id)) { - self.store_preferences.get_bool(&key).unwrap_or(false) + match self.upgrade_store_preferences() { + Ok(store) => store.get_bool(&key).unwrap_or(false), + Err(_) => false, + } } else { false } @@ -373,8 +386,9 @@ impl LocalAIController { pub async fn toggle_local_ai(&self) -> FlowyResult { let workspace_id = self.user_service.workspace_id()?; let key = local_ai_enabled_key(&workspace_id); - let enabled = !self.store_preferences.get_bool(&key).unwrap_or(true); - self.store_preferences.set_bool(&key, enabled)?; + let store_preferences = self.upgrade_store_preferences()?; + let enabled = !store_preferences.get_bool(&key).unwrap_or(true); + store_preferences.set_bool(&key, enabled)?; self.toggle_plugin(enabled).await?; Ok(enabled) } @@ -591,7 +605,16 @@ async fn initialize_ai_plugin( pub struct LLMResourceServiceImpl { user_service: Arc, cloud_service: Arc, - store_preferences: Arc, + store_preferences: Weak, +} + +impl LLMResourceServiceImpl { + fn upgrade_store_preferences(&self) -> FlowyResult> { + self + .store_preferences + .upgrade() + .ok_or_else(|| FlowyError::internal().with_context("Store preferences is dropped")) + } } #[async_trait] impl LLMResourceService for LLMResourceServiceImpl { @@ -605,16 +628,14 @@ impl LLMResourceService for LLMResourceServiceImpl { } fn store_setting(&self, setting: LocalAISetting) -> Result<(), Error> { - self - .store_preferences - .set_object(LOCAL_AI_SETTING_KEY, &setting)?; + let store_preferences = self.upgrade_store_preferences()?; + store_preferences.set_object(LOCAL_AI_SETTING_KEY, &setting)?; Ok(()) } fn retrieve_setting(&self) -> Option { - self - .store_preferences - .get_object::(LOCAL_AI_SETTING_KEY) + let store_preferences = self.upgrade_store_preferences().ok()?; + store_preferences.get_object::(LOCAL_AI_SETTING_KEY) } } diff --git a/frontend/rust-lib/flowy-core/Cargo.toml b/frontend/rust-lib/flowy-core/Cargo.toml index 6499f79284..b4e7bd5fec 100644 --- a/frontend/rust-lib/flowy-core/Cargo.toml +++ b/frontend/rust-lib/flowy-core/Cargo.toml @@ -37,7 +37,8 @@ flowy-storage-pub = { workspace = true } client-api.workspace = true flowy-ai = { workspace = true } flowy-ai-pub = { workspace = true } -af-local-ai = { version = "0.1.0" } +af-local-ai = { workspace = true } +af-plugin = { workspace = true } tracing.workspace = true diff --git a/frontend/rust-lib/flowy-core/src/server_layer.rs b/frontend/rust-lib/flowy-core/src/server_layer.rs index 8157748b4f..176818abdf 100644 --- a/frontend/rust-lib/flowy-core/src/server_layer.rs +++ b/frontend/rust-lib/flowy-core/src/server_layer.rs @@ -1,11 +1,9 @@ +use crate::AppFlowyCoreConfig; +use af_plugin::manager::PluginManager; use arc_swap::ArcSwapOption; use dashmap::DashMap; -use diesel::Connection; -use serde_repr::*; -use std::fmt::{Display, Formatter}; -use std::sync::atomic::{AtomicBool, AtomicU8, Ordering}; -use std::sync::{Arc, Weak}; - +use flowy_ai::ai_manager::AIUserService; +use flowy_ai::local_ai::controller::LocalAIController; use flowy_error::{FlowyError, FlowyResult}; use flowy_server::af_cloud::define::ServerUser; use flowy_server::af_cloud::AppFlowyCloudServer; @@ -14,8 +12,10 @@ use flowy_server::{AppFlowyEncryption, AppFlowyServer, EncryptionImpl}; use flowy_server_pub::AuthenticatorType; use flowy_sqlite::kv::KVStorePreferences; use flowy_user_pub::entities::*; - -use crate::AppFlowyCoreConfig; +use serde_repr::*; +use std::fmt::{Display, Formatter}; +use std::sync::atomic::{AtomicBool, AtomicU8, Ordering}; +use std::sync::{Arc, Weak}; #[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize_repr, Deserialize_repr)] #[repr(u8)] @@ -66,10 +66,21 @@ impl ServerProvider { config: AppFlowyCoreConfig, server: Server, store_preferences: Weak, + user_service: impl AIUserService, server_user: impl ServerUser + 'static, ) -> Self { let user = Arc::new(server_user); let encryption = EncryptionImpl::new(None); + + let user_service = Arc::new(user_service); + let plugin_manager = Arc::new(PluginManager::new()); + let local_ai = Arc::new(LocalAIController::new( + plugin_manager.clone(), + store_preferences.clone(), + user_service.clone(), + chat_cloud_service.clone(), + )); + Self { config, providers: DashMap::new(), diff --git a/frontend/rust-lib/flowy-server/Cargo.toml b/frontend/rust-lib/flowy-server/Cargo.toml index e94570cc4a..5225eb817d 100644 --- a/frontend/rust-lib/flowy-server/Cargo.toml +++ b/frontend/rust-lib/flowy-server/Cargo.toml @@ -44,6 +44,7 @@ tokio-stream = { workspace = true, features = ["sync"] } rand = "0.8.5" semver = "1.0.23" flowy-sqlite = { workspace = true } +flowy-ai = { workspace = true } [dependencies.client-api] workspace = true