2021-10-01 19:39:08 +08:00
|
|
|
use crate::{
|
2021-10-04 17:38:56 +08:00
|
|
|
entities::doc::{RevId, Revision},
|
2021-10-06 15:23:38 +08:00
|
|
|
errors::DocResult,
|
2021-10-01 20:58:13 +08:00
|
|
|
services::doc::{
|
2021-10-27 23:56:41 +08:00
|
|
|
edit::message::{DocumentMsg, TransformDeltas},
|
2021-10-01 20:58:13 +08:00
|
|
|
Document,
|
|
|
|
},
|
2021-10-01 19:39:08 +08:00
|
|
|
};
|
|
|
|
use async_stream::stream;
|
2021-10-04 17:38:56 +08:00
|
|
|
use flowy_ot::core::{Delta, OperationTransformable};
|
2021-10-01 19:39:08 +08:00
|
|
|
use futures::stream::StreamExt;
|
2021-10-30 14:44:43 +08:00
|
|
|
use std::{convert::TryFrom, sync::Arc};
|
2021-10-02 21:35:06 +08:00
|
|
|
use tokio::sync::{mpsc, RwLock};
|
2021-10-01 19:39:08 +08:00
|
|
|
|
2021-10-05 11:46:56 +08:00
|
|
|
pub struct DocumentActor {
|
2021-11-03 22:04:45 +08:00
|
|
|
doc_id: String,
|
2021-10-01 20:58:13 +08:00
|
|
|
document: Arc<RwLock<Document>>,
|
2021-10-05 11:46:56 +08:00
|
|
|
receiver: Option<mpsc::UnboundedReceiver<DocumentMsg>>,
|
2021-10-01 19:39:08 +08:00
|
|
|
}
|
|
|
|
|
2021-10-05 11:46:56 +08:00
|
|
|
impl DocumentActor {
|
2021-11-03 22:04:45 +08:00
|
|
|
pub fn new(doc_id: &str, delta: Delta, receiver: mpsc::UnboundedReceiver<DocumentMsg>) -> Self {
|
2021-10-01 20:58:13 +08:00
|
|
|
let document = Arc::new(RwLock::new(Document::from_delta(delta)));
|
2021-10-01 19:39:08 +08:00
|
|
|
Self {
|
2021-11-03 22:04:45 +08:00
|
|
|
doc_id: doc_id.to_owned(),
|
2021-10-01 19:39:08 +08:00
|
|
|
document,
|
|
|
|
receiver: Some(receiver),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn run(mut self) {
|
|
|
|
let mut receiver = self.receiver.take().expect("Should only call once");
|
|
|
|
let stream = stream! {
|
|
|
|
loop {
|
|
|
|
match receiver.recv().await {
|
|
|
|
Some(msg) => yield msg,
|
|
|
|
None => break,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2021-10-01 20:58:13 +08:00
|
|
|
stream
|
|
|
|
.for_each(|msg| async {
|
|
|
|
match self.handle_message(msg).await {
|
|
|
|
Ok(_) => {},
|
|
|
|
Err(e) => log::error!("{:?}", e),
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.await;
|
2021-10-01 19:39:08 +08:00
|
|
|
}
|
|
|
|
|
2021-10-05 11:46:56 +08:00
|
|
|
async fn handle_message(&self, msg: DocumentMsg) -> DocResult<()> {
|
2021-10-01 19:39:08 +08:00
|
|
|
match msg {
|
2021-10-05 11:46:56 +08:00
|
|
|
DocumentMsg::Delta { delta, ret } => {
|
2021-11-04 12:47:41 +08:00
|
|
|
let result = self.composed_delta(delta).await;
|
2021-10-01 19:39:08 +08:00
|
|
|
let _ = ret.send(result);
|
|
|
|
},
|
2021-10-05 11:46:56 +08:00
|
|
|
DocumentMsg::RemoteRevision { bytes, ret } => {
|
2021-10-04 17:38:56 +08:00
|
|
|
let revision = Revision::try_from(bytes)?;
|
|
|
|
let delta = Delta::from_bytes(&revision.delta_data)?;
|
|
|
|
let rev_id: RevId = revision.rev_id.into();
|
|
|
|
let (server_prime, client_prime) = self.document.read().await.delta().transform(&delta)?;
|
|
|
|
let transform_delta = TransformDeltas {
|
|
|
|
client_prime,
|
|
|
|
server_prime,
|
|
|
|
server_rev_id: rev_id,
|
|
|
|
};
|
|
|
|
let _ = ret.send(Ok(transform_delta));
|
|
|
|
},
|
2021-10-05 11:46:56 +08:00
|
|
|
DocumentMsg::Insert { index, data, ret } => {
|
2021-10-01 20:58:13 +08:00
|
|
|
let delta = self.document.write().await.insert(index, data);
|
2021-10-01 19:39:08 +08:00
|
|
|
let _ = ret.send(delta);
|
|
|
|
},
|
2021-10-05 11:46:56 +08:00
|
|
|
DocumentMsg::Delete { interval, ret } => {
|
2021-10-01 20:58:13 +08:00
|
|
|
let result = self.document.write().await.delete(interval);
|
2021-10-01 19:39:08 +08:00
|
|
|
let _ = ret.send(result);
|
|
|
|
},
|
2021-10-05 11:46:56 +08:00
|
|
|
DocumentMsg::Format {
|
2021-10-01 19:39:08 +08:00
|
|
|
interval,
|
|
|
|
attribute,
|
|
|
|
ret,
|
|
|
|
} => {
|
2021-10-01 20:58:13 +08:00
|
|
|
let result = self.document.write().await.format(interval, attribute);
|
2021-10-01 19:39:08 +08:00
|
|
|
let _ = ret.send(result);
|
|
|
|
},
|
2021-10-05 11:46:56 +08:00
|
|
|
DocumentMsg::Replace { interval, data, ret } => {
|
2021-10-01 20:58:13 +08:00
|
|
|
let result = self.document.write().await.replace(interval, data);
|
2021-10-01 19:39:08 +08:00
|
|
|
let _ = ret.send(result);
|
|
|
|
},
|
2021-10-05 11:46:56 +08:00
|
|
|
DocumentMsg::CanUndo { ret } => {
|
2021-10-01 20:58:13 +08:00
|
|
|
let _ = ret.send(self.document.read().await.can_undo());
|
2021-10-01 19:39:08 +08:00
|
|
|
},
|
2021-10-05 11:46:56 +08:00
|
|
|
DocumentMsg::CanRedo { ret } => {
|
2021-10-01 20:58:13 +08:00
|
|
|
let _ = ret.send(self.document.read().await.can_redo());
|
2021-10-01 19:39:08 +08:00
|
|
|
},
|
2021-10-05 11:46:56 +08:00
|
|
|
DocumentMsg::Undo { ret } => {
|
2021-10-01 20:58:13 +08:00
|
|
|
let result = self.document.write().await.undo();
|
2021-10-01 19:39:08 +08:00
|
|
|
let _ = ret.send(result);
|
|
|
|
},
|
2021-10-05 11:46:56 +08:00
|
|
|
DocumentMsg::Redo { ret } => {
|
2021-10-01 20:58:13 +08:00
|
|
|
let result = self.document.write().await.redo();
|
2021-10-01 19:39:08 +08:00
|
|
|
let _ = ret.send(result);
|
|
|
|
},
|
2021-10-05 11:46:56 +08:00
|
|
|
DocumentMsg::Doc { ret } => {
|
2021-10-01 20:58:13 +08:00
|
|
|
let data = self.document.read().await.to_json();
|
2021-10-01 19:39:08 +08:00
|
|
|
let _ = ret.send(Ok(data));
|
|
|
|
},
|
2021-10-05 17:54:11 +08:00
|
|
|
DocumentMsg::SaveDocument { rev_id: _, ret } => {
|
2021-10-05 14:37:45 +08:00
|
|
|
// let result = self.save_to_disk(rev_id).await;
|
|
|
|
let _ = ret.send(Ok(()));
|
2021-10-01 19:39:08 +08:00
|
|
|
},
|
|
|
|
}
|
2021-10-01 20:58:13 +08:00
|
|
|
Ok(())
|
2021-10-01 19:39:08 +08:00
|
|
|
}
|
|
|
|
|
2021-11-04 12:47:41 +08:00
|
|
|
#[tracing::instrument(level = "debug", skip(self, delta), fields(compose_result), err)]
|
|
|
|
async fn composed_delta(&self, delta: Delta) -> DocResult<()> {
|
2021-11-03 15:37:38 +08:00
|
|
|
// tracing::debug!("{:?} thread handle_message", thread::current(),);
|
2021-10-27 23:56:41 +08:00
|
|
|
let mut document = self.document.write().await;
|
|
|
|
let result = document.compose_delta(&delta);
|
2021-11-04 12:47:41 +08:00
|
|
|
tracing::Span::current().record(
|
2021-11-11 10:55:13 +08:00
|
|
|
"composed_delta",
|
2021-11-04 12:47:41 +08:00
|
|
|
&format!("doc_id:{} - {}", &self.doc_id, delta.to_json()).as_str(),
|
|
|
|
);
|
2021-10-27 23:56:41 +08:00
|
|
|
drop(document);
|
|
|
|
|
2021-10-04 17:38:56 +08:00
|
|
|
result
|
|
|
|
}
|
2021-10-01 19:39:08 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// #[tracing::instrument(level = "debug", skip(self, params), err)]
|
|
|
|
// fn update_doc_on_server(&self, params: UpdateDocParams) -> Result<(),
|
|
|
|
// DocError> { let token = self.user.token()?;
|
|
|
|
// let server = self.server.clone();
|
|
|
|
// tokio::spawn(async move {
|
|
|
|
// match server.update_doc(&token, params).await {
|
|
|
|
// Ok(_) => {},
|
|
|
|
// Err(e) => {
|
|
|
|
// // TODO: retry?
|
|
|
|
// log::error!("Update doc failed: {}", e);
|
|
|
|
// },
|
|
|
|
// }
|
|
|
|
// });
|
|
|
|
// Ok(())
|
|
|
|
// }
|