293 lines
11 KiB
Rust
Raw Normal View History

2021-10-13 11:10:29 +08:00
use crate::{
entities::trash::{RepeatedTrash, Trash, TrashIdentifier, TrashIdentifiers, TrashType},
2021-10-13 23:11:45 +08:00
errors::{WorkspaceError, WorkspaceResult},
module::{WorkspaceDatabase, WorkspaceUser},
2021-10-14 14:34:22 +08:00
notify::{send_anonymous_dart_notification, WorkspaceNotification},
services::{helper::spawn, server::Server},
sql_tables::trash::TrashTableSql,
2021-10-13 11:10:29 +08:00
};
use crossbeam_utils::thread;
2021-10-13 23:11:45 +08:00
use flowy_database::SqliteConnection;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
2021-10-13 23:11:45 +08:00
2021-10-13 11:10:29 +08:00
pub struct TrashCan {
2021-10-16 21:22:59 +08:00
pub database: Arc<dyn WorkspaceDatabase>,
2021-10-13 23:11:45 +08:00
notify: broadcast::Sender<TrashEvent>,
server: Server,
user: Arc<dyn WorkspaceUser>,
2021-10-13 11:10:29 +08:00
}
impl TrashCan {
pub fn new(database: Arc<dyn WorkspaceDatabase>, server: Server, user: Arc<dyn WorkspaceUser>) -> Self {
2021-10-13 23:11:45 +08:00
let (tx, _) = broadcast::channel(10);
Self {
database,
notify: tx,
server,
user,
}
2021-10-13 23:11:45 +08:00
}
2021-10-16 21:22:59 +08:00
pub(crate) fn init(&self) -> Result<(), WorkspaceError> { Ok(()) }
2021-10-13 23:11:45 +08:00
#[tracing::instrument(level = "debug", skip(self), fields(putback) err)]
pub async fn putback(&self, trash_id: &str) -> WorkspaceResult<()> {
let (tx, mut rx) = mpsc::channel::<WorkspaceResult<()>>(1);
let trash_table = TrashTableSql::read(trash_id, &*self.database.db_connection()?)?;
2021-10-16 21:22:59 +08:00
let _ = thread::scope(|_s| {
let conn = self.database.db_connection()?;
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
2021-10-16 21:22:59 +08:00
let _ = TrashTableSql::delete_trash(trash_id, &*conn)?;
2021-10-17 23:08:39 +08:00
notify_trash_num_changed(TrashTableSql::read_all(&conn)?);
2021-10-16 21:22:59 +08:00
Ok(())
})?;
2021-10-16 21:22:59 +08:00
Ok::<(), WorkspaceError>(())
})
.unwrap()?;
let identifier = TrashIdentifier {
id: trash_table.id,
ty: trash_table.ty.into(),
};
let _ = self.delete_trash_on_server(TrashIdentifiers {
items: vec![identifier.clone()],
delete_all: false,
})?;
tracing::Span::current().record("putback", &format!("{:?}", &identifier).as_str());
let _ = self.notify.send(TrashEvent::Putback(vec![identifier].into(), tx));
let _ = rx.recv().await.unwrap()?;
2021-10-13 23:11:45 +08:00
Ok(())
}
#[tracing::instrument(level = "debug", skip(self) err)]
pub async fn restore_all(&self) -> WorkspaceResult<()> {
let repeated_trash = self.delete_all_trash_on_local()?;
let identifiers: TrashIdentifiers = repeated_trash.items.clone().into();
let (tx, mut rx) = mpsc::channel::<WorkspaceResult<()>>(1);
let _ = self.notify.send(TrashEvent::Putback(identifiers, tx));
let _ = rx.recv().await;
notify_trash_num_changed(RepeatedTrash { items: vec![] });
let _ = self.delete_all_trash_on_server().await?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self) err)]
pub async fn delete_all(&self) -> WorkspaceResult<()> {
let repeated_trash = self.delete_all_trash_on_local()?;
let identifiers: TrashIdentifiers = repeated_trash.items.clone().into();
let (tx, mut rx) = mpsc::channel::<WorkspaceResult<()>>(1);
let _ = self.notify.send(TrashEvent::Delete(identifiers, tx));
let _ = rx.recv().await;
notify_trash_num_changed(RepeatedTrash { items: vec![] });
let _ = self.delete_all_trash_on_server().await?;
Ok(())
}
fn delete_all_trash_on_local(&self) -> WorkspaceResult<RepeatedTrash> {
let conn = self.database.db_connection()?;
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
let repeated_trash = TrashTableSql::read_all(&*conn)?;
let _ = TrashTableSql::delete_all(&*conn)?;
Ok(repeated_trash)
})
}
#[tracing::instrument(level = "debug", skip(self) err)]
pub async fn delete(&self, trash_identifiers: TrashIdentifiers) -> WorkspaceResult<()> {
let (tx, mut rx) = mpsc::channel::<WorkspaceResult<()>>(1);
let _ = self.notify.send(TrashEvent::Delete(trash_identifiers.clone(), tx));
let _ = rx.recv().await;
let conn = self.database.db_connection()?;
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
for trash_identifier in &trash_identifiers.items {
let _ = TrashTableSql::delete_trash(&trash_identifier.id, &conn)?;
}
Ok(())
})?;
notify_trash_num_changed(TrashTableSql::read_all(&conn)?);
let _ = self.delete_trash_on_server(trash_identifiers)?;
2021-10-13 23:11:45 +08:00
Ok(())
}
// [[ transaction ]]
// https://www.tutlane.com/tutorial/sqlite/sqlite-transactions-begin-commit-rollback
// We can use these commands only when we are performing INSERT, UPDATE, and
// DELETE operations. Its not possible for us to use these commands to
// CREATE and DROP tables operations because those are auto-commit in the
// database.
#[tracing::instrument(level = "debug", skip(self, trash), err)]
pub async fn add<T: Into<Trash>>(&self, trash: Vec<T>) -> Result<(), WorkspaceError> {
let (tx, mut rx) = mpsc::channel::<WorkspaceResult<()>>(1);
let repeated_trash = trash.into_iter().map(|t| t.into()).collect::<Vec<Trash>>();
let identifiers = repeated_trash
.iter()
.map(|t| t.into())
.collect::<Vec<TrashIdentifier>>();
let _ = thread::scope(|_s| {
let conn = self.database.db_connection()?;
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
let _ = TrashTableSql::create_trash(repeated_trash.clone(), &*conn)?;
let _ = self.create_trash_on_server(repeated_trash);
2021-10-17 23:08:39 +08:00
notify_trash_num_changed(TrashTableSql::read_all(&conn)?);
Ok(())
})?;
Ok::<(), WorkspaceError>(())
})
.unwrap()?;
let _ = self.notify.send(TrashEvent::NewTrash(identifiers.into(), tx));
let _ = rx.recv().await.unwrap()?;
2021-10-14 14:34:22 +08:00
Ok(())
}
pub fn subscribe(&self) -> broadcast::Receiver<TrashEvent> { self.notify.subscribe() }
pub fn read_trash(&self, conn: &SqliteConnection) -> Result<RepeatedTrash, WorkspaceError> {
let repeated_trash = TrashTableSql::read_all(&*conn)?;
let _ = self.read_trash_on_server()?;
Ok(repeated_trash)
}
pub fn trash_ids(&self, conn: &SqliteConnection) -> Result<Vec<String>, WorkspaceError> {
let ids = TrashTableSql::read_all(&*conn)?
.into_inner()
.into_iter()
.map(|item| item.id)
.collect::<Vec<String>>();
Ok(ids)
}
}
impl TrashCan {
#[tracing::instrument(level = "debug", skip(self, trash), err)]
fn create_trash_on_server<T: Into<TrashIdentifiers>>(&self, trash: T) -> WorkspaceResult<()> {
let token = self.user.token()?;
let trash_identifiers = trash.into();
let server = self.server.clone();
// TODO: retry?
let _ = tokio::spawn(async move {
match server.create_trash(&token, trash_identifiers).await {
Ok(_) => {},
Err(e) => log::error!("Create trash failed: {:?}", e),
}
});
Ok(())
}
2021-10-14 14:34:22 +08:00
#[tracing::instrument(level = "debug", skip(self, trash), err)]
fn delete_trash_on_server<T: Into<TrashIdentifiers>>(&self, trash: T) -> WorkspaceResult<()> {
let token = self.user.token()?;
let trash_identifiers = trash.into();
let server = self.server.clone();
let _ = tokio::spawn(async move {
match server.delete_trash(&token, trash_identifiers).await {
Ok(_) => {},
Err(e) => log::error!("Delete trash failed: {:?}", e),
}
});
2021-10-13 23:11:45 +08:00
Ok(())
}
#[tracing::instrument(level = "debug", skip(self), err)]
fn read_trash_on_server(&self) -> WorkspaceResult<()> {
let token = self.user.token()?;
let server = self.server.clone();
let pool = self.database.db_pool()?;
spawn(async move {
match server.read_trash(&token).await {
Ok(repeated_trash) => {
match pool.get() {
Ok(conn) => {
let result = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
TrashTableSql::create_trash(repeated_trash.items.clone(), &*conn)
});
match result {
Ok(_) => {
// FIXME: User may modify the trash(add/putback) before the flying request comes
// back that will cause the trash list to be outdated.
// TODO: impl with operation transform
notify_trash_num_changed(repeated_trash);
},
Err(e) => log::error!("Save trash failed: {:?}", e),
}
},
Err(e) => log::error!("Require db connection failed: {:?}", e),
}
},
Err(e) => log::error!("Read trash failed: {:?}", e),
}
});
Ok(())
}
#[tracing::instrument(level = "debug", skip(self), err)]
async fn delete_all_trash_on_server(&self) -> WorkspaceResult<()> {
let token = self.user.token()?;
let server = self.server.clone();
server.delete_trash(&token, TrashIdentifiers::all()).await
}
}
#[tracing::instrument(skip(repeated_trash), fields(trash_count))]
fn notify_trash_num_changed(repeated_trash: RepeatedTrash) {
tracing::Span::current().record("trash_count", &repeated_trash.len());
send_anonymous_dart_notification(WorkspaceNotification::TrashUpdated)
.payload(repeated_trash)
.send();
}
#[derive(Clone)]
pub enum TrashEvent {
NewTrash(TrashIdentifiers, mpsc::Sender<WorkspaceResult<()>>),
Putback(TrashIdentifiers, mpsc::Sender<WorkspaceResult<()>>),
Delete(TrashIdentifiers, mpsc::Sender<WorkspaceResult<()>>),
}
impl TrashEvent {
pub fn select(self, s: TrashType) -> Option<TrashEvent> {
match self {
TrashEvent::Putback(mut identifiers, sender) => {
identifiers.items.retain(|item| item.ty == s);
if identifiers.items.is_empty() {
None
} else {
Some(TrashEvent::Putback(identifiers, sender))
}
},
TrashEvent::Delete(mut identifiers, sender) => {
identifiers.items.retain(|item| item.ty == s);
if identifiers.items.is_empty() {
None
} else {
Some(TrashEvent::Delete(identifiers, sender))
}
},
TrashEvent::NewTrash(mut identifiers, sender) => {
identifiers.items.retain(|item| item.ty == s);
if identifiers.items.is_empty() {
None
} else {
Some(TrashEvent::NewTrash(identifiers, sender))
}
},
}
}
2021-10-13 11:10:29 +08:00
}