247 lines
8.6 KiB
Rust
Raw Normal View History

use crate::{
2022-01-13 11:16:26 +08:00
dart_notification::*,
2021-10-30 17:19:50 +08:00
entities::{
app::{App, CreateAppParams, *},
trash::TrashType,
},
errors::*,
2022-01-17 11:55:36 +08:00
module::{FolderCouldServiceV1, WorkspaceUser},
2021-12-06 15:49:21 +08:00
services::{
2022-01-17 11:55:36 +08:00
persistence::{AppChangeset, FolderPersistence, FolderPersistenceTransaction},
2022-01-24 17:35:58 +08:00
TrashController, TrashEvent,
2021-12-06 15:49:21 +08:00
},
};
2022-01-14 09:09:25 +08:00
2021-12-06 15:49:21 +08:00
use futures::{FutureExt, StreamExt};
use std::{collections::HashSet, sync::Arc};
2021-09-01 22:50:22 +08:00
pub(crate) struct AppController {
user: Arc<dyn WorkspaceUser>,
2022-01-17 11:55:36 +08:00
persistence: Arc<FolderPersistence>,
2022-01-14 09:09:25 +08:00
trash_controller: Arc<TrashController>,
2022-01-17 11:55:36 +08:00
cloud_service: Arc<dyn FolderCouldServiceV1>,
}
impl AppController {
2021-10-30 17:19:50 +08:00
pub(crate) fn new(
user: Arc<dyn WorkspaceUser>,
2022-01-17 11:55:36 +08:00
persistence: Arc<FolderPersistence>,
2021-12-06 15:49:21 +08:00
trash_can: Arc<TrashController>,
2022-01-17 11:55:36 +08:00
cloud_service: Arc<dyn FolderCouldServiceV1>,
2021-10-30 17:19:50 +08:00
) -> Self {
Self {
user,
2022-01-14 09:09:25 +08:00
persistence,
trash_controller: trash_can,
2022-01-10 23:45:59 +08:00
cloud_service,
2021-10-30 17:19:50 +08:00
}
}
2022-01-18 22:56:57 +08:00
/// Starts listening for trash events that concern apps.
///
/// The listener is spawned with `tokio::spawn`, so this is expected to be
/// called from within a Tokio runtime. Currently this never returns an error.
pub fn initialize(&self) -> Result<(), FlowyError> {
    self.listen_trash_controller_event();
    Ok(())
}
#[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name) err)]
2021-12-14 18:04:51 +08:00
pub(crate) async fn create_app_from_params(&self, params: CreateAppParams) -> Result<App, FlowyError> {
2021-09-02 19:57:19 +08:00
let app = self.create_app_on_server(params).await?;
2021-12-29 00:34:00 +08:00
self.create_app_on_local(app).await
2021-11-08 19:19:02 +08:00
}
2021-12-29 00:34:00 +08:00
pub(crate) async fn create_app_on_local(&self, app: App) -> Result<App, FlowyError> {
2022-01-20 23:51:11 +08:00
let _ = self
.persistence
.begin_transaction(|transaction| {
let _ = transaction.create_app(app.clone())?;
let _ = notify_apps_changed(&app.workspace_id, self.trash_controller.clone(), &transaction)?;
Ok(())
})
.await?;
2021-07-19 22:44:37 +08:00
Ok(app)
}
2021-12-31 10:32:25 +08:00
pub(crate) async fn read_app(&self, params: AppId) -> Result<App, FlowyError> {
2022-01-20 23:51:11 +08:00
let app = self
.persistence
.begin_transaction(|transaction| {
let app = transaction.read_app(&params.app_id)?;
let trash_ids = self.trash_controller.read_trash_ids(&transaction)?;
if trash_ids.contains(&app.id) {
return Err(FlowyError::record_not_found());
}
Ok(app)
})
.await?;
2021-10-30 17:19:50 +08:00
let _ = self.read_app_on_server(params)?;
2022-01-14 09:09:25 +08:00
Ok(app)
2021-09-07 17:12:03 +08:00
}
2021-12-14 18:04:51 +08:00
pub(crate) async fn update_app(&self, params: UpdateAppParams) -> Result<(), FlowyError> {
2022-01-13 11:16:26 +08:00
let changeset = AppChangeset::new(params.clone());
let app_id = changeset.id.clone();
2022-01-20 23:51:11 +08:00
let app = self
.persistence
.begin_transaction(|transaction| {
let _ = transaction.update_app(changeset)?;
let app = transaction.read_app(&app_id)?;
Ok(app)
})
.await?;
send_dart_notification(&app_id, FolderNotification::AppUpdated)
2022-01-14 09:09:25 +08:00
.payload(app)
.send();
let _ = self.update_app_on_server(params)?;
Ok(())
}
2021-10-30 17:19:50 +08:00
2022-01-20 23:51:11 +08:00
/// Reads the apps with the given ids from local persistence, in the order of
/// `ids`, inside a single transaction.
///
/// # Errors
/// Stops at and returns the first read error.
pub(crate) async fn read_local_apps(&self, ids: Vec<String>) -> Result<Vec<App>, FlowyError> {
    let apps = self
        .persistence
        .begin_transaction(|transaction| {
            // Collecting into `Result` short-circuits on the first failed read.
            let found = ids
                .into_iter()
                .map(|id| transaction.read_app(&id))
                .collect::<Result<Vec<_>, _>>()?;
            Ok(found)
        })
        .await?;
    Ok(apps)
}
2021-09-02 19:57:19 +08:00
}
2021-07-20 14:03:21 +08:00
2021-09-02 19:57:19 +08:00
impl AppController {
2022-01-23 22:33:47 +08:00
#[tracing::instrument(level = "trace", skip(self), err)]
2021-12-14 18:04:51 +08:00
async fn create_app_on_server(&self, params: CreateAppParams) -> Result<App, FlowyError> {
2021-09-02 19:57:19 +08:00
let token = self.user.token()?;
2022-01-10 23:45:59 +08:00
let app = self.cloud_service.create_app(&token, params).await?;
2021-09-02 19:57:19 +08:00
Ok(app)
}
2022-01-23 22:33:47 +08:00
#[tracing::instrument(level = "trace", skip(self), err)]
2021-12-14 18:04:51 +08:00
fn update_app_on_server(&self, params: UpdateAppParams) -> Result<(), FlowyError> {
2021-09-02 19:57:19 +08:00
let token = self.user.token()?;
2022-01-10 23:45:59 +08:00
let server = self.cloud_service.clone();
2021-11-09 15:32:57 +08:00
tokio::spawn(async move {
2021-09-02 19:57:19 +08:00
match server.update_app(&token, params).await {
2022-01-24 17:35:58 +08:00
Ok(_) => {}
2021-09-02 19:57:19 +08:00
Err(e) => {
// TODO: retry?
log::error!("Update app failed: {:?}", e);
2022-01-24 17:35:58 +08:00
}
2021-09-02 19:57:19 +08:00
}
});
Ok(())
}
2022-01-23 22:33:47 +08:00
#[tracing::instrument(level = "trace", skip(self), err)]
2021-12-31 10:32:25 +08:00
fn read_app_on_server(&self, params: AppId) -> Result<(), FlowyError> {
2021-09-02 19:57:19 +08:00
let token = self.user.token()?;
2022-01-10 23:45:59 +08:00
let server = self.cloud_service.clone();
2022-01-14 09:09:25 +08:00
let persistence = self.persistence.clone();
2021-11-09 15:32:57 +08:00
tokio::spawn(async move {
2021-09-13 13:05:46 +08:00
match server.read_app(&token, params).await {
2022-01-14 09:09:25 +08:00
Ok(Some(app)) => {
2022-01-20 23:51:11 +08:00
match persistence
.begin_transaction(|transaction| transaction.create_app(app.clone()))
.await
{
2022-01-14 09:09:25 +08:00
Ok(_) => {
send_dart_notification(&app.id, FolderNotification::AppUpdated)
2022-01-14 09:09:25 +08:00
.payload(app)
.send();
2022-01-24 17:35:58 +08:00
}
2022-01-14 09:09:25 +08:00
Err(e) => log::error!("Save app failed: {:?}", e),
}
2022-01-24 17:35:58 +08:00
}
Ok(None) => {}
Err(e) => log::error!("Read app failed: {:?}", e),
2021-09-02 19:57:19 +08:00
}
});
Ok(())
2021-07-20 14:03:21 +08:00
}
2021-10-30 17:19:50 +08:00
2021-12-29 00:34:00 +08:00
fn listen_trash_controller_event(&self) {
2022-01-14 09:09:25 +08:00
let mut rx = self.trash_controller.subscribe();
let persistence = self.persistence.clone();
let trash_controller = self.trash_controller.clone();
2021-10-30 17:19:50 +08:00
let _ = tokio::spawn(async move {
loop {
let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
match result {
Ok(event) => event.select(TrashType::App),
Err(_e) => None,
}
}));
2021-11-27 19:19:41 +08:00
if let Some(event) = stream.next().await {
2022-01-14 09:09:25 +08:00
handle_trash_event(persistence.clone(), trash_controller.clone(), event).await
2021-10-30 17:19:50 +08:00
}
}
});
}
}
2022-01-14 09:09:25 +08:00
#[tracing::instrument(level = "trace", skip(persistence, trash_controller))]
async fn handle_trash_event(
2022-01-17 11:55:36 +08:00
persistence: Arc<FolderPersistence>,
2022-01-14 09:09:25 +08:00
trash_controller: Arc<TrashController>,
event: TrashEvent,
) {
2021-10-30 17:19:50 +08:00
match event {
TrashEvent::NewTrash(identifiers, ret) | TrashEvent::Putback(identifiers, ret) => {
2022-01-20 23:51:11 +08:00
let result = persistence
.begin_transaction(|transaction| {
for identifier in identifiers.items {
let app = transaction.read_app(&identifier.id)?;
let _ = notify_apps_changed(&app.workspace_id, trash_controller.clone(), &transaction)?;
}
Ok(())
})
.await;
2022-01-14 09:09:25 +08:00
let _ = ret.send(result).await;
2022-01-24 17:35:58 +08:00
}
2021-10-30 17:19:50 +08:00
TrashEvent::Delete(identifiers, ret) => {
2022-01-20 23:51:11 +08:00
let result = persistence
.begin_transaction(|transaction| {
let mut notify_ids = HashSet::new();
for identifier in identifiers.items {
let app = transaction.read_app(&identifier.id)?;
let _ = transaction.delete_app(&identifier.id)?;
notify_ids.insert(app.workspace_id);
}
2021-10-30 17:19:50 +08:00
2022-01-20 23:51:11 +08:00
for notify_id in notify_ids {
let _ = notify_apps_changed(&notify_id, trash_controller.clone(), &transaction)?;
}
Ok(())
})
.await;
2022-01-14 09:09:25 +08:00
let _ = ret.send(result).await;
2022-01-24 17:35:58 +08:00
}
2021-10-30 17:19:50 +08:00
}
}
2021-10-30 17:19:50 +08:00
2022-01-14 09:09:25 +08:00
#[tracing::instrument(skip(workspace_id, trash_controller, transaction), err)]
fn notify_apps_changed<'a>(
2021-12-06 15:49:21 +08:00
workspace_id: &str,
2022-01-14 09:09:25 +08:00
trash_controller: Arc<TrashController>,
2022-01-17 11:55:36 +08:00
transaction: &'a (dyn FolderPersistenceTransaction + 'a),
2021-12-14 18:04:51 +08:00
) -> FlowyResult<()> {
2022-01-14 09:09:25 +08:00
let repeated_app = read_local_workspace_apps(workspace_id, trash_controller, transaction)?;
send_dart_notification(workspace_id, FolderNotification::WorkspaceAppsChanged)
2021-10-30 17:19:50 +08:00
.payload(repeated_app)
.send();
Ok(())
}
2022-01-14 09:09:25 +08:00
pub fn read_local_workspace_apps<'a>(
2021-10-30 17:19:50 +08:00
workspace_id: &str,
2021-12-06 21:47:21 +08:00
trash_controller: Arc<TrashController>,
2022-01-17 11:55:36 +08:00
transaction: &'a (dyn FolderPersistenceTransaction + 'a),
2021-12-14 18:04:51 +08:00
) -> Result<RepeatedApp, FlowyError> {
2022-01-14 09:09:25 +08:00
let mut apps = transaction.read_workspace_apps(workspace_id)?;
let trash_ids = trash_controller.read_trash_ids(transaction)?;
apps.retain(|app| !trash_ids.contains(&app.id));
2021-10-30 17:19:50 +08:00
Ok(RepeatedApp { items: apps })
}