use crate::ai_manager::AIUserService;
use crate::local_ai::controller::LocalAISetting;
use flowy_ai_pub::cloud::LocalAIConfig;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use lib_infra::async_trait::async_trait;
use crate::entities::LackOfAIResourcePB;
#[cfg(any(target_os = "macos", target_os = "linux"))]
use crate::local_ai::watch::{watch_offline_app, WatchContext};
use crate::notification::{
  chat_notification_builder, ChatNotification, APPFLOWY_AI_NOTIFICATION_KEY,
};
use af_local_ai::ollama_plugin::OllamaPluginConfig;
use af_plugin::core::path::{is_plugin_ready, ollama_plugin_path};
use lib_infra::util::{get_operating_system, OperatingSystem};
use reqwest::Client;
use serde::Deserialize;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info, instrument, trace};

#[derive(Debug, Deserialize)]
struct TagsResponse {
  models: Vec<ModelEntry>,
}

#[derive(Debug, Deserialize)]
struct ModelEntry {
  name: String,
}
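
// For reference, the Ollama `GET /api/tags` payload these structs deserialize
// looks roughly like the following (abridged; model names are illustrative and
// all other fields are ignored by serde):
//
//   { "models": [ { "name": "llama3.1:latest" }, { "name": "nomic-embed-text:latest" } ] }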

#[async_trait]
pub trait LLMResourceService: Send + Sync + 'static {
  /// Fetch the local AI configuration from the remote server.
  async fn fetch_local_ai_config(&self) -> Result<LocalAIConfig, anyhow::Error>;

  fn store_setting(&self, setting: LocalAISetting) -> Result<(), anyhow::Error>;

  fn retrieve_setting(&self) -> Option<LocalAISetting>;
}
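
// A minimal sketch of how this trait might be implemented, e.g. for tests
// (hypothetical type, not part of this crate; assumes `LocalAISetting: Clone`):
//
//   struct InMemoryResourceService(std::sync::RwLock<Option<LocalAISetting>>);
//
//   #[async_trait]
//   impl LLMResourceService for InMemoryResourceService {
//     async fn fetch_local_ai_config(&self) -> Result<LocalAIConfig, anyhow::Error> {
//       Err(anyhow::anyhow!("no remote config available offline"))
//     }
//     fn store_setting(&self, setting: LocalAISetting) -> Result<(), anyhow::Error> {
//       *self.0.write().unwrap() = Some(setting);
//       Ok(())
//     }
//     fn retrieve_setting(&self) -> Option<LocalAISetting> {
//       self.0.read().unwrap().clone()
//     }
//   }
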
const LLM_MODEL_DIR: &str = "models";

#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum WatchDiskEvent {
  Create,
  Remove,
}

#[derive(Debug, Clone)]
pub enum PendingResource {
  PluginExecutableNotReady,
  OllamaServerNotReady,
  MissingModel(String),
}
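
// The `PendingResource` variants above mirror the order of checks in
// `calculate_pending_resources`: the plugin executable is verified first, then
// the Ollama server is pinged, and finally each required model is looked up
// via the `/api/tags` endpoint.
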
pub struct LocalAIResourceController {
  user_service: Arc<dyn AIUserService>,
  resource_service: Arc<dyn LLMResourceService>,
  resource_notify: tokio::sync::broadcast::Sender<()>,
  #[cfg(any(target_os = "macos", target_os = "linux"))]
  #[allow(dead_code)]
  app_disk_watch: Option<WatchContext>,
  app_state_sender: tokio::sync::broadcast::Sender<WatchDiskEvent>,
}

impl LocalAIResourceController {
  pub fn new(
    user_service: Arc<dyn AIUserService>,
    resource_service: impl LLMResourceService,
  ) -> Self {
    let (resource_notify, _) = tokio::sync::broadcast::channel(1);
    let (app_state_sender, _) = tokio::sync::broadcast::channel(1);

    #[cfg(any(target_os = "macos", target_os = "linux"))]
    let mut offline_app_disk_watch: Option<WatchContext> = None;

    #[cfg(any(target_os = "macos", target_os = "linux"))]
    {
      match watch_offline_app() {
        Ok((new_watcher, mut rx)) => {
          let sender = app_state_sender.clone();
          tokio::spawn(async move {
            while let Some(event) = rx.recv().await {
              if let Err(err) = sender.send(event) {
                error!("[LLM Resource] Failed to send offline app state: {:?}", err);
              }
            }
          });
          offline_app_disk_watch = Some(new_watcher);
        },
        Err(err) => {
          error!("[LLM Resource] Failed to watch offline app path: {:?}", err);
        },
      }
    }

    Self {
      user_service,
      resource_service: Arc::new(resource_service),
      #[cfg(any(target_os = "macos", target_os = "linux"))]
      app_disk_watch: offline_app_disk_watch,
      app_state_sender,
      resource_notify,
    }
  }

  pub fn subscribe_resource_notify(&self) -> tokio::sync::broadcast::Receiver<()> {
    self.resource_notify.subscribe()
  }

  pub fn subscribe_app_state(&self) -> tokio::sync::broadcast::Receiver<WatchDiskEvent> {
    self.app_state_sender.subscribe()
  }

  /// Returns true when all resources are downloaded and ready to use.
  pub async fn is_resource_ready(&self) -> bool {
    let sys = get_operating_system();
    if !sys.is_desktop() {
      return false;
    }

    self
      .calculate_pending_resources()
      .await
      .is_ok_and(|r| r.is_none())
  }

  pub async fn get_plugin_download_link(&self) -> FlowyResult<String> {
    let ai_config = self.get_local_ai_configuration().await?;
    Ok(ai_config.plugin.url)
  }

  /// Returns the stored local AI setting, or the default when none has been saved.
  pub fn get_llm_setting(&self) -> LocalAISetting {
    self.resource_service.retrieve_setting().unwrap_or_default()
  }
#[instrument(level = "info", skip_all, err)]
pub async fn set_llm_setting(&self, setting: LocalAISetting) -> FlowyResult<()> {
self.resource_service.store_setting(setting)?;
if let Some(resource) = self.calculate_pending_resources().await? {
let resource = LackOfAIResourcePB::from(resource);
2025-03-09 23:32:42 +08:00
chat_notification_builder(
APPFLOWY_AI_NOTIFICATION_KEY,
ChatNotification::LocalAIResourceUpdated,
)
.payload(resource.clone())
2025-03-09 23:32:42 +08:00
.send();
return Err(FlowyError::local_ai().with_context(format!("{:?}", resource)));
2025-03-09 23:32:42 +08:00
}
Ok(())
}

  pub async fn get_lack_of_resource(&self) -> Option<LackOfAIResourcePB> {
    self
      .calculate_pending_resources()
      .await
      .ok()?
      .map(Into::into)
  }

  pub async fn calculate_pending_resources(&self) -> FlowyResult<Option<PendingResource>> {
    let app_path = ollama_plugin_path();
    if !is_plugin_ready() {
      trace!("[LLM Resource] offline app not found: {:?}", app_path);
      return Ok(Some(PendingResource::PluginExecutableNotReady));
    }

    let setting = self.get_llm_setting();
    let client = Client::builder().timeout(Duration::from_secs(5)).build()?;
    match client.get(&setting.ollama_server_url).send().await {
      Ok(resp) if resp.status().is_success() => {
        info!(
          "[LLM Resource] Ollama server is running at {}",
          setting.ollama_server_url
        );
      },
      _ => {
        info!(
          "[LLM Resource] Ollama server is not responding at {}",
          setting.ollama_server_url
        );
        return Ok(Some(PendingResource::OllamaServerNotReady));
      },
    }

    let required_models = vec![setting.chat_model_name, setting.embedding_model_name];

    // Query the /api/tags endpoint to get a structured list of locally available models.
    let tags_url = format!("{}/api/tags", setting.ollama_server_url);
    match client.get(&tags_url).send().await {
      Ok(resp) if resp.status().is_success() => {
        let tags: TagsResponse = resp.json().await.inspect_err(|e| {
          log::error!("[LLM Resource] Failed to parse /api/tags JSON response: {e:?}")
        })?;

        // Check if each of our required models exists in the list of available models.
        trace!("[LLM Resource] ollama available models: {:?}", tags.models);
        for required in &required_models {
          if !tags
            .models
            .iter()
            .any(|m| m.name == *required || m.name == format!("{}:latest", required))
          {
            log::trace!(
              "[LLM Resource] required model '{}' not found in API response",
              required
            );
            return Ok(Some(PendingResource::MissingModel(required.clone())));
          }
        }
      },
      _ => {
        error!(
          "[LLM Resource] Failed to fetch models from {} (GET /api/tags)",
          setting.ollama_server_url
        );
        return Ok(Some(PendingResource::OllamaServerNotReady));
      },
    }

    Ok(None)
  }
#[instrument(level = "info", skip_all)]
pub async fn get_plugin_config(&self, rag_enabled: bool) -> FlowyResult<OllamaPluginConfig> {
if !self.is_resource_ready().await {
2025-03-13 15:49:44 +08:00
return Err(FlowyError::new(
ErrorCode::AppFlowyLAINotReady,
"AppFlowyLAI not found",
));
2025-03-09 23:32:42 +08:00
}
let llm_setting = self.get_llm_setting();
let bin_path = match get_operating_system() {
2025-03-13 17:03:49 +08:00
OperatingSystem::MacOS | OperatingSystem::Windows | OperatingSystem::Linux => {
ollama_plugin_path()
},
2025-03-09 23:32:42 +08:00
_ => {
return Err(
FlowyError::local_ai_unavailable()
.with_context("Local AI not available on current platform"),
);
},
};
let mut config = OllamaPluginConfig::new(
bin_path,
2025-03-19 10:23:38 +08:00
"af_ollama_plugin".to_string(),
2025-03-09 23:32:42 +08:00
llm_setting.chat_model_name.clone(),
llm_setting.embedding_model_name.clone(),
Some(llm_setting.ollama_server_url.clone()),
)?;
2025-03-16 09:37:05 +08:00
//config = config.with_log_level("debug".to_string());
2025-03-09 23:32:42 +08:00
if rag_enabled {
let resource_dir = self.resource_dir()?;
let persist_directory = resource_dir.join("vectorstore");
if !persist_directory.exists() {
std::fs::create_dir_all(&persist_directory)?;
}
config.set_rag_enabled(&persist_directory)?;
}
if cfg!(debug_assertions) {
config = config.with_verbose(true);
}
trace!("[AI Chat] config: {:?}", config);
Ok(config)
}

  /// Fetches the local AI configuration from the resource service.
  async fn get_local_ai_configuration(&self) -> FlowyResult<LocalAIConfig> {
    self
      .resource_service
      .fetch_local_ai_config()
      .await
      .map_err(|err| {
        error!("[LLM Resource] Failed to fetch local ai config: {:?}", err);
        FlowyError::local_ai()
          .with_context("Can't retrieve model info. Please try again later".to_string())
      })
  }

  pub(crate) fn user_model_folder(&self) -> FlowyResult<PathBuf> {
    self.resource_dir().map(|dir| dir.join(LLM_MODEL_DIR))
  }

  pub(crate) fn resource_dir(&self) -> FlowyResult<PathBuf> {
    let user_data_dir = self.user_service.application_root_dir()?;
    Ok(user_data_dir.join("ai"))
  }
}

#[allow(dead_code)]
fn bytes_to_readable_format(bytes: u64) -> String {
  const BYTES_IN_GIGABYTE: u64 = 1024 * 1024 * 1024;
  const BYTES_IN_MEGABYTE: u64 = 1024 * 1024;

  if bytes >= BYTES_IN_GIGABYTE {
    let gigabytes = (bytes as f64) / (BYTES_IN_GIGABYTE as f64);
    format!("{:.1} GB", gigabytes)
  } else {
    let megabytes = (bytes as f64) / (BYTES_IN_MEGABYTE as f64);
    format!("{:.2} MB", megabytes)
  }
}
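
// A minimal sanity-check sketch for the helper above (test names are
// illustrative; expected strings follow from the 1024-based constants and the
// format specifiers in `bytes_to_readable_format`).
#[cfg(test)]
mod readable_format_tests {
  use super::bytes_to_readable_format;

  #[test]
  fn formats_gigabytes_with_one_decimal() {
    assert_eq!(bytes_to_readable_format(1024 * 1024 * 1024), "1.0 GB");
    assert_eq!(bytes_to_readable_format(1024 * 1024 * 1024 * 3 / 2), "1.5 GB");
  }

  #[test]
  fn formats_megabytes_with_two_decimals() {
    assert_eq!(bytes_to_readable_format(512 * 1024 * 1024), "512.00 MB");
    assert_eq!(bytes_to_readable_format(0), "0.00 MB");
  }
}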