2022-10-20 11:35:11 +08:00
|
|
|
use crate::old_editor::queue::{EditorCommand, EditorCommandSender, TextTransformOperations};
|
|
|
|
use crate::TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS;
|
2022-01-11 22:23:19 +08:00
|
|
|
use bytes::Bytes;
|
2022-11-01 18:59:53 +08:00
|
|
|
use flowy_database::ConnectionPool;
|
2022-10-13 23:29:37 +08:00
|
|
|
use flowy_error::{internal_error, FlowyError, FlowyResult};
|
2022-11-08 21:13:28 +08:00
|
|
|
use flowy_http_model::{
|
|
|
|
revision::{Revision, RevisionRange},
|
|
|
|
ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSDataType},
|
|
|
|
};
|
2022-03-19 16:52:28 +08:00
|
|
|
use flowy_revision::*;
|
2022-11-08 21:13:28 +08:00
|
|
|
use flowy_sync::errors::CollaborateResult;
|
2022-10-20 11:35:11 +08:00
|
|
|
use flowy_sync::util::make_operations_from_revisions;
|
2022-01-20 23:51:11 +08:00
|
|
|
use lib_infra::future::{BoxResultFuture, FutureResult};
|
2022-10-22 21:57:44 +08:00
|
|
|
use lib_ot::text_delta::DeltaTextOperations;
|
2022-01-11 22:23:19 +08:00
|
|
|
use lib_ws::WSConnectState;
|
2022-01-20 23:51:11 +08:00
|
|
|
use std::{sync::Arc, time::Duration};
|
2022-10-20 11:35:11 +08:00
|
|
|
use tokio::sync::{broadcast, oneshot};
|
2022-01-11 22:23:19 +08:00
|
|
|
|
2022-10-13 23:29:37 +08:00
|
|
|
#[derive(Clone)]
|
2022-10-22 21:57:44 +08:00
|
|
|
pub struct DeltaDocumentResolveOperations(pub DeltaTextOperations);
|
2022-10-13 23:29:37 +08:00
|
|
|
|
2022-10-22 21:57:44 +08:00
|
|
|
impl OperationsDeserializer<DeltaDocumentResolveOperations> for DeltaDocumentResolveOperations {
|
|
|
|
fn deserialize_revisions(revisions: Vec<Revision>) -> FlowyResult<DeltaDocumentResolveOperations> {
|
|
|
|
Ok(DeltaDocumentResolveOperations(make_operations_from_revisions(
|
|
|
|
revisions,
|
|
|
|
)?))
|
2022-10-13 23:29:37 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-22 21:57:44 +08:00
|
|
|
impl OperationsSerializer for DeltaDocumentResolveOperations {
|
2022-10-13 23:29:37 +08:00
|
|
|
fn serialize_operations(&self) -> Bytes {
|
|
|
|
self.0.json_bytes()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-22 21:57:44 +08:00
|
|
|
impl DeltaDocumentResolveOperations {
|
|
|
|
pub fn into_inner(self) -> DeltaTextOperations {
|
2022-10-13 23:29:37 +08:00
|
|
|
self.0
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-11-01 18:59:53 +08:00
|
|
|
pub type DocumentConflictController = ConflictController<DeltaDocumentResolveOperations, Arc<ConnectionPool>>;
|
2022-10-13 23:29:37 +08:00
|
|
|
|
2022-04-14 21:57:00 +08:00
|
|
|
#[allow(dead_code)]
|
2022-10-13 23:29:37 +08:00
|
|
|
pub(crate) async fn make_document_ws_manager(
|
2022-01-11 22:23:19 +08:00
|
|
|
doc_id: String,
|
|
|
|
user_id: String,
|
2022-01-12 12:40:41 +08:00
|
|
|
edit_cmd_tx: EditorCommandSender,
|
2022-11-01 18:59:53 +08:00
|
|
|
rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
|
2022-02-18 23:04:55 +08:00
|
|
|
rev_web_socket: Arc<dyn RevisionWebSocket>,
|
2022-01-14 15:23:21 +08:00
|
|
|
) -> Arc<RevisionWebSocketManager> {
|
2022-02-25 22:27:44 +08:00
|
|
|
let ws_data_provider = Arc::new(WSDataProvider::new(&doc_id, Arc::new(rev_manager.clone())));
|
2022-10-13 23:29:37 +08:00
|
|
|
let resolver = Arc::new(DocumentConflictResolver { edit_cmd_tx });
|
2022-02-28 22:38:53 +08:00
|
|
|
let conflict_controller =
|
2022-10-13 23:29:37 +08:00
|
|
|
DocumentConflictController::new(&user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager);
|
|
|
|
let ws_data_stream = Arc::new(DocumentRevisionWSDataStream::new(conflict_controller));
|
|
|
|
let ws_data_sink = Arc::new(DocumentWSDataSink(ws_data_provider));
|
2022-03-12 22:52:24 +08:00
|
|
|
let ping_duration = Duration::from_millis(TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS);
|
2022-01-14 15:23:21 +08:00
|
|
|
let ws_manager = Arc::new(RevisionWebSocketManager::new(
|
2022-02-26 11:03:42 +08:00
|
|
|
"Block",
|
2022-01-11 22:23:19 +08:00
|
|
|
&doc_id,
|
2022-02-18 23:04:55 +08:00
|
|
|
rev_web_socket,
|
2022-02-25 22:27:44 +08:00
|
|
|
ws_data_sink,
|
|
|
|
ws_data_stream,
|
2022-01-14 15:23:21 +08:00
|
|
|
ping_duration,
|
2022-01-11 22:23:19 +08:00
|
|
|
));
|
2022-01-21 21:41:24 +08:00
|
|
|
listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state());
|
2022-01-11 22:23:19 +08:00
|
|
|
ws_manager
|
|
|
|
}
|
|
|
|
|
2022-04-14 21:57:00 +08:00
|
|
|
#[allow(dead_code)]
|
2022-01-21 21:41:24 +08:00
|
|
|
fn listen_document_ws_state(_user_id: &str, _doc_id: &str, mut subscriber: broadcast::Receiver<WSConnectState>) {
|
2022-01-11 22:23:19 +08:00
|
|
|
tokio::spawn(async move {
|
|
|
|
while let Ok(state) = subscriber.recv().await {
|
|
|
|
match state {
|
2022-01-24 17:35:58 +08:00
|
|
|
WSConnectState::Init => {}
|
|
|
|
WSConnectState::Connecting => {}
|
|
|
|
WSConnectState::Connected => {}
|
|
|
|
WSConnectState::Disconnected => {}
|
2022-01-11 22:23:19 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2022-10-13 23:29:37 +08:00
|
|
|
pub(crate) struct DocumentRevisionWSDataStream {
|
|
|
|
conflict_controller: Arc<DocumentConflictController>,
|
2022-02-25 22:27:44 +08:00
|
|
|
}
|
|
|
|
|
2022-10-13 23:29:37 +08:00
|
|
|
impl DocumentRevisionWSDataStream {
|
2022-04-14 21:57:00 +08:00
|
|
|
#[allow(dead_code)]
|
2022-10-13 23:29:37 +08:00
|
|
|
pub fn new(conflict_controller: DocumentConflictController) -> Self {
|
2022-02-25 22:27:44 +08:00
|
|
|
Self {
|
|
|
|
conflict_controller: Arc::new(conflict_controller),
|
|
|
|
}
|
|
|
|
}
|
2022-01-11 22:23:19 +08:00
|
|
|
}
|
|
|
|
|
2022-10-13 23:29:37 +08:00
|
|
|
impl RevisionWSDataStream for DocumentRevisionWSDataStream {
|
2022-01-21 21:41:24 +08:00
|
|
|
fn receive_push_revision(&self, bytes: Bytes) -> BoxResultFuture<(), FlowyError> {
|
2022-02-25 22:27:44 +08:00
|
|
|
let resolver = self.conflict_controller.clone();
|
2022-01-21 21:41:24 +08:00
|
|
|
Box::pin(async move { resolver.receive_bytes(bytes).await })
|
2022-01-11 22:23:19 +08:00
|
|
|
}
|
|
|
|
|
2022-01-21 21:41:24 +08:00
|
|
|
fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError> {
|
2022-02-25 22:27:44 +08:00
|
|
|
let resolver = self.conflict_controller.clone();
|
2022-01-21 21:41:24 +08:00
|
|
|
Box::pin(async move { resolver.ack_revision(id, ty).await })
|
2022-01-11 22:23:19 +08:00
|
|
|
}
|
|
|
|
|
2022-01-21 21:41:24 +08:00
|
|
|
fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> BoxResultFuture<(), FlowyError> {
|
2022-01-12 12:40:41 +08:00
|
|
|
// Do nothing by now, just a placeholder for future extension.
|
2022-01-21 21:41:24 +08:00
|
|
|
Box::pin(async move { Ok(()) })
|
2022-01-11 22:23:19 +08:00
|
|
|
}
|
|
|
|
|
2022-01-21 21:41:24 +08:00
|
|
|
fn pull_revisions_in_range(&self, range: RevisionRange) -> BoxResultFuture<(), FlowyError> {
|
2022-02-25 22:27:44 +08:00
|
|
|
let resolver = self.conflict_controller.clone();
|
2022-01-21 21:41:24 +08:00
|
|
|
Box::pin(async move { resolver.send_revisions(range).await })
|
2022-01-11 22:23:19 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-13 23:29:37 +08:00
|
|
|
pub(crate) struct DocumentWSDataSink(pub(crate) Arc<WSDataProvider>);
|
|
|
|
impl RevisionWebSocketSink for DocumentWSDataSink {
|
2022-01-14 15:23:21 +08:00
|
|
|
fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
|
2022-01-20 23:51:11 +08:00
|
|
|
let sink_provider = self.0.clone();
|
|
|
|
FutureResult::new(async move { sink_provider.next().await })
|
2022-01-11 22:23:19 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-13 23:29:37 +08:00
|
|
|
struct DocumentConflictResolver {
|
2022-01-20 23:51:11 +08:00
|
|
|
edit_cmd_tx: EditorCommandSender,
|
2022-01-11 22:23:19 +08:00
|
|
|
}
|
|
|
|
|
2022-10-22 21:57:44 +08:00
|
|
|
impl ConflictResolver<DeltaDocumentResolveOperations> for DocumentConflictResolver {
|
|
|
|
fn compose_operations(
|
|
|
|
&self,
|
|
|
|
operations: DeltaDocumentResolveOperations,
|
2022-11-02 10:21:10 +08:00
|
|
|
) -> BoxResultFuture<RevisionMD5, FlowyError> {
|
2022-01-20 23:51:11 +08:00
|
|
|
let tx = self.edit_cmd_tx.clone();
|
2022-10-13 23:29:37 +08:00
|
|
|
let operations = operations.into_inner();
|
2022-01-20 23:51:11 +08:00
|
|
|
Box::pin(async move {
|
|
|
|
let (ret, rx) = oneshot::channel();
|
2022-10-13 23:29:37 +08:00
|
|
|
tx.send(EditorCommand::ComposeRemoteOperation {
|
|
|
|
client_operations: operations,
|
2022-01-20 23:51:11 +08:00
|
|
|
ret,
|
|
|
|
})
|
|
|
|
.await
|
|
|
|
.map_err(internal_error)?;
|
2022-10-13 23:29:37 +08:00
|
|
|
let md5 = rx
|
|
|
|
.await
|
|
|
|
.map_err(|e| FlowyError::internal().context(format!("Compose operations failed: {}", e)))??;
|
2022-01-20 23:51:11 +08:00
|
|
|
Ok(md5)
|
|
|
|
})
|
2022-01-11 22:23:19 +08:00
|
|
|
}
|
|
|
|
|
2022-10-13 23:29:37 +08:00
|
|
|
fn transform_operations(
|
2022-01-20 23:51:11 +08:00
|
|
|
&self,
|
2022-10-22 21:57:44 +08:00
|
|
|
operations: DeltaDocumentResolveOperations,
|
|
|
|
) -> BoxResultFuture<TransformOperations<DeltaDocumentResolveOperations>, FlowyError> {
|
2022-01-20 23:51:11 +08:00
|
|
|
let tx = self.edit_cmd_tx.clone();
|
2022-10-13 23:29:37 +08:00
|
|
|
let operations = operations.into_inner();
|
2022-01-20 23:51:11 +08:00
|
|
|
Box::pin(async move {
|
2022-10-13 23:29:37 +08:00
|
|
|
let (ret, rx) = oneshot::channel::<CollaborateResult<TextTransformOperations>>();
|
|
|
|
tx.send(EditorCommand::TransformOperations { operations, ret })
|
2022-01-20 23:51:11 +08:00
|
|
|
.await
|
|
|
|
.map_err(internal_error)?;
|
2022-10-13 23:29:37 +08:00
|
|
|
let transformed_operations = rx
|
2022-01-20 23:51:11 +08:00
|
|
|
.await
|
2022-10-13 23:29:37 +08:00
|
|
|
.map_err(|e| FlowyError::internal().context(format!("Transform operations failed: {}", e)))??;
|
|
|
|
Ok(transformed_operations)
|
2022-01-20 23:51:11 +08:00
|
|
|
})
|
2022-01-11 22:23:19 +08:00
|
|
|
}
|
|
|
|
|
2022-11-02 10:21:10 +08:00
|
|
|
fn reset_operations(&self, operations: DeltaDocumentResolveOperations) -> BoxResultFuture<RevisionMD5, FlowyError> {
|
2022-01-20 23:51:11 +08:00
|
|
|
let tx = self.edit_cmd_tx.clone();
|
2022-10-13 23:29:37 +08:00
|
|
|
let operations = operations.into_inner();
|
2022-01-20 23:51:11 +08:00
|
|
|
Box::pin(async move {
|
2022-01-11 22:23:19 +08:00
|
|
|
let (ret, rx) = oneshot::channel();
|
2022-01-20 23:51:11 +08:00
|
|
|
let _ = tx
|
2022-10-13 23:29:37 +08:00
|
|
|
.send(EditorCommand::ResetOperations { operations, ret })
|
2022-01-15 23:58:36 +08:00
|
|
|
.await
|
|
|
|
.map_err(internal_error)?;
|
2022-10-13 23:29:37 +08:00
|
|
|
let md5 = rx
|
|
|
|
.await
|
|
|
|
.map_err(|e| FlowyError::internal().context(format!("Reset operations failed: {}", e)))??;
|
2022-01-20 23:51:11 +08:00
|
|
|
Ok(md5)
|
|
|
|
})
|
2022-01-11 22:23:19 +08:00
|
|
|
}
|
|
|
|
}
|