From 75da496128afdc7d75233cee6b8d0b77cdda5eb0 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Mon, 7 Oct 2024 08:57:42 +0800 Subject: [PATCH] fix: calculation bug (#6489) * chore: upgrade collab * fix: calculate value * chore: optimize calculate * chore: cal when open database * chore: update calculation when filter change * chore: use same runtime --- frontend/rust-lib/dart-ffi/src/lib.rs | 6 +- .../calculation/calculation_entities.rs | 13 + .../src/services/calculations/controller.rs | 177 ++++++++----- .../src/services/calculations/entities.rs | 15 -- .../src/services/calculations/service.rs | 240 +++++++----------- .../src/services/calculations/task.rs | 25 +- .../src/services/database/database_editor.rs | 23 +- .../database_view/view_calculations.rs | 16 +- .../src/services/database_view/view_editor.rs | 51 +++- .../services/database_view/view_operation.rs | 2 +- .../src/services/filter/controller.rs | 11 +- .../src/services/filter/task.rs | 22 +- .../src/services/sort/controller.rs | 1 + .../flowy-database2/src/services/sort/task.rs | 26 +- .../lib-infra/src/priority_task/scheduler.rs | 21 +- .../lib-infra/tests/task_test/script.rs | 21 +- 16 files changed, 364 insertions(+), 306 deletions(-) diff --git a/frontend/rust-lib/dart-ffi/src/lib.rs b/frontend/rust-lib/dart-ffi/src/lib.rs index 984b77697f..bc53505ecf 100644 --- a/frontend/rust-lib/dart-ffi/src/lib.rs +++ b/frontend/rust-lib/dart-ffi/src/lib.rs @@ -144,15 +144,15 @@ pub extern "C" fn init_sdk(_port: i64, data: *mut c_char) -> i64 { .take() .map(|isolate| Arc::new(LogStreamSenderImpl { isolate }) as Arc); let (sender, task_rx) = mpsc::unbounded_channel::(); + let runtime = Arc::new(AFPluginRuntime::new().unwrap()); + let cloned_runtime = runtime.clone(); let handle = std::thread::spawn(move || { - let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); let local_set = LocalSet::new(); - runtime.block_on(local_set.run_until(Runner { rx: task_rx })); + cloned_runtime.block_on(local_set.run_until(Runner { rx: task_rx })); }); *DART_APPFLOWY_CORE.sender.write().unwrap() = Some(sender); *DART_APPFLOWY_CORE.handle.write().unwrap() = Some(handle); - let runtime = Arc::new(AFPluginRuntime::new().unwrap()); let cloned_runtime = runtime.clone(); *DART_APPFLOWY_CORE.core.write().unwrap() = runtime .block_on(async move { Some(AppFlowyCore::new(config, cloned_runtime, log_stream).await) }); diff --git a/frontend/rust-lib/flowy-database2/src/entities/calculation/calculation_entities.rs b/frontend/rust-lib/flowy-database2/src/entities/calculation/calculation_entities.rs index 8137b905d9..36451e9032 100644 --- a/frontend/rust-lib/flowy-database2/src/entities/calculation/calculation_entities.rs +++ b/frontend/rust-lib/flowy-database2/src/entities/calculation/calculation_entities.rs @@ -23,6 +23,19 @@ pub struct CalculationPB { pub value: String, } +impl std::convert::From<&CalculationPB> for Calculation { + fn from(calculation: &CalculationPB) -> Self { + let calculation_type = calculation.calculation_type.into(); + + Self { + id: calculation.id.clone(), + field_id: calculation.field_id.clone(), + calculation_type, + value: calculation.value.clone(), + } + } +} + impl std::convert::From<&Calculation> for CalculationPB { fn from(calculation: &Calculation) -> Self { let calculation_type = calculation.calculation_type.into(); diff --git a/frontend/rust-lib/flowy-database2/src/services/calculations/controller.rs b/frontend/rust-lib/flowy-database2/src/services/calculations/controller.rs index 865c8a22c5..6a1858ea0c 100644 --- a/frontend/rust-lib/flowy-database2/src/services/calculations/controller.rs +++ b/frontend/rust-lib/flowy-database2/src/services/calculations/controller.rs @@ -3,12 +3,13 @@ use std::str::FromStr; use std::sync::Arc; use collab_database::fields::Field; -use collab_database::rows::{Row, RowCell}; +use collab_database::rows::{Cell, Row}; +use dashmap::DashMap; use flowy_error::FlowyResult; +use lib_infra::priority_task::{QualityOfService, Task, TaskContent, TaskDispatcher}; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock as TokioRwLock; - -use lib_infra::priority_task::{QualityOfService, Task, TaskContent, TaskDispatcher}; +use tracing::instrument; use crate::entities::{ CalculationChangesetNotificationPB, CalculationPB, CalculationType, FieldType, @@ -21,10 +22,10 @@ use super::{Calculation, CalculationChangeset, CalculationsService}; #[async_trait] pub trait CalculationsDelegate: Send + Sync + 'static { - async fn get_cells_for_field(&self, view_id: &str, field_id: &str) -> Vec>; + async fn get_cells_for_field(&self, view_id: &str, field_id: &str) -> Vec>; async fn get_field(&self, field_id: &str) -> Option; async fn get_calculation(&self, view_id: &str, field_id: &str) -> Option>; - async fn get_all_calculations(&self, view_id: &str) -> Arc>>; + async fn get_all_calculations(&self, view_id: &str) -> Vec>; async fn update_calculation(&self, view_id: &str, calculation: Calculation); async fn remove_calculation(&self, view_id: &str, calculation_id: &str); } @@ -71,15 +72,16 @@ impl CalculationsController { } pub async fn close(&self) { - if let Ok(mut task_scheduler) = self.task_scheduler.try_write() { - task_scheduler.unregister_handler(&self.handler_id).await; - } else { - tracing::error!("Attempt to get the lock of task_scheduler failed"); - } + self + .task_scheduler + .write() + .await + .unregister_handler(&self.handler_id) + .await; } #[tracing::instrument(name = "schedule_calculation_task", level = "trace", skip(self))] - async fn gen_task(&self, task_type: CalculationEvent, qos: QualityOfService) { + pub(crate) async fn gen_task(&self, task_type: CalculationEvent, qos: QualityOfService) { let task_id = self.task_scheduler.read().await.next_task_id(); let task = Task::new( &self.handler_id, @@ -100,7 +102,7 @@ impl CalculationsController { pub async fn process(&self, predicate: &str) -> FlowyResult<()> { let event_type = CalculationEvent::from_str(predicate).unwrap(); match event_type { - CalculationEvent::RowChanged(row) => self.handle_row_changed(row).await, + CalculationEvent::RowChanged(row) => self.handle_row_changed(&row).await, CalculationEvent::CellUpdated(field_id) => self.handle_cell_changed(field_id).await, CalculationEvent::FieldDeleted(field_id) => self.handle_field_deleted(field_id).await, CalculationEvent::FieldTypeChanged(field_id, new_field_type) => { @@ -108,6 +110,11 @@ impl CalculationsController { .handle_field_type_changed(field_id, new_field_type) .await }, + CalculationEvent::InitialRows(rows) => { + for row in rows { + self.handle_row_changed(row.as_ref()).await; + } + }, } Ok(()) @@ -117,7 +124,7 @@ impl CalculationsController { self .gen_task( CalculationEvent::FieldDeleted(field_id), - QualityOfService::UserInteractive, + QualityOfService::Background, ) .await } @@ -151,7 +158,7 @@ impl CalculationsController { self .gen_task( CalculationEvent::FieldTypeChanged(field_id, new_field_type), - QualityOfService::UserInteractive, + QualityOfService::Background, ) .await } @@ -188,7 +195,7 @@ impl CalculationsController { self .gen_task( CalculationEvent::CellUpdated(field_id), - QualityOfService::UserInteractive, + QualityOfService::Background, ) .await } @@ -200,23 +207,31 @@ impl CalculationsController { .await; if let Some(calculation) = calculation { - let update = self.get_updated_calculation(calculation).await; - if let Some(update) = update { - self + if let Some(field) = self.delegate.get_field(&field_id).await { + let cells = self .delegate - .update_calculation(&self.view_id, update.clone()) + .get_cells_for_field(&self.view_id, &calculation.field_id) .await; - let notification = CalculationChangesetNotificationPB::from_update( - &self.view_id, - vec![CalculationPB::from(&update)], - ); + // Update the calculation + if let Some(update) = self.update_calculation(calculation, &field, cells).await { + self + .delegate + .update_calculation(&self.view_id, update.clone()) + .await; - let _ = self - .notifier - .send(DatabaseViewChanged::CalculationValueNotification( - notification, - )); + // Send notification + let notification = CalculationChangesetNotificationPB::from_update( + &self.view_id, + vec![CalculationPB::from(&update)], + ); + + let _ = self + .notifier + .send(DatabaseViewChanged::CalculationValueNotification( + notification, + )); + } } } } @@ -225,51 +240,43 @@ impl CalculationsController { self .gen_task( CalculationEvent::RowChanged(row), - QualityOfService::UserInteractive, + QualityOfService::Background, ) .await } - async fn handle_row_changed(&self, row: Row) { - let cells = row.cells.iter(); + async fn handle_row_changed(&self, row: &Row) { + let cells = &row.cells; let mut updates = vec![]; + let mut cells_by_field = DashMap::>>::new(); // In case there are calculations where empty cells are counted // as a contribution to the value. if cells.len() == 0 { let calculations = self.delegate.get_all_calculations(&self.view_id).await; - for calculation in calculations.iter() { - let update = self.get_updated_calculation(calculation.clone()).await; - if let Some(update) = update { - updates.push(CalculationPB::from(&update)); - self - .delegate - .update_calculation(&self.view_id, update) - .await; - } + for calculation in calculations.into_iter() { + let cells = self + .get_or_fetch_cells(&calculation.field_id, &mut cells_by_field) + .await; + updates.extend(self.handle_cells_changed(calculation, cells).await); } } // Iterate each cell in the row for cell in cells { - let field_id = cell.0; + let field_id = &cell.0; let calculation = self.delegate.get_calculation(&self.view_id, field_id).await; if let Some(calculation) = calculation { - let update = self.get_updated_calculation(calculation.clone()).await; - - if let Some(update) = update { - updates.push(CalculationPB::from(&update)); - self - .delegate - .update_calculation(&self.view_id, update) - .await; - } + let cells = self + .get_or_fetch_cells(&calculation.field_id, &mut cells_by_field) + .await; + let changes = self.handle_cells_changed(calculation, cells).await; + updates.extend(changes); } } if !updates.is_empty() { let notification = CalculationChangesetNotificationPB::from_update(&self.view_id, updates); - let _ = self .notifier .send(DatabaseViewChanged::CalculationValueNotification( @@ -278,17 +285,58 @@ impl CalculationsController { } } - async fn get_updated_calculation(&self, calculation: Arc) -> Option { - let field_cells = self - .delegate - .get_cells_for_field(&self.view_id, &calculation.field_id) - .await; - let field = self.delegate.get_field(&calculation.field_id).await?; + async fn get_or_fetch_cells<'a>( + &'a self, + field_id: &'a str, + cells_by_field: &'a mut DashMap>>, + ) -> Vec> { + let cells = cells_by_field.get(field_id).map(|entry| entry.to_vec()); + match cells { + None => { + let fetch_cells = self + .delegate + .get_cells_for_field(&self.view_id, field_id) + .await; + cells_by_field.insert(field_id.to_string(), fetch_cells.clone()); + fetch_cells + }, + Some(cells) => cells, + } + } - let value = - self - .calculations_service - .calculate(&field, calculation.calculation_type, field_cells); + /// field_cells will be the cells that belong to the field with field_id + async fn handle_cells_changed( + &self, + calculation: Arc, + field_cells: Vec>, + ) -> Vec { + let mut updates = vec![]; + if let Some(field) = self.delegate.get_field(&calculation.field_id).await { + let update = self + .update_calculation(calculation, &field, field_cells) + .await; + if let Some(update) = update { + updates.push(CalculationPB::from(&update)); + self + .delegate + .update_calculation(&self.view_id, update) + .await; + } + } + + updates + } + + #[instrument(level = "trace", skip_all)] + async fn update_calculation( + &self, + calculation: Arc, + field: &Field, + cells: Vec>, + ) -> Option { + let value = self + .calculations_service + .calculate(&field, calculation.calculation_type, cells); if value != calculation.value { return Some(calculation.with_value(value)); @@ -304,7 +352,7 @@ impl CalculationsController { let mut notification: Option = None; if let Some(insert) = &changeset.insert_calculation { - let row_cells: Vec> = self + let cells = self .delegate .get_cells_for_field(&self.view_id, &insert.field_id) .await; @@ -313,7 +361,7 @@ impl CalculationsController { let value = self .calculations_service - .calculate(&field, insert.calculation_type, row_cells); + .calculate(&field, insert.calculation_type, cells); notification = Some(CalculationChangesetNotificationPB::from_insert( &self.view_id, @@ -352,7 +400,8 @@ impl CalculationsController { } #[derive(Serialize, Deserialize, Clone, Debug)] -enum CalculationEvent { +pub(crate) enum CalculationEvent { + InitialRows(Vec>), RowChanged(Row), CellUpdated(String), FieldTypeChanged(String, FieldType), diff --git a/frontend/rust-lib/flowy-database2/src/services/calculations/entities.rs b/frontend/rust-lib/flowy-database2/src/services/calculations/entities.rs index 42f2b60ca1..34688b7367 100644 --- a/frontend/rust-lib/flowy-database2/src/services/calculations/entities.rs +++ b/frontend/rust-lib/flowy-database2/src/services/calculations/entities.rs @@ -3,8 +3,6 @@ use collab::preclude::Any; use collab_database::views::{CalculationMap, CalculationMapBuilder}; use serde::Deserialize; -use crate::entities::CalculationPB; - #[derive(Debug, Clone, Deserialize)] pub struct Calculation { pub id: String, @@ -31,19 +29,6 @@ impl From for CalculationMap { } } -impl std::convert::From<&CalculationPB> for Calculation { - fn from(calculation: &CalculationPB) -> Self { - let calculation_type = calculation.calculation_type.into(); - - Self { - id: calculation.id.clone(), - field_id: calculation.field_id.clone(), - calculation_type, - value: calculation.value.clone(), - } - } -} - impl TryFrom for Calculation { type Error = anyhow::Error; diff --git a/frontend/rust-lib/flowy-database2/src/services/calculations/service.rs b/frontend/rust-lib/flowy-database2/src/services/calculations/service.rs index bf48136622..758e4744e3 100644 --- a/frontend/rust-lib/flowy-database2/src/services/calculations/service.rs +++ b/frontend/rust-lib/flowy-database2/src/services/calculations/service.rs @@ -1,64 +1,57 @@ use std::sync::Arc; use collab_database::fields::Field; -use collab_database::rows::RowCell; +use collab_database::rows::{Cell, RowCell}; use crate::entities::CalculationType; use crate::services::field::TypeOptionCellExt; +use rayon::prelude::*; -pub struct CalculationsService {} - +pub struct CalculationsService; impl CalculationsService { pub fn new() -> Self { - Self {} + Self } - pub fn calculate( - &self, - field: &Field, - calculation_type: i64, - row_cells: Vec>, - ) -> String { + pub fn calculate(&self, field: &Field, calculation_type: i64, cells: Vec>) -> String { let ty: CalculationType = calculation_type.into(); match ty { - CalculationType::Average => self.calculate_average(field, row_cells), - CalculationType::Max => self.calculate_max(field, row_cells), - CalculationType::Median => self.calculate_median(field, row_cells), - CalculationType::Min => self.calculate_min(field, row_cells), - CalculationType::Sum => self.calculate_sum(field, row_cells), - CalculationType::Count => self.calculate_count(row_cells), - CalculationType::CountEmpty => self.calculate_count_empty(field, row_cells), - CalculationType::CountNonEmpty => self.calculate_count_non_empty(field, row_cells), + CalculationType::Average => self.calculate_average(field, cells), + CalculationType::Max => self.calculate_max(field, cells), + CalculationType::Median => self.calculate_median(field, cells), + CalculationType::Min => self.calculate_min(field, cells), + CalculationType::Sum => self.calculate_sum(field, cells), + CalculationType::Count => self.calculate_count(cells), + CalculationType::CountEmpty => self.calculate_count_empty(field, cells), + CalculationType::CountNonEmpty => self.calculate_count_non_empty(field, cells), } } - fn calculate_average(&self, field: &Field, row_cells: Vec>) -> String { - let mut sum = 0.0; - let mut len = 0.0; + fn calculate_average(&self, field: &Field, cells: Vec>) -> String { if let Some(handler) = TypeOptionCellExt::new(field, None).get_type_option_cell_data_handler() { - for row_cell in row_cells { - if let Some(cell) = &row_cell.cell { - if let Some(value) = handler.handle_numeric_cell(cell) { - sum += value; - len += 1.0; - } - } - } - } + let (sum, len): (f64, usize) = cells + .par_iter() + .filter_map(|cell| handler.handle_numeric_cell(cell)) + .map(|value| (value, 1)) + .reduce( + || (0.0, 0), + |(sum1, len1), (sum2, len2)| (sum1 + sum2, len1 + len2), + ); - if len > 0.0 { - format!("{:.5}", sum / len) + if len > 0 { + format!("{:.5}", sum / len as f64) + } else { + String::new() + } } else { String::new() } } - fn calculate_median(&self, field: &Field, row_cells: Vec>) -> String { - let values = self.reduce_values_f64(field, row_cells, |values| { - values.sort_by(|a, b| a.partial_cmp(b).unwrap()); - values.clone() - }); + fn calculate_median(&self, field: &Field, cells: Vec>) -> String { + let mut values = self.reduce_values_f64(field, cells); + values.par_sort_by(|a, b| a.partial_cmp(b).unwrap()); if !values.is_empty() { format!("{:.5}", Self::median(&values)) @@ -67,8 +60,74 @@ impl CalculationsService { } } + fn calculate_min(&self, field: &Field, cells: Vec>) -> String { + let values = self.reduce_values_f64(field, cells); + if let Some(min) = values.par_iter().min_by(|a, b| a.total_cmp(b)) { + format!("{:.5}", min) + } else { + String::new() + } + } + + fn calculate_max(&self, field: &Field, cells: Vec>) -> String { + let values = self.reduce_values_f64(field, cells); + if let Some(max) = values.par_iter().max_by(|a, b| a.total_cmp(b)) { + format!("{:.5}", max) + } else { + String::new() + } + } + + fn calculate_sum(&self, field: &Field, cells: Vec>) -> String { + let values = self.reduce_values_f64(field, cells); + if !values.is_empty() { + format!("{:.5}", values.par_iter().sum::()) + } else { + String::new() + } + } + + fn calculate_count(&self, cells: Vec>) -> String { + format!("{}", cells.len()) + } + + fn calculate_count_empty(&self, field: &Field, cells: Vec>) -> String { + if let Some(handler) = TypeOptionCellExt::new(field, None).get_type_option_cell_data_handler() { + let empty_count = cells + .par_iter() + .filter(|cell| handler.handle_is_cell_empty(cell, field)) + .count(); + empty_count.to_string() + } else { + "".to_string() + } + } + + fn calculate_count_non_empty(&self, field: &Field, cells: Vec>) -> String { + if let Some(handler) = TypeOptionCellExt::new(field, None).get_type_option_cell_data_handler() { + let non_empty_count = cells + .par_iter() + .filter(|cell| !handler.handle_is_cell_empty(cell, field)) + .count(); + non_empty_count.to_string() + } else { + "".to_string() + } + } + + fn reduce_values_f64(&self, field: &Field, row_cells: Vec>) -> Vec { + if let Some(handler) = TypeOptionCellExt::new(field, None).get_type_option_cell_data_handler() { + row_cells + .par_iter() + .filter_map(|cell| handler.handle_numeric_cell(cell)) + .collect::>() + } else { + vec![] + } + } + fn median(array: &[f64]) -> f64 { - if (array.len() % 2) == 0 { + if array.len() % 2 == 0 { let left = array.len() / 2 - 1; let right = array.len() / 2; (array[left] + array[right]) / 2.0 @@ -76,109 +135,4 @@ impl CalculationsService { array[array.len() / 2] } } - - fn calculate_min(&self, field: &Field, row_cells: Vec>) -> String { - let values = self.reduce_values_f64(field, row_cells, |values| { - values.sort_by(|a, b| a.partial_cmp(b).unwrap()); - values.clone() - }); - - if !values.is_empty() { - let min = values.iter().min_by(|a, b| a.total_cmp(b)); - if let Some(min) = min { - return format!("{:.5}", min); - } - } - - String::new() - } - - fn calculate_max(&self, field: &Field, row_cells: Vec>) -> String { - let values = self.reduce_values_f64(field, row_cells, |values| { - values.sort_by(|a, b| a.partial_cmp(b).unwrap()); - values.clone() - }); - - if !values.is_empty() { - let max = values.iter().max_by(|a, b| a.total_cmp(b)); - if let Some(max) = max { - return format!("{:.5}", max); - } - } - - String::new() - } - - fn calculate_sum(&self, field: &Field, row_cells: Vec>) -> String { - let values = self.reduce_values_f64(field, row_cells, |values| values.clone()); - - if !values.is_empty() { - format!("{:.5}", values.iter().sum::()) - } else { - String::new() - } - } - - fn calculate_count(&self, row_cells: Vec>) -> String { - if !row_cells.is_empty() { - format!("{}", row_cells.len()) - } else { - String::new() - } - } - - fn calculate_count_empty(&self, field: &Field, row_cells: Vec>) -> String { - match TypeOptionCellExt::new(field, None).get_type_option_cell_data_handler() { - Some(handler) if !row_cells.is_empty() => row_cells - .iter() - .filter(|row_cell| { - if let Some(cell) = &row_cell.cell { - handler.handle_is_cell_empty(cell, field) - } else { - true - } - }) - .collect::>() - .len() - .to_string(), - _ => "".to_string(), - } - } - - fn calculate_count_non_empty(&self, field: &Field, row_cells: Vec>) -> String { - match TypeOptionCellExt::new(field, None).get_type_option_cell_data_handler() { - Some(handler) if !row_cells.is_empty() => row_cells - .iter() - .filter(|row_cell| { - if let Some(cell) = &row_cell.cell { - !handler.handle_is_cell_empty(cell, field) - } else { - false - } - }) - .collect::>() - .len() - .to_string(), - _ => "".to_string(), - } - } - - fn reduce_values_f64(&self, field: &Field, row_cells: Vec>, f: F) -> T - where - F: FnOnce(&mut Vec) -> T, - { - let mut values = vec![]; - - if let Some(handler) = TypeOptionCellExt::new(field, None).get_type_option_cell_data_handler() { - for row_cell in row_cells { - if let Some(cell) = &row_cell.cell { - if let Some(value) = handler.handle_numeric_cell(cell) { - values.push(value); - } - } - } - } - - f(&mut values) - } } diff --git a/frontend/rust-lib/flowy-database2/src/services/calculations/task.rs b/frontend/rust-lib/flowy-database2/src/services/calculations/task.rs index b8ae249c4b..073e35c143 100644 --- a/frontend/rust-lib/flowy-database2/src/services/calculations/task.rs +++ b/frontend/rust-lib/flowy-database2/src/services/calculations/task.rs @@ -1,9 +1,9 @@ -use lib_infra::future::BoxResultFuture; +use crate::services::calculations::CalculationsController; +use async_trait::async_trait; + use lib_infra::priority_task::{TaskContent, TaskHandler}; use std::sync::Arc; -use crate::services::calculations::CalculationsController; - pub struct CalculationsTaskHandler { handler_id: String, calculations_controller: Arc, @@ -18,6 +18,7 @@ impl CalculationsTaskHandler { } } +#[async_trait] impl TaskHandler for CalculationsTaskHandler { fn handler_id(&self) -> &str { &self.handler_id @@ -27,16 +28,14 @@ impl TaskHandler for CalculationsTaskHandler { "CalculationsTaskHandler" } - fn run(&self, content: TaskContent) -> BoxResultFuture<(), anyhow::Error> { + async fn run(&self, content: TaskContent) -> Result<(), anyhow::Error> { let calculations_controller = self.calculations_controller.clone(); - Box::pin(async move { - if let TaskContent::Text(predicate) = content { - calculations_controller - .process(&predicate) - .await - .map_err(anyhow::Error::from)?; - } - Ok(()) - }) + if let TaskContent::Text(predicate) = content { + calculations_controller + .process(&predicate) + .await + .map_err(anyhow::Error::from)?; + } + Ok(()) } } diff --git a/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs b/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs index c1b5652995..e4809c2ce0 100644 --- a/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs +++ b/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs @@ -1670,10 +1670,17 @@ impl DatabaseEditor { loaded_rows.len(), blocking_read ); - let loaded_rows = apply_filter_and_sort(loaded_rows, view_editor).await; + let loaded_rows = apply_filter_and_sort(loaded_rows, view_editor.clone()).await; + + // Update calculation values + let calculate_rows = loaded_rows.clone(); + if let Some(notify_finish) = notify_finish { let _ = notify_finish.send(loaded_rows); } + tokio::spawn(async move { + let _ = view_editor.v_calculate_rows(calculate_rows).await; + }); }); } @@ -1976,14 +1983,12 @@ impl DatabaseViewOperation for DatabaseViewOperationImpl { self.database.write().await.remove_row(row_id).await } - async fn get_cells_for_field(&self, view_id: &str, field_id: &str) -> Vec> { - let cells = self - .database - .read() - .await - .get_cells_for_field(view_id, field_id) - .await; - cells.into_iter().map(Arc::new).collect() + async fn get_cells_for_field(&self, view_id: &str, field_id: &str) -> Vec { + let editor = self.editor_by_view_id.read().await.get(view_id).cloned(); + match editor { + None => vec![], + Some(editor) => editor.v_get_cells_for_field(field_id).await, + } } async fn get_cell_in_row(&self, field_id: &str, row_id: &RowId) -> Arc { diff --git a/frontend/rust-lib/flowy-database2/src/services/database_view/view_calculations.rs b/frontend/rust-lib/flowy-database2/src/services/database_view/view_calculations.rs index e6f5da1134..301be9fbe9 100644 --- a/frontend/rust-lib/flowy-database2/src/services/database_view/view_calculations.rs +++ b/frontend/rust-lib/flowy-database2/src/services/database_view/view_calculations.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; use collab_database::fields::Field; use std::sync::Arc; -use collab_database::rows::RowCell; +use collab_database::rows::Cell; use crate::services::calculations::{ Calculation, CalculationsController, CalculationsDelegate, CalculationsTaskHandler, @@ -46,8 +46,14 @@ struct DatabaseViewCalculationsDelegateImpl(Arc); #[async_trait] impl CalculationsDelegate for DatabaseViewCalculationsDelegateImpl { - async fn get_cells_for_field(&self, view_id: &str, field_id: &str) -> Vec> { - self.0.get_cells_for_field(view_id, field_id).await + async fn get_cells_for_field(&self, view_id: &str, field_id: &str) -> Vec> { + self + .0 + .get_cells_for_field(view_id, field_id) + .await + .into_iter() + .filter_map(|row_cell| row_cell.cell.map(Arc::new)) + .collect() } async fn get_field(&self, field_id: &str) -> Option { @@ -70,7 +76,7 @@ impl CalculationsDelegate for DatabaseViewCalculationsDelegateImpl { self.0.remove_calculation(view_id, calculation_id).await } - async fn get_all_calculations(&self, view_id: &str) -> Arc>> { - self.0.get_all_calculations(view_id).await.into() + async fn get_all_calculations(&self, view_id: &str) -> Vec> { + self.0.get_all_calculations(view_id).await } } diff --git a/frontend/rust-lib/flowy-database2/src/services/database_view/view_editor.rs b/frontend/rust-lib/flowy-database2/src/services/database_view/view_editor.rs index 84d038964d..2d388b0d25 100644 --- a/frontend/rust-lib/flowy-database2/src/services/database_view/view_editor.rs +++ b/frontend/rust-lib/flowy-database2/src/services/database_view/view_editor.rs @@ -11,7 +11,9 @@ use crate::entities::{ SortChangesetNotificationPB, SortPB, UpdateCalculationChangesetPB, UpdateSortPayloadPB, }; use crate::notification::{send_notification, DatabaseNotification}; -use crate::services::calculations::{Calculation, CalculationChangeset, CalculationsController}; +use crate::services::calculations::{ + Calculation, CalculationChangeset, CalculationEvent, CalculationsController, +}; use crate::services::cell::{CellBuilder, CellCache}; use crate::services::database::{database_view_setting_pb_from_view, DatabaseRowEvent, UpdatedRow}; use crate::services::database_view::view_calculations::make_calculations_controller; @@ -36,10 +38,11 @@ use crate::services::sort::{Sort, SortChangeset, SortController}; use collab_database::database::{gen_database_calculation_id, gen_database_sort_id, gen_row_id}; use collab_database::entity::DatabaseView; use collab_database::fields::Field; -use collab_database::rows::{Cells, Row, RowDetail, RowId}; +use collab_database::rows::{Cells, Row, RowCell, RowDetail, RowId}; use collab_database::views::{DatabaseLayout, RowOrder}; use dashmap::DashMap; use flowy_error::{FlowyError, FlowyResult}; +use lib_infra::priority_task::QualityOfService; use lib_infra::util::timestamp; use tokio::sync::{broadcast, RwLock}; use tracing::{instrument, trace, warn}; @@ -387,6 +390,27 @@ impl DatabaseViewEditor { rows } + pub async fn v_get_cells_for_field(&self, field_id: &str) -> Vec { + let row_orders = self.delegate.get_all_row_orders(&self.view_id).await; + let rows = self.delegate.get_all_rows(&self.view_id, row_orders).await; + let rows = self.v_filter_rows(rows).await; + let rows = rows + .into_iter() + .filter_map(|row| { + row + .cells + .get(field_id) + .map(|cell| RowCell::new(row.id.clone(), Some(cell.clone()))) + }) + .collect::>(); + trace!( + "[Database]: get cells for field: {}, total rows:{}", + field_id, + rows.len() + ); + rows + } + pub async fn v_get_row(&self, row_id: &RowId) -> Option<(usize, Arc)> { self.delegate.get_row_detail(&self.view_id, row_id).await } @@ -650,6 +674,18 @@ impl DatabaseViewEditor { Ok(()) } + pub async fn v_calculate_rows(&self, rows: Vec>) -> FlowyResult<()> { + self + .calculations_controller + .gen_task( + CalculationEvent::InitialRows(rows), + QualityOfService::UserInteractive, + ) + .await; + + Ok(()) + } + pub async fn v_delete_all_sorts(&self) -> FlowyResult<()> { let all_sorts = self.v_get_all_sorts().await; self.sort_controller.write().await.delete_all_sorts().await; @@ -669,11 +705,9 @@ impl DatabaseViewEditor { &self, params: UpdateCalculationChangesetPB, ) -> FlowyResult<()> { - let calculation_id = match params.calculation_id { - None => gen_database_calculation_id(), - Some(calculation_id) => calculation_id, - }; - + let calculation_id = params + .calculation_id + .unwrap_or_else(|| gen_database_calculation_id()); let calculation = Calculation::none( calculation_id, params.field_id, @@ -748,6 +782,9 @@ impl DatabaseViewEditor { self.v_group_by_field(&field_id).await?; } + let row_orders = self.delegate.get_all_row_orders(&self.view_id).await; + let rows = self.delegate.get_all_rows(&self.view_id, row_orders).await; + self.v_calculate_rows(rows).await?; Ok(()) } diff --git a/frontend/rust-lib/flowy-database2/src/services/database_view/view_operation.rs b/frontend/rust-lib/flowy-database2/src/services/database_view/view_operation.rs index 14e382f9ae..30816587d6 100644 --- a/frontend/rust-lib/flowy-database2/src/services/database_view/view_operation.rs +++ b/frontend/rust-lib/flowy-database2/src/services/database_view/view_operation.rs @@ -62,7 +62,7 @@ pub trait DatabaseViewOperation: Send + Sync + 'static { async fn remove_row(&self, row_id: &RowId) -> Option; - async fn get_cells_for_field(&self, view_id: &str, field_id: &str) -> Vec>; + async fn get_cells_for_field(&self, view_id: &str, field_id: &str) -> Vec; async fn get_cell_in_row(&self, field_id: &str, row_id: &RowId) -> Arc; diff --git a/frontend/rust-lib/flowy-database2/src/services/filter/controller.rs b/frontend/rust-lib/flowy-database2/src/services/filter/controller.rs index b4178afeb9..27d7733b93 100644 --- a/frontend/rust-lib/flowy-database2/src/services/filter/controller.rs +++ b/frontend/rust-lib/flowy-database2/src/services/filter/controller.rs @@ -116,11 +116,12 @@ impl FilterController { } pub async fn close(&self) { - if let Ok(mut task_scheduler) = self.task_scheduler.try_write() { - task_scheduler.unregister_handler(&self.handler_id).await; - } else { - tracing::error!("Try to get the lock of task_scheduler failed"); - } + self + .task_scheduler + .write() + .await + .unregister_handler(&self.handler_id) + .await; } #[tracing::instrument(name = "schedule_filter_task", level = "trace", skip(self))] diff --git a/frontend/rust-lib/flowy-database2/src/services/filter/task.rs b/frontend/rust-lib/flowy-database2/src/services/filter/task.rs index 03ed453f89..e6c2ae905a 100644 --- a/frontend/rust-lib/flowy-database2/src/services/filter/task.rs +++ b/frontend/rust-lib/flowy-database2/src/services/filter/task.rs @@ -1,5 +1,6 @@ use crate::services::filter::FilterController; -use lib_infra::future::BoxResultFuture; +use async_trait::async_trait; + use lib_infra::priority_task::{TaskContent, TaskHandler}; use std::sync::Arc; @@ -17,6 +18,7 @@ impl FilterTaskHandler { } } +#[async_trait] impl TaskHandler for FilterTaskHandler { fn handler_id(&self) -> &str { &self.handler_id @@ -26,16 +28,14 @@ impl TaskHandler for FilterTaskHandler { "FilterTaskHandler" } - fn run(&self, content: TaskContent) -> BoxResultFuture<(), anyhow::Error> { + async fn run(&self, content: TaskContent) -> Result<(), anyhow::Error> { let filter_controller = self.filter_controller.clone(); - Box::pin(async move { - if let TaskContent::Text(predicate) = content { - filter_controller - .process(&predicate) - .await - .map_err(anyhow::Error::from)?; - } - Ok(()) - }) + if let TaskContent::Text(predicate) = content { + filter_controller + .process(&predicate) + .await + .map_err(anyhow::Error::from)?; + } + Ok(()) } } diff --git a/frontend/rust-lib/flowy-database2/src/services/sort/controller.rs b/frontend/rust-lib/flowy-database2/src/services/sort/controller.rs index 3b522105bf..f46534494b 100644 --- a/frontend/rust-lib/flowy-database2/src/services/sort/controller.rs +++ b/frontend/rust-lib/flowy-database2/src/services/sort/controller.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use collab_database::fields::Field; use collab_database::rows::{Cell, Row, RowId}; + use rayon::prelude::ParallelSliceMut; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock as TokioRwLock; diff --git a/frontend/rust-lib/flowy-database2/src/services/sort/task.rs b/frontend/rust-lib/flowy-database2/src/services/sort/task.rs index 107f318dec..6b77e87a33 100644 --- a/frontend/rust-lib/flowy-database2/src/services/sort/task.rs +++ b/frontend/rust-lib/flowy-database2/src/services/sort/task.rs @@ -1,5 +1,6 @@ use crate::services::sort::SortController; -use lib_infra::future::BoxResultFuture; +use async_trait::async_trait; + use lib_infra::priority_task::{TaskContent, TaskHandler}; use std::sync::Arc; use tokio::sync::RwLock; @@ -19,6 +20,7 @@ impl SortTaskHandler { } } +#[async_trait] impl TaskHandler for SortTaskHandler { fn handler_id(&self) -> &str { &self.handler_id @@ -28,18 +30,16 @@ impl TaskHandler for SortTaskHandler { "SortTaskHandler" } - fn run(&self, content: TaskContent) -> BoxResultFuture<(), anyhow::Error> { + async fn run(&self, content: TaskContent) -> Result<(), anyhow::Error> { let sort_controller = self.sort_controller.clone(); - Box::pin(async move { - if let TaskContent::Text(predicate) = content { - sort_controller - .write() - .await - .process(&predicate) - .await - .map_err(anyhow::Error::from)?; - } - Ok(()) - }) + if let TaskContent::Text(predicate) = content { + sort_controller + .write() + .await + .process(&predicate) + .await + .map_err(anyhow::Error::from)?; + } + Ok(()) } } diff --git a/frontend/rust-lib/lib-infra/src/priority_task/scheduler.rs b/frontend/rust-lib/lib-infra/src/priority_task/scheduler.rs index 1e1bb33c2d..96798b742a 100644 --- a/frontend/rust-lib/lib-infra/src/priority_task/scheduler.rs +++ b/frontend/rust-lib/lib-infra/src/priority_task/scheduler.rs @@ -2,13 +2,14 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use crate::future::BoxResultFuture; use crate::priority_task::queue::TaskQueue; use crate::priority_task::store::TaskStore; use crate::priority_task::{Task, TaskContent, TaskId, TaskState}; use anyhow::Error; +use async_trait::async_trait; use tokio::sync::{watch, RwLock}; use tokio::time::interval; +use tracing::trace; pub struct TaskDispatcher { queue: TaskQueue, @@ -105,6 +106,11 @@ impl TaskDispatcher { return; } + trace!( + "Add task: handler:{}, task:{:?}", + task.handler_id, + task.content + ); self.queue.push(&task); self.store.insert_task(task); self.notify(); @@ -160,6 +166,7 @@ impl TaskRunner { } } +#[async_trait] pub trait TaskHandler: Send + Sync + 'static { fn handler_id(&self) -> &str; @@ -167,9 +174,10 @@ pub trait TaskHandler: Send + Sync + 'static { "" } - fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error>; + async fn run(&self, content: TaskContent) -> Result<(), Error>; } +#[async_trait] impl TaskHandler for Box where T: TaskHandler, @@ -182,11 +190,12 @@ where (**self).handler_name() } - fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> { - (**self).run(content) + async fn run(&self, content: TaskContent) -> Result<(), Error> { + (**self).run(content).await } } +#[async_trait] impl TaskHandler for Arc where T: TaskHandler, @@ -199,7 +208,7 @@ where (**self).handler_name() } - fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> { - (**self).run(content) + async fn run(&self, content: TaskContent) -> Result<(), Error> { + (**self).run(content).await } } diff --git a/frontend/rust-lib/lib-infra/tests/task_test/script.rs b/frontend/rust-lib/lib-infra/tests/task_test/script.rs index c419d981f2..2446376e94 100644 --- a/frontend/rust-lib/lib-infra/tests/task_test/script.rs +++ b/frontend/rust-lib/lib-infra/tests/task_test/script.rs @@ -170,22 +170,21 @@ impl RefCountValue for MockBlobTaskHandler { async fn did_remove(&self) {} } +#[async_trait] impl TaskHandler for MockBlobTaskHandler { fn handler_id(&self) -> &str { "2" } - fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> { - Box::pin(async move { - match content { - TaskContent::Text(_) => panic!("Only support blob"), - TaskContent::Blob(bytes) => { - let _msg = String::from_utf8(bytes).unwrap(); - tokio::time::sleep(Duration::from_millis(20)).await; - }, - } - Ok(()) - }) + async fn run(&self, content: TaskContent) -> Result<(), Error> { + match content { + TaskContent::Text(_) => panic!("Only support blob"), + TaskContent::Blob(bytes) => { + let _msg = String::from_utf8(bytes).unwrap(); + tokio::time::sleep(Duration::from_millis(20)).await; + }, + } + Ok(()) } }