From bbc9190bc06dddb25f0adbb55064f293a4930294 Mon Sep 17 00:00:00 2001 From: appflowy Date: Sun, 26 Dec 2021 19:10:37 +0800 Subject: [PATCH] add virtual net --- backend/Cargo.lock | 16 ++ .../services/web_socket/entities/message.rs | 6 +- backend/src/services/web_socket/ws_client.rs | 4 +- .../flowy_sdk/lib/protobuf/lib-ws/msg.pb.dart | 26 +- .../lib/protobuf/lib-ws/msg.pbjson.dart | 10 +- frontend/rust-lib/Cargo.toml | 1 + frontend/rust-lib/flowy-document/Cargo.toml | 2 +- .../src/services/doc/web_socket/web_socket.rs | 56 ++--- frontend/rust-lib/flowy-net/Cargo.toml | 7 +- .../rust-lib/flowy-net/src/handlers/mod.rs | 7 +- frontend/rust-lib/flowy-net/src/module.rs | 4 +- .../flowy-net/src/services/mock/mod.rs | 3 - .../flowy-net/src/services/mock/ws_mock.rs | 236 ------------------ .../flowy-net/src/services/ws/conn.rs | 165 +++++++++++- .../flowy-net/src/services/ws/manager.rs | 161 ------------ .../rust-lib/flowy-net/src/services/ws/mod.rs | 15 -- frontend/rust-lib/flowy-sdk/Cargo.toml | 1 + .../src/deps_resolve/document_deps.rs | 12 +- frontend/rust-lib/flowy-sdk/src/lib.rs | 21 +- frontend/rust-lib/flowy-sdk/src/module.rs | 10 +- frontend/rust-lib/flowy-test/Cargo.toml | 2 +- .../rust-lib/flowy-test/src/doc_script.rs | 2 +- .../flowy-test/tests/revision_test.rs | 62 ++--- .../rust-lib/flowy-virtual-net/Cargo.toml | 23 ++ .../rust-lib/flowy-virtual-net/src/lib.rs | 12 + .../flowy-virtual-net/src/mock/mod.rs | 4 + .../flowy-virtual-net/src/mock/server.rs | 137 ++++++++++ .../flowy-virtual-net/src/mock/ws_local.rs | 95 +++++++ .../rust-lib/flowy-virtual-net/src/ws/mod.rs | 3 + .../src}/ws/ws_local.rs | 27 +- .../flowy-collaboration/src/entities/mod.rs | 4 + .../src/derive_cache/derive_cache.rs | 2 +- shared-lib/lib-ws/src/msg.rs | 6 +- shared-lib/lib-ws/src/protobuf/model/msg.rs | 44 ++-- .../lib-ws/src/protobuf/proto/msg.proto | 2 +- shared-lib/lib-ws/src/ws.rs | 12 +- 36 files changed, 619 insertions(+), 581 deletions(-) delete mode 100644 frontend/rust-lib/flowy-net/src/services/mock/mod.rs delete mode 100644 frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs delete mode 100644 frontend/rust-lib/flowy-net/src/services/ws/manager.rs create mode 100644 frontend/rust-lib/flowy-virtual-net/Cargo.toml create mode 100644 frontend/rust-lib/flowy-virtual-net/src/lib.rs create mode 100644 frontend/rust-lib/flowy-virtual-net/src/mock/mod.rs create mode 100644 frontend/rust-lib/flowy-virtual-net/src/mock/server.rs create mode 100644 frontend/rust-lib/flowy-virtual-net/src/mock/ws_local.rs create mode 100644 frontend/rust-lib/flowy-virtual-net/src/ws/mod.rs rename frontend/rust-lib/{flowy-net/src/services => flowy-virtual-net/src}/ws/ws_local.rs (64%) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 5acb27774d..df060bd39b 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1401,6 +1401,7 @@ dependencies = [ "flowy-document", "flowy-net", "flowy-user", + "flowy-virtual-net", "futures-core", "lib-dispatch", "lib-infra", @@ -1491,6 +1492,21 @@ dependencies = [ "validator", ] +[[package]] +name = "flowy-virtual-net" +version = "0.1.0" +dependencies = [ + "bytes", + "dashmap", + "flowy-collaboration", + "flowy-net", + "lib-infra", + "lib-ws", + "parking_lot", + "tokio", + "tracing", +] + [[package]] name = "fnv" version = "1.0.7" diff --git a/backend/src/services/web_socket/entities/message.rs b/backend/src/services/web_socket/entities/message.rs index 49f55c9c16..056c37cfba 100644 --- a/backend/src/services/web_socket/entities/message.rs +++ b/backend/src/services/web_socket/entities/message.rs @@ -1,7 +1,7 @@ use actix::Message; use bytes::Bytes; use flowy_collaboration::entities::ws::{DocumentClientWSData, DocumentServerWSData}; -use lib_ws::{WSModule, WebScoketRawMessage}; +use lib_ws::{WSModule, WebSocketRawMessage}; use std::convert::TryInto; #[derive(Debug, Message, Clone)] @@ -17,7 +17,7 @@ impl std::ops::Deref for WebSocketMessage { impl std::convert::From for WebSocketMessage { fn from(data: DocumentClientWSData) -> Self { let bytes: Bytes = data.try_into().unwrap(); - let msg = WebScoketRawMessage { + let msg = WebSocketRawMessage { module: WSModule::Doc, data: bytes.to_vec(), }; @@ -30,7 +30,7 @@ impl std::convert::From for WebSocketMessage { impl std::convert::From for WebSocketMessage { fn from(data: DocumentServerWSData) -> Self { let bytes: Bytes = data.try_into().unwrap(); - let msg = WebScoketRawMessage { + let msg = WebSocketRawMessage { module: WSModule::Doc, data: bytes.to_vec(), }; diff --git a/backend/src/services/web_socket/ws_client.rs b/backend/src/services/web_socket/ws_client.rs index 949d23c0b2..4342a1fbb1 100644 --- a/backend/src/services/web_socket/ws_client.rs +++ b/backend/src/services/web_socket/ws_client.rs @@ -11,7 +11,7 @@ use actix::*; use actix_web::web::Data; use actix_web_actors::{ws, ws::Message::Text}; use bytes::Bytes; -use lib_ws::{WSModule, WebScoketRawMessage}; +use lib_ws::{WSModule, WebSocketRawMessage}; use std::{collections::HashMap, convert::TryFrom, sync::Arc, time::Instant}; pub trait WebSocketReceiver: Send + Sync { @@ -85,7 +85,7 @@ impl WSClient { fn handle_binary_message(&self, bytes: Bytes, socket: Socket) { // TODO: ok to unwrap? - let message: WebScoketRawMessage = WebScoketRawMessage::try_from(bytes).unwrap(); + let message: WebSocketRawMessage = WebSocketRawMessage::try_from(bytes).unwrap(); match self.ws_receivers.get(&message.module) { None => { log::error!("Can't find the receiver for {:?}", message.module); diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pb.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pb.dart index c8b0b05375..6118fde754 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pb.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pb.dart @@ -13,15 +13,15 @@ import 'msg.pbenum.dart'; export 'msg.pbenum.dart'; -class WebScoketRawMessage extends $pb.GeneratedMessage { - static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WebScoketRawMessage', createEmptyInstance: create) +class WebSocketRawMessage extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WebSocketRawMessage', createEmptyInstance: create) ..e(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'module', $pb.PbFieldType.OE, defaultOrMaker: WSModule.Doc, valueOf: WSModule.valueOf, enumValues: WSModule.values) ..a<$core.List<$core.int>>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY) ..hasRequiredFields = false ; - WebScoketRawMessage._() : super(); - factory WebScoketRawMessage({ + WebSocketRawMessage._() : super(); + factory WebSocketRawMessage({ WSModule? module, $core.List<$core.int>? data, }) { @@ -34,26 +34,26 @@ class WebScoketRawMessage extends $pb.GeneratedMessage { } return _result; } - factory WebScoketRawMessage.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); - factory WebScoketRawMessage.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + factory WebSocketRawMessage.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory WebSocketRawMessage.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); @$core.Deprecated( 'Using this can add significant overhead to your binary. ' 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' 'Will be removed in next major version') - WebScoketRawMessage clone() => WebScoketRawMessage()..mergeFromMessage(this); + WebSocketRawMessage clone() => WebSocketRawMessage()..mergeFromMessage(this); @$core.Deprecated( 'Using this can add significant overhead to your binary. ' 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' 'Will be removed in next major version') - WebScoketRawMessage copyWith(void Function(WebScoketRawMessage) updates) => super.copyWith((message) => updates(message as WebScoketRawMessage)) as WebScoketRawMessage; // ignore: deprecated_member_use + WebSocketRawMessage copyWith(void Function(WebSocketRawMessage) updates) => super.copyWith((message) => updates(message as WebSocketRawMessage)) as WebSocketRawMessage; // ignore: deprecated_member_use $pb.BuilderInfo get info_ => _i; @$core.pragma('dart2js:noInline') - static WebScoketRawMessage create() => WebScoketRawMessage._(); - WebScoketRawMessage createEmptyInstance() => create(); - static $pb.PbList createRepeated() => $pb.PbList(); + static WebSocketRawMessage create() => WebSocketRawMessage._(); + WebSocketRawMessage createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); @$core.pragma('dart2js:noInline') - static WebScoketRawMessage getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); - static WebScoketRawMessage? _defaultInstance; + static WebSocketRawMessage getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static WebSocketRawMessage? _defaultInstance; @$pb.TagNumber(1) WSModule get module => $_getN(0); diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart index e4c46df07b..4c43e7174a 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart @@ -18,14 +18,14 @@ const WSModule$json = const { /// Descriptor for `WSModule`. Decode as a `google.protobuf.EnumDescriptorProto`. final $typed_data.Uint8List wSModuleDescriptor = $convert.base64Decode('CghXU01vZHVsZRIHCgNEb2MQAA=='); -@$core.Deprecated('Use webScoketRawMessageDescriptor instead') -const WebScoketRawMessage$json = const { - '1': 'WebScoketRawMessage', +@$core.Deprecated('Use webSocketRawMessageDescriptor instead') +const WebSocketRawMessage$json = const { + '1': 'WebSocketRawMessage', '2': const [ const {'1': 'module', '3': 1, '4': 1, '5': 14, '6': '.WSModule', '10': 'module'}, const {'1': 'data', '3': 2, '4': 1, '5': 12, '10': 'data'}, ], }; -/// Descriptor for `WebScoketRawMessage`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List webScoketRawMessageDescriptor = $convert.base64Decode('ChNXZWJTY29rZXRSYXdNZXNzYWdlEiEKBm1vZHVsZRgBIAEoDjIJLldTTW9kdWxlUgZtb2R1bGUSEgoEZGF0YRgCIAEoDFIEZGF0YQ=='); +/// Descriptor for `WebSocketRawMessage`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List webSocketRawMessageDescriptor = $convert.base64Decode('ChNXZWJTb2NrZXRSYXdNZXNzYWdlEiEKBm1vZHVsZRgBIAEoDjIJLldTTW9kdWxlUgZtb2R1bGUSEgoEZGF0YRgCIAEoDFIEZGF0YQ=='); diff --git a/frontend/rust-lib/Cargo.toml b/frontend/rust-lib/Cargo.toml index 8565d7ed6f..23d079b073 100644 --- a/frontend/rust-lib/Cargo.toml +++ b/frontend/rust-lib/Cargo.toml @@ -4,6 +4,7 @@ members = [ "lib-log", "lib-sqlite", "flowy-net", + "flowy-virtual-net", "flowy-sdk", "dart-ffi", "flowy-user", diff --git a/frontend/rust-lib/flowy-document/Cargo.toml b/frontend/rust-lib/flowy-document/Cargo.toml index 4760e93343..634cc7b2cc 100644 --- a/frontend/rust-lib/flowy-document/Cargo.toml +++ b/frontend/rust-lib/flowy-document/Cargo.toml @@ -48,7 +48,7 @@ pin-project = "1.0.0" [dev-dependencies] flowy-test = { path = "../flowy-test" } flowy-document = { path = "../flowy-document", features = ["flowy_unit_test"]} -flowy-net = { path = "../flowy-net", features = ["flowy_unit_test"] } +flowy-net = { path = "../flowy-net" } color-eyre = { version = "0.5", default-features = false } criterion = "0.3" rand = "0.7.3" diff --git a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs index bf2fbce3e3..099e4bba69 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs @@ -16,7 +16,7 @@ use bytes::Bytes; use flowy_collaboration::{ entities::{ revision::{RepeatedRevision, RevType, Revision, RevisionRange}, - ws::{DocumentClientWSData, DocumentClientWSDataType, DocumentServerWSDataBuilder, NewDocumentUser}, + ws::{DocumentClientWSData, NewDocumentUser}, }, errors::CollaborateResult, }; @@ -25,11 +25,7 @@ use lib_infra::future::FutureResult; use flowy_collaboration::entities::ws::DocumentServerWSDataType; use lib_ws::WSConnectState; -use std::{ - collections::VecDeque, - convert::{TryFrom, TryInto}, - sync::Arc, -}; +use std::{collections::VecDeque, convert::TryFrom, sync::Arc}; use tokio::sync::{broadcast, mpsc::UnboundedSender, oneshot, RwLock}; pub(crate) trait DocumentWebSocketManager: Send + Sync { @@ -44,36 +40,32 @@ pub(crate) async fn make_document_ws_manager( rev_manager: Arc, ws: Arc, ) -> Arc { - if cfg!(feature = "http_server") { - let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone())); - let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { - doc_id: doc_id.clone(), - user_id: user_id.clone(), - editor_edit_queue: editor_edit_queue.clone(), - rev_manager: rev_manager.clone(), - shared_sink: shared_sink.clone(), - }); - let ws_stream_provider = DocumentWSSinkDataProviderAdapter(shared_sink.clone()); - let ws_manager = Arc::new(HttpWebSocketManager::new( - &doc_id, - ws.clone(), - Arc::new(ws_stream_provider), - ws_stream_consumer, - )); - notify_user_has_connected(&user_id, &doc_id, rev_manager.clone(), shared_sink).await; - listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager.clone()); + let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone())); + let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { + doc_id: doc_id.clone(), + user_id: user_id.clone(), + editor_edit_queue: editor_edit_queue.clone(), + rev_manager: rev_manager.clone(), + shared_sink: shared_sink.clone(), + }); + let ws_stream_provider = DocumentWSSinkDataProviderAdapter(shared_sink.clone()); + let ws_manager = Arc::new(HttpWebSocketManager::new( + &doc_id, + ws.clone(), + Arc::new(ws_stream_provider), + ws_stream_consumer, + )); + notify_user_has_connected(&user_id, &doc_id, rev_manager.clone(), shared_sink).await; + listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager.clone()); - Arc::new(ws_manager) - } else { - Arc::new(Arc::new(LocalWebSocketManager {})) - } + Arc::new(ws_manager) } async fn notify_user_has_connected( - user_id: &str, - doc_id: &str, - rev_manager: Arc, - shared_sink: Arc, + _user_id: &str, + _doc_id: &str, + _rev_manager: Arc, + _shared_sink: Arc, ) { // let need_notify = match shared_sink.front().await { // None => true, diff --git a/frontend/rust-lib/flowy-net/Cargo.toml b/frontend/rust-lib/flowy-net/Cargo.toml index 500fd74d90..307752c51e 100644 --- a/frontend/rust-lib/flowy-net/Cargo.toml +++ b/frontend/rust-lib/flowy-net/Cargo.toml @@ -10,8 +10,8 @@ lib-dispatch = { path = "../lib-dispatch" } flowy-error = { path = "../flowy-error" } flowy-derive = { path = "../../../shared-lib/flowy-derive" } lib-infra = { path = "../../../shared-lib/lib-infra" } -lib-ws = { path = "../../../shared-lib/lib-ws" } protobuf = {version = "2.18.0"} +lib-ws = { path = "../../../shared-lib/lib-ws" } bytes = { version = "1.0" } anyhow = "1.0" tokio = {version = "1", features = ["sync"]} @@ -20,10 +20,5 @@ strum = "0.21" strum_macros = "0.21" tracing = { version = "0.1", features = ["log"] } -flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration", optional = true} -lazy_static = {version = "1.4.0", optional = true} -dashmap = {version = "4.0", optional = true} - [features] -flowy_unit_test = ["flowy-collaboration", "lazy_static", "dashmap"] http_server = [] \ No newline at end of file diff --git a/frontend/rust-lib/flowy-net/src/handlers/mod.rs b/frontend/rust-lib/flowy-net/src/handlers/mod.rs index d02093db8d..59e7b9c986 100644 --- a/frontend/rust-lib/flowy-net/src/handlers/mod.rs +++ b/frontend/rust-lib/flowy-net/src/handlers/mod.rs @@ -1,11 +1,14 @@ -use crate::{entities::NetworkState, services::ws::WsManager}; +use crate::{entities::NetworkState, services::ws::FlowyWSConnect}; use flowy_error::FlowyError; use lib_dispatch::prelude::{Data, Unit}; use std::sync::Arc; #[tracing::instrument(skip(data, ws_manager))] -pub async fn update_network_ty(data: Data, ws_manager: Unit>) -> Result<(), FlowyError> { +pub async fn update_network_ty( + data: Data, + ws_manager: Unit>, +) -> Result<(), FlowyError> { let network_state = data.into_inner(); ws_manager.update_network_type(&network_state.ty); Ok(()) diff --git a/frontend/rust-lib/flowy-net/src/module.rs b/frontend/rust-lib/flowy-net/src/module.rs index 888fb2316b..4ca0973dcd 100644 --- a/frontend/rust-lib/flowy-net/src/module.rs +++ b/frontend/rust-lib/flowy-net/src/module.rs @@ -1,8 +1,8 @@ -use crate::{event::NetworkEvent, handlers::*, services::ws::WsManager}; +use crate::{event::NetworkEvent, handlers::*, services::ws::FlowyWSConnect}; use lib_dispatch::prelude::*; use std::sync::Arc; -pub fn create(ws_manager: Arc) -> Module { +pub fn create(ws_manager: Arc) -> Module { Module::new() .name("Flowy-Network") .data(ws_manager) diff --git a/frontend/rust-lib/flowy-net/src/services/mock/mod.rs b/frontend/rust-lib/flowy-net/src/services/mock/mod.rs deleted file mode 100644 index 844f414f32..0000000000 --- a/frontend/rust-lib/flowy-net/src/services/mock/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod ws_mock; - -pub use ws_mock::*; diff --git a/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs b/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs deleted file mode 100644 index 1a5700e8b5..0000000000 --- a/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs +++ /dev/null @@ -1,236 +0,0 @@ -use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WSConnectState, WSMessage, WSMessageReceiver}; -use bytes::Bytes; -use dashmap::DashMap; -use flowy_collaboration::{ - core::sync::{DocumentPersistence, RevisionUser, ServerDocumentManager, SyncResponse}, - entities::{ - doc::DocumentInfo, - revision::{RepeatedRevision, Revision}, - ws::{DocumentWSData, DocumentWSDataBuilder, DocumentWSDataType, NewDocumentUser}, - }, - errors::CollaborateError, -}; -use lazy_static::lazy_static; -use lib_infra::future::{FutureResult, FutureResultSend}; -use lib_ws::WSModule; -use parking_lot::RwLock; -use std::{ - convert::{TryFrom, TryInto}, - fmt::{Debug, Formatter}, - sync::Arc, -}; -use tokio::sync::{broadcast, broadcast::Receiver, mpsc}; - -pub struct MockWebSocket { - handlers: DashMap>, - state_sender: broadcast::Sender, - ws_sender: broadcast::Sender, - is_stop: RwLock, -} - -impl std::default::Default for MockWebSocket { - fn default() -> Self { - let (state_sender, _) = broadcast::channel(16); - let (ws_sender, _) = broadcast::channel(16); - MockWebSocket { - handlers: DashMap::new(), - state_sender, - ws_sender, - is_stop: RwLock::new(false), - } - } -} - -impl MockWebSocket { - pub fn new() -> MockWebSocket { MockWebSocket::default() } -} - -impl FlowyWebSocket for Arc { - fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> { - *self.is_stop.write() = false; - - let mut ws_receiver = self.ws_sender.subscribe(); - let cloned_ws = self.clone(); - tokio::spawn(async move { - while let Ok(message) = ws_receiver.recv().await { - if *cloned_ws.is_stop.read() { - // do nothing - } else { - let ws_data = DocumentWSData::try_from(Bytes::from(message.data.clone())).unwrap(); - let mut rx = DOC_SERVER.handle_ws_data(ws_data).await; - let new_ws_message = rx.recv().await.unwrap(); - match cloned_ws.handlers.get(&new_ws_message.module) { - None => tracing::error!("Can't find any handler for message: {:?}", new_ws_message), - Some(handler) => handler.receive_message(new_ws_message.clone()), - } - } - } - }); - - FutureResult::new(async { Ok(()) }) - } - - fn stop_connect(&self) -> FutureResult<(), FlowyError> { - *self.is_stop.write() = true; - FutureResult::new(async { Ok(()) }) - } - - fn subscribe_connect_state(&self) -> Receiver { self.state_sender.subscribe() } - - fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - - fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError> { - let source = handler.source(); - if self.handlers.contains_key(&source) { - tracing::error!("WsSource's {:?} is already registered", source); - } - self.handlers.insert(source, handler); - Ok(()) - } - - fn ws_sender(&self) -> Result, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) } -} - -lazy_static! { - static ref DOC_SERVER: Arc = Arc::new(MockDocServer::default()); -} - -struct MockDocServer { - pub manager: Arc, -} - -impl std::default::Default for MockDocServer { - fn default() -> Self { - let persistence = Arc::new(MockDocServerPersistence::default()); - let manager = Arc::new(ServerDocumentManager::new(persistence)); - MockDocServer { manager } - } -} - -impl MockDocServer { - async fn handle_ws_data(&self, ws_data: DocumentWSData) -> mpsc::Receiver { - let bytes = Bytes::from(ws_data.data); - match ws_data.ty { - DocumentWSDataType::Ack => { - unimplemented!() - }, - DocumentWSDataType::PushRev => { - let revisions = RepeatedRevision::try_from(bytes).unwrap().into_inner(); - let (tx, rx) = mpsc::channel(1); - let user = MockDocUser { - user_id: revision.user_id.clone(), - tx, - }; - self.manager.apply_revisions(user, revisions).await.unwrap(); - - rx - }, - DocumentWSDataType::PullRev => { - unimplemented!() - }, - DocumentWSDataType::UserConnect => { - let new_user = NewDocumentUser::try_from(bytes).unwrap(); - let (tx, rx) = mpsc::channel(1); - let data = DocumentWSDataBuilder::build_ack_message(&new_user.doc_id, &ws_data.id); - let user = Arc::new(MockDocUser { - user_id: new_user.user_id, - tx, - }) as Arc; - - user.receive(SyncResponse::Ack(data)); - rx - }, - } - } -} - -struct MockDocServerPersistence { - inner: Arc>, -} - -impl Debug for MockDocServerPersistence { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("MockDocServerPersistence") } -} - -impl std::default::Default for MockDocServerPersistence { - fn default() -> Self { - MockDocServerPersistence { - inner: Arc::new(DashMap::new()), - } - } -} - -impl DocumentPersistence for MockDocServerPersistence { - fn read_doc(&self, doc_id: &str) -> FutureResultSend { - let inner = self.inner.clone(); - let doc_id = doc_id.to_owned(); - FutureResultSend::new(async move { - match inner.get(&doc_id) { - None => { - // - Err(CollaborateError::record_not_found()) - }, - Some(val) => { - // - Ok(val.value().clone()) - }, - } - }) - } - - fn create_doc(&self, revision: Revision) -> FutureResultSend { - FutureResultSend::new(async move { - let document_info: DocumentInfo = revision.try_into().unwrap(); - Ok(document_info) - }) - } - - fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec) -> FutureResultSend, CollaborateError> { - unimplemented!() - } -} - -#[derive(Debug)] -struct MockDocUser { - user_id: String, - tx: mpsc::Sender, -} - -impl RevisionUser for MockDocUser { - fn user_id(&self) -> String { self.user_id.clone() } - - fn receive(&self, resp: SyncResponse) { - let sender = self.tx.clone(); - tokio::spawn(async move { - match resp { - SyncResponse::Pull(data) => { - let bytes: Bytes = data.try_into().unwrap(); - let msg = WSMessage { - module: WSModule::Doc, - data: bytes.to_vec(), - }; - sender.send(msg).await.unwrap(); - }, - SyncResponse::Push(data) => { - let bytes: Bytes = data.try_into().unwrap(); - let msg = WSMessage { - module: WSModule::Doc, - data: bytes.to_vec(), - }; - sender.send(msg).await.unwrap(); - }, - SyncResponse::Ack(data) => { - let bytes: Bytes = data.try_into().unwrap(); - let msg = WSMessage { - module: WSModule::Doc, - data: bytes.to_vec(), - }; - sender.send(msg).await.unwrap(); - }, - SyncResponse::NewRevision(_) => { - // unimplemented!() - }, - } - }); - } -} diff --git a/frontend/rust-lib/flowy-net/src/services/ws/conn.rs b/frontend/rust-lib/flowy-net/src/services/ws/conn.rs index ff3c7a719b..3e734cf1fc 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws/conn.rs +++ b/frontend/rust-lib/flowy-net/src/services/ws/conn.rs @@ -1,9 +1,12 @@ -use lib_infra::future::FutureResult; -use std::sync::Arc; -use tokio::sync::broadcast; - +use crate::entities::NetworkType; +use flowy_error::internal_error; pub use flowy_error::FlowyError; -pub use lib_ws::{WSConnectState, WSMessageReceiver, WebScoketRawMessage}; +use lib_infra::future::FutureResult; +pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage}; +use lib_ws::{WSController, WSSender}; +use parking_lot::RwLock; +use std::sync::Arc; +use tokio::sync::{broadcast, broadcast::Receiver}; pub trait FlowyWebSocket: Send + Sync { fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError>; @@ -11,9 +14,155 @@ pub trait FlowyWebSocket: Send + Sync { fn subscribe_connect_state(&self) -> broadcast::Receiver; fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>; fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError>; - fn ws_sender(&self) -> Result, FlowyError>; + fn ws_sender(&self) -> Result, FlowyError>; } -pub trait FlowyWsSender: Send + Sync { - fn send(&self, msg: WebScoketRawMessage) -> Result<(), FlowyError>; +pub trait FlowyWSSender: Send + Sync { + fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError>; +} + +pub struct FlowyWSConnect { + inner: Arc, + connect_type: RwLock, + status_notifier: broadcast::Sender, + addr: String, +} + +impl FlowyWSConnect { + pub fn new(addr: String, ws: Arc) -> Self { + let (status_notifier, _) = broadcast::channel(10); + FlowyWSConnect { + inner: ws, + connect_type: RwLock::new(NetworkType::default()), + status_notifier, + addr, + } + } + + pub async fn start(&self, token: String) -> Result<(), FlowyError> { + let addr = format!("{}/{}", self.addr, token); + self.inner.stop_connect().await?; + let _ = self.inner.start_connect(addr).await?; + Ok(()) + } + + pub async fn stop(&self) { let _ = self.inner.stop_connect().await; } + + pub fn update_network_type(&self, new_type: &NetworkType) { + tracing::debug!("Network new state: {:?}", new_type); + let old_type = self.connect_type.read().clone(); + let _ = self.status_notifier.send(new_type.clone()); + + if &old_type != new_type { + tracing::debug!("Connect type switch from {:?} to {:?}", old_type, new_type); + match (old_type.is_connect(), new_type.is_connect()) { + (false, true) => { + let ws_controller = self.inner.clone(); + tokio::spawn(async move { retry_connect(ws_controller, 100).await }); + }, + (true, false) => { + // + }, + _ => {}, + } + + *self.connect_type.write() = new_type.clone(); + } + } + + pub fn subscribe_websocket_state(&self) -> broadcast::Receiver { + self.inner.subscribe_connect_state() + } + + pub fn subscribe_network_ty(&self) -> broadcast::Receiver { self.status_notifier.subscribe() } + + pub fn add_receiver(&self, handler: Arc) -> Result<(), FlowyError> { + let _ = self.inner.add_message_receiver(handler)?; + Ok(()) + } + + pub fn ws_sender(&self) -> Result, FlowyError> { self.inner.ws_sender() } +} + +#[tracing::instrument(level = "debug", skip(manager))] +pub fn listen_on_websocket(manager: Arc) { + if cfg!(feature = "http_server") { + let ws = manager.inner.clone(); + let mut notify = manager.inner.subscribe_connect_state(); + let _ = tokio::spawn(async move { + loop { + match notify.recv().await { + Ok(state) => { + tracing::info!("Websocket state changed: {}", state); + match state { + WSConnectState::Init => {}, + WSConnectState::Connected => {}, + WSConnectState::Connecting => {}, + WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await, + } + }, + Err(e) => { + tracing::error!("Websocket state notify error: {:?}", e); + break; + }, + } + } + }); + } else { + // do nothing + }; +} + +async fn retry_connect(ws: Arc, count: usize) { + match ws.reconnect(count).await { + Ok(_) => {}, + Err(e) => { + tracing::error!("websocket connect failed: {:?}", e); + }, + } +} + +impl FlowyWebSocket for Arc { + fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> { + let cloned_ws = self.clone(); + FutureResult::new(async move { + let _ = cloned_ws.start(addr).await.map_err(internal_error)?; + Ok(()) + }) + } + + fn stop_connect(&self) -> FutureResult<(), FlowyError> { + let controller = self.clone(); + FutureResult::new(async move { + controller.stop().await; + Ok(()) + }) + } + + fn subscribe_connect_state(&self) -> Receiver { self.subscribe_state() } + + fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> { + let cloned_ws = self.clone(); + FutureResult::new(async move { + let _ = cloned_ws.retry(count).await.map_err(internal_error)?; + Ok(()) + }) + } + + fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError> { + let _ = self.add_receiver(handler).map_err(internal_error)?; + Ok(()) + } + + fn ws_sender(&self) -> Result, FlowyError> { + let sender = self.sender().map_err(internal_error)?; + Ok(sender) + } +} + +impl FlowyWSSender for WSSender { + fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> { + let _ = self.send_msg(msg).map_err(internal_error)?; + Ok(()) + } } diff --git a/frontend/rust-lib/flowy-net/src/services/ws/manager.rs b/frontend/rust-lib/flowy-net/src/services/ws/manager.rs deleted file mode 100644 index e99be9dbe9..0000000000 --- a/frontend/rust-lib/flowy-net/src/services/ws/manager.rs +++ /dev/null @@ -1,161 +0,0 @@ -use crate::{ - entities::NetworkType, - services::ws::{local_web_socket, FlowyWebSocket, FlowyWsSender}, -}; -use flowy_error::{internal_error, FlowyError}; -use lib_infra::future::FutureResult; -use lib_ws::{WSConnectState, WSController, WSMessageReceiver, WSSender, WebScoketRawMessage}; -use parking_lot::RwLock; -use std::sync::Arc; -use tokio::sync::{broadcast, broadcast::Receiver}; - -pub struct WsManager { - inner: Arc, - connect_type: RwLock, - status_notifier: broadcast::Sender, - addr: String, -} - -impl WsManager { - pub fn new(addr: String) -> Self { - let ws: Arc = if cfg!(feature = "http_server") { - Arc::new(Arc::new(WSController::new())) - } else { - local_web_socket() - }; - let (status_notifier, _) = broadcast::channel(10); - WsManager { - inner: ws, - connect_type: RwLock::new(NetworkType::default()), - status_notifier, - addr, - } - } - - pub async fn start(&self, token: String) -> Result<(), FlowyError> { - let addr = format!("{}/{}", self.addr, token); - self.inner.stop_connect().await?; - let _ = self.inner.start_connect(addr).await?; - Ok(()) - } - - pub async fn stop(&self) { let _ = self.inner.stop_connect().await; } - - pub fn update_network_type(&self, new_type: &NetworkType) { - tracing::debug!("Network new state: {:?}", new_type); - let old_type = self.connect_type.read().clone(); - let _ = self.status_notifier.send(new_type.clone()); - - if &old_type != new_type { - tracing::debug!("Connect type switch from {:?} to {:?}", old_type, new_type); - match (old_type.is_connect(), new_type.is_connect()) { - (false, true) => { - let ws_controller = self.inner.clone(); - tokio::spawn(async move { retry_connect(ws_controller, 100).await }); - }, - (true, false) => { - // - }, - _ => {}, - } - - *self.connect_type.write() = new_type.clone(); - } - } - - pub fn subscribe_websocket_state(&self) -> broadcast::Receiver { - self.inner.subscribe_connect_state() - } - - pub fn subscribe_network_ty(&self) -> broadcast::Receiver { self.status_notifier.subscribe() } - - pub fn add_receiver(&self, handler: Arc) -> Result<(), FlowyError> { - let _ = self.inner.add_message_receiver(handler)?; - Ok(()) - } - - pub fn ws_sender(&self) -> Result, FlowyError> { self.inner.ws_sender() } -} - -#[tracing::instrument(level = "debug", skip(manager))] -pub fn listen_on_websocket(manager: Arc) { - if cfg!(feature = "http_server") { - let ws = manager.inner.clone(); - let mut notify = manager.inner.subscribe_connect_state(); - let _ = tokio::spawn(async move { - loop { - match notify.recv().await { - Ok(state) => { - tracing::info!("Websocket state changed: {}", state); - match state { - WSConnectState::Init => {}, - WSConnectState::Connected => {}, - WSConnectState::Connecting => {}, - WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await, - } - }, - Err(e) => { - tracing::error!("Websocket state notify error: {:?}", e); - break; - }, - } - } - }); - } else { - // do nothing - }; -} - -async fn retry_connect(ws: Arc, count: usize) { - match ws.reconnect(count).await { - Ok(_) => {}, - Err(e) => { - tracing::error!("websocket connect failed: {:?}", e); - }, - } -} - -impl FlowyWebSocket for Arc { - fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> { - let cloned_ws = self.clone(); - FutureResult::new(async move { - let _ = cloned_ws.start(addr).await.map_err(internal_error)?; - Ok(()) - }) - } - - fn stop_connect(&self) -> FutureResult<(), FlowyError> { - let controller = self.clone(); - FutureResult::new(async move { - controller.stop().await; - Ok(()) - }) - } - - fn subscribe_connect_state(&self) -> Receiver { self.subscribe_state() } - - fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> { - let cloned_ws = self.clone(); - FutureResult::new(async move { - let _ = cloned_ws.retry(count).await.map_err(internal_error)?; - Ok(()) - }) - } - - fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError> { - let _ = self.add_receiver(handler).map_err(internal_error)?; - Ok(()) - } - - fn ws_sender(&self) -> Result, FlowyError> { - let sender = self.sender().map_err(internal_error)?; - Ok(sender) - } -} - -impl FlowyWsSender for WSSender { - fn send(&self, msg: WebScoketRawMessage) -> Result<(), FlowyError> { - let _ = self.send_msg(msg).map_err(internal_error)?; - Ok(()) - } -} diff --git a/frontend/rust-lib/flowy-net/src/services/ws/mod.rs b/frontend/rust-lib/flowy-net/src/services/ws/mod.rs index 7eea998b4e..2fe6a324b2 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws/mod.rs +++ b/frontend/rust-lib/flowy-net/src/services/ws/mod.rs @@ -1,18 +1,3 @@ pub use conn::*; -pub use manager::*; -use std::sync::Arc; mod conn; -mod manager; -mod ws_local; - -// #[cfg(not(feature = "flowy_unit_test"))] -// pub(crate) fn local_web_socket() -> Arc { -// Arc::new(Arc::new(ws_local::LocalWebSocket::default())) } -// -// #[cfg(feature = "flowy_unit_test")] -// pub(crate) fn local_web_socket() -> Arc { -// Arc::new(Arc::new(crate::services::mock::MockWebSocket::default())) -// } - -pub(crate) fn local_web_socket() -> Arc { Arc::new(Arc::new(ws_local::LocalWebSocket::default())) } diff --git a/frontend/rust-lib/flowy-sdk/Cargo.toml b/frontend/rust-lib/flowy-sdk/Cargo.toml index d1dcd821f4..8ecf0ae451 100644 --- a/frontend/rust-lib/flowy-sdk/Cargo.toml +++ b/frontend/rust-lib/flowy-sdk/Cargo.toml @@ -10,6 +10,7 @@ lib-dispatch = { path = "../lib-dispatch" } lib-log = { path = "../lib-log" } flowy-user = { path = "../flowy-user" } flowy-net = { path = "../flowy-net" } +flowy-virtual-net = { path = "../flowy-virtual-net" } flowy-core = { path = "../flowy-core", default-features = false } flowy-database = { path = "../flowy-database" } flowy-document = { path = "../flowy-document" } diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 4be9483027..ee1a7fa244 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -6,15 +6,15 @@ use flowy_document::{ errors::{internal_error, FlowyError}, services::doc::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver}, }; -use flowy_net::services::ws::WsManager; +use flowy_net::services::ws::FlowyWSConnect; use flowy_user::services::user::UserSession; -use lib_ws::{WSMessageReceiver, WSModule, WebScoketRawMessage}; +use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage}; use std::{convert::TryInto, path::Path, sync::Arc}; pub struct DocumentDepsResolver(); impl DocumentDepsResolver { pub fn resolve( - ws_manager: Arc, + ws_manager: Arc, user_session: Arc, ) -> ( Arc, @@ -61,13 +61,13 @@ impl DocumentUser for DocumentUserImpl { } struct DocumentWebSocketAdapter { - ws_manager: Arc, + ws_manager: Arc, } impl DocumentWebSocket for DocumentWebSocketAdapter { fn send(&self, data: DocumentClientWSData) -> Result<(), FlowyError> { let bytes: Bytes = data.try_into().unwrap(); - let msg = WebScoketRawMessage { + let msg = WebSocketRawMessage { module: WSModule::Doc, data: bytes.to_vec(), }; @@ -84,5 +84,5 @@ struct WSMessageReceiverAdaptor(Arc); impl WSMessageReceiver for WSMessageReceiverAdaptor { fn source(&self) -> WSModule { WSModule::Doc } - fn receive_message(&self, msg: WebScoketRawMessage) { self.0.did_receive_data(Bytes::from(msg.data)); } + fn receive_message(&self, msg: WebSocketRawMessage) { self.0.did_receive_data(Bytes::from(msg.data)); } } diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index b417ef19c0..ab66f71612 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -1,5 +1,4 @@ mod deps_resolve; -// mod flowy_server; pub mod module; use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver}; use backend_service::configuration::ClientServerConfiguration; @@ -7,13 +6,15 @@ use flowy_core::{errors::FlowyError, module::init_core, prelude::CoreContext}; use flowy_document::context::DocumentContext; use flowy_net::{ entities::NetworkType, - services::ws::{listen_on_websocket, WsManager}, + services::ws::{listen_on_websocket, FlowyWSConnect, FlowyWebSocket}, }; use flowy_user::{ prelude::UserStatus, services::user::{UserSession, UserSessionConfig}, }; +use flowy_virtual_net::local_web_socket; use lib_dispatch::prelude::*; +use lib_ws::WSController; use module::mk_modules; pub use module::*; use std::sync::{ @@ -72,7 +73,7 @@ pub struct FlowySDK { pub flowy_document: Arc, pub core: Arc, pub dispatcher: Arc, - pub ws_manager: Arc, + pub ws_manager: Arc, } impl FlowySDK { @@ -81,7 +82,13 @@ impl FlowySDK { init_kv(&config.root); tracing::debug!("🔥 {:?}", config); - let ws_manager = Arc::new(WsManager::new(config.server_config.ws_addr())); + let ws: Arc = if cfg!(feature = "http_server") { + Arc::new(Arc::new(WSController::new())) + } else { + local_web_socket() + }; + + let ws_manager = Arc::new(FlowyWSConnect::new(config.server_config.ws_addr(), ws)); let user_session = mk_user_session(&config); let flowy_document = mk_document(ws_manager.clone(), user_session.clone(), &config.server_config); let core_ctx = mk_core_context(user_session.clone(), flowy_document.clone(), &config.server_config); @@ -106,7 +113,7 @@ impl FlowySDK { fn _init( dispatch: &EventDispatcher, - ws_manager: Arc, + ws_manager: Arc, user_session: Arc, core: Arc, ) { @@ -126,7 +133,7 @@ fn _init( } async fn _listen_user_status( - ws_manager: Arc, + ws_manager: Arc, mut subscribe: broadcast::Receiver, core: Arc, ) { @@ -201,7 +208,7 @@ fn mk_core_context( } pub fn mk_document( - ws_manager: Arc, + ws_manager: Arc, user_session: Arc, server_config: &ClientServerConfiguration, ) -> Arc { diff --git a/frontend/rust-lib/flowy-sdk/src/module.rs b/frontend/rust-lib/flowy-sdk/src/module.rs index 572373b761..0175d59e2a 100644 --- a/frontend/rust-lib/flowy-sdk/src/module.rs +++ b/frontend/rust-lib/flowy-sdk/src/module.rs @@ -1,11 +1,15 @@ use flowy_core::prelude::CoreContext; -use flowy_net::services::ws::WsManager; +use flowy_net::services::ws::FlowyWSConnect; use flowy_user::services::user::UserSession; use lib_dispatch::prelude::Module; use std::sync::Arc; -pub fn mk_modules(ws_manager: Arc, core: Arc, user_session: Arc) -> Vec { +pub fn mk_modules( + ws_manager: Arc, + core: Arc, + user_session: Arc, +) -> Vec { let user_module = mk_user_module(user_session); let core_module = mk_core_module(core); let network_module = mk_network_module(ws_manager); @@ -16,4 +20,4 @@ fn mk_user_module(user_session: Arc) -> Module { flowy_user::module fn mk_core_module(core: Arc) -> Module { flowy_core::module::create(core) } -fn mk_network_module(ws_manager: Arc) -> Module { flowy_net::module::create(ws_manager) } +fn mk_network_module(ws_manager: Arc) -> Module { flowy_net::module::create(ws_manager) } diff --git a/frontend/rust-lib/flowy-test/Cargo.toml b/frontend/rust-lib/flowy-test/Cargo.toml index 56fd34ef46..32c4a804a6 100644 --- a/frontend/rust-lib/flowy-test/Cargo.toml +++ b/frontend/rust-lib/flowy-test/Cargo.toml @@ -36,4 +36,4 @@ fake = "~2.3.0" claim = "0.4.0" futures = "0.3.15" serial_test = "0.5.1" -flowy-net = { path = "../flowy-net", features = ["flowy_unit_test"] } \ No newline at end of file +flowy-virtual-net = { path = "../flowy-virtual-net", features = ["flowy_unit_test"] } \ No newline at end of file diff --git a/frontend/rust-lib/flowy-test/src/doc_script.rs b/frontend/rust-lib/flowy-test/src/doc_script.rs index 486a260762..c9757ff92b 100644 --- a/frontend/rust-lib/flowy-test/src/doc_script.rs +++ b/frontend/rust-lib/flowy-test/src/doc_script.rs @@ -77,7 +77,7 @@ impl EditorTest { EditorScript::AssertNextRevId(rev_id) => { let next_revision = rev_manager.next_sync_revision().await.unwrap(); if rev_id.is_none() { - assert_eq!(next_revision.is_none(), true); + assert_eq!(next_revision.is_none(), true, "Next revision should be None"); return; } let next_revision = next_revision.unwrap(); diff --git a/frontend/rust-lib/flowy-test/tests/revision_test.rs b/frontend/rust-lib/flowy-test/tests/revision_test.rs index 12048d2569..563dd0454d 100644 --- a/frontend/rust-lib/flowy-test/tests/revision_test.rs +++ b/frontend/rust-lib/flowy-test/tests/revision_test.rs @@ -1,31 +1,31 @@ -// use flowy_test::doc_script::{EditorScript::*, *}; -// use lib_ot::revision::RevState; -// -// #[tokio::test] -// async fn doc_sync_test() { -// let scripts = vec![ -// InsertText("1", 0), -// InsertText("2", 1), -// InsertText("3", 2), -// AssertJson(r#"[{"insert":"123\n"}]"#), -// AssertNextRevId(None), -// ]; -// EditorTest::new().await.run_scripts(scripts).await; -// } -// -// #[tokio::test] -// async fn doc_sync_retry_ws_conn() { -// let scripts = vec![ -// InsertText("1", 0), -// StopWs, -// InsertText("2", 1), -// InsertText("3", 2), -// StartWs, -// WaitSyncFinished, -// AssertRevisionState(2, RevState::Acked), -// AssertRevisionState(3, RevState::Acked), -// AssertNextRevId(None), -// AssertJson(r#"[{"insert":"123\n"}]"#), -// ]; -// EditorTest::new().await.run_scripts(scripts).await; -// } +use flowy_collaboration::entities::revision::RevState; +use flowy_test::doc_script::{EditorScript::*, *}; + +#[tokio::test] +async fn doc_sync_test() { + let scripts = vec![ + InsertText("1", 0), + InsertText("2", 1), + InsertText("3", 2), + AssertJson(r#"[{"insert":"123\n"}]"#), + AssertNextRevId(None), + ]; + EditorTest::new().await.run_scripts(scripts).await; +} + +#[tokio::test] +async fn doc_sync_retry_ws_conn() { + let scripts = vec![ + InsertText("1", 0), + StopWs, + InsertText("2", 1), + InsertText("3", 2), + StartWs, + WaitSyncFinished, + AssertRevisionState(2, RevState::Ack), + AssertRevisionState(3, RevState::Ack), + AssertNextRevId(None), + AssertJson(r#"[{"insert":"123\n"}]"#), + ]; + EditorTest::new().await.run_scripts(scripts).await; +} diff --git a/frontend/rust-lib/flowy-virtual-net/Cargo.toml b/frontend/rust-lib/flowy-virtual-net/Cargo.toml new file mode 100644 index 0000000000..cbc1b7ef87 --- /dev/null +++ b/frontend/rust-lib/flowy-virtual-net/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "flowy-virtual-net" +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +lib-ws = { path = "../../../shared-lib/lib-ws" } +lib-infra = { path = "../../../shared-lib/lib-infra" } +flowy-net = { path = "../flowy-net" } +bytes = { version = "1.0" } +parking_lot = "0.11" +tokio = {version = "1", features = ["sync"]} +tracing = { version = "0.1", features = ["log"] } + +# flowy-collaboration and dashmap would be optional +flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration"} +dashmap = {version = "4.0"} + +[features] +flowy_unit_test = [] +http_server = [] \ No newline at end of file diff --git a/frontend/rust-lib/flowy-virtual-net/src/lib.rs b/frontend/rust-lib/flowy-virtual-net/src/lib.rs new file mode 100644 index 0000000000..8d1b774812 --- /dev/null +++ b/frontend/rust-lib/flowy-virtual-net/src/lib.rs @@ -0,0 +1,12 @@ +use flowy_net::services::ws::FlowyWebSocket; +use std::sync::Arc; +mod ws; + +#[cfg(not(feature = "flowy_unit_test"))] +pub fn local_web_socket() -> Arc { Arc::new(ws::LocalWebSocket::default()) } + +#[cfg(feature = "flowy_unit_test")] +mod mock; + +#[cfg(feature = "flowy_unit_test")] +pub fn local_web_socket() -> Arc { Arc::new(crate::mock::MockWebSocket::default()) } diff --git a/frontend/rust-lib/flowy-virtual-net/src/mock/mod.rs b/frontend/rust-lib/flowy-virtual-net/src/mock/mod.rs new file mode 100644 index 0000000000..2066369bbb --- /dev/null +++ b/frontend/rust-lib/flowy-virtual-net/src/mock/mod.rs @@ -0,0 +1,4 @@ +mod server; +mod ws_local; + +pub use ws_local::*; diff --git a/frontend/rust-lib/flowy-virtual-net/src/mock/server.rs b/frontend/rust-lib/flowy-virtual-net/src/mock/server.rs new file mode 100644 index 0000000000..25afd7f792 --- /dev/null +++ b/frontend/rust-lib/flowy-virtual-net/src/mock/server.rs @@ -0,0 +1,137 @@ +use bytes::Bytes; +use dashmap::DashMap; +use flowy_collaboration::{entities::prelude::*, errors::CollaborateError, sync::*}; +use flowy_net::services::ws::*; +use lib_infra::future::FutureResultSend; +use lib_ws::{WSModule, WebSocketRawMessage}; +use std::{ + convert::{TryFrom, TryInto}, + fmt::{Debug, Formatter}, + sync::Arc, +}; +use tokio::sync::mpsc; + +pub struct MockDocServer { + pub manager: Arc, +} + +impl std::default::Default for MockDocServer { + fn default() -> Self { + let persistence = Arc::new(MockDocServerPersistence::default()); + let manager = Arc::new(ServerDocumentManager::new(persistence)); + MockDocServer { manager } + } +} + +impl MockDocServer { + pub async fn handle_ws_data(&self, ws_data: DocumentClientWSData) -> Option> { + let bytes = Bytes::from(ws_data.data); + match ws_data.ty { + DocumentClientWSDataType::ClientPushRev => { + let revisions = RepeatedRevision::try_from(bytes).unwrap().into_inner(); + if revisions.is_empty() { + return None; + } + let first_revision = revisions.first().unwrap(); + let (tx, rx) = mpsc::channel(1); + let user = Arc::new(MockDocUser { + user_id: first_revision.user_id.clone(), + tx, + }); + self.manager.apply_revisions(user, revisions).await.unwrap(); + Some(rx) + }, + } + } +} + +struct MockDocServerPersistence { + inner: Arc>, +} + +impl Debug for MockDocServerPersistence { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("MockDocServerPersistence") } +} + +impl std::default::Default for MockDocServerPersistence { + fn default() -> Self { + MockDocServerPersistence { + inner: Arc::new(DashMap::new()), + } + } +} + +impl DocumentPersistence for MockDocServerPersistence { + fn read_doc(&self, doc_id: &str) -> FutureResultSend { + let inner = self.inner.clone(); + let doc_id = doc_id.to_owned(); + FutureResultSend::new(async move { + match inner.get(&doc_id) { + None => { + // + Err(CollaborateError::record_not_found()) + }, + Some(val) => { + // + Ok(val.value().clone()) + }, + } + }) + } + + fn create_doc(&self, revision: Revision) -> FutureResultSend { + FutureResultSend::new(async move { + let document_info: DocumentInfo = revision.try_into().unwrap(); + Ok(document_info) + }) + } + + fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec) -> FutureResultSend, CollaborateError> { + unimplemented!() + } +} + +#[derive(Debug)] +struct MockDocUser { + user_id: String, + tx: mpsc::Sender, +} + +impl RevisionUser for MockDocUser { + fn user_id(&self) -> String { self.user_id.clone() } + + fn receive(&self, resp: SyncResponse) { + let sender = self.tx.clone(); + tokio::spawn(async move { + match resp { + SyncResponse::Pull(data) => { + let bytes: Bytes = data.try_into().unwrap(); + let msg = WebSocketRawMessage { + module: WSModule::Doc, + data: bytes.to_vec(), + }; + sender.send(msg).await.unwrap(); + }, + SyncResponse::Push(data) => { + let bytes: Bytes = data.try_into().unwrap(); + let msg = WebSocketRawMessage { + module: WSModule::Doc, + data: bytes.to_vec(), + }; + sender.send(msg).await.unwrap(); + }, + SyncResponse::Ack(data) => { + let bytes: Bytes = data.try_into().unwrap(); + let msg = WebSocketRawMessage { + module: WSModule::Doc, + data: bytes.to_vec(), + }; + sender.send(msg).await.unwrap(); + }, + SyncResponse::NewRevision(_) => { + // unimplemented!() + }, + } + }); + } +} diff --git a/frontend/rust-lib/flowy-virtual-net/src/mock/ws_local.rs b/frontend/rust-lib/flowy-virtual-net/src/mock/ws_local.rs new file mode 100644 index 0000000000..9c042c1bdb --- /dev/null +++ b/frontend/rust-lib/flowy-virtual-net/src/mock/ws_local.rs @@ -0,0 +1,95 @@ +use crate::mock::server::MockDocServer; +use bytes::Bytes; +use dashmap::DashMap; +use flowy_collaboration::entities::ws::*; +use flowy_net::services::ws::*; +use lib_infra::future::FutureResult; +use lib_ws::{WSModule, WebSocketRawMessage}; +use parking_lot::RwLock; +use std::{convert::TryFrom, sync::Arc}; +use tokio::sync::{broadcast, broadcast::Receiver}; + +pub struct MockWebSocket { + receivers: Arc>>, + state_sender: broadcast::Sender, + ws_sender: MockWSSender, + is_stop: Arc>, + server: Arc, +} + +impl std::default::Default for MockWebSocket { + fn default() -> Self { + let (state_sender, _) = broadcast::channel(16); + let (ws_sender, _) = broadcast::channel(16); + let server = Arc::new(MockDocServer::default()); + MockWebSocket { + receivers: Arc::new(DashMap::new()), + state_sender, + ws_sender: MockWSSender(ws_sender), + is_stop: Arc::new(RwLock::new(false)), + server, + } + } +} + +impl FlowyWebSocket for MockWebSocket { + fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> { + *self.is_stop.write() = false; + + let mut ws_receiver = self.ws_sender.subscribe(); + let receivers = self.receivers.clone(); + let is_stop = self.is_stop.clone(); + let server = self.server.clone(); + tokio::spawn(async move { + while let Ok(message) = ws_receiver.recv().await { + if *is_stop.read() { + // do nothing + } else { + let ws_data = DocumentClientWSData::try_from(Bytes::from(message.data.clone())).unwrap(); + + if let Some(mut rx) = server.handle_ws_data(ws_data).await { + let new_ws_message = rx.recv().await.unwrap(); + match receivers.get(&new_ws_message.module) { + None => tracing::error!("Can't find any handler for message: {:?}", new_ws_message), + Some(handler) => handler.receive_message(new_ws_message.clone()), + } + } + } + } + }); + + FutureResult::new(async { Ok(()) }) + } + + fn stop_connect(&self) -> FutureResult<(), FlowyError> { + *self.is_stop.write() = true; + FutureResult::new(async { Ok(()) }) + } + + fn subscribe_connect_state(&self) -> Receiver { self.state_sender.subscribe() } + + fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } + + fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError> { + self.receivers.insert(handler.source(), handler); + Ok(()) + } + + fn ws_sender(&self) -> Result, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) } +} + +#[derive(Clone)] +pub struct MockWSSender(broadcast::Sender); + +impl FlowyWSSender for MockWSSender { + fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> { + let _ = self.0.send(msg); + Ok(()) + } +} + +impl std::ops::Deref for MockWSSender { + type Target = broadcast::Sender; + + fn deref(&self) -> &Self::Target { &self.0 } +} diff --git a/frontend/rust-lib/flowy-virtual-net/src/ws/mod.rs b/frontend/rust-lib/flowy-virtual-net/src/ws/mod.rs new file mode 100644 index 0000000000..637a2655af --- /dev/null +++ b/frontend/rust-lib/flowy-virtual-net/src/ws/mod.rs @@ -0,0 +1,3 @@ +mod ws_local; + +pub use ws_local::*; diff --git a/frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs b/frontend/rust-lib/flowy-virtual-net/src/ws/ws_local.rs similarity index 64% rename from frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs rename to frontend/rust-lib/flowy-virtual-net/src/ws/ws_local.rs index 6f7d3c2eba..c50c415fc5 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs +++ b/frontend/rust-lib/flowy-virtual-net/src/ws/ws_local.rs @@ -1,10 +1,10 @@ -use crate::services::ws::{ +use flowy_net::services::ws::{ FlowyError, + FlowyWSSender, FlowyWebSocket, - FlowyWsSender, WSConnectState, WSMessageReceiver, - WebScoketRawMessage, + WebSocketRawMessage, }; use lib_infra::future::FutureResult; use std::sync::Arc; @@ -12,7 +12,7 @@ use tokio::sync::{broadcast, broadcast::Receiver}; pub(crate) struct LocalWebSocket { state_sender: broadcast::Sender, - ws_sender: broadcast::Sender, + ws_sender: LocalWSSender, } impl std::default::Default for LocalWebSocket { @@ -21,12 +21,12 @@ impl std::default::Default for LocalWebSocket { let (ws_sender, _) = broadcast::channel(16); LocalWebSocket { state_sender, - ws_sender, + ws_sender: LocalWSSender(ws_sender), } } } -impl FlowyWebSocket for Arc { +impl FlowyWebSocket for LocalWebSocket { fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } @@ -37,12 +37,19 @@ impl FlowyWebSocket for Arc { fn add_message_receiver(&self, _handler: Arc) -> Result<(), FlowyError> { Ok(()) } - fn ws_sender(&self) -> Result, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) } + fn ws_sender(&self) -> Result, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) } } -impl FlowyWsSender for broadcast::Sender { - fn send(&self, msg: WebScoketRawMessage) -> Result<(), FlowyError> { - let _ = self.send(msg); +#[derive(Clone)] +pub struct LocalWSSender(broadcast::Sender); +impl FlowyWSSender for LocalWSSender { + fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> { + let _ = self.0.send(msg); Ok(()) } } + +impl std::ops::Deref for LocalWSSender { + type Target = broadcast::Sender; + fn deref(&self) -> &Self::Target { &self.0 } +} diff --git a/shared-lib/flowy-collaboration/src/entities/mod.rs b/shared-lib/flowy-collaboration/src/entities/mod.rs index b79cf36fe5..4a077ba354 100644 --- a/shared-lib/flowy-collaboration/src/entities/mod.rs +++ b/shared-lib/flowy-collaboration/src/entities/mod.rs @@ -2,3 +2,7 @@ pub mod doc; pub mod parser; pub mod revision; pub mod ws; + +pub mod prelude { + pub use crate::entities::{doc::*, parser::*, revision::*, ws::*}; +} diff --git a/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs b/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs index 3c09280f91..77551f8c43 100644 --- a/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs +++ b/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs @@ -78,7 +78,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "ExportRequest" | "ExportData" | "WSError" - | "WebScoketRawMessage" + | "WebSocketRawMessage" => TypeCategory::Protobuf, "WorkspaceEvent" | "WorkspaceNotification" diff --git a/shared-lib/lib-ws/src/msg.rs b/shared-lib/lib-ws/src/msg.rs index 5d1143ed4b..98baba8d43 100644 --- a/shared-lib/lib-ws/src/msg.rs +++ b/shared-lib/lib-ws/src/msg.rs @@ -4,7 +4,7 @@ use std::convert::TryInto; use tokio_tungstenite::tungstenite::Message as TokioMessage; #[derive(ProtoBuf, Debug, Clone, Default)] -pub struct WebScoketRawMessage { +pub struct WebSocketRawMessage { #[pb(index = 1)] pub module: WSModule, @@ -29,8 +29,8 @@ impl ToString for WSModule { } } -impl std::convert::From for TokioMessage { - fn from(msg: WebScoketRawMessage) -> Self { +impl std::convert::From for TokioMessage { + fn from(msg: WebSocketRawMessage) -> Self { let result: Result = msg.try_into(); match result { Ok(bytes) => TokioMessage::Binary(bytes.to_vec()), diff --git a/shared-lib/lib-ws/src/protobuf/model/msg.rs b/shared-lib/lib-ws/src/protobuf/model/msg.rs index 134a892c4e..835aeb0a3d 100644 --- a/shared-lib/lib-ws/src/protobuf/model/msg.rs +++ b/shared-lib/lib-ws/src/protobuf/model/msg.rs @@ -24,7 +24,7 @@ // const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1; #[derive(PartialEq,Clone,Default)] -pub struct WebScoketRawMessage { +pub struct WebSocketRawMessage { // message fields pub module: WSModule, pub data: ::std::vec::Vec, @@ -33,14 +33,14 @@ pub struct WebScoketRawMessage { pub cached_size: ::protobuf::CachedSize, } -impl<'a> ::std::default::Default for &'a WebScoketRawMessage { - fn default() -> &'a WebScoketRawMessage { - ::default_instance() +impl<'a> ::std::default::Default for &'a WebSocketRawMessage { + fn default() -> &'a WebSocketRawMessage { + ::default_instance() } } -impl WebScoketRawMessage { - pub fn new() -> WebScoketRawMessage { +impl WebSocketRawMessage { + pub fn new() -> WebSocketRawMessage { ::std::default::Default::default() } @@ -86,7 +86,7 @@ impl WebScoketRawMessage { } } -impl ::protobuf::Message for WebScoketRawMessage { +impl ::protobuf::Message for WebSocketRawMessage { fn is_initialized(&self) -> bool { true } @@ -161,8 +161,8 @@ impl ::protobuf::Message for WebScoketRawMessage { Self::descriptor_static() } - fn new() -> WebScoketRawMessage { - WebScoketRawMessage::new() + fn new() -> WebSocketRawMessage { + WebSocketRawMessage::new() } fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { @@ -171,29 +171,29 @@ impl ::protobuf::Message for WebScoketRawMessage { let mut fields = ::std::vec::Vec::new(); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( "module", - |m: &WebScoketRawMessage| { &m.module }, - |m: &mut WebScoketRawMessage| { &mut m.module }, + |m: &WebSocketRawMessage| { &m.module }, + |m: &mut WebSocketRawMessage| { &mut m.module }, )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( "data", - |m: &WebScoketRawMessage| { &m.data }, - |m: &mut WebScoketRawMessage| { &mut m.data }, + |m: &WebSocketRawMessage| { &m.data }, + |m: &mut WebSocketRawMessage| { &mut m.data }, )); - ::protobuf::reflect::MessageDescriptor::new_pb_name::( - "WebScoketRawMessage", + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "WebSocketRawMessage", fields, file_descriptor_proto() ) }) } - fn default_instance() -> &'static WebScoketRawMessage { - static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; - instance.get(WebScoketRawMessage::new) + fn default_instance() -> &'static WebSocketRawMessage { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(WebSocketRawMessage::new) } } -impl ::protobuf::Clear for WebScoketRawMessage { +impl ::protobuf::Clear for WebSocketRawMessage { fn clear(&mut self) { self.module = WSModule::Doc; self.data.clear(); @@ -201,13 +201,13 @@ impl ::protobuf::Clear for WebScoketRawMessage { } } -impl ::std::fmt::Debug for WebScoketRawMessage { +impl ::std::fmt::Debug for WebSocketRawMessage { fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { ::protobuf::text_format::fmt(self, f) } } -impl ::protobuf::reflect::ProtobufValue for WebScoketRawMessage { +impl ::protobuf::reflect::ProtobufValue for WebSocketRawMessage { fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { ::protobuf::reflect::ReflectValueRef::Message(self) } @@ -261,7 +261,7 @@ impl ::protobuf::reflect::ProtobufValue for WSModule { } static file_descriptor_proto_data: &'static [u8] = b"\ - \n\tmsg.proto\"L\n\x13WebScoketRawMessage\x12!\n\x06module\x18\x01\x20\ + \n\tmsg.proto\"L\n\x13WebSocketRawMessage\x12!\n\x06module\x18\x01\x20\ \x01(\x0e2\t.WSModuleR\x06module\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\ \x04data*\x13\n\x08WSModule\x12\x07\n\x03Doc\x10\0J\xd9\x01\n\x06\x12\ \x04\0\0\x08\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\ diff --git a/shared-lib/lib-ws/src/protobuf/proto/msg.proto b/shared-lib/lib-ws/src/protobuf/proto/msg.proto index 1d317c5a91..169cbe7ec8 100644 --- a/shared-lib/lib-ws/src/protobuf/proto/msg.proto +++ b/shared-lib/lib-ws/src/protobuf/proto/msg.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -message WebScoketRawMessage { +message WebSocketRawMessage { WSModule module = 1; bytes data = 2; } diff --git a/shared-lib/lib-ws/src/ws.rs b/shared-lib/lib-ws/src/ws.rs index a45bbd4621..9824d42af5 100644 --- a/shared-lib/lib-ws/src/ws.rs +++ b/shared-lib/lib-ws/src/ws.rs @@ -3,7 +3,7 @@ use crate::{ connect::{WSConnectionFuture, WSStream}, errors::WSError, WSModule, - WebScoketRawMessage, + WebSocketRawMessage, }; use backend_service::errors::ServerError; use bytes::Bytes; @@ -34,7 +34,7 @@ type Handlers = DashMap>; pub trait WSMessageReceiver: Sync + Send + 'static { fn source(&self) -> WSModule; - fn receive_message(&self, msg: WebScoketRawMessage); + fn receive_message(&self, msg: WebSocketRawMessage); } pub struct WSController { @@ -175,7 +175,7 @@ impl WSHandlerFuture { fn handle_binary_message(&self, bytes: Vec) { let bytes = Bytes::from(bytes); - match WebScoketRawMessage::try_from(bytes) { + match WebSocketRawMessage::try_from(bytes) { Ok(message) => match self.handlers.get(&message.module) { None => log::error!("Can't find any handler for message: {:?}", message), Some(handler) => handler.receive_message(message.clone()), @@ -207,7 +207,7 @@ pub struct WSSender { } impl WSSender { - pub fn send_msg>(&self, msg: T) -> Result<(), WSError> { + pub fn send_msg>(&self, msg: T) -> Result<(), WSError> { let msg = msg.into(); let _ = self .ws_tx @@ -217,7 +217,7 @@ impl WSSender { } pub fn send_text(&self, source: &WSModule, text: &str) -> Result<(), WSError> { - let msg = WebScoketRawMessage { + let msg = WebSocketRawMessage { module: source.clone(), data: text.as_bytes().to_vec(), }; @@ -225,7 +225,7 @@ impl WSSender { } pub fn send_binary(&self, source: &WSModule, bytes: Vec) -> Result<(), WSError> { - let msg = WebScoketRawMessage { + let msg = WebSocketRawMessage { module: source.clone(), data: bytes, };