use crate::ai_manager::AIUserService;
use crate::local_ai::controller::LocalAISetting;
use flowy_ai_pub::cloud::LocalAIConfig;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use lib_infra::async_trait::async_trait;
use crate::entities::LackOfAIResourcePB;
use crate::local_ai::watch::{is_plugin_ready, ollama_plugin_path};
#[cfg(target_os = "macos")]
use crate::local_ai::watch::{watch_offline_app, WatchContext};
use crate::notification::{
  chat_notification_builder, ChatNotification, APPFLOWY_AI_NOTIFICATION_KEY,
};
use af_local_ai::ollama_plugin::OllamaPluginConfig;
use lib_infra::util::{get_operating_system, OperatingSystem};
use reqwest::Client;
use serde::Deserialize;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info, instrument, trace};

/// Deserialized response from Ollama's `GET /api/tags` endpoint; only the
/// fields needed to check model availability are kept.
#[derive(Debug, Deserialize)]
struct TagsResponse {
  models: Vec<ModelEntry>,
}

#[derive(Debug, Deserialize)]
struct ModelEntry {
  name: String,
}

#[async_trait]
pub trait LLMResourceService: Send + Sync + 'static {
  /// Fetch the local AI configuration from the remote server.
  async fn fetch_local_ai_config(&self) -> Result<LocalAIConfig, anyhow::Error>;
  fn store_setting(&self, setting: LocalAISetting) -> Result<(), anyhow::Error>;
  fn retrieve_setting(&self) -> Option<LocalAISetting>;
}

const LLM_MODEL_DIR: &str = "models";

#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum WatchDiskEvent {
  Create,
  Remove,
}

pub enum PendingResource {
  PluginExecutableNotReady,
  OllamaServerNotReady,
  MissingModel(String),
}

impl PendingResource {
  pub fn desc(self) -> String {
    match self {
      PendingResource::PluginExecutableNotReady => {
        "The Local AI app was not installed correctly. Please follow the instructions to install the Local AI application".to_string()
      },
      PendingResource::OllamaServerNotReady => {
        "Ollama is not ready. Please follow the instructions to install Ollama".to_string()
      },
      PendingResource::MissingModel(model) => format!(
        "Cannot find the model: {}. Please use the ollama pull command to install the model",
        model
      ),
    }
  }
}

pub struct LocalAIResourceController {
  user_service: Arc<dyn AIUserService>,
  resource_service: Arc<dyn LLMResourceService>,
  resource_notify: tokio::sync::broadcast::Sender<()>,
  #[cfg(target_os = "macos")]
  #[allow(dead_code)]
  app_disk_watch: Option<WatchContext>,
  app_state_sender: tokio::sync::broadcast::Sender<WatchDiskEvent>,
}

impl LocalAIResourceController {
  pub fn new(
    user_service: Arc<dyn AIUserService>,
    resource_service: impl LLMResourceService,
  ) -> Self {
    let (resource_notify, _) = tokio::sync::broadcast::channel(1);
    let (app_state_sender, _) = tokio::sync::broadcast::channel(1);
    #[cfg(target_os = "macos")]
    let mut offline_app_disk_watch: Option<WatchContext> = None;

    #[cfg(target_os = "macos")]
    {
      match watch_offline_app() {
        Ok((new_watcher, mut rx)) => {
          // Forward disk-watch events from the watcher into the broadcast channel.
          let sender = app_state_sender.clone();
          tokio::spawn(async move {
            while let Some(event) = rx.recv().await {
              if let Err(err) = sender.send(event) {
                error!("[LLM Resource] Failed to send offline app state: {:?}", err);
              }
            }
          });
          offline_app_disk_watch = Some(new_watcher);
        },
        Err(err) => {
          error!("[LLM Resource] Failed to watch offline app path: {:?}", err);
        },
      }
    }

    Self {
      user_service,
      resource_service: Arc::new(resource_service),
      #[cfg(target_os = "macos")]
      app_disk_watch: offline_app_disk_watch,
      app_state_sender,
      resource_notify,
    }
  }

  pub fn subscribe_resource_notify(&self) -> tokio::sync::broadcast::Receiver<()> {
    self.resource_notify.subscribe()
  }

  pub fn subscribe_app_state(&self) -> tokio::sync::broadcast::Receiver<WatchDiskEvent> {
    self.app_state_sender.subscribe()
  }
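
  // Illustrative subscriber sketch (an assumption about how callers use this
  // channel; actual subscribers live elsewhere in the codebase):
  //
  //   let mut rx = resource_controller.subscribe_resource_notify();
  //   tokio::spawn(async move {
  //     while rx.recv().await.is_ok() {
  //       // Re-check readiness or refresh the UI when resources change.
  //     }
  //   });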

  /// Returns true when all resources are downloaded and ready to use.
  pub async fn is_resource_ready(&self) -> bool {
    let sys = get_operating_system();
    if !sys.is_desktop() {
      return false;
    }

    match self.calculate_pending_resources().await {
      Ok(res) => res.is_empty(),
      Err(_) => false,
    }
  }

  pub async fn get_plugin_download_link(&self) -> FlowyResult<String> {
    let ai_config = self.get_local_ai_configuration().await?;
    Ok(ai_config.plugin.url)
  }

  /// Returns the stored local AI setting, or the default when none has been saved.
  pub fn get_llm_setting(&self) -> LocalAISetting {
    self.resource_service.retrieve_setting().unwrap_or_default()
  }

  #[instrument(level = "info", skip_all, err)]
  pub async fn set_llm_setting(&self, setting: LocalAISetting) -> FlowyResult<()> {
    self.resource_service.store_setting(setting)?;

    let mut resources = self.calculate_pending_resources().await?;
    if let Some(resource) = resources.pop() {
      chat_notification_builder(
        APPFLOWY_AI_NOTIFICATION_KEY,
        ChatNotification::LocalAIResourceUpdated,
      )
      .payload(LackOfAIResourcePB {
        resource_desc: resource.desc(),
      })
      .send();
    }
    Ok(())
  }

  /// Returns the description of a pending resource, or `None` when local AI is ready.
  pub async fn get_lack_of_resource(&self) -> Option<String> {
    let mut resources = self.calculate_pending_resources().await.ok()?;
    resources.pop().map(|r| r.desc())
  }

  /// Checks which local AI resources are still missing: the plugin executable,
  /// a reachable Ollama server, and the configured chat/embedding models.
  pub async fn calculate_pending_resources(&self) -> FlowyResult<Vec<PendingResource>> {
    let mut resources = vec![];

    let app_path = ollama_plugin_path();
    if !is_plugin_ready() {
      trace!("[LLM Resource] offline app not found: {:?}", app_path);
      resources.push(PendingResource::PluginExecutableNotReady);
      return Ok(resources);
    }

    let setting = self.get_llm_setting();
    let client = Client::builder().timeout(Duration::from_secs(5)).build()?;
    match client.get(&setting.ollama_server_url).send().await {
      Ok(resp) if resp.status().is_success() => {
        info!(
          "[LLM Resource] Ollama server is running at {}",
          setting.ollama_server_url
        );
      },
      _ => {
        info!(
          "[LLM Resource] Ollama server is not responding at {}",
          setting.ollama_server_url
        );
        resources.push(PendingResource::OllamaServerNotReady);
        return Ok(resources);
      },
    }

    let required_models = vec![setting.chat_model_name, setting.embedding_model_name];
    // Query the /api/tags endpoint to get a structured list of locally available models.
    let tags_url = format!("{}/api/tags", setting.ollama_server_url);
    match client.get(&tags_url).send().await {
      Ok(resp) if resp.status().is_success() => {
        let tags: TagsResponse = resp.json().await.map_err(|e| {
          log::error!(
            "[LLM Resource] Failed to parse /api/tags JSON response: {:?}",
            e
          );
          e
        })?;
        // Check that each required model is present in the response.
        for required in &required_models {
          if !tags.models.iter().any(|m| m.name.contains(required)) {
            log::trace!(
              "[LLM Resource] required model '{}' not found in API response",
              required
            );
            resources.push(PendingResource::MissingModel(required.clone()));
            // Optionally, all models could be checked here instead of returning early.
            return Ok(resources);
          }
        }
      },
      _ => {
        error!(
          "[LLM Resource] Failed to fetch models from {} (GET /api/tags)",
          setting.ollama_server_url
        );
        resources.push(PendingResource::OllamaServerNotReady);
        return Ok(resources);
      },
    }

    Ok(resources)
  }

  #[instrument(level = "info", skip_all)]
  pub async fn get_plugin_config(&self, rag_enabled: bool) -> FlowyResult<OllamaPluginConfig> {
    if !self.is_resource_ready().await {
      return Err(FlowyError::new(
        ErrorCode::AppFlowyLAINotReady,
        "AppFlowyLAI not found",
      ));
    }

    let llm_setting = self.get_llm_setting();
    let bin_path = match get_operating_system() {
      OperatingSystem::MacOS | OperatingSystem::Windows | OperatingSystem::Linux => {
        ollama_plugin_path()
      },
      _ => {
        return Err(
          FlowyError::local_ai_unavailable()
            .with_context("Local AI not available on current platform"),
        );
      },
    };

    let mut config = OllamaPluginConfig::new(
      bin_path,
      "af_ollama_plugin".to_string(),
      llm_setting.chat_model_name.clone(),
      llm_setting.embedding_model_name.clone(),
      Some(llm_setting.ollama_server_url.clone()),
    )?;
    // config = config.with_log_level("debug".to_string());

    if rag_enabled {
      let resource_dir = self.resource_dir()?;
      let persist_directory = resource_dir.join("vectorstore");
      if !persist_directory.exists() {
        std::fs::create_dir_all(&persist_directory)?;
      }
      config.set_rag_enabled(&persist_directory)?;
    }

    if cfg!(debug_assertions) {
      config = config.with_verbose(true);
    }

    trace!("[AI Chat] config: {:?}", config);
    Ok(config)
  }

  /// Fetches the local AI configuration from the resource service.
  async fn get_local_ai_configuration(&self) -> FlowyResult<LocalAIConfig> {
    self
      .resource_service
      .fetch_local_ai_config()
      .await
      .map_err(|err| {
        error!("[LLM Resource] Failed to fetch local ai config: {:?}", err);
        FlowyError::local_ai()
          .with_context("Can't retrieve model info. Please try again later".to_string())
      })
  }

  pub(crate) fn user_model_folder(&self) -> FlowyResult<PathBuf> {
    self.resource_dir().map(|dir| dir.join(LLM_MODEL_DIR))
  }

  pub(crate) fn resource_dir(&self) -> FlowyResult<PathBuf> {
    let user_data_dir = self.user_service.application_root_dir()?;
    Ok(user_data_dir.join("ai"))
  }
}

#[allow(dead_code)]
fn bytes_to_readable_format(bytes: u64) -> String {
  const BYTES_IN_GIGABYTE: u64 = 1024 * 1024 * 1024;
  const BYTES_IN_MEGABYTE: u64 = 1024 * 1024;

  if bytes >= BYTES_IN_GIGABYTE {
    let gigabytes = (bytes as f64) / (BYTES_IN_GIGABYTE as f64);
    format!("{:.1} GB", gigabytes)
  } else {
    let megabytes = (bytes as f64) / (BYTES_IN_MEGABYTE as f64);
    format!("{:.2} MB", megabytes)
  }
}
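
// Minimal sanity checks for `bytes_to_readable_format`, added as an
// illustrative sketch; the expected strings follow the format strings above.
#[cfg(test)]
mod tests {
  use super::bytes_to_readable_format;

  #[test]
  fn formats_bytes_into_megabytes_and_gigabytes() {
    // Values below one gigabyte are formatted as megabytes with two decimals.
    assert_eq!(bytes_to_readable_format(512 * 1024 * 1024), "512.00 MB");
    // Values at or above one gigabyte are formatted as gigabytes with one decimal.
    assert_eq!(bytes_to_readable_format(3 * 1024 * 1024 * 1024), "3.0 GB");
  }
}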