164 lines
5.9 KiB
Rust
Raw Normal View History

2022-03-12 22:52:24 +08:00
use crate::{queue::EditorCommand, TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS};
2022-01-11 22:23:19 +08:00
use bytes::Bytes;
2022-03-19 16:52:28 +08:00
use flowy_error::{internal_error, FlowyError};
use flowy_revision::*;
use flowy_sync::{
2022-01-11 22:23:19 +08:00
entities::{
2022-01-20 23:51:11 +08:00
revision::RevisionRange,
2022-02-25 22:27:44 +08:00
ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSDataType},
2022-01-11 22:23:19 +08:00
},
errors::CollaborateResult,
};
2022-01-20 23:51:11 +08:00
use lib_infra::future::{BoxResultFuture, FutureResult};
2022-02-28 22:38:53 +08:00
use lib_ot::rich_text::RichTextAttributes;
use lib_ot::rich_text::RichTextDelta;
2022-01-11 22:23:19 +08:00
use lib_ws::WSConnectState;
2022-01-20 23:51:11 +08:00
use std::{sync::Arc, time::Duration};
2022-01-12 12:40:41 +08:00
use tokio::sync::{
broadcast,
mpsc::{Receiver, Sender},
oneshot,
};
pub(crate) type EditorCommandSender = Sender<EditorCommand>;
pub(crate) type EditorCommandReceiver = Receiver<EditorCommand>;
2022-01-11 22:23:19 +08:00
2022-04-14 21:57:00 +08:00
#[allow(dead_code)]
2022-02-26 11:03:42 +08:00
pub(crate) async fn make_block_ws_manager(
2022-01-11 22:23:19 +08:00
doc_id: String,
user_id: String,
2022-01-12 12:40:41 +08:00
edit_cmd_tx: EditorCommandSender,
2022-01-14 15:23:21 +08:00
rev_manager: Arc<RevisionManager>,
2022-02-18 23:04:55 +08:00
rev_web_socket: Arc<dyn RevisionWebSocket>,
2022-01-14 15:23:21 +08:00
) -> Arc<RevisionWebSocketManager> {
2022-02-25 22:27:44 +08:00
let ws_data_provider = Arc::new(WSDataProvider::new(&doc_id, Arc::new(rev_manager.clone())));
2022-03-10 17:14:10 +08:00
let resolver = Arc::new(TextBlockConflictResolver { edit_cmd_tx });
2022-02-28 22:38:53 +08:00
let conflict_controller =
RichTextConflictController::new(&user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager);
2022-03-10 17:14:10 +08:00
let ws_data_stream = Arc::new(TextBlockRevisionWSDataStream::new(conflict_controller));
let ws_data_sink = Arc::new(TextBlockWSDataSink(ws_data_provider));
2022-03-12 22:52:24 +08:00
let ping_duration = Duration::from_millis(TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS);
2022-01-14 15:23:21 +08:00
let ws_manager = Arc::new(RevisionWebSocketManager::new(
2022-02-26 11:03:42 +08:00
"Block",
2022-01-11 22:23:19 +08:00
&doc_id,
2022-02-18 23:04:55 +08:00
rev_web_socket,
2022-02-25 22:27:44 +08:00
ws_data_sink,
ws_data_stream,
2022-01-14 15:23:21 +08:00
ping_duration,
2022-01-11 22:23:19 +08:00
));
2022-01-21 21:41:24 +08:00
listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state());
2022-01-11 22:23:19 +08:00
ws_manager
}
2022-04-14 21:57:00 +08:00
#[allow(dead_code)]
2022-01-21 21:41:24 +08:00
fn listen_document_ws_state(_user_id: &str, _doc_id: &str, mut subscriber: broadcast::Receiver<WSConnectState>) {
2022-01-11 22:23:19 +08:00
tokio::spawn(async move {
while let Ok(state) = subscriber.recv().await {
match state {
2022-01-24 17:35:58 +08:00
WSConnectState::Init => {}
WSConnectState::Connecting => {}
WSConnectState::Connected => {}
WSConnectState::Disconnected => {}
2022-01-11 22:23:19 +08:00
}
}
});
}
2022-03-10 17:14:10 +08:00
pub(crate) struct TextBlockRevisionWSDataStream {
2022-02-28 22:38:53 +08:00
conflict_controller: Arc<RichTextConflictController>,
2022-02-25 22:27:44 +08:00
}
2022-03-10 17:14:10 +08:00
impl TextBlockRevisionWSDataStream {
2022-04-14 21:57:00 +08:00
#[allow(dead_code)]
2022-02-28 22:38:53 +08:00
pub fn new(conflict_controller: RichTextConflictController) -> Self {
2022-02-25 22:27:44 +08:00
Self {
conflict_controller: Arc::new(conflict_controller),
}
}
2022-01-11 22:23:19 +08:00
}
2022-03-10 17:14:10 +08:00
impl RevisionWSDataStream for TextBlockRevisionWSDataStream {
2022-01-21 21:41:24 +08:00
fn receive_push_revision(&self, bytes: Bytes) -> BoxResultFuture<(), FlowyError> {
2022-02-25 22:27:44 +08:00
let resolver = self.conflict_controller.clone();
2022-01-21 21:41:24 +08:00
Box::pin(async move { resolver.receive_bytes(bytes).await })
2022-01-11 22:23:19 +08:00
}
2022-01-21 21:41:24 +08:00
fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError> {
2022-02-25 22:27:44 +08:00
let resolver = self.conflict_controller.clone();
2022-01-21 21:41:24 +08:00
Box::pin(async move { resolver.ack_revision(id, ty).await })
2022-01-11 22:23:19 +08:00
}
2022-01-21 21:41:24 +08:00
fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> BoxResultFuture<(), FlowyError> {
2022-01-12 12:40:41 +08:00
// Do nothing by now, just a placeholder for future extension.
2022-01-21 21:41:24 +08:00
Box::pin(async move { Ok(()) })
2022-01-11 22:23:19 +08:00
}
2022-01-21 21:41:24 +08:00
fn pull_revisions_in_range(&self, range: RevisionRange) -> BoxResultFuture<(), FlowyError> {
2022-02-25 22:27:44 +08:00
let resolver = self.conflict_controller.clone();
2022-01-21 21:41:24 +08:00
Box::pin(async move { resolver.send_revisions(range).await })
2022-01-11 22:23:19 +08:00
}
}
2022-03-10 17:14:10 +08:00
pub(crate) struct TextBlockWSDataSink(pub(crate) Arc<WSDataProvider>);
impl RevisionWebSocketSink for TextBlockWSDataSink {
2022-01-14 15:23:21 +08:00
fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
2022-01-20 23:51:11 +08:00
let sink_provider = self.0.clone();
FutureResult::new(async move { sink_provider.next().await })
2022-01-11 22:23:19 +08:00
}
}
2022-03-10 17:14:10 +08:00
struct TextBlockConflictResolver {
2022-01-20 23:51:11 +08:00
edit_cmd_tx: EditorCommandSender,
2022-01-11 22:23:19 +08:00
}
2022-03-10 17:14:10 +08:00
impl ConflictResolver<RichTextAttributes> for TextBlockConflictResolver {
2022-02-28 22:38:53 +08:00
fn compose_delta(&self, delta: RichTextDelta) -> BoxResultFuture<DeltaMD5, FlowyError> {
2022-01-20 23:51:11 +08:00
let tx = self.edit_cmd_tx.clone();
Box::pin(async move {
let (ret, rx) = oneshot::channel();
tx.send(EditorCommand::ComposeRemoteDelta {
client_delta: delta,
ret,
})
.await
.map_err(internal_error)?;
let md5 = rx.await.map_err(|e| {
FlowyError::internal().context(format!("handle EditorCommand::ComposeRemoteDelta failed: {}", e))
})??;
Ok(md5)
})
2022-01-11 22:23:19 +08:00
}
2022-01-20 23:51:11 +08:00
fn transform_delta(
&self,
2022-02-28 22:38:53 +08:00
delta: RichTextDelta,
2022-03-19 16:52:28 +08:00
) -> BoxResultFuture<flowy_revision::RichTextTransformDeltas, FlowyError> {
2022-01-20 23:51:11 +08:00
let tx = self.edit_cmd_tx.clone();
Box::pin(async move {
2022-02-28 22:38:53 +08:00
let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextTransformDeltas>>();
2022-01-20 23:51:11 +08:00
tx.send(EditorCommand::TransformDelta { delta, ret })
.await
.map_err(internal_error)?;
let transform_delta = rx
.await
.map_err(|e| FlowyError::internal().context(format!("TransformDelta failed: {}", e)))??;
Ok(transform_delta)
})
2022-01-11 22:23:19 +08:00
}
2022-02-28 22:38:53 +08:00
fn reset_delta(&self, delta: RichTextDelta) -> BoxResultFuture<DeltaMD5, FlowyError> {
2022-01-20 23:51:11 +08:00
let tx = self.edit_cmd_tx.clone();
Box::pin(async move {
2022-01-11 22:23:19 +08:00
let (ret, rx) = oneshot::channel();
2022-01-20 23:51:11 +08:00
let _ = tx
.send(EditorCommand::ResetDelta { delta, ret })
2022-01-15 23:58:36 +08:00
.await
.map_err(internal_error)?;
2022-01-20 23:51:11 +08:00
let md5 = rx.await.map_err(|e| {
FlowyError::internal().context(format!("handle EditorCommand::OverrideDelta failed: {}", e))
2022-01-15 23:58:36 +08:00
})??;
2022-01-20 23:51:11 +08:00
Ok(md5)
})
2022-01-11 22:23:19 +08:00
}
}