282 lines
9.9 KiB
Rust
Raw Normal View History

2022-02-18 23:04:55 +08:00
use crate::web_socket::EditorCommandReceiver;
2022-03-10 17:14:10 +08:00
use crate::TextBlockUser;
2021-12-16 22:24:05 +08:00
use async_stream::stream;
2022-03-11 21:36:00 +08:00
use bytes::Bytes;
2022-03-19 16:52:28 +08:00
use flowy_error::{FlowyError, FlowyResult};
use flowy_revision::{DeltaMD5, RevisionCompactor, RevisionManager, RichTextTransformDeltas, TransformDeltas};
use flowy_sync::util::make_delta_from_revisions;
use flowy_sync::{
2022-01-21 21:41:24 +08:00
client_document::{history::UndoResult, ClientDocument},
entities::revision::{RevId, Revision},
2021-12-16 22:24:05 +08:00
errors::CollaborateError,
};
use futures::stream::StreamExt;
use lib_ot::{
2022-08-01 09:31:40 +08:00
core::{Interval, OperationTransform},
2022-01-20 23:51:11 +08:00
rich_text::{RichTextAttribute, RichTextAttributes, RichTextDelta},
2021-12-16 22:24:05 +08:00
};
use std::sync::Arc;
2022-01-12 12:40:41 +08:00
use tokio::sync::{oneshot, RwLock};
2021-12-16 22:24:05 +08:00
2022-01-12 12:40:41 +08:00
// The EditorCommandQueue executes each command that will alter the document in
// serial.
2022-02-26 11:03:42 +08:00
pub(crate) struct EditBlockQueue {
2022-01-12 17:08:50 +08:00
document: Arc<RwLock<ClientDocument>>,
2022-03-10 17:14:10 +08:00
user: Arc<dyn TextBlockUser>,
2022-01-14 15:23:21 +08:00
rev_manager: Arc<RevisionManager>,
2022-01-12 12:40:41 +08:00
receiver: Option<EditorCommandReceiver>,
2021-12-16 22:24:05 +08:00
}
2022-02-26 11:03:42 +08:00
impl EditBlockQueue {
2022-01-05 23:15:55 +08:00
pub(crate) fn new(
2022-03-10 17:14:10 +08:00
user: Arc<dyn TextBlockUser>,
2022-01-14 15:23:21 +08:00
rev_manager: Arc<RevisionManager>,
2022-01-05 23:15:55 +08:00
delta: RichTextDelta,
2022-01-12 12:40:41 +08:00
receiver: EditorCommandReceiver,
2022-01-05 23:15:55 +08:00
) -> Self {
2022-01-12 17:08:50 +08:00
let document = Arc::new(RwLock::new(ClientDocument::from_delta(delta)));
2021-12-16 22:24:05 +08:00
Self {
document,
2022-01-05 23:15:55 +08:00
user,
rev_manager,
2021-12-16 22:24:05 +08:00
receiver: Some(receiver),
}
}
pub(crate) async fn run(mut self) {
let mut receiver = self.receiver.take().expect("Should only call once");
let stream = stream! {
loop {
match receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
stream
2022-01-05 23:15:55 +08:00
.for_each(|command| async {
match self.handle_command(command).await {
2022-01-24 17:35:58 +08:00
Ok(_) => {}
2021-12-16 22:24:05 +08:00
Err(e) => tracing::debug!("[EditCommandQueue]: {}", e),
}
})
.await;
}
2022-01-09 15:13:45 +08:00
#[tracing::instrument(level = "trace", skip(self), err)]
2022-01-05 23:15:55 +08:00
async fn handle_command(&self, command: EditorCommand) -> Result<(), FlowyError> {
match command {
EditorCommand::ComposeLocalDelta { delta, ret } => {
let mut document = self.document.write().await;
let _ = document.compose_delta(delta.clone())?;
let md5 = document.md5();
drop(document);
let _ = self.save_local_delta(delta, md5).await?;
let _ = ret.send(Ok(()));
2022-01-24 17:35:58 +08:00
}
2022-01-20 23:51:11 +08:00
EditorCommand::ComposeRemoteDelta { client_delta, ret } => {
2022-01-05 23:15:55 +08:00
let mut document = self.document.write().await;
let _ = document.compose_delta(client_delta.clone())?;
let md5 = document.md5();
2022-01-20 23:51:11 +08:00
drop(document);
let _ = ret.send(Ok(md5));
2022-01-24 17:35:58 +08:00
}
2022-01-20 23:51:11 +08:00
EditorCommand::ResetDelta { delta, ret } => {
2022-01-05 23:15:55 +08:00
let mut document = self.document.write().await;
let _ = document.set_delta(delta);
let md5 = document.md5();
drop(document);
2022-01-20 23:51:11 +08:00
let _ = ret.send(Ok(md5));
2022-01-24 17:35:58 +08:00
}
2022-01-20 23:51:11 +08:00
EditorCommand::TransformDelta { delta, ret } => {
2021-12-16 22:24:05 +08:00
let f = || async {
let read_guard = self.document.read().await;
2022-01-02 10:34:42 +08:00
let mut server_prime: Option<RichTextDelta> = None;
let client_prime: RichTextDelta;
2022-01-21 21:41:24 +08:00
if read_guard.is_empty() {
2022-01-02 10:34:42 +08:00
// Do nothing
2022-01-20 23:51:11 +08:00
client_prime = delta;
2022-01-02 10:34:42 +08:00
} else {
2022-01-20 23:51:11 +08:00
let (s_prime, c_prime) = read_guard.delta().transform(&delta)?;
2022-01-02 10:34:42 +08:00
client_prime = c_prime;
server_prime = Some(s_prime);
}
drop(read_guard);
2022-02-28 22:38:53 +08:00
Ok::<RichTextTransformDeltas, CollaborateError>(TransformDeltas {
2021-12-16 22:24:05 +08:00
client_prime,
server_prime,
2022-01-02 10:34:42 +08:00
})
2021-12-16 22:24:05 +08:00
};
let _ = ret.send(f().await);
2022-01-24 17:35:58 +08:00
}
2021-12-18 18:35:45 +08:00
EditorCommand::Insert { index, data, ret } => {
2021-12-16 22:24:05 +08:00
let mut write_guard = self.document.write().await;
let delta = write_guard.insert(index, data)?;
let md5 = write_guard.md5();
2022-01-05 23:15:55 +08:00
let _ = self.save_local_delta(delta, md5).await?;
let _ = ret.send(Ok(()));
2022-01-24 17:35:58 +08:00
}
2021-12-18 18:35:45 +08:00
EditorCommand::Delete { interval, ret } => {
2021-12-16 22:24:05 +08:00
let mut write_guard = self.document.write().await;
let delta = write_guard.delete(interval)?;
let md5 = write_guard.md5();
2022-01-05 23:15:55 +08:00
let _ = self.save_local_delta(delta, md5).await?;
let _ = ret.send(Ok(()));
2022-01-24 17:35:58 +08:00
}
2021-12-18 18:35:45 +08:00
EditorCommand::Format {
2021-12-16 22:24:05 +08:00
interval,
attribute,
ret,
} => {
let mut write_guard = self.document.write().await;
let delta = write_guard.format(interval, attribute)?;
let md5 = write_guard.md5();
2022-01-05 23:15:55 +08:00
let _ = self.save_local_delta(delta, md5).await?;
let _ = ret.send(Ok(()));
2022-01-24 17:35:58 +08:00
}
2021-12-18 18:35:45 +08:00
EditorCommand::Replace { interval, data, ret } => {
2021-12-16 22:24:05 +08:00
let mut write_guard = self.document.write().await;
let delta = write_guard.replace(interval, data)?;
let md5 = write_guard.md5();
2022-01-05 23:15:55 +08:00
let _ = self.save_local_delta(delta, md5).await?;
let _ = ret.send(Ok(()));
2022-01-24 17:35:58 +08:00
}
2021-12-18 18:35:45 +08:00
EditorCommand::CanUndo { ret } => {
2021-12-16 22:24:05 +08:00
let _ = ret.send(self.document.read().await.can_undo());
2022-01-24 17:35:58 +08:00
}
2021-12-18 18:35:45 +08:00
EditorCommand::CanRedo { ret } => {
2021-12-16 22:24:05 +08:00
let _ = ret.send(self.document.read().await.can_redo());
2022-01-24 17:35:58 +08:00
}
2021-12-18 18:35:45 +08:00
EditorCommand::Undo { ret } => {
2022-01-05 23:15:55 +08:00
let mut write_guard = self.document.write().await;
let UndoResult { delta } = write_guard.undo()?;
let md5 = write_guard.md5();
let _ = self.save_local_delta(delta, md5).await?;
let _ = ret.send(Ok(()));
2022-01-24 17:35:58 +08:00
}
2021-12-18 18:35:45 +08:00
EditorCommand::Redo { ret } => {
2022-01-05 23:15:55 +08:00
let mut write_guard = self.document.write().await;
let UndoResult { delta } = write_guard.redo()?;
let md5 = write_guard.md5();
let _ = self.save_local_delta(delta, md5).await?;
let _ = ret.send(Ok(()));
2022-01-24 17:35:58 +08:00
}
2022-03-05 22:30:42 +08:00
EditorCommand::ReadDeltaStr { ret } => {
let data = self.document.read().await.delta_str();
2021-12-16 22:24:05 +08:00
let _ = ret.send(Ok(data));
2022-01-24 17:35:58 +08:00
}
2022-03-12 21:06:15 +08:00
EditorCommand::ReadDelta { ret } => {
2021-12-16 22:24:05 +08:00
let delta = self.document.read().await.delta().clone();
let _ = ret.send(Ok(delta));
2022-01-24 17:35:58 +08:00
}
2021-12-16 22:24:05 +08:00
}
Ok(())
}
2022-01-05 23:15:55 +08:00
async fn save_local_delta(&self, delta: RichTextDelta, md5: String) -> Result<RevId, FlowyError> {
2022-08-02 09:11:04 +08:00
let delta_data = delta.json_bytes();
2022-01-05 23:15:55 +08:00
let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
let user_id = self.user.user_id()?;
2022-01-14 15:23:21 +08:00
let revision = Revision::new(
&self.rev_manager.object_id,
base_rev_id,
rev_id,
delta_data,
&user_id,
md5,
);
let _ = self.rev_manager.add_local_revision(&revision).await?;
2022-01-05 23:15:55 +08:00
Ok(rev_id.into())
}
}
2022-03-11 21:36:00 +08:00
pub(crate) struct TextBlockRevisionCompactor();
impl RevisionCompactor for TextBlockRevisionCompactor {
fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
2022-01-26 23:29:18 +08:00
let delta = make_delta_from_revisions::<RichTextAttributes>(revisions)?;
2022-08-02 09:11:04 +08:00
Ok(delta.json_bytes())
2022-01-25 20:37:48 +08:00
}
}
2021-12-16 22:24:05 +08:00
/// One-shot reply channel carried by an [`EditorCommand`]: the queue sends
/// either the command's result `T` or a [`CollaborateError`] through it.
pub(crate) type Ret<T> = oneshot::Sender<Result<T, CollaborateError>>;
2021-12-18 18:35:45 +08:00
pub(crate) enum EditorCommand {
2022-01-05 23:15:55 +08:00
ComposeLocalDelta {
2021-12-16 22:24:05 +08:00
delta: RichTextDelta,
2022-01-05 23:15:55 +08:00
ret: Ret<()>,
},
ComposeRemoteDelta {
client_delta: RichTextDelta,
2022-01-20 23:51:11 +08:00
ret: Ret<DeltaMD5>,
2021-12-16 22:24:05 +08:00
},
2022-01-20 23:51:11 +08:00
ResetDelta {
2022-01-02 10:34:42 +08:00
delta: RichTextDelta,
2022-01-20 23:51:11 +08:00
ret: Ret<DeltaMD5>,
2022-01-02 10:34:42 +08:00
},
2022-01-20 23:51:11 +08:00
TransformDelta {
delta: RichTextDelta,
2022-02-28 22:38:53 +08:00
ret: Ret<RichTextTransformDeltas>,
2021-12-16 22:24:05 +08:00
},
Insert {
index: usize,
data: String,
2022-01-05 23:15:55 +08:00
ret: Ret<()>,
2021-12-16 22:24:05 +08:00
},
Delete {
interval: Interval,
2022-01-05 23:15:55 +08:00
ret: Ret<()>,
2021-12-16 22:24:05 +08:00
},
Format {
interval: Interval,
attribute: RichTextAttribute,
2022-01-05 23:15:55 +08:00
ret: Ret<()>,
2021-12-16 22:24:05 +08:00
},
Replace {
interval: Interval,
data: String,
2022-01-05 23:15:55 +08:00
ret: Ret<()>,
2021-12-16 22:24:05 +08:00
},
CanUndo {
ret: oneshot::Sender<bool>,
},
CanRedo {
ret: oneshot::Sender<bool>,
},
Undo {
2022-01-05 23:15:55 +08:00
ret: Ret<()>,
2021-12-16 22:24:05 +08:00
},
Redo {
2022-01-05 23:15:55 +08:00
ret: Ret<()>,
2021-12-16 22:24:05 +08:00
},
2022-03-05 22:30:42 +08:00
ReadDeltaStr {
2021-12-16 22:24:05 +08:00
ret: Ret<String>,
},
2022-01-09 15:13:45 +08:00
#[allow(dead_code)]
2022-03-12 21:06:15 +08:00
ReadDelta {
2021-12-16 22:24:05 +08:00
ret: Ret<RichTextDelta>,
},
}
2022-01-05 23:15:55 +08:00
/// Prints only the variant name; payloads and reply channels are omitted.
impl std::fmt::Debug for EditorCommand {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let s = match self {
            EditorCommand::ComposeLocalDelta { .. } => "ComposeLocalDelta",
            EditorCommand::ComposeRemoteDelta { .. } => "ComposeRemoteDelta",
            EditorCommand::ResetDelta { .. } => "ResetDelta",
            EditorCommand::TransformDelta { .. } => "TransformDelta",
            EditorCommand::Insert { .. } => "Insert",
            EditorCommand::Delete { .. } => "Delete",
            EditorCommand::Format { .. } => "Format",
            EditorCommand::Replace { .. } => "Replace",
            EditorCommand::CanUndo { .. } => "CanUndo",
            EditorCommand::CanRedo { .. } => "CanRedo",
            EditorCommand::Undo { .. } => "Undo",
            EditorCommand::Redo { .. } => "Redo",
            EditorCommand::ReadDeltaStr { .. } => "ReadDeltaStr",
            // Label kept in sync with the variant name, like every other arm.
            EditorCommand::ReadDelta { .. } => "ReadDelta",
        };
        f.write_str(s)
    }
}