340 lines
12 KiB
Rust
Raw Normal View History

2021-10-13 11:10:29 +08:00
use crate::{
dart_notification::{send_anonymous_dart_notification, FolderNotification},
2021-12-31 10:32:25 +08:00
entities::trash::{RepeatedTrash, RepeatedTrashId, Trash, TrashId, TrashType},
2021-12-14 18:04:51 +08:00
errors::{FlowyError, FlowyResult},
2022-01-30 10:33:21 +08:00
event_map::{FolderCouldServiceV1, WorkspaceUser},
2022-01-17 11:55:36 +08:00
services::persistence::{FolderPersistence, FolderPersistenceTransaction},
2021-10-13 11:10:29 +08:00
};
2022-01-14 09:09:25 +08:00
use flowy_folder_data_model::revision::TrashRevision;
2021-12-06 15:49:21 +08:00
use std::{fmt::Formatter, sync::Arc};
use tokio::sync::{broadcast, mpsc};
2021-10-13 23:11:45 +08:00
2021-12-06 15:49:21 +08:00
pub struct TrashController {
2022-01-17 11:55:36 +08:00
persistence: Arc<FolderPersistence>,
2021-10-13 23:11:45 +08:00
notify: broadcast::Sender<TrashEvent>,
2022-01-17 11:55:36 +08:00
cloud_service: Arc<dyn FolderCouldServiceV1>,
user: Arc<dyn WorkspaceUser>,
2021-10-13 11:10:29 +08:00
}
2021-12-06 15:49:21 +08:00
impl TrashController {
2022-01-10 23:45:59 +08:00
pub fn new(
2022-01-17 11:55:36 +08:00
persistence: Arc<FolderPersistence>,
cloud_service: Arc<dyn FolderCouldServiceV1>,
2022-01-10 23:45:59 +08:00
user: Arc<dyn WorkspaceUser>,
) -> Self {
2021-10-13 23:11:45 +08:00
let (tx, _) = broadcast::channel(10);
Self {
2022-01-14 09:09:25 +08:00
persistence,
notify: tx,
2022-01-10 23:45:59 +08:00
cloud_service,
user,
}
2021-10-13 23:11:45 +08:00
}
2021-10-16 21:22:59 +08:00
2021-10-13 23:11:45 +08:00
#[tracing::instrument(level = "debug", skip(self), fields(putback) err)]
2021-12-14 18:04:51 +08:00
pub async fn putback(&self, trash_id: &str) -> FlowyResult<()> {
let (tx, mut rx) = mpsc::channel::<FlowyResult<()>>(1);
2022-01-20 23:51:11 +08:00
let trash = self
.persistence
.begin_transaction(|transaction| {
let mut repeated_trash = transaction.read_trash(Some(trash_id.to_owned()))?;
let _ = transaction.delete_trash(Some(vec![trash_id.to_owned()]))?;
notify_trash_changed(transaction.read_trash(None)?);
if repeated_trash.is_empty() {
return Err(FlowyError::internal().context("Try to put back trash is not exists"));
}
Ok(repeated_trash.pop().unwrap())
})
.await?;
2021-10-16 21:22:59 +08:00
2021-12-31 10:32:25 +08:00
let identifier = TrashId {
2022-01-14 09:09:25 +08:00
id: trash.id,
ty: trash.ty,
};
2021-12-31 10:32:25 +08:00
let _ = self.delete_trash_on_server(RepeatedTrashId {
items: vec![identifier.clone()],
delete_all: false,
})?;
tracing::Span::current().record("putback", &format!("{:?}", &identifier).as_str());
let _ = self.notify.send(TrashEvent::Putback(vec![identifier].into(), tx));
let _ = rx.recv().await.unwrap()?;
2021-10-13 23:11:45 +08:00
Ok(())
}
#[tracing::instrument(level = "debug", skip(self) err)]
2022-01-22 18:48:43 +08:00
pub async fn restore_all_trash(&self) -> FlowyResult<()> {
let trash_identifier: RepeatedTrashId = self
2022-01-20 23:51:11 +08:00
.persistence
.begin_transaction(|transaction| {
let trash = transaction.read_trash(None);
let _ = transaction.delete_trash(None);
trash
})
.await?
.into();
2021-10-28 13:42:39 +08:00
2021-12-14 18:04:51 +08:00
let (tx, mut rx) = mpsc::channel::<FlowyResult<()>>(1);
let _ = self.notify.send(TrashEvent::Putback(trash_identifier, tx));
let _ = rx.recv().await;
2021-10-31 11:41:22 +08:00
notify_trash_changed(RepeatedTrash { items: vec![] });
let _ = self.delete_all_trash_on_server().await?;
Ok(())
}
2021-10-28 13:42:39 +08:00
#[tracing::instrument(level = "debug", skip(self), err)]
2022-01-22 18:48:43 +08:00
pub async fn delete_all_trash(&self) -> FlowyResult<()> {
let all_trash_identifiers: RepeatedTrashId = self
2022-01-14 09:09:25 +08:00
.persistence
2022-01-20 23:51:11 +08:00
.begin_transaction(|transaction| transaction.read_trash(None))
.await?
.into();
let _ = self.delete_with_identifiers(all_trash_identifiers).await?;
2021-10-31 11:41:22 +08:00
notify_trash_changed(RepeatedTrash { items: vec![] });
let _ = self.delete_all_trash_on_server().await?;
Ok(())
}
2021-10-28 13:42:39 +08:00
#[tracing::instrument(level = "debug", skip(self), err)]
2021-12-31 10:32:25 +08:00
pub async fn delete(&self, trash_identifiers: RepeatedTrashId) -> FlowyResult<()> {
2021-10-28 13:42:39 +08:00
let _ = self.delete_with_identifiers(trash_identifiers.clone()).await?;
let trash_revs = self
2022-01-14 09:09:25 +08:00
.persistence
2022-01-20 23:51:11 +08:00
.begin_transaction(|transaction| transaction.read_trash(None))
.await?;
notify_trash_changed(trash_revs);
2021-10-28 13:42:39 +08:00
let _ = self.delete_trash_on_server(trash_identifiers)?;
Ok(())
}
2021-10-28 13:42:39 +08:00
#[tracing::instrument(level = "debug", skip(self), fields(delete_trash_ids), err)]
2021-12-31 10:32:25 +08:00
pub async fn delete_with_identifiers(&self, trash_identifiers: RepeatedTrashId) -> FlowyResult<()> {
2021-12-14 18:04:51 +08:00
let (tx, mut rx) = mpsc::channel::<FlowyResult<()>>(1);
2021-10-28 13:42:39 +08:00
tracing::Span::current().record("delete_trash_ids", &format!("{}", trash_identifiers).as_str());
let _ = self.notify.send(TrashEvent::Delete(trash_identifiers.clone(), tx));
2021-10-28 13:42:39 +08:00
match rx.recv().await {
2022-01-24 17:35:58 +08:00
None => {}
2021-10-28 13:42:39 +08:00
Some(result) => match result {
2022-01-24 17:35:58 +08:00
Ok(_) => {}
2021-10-28 13:42:39 +08:00
Err(e) => log::error!("{}", e),
},
}
2022-01-20 23:51:11 +08:00
let _ = self
.persistence
.begin_transaction(|transaction| {
let ids = trash_identifiers
.items
.into_iter()
.map(|item| item.id)
.collect::<Vec<_>>();
transaction.delete_trash(Some(ids))
})
.await?;
2022-01-14 09:09:25 +08:00
2021-10-13 23:11:45 +08:00
Ok(())
}
// [[ transaction ]]
// https://www.tutlane.com/tutorial/sqlite/sqlite-transactions-begin-commit-rollback
// We can use these commands only when performing INSERT, UPDATE, and DELETE
// operations. It's not possible to use them for CREATE and DROP table
// operations, because those are auto-committed by the database.
#[tracing::instrument(name = "add_trash", level = "debug", skip(self, trash), fields(trash_ids), err)]
pub async fn add<T: Into<TrashRevision>>(&self, trash: Vec<T>) -> Result<(), FlowyError> {
2021-12-14 18:04:51 +08:00
let (tx, mut rx) = mpsc::channel::<FlowyResult<()>>(1);
let trash_revs: Vec<TrashRevision> = trash.into_iter().map(|t| t.into()).collect();
let identifiers = trash_revs.iter().map(|t| t.into()).collect::<Vec<TrashId>>();
2021-10-31 11:41:22 +08:00
tracing::Span::current().record(
"trash_ids",
&format!(
"{:?}",
identifiers
.iter()
.map(|identifier| format!("{:?}:{}", identifier.ty, identifier.id))
.collect::<Vec<_>>()
)
.as_str(),
);
2022-01-20 23:51:11 +08:00
let _ = self
.persistence
.begin_transaction(|transaction| {
let _ = transaction.create_trash(trash_revs.clone())?;
let _ = self.create_trash_on_server(trash_revs);
2022-01-20 23:51:11 +08:00
notify_trash_changed(transaction.read_trash(None)?);
Ok(())
})
.await?;
let _ = self.notify.send(TrashEvent::NewTrash(identifiers.into(), tx));
let _ = rx.recv().await.unwrap()?;
2021-10-14 14:34:22 +08:00
Ok(())
}
2022-01-24 17:35:58 +08:00
/// Returns a new receiver for the [`TrashEvent`]s broadcast by this controller.
pub fn subscribe(&self) -> broadcast::Receiver<TrashEvent> {
    let sender = &self.notify;
    sender.subscribe()
}
2022-01-20 23:51:11 +08:00
pub async fn read_trash(&self) -> Result<RepeatedTrash, FlowyError> {
let items: Vec<Trash> = self
2022-01-14 09:09:25 +08:00
.persistence
2022-01-20 23:51:11 +08:00
.begin_transaction(|transaction| transaction.read_trash(None))
.await?
.into_iter()
.map(|trash_rev| trash_rev.into())
.collect();
let _ = self.read_trash_on_server()?;
Ok(RepeatedTrash { items })
}
2022-01-14 09:09:25 +08:00
pub fn read_trash_ids<'a>(
&self,
2022-01-17 11:55:36 +08:00
transaction: &'a (dyn FolderPersistenceTransaction + 'a),
2022-01-14 09:09:25 +08:00
) -> Result<Vec<String>, FlowyError> {
let ids = transaction
2022-01-17 11:55:36 +08:00
.read_trash(None)?
.into_iter()
.map(|item| item.id)
.collect::<Vec<String>>();
Ok(ids)
}
}
2021-12-06 15:49:21 +08:00
impl TrashController {
2022-01-23 22:33:47 +08:00
#[tracing::instrument(level = "trace", skip(self, trash), err)]
2021-12-31 10:32:25 +08:00
fn create_trash_on_server<T: Into<RepeatedTrashId>>(&self, trash: T) -> FlowyResult<()> {
let token = self.user.token()?;
let trash_identifiers = trash.into();
2022-01-10 23:45:59 +08:00
let server = self.cloud_service.clone();
// TODO: retry?
let _ = tokio::spawn(async move {
match server.create_trash(&token, trash_identifiers).await {
2022-01-24 17:35:58 +08:00
Ok(_) => {}
Err(e) => log::error!("Create trash failed: {:?}", e),
}
});
Ok(())
}
2021-10-14 14:34:22 +08:00
2022-01-23 22:33:47 +08:00
#[tracing::instrument(level = "trace", skip(self, trash), err)]
2021-12-31 10:32:25 +08:00
fn delete_trash_on_server<T: Into<RepeatedTrashId>>(&self, trash: T) -> FlowyResult<()> {
let token = self.user.token()?;
let trash_identifiers = trash.into();
2022-01-10 23:45:59 +08:00
let server = self.cloud_service.clone();
let _ = tokio::spawn(async move {
match server.delete_trash(&token, trash_identifiers).await {
2022-01-24 17:35:58 +08:00
Ok(_) => {}
Err(e) => log::error!("Delete trash failed: {:?}", e),
}
});
2021-10-13 23:11:45 +08:00
Ok(())
}
2022-01-23 22:33:47 +08:00
#[tracing::instrument(level = "trace", skip(self), err)]
2021-12-14 18:04:51 +08:00
fn read_trash_on_server(&self) -> FlowyResult<()> {
let token = self.user.token()?;
2022-01-10 23:45:59 +08:00
let server = self.cloud_service.clone();
2022-01-14 09:09:25 +08:00
let persistence = self.persistence.clone();
2021-11-09 15:32:57 +08:00
tokio::spawn(async move {
match server.read_trash(&token).await {
Ok(trash_rev) => {
tracing::debug!("Remote trash count: {}", trash_rev.len());
2022-01-20 23:51:11 +08:00
let result = persistence
.begin_transaction(|transaction| {
let _ = transaction.create_trash(trash_rev.clone())?;
2022-01-20 23:51:11 +08:00
transaction.read_trash(None)
})
.await;
2022-01-14 09:09:25 +08:00
match result {
Ok(trash_revs) => {
notify_trash_changed(trash_revs);
2022-01-24 17:35:58 +08:00
}
2022-01-14 09:09:25 +08:00
Err(e) => log::error!("Save trash failed: {:?}", e),
}
2022-01-24 17:35:58 +08:00
}
Err(e) => log::error!("Read trash failed: {:?}", e),
}
});
Ok(())
}
2022-01-23 22:33:47 +08:00
#[tracing::instrument(level = "trace", skip(self), err)]
2021-12-14 18:04:51 +08:00
async fn delete_all_trash_on_server(&self) -> FlowyResult<()> {
let token = self.user.token()?;
2022-01-10 23:45:59 +08:00
let server = self.cloud_service.clone();
2021-12-31 10:32:25 +08:00
server.delete_trash(&token, RepeatedTrashId::all()).await
}
}
2022-04-12 11:13:35 +08:00
/// Sends a `FolderNotification::TrashUpdated` notification carrying the given
/// trash list and records the number of entries on the current span.
#[tracing::instrument(level = "debug", skip(repeated_trash), fields(n_trash))]
fn notify_trash_changed<T: Into<RepeatedTrash>>(repeated_trash: T) {
    let payload: RepeatedTrash = repeated_trash.into();
    let span = tracing::Span::current();
    span.record("n_trash", &payload.len());

    send_anonymous_dart_notification(FolderNotification::TrashUpdated)
        .payload(payload)
        .send();
}
#[derive(Clone)]
pub enum TrashEvent {
2021-12-31 10:32:25 +08:00
NewTrash(RepeatedTrashId, mpsc::Sender<FlowyResult<()>>),
Putback(RepeatedTrashId, mpsc::Sender<FlowyResult<()>>),
Delete(RepeatedTrashId, mpsc::Sender<FlowyResult<()>>),
}
2021-10-31 11:41:22 +08:00
impl std::fmt::Debug for TrashEvent {
    /// Prints only the identifiers carried by the event; the variant name and
    /// the reply sender are left out.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let identifiers = match self {
            TrashEvent::NewTrash(ids, _) | TrashEvent::Putback(ids, _) | TrashEvent::Delete(ids, _) => ids,
        };
        write!(f, "{:?}", identifiers)
    }
}
impl TrashEvent {
pub fn select(self, s: TrashType) -> Option<TrashEvent> {
match self {
TrashEvent::Putback(mut identifiers, sender) => {
identifiers.items.retain(|item| item.ty == s);
if identifiers.items.is_empty() {
None
} else {
Some(TrashEvent::Putback(identifiers, sender))
}
2022-01-24 17:35:58 +08:00
}
TrashEvent::Delete(mut identifiers, sender) => {
identifiers.items.retain(|item| item.ty == s);
if identifiers.items.is_empty() {
None
} else {
Some(TrashEvent::Delete(identifiers, sender))
}
2022-01-24 17:35:58 +08:00
}
TrashEvent::NewTrash(mut identifiers, sender) => {
identifiers.items.retain(|item| item.ty == s);
if identifiers.items.is_empty() {
None
} else {
Some(TrashEvent::NewTrash(identifiers, sender))
}
2022-01-24 17:35:58 +08:00
}
}
}
2021-10-13 11:10:29 +08:00
}