168 lines
5.6 KiB
Rust
Raw Normal View History

use crate::{
entities::doc::{RevId, Revision},
2021-10-02 21:35:06 +08:00
errors::{internal_error, DocResult},
2021-10-01 20:58:13 +08:00
services::doc::{
edit::{
message::{EditMsg, TransformDeltas},
DocId,
},
2021-10-01 20:58:13 +08:00
Document,
},
sql_tables::{DocTableChangeset, DocTableSql},
};
use async_stream::stream;
use flowy_database::ConnectionPool;
use flowy_ot::core::{Delta, OperationTransformable};
use futures::stream::StreamExt;
use std::{convert::TryFrom, sync::Arc};
2021-10-02 21:35:06 +08:00
use tokio::sync::{mpsc, RwLock};
pub struct DocumentEditActor {
doc_id: DocId,
2021-10-01 20:58:13 +08:00
document: Arc<RwLock<Document>>,
pool: Arc<ConnectionPool>,
receiver: Option<mpsc::UnboundedReceiver<EditMsg>>,
}
impl DocumentEditActor {
pub fn new(
doc_id: &str,
delta: Delta,
pool: Arc<ConnectionPool>,
receiver: mpsc::UnboundedReceiver<EditMsg>,
) -> Self {
let doc_id = doc_id.to_string();
2021-10-01 20:58:13 +08:00
let document = Arc::new(RwLock::new(Document::from_delta(delta)));
Self {
doc_id,
document,
pool,
receiver: Some(receiver),
}
}
pub async fn run(mut self) {
let mut receiver = self.receiver.take().expect("Should only call once");
let stream = stream! {
loop {
match receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
2021-10-01 20:58:13 +08:00
stream
.for_each(|msg| async {
match self.handle_message(msg).await {
Ok(_) => {},
Err(e) => log::error!("{:?}", e),
}
})
.await;
}
2021-10-01 20:58:13 +08:00
async fn handle_message(&self, msg: EditMsg) -> DocResult<()> {
match msg {
EditMsg::Delta { delta, ret } => {
let result = self.compose_delta(delta).await;
let _ = ret.send(result);
},
EditMsg::RemoteRevision { bytes, ret } => {
let revision = Revision::try_from(bytes)?;
let delta = Delta::from_bytes(&revision.delta_data)?;
let rev_id: RevId = revision.rev_id.into();
let (server_prime, client_prime) = self.document.read().await.delta().transform(&delta)?;
let transform_delta = TransformDeltas {
client_prime,
server_prime,
server_rev_id: rev_id,
};
let _ = ret.send(Ok(transform_delta));
},
EditMsg::Insert { index, data, ret } => {
2021-10-01 20:58:13 +08:00
let delta = self.document.write().await.insert(index, data);
let _ = ret.send(delta);
},
EditMsg::Delete { interval, ret } => {
2021-10-01 20:58:13 +08:00
let result = self.document.write().await.delete(interval);
let _ = ret.send(result);
},
EditMsg::Format {
interval,
attribute,
ret,
} => {
2021-10-01 20:58:13 +08:00
let result = self.document.write().await.format(interval, attribute);
let _ = ret.send(result);
},
EditMsg::Replace { interval, data, ret } => {
2021-10-01 20:58:13 +08:00
let result = self.document.write().await.replace(interval, data);
let _ = ret.send(result);
},
EditMsg::CanUndo { ret } => {
2021-10-01 20:58:13 +08:00
let _ = ret.send(self.document.read().await.can_undo());
},
EditMsg::CanRedo { ret } => {
2021-10-01 20:58:13 +08:00
let _ = ret.send(self.document.read().await.can_redo());
},
EditMsg::Undo { ret } => {
2021-10-01 20:58:13 +08:00
let result = self.document.write().await.undo();
let _ = ret.send(result);
},
EditMsg::Redo { ret } => {
2021-10-01 20:58:13 +08:00
let result = self.document.write().await.redo();
let _ = ret.send(result);
},
EditMsg::Doc { ret } => {
2021-10-01 20:58:13 +08:00
let data = self.document.read().await.to_json();
let _ = ret.send(Ok(data));
},
EditMsg::SaveDocument { rev_id, ret } => {
2021-10-01 20:58:13 +08:00
let result = self.save_to_disk(rev_id).await;
let _ = ret.send(result);
},
}
2021-10-01 20:58:13 +08:00
Ok(())
}
async fn compose_delta(&self, delta: Delta) -> DocResult<()> {
let result = self.document.write().await.compose_delta(&delta);
log::debug!(
"Compose push delta: {}. result: {}",
delta.to_json(),
self.document.read().await.to_json()
);
result
}
#[tracing::instrument(level = "debug", skip(self, rev_id), err)]
2021-10-02 21:35:06 +08:00
async fn save_to_disk(&self, rev_id: RevId) -> DocResult<()> {
2021-10-01 20:58:13 +08:00
let data = self.document.read().await.to_json();
let changeset = DocTableChangeset {
id: self.doc_id.clone(),
data,
2021-10-02 21:35:06 +08:00
rev_id: rev_id.into(),
};
let sql = DocTableSql {};
let conn = self.pool.get().map_err(internal_error)?;
let _ = sql.update_doc_table(changeset, &*conn)?;
Ok(())
}
}
// #[tracing::instrument(level = "debug", skip(self, params), err)]
// fn update_doc_on_server(&self, params: UpdateDocParams) -> Result<(),
// DocError> { let token = self.user.token()?;
// let server = self.server.clone();
// tokio::spawn(async move {
// match server.update_doc(&token, params).await {
// Ok(_) => {},
// Err(e) => {
// // TODO: retry?
// log::error!("Update doc failed: {}", e);
// },
// }
// });
// Ok(())
// }