From 0092f1a35674645d9fbf0331ae11ecc9c6e7a4bd Mon Sep 17 00:00:00 2001 From: appflowy Date: Thu, 23 Sep 2021 19:59:58 +0800 Subject: [PATCH] WsDocumentMessage supports command/delta --- .../protobuf/flowy-document/revision.pb.dart | 14 +++ .../flowy-document/revision.pbjson.dart | 3 +- .../lib/protobuf/flowy-document/ws.pb.dart | 16 +-- .../protobuf/flowy-document/ws.pbenum.dart | 14 +-- .../protobuf/flowy-document/ws.pbjson.dart | 17 ++-- .../lib/protobuf/flowy-ws/msg.pb.dart | 16 +-- .../lib/protobuf/flowy-ws/msg.pbenum.dart | 12 +-- .../lib/protobuf/flowy-ws/msg.pbjson.dart | 14 +-- backend/Cargo.toml | 1 + backend/src/application.rs | 3 +- backend/src/service/doc/ws_handler.rs | 47 ++++++++- backend/src/service/mod.rs | 13 +-- backend/src/service/ws/biz_handler.rs | 17 ++-- backend/src/service/ws/ws_client.rs | 30 +++--- backend/tests/ws/helper.rs | 6 +- rust-lib/dart-ffi/Cargo.toml | 8 +- .../src/derive_cache/derive_cache.rs | 3 +- .../src/entities/doc/revision.rs | 6 +- rust-lib/flowy-document/src/entities/ws/ws.rs | 27 ++++- rust-lib/flowy-document/src/module.rs | 5 +- .../src/protobuf/model/revision.rs | 79 +++++++++++---- .../flowy-document/src/protobuf/model/ws.rs | 99 ++++++++++--------- .../src/protobuf/proto/revision.proto | 1 + .../src/protobuf/proto/ws.proto | 7 +- .../src/services/doc/doc_controller.rs | 11 +-- .../src/services/doc/edit_context.rs | 27 ++--- .../src/services/ws/ws_manager.rs | 47 ++++----- .../src/deps_resolve/document_deps.rs | 30 +++--- rust-lib/flowy-sqlite/src/pool.rs | 8 +- rust-lib/flowy-ws/src/msg.rs | 13 +-- rust-lib/flowy-ws/src/protobuf/model/msg.rs | 64 ++++++------ .../flowy-ws/src/protobuf/proto/msg.proto | 4 +- rust-lib/flowy-ws/src/ws.rs | 24 ++--- 33 files changed, 400 insertions(+), 286 deletions(-) diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart index 152a95f90c..ff76015f39 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart @@ -16,6 +16,7 @@ class Revision extends $pb.GeneratedMessage { ..aInt64(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revId') ..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'delta', $pb.PbFieldType.OY) ..aOS(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'md5') + ..aOS(5, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') ..hasRequiredFields = false ; @@ -25,6 +26,7 @@ class Revision extends $pb.GeneratedMessage { $fixnum.Int64? revId, $core.List<$core.int>? delta, $core.String? md5, + $core.String? docId, }) { final _result = create(); if (baseRevId != null) { @@ -39,6 +41,9 @@ class Revision extends $pb.GeneratedMessage { if (md5 != null) { _result.md5 = md5; } + if (docId != null) { + _result.docId = docId; + } return _result; } factory Revision.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); @@ -97,5 +102,14 @@ class Revision extends $pb.GeneratedMessage { $core.bool hasMd5() => $_has(3); @$pb.TagNumber(4) void clearMd5() => clearField(4); + + @$pb.TagNumber(5) + $core.String get docId => $_getSZ(4); + @$pb.TagNumber(5) + set docId($core.String v) { $_setString(4, v); } + @$pb.TagNumber(5) + $core.bool hasDocId() => $_has(4); + @$pb.TagNumber(5) + void clearDocId() => clearField(5); } diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart index ef2e3b1275..9aa97e3d6e 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart @@ -16,8 +16,9 @@ const Revision$json = const { const {'1': 'rev_id', '3': 2, '4': 1, '5': 3, '10': 'revId'}, const {'1': 'delta', '3': 3, '4': 1, '5': 12, '10': 'delta'}, const {'1': 'md5', '3': 4, '4': 1, '5': 9, '10': 'md5'}, + const {'1': 'doc_id', '3': 5, '4': 1, '5': 9, '10': 'docId'}, ], }; /// Descriptor for `Revision`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List revisionDescriptor = $convert.base64Decode('CghSZXZpc2lvbhIeCgtiYXNlX3Jldl9pZBgBIAEoA1IJYmFzZVJldklkEhUKBnJldl9pZBgCIAEoA1IFcmV2SWQSFAoFZGVsdGEYAyABKAxSBWRlbHRhEhAKA21kNRgEIAEoCVIDbWQ1'); +final $typed_data.Uint8List revisionDescriptor = $convert.base64Decode('CghSZXZpc2lvbhIeCgtiYXNlX3Jldl9pZBgBIAEoA1IJYmFzZVJldklkEhUKBnJldl9pZBgCIAEoA1IFcmV2SWQSFAoFZGVsdGEYAyABKAxSBWRlbHRhEhAKA21kNRgEIAEoCVIDbWQ1EhUKBmRvY19pZBgFIAEoCVIFZG9jSWQ='); diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pb.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pb.dart index a1c30e56f4..8761104933 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pb.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pb.dart @@ -16,7 +16,7 @@ export 'ws.pbenum.dart'; class WsDocumentData extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WsDocumentData', createEmptyInstance: create) ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id') - ..e(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'source', $pb.PbFieldType.OE, defaultOrMaker: WsSource.Delta, valueOf: WsSource.valueOf, enumValues: WsSource.values) + ..e(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: WsDataType.Command, valueOf: WsDataType.valueOf, enumValues: WsDataType.values) ..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY) ..hasRequiredFields = false ; @@ -24,15 +24,15 @@ class WsDocumentData extends $pb.GeneratedMessage { WsDocumentData._() : super(); factory WsDocumentData({ $core.String? id, - WsSource? source, + WsDataType? ty, $core.List<$core.int>? data, }) { final _result = create(); if (id != null) { _result.id = id; } - if (source != null) { - _result.source = source; + if (ty != null) { + _result.ty = ty; } if (data != null) { _result.data = data; @@ -70,13 +70,13 @@ class WsDocumentData extends $pb.GeneratedMessage { void clearId() => clearField(1); @$pb.TagNumber(2) - WsSource get source => $_getN(1); + WsDataType get ty => $_getN(1); @$pb.TagNumber(2) - set source(WsSource v) { setField(2, v); } + set ty(WsDataType v) { setField(2, v); } @$pb.TagNumber(2) - $core.bool hasSource() => $_has(1); + $core.bool hasTy() => $_has(1); @$pb.TagNumber(2) - void clearSource() => clearField(2); + void clearTy() => clearField(2); @$pb.TagNumber(3) $core.List<$core.int> get data => $_getN(2); diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart index 7feddf6b0e..b862c5a613 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart @@ -9,16 +9,18 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; -class WsSource extends $pb.ProtobufEnum { - static const WsSource Delta = WsSource._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Delta'); +class WsDataType extends $pb.ProtobufEnum { + static const WsDataType Command = WsDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Command'); + static const WsDataType Delta = WsDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Delta'); - static const $core.List values = [ + static const $core.List values = [ + Command, Delta, ]; - static final $core.Map<$core.int, WsSource> _byValue = $pb.ProtobufEnum.initByValue(values); - static WsSource? valueOf($core.int value) => _byValue[value]; + static final $core.Map<$core.int, WsDataType> _byValue = $pb.ProtobufEnum.initByValue(values); + static WsDataType? valueOf($core.int value) => _byValue[value]; - const WsSource._($core.int v, $core.String n) : super(v, n); + const WsDataType._($core.int v, $core.String n) : super(v, n); } diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart index 494a270516..a63f7a0ed2 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart @@ -8,25 +8,26 @@ import 'dart:core' as $core; import 'dart:convert' as $convert; import 'dart:typed_data' as $typed_data; -@$core.Deprecated('Use wsSourceDescriptor instead') -const WsSource$json = const { - '1': 'WsSource', +@$core.Deprecated('Use wsDataTypeDescriptor instead') +const WsDataType$json = const { + '1': 'WsDataType', '2': const [ - const {'1': 'Delta', '2': 0}, + const {'1': 'Command', '2': 0}, + const {'1': 'Delta', '2': 1}, ], }; -/// Descriptor for `WsSource`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List wsSourceDescriptor = $convert.base64Decode('CghXc1NvdXJjZRIJCgVEZWx0YRAA'); +/// Descriptor for `WsDataType`. Decode as a `google.protobuf.EnumDescriptorProto`. +final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgsKB0NvbW1hbmQQABIJCgVEZWx0YRAB'); @$core.Deprecated('Use wsDocumentDataDescriptor instead') const WsDocumentData$json = const { '1': 'WsDocumentData', '2': const [ const {'1': 'id', '3': 1, '4': 1, '5': 9, '10': 'id'}, - const {'1': 'source', '3': 2, '4': 1, '5': 14, '6': '.WsSource', '10': 'source'}, + const {'1': 'ty', '3': 2, '4': 1, '5': 14, '6': '.WsDataType', '10': 'ty'}, const {'1': 'data', '3': 3, '4': 1, '5': 12, '10': 'data'}, ], }; /// Descriptor for `WsDocumentData`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List wsDocumentDataDescriptor = $convert.base64Decode('Cg5Xc0RvY3VtZW50RGF0YRIOCgJpZBgBIAEoCVICaWQSIQoGc291cmNlGAIgASgOMgkuV3NTb3VyY2VSBnNvdXJjZRISCgRkYXRhGAMgASgMUgRkYXRh'); +final $typed_data.Uint8List wsDocumentDataDescriptor = $convert.base64Decode('Cg5Xc0RvY3VtZW50RGF0YRIOCgJpZBgBIAEoCVICaWQSGwoCdHkYAiABKA4yCy5Xc0RhdGFUeXBlUgJ0eRISCgRkYXRhGAMgASgMUgRkYXRh'); diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pb.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pb.dart index 1b198075a5..0c07e01026 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pb.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pb.dart @@ -15,19 +15,19 @@ export 'msg.pbenum.dart'; class WsMessage extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WsMessage', createEmptyInstance: create) - ..e(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'source', $pb.PbFieldType.OE, defaultOrMaker: WsSource.Doc, valueOf: WsSource.valueOf, enumValues: WsSource.values) + ..e(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'module', $pb.PbFieldType.OE, defaultOrMaker: WsModule.Doc, valueOf: WsModule.valueOf, enumValues: WsModule.values) ..a<$core.List<$core.int>>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY) ..hasRequiredFields = false ; WsMessage._() : super(); factory WsMessage({ - WsSource? source, + WsModule? module, $core.List<$core.int>? data, }) { final _result = create(); - if (source != null) { - _result.source = source; + if (module != null) { + _result.module = module; } if (data != null) { _result.data = data; @@ -56,13 +56,13 @@ class WsMessage extends $pb.GeneratedMessage { static WsMessage? _defaultInstance; @$pb.TagNumber(1) - WsSource get source => $_getN(0); + WsModule get module => $_getN(0); @$pb.TagNumber(1) - set source(WsSource v) { setField(1, v); } + set module(WsModule v) { setField(1, v); } @$pb.TagNumber(1) - $core.bool hasSource() => $_has(0); + $core.bool hasModule() => $_has(0); @$pb.TagNumber(1) - void clearSource() => clearField(1); + void clearModule() => clearField(1); @$pb.TagNumber(2) $core.List<$core.int> get data => $_getN(1); diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbenum.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbenum.dart index bd39d1cbd7..dddbfe6eaa 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbenum.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbenum.dart @@ -9,16 +9,16 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; -class WsSource extends $pb.ProtobufEnum { - static const WsSource Doc = WsSource._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Doc'); +class WsModule extends $pb.ProtobufEnum { + static const WsModule Doc = WsModule._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Doc'); - static const $core.List values = [ + static const $core.List values = [ Doc, ]; - static final $core.Map<$core.int, WsSource> _byValue = $pb.ProtobufEnum.initByValue(values); - static WsSource? valueOf($core.int value) => _byValue[value]; + static final $core.Map<$core.int, WsModule> _byValue = $pb.ProtobufEnum.initByValue(values); + static WsModule? valueOf($core.int value) => _byValue[value]; - const WsSource._($core.int v, $core.String n) : super(v, n); + const WsModule._($core.int v, $core.String n) : super(v, n); } diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbjson.dart index 6f0c3e285e..13899cc203 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbjson.dart @@ -8,24 +8,24 @@ import 'dart:core' as $core; import 'dart:convert' as $convert; import 'dart:typed_data' as $typed_data; -@$core.Deprecated('Use wsSourceDescriptor instead') -const WsSource$json = const { - '1': 'WsSource', +@$core.Deprecated('Use wsModuleDescriptor instead') +const WsModule$json = const { + '1': 'WsModule', '2': const [ const {'1': 'Doc', '2': 0}, ], }; -/// Descriptor for `WsSource`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List wsSourceDescriptor = $convert.base64Decode('CghXc1NvdXJjZRIHCgNEb2MQAA=='); +/// Descriptor for `WsModule`. Decode as a `google.protobuf.EnumDescriptorProto`. +final $typed_data.Uint8List wsModuleDescriptor = $convert.base64Decode('CghXc01vZHVsZRIHCgNEb2MQAA=='); @$core.Deprecated('Use wsMessageDescriptor instead') const WsMessage$json = const { '1': 'WsMessage', '2': const [ - const {'1': 'source', '3': 1, '4': 1, '5': 14, '6': '.WsSource', '10': 'source'}, + const {'1': 'module', '3': 1, '4': 1, '5': 14, '6': '.WsModule', '10': 'module'}, const {'1': 'data', '3': 2, '4': 1, '5': 12, '10': 'data'}, ], }; /// Descriptor for `WsMessage`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List wsMessageDescriptor = $convert.base64Decode('CglXc01lc3NhZ2USIQoGc291cmNlGAEgASgOMgkuV3NTb3VyY2VSBnNvdXJjZRISCgRkYXRhGAIgASgMUgRkYXRh'); +final $typed_data.Uint8List wsMessageDescriptor = $convert.base64Decode('CglXc01lc3NhZ2USIQoGbW9kdWxlGAEgASgOMgkuV3NNb2R1bGVSBm1vZHVsZRISCgRkYXRhGAIgASgMUgRkYXRh'); diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 03d8069fb9..2c9571cd95 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -53,6 +53,7 @@ jsonwebtoken = "7.2" sql-builder = "3.1.1" lazy_static = "1.4" tokio = { version = "1", features = ["full"] } +parking_lot = "0.11" flowy-user = { path = "../rust-lib/flowy-user" } flowy-workspace = { path = "../rust-lib/flowy-workspace" } diff --git a/backend/src/application.rs b/backend/src/application.rs index ad1ea4e15c..fa05a3066f 100644 --- a/backend/src/application.rs +++ b/backend/src/application.rs @@ -24,7 +24,6 @@ use crate::{ ws::WSServer, }, }; -use flowy_ws::WsSource; pub struct Application { port: u16, @@ -55,7 +54,7 @@ pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result, + edit_docs: DashMap>>, +} impl DocWsBizHandler { - pub fn new() -> Self { Self {} } + pub fn new(pg_pool: Data) -> Self { + Self { + edit_docs: DashMap::new(), + pg_pool, + } + } } impl WsBizHandler for DocWsBizHandler { fn receive_data(&self, data: Bytes) { - let revision: Revision = parse_from_bytes(&data).unwrap(); - log::warn!("{:?}", revision); + let document_data: WsDocumentData = parse_from_bytes(&data).unwrap(); + match document_data.ty { + WsDataType::Command => {}, + WsDataType::Delta => { + let revision: Revision = parse_from_bytes(&document_data.data).unwrap(); + log::warn!("{:?}", revision); + }, + } } } + +pub struct EditDoc { + doc_id: String, + document: Document, +} diff --git a/backend/src/service/mod.rs b/backend/src/service/mod.rs index 2d45c748e6..8dcc7d066b 100644 --- a/backend/src/service/mod.rs +++ b/backend/src/service/mod.rs @@ -1,7 +1,8 @@ use crate::service::{doc::ws_handler::DocWsBizHandler, ws::WsBizHandlers}; -use flowy_ws::WsSource; +use actix_web::web::Data; +use flowy_ws::WsModule; +use sqlx::PgPool; use std::sync::Arc; -use tokio::sync::RwLock; pub mod app; pub mod doc; @@ -12,15 +13,15 @@ pub mod view; pub mod workspace; pub mod ws; -pub fn make_ws_biz_handlers() -> WsBizHandlers { +pub fn make_ws_biz_handlers(pg_pool: Data) -> WsBizHandlers { let mut ws_biz_handlers = WsBizHandlers::new(); // doc - let doc_biz_handler = DocWsBizHandler::new(); - ws_biz_handlers.register(WsSource::Doc, wrap(doc_biz_handler)); + let doc_biz_handler = DocWsBizHandler::new(pg_pool); + ws_biz_handlers.register(WsModule::Doc, wrap(doc_biz_handler)); // ws_biz_handlers } -fn wrap(val: T) -> Arc> { Arc::new(RwLock::new(val)) } +fn wrap(val: T) -> Arc { Arc::new(val) } diff --git a/backend/src/service/ws/biz_handler.rs b/backend/src/service/ws/biz_handler.rs index 1252d2f2c7..16400f9fe9 100644 --- a/backend/src/service/ws/biz_handler.rs +++ b/backend/src/service/ws/biz_handler.rs @@ -1,31 +1,28 @@ use bytes::Bytes; -use dashmap::{mapref::one::Ref, DashMap}; -use flowy_ws::WsSource; -use std::sync::Arc; -use tokio::sync::RwLock; +use flowy_ws::WsModule; +use std::{collections::HashMap, sync::Arc}; pub trait WsBizHandler: Send + Sync { fn receive_data(&self, data: Bytes); } -pub type BizHandler = Arc>; - +pub type BizHandler = Arc; pub struct WsBizHandlers { - inner: DashMap, + inner: HashMap, } impl WsBizHandlers { pub fn new() -> Self { Self { - inner: DashMap::new(), + inner: HashMap::new(), } } - pub fn register(&self, source: WsSource, handler: BizHandler) { + pub fn register(&mut self, source: WsModule, handler: BizHandler) { self.inner.insert(source, handler); } - pub fn get(&self, source: &WsSource) -> Option { + pub fn get(&self, source: &WsModule) -> Option { match self.inner.get(source) { None => None, Some(handler) => Some(handler.clone()), diff --git a/backend/src/service/ws/ws_client.rs b/backend/src/service/ws/ws_client.rs index d63f87569f..64538c538e 100644 --- a/backend/src/service/ws/ws_client.rs +++ b/backend/src/service/ws/ws_client.rs @@ -9,13 +9,12 @@ use crate::{ WsBizHandlers, }, }; -use actix::{fut::wrap_future, *}; +use actix::*; use actix_web::web::Data; use actix_web_actors::{ws, ws::Message::Text}; use bytes::Bytes; -use flowy_ws::{WsMessage, WsSource}; -use std::{convert::TryFrom, pin::Pin, time::Instant}; -use tokio::sync::RwLock; +use flowy_ws::WsMessage; +use std::{convert::TryFrom, time::Instant}; pub struct WSClient { session_id: SessionId, @@ -55,18 +54,16 @@ impl WSClient { let msg = ClientMessage::new(self.session_id.clone(), data); self.server.do_send(msg); } -} -async fn handle_binary_message(biz_handlers: Data, bytes: Bytes) { - let message: WsMessage = WsMessage::try_from(bytes).unwrap(); - match biz_handlers.get(&message.source) { - None => { - log::error!("Can't find the handler for {:?}", message.source); - }, - Some(handler) => handler - .write() - .await - .receive_data(Bytes::from(message.data)), + fn handle_binary_message(&self, bytes: Bytes) { + // TODO: ok to unwrap? + let message: WsMessage = WsMessage::try_from(bytes).unwrap(); + match self.biz_handlers.get(&message.module) { + None => { + log::error!("Can't find the handler for {:?}", message.module); + }, + Some(handler) => handler.receive_data(Bytes::from(message.data)), + } } } @@ -83,8 +80,7 @@ impl StreamHandler> for WSClient { }, Ok(ws::Message::Binary(bytes)) => { log::debug!(" Receive {} binary", &self.session_id); - let biz_handlers = self.biz_handlers.clone(); - ctx.spawn(wrap_future(handle_binary_message(biz_handlers, bytes))); + self.handle_binary_message(bytes); }, Ok(Text(_)) => { log::warn!("Receive unexpected text message"); diff --git a/backend/tests/ws/helper.rs b/backend/tests/ws/helper.rs index 2bbbf6651f..19d286a061 100644 --- a/backend/tests/ws/helper.rs +++ b/backend/tests/ws/helper.rs @@ -1,5 +1,5 @@ use crate::helper::TestServer; -use flowy_ws::{WsController, WsSender, WsState}; +use flowy_ws::{WsController, WsModule, WsSender, WsState}; use parking_lot::RwLock; use std::sync::Arc; @@ -26,7 +26,7 @@ impl WsTest { WsScriptRunner { scripts: scripts.clone(), sender: sender.clone(), - source: "editor".to_owned(), + source: WsModule::Doc, } .run(); }, @@ -54,7 +54,7 @@ impl WsTest { struct WsScriptRunner { scripts: Vec, sender: Arc, - source: String, + source: WsModule, } impl WsScriptRunner { diff --git a/rust-lib/dart-ffi/Cargo.toml b/rust-lib/dart-ffi/Cargo.toml index dbaf8eb045..a3c26b0054 100644 --- a/rust-lib/dart-ffi/Cargo.toml +++ b/rust-lib/dart-ffi/Cargo.toml @@ -7,11 +7,11 @@ edition = "2018" [lib] name = "dart_ffi" # this value will change depending on the target os -# for iOS it would be `rlib` -# for Macos it would be `rlib` +# for iOS it would be `cdylib` +# for Macos it would be `cdylib` # for android it would be `c-dylib` -# default rlib -crate-type = ["rlib"] +# default cdylib +crate-type = ["cdylib"] [dependencies] diff --git a/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs b/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs index fbdde1ed88..cdd4f009e8 100644 --- a/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs +++ b/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs @@ -81,7 +81,8 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "WorkspaceEvent" | "ErrorCode" | "WorkspaceObservable" - | "WsSource" + | "WsModule" + | "WsDataType" | "DocObservable" | "FFIStatusCode" | "UserEvent" diff --git a/rust-lib/flowy-document/src/entities/doc/revision.rs b/rust-lib/flowy-document/src/entities/doc/revision.rs index b89ca8ccc7..f772d5ae1f 100644 --- a/rust-lib/flowy-document/src/entities/doc/revision.rs +++ b/rust-lib/flowy-document/src/entities/doc/revision.rs @@ -13,15 +13,19 @@ pub struct Revision { #[pb(index = 4)] pub md5: String, + + #[pb(index = 5)] + pub doc_id: String, } impl Revision { - pub fn new(base_rev_id: i64, rev_id: i64, delta: Vec, md5: String) -> Revision { + pub fn new(base_rev_id: i64, rev_id: i64, delta: Vec, md5: String, doc_id: String) -> Revision { Self { base_rev_id, rev_id, delta, md5, + doc_id, } } } diff --git a/rust-lib/flowy-document/src/entities/ws/ws.rs b/rust-lib/flowy-document/src/entities/ws/ws.rs index 3785d5c1b0..3c9ee911ed 100644 --- a/rust-lib/flowy-document/src/entities/ws/ws.rs +++ b/rust-lib/flowy-document/src/entities/ws/ws.rs @@ -1,12 +1,16 @@ +use crate::entities::doc::Revision; +use bytes::Bytes; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; +use std::convert::TryInto; #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)] -pub enum WsSource { - Delta = 0, +pub enum WsDataType { + Command = 0, + Delta = 1, } -impl std::default::Default for WsSource { - fn default() -> Self { WsSource::Delta } +impl std::default::Default for WsDataType { + fn default() -> Self { WsDataType::Command } } #[derive(ProtoBuf, Default, Debug, Clone)] @@ -15,8 +19,21 @@ pub struct WsDocumentData { pub id: String, #[pb(index = 2)] - pub source: WsSource, + pub ty: WsDataType, #[pb(index = 3)] pub data: Vec, // Delta } + +impl std::convert::From for WsDocumentData { + fn from(revision: Revision) -> Self { + let id = revision.doc_id.clone(); + let bytes: Bytes = revision.try_into().unwrap(); + let data = bytes.to_vec(); + Self { + id, + ty: WsDataType::Delta, + data, + } + } +} diff --git a/rust-lib/flowy-document/src/module.rs b/rust-lib/flowy-document/src/module.rs index 58aaa64ead..f6d3c8e5fa 100644 --- a/rust-lib/flowy-document/src/module.rs +++ b/rust-lib/flowy-document/src/module.rs @@ -1,6 +1,5 @@ use std::sync::Arc; - use diesel::SqliteConnection; use parking_lot::RwLock; @@ -9,7 +8,7 @@ use flowy_database::ConnectionPool; use crate::{ entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams}, errors::DocError, - services::{doc::doc_controller::DocController, server::construct_doc_server, ws::WsManager}, + services::{doc::doc_controller::DocController, server::construct_doc_server, ws::WsDocumentManager}, }; pub trait DocumentUser: Send + Sync { @@ -23,7 +22,7 @@ pub struct FlowyDocument { } impl FlowyDocument { - pub fn new(user: Arc, ws_manager: Arc>) -> FlowyDocument { + pub fn new(user: Arc, ws_manager: Arc>) -> FlowyDocument { let server = construct_doc_server(); let controller = Arc::new(DocController::new(server.clone(), user.clone(), ws_manager.clone())); Self { doc_ctrl: controller } diff --git a/rust-lib/flowy-document/src/protobuf/model/revision.rs b/rust-lib/flowy-document/src/protobuf/model/revision.rs index 5e18f843dd..556ae12536 100644 --- a/rust-lib/flowy-document/src/protobuf/model/revision.rs +++ b/rust-lib/flowy-document/src/protobuf/model/revision.rs @@ -30,6 +30,7 @@ pub struct Revision { pub rev_id: i64, pub delta: ::std::vec::Vec, pub md5: ::std::string::String, + pub doc_id: ::std::string::String, // special fields pub unknown_fields: ::protobuf::UnknownFields, pub cached_size: ::protobuf::CachedSize, @@ -127,6 +128,32 @@ impl Revision { pub fn take_md5(&mut self) -> ::std::string::String { ::std::mem::replace(&mut self.md5, ::std::string::String::new()) } + + // string doc_id = 5; + + + pub fn get_doc_id(&self) -> &str { + &self.doc_id + } + pub fn clear_doc_id(&mut self) { + self.doc_id.clear(); + } + + // Param is passed by value, moved + pub fn set_doc_id(&mut self, v: ::std::string::String) { + self.doc_id = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_doc_id(&mut self) -> &mut ::std::string::String { + &mut self.doc_id + } + + // Take field + pub fn take_doc_id(&mut self) -> ::std::string::String { + ::std::mem::replace(&mut self.doc_id, ::std::string::String::new()) + } } impl ::protobuf::Message for Revision { @@ -158,6 +185,9 @@ impl ::protobuf::Message for Revision { 4 => { ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.md5)?; }, + 5 => { + ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.doc_id)?; + }, _ => { ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; }, @@ -182,6 +212,9 @@ impl ::protobuf::Message for Revision { if !self.md5.is_empty() { my_size += ::protobuf::rt::string_size(4, &self.md5); } + if !self.doc_id.is_empty() { + my_size += ::protobuf::rt::string_size(5, &self.doc_id); + } my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); self.cached_size.set(my_size); my_size @@ -200,6 +233,9 @@ impl ::protobuf::Message for Revision { if !self.md5.is_empty() { os.write_string(4, &self.md5)?; } + if !self.doc_id.is_empty() { + os.write_string(5, &self.doc_id)?; + } os.write_unknown_fields(self.get_unknown_fields())?; ::std::result::Result::Ok(()) } @@ -258,6 +294,11 @@ impl ::protobuf::Message for Revision { |m: &Revision| { &m.md5 }, |m: &mut Revision| { &mut m.md5 }, )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "doc_id", + |m: &Revision| { &m.doc_id }, + |m: &mut Revision| { &mut m.doc_id }, + )); ::protobuf::reflect::MessageDescriptor::new_pb_name::( "Revision", fields, @@ -278,6 +319,7 @@ impl ::protobuf::Clear for Revision { self.rev_id = 0; self.delta.clear(); self.md5.clear(); + self.doc_id.clear(); self.unknown_fields.clear(); } } @@ -295,23 +337,26 @@ impl ::protobuf::reflect::ProtobufValue for Revision { } static file_descriptor_proto_data: &'static [u8] = b"\ - \n\x0erevision.proto\"i\n\x08Revision\x12\x1e\n\x0bbase_rev_id\x18\x01\ - \x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\x05r\ - evId\x12\x14\n\x05delta\x18\x03\x20\x01(\x0cR\x05delta\x12\x10\n\x03md5\ - \x18\x04\x20\x01(\tR\x03md5J\x86\x02\n\x06\x12\x04\0\0\x07\x01\n\x08\n\ - \x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x07\x01\n\n\n\x03\ - \x04\0\x01\x12\x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\ - \x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\ - \0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x18\x19\ - \n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x15\n\x0c\n\x05\x04\0\x02\x01\ - \x05\x12\x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\x10\n\ - \x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x13\x14\n\x0b\n\x04\x04\0\x02\ - \x02\x12\x03\x05\x04\x14\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\ - \n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0f\n\x0c\n\x05\x04\0\x02\ - \x02\x03\x12\x03\x05\x12\x13\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\x04\ - \x13\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\0\ - \x02\x03\x01\x12\x03\x06\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\ - \x06\x11\x12b\x06proto3\ + \n\x0erevision.proto\"\x80\x01\n\x08Revision\x12\x1e\n\x0bbase_rev_id\ + \x18\x01\x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\ + \x03R\x05revId\x12\x14\n\x05delta\x18\x03\x20\x01(\x0cR\x05delta\x12\x10\ + \n\x03md5\x18\x04\x20\x01(\tR\x03md5\x12\x15\n\x06doc_id\x18\x05\x20\x01\ + (\tR\x05docIdJ\xbd\x02\n\x06\x12\x04\0\0\x08\x01\n\x08\n\x01\x0c\x12\x03\ + \0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x08\x01\n\n\n\x03\x04\0\x01\x12\ + \x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x1a\n\x0c\n\x05\ + \x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\ + \x03\n\x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x18\x19\n\x0b\n\x04\ + \x04\0\x02\x01\x12\x03\x04\x04\x15\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\ + \x04\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\x10\n\x0c\n\x05\ + \x04\0\x02\x01\x03\x12\x03\x04\x13\x14\n\x0b\n\x04\x04\0\x02\x02\x12\x03\ + \x05\x04\x14\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\ + \x04\0\x02\x02\x01\x12\x03\x05\n\x0f\n\x0c\n\x05\x04\0\x02\x02\x03\x12\ + \x03\x05\x12\x13\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\x04\x13\n\x0c\n\ + \x05\x04\0\x02\x03\x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\ + \x12\x03\x06\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x06\x11\x12\n\ + \x0b\n\x04\x04\0\x02\x04\x12\x03\x07\x04\x16\n\x0c\n\x05\x04\0\x02\x04\ + \x05\x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x07\x0b\x11\ + \n\x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x07\x14\x15b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-document/src/protobuf/model/ws.rs b/rust-lib/flowy-document/src/protobuf/model/ws.rs index 579166561f..3b83649d53 100644 --- a/rust-lib/flowy-document/src/protobuf/model/ws.rs +++ b/rust-lib/flowy-document/src/protobuf/model/ws.rs @@ -27,7 +27,7 @@ pub struct WsDocumentData { // message fields pub id: ::std::string::String, - pub source: WsSource, + pub ty: WsDataType, pub data: ::std::vec::Vec, // special fields pub unknown_fields: ::protobuf::UnknownFields, @@ -71,19 +71,19 @@ impl WsDocumentData { ::std::mem::replace(&mut self.id, ::std::string::String::new()) } - // .WsSource source = 2; + // .WsDataType ty = 2; - pub fn get_source(&self) -> WsSource { - self.source + pub fn get_ty(&self) -> WsDataType { + self.ty } - pub fn clear_source(&mut self) { - self.source = WsSource::Delta; + pub fn clear_ty(&mut self) { + self.ty = WsDataType::Command; } // Param is passed by value, moved - pub fn set_source(&mut self, v: WsSource) { - self.source = v; + pub fn set_ty(&mut self, v: WsDataType) { + self.ty = v; } // bytes data = 3; @@ -126,7 +126,7 @@ impl ::protobuf::Message for WsDocumentData { ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.id)?; }, 2 => { - ::protobuf::rt::read_proto3_enum_with_unknown_fields_into(wire_type, is, &mut self.source, 2, &mut self.unknown_fields)? + ::protobuf::rt::read_proto3_enum_with_unknown_fields_into(wire_type, is, &mut self.ty, 2, &mut self.unknown_fields)? }, 3 => { ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?; @@ -146,8 +146,8 @@ impl ::protobuf::Message for WsDocumentData { if !self.id.is_empty() { my_size += ::protobuf::rt::string_size(1, &self.id); } - if self.source != WsSource::Delta { - my_size += ::protobuf::rt::enum_size(2, self.source); + if self.ty != WsDataType::Command { + my_size += ::protobuf::rt::enum_size(2, self.ty); } if !self.data.is_empty() { my_size += ::protobuf::rt::bytes_size(3, &self.data); @@ -161,8 +161,8 @@ impl ::protobuf::Message for WsDocumentData { if !self.id.is_empty() { os.write_string(1, &self.id)?; } - if self.source != WsSource::Delta { - os.write_enum(2, ::protobuf::ProtobufEnum::value(&self.source))?; + if self.ty != WsDataType::Command { + os.write_enum(2, ::protobuf::ProtobufEnum::value(&self.ty))?; } if !self.data.is_empty() { os.write_bytes(3, &self.data)?; @@ -210,10 +210,10 @@ impl ::protobuf::Message for WsDocumentData { |m: &WsDocumentData| { &m.id }, |m: &mut WsDocumentData| { &mut m.id }, )); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( - "source", - |m: &WsDocumentData| { &m.source }, - |m: &mut WsDocumentData| { &mut m.source }, + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( + "ty", + |m: &WsDocumentData| { &m.ty }, + |m: &mut WsDocumentData| { &mut m.ty }, )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( "data", @@ -237,7 +237,7 @@ impl ::protobuf::Message for WsDocumentData { impl ::protobuf::Clear for WsDocumentData { fn clear(&mut self) { self.id.clear(); - self.source = WsSource::Delta; + self.ty = WsDataType::Command; self.data.clear(); self.unknown_fields.clear(); } @@ -256,25 +256,28 @@ impl ::protobuf::reflect::ProtobufValue for WsDocumentData { } #[derive(Clone,PartialEq,Eq,Debug,Hash)] -pub enum WsSource { - Delta = 0, +pub enum WsDataType { + Command = 0, + Delta = 1, } -impl ::protobuf::ProtobufEnum for WsSource { +impl ::protobuf::ProtobufEnum for WsDataType { fn value(&self) -> i32 { *self as i32 } - fn from_i32(value: i32) -> ::std::option::Option { + fn from_i32(value: i32) -> ::std::option::Option { match value { - 0 => ::std::option::Option::Some(WsSource::Delta), + 0 => ::std::option::Option::Some(WsDataType::Command), + 1 => ::std::option::Option::Some(WsDataType::Delta), _ => ::std::option::Option::None } } fn values() -> &'static [Self] { - static values: &'static [WsSource] = &[ - WsSource::Delta, + static values: &'static [WsDataType] = &[ + WsDataType::Command, + WsDataType::Delta, ]; values } @@ -282,44 +285,46 @@ impl ::protobuf::ProtobufEnum for WsSource { fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor { static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT; descriptor.get(|| { - ::protobuf::reflect::EnumDescriptor::new_pb_name::("WsSource", file_descriptor_proto()) + ::protobuf::reflect::EnumDescriptor::new_pb_name::("WsDataType", file_descriptor_proto()) }) } } -impl ::std::marker::Copy for WsSource { +impl ::std::marker::Copy for WsDataType { } -impl ::std::default::Default for WsSource { +impl ::std::default::Default for WsDataType { fn default() -> Self { - WsSource::Delta + WsDataType::Command } } -impl ::protobuf::reflect::ProtobufValue for WsSource { +impl ::protobuf::reflect::ProtobufValue for WsDataType { fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { ::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self)) } } static file_descriptor_proto_data: &'static [u8] = b"\ - \n\x08ws.proto\"W\n\x0eWsDocumentData\x12\x0e\n\x02id\x18\x01\x20\x01(\t\ - R\x02id\x12!\n\x06source\x18\x02\x20\x01(\x0e2\t.WsSourceR\x06source\x12\ - \x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*\x15\n\x08WsSource\x12\t\n\ - \x05Delta\x10\0J\x90\x02\n\x06\x12\x04\0\0\t\x01\n\x08\n\x01\x0c\x12\x03\ - \0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\x01\n\n\n\x03\x04\0\x01\x12\ - \x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x12\n\x0c\n\x05\ - \x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\ - \x03\x0b\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x10\x11\n\x0b\n\x04\ - \x04\0\x02\x01\x12\x03\x04\x04\x18\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\ - \x04\x04\x0c\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\r\x13\n\x0c\n\x05\ - \x04\0\x02\x01\x03\x12\x03\x04\x16\x17\n\x0b\n\x04\x04\0\x02\x02\x12\x03\ - \x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\ - \x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\ - \x03\x05\x11\x12\n\n\n\x02\x05\0\x12\x04\x07\0\t\x01\n\n\n\x03\x05\0\x01\ - \x12\x03\x07\x05\r\n\x0b\n\x04\x05\0\x02\0\x12\x03\x08\x04\x0e\n\x0c\n\ - \x05\x05\0\x02\0\x01\x12\x03\x08\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\ - \x03\x08\x0c\rb\x06proto3\ + \n\x08ws.proto\"Q\n\x0eWsDocumentData\x12\x0e\n\x02id\x18\x01\x20\x01(\t\ + R\x02id\x12\x1b\n\x02ty\x18\x02\x20\x01(\x0e2\x0b.WsDataTypeR\x02ty\x12\ + \x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*$\n\nWsDataType\x12\x0b\n\ + \x07Command\x10\0\x12\t\n\x05Delta\x10\x01J\xb9\x02\n\x06\x12\x04\0\0\n\ + \x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\ + \x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\ + \x03\x03\x04\x12\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\ + \x05\x04\0\x02\0\x01\x12\x03\x03\x0b\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\ + \x03\x03\x10\x11\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x16\n\x0c\n\ + \x05\x04\0\x02\x01\x06\x12\x03\x04\x04\x0e\n\x0c\n\x05\x04\0\x02\x01\x01\ + \x12\x03\x04\x0f\x11\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x14\x15\n\ + \x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\ + \x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\ + \x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\n\n\n\x02\x05\0\x12\x04\ + \x07\0\n\x01\n\n\n\x03\x05\0\x01\x12\x03\x07\x05\x0f\n\x0b\n\x04\x05\0\ + \x02\0\x12\x03\x08\x04\x10\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x08\x04\ + \x0b\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0e\x0f\n\x0b\n\x04\x05\0\ + \x02\x01\x12\x03\t\x04\x0e\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\t\x04\t\ + \n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\t\x0c\rb\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-document/src/protobuf/proto/revision.proto b/rust-lib/flowy-document/src/protobuf/proto/revision.proto index 1c8434d5ef..d3e5f71aa7 100644 --- a/rust-lib/flowy-document/src/protobuf/proto/revision.proto +++ b/rust-lib/flowy-document/src/protobuf/proto/revision.proto @@ -5,4 +5,5 @@ message Revision { int64 rev_id = 2; bytes delta = 3; string md5 = 4; + string doc_id = 5; } diff --git a/rust-lib/flowy-document/src/protobuf/proto/ws.proto b/rust-lib/flowy-document/src/protobuf/proto/ws.proto index aaf21aba7a..d50b40cdd4 100644 --- a/rust-lib/flowy-document/src/protobuf/proto/ws.proto +++ b/rust-lib/flowy-document/src/protobuf/proto/ws.proto @@ -2,9 +2,10 @@ syntax = "proto3"; message WsDocumentData { string id = 1; - WsSource source = 2; + WsDataType ty = 2; bytes data = 3; } -enum WsSource { - Delta = 0; +enum WsDataType { + Command = 0; + Delta = 1; } diff --git a/rust-lib/flowy-document/src/services/doc/doc_controller.rs b/rust-lib/flowy-document/src/services/doc/doc_controller.rs index e87aacef8a..72c4e70e9d 100644 --- a/rust-lib/flowy-document/src/services/doc/doc_controller.rs +++ b/rust-lib/flowy-document/src/services/doc/doc_controller.rs @@ -2,12 +2,11 @@ use crate::{ entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams, UpdateDocParams}, errors::{internal_error, DocError}, module::DocumentUser, - services::{cache::DocCache, doc::edit_context::EditDocContext, server::Server, ws::WsManager}, + services::{cache::DocCache, doc::edit_context::EditDocContext, server::Server, ws::WsDocumentManager}, sql_tables::doc::{DocTable, DocTableSql, OpTableSql}, }; use bytes::Bytes; use flowy_database::{ConnectionPool, SqliteConnection}; -use flowy_infra::future::ClosureFuture; use parking_lot::RwLock; use std::sync::Arc; @@ -16,13 +15,13 @@ pub(crate) struct DocController { server: Server, doc_sql: Arc, op_sql: Arc, - ws: Arc>, + ws: Arc>, cache: Arc, user: Arc, } impl DocController { - pub(crate) fn new(server: Server, user: Arc, ws: Arc>) -> Self { + pub(crate) fn new(server: Server, user: Arc, ws: Arc>) -> Self { let doc_sql = Arc::new(DocTableSql {}); let op_sql = Arc::new(OpTableSql {}); let cache = Arc::new(DocCache::new()); @@ -149,8 +148,8 @@ impl DocController { fn make_edit_context(&self, doc: Doc) -> Result, DocError> { // Opti: require upgradable_read lock and then upgrade to write lock using // RwLockUpgradableReadGuard::upgrade(xx) of ws - let sender = self.ws.read().sender.clone(); - let edit_ctx = Arc::new(EditDocContext::new(doc, sender, self.op_sql.clone())?); + let ws = self.ws.read().sender(); + let edit_ctx = Arc::new(EditDocContext::new(doc, ws, self.op_sql.clone())?); self.ws.write().register_handler(edit_ctx.id.as_ref(), edit_ctx.clone()); self.cache.set(edit_ctx.clone()); Ok(edit_ctx) diff --git a/rust-lib/flowy-document/src/services/doc/edit_context.rs b/rust-lib/flowy-document/src/services/doc/edit_context.rs index b14d5e02e5..8336c6929f 100644 --- a/rust-lib/flowy-document/src/services/doc/edit_context.rs +++ b/rust-lib/flowy-document/src/services/doc/edit_context.rs @@ -1,23 +1,23 @@ use crate::{ entities::{ doc::{Doc, Revision}, - ws::{WsDocumentData, WsSource}, + ws::{WsDataType, WsDocumentData}, }, errors::{internal_error, DocError}, services::{ doc::Document, - ws::{WsHandler, WsSender}, + ws::{WsDocumentHandler, WsDocumentSender}, }, - sql_tables::doc::{OpState, OpTable, OpTableSql}, + sql_tables::doc::{OpTable, OpTableSql}, }; use bytes::Bytes; use flowy_database::ConnectionPool; use flowy_ot::core::Delta; -use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock}; +use parking_lot::RwLock; use std::{ convert::TryInto, sync::{ - atomic::{AtomicI64, AtomicUsize, Ordering::SeqCst}, + atomic::{AtomicI64, Ordering::SeqCst}, Arc, }, }; @@ -26,12 +26,12 @@ pub(crate) struct EditDocContext { pub(crate) id: DocId, pub(crate) rev_counter: RevCounter, document: RwLock, - ws_sender: Arc, + ws: Arc, op_sql: Arc, } impl EditDocContext { - pub(crate) fn new(doc: Doc, ws_sender: Arc, op_sql: Arc) -> Result { + pub(crate) fn new(doc: Doc, ws: Arc, op_sql: Arc) -> Result { let id: DocId = doc.id.into(); let rev_counter = RevCounter::new(doc.revision); let delta: Delta = doc.data.try_into()?; @@ -41,7 +41,7 @@ impl EditDocContext { id, rev_counter, document, - ws_sender, + ws, op_sql, }) } @@ -64,9 +64,9 @@ impl EditDocContext { // Opti: it is necessary to save the rev if send success? let md5 = format!("{:x}", md5::compute(json)); - let revision = Revision::new(base_rev_id, rev_id, data.to_vec(), md5); + let revision = Revision::new(base_rev_id, rev_id, data.to_vec(), md5, self.id.clone().into()); self.save_revision(revision.clone(), pool.clone()); - match self.ws_sender.send_data(revision.try_into()?) { + match self.ws.send(revision.into()) { Ok(_) => { // TODO: remove the rev if send success // let _ = self.delete_revision(rev_id, pool)?; @@ -101,10 +101,11 @@ impl EditDocContext { } } -impl WsHandler for EditDocContext { +impl WsDocumentHandler for EditDocContext { fn receive(&self, data: WsDocumentData) { - match data.source { - WsSource::Delta => {}, + match data.ty { + WsDataType::Delta => {}, + WsDataType::Command => {}, } } } diff --git a/rust-lib/flowy-document/src/services/ws/ws_manager.rs b/rust-lib/flowy-document/src/services/ws/ws_manager.rs index c25a3af8f1..92d3ffc529 100644 --- a/rust-lib/flowy-document/src/services/ws/ws_manager.rs +++ b/rust-lib/flowy-document/src/services/ws/ws_manager.rs @@ -1,38 +1,43 @@ use crate::{entities::ws::WsDocumentData, errors::DocError}; use bytes::Bytes; -use lazy_static::lazy_static; + use std::{collections::HashMap, convert::TryInto, sync::Arc}; -pub trait WsSender: Send + Sync { - fn send_data(&self, data: Bytes) -> Result<(), DocError>; +pub(crate) trait WsDocumentHandler: Send + Sync { + fn receive(&self, data: WsDocumentData); } -pub struct WsManager { - pub(crate) sender: Arc, - doc_handlers: HashMap>, +pub trait WsDocumentSender: Send + Sync { + fn send(&self, data: WsDocumentData) -> Result<(), DocError>; } -impl WsManager { - pub fn new(sender: Arc) -> Self { +pub struct WsDocumentManager { + sender: Arc, + // key: the document id + ws_handlers: HashMap>, +} + +impl WsDocumentManager { + pub fn new(sender: Arc) -> Self { Self { sender, - doc_handlers: HashMap::new(), + ws_handlers: HashMap::new(), } } - pub(crate) fn register_handler(&mut self, id: &str, handler: Arc) { - if self.doc_handlers.contains_key(id) { + pub(crate) fn register_handler(&mut self, id: &str, handler: Arc) { + if self.ws_handlers.contains_key(id) { log::error!("Duplicate handler registered for {:?}", id); } - self.doc_handlers.insert(id.to_string(), handler); + self.ws_handlers.insert(id.to_string(), handler); } - pub(crate) fn remove_handler(&mut self, id: &str) { self.doc_handlers.remove(id); } + pub(crate) fn remove_handler(&mut self, id: &str) { self.ws_handlers.remove(id); } pub fn receive_data(&self, data: Bytes) { let data: WsDocumentData = data.try_into().unwrap(); - match self.doc_handlers.get(&data.id) { + match self.ws_handlers.get(&data.id) { None => { log::error!("Can't find any source handler for {:?}", data.id); }, @@ -42,17 +47,5 @@ impl WsManager { } } - pub fn send_data(&self, data: WsDocumentData) { - let bytes: Bytes = data.try_into().unwrap(); - match self.sender.send_data(bytes) { - Ok(_) => {}, - Err(e) => { - log::error!("WsDocument send message failed: {:?}", e); - }, - } - } -} - -pub(crate) trait WsHandler: Send + Sync { - fn receive(&self, data: WsDocumentData); + pub fn sender(&self) -> Arc { self.sender.clone() } } diff --git a/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 2e88794d24..7061879005 100644 --- a/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -2,13 +2,14 @@ use bytes::Bytes; use flowy_document::{ errors::DocError, module::DocumentUser, - prelude::{WsManager, WsSender}, + prelude::{WsDocumentManager, WsDocumentSender}, }; +use flowy_document::entities::ws::WsDocumentData; use flowy_user::{errors::ErrorCode, services::user::UserSession}; -use flowy_ws::{WsMessage, WsMessageHandler, WsSource}; +use flowy_ws::{WsMessage, WsMessageHandler, WsModule}; use parking_lot::RwLock; -use std::{path::Path, sync::Arc}; +use std::{convert::TryInto, path::Path, sync::Arc}; pub struct DocumentDepsResolver { user_session: Arc, @@ -17,7 +18,7 @@ pub struct DocumentDepsResolver { impl DocumentDepsResolver { pub fn new(user_session: Arc) -> Self { Self { user_session } } - pub fn split_into(self) -> (Arc, Arc>) { + pub fn split_into(self) -> (Arc, Arc>) { let user = Arc::new(DocumentUserImpl { user: self.user_session.clone(), }); @@ -26,9 +27,9 @@ impl DocumentDepsResolver { user: self.user_session.clone(), }); - let ws_manager = Arc::new(RwLock::new(WsManager::new(sender))); + let ws_manager = Arc::new(RwLock::new(WsDocumentManager::new(sender))); - let ws_handler = Arc::new(WsDocumentResolver { inner: ws_manager.clone() }); + let ws_handler = Arc::new(WsDocumentReceiver { inner: ws_manager.clone() }); self.user_session.add_ws_handler(ws_handler); @@ -70,23 +71,24 @@ struct WsSenderImpl { user: Arc, } -impl WsSender for WsSenderImpl { - fn send_data(&self, data: Bytes) -> Result<(), DocError> { +impl WsDocumentSender for WsSenderImpl { + fn send(&self, data: WsDocumentData) -> Result<(), DocError> { + let bytes: Bytes = data.try_into().unwrap(); let msg = WsMessage { - source: WsSource::Doc, - data: data.to_vec(), + module: WsModule::Doc, + data: bytes.to_vec(), }; let _ = self.user.send_ws_msg(msg).map_err(|e| DocError::internal().context(e))?; Ok(()) } } -struct WsDocumentResolver { - inner: Arc>, +struct WsDocumentReceiver { + inner: Arc>, } -impl WsMessageHandler for WsDocumentResolver { - fn source(&self) -> WsSource { WsSource::Doc } +impl WsMessageHandler for WsDocumentReceiver { + fn source(&self) -> WsModule { WsModule::Doc } fn receive_message(&self, msg: WsMessage) { let data = Bytes::from(msg.data); diff --git a/rust-lib/flowy-sqlite/src/pool.rs b/rust-lib/flowy-sqlite/src/pool.rs index 8522687b83..72b4f95fc6 100644 --- a/rust-lib/flowy-sqlite/src/pool.rs +++ b/rust-lib/flowy-sqlite/src/pool.rs @@ -2,13 +2,7 @@ use crate::{errors::*, pragma::*}; use diesel::{connection::Connection, SqliteConnection}; use r2d2::{CustomizeConnection, ManageConnection, Pool}; use scheduled_thread_pool::ScheduledThreadPool; -use std::{ - sync::{ - atomic::{AtomicUsize, Ordering::SeqCst}, - Arc, - }, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; lazy_static::lazy_static! { static ref DB_POOL: Arc = Arc::new( diff --git a/rust-lib/flowy-ws/src/msg.rs b/rust-lib/flowy-ws/src/msg.rs index 50b2db0981..bc6fb1a212 100644 --- a/rust-lib/flowy-ws/src/msg.rs +++ b/rust-lib/flowy-ws/src/msg.rs @@ -3,28 +3,29 @@ use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; use std::convert::{TryFrom, TryInto}; use tokio_tungstenite::tungstenite::Message as TokioMessage; +// Opti: using four bytes of the data to represent the source #[derive(ProtoBuf, Debug, Clone, Default)] pub struct WsMessage { #[pb(index = 1)] - pub source: WsSource, + pub module: WsModule, #[pb(index = 2)] pub data: Vec, } #[derive(ProtoBuf_Enum, Debug, Clone, Eq, PartialEq, Hash)] -pub enum WsSource { +pub enum WsModule { Doc = 0, } -impl std::default::Default for WsSource { - fn default() -> Self { WsSource::Doc } +impl std::default::Default for WsModule { + fn default() -> Self { WsModule::Doc } } -impl ToString for WsSource { +impl ToString for WsModule { fn to_string(&self) -> String { match self { - WsSource::Doc => "0".to_string(), + WsModule::Doc => "0".to_string(), } } } diff --git a/rust-lib/flowy-ws/src/protobuf/model/msg.rs b/rust-lib/flowy-ws/src/protobuf/model/msg.rs index 153a955bab..35c80e3a65 100644 --- a/rust-lib/flowy-ws/src/protobuf/model/msg.rs +++ b/rust-lib/flowy-ws/src/protobuf/model/msg.rs @@ -26,7 +26,7 @@ #[derive(PartialEq,Clone,Default)] pub struct WsMessage { // message fields - pub source: WsSource, + pub module: WsModule, pub data: ::std::vec::Vec, // special fields pub unknown_fields: ::protobuf::UnknownFields, @@ -44,19 +44,19 @@ impl WsMessage { ::std::default::Default::default() } - // .WsSource source = 1; + // .WsModule module = 1; - pub fn get_source(&self) -> WsSource { - self.source + pub fn get_module(&self) -> WsModule { + self.module } - pub fn clear_source(&mut self) { - self.source = WsSource::Doc; + pub fn clear_module(&mut self) { + self.module = WsModule::Doc; } // Param is passed by value, moved - pub fn set_source(&mut self, v: WsSource) { - self.source = v; + pub fn set_module(&mut self, v: WsModule) { + self.module = v; } // bytes data = 2; @@ -96,7 +96,7 @@ impl ::protobuf::Message for WsMessage { let (field_number, wire_type) = is.read_tag_unpack()?; match field_number { 1 => { - ::protobuf::rt::read_proto3_enum_with_unknown_fields_into(wire_type, is, &mut self.source, 1, &mut self.unknown_fields)? + ::protobuf::rt::read_proto3_enum_with_unknown_fields_into(wire_type, is, &mut self.module, 1, &mut self.unknown_fields)? }, 2 => { ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?; @@ -113,8 +113,8 @@ impl ::protobuf::Message for WsMessage { #[allow(unused_variables)] fn compute_size(&self) -> u32 { let mut my_size = 0; - if self.source != WsSource::Doc { - my_size += ::protobuf::rt::enum_size(1, self.source); + if self.module != WsModule::Doc { + my_size += ::protobuf::rt::enum_size(1, self.module); } if !self.data.is_empty() { my_size += ::protobuf::rt::bytes_size(2, &self.data); @@ -125,8 +125,8 @@ impl ::protobuf::Message for WsMessage { } fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { - if self.source != WsSource::Doc { - os.write_enum(1, ::protobuf::ProtobufEnum::value(&self.source))?; + if self.module != WsModule::Doc { + os.write_enum(1, ::protobuf::ProtobufEnum::value(&self.module))?; } if !self.data.is_empty() { os.write_bytes(2, &self.data)?; @@ -169,10 +169,10 @@ impl ::protobuf::Message for WsMessage { static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; descriptor.get(|| { let mut fields = ::std::vec::Vec::new(); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( - "source", - |m: &WsMessage| { &m.source }, - |m: &mut WsMessage| { &mut m.source }, + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( + "module", + |m: &WsMessage| { &m.module }, + |m: &mut WsMessage| { &mut m.module }, )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( "data", @@ -195,7 +195,7 @@ impl ::protobuf::Message for WsMessage { impl ::protobuf::Clear for WsMessage { fn clear(&mut self) { - self.source = WsSource::Doc; + self.module = WsModule::Doc; self.data.clear(); self.unknown_fields.clear(); } @@ -214,25 +214,25 @@ impl ::protobuf::reflect::ProtobufValue for WsMessage { } #[derive(Clone,PartialEq,Eq,Debug,Hash)] -pub enum WsSource { +pub enum WsModule { Doc = 0, } -impl ::protobuf::ProtobufEnum for WsSource { +impl ::protobuf::ProtobufEnum for WsModule { fn value(&self) -> i32 { *self as i32 } - fn from_i32(value: i32) -> ::std::option::Option { + fn from_i32(value: i32) -> ::std::option::Option { match value { - 0 => ::std::option::Option::Some(WsSource::Doc), + 0 => ::std::option::Option::Some(WsModule::Doc), _ => ::std::option::Option::None } } fn values() -> &'static [Self] { - static values: &'static [WsSource] = &[ - WsSource::Doc, + static values: &'static [WsModule] = &[ + WsModule::Doc, ]; values } @@ -240,30 +240,30 @@ impl ::protobuf::ProtobufEnum for WsSource { fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor { static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT; descriptor.get(|| { - ::protobuf::reflect::EnumDescriptor::new_pb_name::("WsSource", file_descriptor_proto()) + ::protobuf::reflect::EnumDescriptor::new_pb_name::("WsModule", file_descriptor_proto()) }) } } -impl ::std::marker::Copy for WsSource { +impl ::std::marker::Copy for WsModule { } -impl ::std::default::Default for WsSource { +impl ::std::default::Default for WsModule { fn default() -> Self { - WsSource::Doc + WsModule::Doc } } -impl ::protobuf::reflect::ProtobufValue for WsSource { +impl ::protobuf::reflect::ProtobufValue for WsModule { fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { ::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self)) } } static file_descriptor_proto_data: &'static [u8] = b"\ - \n\tmsg.proto\"B\n\tWsMessage\x12!\n\x06source\x18\x01\x20\x01(\x0e2\t.W\ - sSourceR\x06source\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04data*\x13\ - \n\x08WsSource\x12\x07\n\x03Doc\x10\0J\xd9\x01\n\x06\x12\x04\0\0\x08\x01\ + \n\tmsg.proto\"B\n\tWsMessage\x12!\n\x06module\x18\x01\x20\x01(\x0e2\t.W\ + sModuleR\x06module\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04data*\x13\ + \n\x08WsModule\x12\x07\n\x03Doc\x10\0J\xd9\x01\n\x06\x12\x04\0\0\x08\x01\ \n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\ \n\n\x03\x04\0\x01\x12\x03\x02\x08\x11\n\x0b\n\x04\x04\0\x02\0\x12\x03\ \x03\x04\x18\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\x0c\n\x0c\n\x05\ diff --git a/rust-lib/flowy-ws/src/protobuf/proto/msg.proto b/rust-lib/flowy-ws/src/protobuf/proto/msg.proto index 36b405fefe..4b71578340 100644 --- a/rust-lib/flowy-ws/src/protobuf/proto/msg.proto +++ b/rust-lib/flowy-ws/src/protobuf/proto/msg.proto @@ -1,9 +1,9 @@ syntax = "proto3"; message WsMessage { - WsSource source = 1; + WsModule module = 1; bytes data = 2; } -enum WsSource { +enum WsModule { Doc = 0; } diff --git a/rust-lib/flowy-ws/src/ws.rs b/rust-lib/flowy-ws/src/ws.rs index 595958a1c3..f2274cc3a2 100644 --- a/rust-lib/flowy-ws/src/ws.rs +++ b/rust-lib/flowy-ws/src/ws.rs @@ -2,7 +2,7 @@ use crate::{ connect::{Retry, WsConnectionFuture}, errors::WsError, WsMessage, - WsSource, + WsModule, }; use flowy_net::errors::ServerError; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; @@ -24,7 +24,7 @@ use tokio_tungstenite::tungstenite::{ pub type MsgReceiver = UnboundedReceiver; pub type MsgSender = UnboundedSender; pub trait WsMessageHandler: Sync + Send + 'static { - fn source(&self) -> WsSource; + fn source(&self) -> WsModule; fn receive_message(&self, msg: WsMessage); } @@ -51,7 +51,7 @@ pub enum WsState { } pub struct WsController { - handlers: HashMap>, + handlers: HashMap>, state_notify: Arc>, #[allow(dead_code)] addr: Option, @@ -164,11 +164,11 @@ impl WsController { pub struct WsHandlerFuture { #[pin] msg_rx: MsgReceiver, - handlers: HashMap>, + handlers: HashMap>, } impl WsHandlerFuture { - fn new(handlers: HashMap>, msg_rx: MsgReceiver) -> Self { Self { msg_rx, handlers } } + fn new(handlers: HashMap>, msg_rx: MsgReceiver) -> Self { Self { msg_rx, handlers } } } impl Future for WsHandlerFuture { @@ -180,9 +180,8 @@ impl Future for WsHandlerFuture { return Poll::Ready(()); }, Some(message) => { - log::debug!("🐴 ws handler receive message"); let message = WsMessage::from(message); - match self.handlers.get(&message.source) { + match self.handlers.get(&message.module) { None => log::error!("Can't find any handler for message: {:?}", message), Some(handler) => handler.receive_message(message.clone()), } @@ -204,16 +203,19 @@ impl WsSender { Ok(()) } - pub fn send_text(&self, source: WsSource, text: &str) -> Result<(), WsError> { + pub fn send_text(&self, source: &WsModule, text: &str) -> Result<(), WsError> { let msg = WsMessage { - source, + module: source.clone(), data: text.as_bytes().to_vec(), }; self.send_msg(msg) } - pub fn send_binary(&self, source: WsSource, bytes: Vec) -> Result<(), WsError> { - let msg = WsMessage { source, data: bytes }; + pub fn send_binary(&self, source: &WsModule, bytes: Vec) -> Result<(), WsError> { + let msg = WsMessage { + module: source.clone(), + data: bytes, + }; self.send_msg(msg) }