chore: fix database rows filter and sort (#6241)

* chore: fix database rows filter and sort

* chore: fix ios build

* chore: fix task id

* chore: fmt

* chore: ios build
Nathan.fooo 2024-09-09 12:54:44 +08:00 committed by GitHub
parent 3caf84b8a0
commit 006ea02bfe
13 changed files with 193 additions and 158 deletions

View File

@@ -15,7 +15,7 @@ use lib_infra::util::{get_operating_system, OperatingSystem};
use std::path::PathBuf;
use std::sync::Arc;
use crate::local_ai::path::offline_app_path;
use crate::local_ai::watch::offline_app_path;
#[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))]
use crate::local_ai::watch::{watch_offline_app, WatchContext};
use tokio::fs::{self};
@@ -35,6 +35,7 @@ const LLM_MODEL_DIR: &str = "models";
const DOWNLOAD_FINISH: &str = "finish";
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum WatchDiskEvent {
Create,
Remove,

View File

@@ -2,7 +2,5 @@ pub mod local_llm_chat;
pub mod local_llm_resource;
mod model_request;
mod path;
pub mod stream_util;
#[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))]
pub mod watch;
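
The watch module is gated to desktop targets, so every import of its items (such as the watch_offline_app import in the first file above) has to sit behind the same predicate; keeping those gates consistent is presumably what the "fix ios build" entries in the commit message refer to. A minimal, self-contained sketch of the pattern, with an illustrative module name (desktop_only) rather than anything from the codebase:

// A module compiled only on desktop targets; every caller must use the same gate.
#[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))]
mod desktop_only {
    pub fn watch_message() -> &'static str {
        "file watching enabled"
    }
}

fn main() {
    // Without this gate the call would fail to compile on iOS/Android,
    // because `desktop_only` does not exist on those targets.
    #[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))]
    println!("{}", desktop_only::watch_message());

    #[cfg(not(any(target_os = "windows", target_os = "macos", target_os = "linux")))]
    println!("file watching is compiled out on this platform");
}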

View File

@@ -1,33 +0,0 @@
use std::path::PathBuf;
pub(crate) fn install_path() -> Option<PathBuf> {
#[cfg(not(any(target_os = "windows", target_os = "macos", target_os = "linux")))]
return None;
#[cfg(target_os = "windows")]
return None;
#[cfg(target_os = "macos")]
return Some(PathBuf::from("/usr/local/bin"));
#[cfg(target_os = "linux")]
return None;
}
pub(crate) fn offline_app_path() -> PathBuf {
#[cfg(not(any(target_os = "windows", target_os = "macos", target_os = "linux")))]
return PathBuf::new();
#[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))]
{
let offline_app = "appflowy_ai_plugin";
#[cfg(target_os = "windows")]
return PathBuf::from(format!("/usr/local/bin/{}", offline_app));
#[cfg(target_os = "macos")]
return PathBuf::from(format!("/usr/local/bin/{}", offline_app));
#[cfg(target_os = "linux")]
return PathBuf::from(format!("/usr/local/bin/{}", offline_app));
}
}

View File

@@ -1,18 +1,20 @@
use crate::local_ai::local_llm_resource::WatchDiskEvent;
use crate::local_ai::path::{install_path, offline_app_path};
use flowy_error::{FlowyError, FlowyResult};
use notify::{Event, RecursiveMode, Watcher};
use std::path::PathBuf;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tracing::{error, trace};
#[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))]
pub struct WatchContext {
#[allow(dead_code)]
watcher: notify::RecommendedWatcher,
pub path: PathBuf,
}
#[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))]
pub fn watch_offline_app() -> FlowyResult<(WatchContext, UnboundedReceiver<WatchDiskEvent>)> {
use notify::{Event, Watcher};
let install_path = install_path().ok_or_else(|| {
FlowyError::internal().with_context("Unsupported platform for offline app watching")
})?;
@@ -43,7 +45,7 @@ pub fn watch_offline_app() -> FlowyResult<(WatchContext, UnboundedReceiver<Watch
})
.map_err(|err| FlowyError::internal().with_context(err))?;
watcher
.watch(&install_path, RecursiveMode::NonRecursive)
.watch(&install_path, notify::RecursiveMode::NonRecursive)
.map_err(|err| FlowyError::internal().with_context(err))?;
Ok((
@@ -54,3 +56,38 @@ pub fn watch_offline_app() -> FlowyResult<(WatchContext, UnboundedReceiver<Watch
rx,
))
}
#[cfg(not(any(target_os = "windows", target_os = "macos", target_os = "linux")))]
pub(crate) fn install_path() -> Option<PathBuf> {
None
}
#[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))]
pub(crate) fn install_path() -> Option<PathBuf> {
#[cfg(target_os = "windows")]
return None;
#[cfg(target_os = "macos")]
return Some(PathBuf::from("/usr/local/bin"));
#[cfg(target_os = "linux")]
return None;
}
#[cfg(not(any(target_os = "windows", target_os = "macos", target_os = "linux")))]
pub(crate) fn offline_app_path() -> PathBuf {
PathBuf::new()
}
#[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))]
pub(crate) fn offline_app_path() -> PathBuf {
let offline_app = "appflowy_ai_plugin";
#[cfg(target_os = "windows")]
return PathBuf::from(format!("/usr/local/bin/{}", offline_app));
#[cfg(target_os = "macos")]
return PathBuf::from(format!("/usr/local/bin/{}", offline_app));
#[cfg(target_os = "linux")]
return PathBuf::from(format!("/usr/local/bin/{}", offline_app));
}
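
The parts of watch_offline_app elided by the hunks above evidently create an unbounded tokio channel and a notify watcher whose callback maps filesystem events onto WatchDiskEvent. A hedged sketch of that bridge, not the exact AppFlowy code (watch_dir is a made-up name, the error handling is simplified, and the event mapping is an assumption):

use notify::{recommended_watcher, Event, EventKind, RecursiveMode, Watcher};
use std::path::Path;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

#[derive(Debug, Clone)]
pub enum WatchDiskEvent {
    Create,
    Remove,
}

// Dropping a notify watcher stops the OS-level watch, so it is kept inside
// the returned context for as long as events are needed.
pub struct WatchContext {
    _watcher: notify::RecommendedWatcher,
}

pub fn watch_dir(path: &Path) -> notify::Result<(WatchContext, UnboundedReceiver<WatchDiskEvent>)> {
    let (tx, rx) = unbounded_channel();
    let mut watcher = recommended_watcher(move |res: notify::Result<Event>| {
        if let Ok(event) = res {
            // Map raw filesystem events onto the two cases the caller cares about.
            let mapped = match event.kind {
                EventKind::Create(_) => Some(WatchDiskEvent::Create),
                EventKind::Remove(_) => Some(WatchDiskEvent::Remove),
                _ => None,
            };
            if let Some(ev) = mapped {
                // The receiver may already be gone; ignore send errors.
                let _ = tx.send(ev);
            }
        }
    })?;
    watcher.watch(path, RecursiveMode::NonRecursive)?;
    Ok((WatchContext { _watcher: watcher }, rx))
}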

View File

@@ -86,6 +86,18 @@ impl From<RowOrder> for RowMetaPB {
}
}
impl From<&Row> for RowMetaPB {
fn from(data: &Row) -> Self {
Self {
id: data.id.clone().into_inner(),
document_id: None,
icon: None,
is_document_empty: None,
attachment_count: None,
}
}
}
impl From<Row> for RowMetaPB {
fn from(data: Row) -> Self {
Self {

View File

@@ -62,7 +62,7 @@ pub(crate) async fn get_all_rows_handler(
let row_details = database_editor.get_all_rows(view_id.as_ref()).await?;
let rows = row_details
.into_iter()
.map(|detail| RowMetaPB::from(detail.as_ref().clone()))
.map(|detail| RowMetaPB::from(detail.as_ref()))
.collect::<Vec<RowMetaPB>>();
data_result_ok(RepeatedRowMetaPB { items: rows })
}
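
The new impl From<&Row> for RowMetaPB lets the handler above (and the filter and group controllers further down) build row metadata from a borrowed Row instead of cloning the whole row first; the standard blanket impl of Into for types with a matching From is also what makes the later row.into() call sites compile for a &Row. A simplified sketch with stand-in types (the real Row and RowMetaPB carry more fields than shown here):

use std::collections::HashMap;
use std::sync::Arc;

#[derive(Clone)]
struct Row {
    id: String,
    // Stand-in for the heavy per-row data that no longer needs to be cloned.
    #[allow(dead_code)]
    cells: HashMap<String, String>,
}

#[derive(Debug)]
struct RowMetaPB {
    id: String,
}

// Borrow-based conversion: only the id is copied, the cell map is untouched.
impl From<&Row> for RowMetaPB {
    fn from(row: &Row) -> Self {
        Self { id: row.id.clone() }
    }
}

fn main() {
    let rows = vec![Arc::new(Row {
        id: "row-1".to_string(),
        cells: HashMap::new(),
    })];

    // Before: RowMetaPB::from(detail.as_ref().clone()) cloned the whole Row.
    // After: the Arc<Row> is only borrowed.
    let metas: Vec<RowMetaPB> = rows
        .into_iter()
        .map(|row| RowMetaPB::from(row.as_ref()))
        .collect();
    println!("{:?}", metas);

    // The std blanket `impl<T, U> Into<U> for T where U: From<T>` also gives
    // &Row an Into<RowMetaPB>, which is what `row_meta: row.into()` relies on.
    let row = Row { id: "row-2".to_string(), cells: HashMap::new() };
    let meta: RowMetaPB = (&row).into();
    println!("{:?}", meta);
}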

View File

@@ -42,6 +42,7 @@ use lib_infra::util::timestamp;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::oneshot::Sender;
use tokio::sync::RwLock as TokioRwLock;
use tokio::sync::{broadcast, oneshot};
use tokio_util::sync::CancellationToken;
@@ -1411,117 +1412,74 @@ impl DatabaseEditor {
// Check if the database is currently being opened
if !*self.is_opening.load_full() {
self.is_opening.store(Arc::new(true));
let view_layout = self.database.read().await.get_database_view_layout(view_id);
let new_token = CancellationToken::new();
if let Some(old_token) = self
.database_cancellation
.write()
.await
.replace(new_token.clone())
{
old_token.cancel();
}
let fut = async {
let view_layout = self.database.read().await.get_database_view_layout(view_id);
let new_token = CancellationToken::new();
if let Some(old_token) = self
.database_cancellation
.write()
.await
.replace(new_token.clone())
{
old_token.cancel();
}
let view_editor = self.database_views.get_or_init_view_editor(view_id).await?;
let row_orders = view_editor.get_all_row_orders().await?;
view_editor.set_row_orders(row_orders.clone()).await;
let view_editor = self.database_views.get_or_init_view_editor(view_id).await?;
let row_orders = view_editor.get_all_row_orders().await?;
view_editor.set_row_orders(row_orders.clone()).await;
// Collect database details in a single block holding the `read` lock
let (database_id, fields, is_linked) = {
let database = self.database.read().await;
(
database.get_database_id(),
database
.get_all_field_orders()
.into_iter()
.map(FieldIdPB::from)
.collect::<Vec<_>>(),
database.is_inline_view(view_id),
)
};
let rows = row_orders
.into_iter()
.map(RowMetaPB::from)
.collect::<Vec<RowMetaPB>>();
trace!(
"[Database]: database: {}, num fields: {}, num rows: {}",
database_id,
fields.len(),
rows.len()
);
trace!("[Database]: start loading rows");
let cloned_database = Arc::downgrade(&self.database);
let view_editor = self.database_views.get_or_init_view_editor(view_id).await?;
tokio::spawn(async move {
const CHUNK_SIZE: usize = 10;
let apply_filter_and_sort =
|mut loaded_rows: Vec<Arc<Row>>, view_editor: Arc<DatabaseViewEditor>| async move {
for loaded_row in loaded_rows.iter() {
view_editor
.row_by_row_id
.insert(loaded_row.id.to_string(), loaded_row.clone());
}
tokio::time::sleep(Duration::from_millis(500)).await;
if view_editor.has_filters().await {
trace!("[Database]: filtering rows:{}", loaded_rows.len());
view_editor.v_filter_rows_and_notify(&mut loaded_rows).await;
}
if view_editor.has_sorts().await {
trace!("[Database]: sorting rows:{}", loaded_rows.len());
view_editor.v_sort_rows_and_notify(&mut loaded_rows).await;
}
};
let mut loaded_rows = vec![];
for chunk_row_orders in view_editor.row_orders.read().await.chunks(CHUNK_SIZE) {
match cloned_database.upgrade() {
None => break,
Some(database) => {
for lazy_row in chunk_row_orders {
if let Some(database_row) =
database.read().await.init_database_row(&lazy_row.id).await
{
if let Some(row) = database_row.read().await.get_row() {
loaded_rows.push(Arc::new(row));
}
}
}
// stop init database rows
if new_token.is_cancelled() {
info!("[Database]: stop loading database rows");
return;
}
},
}
tokio::task::yield_now().await;
}
info!("[Database]: Finish loading rows: {}", loaded_rows.len());
apply_filter_and_sort(loaded_rows.clone(), view_editor).await;
if let Some(notify_finish) = notify_finish {
let _ = notify_finish.send(());
}
});
Ok::<_, FlowyError>(DatabasePB {
id: database_id,
fields,
rows,
layout_type: view_layout.into(),
is_linked,
})
// Collect database details in a single block holding the `read` lock
let (database_id, fields, is_linked) = {
let database = self.database.read().await;
(
database.get_database_id(),
database
.get_all_field_orders()
.into_iter()
.map(FieldIdPB::from)
.collect::<Vec<_>>(),
database.is_inline_view(view_id),
)
};
let result = fut.await;
let mut row_metas = row_orders
.into_iter()
.map(RowMetaPB::from)
.collect::<Vec<RowMetaPB>>();
trace!(
"[Database]: database: {}, num fields: {}, num rows: {}",
database_id,
fields.len(),
row_metas.len()
);
let view_editor = self.database_views.get_or_init_view_editor(view_id).await?;
// Load rows asynchronously
// if the number of rows is less than 100 or notify_finish is not None, it will wait until all
// rows are loaded
let should_wait = notify_finish.is_some() || row_metas.len() < 100;
let (tx, rx) = oneshot::channel();
self.async_load_rows(view_editor, Some(tx), new_token);
if should_wait {
if let Ok(rows) = rx.await {
row_metas = rows
.into_iter()
.map(|row| RowMetaPB::from(row.as_ref()))
.collect::<Vec<RowMetaPB>>();
}
}
if let Some(tx) = notify_finish {
let _ = tx.send(());
}
let result = Ok(DatabasePB {
id: database_id,
fields,
rows: row_metas,
layout_type: view_layout.into(),
is_linked,
});
// Mark that the opening process is complete
self.is_opening.store(Arc::new(false));
// Clear cancellation token
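
After this rework, open_database spawns the row loading task and then decides whether to block on it: a oneshot channel carries the fully loaded (filtered and sorted) rows back, and the caller only awaits it when a notify_finish sender was supplied or there are fewer than 100 row orders. A stripped-down sketch of that decision with placeholder types, where load_rows_in_background stands in for async_load_rows:

use std::sync::Arc;
use tokio::sync::oneshot;

struct Row {
    id: String,
}

// Stand-in for async_load_rows: loads rows in the background and sends the
// final, filtered/sorted set over the oneshot channel once it is done.
fn load_rows_in_background(row_ids: Vec<String>, tx: oneshot::Sender<Vec<Arc<Row>>>) {
    tokio::spawn(async move {
        let rows = row_ids
            .into_iter()
            .map(|id| Arc::new(Row { id }))
            .collect::<Vec<_>>();
        let _ = tx.send(rows);
    });
}

#[tokio::main]
async fn main() {
    let row_ids: Vec<String> = (0..30).map(|i| format!("row-{i}")).collect();

    // Small databases are loaded eagerly so the first snapshot is complete;
    // large ones return row orders immediately and keep loading in the background.
    let should_wait = row_ids.len() < 100;

    let (tx, rx) = oneshot::channel();
    load_rows_in_background(row_ids, tx);

    if should_wait {
        if let Ok(rows) = rx.await {
            println!(
                "opened with {} fully loaded rows (last id: {:?})",
                rows.len(),
                rows.last().map(|row| row.id.clone())
            );
        }
    } else {
        println!("opened immediately; rows keep loading in the background");
    }
}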
@@ -1541,6 +1499,69 @@ impl DatabaseEditor {
}
}
fn async_load_rows(
&self,
view_editor: Arc<DatabaseViewEditor>,
notify_finish: Option<Sender<Vec<Arc<Row>>>>,
new_token: CancellationToken,
) {
trace!("[Database]: start loading rows");
let cloned_database = Arc::downgrade(&self.database);
tokio::spawn(async move {
const CHUNK_SIZE: usize = 20;
let apply_filter_and_sort =
|mut loaded_rows: Vec<Arc<Row>>, view_editor: Arc<DatabaseViewEditor>| async move {
for loaded_row in loaded_rows.iter() {
view_editor
.row_by_row_id
.insert(loaded_row.id.to_string(), loaded_row.clone());
}
if view_editor.has_filters().await {
trace!("[Database]: filtering rows:{}", loaded_rows.len());
view_editor.v_filter_rows_and_notify(&mut loaded_rows).await;
}
if view_editor.has_sorts().await {
trace!("[Database]: sorting rows:{}", loaded_rows.len());
view_editor.v_sort_rows_and_notify(&mut loaded_rows).await;
}
loaded_rows
};
let mut loaded_rows = vec![];
for chunk_row_orders in view_editor.row_orders.read().await.chunks(CHUNK_SIZE) {
match cloned_database.upgrade() {
None => break,
Some(database) => {
for lazy_row in chunk_row_orders {
if let Some(database_row) =
database.read().await.init_database_row(&lazy_row.id).await
{
if let Some(row) = database_row.read().await.get_row() {
loaded_rows.push(Arc::new(row));
}
}
}
// stop init database rows
if new_token.is_cancelled() {
info!("[Database]: stop loading database rows");
return;
}
},
}
tokio::task::yield_now().await;
}
info!("[Database]: Finish loading all rows: {}", loaded_rows.len());
let loaded_rows = apply_filter_and_sort(loaded_rows, view_editor).await;
if let Some(notify_finish) = notify_finish {
let _ = notify_finish.send(loaded_rows);
}
});
}
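
async_load_rows keeps only a Weak handle to the database (via Arc::downgrade) and checks the CancellationToken between chunks, so dropping the database or reopening the view stops the background work instead of keeping it alive. A condensed, self-contained sketch of that loop under simplified assumptions (Database, load_in_chunks, and the chunk contents are stand-ins, not the real types):

use std::sync::{Arc, Weak};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

struct Database {
    row_ids: Vec<String>,
}

fn load_in_chunks(database: Weak<RwLock<Database>>, token: CancellationToken) {
    tokio::spawn(async move {
        const CHUNK_SIZE: usize = 20;
        let mut loaded = 0usize;
        loop {
            // If the database was dropped, there is nothing left to load.
            let Some(db) = database.upgrade() else { break };
            let chunk: Vec<String> = {
                let guard = db.read().await;
                guard.row_ids.iter().skip(loaded).take(CHUNK_SIZE).cloned().collect()
            };
            if chunk.is_empty() {
                break;
            }
            loaded += chunk.len();

            // A reopened view replaces the token and cancels this one.
            if token.is_cancelled() {
                println!("stop loading rows");
                return;
            }
            // Let other tasks run between chunks.
            tokio::task::yield_now().await;
        }
        println!("finished loading {loaded} rows");
    });
}

#[tokio::main]
async fn main() {
    let db = Arc::new(RwLock::new(Database {
        row_ids: (0..65).map(|i| format!("row-{i}")).collect(),
    }));
    let token = CancellationToken::new();
    load_in_chunks(Arc::downgrade(&db), token.clone());
    // Give the background task a moment to run in this toy example.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}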
pub async fn export_csv(&self, style: CSVFormat) -> FlowyResult<String> {
let database = self.database.clone();
let database_guard = database.read().await;

View File

@@ -371,7 +371,7 @@ impl FilterController {
&filters,
) {
if is_visible {
let row_meta = RowMetaPB::from(row.as_ref().clone());
let row_meta = RowMetaPB::from(row.as_ref());
visible_rows.push(InsertedRowPB::new(row_meta).with_index(index as i32))
} else {
invisible_rows.push(row.id.clone());

View File

@@ -115,7 +115,7 @@ where
if !no_status_group_rows.is_empty() {
changeset
.inserted_rows
.push(InsertedRowPB::new(RowMetaPB::from(row.clone())));
.push(InsertedRowPB::new(RowMetaPB::from(row)));
no_status_group.add_row(row.clone());
}
@@ -246,7 +246,7 @@ where
let changeset = GroupRowsNotificationPB::insert(
group.id.clone(),
vec![InsertedRowPB {
row_meta: (*row).clone().into(),
row_meta: row.into(),
index: Some(index as i32),
is_new: true,
}],
@@ -265,7 +265,7 @@ where
let changeset = GroupRowsNotificationPB::insert(
no_status_group.id.clone(),
vec![InsertedRowPB {
row_meta: (*row).clone().into(),
row_meta: row.into(),
index: Some(index as i32),
is_new: true,
}],

View File

@@ -66,7 +66,7 @@ impl GroupCustomize for CheckboxGroupController {
if is_not_contained {
changeset
.inserted_rows
.push(InsertedRowPB::new(RowMetaPB::from(row.clone())));
.push(InsertedRowPB::new(RowMetaPB::from(row)));
group.add_row(row.clone());
}
}
@@ -82,7 +82,7 @@ impl GroupCustomize for CheckboxGroupController {
if is_not_contained {
changeset
.inserted_rows
.push(InsertedRowPB::new(RowMetaPB::from(row.clone())));
.push(InsertedRowPB::new(RowMetaPB::from(row)));
group.add_row(row.clone());
}
}

View File

@@ -77,7 +77,7 @@ impl GroupController for DefaultGroupController {
vec![GroupRowsNotificationPB::insert(
self.group.id.clone(),
vec![InsertedRowPB {
row_meta: (*row).clone().into(),
row_meta: row.into(),
index: Some(index as i32),
is_new: true,
}],

View File

@@ -98,7 +98,7 @@ impl GroupCustomize for URLGroupController {
if !group.contains_row(&row.id) {
changeset
.inserted_rows
.push(InsertedRowPB::new(RowMetaPB::from(row.clone())));
.push(InsertedRowPB::new(RowMetaPB::from(row)));
group.add_row(row.clone());
}
} else if group.contains_row(&row.id) {

View File

@@ -13,7 +13,7 @@ impl TaskStore {
pub fn new() -> Self {
Self {
tasks: HashMap::new(),
task_id_counter: AtomicU32::new(0),
task_id_counter: AtomicU32::new(1),
}
}
@@ -45,7 +45,6 @@ impl TaskStore {
}
pub(crate) fn next_task_id(&self) -> TaskId {
let old = self.task_id_counter.fetch_add(1, SeqCst);
old + 1
self.task_id_counter.fetch_add(1, SeqCst)
}
}
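
Both the old and the new next_task_id hand out 1, 2, 3, …: the change moves the +1 out of the return expression and into the counter's initial value, so the returned id is exactly the value fetch_add observed and 0 is never produced. A minimal sketch, assuming TaskId is a plain u32 alias (the real code may define it differently):

use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

type TaskId = u32; // assumption: TaskId is a u32 alias in the real code

struct TaskStore {
    task_id_counter: AtomicU32,
}

impl TaskStore {
    fn new() -> Self {
        Self {
            // Start at 1 so the first fetch_add already yields a non-zero id.
            task_id_counter: AtomicU32::new(1),
        }
    }

    fn next_task_id(&self) -> TaskId {
        // fetch_add returns the value *before* the increment: 1, 2, 3, ...
        self.task_id_counter.fetch_add(1, SeqCst)
    }
}

fn main() {
    let store = TaskStore::new();
    assert_eq!(store.next_task_id(), 1);
    assert_eq!(store.next_task_id(), 2);
    assert_eq!(store.next_task_id(), 3);
    println!("task ids are allocated sequentially starting at 1");
}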