diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbenum.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbenum.dart index bca822dca5..ffcdcc93ac 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbenum.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbenum.dart @@ -12,10 +12,12 @@ import 'package:protobuf/protobuf.dart' as $pb; class WSChannel extends $pb.ProtobufEnum { static const WSChannel Document = WSChannel._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Document'); static const WSChannel Folder = WSChannel._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Folder'); + static const WSChannel Grid = WSChannel._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Grid'); static const $core.List values = [ Document, Folder, + Grid, ]; static final $core.Map<$core.int, WSChannel> _byValue = $pb.ProtobufEnum.initByValue(values); diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart index d1edecb666..0b878758fa 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart @@ -14,11 +14,12 @@ const WSChannel$json = const { '2': const [ const {'1': 'Document', '2': 0}, const {'1': 'Folder', '2': 1}, + const {'1': 'Grid', '2': 2}, ], }; /// Descriptor for `WSChannel`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List wSChannelDescriptor = $convert.base64Decode('CglXU0NoYW5uZWwSDAoIRG9jdW1lbnQQABIKCgZGb2xkZXIQAQ=='); +final $typed_data.Uint8List wSChannelDescriptor = $convert.base64Decode('CglXU0NoYW5uZWwSDAoIRG9jdW1lbnQQABIKCgZGb2xkZXIQARIICgRHcmlkEAI='); @$core.Deprecated('Use webSocketRawMessageDescriptor instead') const WebSocketRawMessage$json = const { '1': 'WebSocketRawMessage', diff --git a/frontend/rust-lib/Cargo.lock b/frontend/rust-lib/Cargo.lock index 199389951d..f5ae68cbda 100755 --- a/frontend/rust-lib/Cargo.lock +++ b/frontend/rust-lib/Cargo.lock @@ -1052,6 +1052,7 @@ dependencies = [ "bytes", "chrono", "dart-notify", + "dashmap", "diesel", "flowy-collaboration", "flowy-database", diff --git a/frontend/rust-lib/flowy-block/src/manager.rs b/frontend/rust-lib/flowy-block/src/manager.rs index e7ad6f7494..6c9e23b4ed 100644 --- a/frontend/rust-lib/flowy-block/src/manager.rs +++ b/frontend/rust-lib/flowy-block/src/manager.rs @@ -32,11 +32,11 @@ impl BlockManager { block_user: Arc, rev_web_socket: Arc, ) -> Self { - let block_handlers = Arc::new(BlockEditors::new()); + let block_editors = Arc::new(BlockEditors::new()); Self { cloud_service, rev_web_socket, - block_editors: block_handlers, + block_editors, block_user, } } diff --git a/frontend/rust-lib/flowy-grid/Cargo.toml b/frontend/rust-lib/flowy-grid/Cargo.toml index 237ff3f6ea..5c65fc5ebc 100644 --- a/frontend/rust-lib/flowy-grid/Cargo.toml +++ b/frontend/rust-lib/flowy-grid/Cargo.toml @@ -29,7 +29,7 @@ chrono = "0.4.19" uuid = { version = "0.8", features = ["serde", "v4"] } bytes = { version = "1.0" } diesel = {version = "1.4.8", features = ["sqlite"]} -#diesel_derives = {version = "1.4.1", features = ["sqlite"]} +dashmap = "4.0" parking_lot = "0.11" diff --git a/frontend/rust-lib/flowy-grid/src/controller.rs b/frontend/rust-lib/flowy-grid/src/controller.rs deleted file mode 100644 index acbe116978..0000000000 --- a/frontend/rust-lib/flowy-grid/src/controller.rs +++ /dev/null @@ -1 +0,0 @@ -pub struct GridManager {} diff --git a/frontend/rust-lib/flowy-grid/src/event_handler.rs b/frontend/rust-lib/flowy-grid/src/event_handler.rs index 600f55ab1a..7d6e7f8e21 100644 --- a/frontend/rust-lib/flowy-grid/src/event_handler.rs +++ b/frontend/rust-lib/flowy-grid/src/event_handler.rs @@ -1,4 +1,4 @@ -use crate::controller::GridManager; +use crate::manager::GridManager; use flowy_error::FlowyError; use flowy_grid_data_model::entities::{ CreateGridPayload, Grid, GridId, RepeatedField, RepeatedFieldOrder, RepeatedRow, RepeatedRowOrder, diff --git a/frontend/rust-lib/flowy-grid/src/event_map.rs b/frontend/rust-lib/flowy-grid/src/event_map.rs index 3fce720d36..787cd6336d 100644 --- a/frontend/rust-lib/flowy-grid/src/event_map.rs +++ b/frontend/rust-lib/flowy-grid/src/event_map.rs @@ -1,5 +1,5 @@ -use crate::controller::GridManager; use crate::event_handler::*; +use crate::manager::GridManager; use flowy_derive::{Flowy_Event, ProtoBuf_Enum}; use lib_dispatch::prelude::*; use std::sync::Arc; diff --git a/frontend/rust-lib/flowy-grid/src/lib.rs b/frontend/rust-lib/flowy-grid/src/lib.rs index 2645d2b121..a7b1a080e7 100644 --- a/frontend/rust-lib/flowy-grid/src/lib.rs +++ b/frontend/rust-lib/flowy-grid/src/lib.rs @@ -1,9 +1,9 @@ #[macro_use] mod macros; -mod controller; mod event_handler; -mod event_map; +pub mod event_map; +pub mod manager; mod protobuf; mod services; diff --git a/frontend/rust-lib/flowy-grid/src/manager.rs b/frontend/rust-lib/flowy-grid/src/manager.rs new file mode 100644 index 0000000000..af4de88293 --- /dev/null +++ b/frontend/rust-lib/flowy-grid/src/manager.rs @@ -0,0 +1,107 @@ +use crate::services::grid_editor::ClientGridEditor; +use dashmap::DashMap; +use flowy_error::{FlowyError, FlowyResult}; +use flowy_sync::{RevisionManager, RevisionPersistence, RevisionWebSocket}; +use lib_sqlite::ConnectionPool; +use std::sync::Arc; + +pub trait GridUser: Send + Sync { + fn user_id(&self) -> Result; + fn token(&self) -> Result; + fn db_pool(&self) -> Result, FlowyError>; +} + +pub struct GridManager { + grid_editors: Arc, + grid_user: Arc, + rev_web_socket: Arc, +} + +impl GridManager { + pub fn new(grid_user: Arc, rev_web_socket: Arc) -> Self { + let grid_editors = Arc::new(GridEditors::new()); + Self { + grid_editors, + grid_user, + rev_web_socket, + } + } + + #[tracing::instrument(level = "debug", skip(self, grid_id), fields(grid_id), err)] + pub async fn open_grid>(&self, grid_id: T) -> Result, FlowyError> { + let grid_id = grid_id.as_ref(); + tracing::Span::current().record("grid_id", &grid_id); + self.get_grid_editor(grid_id).await + } + + #[tracing::instrument(level = "trace", skip(self, grid_id), fields(grid_id), err)] + pub fn close_grid>(&self, grid_id: T) -> Result<(), FlowyError> { + let grid_id = grid_id.as_ref(); + tracing::Span::current().record("grid_id", &grid_id); + self.grid_editors.remove(grid_id); + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self, grid_id), fields(doc_id), err)] + pub fn delete_grid>(&self, grid_id: T) -> Result<(), FlowyError> { + let grid_id = grid_id.as_ref(); + tracing::Span::current().record("grid_id", &grid_id); + self.grid_editors.remove(grid_id); + Ok(()) + } + + async fn get_grid_editor(&self, grid_id: &str) -> FlowyResult> { + match self.grid_editors.get(grid_id) { + None => { + let db_pool = self.grid_user.db_pool()?; + self.make_grid_editor(grid_id, db_pool).await + } + Some(editor) => Ok(editor), + } + } + + async fn make_grid_editor( + &self, + grid_id: &str, + pool: Arc, + ) -> Result, FlowyError> { + let token = self.grid_user.token()?; + let user_id = self.grid_user.user_id()?; + let grid_editor = ClientGridEditor::new(&user_id, grid_id, &token, pool, self.rev_web_socket.clone()).await?; + self.grid_editors.insert(grid_id, &grid_editor); + Ok(grid_editor) + } +} + +pub struct GridEditors { + inner: DashMap>, +} + +impl GridEditors { + fn new() -> Self { + Self { inner: DashMap::new() } + } + + pub(crate) fn insert(&self, grid_id: &str, grid_editor: &Arc) { + if self.inner.contains_key(grid_id) { + tracing::warn!("Grid:{} already exists in cache", grid_id); + } + self.inner.insert(grid_id.to_string(), grid_editor.clone()); + } + + pub(crate) fn contains(&self, grid_id: &str) -> bool { + self.inner.get(grid_id).is_some() + } + + pub(crate) fn get(&self, grid_id: &str) -> Option> { + if !self.contains(grid_id) { + return None; + } + let opened_grid = self.inner.get(grid_id).unwrap(); + Some(opened_grid.clone()) + } + + pub(crate) fn remove(&self, grid_id: &str) { + self.inner.remove(grid_id); + } +} diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs index b96e11b749..974c026069 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs @@ -16,7 +16,7 @@ use std::sync::Arc; pub struct ClientGridEditor { user_id: String, - grid_id: GridId, + grid_id: String, grid: Arc>, rev_manager: Arc, kv: Arc, @@ -25,13 +25,13 @@ pub struct ClientGridEditor { impl ClientGridEditor { pub async fn new( user_id: &str, - grid_id: &GridId, + grid_id: &str, token: &str, pool: Arc, _web_socket: Arc, - ) -> FlowyResult { - let rev_persistence = Arc::new(RevisionPersistence::new(user_id, grid_id.as_ref(), pool.clone())); - let mut rev_manager = RevisionManager::new(user_id, grid_id.as_ref(), rev_persistence); + ) -> FlowyResult> { + let rev_persistence = Arc::new(RevisionPersistence::new(user_id, grid_id, pool.clone())); + let mut rev_manager = RevisionManager::new(user_id, grid_id, rev_persistence); let cloud = Arc::new(GridRevisionCloudService { token: token.to_string(), }); @@ -43,13 +43,13 @@ impl ClientGridEditor { let user_id = user_id.to_owned(); let grid_id = grid_id.to_owned(); - Ok(Self { + Ok(Arc::new(Self { user_id, grid_id, grid, rev_manager, kv, - }) + })) } pub async fn create_row(&self, row: RawRow) -> FlowyResult<()> { diff --git a/frontend/rust-lib/flowy-net/src/local_server/server.rs b/frontend/rust-lib/flowy-net/src/local_server/server.rs index 24fda76639..a9149720fb 100644 --- a/frontend/rust-lib/flowy-net/src/local_server/server.rs +++ b/frontend/rust-lib/flowy-net/src/local_server/server.rs @@ -122,6 +122,9 @@ impl LocalWebSocketRunner { let _ = self.handle_folder_client_data(client_data, "".to_owned()).await?; Ok(()) } + WSChannel::Grid => { + todo!("Implement grid web socket channel") + } } } diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/block_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/block_deps.rs index c7e77e252c..f8c97d133c 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/block_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/block_deps.rs @@ -25,13 +25,13 @@ impl BlockDepsResolver { server_config: &ClientServerConfiguration, ) -> Arc { let user = Arc::new(BlockUserImpl(user_session)); - let ws_sender = Arc::new(BlockWebSocket(ws_conn.clone())); + let rev_web_socket = Arc::new(BlockWebSocket(ws_conn.clone())); let cloud_service: Arc = match local_server { None => Arc::new(BlockHttpCloudService::new(server_config.clone())), Some(local_server) => local_server, }; - let manager = Arc::new(BlockManager::new(cloud_service, user, ws_sender)); + let manager = Arc::new(BlockManager::new(cloud_service, user, rev_web_socket)); let receiver = Arc::new(DocumentWSMessageReceiverImpl(manager.clone())); ws_conn.add_ws_message_receiver(receiver).unwrap(); diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/grid_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/grid_deps.rs new file mode 100644 index 0000000000..26eb87ecbc --- /dev/null +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/grid_deps.rs @@ -0,0 +1,66 @@ +use crate::FlowyError; +use bytes::Bytes; +use flowy_collaboration::entities::ws_data::ClientRevisionWSData; +use flowy_database::ConnectionPool; +use flowy_grid::manager::{GridManager, GridUser}; +use flowy_net::ws::connection::FlowyWebSocketConnect; +use flowy_sync::{RevisionWebSocket, WSStateReceiver}; +use flowy_user::services::UserSession; +use futures_core::future::BoxFuture; +use lib_infra::future::BoxResultFuture; +use lib_ws::{WSChannel, WebSocketRawMessage}; +use std::convert::TryInto; +use std::sync::Arc; + +pub struct GridDepsResolver(); + +impl GridDepsResolver { + pub fn resolve(ws_conn: Arc, user_session: Arc) -> Arc { + let user = Arc::new(GridUserImpl(user_session)); + let rev_web_socket = Arc::new(GridWebSocket(ws_conn.clone())); + let manager = Arc::new(GridManager::new(user, rev_web_socket)); + manager + } +} + +struct GridUserImpl(Arc); +impl GridUser for GridUserImpl { + fn user_id(&self) -> Result { + self.0.user_id() + } + + fn token(&self) -> Result { + self.0.token() + } + + fn db_pool(&self) -> Result, FlowyError> { + self.0.db_pool() + } +} + +struct GridWebSocket(Arc); +impl RevisionWebSocket for GridWebSocket { + fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> { + let bytes: Bytes = data.try_into().unwrap(); + let msg = WebSocketRawMessage { + channel: WSChannel::Grid, + data: bytes.to_vec(), + }; + + let ws_conn = self.0.clone(); + Box::pin(async move { + match ws_conn.web_socket().await? { + None => {} + Some(sender) => { + sender.send(msg).map_err(|e| FlowyError::internal().context(e))?; + } + } + Ok(()) + }) + } + + fn subscribe_state_changed(&self) -> BoxFuture { + let ws_conn = self.0.clone(); + Box::pin(async move { ws_conn.subscribe_websocket_state().await }) + } +} diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs index 569c32b8de..c0a84d8d94 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs @@ -1,7 +1,10 @@ mod block_deps; mod folder_deps; +mod grid_deps; mod user_deps; +mod util; pub use block_deps::*; pub use folder_deps::*; +pub use grid_deps::*; pub use user_deps::*; diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/util.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/util.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index a24c071943..5ea59b176d 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -5,6 +5,7 @@ pub use flowy_net::get_client_server_configuration; use crate::deps_resolve::*; use flowy_block::BlockManager; use flowy_folder::{controller::FolderManager, errors::FlowyError}; +use flowy_grid::manager::GridManager; use flowy_net::ClientServerConfiguration; use flowy_net::{ entities::NetworkType, @@ -88,6 +89,7 @@ pub struct FlowySDK { pub user_session: Arc, pub document_manager: Arc, pub folder_manager: Arc, + pub grid_manager: Arc, pub dispatcher: Arc, pub ws_conn: Arc, pub local_server: Option>, @@ -100,7 +102,7 @@ impl FlowySDK { tracing::debug!("🔥 {:?}", config); let runtime = tokio_default_runtime().unwrap(); let (local_server, ws_conn) = mk_local_server(&config.server_config); - let (user_session, document_manager, folder_manager, local_server) = runtime.block_on(async { + let (user_session, document_manager, folder_manager, local_server, grid_manager) = runtime.block_on(async { let user_session = mk_user_session(&config, &local_server, &config.server_config); let document_manager = BlockDepsResolver::resolve( local_server.clone(), @@ -118,15 +120,23 @@ impl FlowySDK { ) .await; + let grid_manager = GridDepsResolver::resolve(ws_conn.clone(), user_session.clone()); + if let Some(local_server) = local_server.as_ref() { local_server.run(); } ws_conn.init().await; - (user_session, document_manager, folder_manager, local_server) + ( + user_session, + document_manager, + folder_manager, + local_server, + grid_manager, + ) }); let dispatcher = Arc::new(EventDispatcher::construct(runtime, || { - mk_modules(&ws_conn, &folder_manager, &user_session) + mk_modules(&ws_conn, &folder_manager, &grid_manager, &user_session) })); _start_listening(&dispatcher, &ws_conn, &user_session, &folder_manager); @@ -136,6 +146,7 @@ impl FlowySDK { user_session, document_manager, folder_manager, + grid_manager, dispatcher, ws_conn, local_server, diff --git a/frontend/rust-lib/flowy-sdk/src/module.rs b/frontend/rust-lib/flowy-sdk/src/module.rs index b88300d529..27d9c8dd8c 100644 --- a/frontend/rust-lib/flowy-sdk/src/module.rs +++ b/frontend/rust-lib/flowy-sdk/src/module.rs @@ -1,4 +1,5 @@ use flowy_folder::controller::FolderManager; +use flowy_grid::manager::GridManager; use flowy_net::ws::connection::FlowyWebSocketConnect; use flowy_user::services::UserSession; use lib_dispatch::prelude::Module; @@ -7,22 +8,28 @@ use std::sync::Arc; pub fn mk_modules( ws_conn: &Arc, folder_manager: &Arc, + grid_manager: &Arc, user_session: &Arc, ) -> Vec { let user_module = mk_user_module(user_session.clone()); let folder_module = mk_folder_module(folder_manager.clone()); let network_module = mk_network_module(ws_conn.clone()); - vec![user_module, folder_module, network_module] + let grid_module = mk_grid_module(grid_manager.clone()); + vec![user_module, folder_module, network_module, grid_module] } fn mk_user_module(user_session: Arc) -> Module { flowy_user::event_map::create(user_session) } -fn mk_folder_module(core: Arc) -> Module { - flowy_folder::event_map::create(core) +fn mk_folder_module(folder_manager: Arc) -> Module { + flowy_folder::event_map::create(folder_manager) } fn mk_network_module(ws_conn: Arc) -> Module { flowy_net::event_map::create(ws_conn) } + +fn mk_grid_module(grid_manager: Arc) -> Module { + flowy_grid::event_map::create(grid_manager) +} diff --git a/shared-lib/lib-ws/src/msg.rs b/shared-lib/lib-ws/src/msg.rs index 522491a8a6..51f0a76607 100644 --- a/shared-lib/lib-ws/src/msg.rs +++ b/shared-lib/lib-ws/src/msg.rs @@ -12,10 +12,12 @@ pub struct WebSocketRawMessage { pub data: Vec, } +// The lib-ws crate should not contain business logic.So WSChannel should be removed into another place. #[derive(ProtoBuf_Enum, Debug, Clone, Eq, PartialEq, Hash)] pub enum WSChannel { Document = 0, Folder = 1, + Grid = 2, } impl std::default::Default for WSChannel { @@ -29,6 +31,7 @@ impl ToString for WSChannel { match self { WSChannel::Document => "0".to_string(), WSChannel::Folder => "1".to_string(), + WSChannel::Grid => "2".to_string(), } } } diff --git a/shared-lib/lib-ws/src/protobuf/model/msg.rs b/shared-lib/lib-ws/src/protobuf/model/msg.rs index 0911d4e9b9..35537df3f0 100644 --- a/shared-lib/lib-ws/src/protobuf/model/msg.rs +++ b/shared-lib/lib-ws/src/protobuf/model/msg.rs @@ -217,6 +217,7 @@ impl ::protobuf::reflect::ProtobufValue for WebSocketRawMessage { pub enum WSChannel { Document = 0, Folder = 1, + Grid = 2, } impl ::protobuf::ProtobufEnum for WSChannel { @@ -228,6 +229,7 @@ impl ::protobuf::ProtobufEnum for WSChannel { match value { 0 => ::std::option::Option::Some(WSChannel::Document), 1 => ::std::option::Option::Some(WSChannel::Folder), + 2 => ::std::option::Option::Some(WSChannel::Grid), _ => ::std::option::Option::None } } @@ -236,6 +238,7 @@ impl ::protobuf::ProtobufEnum for WSChannel { static values: &'static [WSChannel] = &[ WSChannel::Document, WSChannel::Folder, + WSChannel::Grid, ]; values } @@ -266,8 +269,8 @@ impl ::protobuf::reflect::ProtobufValue for WSChannel { static file_descriptor_proto_data: &'static [u8] = b"\ \n\tmsg.proto\"O\n\x13WebSocketRawMessage\x12$\n\x07channel\x18\x01\x20\ \x01(\x0e2\n.WSChannelR\x07channel\x12\x12\n\x04data\x18\x02\x20\x01(\ - \x0cR\x04data*%\n\tWSChannel\x12\x0c\n\x08Document\x10\0\x12\n\n\x06Fold\ - er\x10\x01b\x06proto3\ + \x0cR\x04data*/\n\tWSChannel\x12\x0c\n\x08Document\x10\0\x12\n\n\x06Fold\ + er\x10\x01\x12\x08\n\x04Grid\x10\x02b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/shared-lib/lib-ws/src/protobuf/proto/msg.proto b/shared-lib/lib-ws/src/protobuf/proto/msg.proto index e5237ddef0..9306bd499b 100644 --- a/shared-lib/lib-ws/src/protobuf/proto/msg.proto +++ b/shared-lib/lib-ws/src/protobuf/proto/msg.proto @@ -7,4 +7,5 @@ message WebSocketRawMessage { enum WSChannel { Document = 0; Folder = 1; + Grid = 2; }