187 lines
5.8 KiB
Rust
Raw Normal View History

use async_stream::stream;
2021-11-13 11:11:24 +08:00
use bytes::Bytes;
2021-11-19 12:18:46 +08:00
use flowy_document_infra::{
core::{history::UndoResult, Document},
errors::DocumentError,
};
use futures::stream::StreamExt;
2021-12-07 19:59:08 +08:00
use lib_ot::{
core::{Interval, OperationTransformable},
2021-12-07 22:32:34 +08:00
revision::{RevId, Revision},
2021-12-07 19:59:08 +08:00
rich_text::{RichTextAttribute, RichTextDelta},
};
use std::{convert::TryFrom, sync::Arc};
2021-11-13 11:11:24 +08:00
use tokio::sync::{mpsc, oneshot, RwLock};
2021-12-03 22:40:56 +08:00
pub(crate) struct EditCommandQueue {
doc_id: String,
2021-10-01 20:58:13 +08:00
document: Arc<RwLock<Document>>,
2021-12-03 22:40:56 +08:00
receiver: Option<mpsc::UnboundedReceiver<EditCommand>>,
}
2021-12-03 22:40:56 +08:00
impl EditCommandQueue {
2021-12-07 10:39:01 +08:00
pub(crate) fn new(doc_id: &str, delta: RichTextDelta, receiver: mpsc::UnboundedReceiver<EditCommand>) -> Self {
2021-10-01 20:58:13 +08:00
let document = Arc::new(RwLock::new(Document::from_delta(delta)));
Self {
doc_id: doc_id.to_owned(),
document,
receiver: Some(receiver),
}
}
2021-12-03 22:40:56 +08:00
pub(crate) async fn run(mut self) {
let mut receiver = self.receiver.take().expect("Should only call once");
let stream = stream! {
loop {
match receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
2021-10-01 20:58:13 +08:00
stream
.for_each(|msg| async {
2021-12-09 21:39:53 +08:00
self.handle_message(msg).await;
2021-10-01 20:58:13 +08:00
})
.await;
}
2021-12-09 21:39:53 +08:00
async fn handle_message(&self, msg: EditCommand) {
match msg {
2021-12-03 22:40:56 +08:00
EditCommand::ComposeDelta { delta, ret } => {
let result = self.composed_delta(delta).await;
let _ = ret.send(result);
},
2021-12-08 17:33:22 +08:00
EditCommand::ProcessRemoteRevision { bytes, ret } => {
2021-12-09 21:39:53 +08:00
let f = || async {
let revision = Revision::try_from(bytes)?;
let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
let rev_id: RevId = revision.rev_id.into();
let (server_prime, client_prime) = self.document.read().await.delta().transform(&delta)?;
let transform_delta = TransformDeltas {
client_prime,
server_prime,
server_rev_id: rev_id,
};
Ok::<TransformDeltas, DocumentError>(transform_delta)
};
2021-12-09 21:39:53 +08:00
let _ = ret.send(f().await);
},
2021-12-03 22:40:56 +08:00
EditCommand::Insert { index, data, ret } => {
2021-10-01 20:58:13 +08:00
let delta = self.document.write().await.insert(index, data);
let _ = ret.send(delta);
},
2021-12-03 22:40:56 +08:00
EditCommand::Delete { interval, ret } => {
2021-10-01 20:58:13 +08:00
let result = self.document.write().await.delete(interval);
let _ = ret.send(result);
},
2021-12-03 22:40:56 +08:00
EditCommand::Format {
interval,
attribute,
ret,
} => {
2021-10-01 20:58:13 +08:00
let result = self.document.write().await.format(interval, attribute);
let _ = ret.send(result);
},
2021-12-03 22:40:56 +08:00
EditCommand::Replace { interval, data, ret } => {
2021-10-01 20:58:13 +08:00
let result = self.document.write().await.replace(interval, data);
let _ = ret.send(result);
},
2021-12-03 22:40:56 +08:00
EditCommand::CanUndo { ret } => {
2021-10-01 20:58:13 +08:00
let _ = ret.send(self.document.read().await.can_undo());
},
2021-12-03 22:40:56 +08:00
EditCommand::CanRedo { ret } => {
2021-10-01 20:58:13 +08:00
let _ = ret.send(self.document.read().await.can_redo());
},
2021-12-03 22:40:56 +08:00
EditCommand::Undo { ret } => {
2021-10-01 20:58:13 +08:00
let result = self.document.write().await.undo();
let _ = ret.send(result);
},
2021-12-03 22:40:56 +08:00
EditCommand::Redo { ret } => {
2021-10-01 20:58:13 +08:00
let result = self.document.write().await.redo();
let _ = ret.send(result);
},
2021-12-03 22:40:56 +08:00
EditCommand::ReadDoc { ret } => {
2021-10-01 20:58:13 +08:00
let data = self.document.read().await.to_json();
let _ = ret.send(Ok(data));
},
2021-12-09 11:00:05 +08:00
EditCommand::ReadDocDelta { ret } => {
let delta = self.document.read().await.delta().clone();
let _ = ret.send(Ok(delta));
},
}
}
#[tracing::instrument(level = "debug", skip(self, delta), fields(compose_result), err)]
2021-12-07 10:39:01 +08:00
async fn composed_delta(&self, delta: RichTextDelta) -> Result<(), DocumentError> {
2021-11-03 15:37:38 +08:00
// tracing::debug!("{:?} thread handle_message", thread::current(),);
let mut document = self.document.write().await;
tracing::Span::current().record(
"composed_delta",
&format!("doc_id:{} - {}", &self.doc_id, delta.to_json()).as_str(),
);
2021-11-12 21:44:26 +08:00
let result = document.compose_delta(delta);
drop(document);
result
}
}
2021-12-03 22:40:56 +08:00
pub(crate) type Ret<T> = oneshot::Sender<Result<T, DocumentError>>;
2021-12-10 11:05:23 +08:00
#[allow(dead_code)]
2021-12-03 22:40:56 +08:00
pub(crate) enum EditCommand {
ComposeDelta {
2021-12-07 10:39:01 +08:00
delta: RichTextDelta,
2021-11-13 11:11:24 +08:00
ret: Ret<()>,
},
2021-12-08 17:33:22 +08:00
ProcessRemoteRevision {
2021-11-13 11:11:24 +08:00
bytes: Bytes,
ret: Ret<TransformDeltas>,
},
Insert {
index: usize,
data: String,
2021-12-07 10:39:01 +08:00
ret: Ret<RichTextDelta>,
2021-11-13 11:11:24 +08:00
},
Delete {
interval: Interval,
2021-12-07 10:39:01 +08:00
ret: Ret<RichTextDelta>,
2021-11-13 11:11:24 +08:00
},
Format {
interval: Interval,
2021-12-07 10:39:01 +08:00
attribute: RichTextAttribute,
ret: Ret<RichTextDelta>,
2021-11-13 11:11:24 +08:00
},
Replace {
interval: Interval,
data: String,
2021-12-07 10:39:01 +08:00
ret: Ret<RichTextDelta>,
2021-11-13 11:11:24 +08:00
},
CanUndo {
ret: oneshot::Sender<bool>,
},
CanRedo {
ret: oneshot::Sender<bool>,
},
Undo {
ret: Ret<UndoResult>,
},
Redo {
ret: Ret<UndoResult>,
},
2021-12-03 22:40:56 +08:00
ReadDoc {
2021-11-13 11:11:24 +08:00
ret: Ret<String>,
},
2021-12-09 11:00:05 +08:00
ReadDocDelta {
ret: Ret<RichTextDelta>,
},
2021-11-13 11:11:24 +08:00
}
2021-12-03 22:40:56 +08:00
pub(crate) struct TransformDeltas {
2021-12-07 10:39:01 +08:00
pub client_prime: RichTextDelta,
pub server_prime: RichTextDelta,
2021-11-13 11:11:24 +08:00
pub server_rev_id: RevId,
}