From 718613de42408e00a1a1aac9f58d7a1fc7f5ff06 Mon Sep 17 00:00:00 2001 From: appflowy Date: Thu, 13 Jan 2022 10:53:30 +0800 Subject: [PATCH] flowy-net impl http and local server --- backend/Cargo.lock | 5 + backend/tests/util/helper.rs | 2 +- frontend/rust-lib/flowy-document/Cargo.toml | 1 - frontend/rust-lib/flowy-net/Cargo.toml | 5 + .../src/{cloud => http_server}/core.rs | 221 +++-------- .../src/{cloud => http_server}/document.rs | 44 +-- .../src/{cloud => http_server}/mod.rs | 0 .../src/{cloud => http_server}/user.rs | 59 +-- frontend/rust-lib/flowy-net/src/lib.rs | 3 +- .../flowy-net/src/local_server/mod.rs | 27 ++ .../{ws/local => local_server}/persistence.rs | 17 +- .../flowy-net/src/local_server/server.rs | 372 ++++++++++++++++++ .../rust-lib/flowy-net/src/local_server/ws.rs | 82 ++++ .../rust-lib/flowy-net/src/ws/connection.rs | 14 +- .../rust-lib/flowy-net/src/ws/http/mod.rs | 3 - .../flowy-net/src/ws/{http => }/http_ws.rs | 0 .../flowy-net/src/ws/local/local_server.rs | 104 ----- .../flowy-net/src/ws/local/local_ws.rs | 166 -------- .../rust-lib/flowy-net/src/ws/local/mod.rs | 24 -- frontend/rust-lib/flowy-net/src/ws/mod.rs | 3 +- .../flowy-sdk/src/deps_resolve/core_deps.rs | 169 +------- .../src/deps_resolve/document_deps.rs | 64 +-- .../flowy-sdk/src/deps_resolve/user_deps.rs | 62 +-- frontend/rust-lib/flowy-sdk/src/lib.rs | 67 ++-- frontend/rust-lib/flowy-user/Cargo.toml | 1 - 25 files changed, 667 insertions(+), 848 deletions(-) rename frontend/rust-lib/flowy-net/src/{cloud => http_server}/core.rs (63%) rename frontend/rust-lib/flowy-net/src/{cloud => http_server}/document.rs (67%) rename frontend/rust-lib/flowy-net/src/{cloud => http_server}/mod.rs (100%) rename frontend/rust-lib/flowy-net/src/{cloud => http_server}/user.rs (59%) create mode 100644 frontend/rust-lib/flowy-net/src/local_server/mod.rs rename frontend/rust-lib/flowy-net/src/{ws/local => local_server}/persistence.rs (90%) create mode 100644 frontend/rust-lib/flowy-net/src/local_server/server.rs create mode 100644 frontend/rust-lib/flowy-net/src/local_server/ws.rs delete mode 100644 frontend/rust-lib/flowy-net/src/ws/http/mod.rs rename frontend/rust-lib/flowy-net/src/ws/{http => }/http_ws.rs (100%) delete mode 100644 frontend/rust-lib/flowy-net/src/ws/local/local_server.rs delete mode 100644 frontend/rust-lib/flowy-net/src/ws/local/local_ws.rs delete mode 100644 frontend/rust-lib/flowy-net/src/ws/local/mod.rs diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 3de7da43e3..0194b5818c 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1382,14 +1382,19 @@ name = "flowy-net" version = "0.1.0" dependencies = [ "anyhow", + "async-stream", "backend-service", "bytes", "dashmap", "flowy-collaboration", + "flowy-core", "flowy-core-data-model", "flowy-derive", + "flowy-document", "flowy-error", + "flowy-user", "flowy-user-data-model", + "futures-util", "lazy_static", "lib-dispatch", "lib-infra", diff --git a/backend/tests/util/helper.rs b/backend/tests/util/helper.rs index 4f5e913341..430f7e4b02 100644 --- a/backend/tests/util/helper.rs +++ b/backend/tests/util/helper.rs @@ -12,7 +12,7 @@ use flowy_collaboration::{ entities::doc::{CreateDocParams, DocumentId, DocumentInfo}, }; use flowy_core_data_model::entities::prelude::*; -use flowy_net::cloud::{ +use flowy_net::http_server::{ core::*, document::{create_document_request, read_document_request}, user::*, diff --git a/frontend/rust-lib/flowy-document/Cargo.toml b/frontend/rust-lib/flowy-document/Cargo.toml index bb20153ab9..7cf5e5f6cd 100644 --- a/frontend/rust-lib/flowy-document/Cargo.toml +++ b/frontend/rust-lib/flowy-document/Cargo.toml @@ -46,7 +46,6 @@ pin-project = "1.0.0" [dev-dependencies] flowy-test = { path = "../flowy-test" } flowy-document = { path = "../flowy-document", features = ["flowy_unit_test"]} -flowy-net = { path = "../flowy-net" } color-eyre = { version = "0.5", default-features = false } criterion = "0.3" rand = "0.7.3" diff --git a/frontend/rust-lib/flowy-net/Cargo.toml b/frontend/rust-lib/flowy-net/Cargo.toml index a5769caaf5..13aeeb2d3e 100644 --- a/frontend/rust-lib/flowy-net/Cargo.toml +++ b/frontend/rust-lib/flowy-net/Cargo.toml @@ -13,6 +13,9 @@ flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration"} backend-service = { path = "../../../shared-lib/backend-service" } flowy-core-data-model = { path = "../../../shared-lib/flowy-core-data-model" } flowy-user-data-model = { path = "../../../shared-lib/flowy-user-data-model"} +flowy-core = { path = "../flowy-core" } +flowy-user = { path = "../flowy-user" } +flowy-document = { path = "../flowy-document" } lazy_static = "1.4.0" lib-infra = { path = "../../../shared-lib/lib-infra" } protobuf = {version = "2.18.0"} @@ -25,5 +28,7 @@ strum = "0.21" strum_macros = "0.21" tracing = { version = "0.1", features = ["log"] } dashmap = {version = "4.0"} +async-stream = "0.3.2" +futures-util = "0.3.15" [features] http_server = [] \ No newline at end of file diff --git a/frontend/rust-lib/flowy-net/src/cloud/core.rs b/frontend/rust-lib/flowy-net/src/http_server/core.rs similarity index 63% rename from frontend/rust-lib/flowy-net/src/cloud/core.rs rename to frontend/rust-lib/flowy-net/src/http_server/core.rs index ac89975c55..0e90f479ab 100644 --- a/frontend/rust-lib/flowy-net/src/cloud/core.rs +++ b/frontend/rust-lib/flowy-net/src/http_server/core.rs @@ -5,15 +5,16 @@ use backend_service::{ response::FlowyResponse, }; use flowy_core_data_model::entities::{ - app::{App, AppId, CreateAppParams, RepeatedApp, UpdateAppParams}, + app::{App, AppId, CreateAppParams, UpdateAppParams}, trash::{RepeatedTrash, RepeatedTrashId}, - view::{CreateViewParams, RepeatedView, RepeatedViewId, UpdateViewParams, View, ViewId}, + view::{CreateViewParams, RepeatedViewId, UpdateViewParams, View, ViewId}, workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId}, }; use flowy_error::FlowyError; +use flowy_core::module::WorkspaceCloudService; use lazy_static::lazy_static; -use lib_infra::{future::FutureResult, timestamp, uuid_string}; +use lib_infra::future::FutureResult; use std::sync::Arc; use tokio::sync::broadcast; @@ -25,10 +26,10 @@ impl CoreHttpCloudService { pub fn new(config: ClientServerConfiguration) -> CoreHttpCloudService { Self { config } } } -impl CoreHttpCloudService { - pub fn init(&self) {} +impl WorkspaceCloudService for CoreHttpCloudService { + fn init(&self) {} - pub fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult { + fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult { let token = token.to_owned(); let url = self.config.workspace_url(); FutureResult::new(async move { @@ -37,7 +38,7 @@ impl CoreHttpCloudService { }) } - pub fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult { + fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult { let token = token.to_owned(); let url = self.config.workspace_url(); FutureResult::new(async move { @@ -46,7 +47,7 @@ impl CoreHttpCloudService { }) } - pub fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> { + fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.workspace_url(); FutureResult::new(async move { @@ -55,7 +56,7 @@ impl CoreHttpCloudService { }) } - pub fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> { + fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.workspace_url(); FutureResult::new(async move { @@ -64,7 +65,7 @@ impl CoreHttpCloudService { }) } - pub fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult { + fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult { let token = token.to_owned(); let url = self.config.view_url(); FutureResult::new(async move { @@ -73,7 +74,7 @@ impl CoreHttpCloudService { }) } - pub fn read_view(&self, token: &str, params: ViewId) -> FutureResult, FlowyError> { + fn read_view(&self, token: &str, params: ViewId) -> FutureResult, FlowyError> { let token = token.to_owned(); let url = self.config.view_url(); FutureResult::new(async move { @@ -82,7 +83,7 @@ impl CoreHttpCloudService { }) } - pub fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> { + fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.view_url(); FutureResult::new(async move { @@ -91,7 +92,7 @@ impl CoreHttpCloudService { }) } - pub fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> { + fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.view_url(); FutureResult::new(async move { @@ -100,7 +101,7 @@ impl CoreHttpCloudService { }) } - pub fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult { + fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult { let token = token.to_owned(); let url = self.config.app_url(); FutureResult::new(async move { @@ -109,7 +110,7 @@ impl CoreHttpCloudService { }) } - pub fn read_app(&self, token: &str, params: AppId) -> FutureResult, FlowyError> { + fn read_app(&self, token: &str, params: AppId) -> FutureResult, FlowyError> { let token = token.to_owned(); let url = self.config.app_url(); FutureResult::new(async move { @@ -118,7 +119,7 @@ impl CoreHttpCloudService { }) } - pub fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> { + fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.app_url(); FutureResult::new(async move { @@ -127,7 +128,7 @@ impl CoreHttpCloudService { }) } - pub fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> { + fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.app_url(); FutureResult::new(async move { @@ -136,7 +137,7 @@ impl CoreHttpCloudService { }) } - pub fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { + fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.trash_url(); FutureResult::new(async move { @@ -145,7 +146,7 @@ impl CoreHttpCloudService { }) } - pub fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { + fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.trash_url(); FutureResult::new(async move { @@ -154,7 +155,7 @@ impl CoreHttpCloudService { }) } - pub fn read_trash(&self, token: &str) -> FutureResult { + fn read_trash(&self, token: &str) -> FutureResult { let token = token.to_owned(); let url = self.config.trash_url(); FutureResult::new(async move { @@ -164,149 +165,6 @@ impl CoreHttpCloudService { } } -pub struct CoreLocalCloudService {} -impl CoreLocalCloudService { - pub fn new(_config: &ClientServerConfiguration) -> Self { CoreLocalCloudService {} } -} -impl CoreLocalCloudService { - pub fn init(&self) {} - - pub fn create_workspace(&self, _token: &str, params: CreateWorkspaceParams) -> FutureResult { - let time = timestamp(); - let workspace = Workspace { - id: uuid_string(), - name: params.name, - desc: params.desc, - apps: RepeatedApp::default(), - modified_time: time, - create_time: time, - }; - - FutureResult::new(async { Ok(workspace) }) - } - - pub fn read_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult { - FutureResult::new(async { - let repeated_workspace = RepeatedWorkspace { items: vec![] }; - Ok(repeated_workspace) - }) - } - - pub fn update_workspace(&self, _token: &str, _params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - pub fn delete_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - pub fn create_view(&self, _token: &str, params: CreateViewParams) -> FutureResult { - let time = timestamp(); - let view = View { - id: params.view_id, - belong_to_id: params.belong_to_id, - name: params.name, - desc: params.desc, - view_type: params.view_type, - version: 0, - belongings: RepeatedView::default(), - modified_time: time, - create_time: time, - }; - FutureResult::new(async { Ok(view) }) - } - - pub fn read_view(&self, _token: &str, _params: ViewId) -> FutureResult, FlowyError> { - FutureResult::new(async { Ok(None) }) - } - - pub fn delete_view(&self, _token: &str, _params: RepeatedViewId) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - pub fn update_view(&self, _token: &str, _params: UpdateViewParams) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - pub fn create_app(&self, _token: &str, params: CreateAppParams) -> FutureResult { - let time = timestamp(); - let app = App { - id: uuid_string(), - workspace_id: params.workspace_id, - name: params.name, - desc: params.desc, - belongings: RepeatedView::default(), - version: 0, - modified_time: time, - create_time: time, - }; - FutureResult::new(async { Ok(app) }) - } - - pub fn read_app(&self, _token: &str, _params: AppId) -> FutureResult, FlowyError> { - FutureResult::new(async { Ok(None) }) - } - - pub fn update_app(&self, _token: &str, _params: UpdateAppParams) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - pub fn delete_app(&self, _token: &str, _params: AppId) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - pub fn create_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - pub fn delete_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - pub fn read_trash(&self, _token: &str) -> FutureResult { - FutureResult::new(async { - let repeated_trash = RepeatedTrash { items: vec![] }; - Ok(repeated_trash) - }) - } -} - -lazy_static! { - static ref MIDDLEWARE: Arc = Arc::new(CoreResponseMiddleware::new()); -} - -pub struct CoreResponseMiddleware { - invalid_token_sender: broadcast::Sender, -} - -impl CoreResponseMiddleware { - fn new() -> Self { - let (sender, _) = broadcast::channel(10); - CoreResponseMiddleware { - invalid_token_sender: sender, - } - } - - pub fn invalid_token_subscribe(&self) -> broadcast::Receiver { self.invalid_token_sender.subscribe() } -} - -impl ResponseMiddleware for CoreResponseMiddleware { - fn receive_response(&self, token: &Option, response: &FlowyResponse) { - if let Some(error) = &response.error { - if error.is_unauthorized() { - tracing::error!("user is unauthorized"); - match token { - None => {}, - Some(token) => match self.invalid_token_sender.send(token.clone()) { - Ok(_) => {}, - Err(e) => tracing::error!("{:?}", e), - }, - } - } - } - } -} - fn request_builder() -> HttpRequestBuilder { HttpRequestBuilder::new().middleware(MIDDLEWARE.clone()) } pub async fn create_workspace_request( @@ -474,3 +332,40 @@ pub async fn read_trash_request(token: &str, url: &str) -> Result = Arc::new(CoreResponseMiddleware::new()); +} + +pub struct CoreResponseMiddleware { + invalid_token_sender: broadcast::Sender, +} + +impl CoreResponseMiddleware { + fn new() -> Self { + let (sender, _) = broadcast::channel(10); + CoreResponseMiddleware { + invalid_token_sender: sender, + } + } + + #[allow(dead_code)] + fn invalid_token_subscribe(&self) -> broadcast::Receiver { self.invalid_token_sender.subscribe() } +} + +impl ResponseMiddleware for CoreResponseMiddleware { + fn receive_response(&self, token: &Option, response: &FlowyResponse) { + if let Some(error) = &response.error { + if error.is_unauthorized() { + tracing::error!("user is unauthorized"); + match token { + None => {}, + Some(token) => match self.invalid_token_sender.send(token.clone()) { + Ok(_) => {}, + Err(e) => tracing::error!("{:?}", e), + }, + } + } + } + } +} diff --git a/frontend/rust-lib/flowy-net/src/cloud/document.rs b/frontend/rust-lib/flowy-net/src/http_server/document.rs similarity index 67% rename from frontend/rust-lib/flowy-net/src/cloud/document.rs rename to frontend/rust-lib/flowy-net/src/http_server/document.rs index 277b2bc1f7..b81847de44 100644 --- a/frontend/rust-lib/flowy-net/src/cloud/document.rs +++ b/frontend/rust-lib/flowy-net/src/http_server/document.rs @@ -3,10 +3,8 @@ use backend_service::{ request::{HttpRequestBuilder, ResponseMiddleware}, response::FlowyResponse, }; -use flowy_collaboration::{ - client_document::default::initial_delta_string, - entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams}, -}; +use flowy_collaboration::entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams}; +use flowy_document::DocumentCloudService; use flowy_error::FlowyError; use lazy_static::lazy_static; use lib_infra::future::FutureResult; @@ -20,56 +18,26 @@ impl DocumentHttpCloudService { pub fn new(config: ClientServerConfiguration) -> Self { Self { config } } } -impl DocumentHttpCloudService { - pub fn create_document_request(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> { +impl DocumentCloudService for DocumentHttpCloudService { + fn create_document(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.doc_url(); FutureResult::new(async move { create_document_request(&token, params, &url).await }) } - pub fn read_document_request( - &self, - token: &str, - params: DocumentId, - ) -> FutureResult, FlowyError> { + fn read_document(&self, token: &str, params: DocumentId) -> FutureResult, FlowyError> { let token = token.to_owned(); let url = self.config.doc_url(); FutureResult::new(async move { read_document_request(&token, params, &url).await }) } - pub fn update_document_request(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> { + fn update_document(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.doc_url(); FutureResult::new(async move { reset_doc_request(&token, params, &url).await }) } } -pub struct DocumentLocalCloudService {} - -impl DocumentLocalCloudService { - pub fn create_document_request(&self, _token: &str, _params: CreateDocParams) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - pub fn read_document_request( - &self, - _token: &str, - params: DocumentId, - ) -> FutureResult, FlowyError> { - let doc = DocumentInfo { - doc_id: params.doc_id, - text: initial_delta_string(), - rev_id: 0, - base_rev_id: 0, - }; - FutureResult::new(async { Ok(Some(doc)) }) - } - - pub fn update_document_request(&self, _token: &str, _params: ResetDocumentParams) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } -} - pub async fn create_document_request(token: &str, params: CreateDocParams, url: &str) -> Result<(), FlowyError> { let _ = request_builder() .post(&url.to_owned()) diff --git a/frontend/rust-lib/flowy-net/src/cloud/mod.rs b/frontend/rust-lib/flowy-net/src/http_server/mod.rs similarity index 100% rename from frontend/rust-lib/flowy-net/src/cloud/mod.rs rename to frontend/rust-lib/flowy-net/src/http_server/mod.rs diff --git a/frontend/rust-lib/flowy-net/src/cloud/user.rs b/frontend/rust-lib/flowy-net/src/http_server/user.rs similarity index 59% rename from frontend/rust-lib/flowy-net/src/cloud/user.rs rename to frontend/rust-lib/flowy-net/src/http_server/user.rs index c93543f841..494ac71f0a 100644 --- a/frontend/rust-lib/flowy-net/src/cloud/user.rs +++ b/frontend/rust-lib/flowy-net/src/http_server/user.rs @@ -1,5 +1,6 @@ use backend_service::{configuration::*, errors::ServerError, request::HttpRequestBuilder}; use flowy_error::FlowyError; +use flowy_user::module::UserCloudService; use flowy_user_data_model::entities::{ SignInParams, SignInResponse, @@ -8,7 +9,7 @@ use flowy_user_data_model::entities::{ UpdateUserParams, UserProfile, }; -use lib_infra::{future::FutureResult, uuid_string}; +use lib_infra::future::FutureResult; pub struct UserHttpCloudService { config: ClientServerConfiguration, @@ -17,8 +18,8 @@ impl UserHttpCloudService { pub fn new(config: &ClientServerConfiguration) -> Self { Self { config: config.clone() } } } -impl UserHttpCloudService { - pub fn sign_up(&self, params: SignUpParams) -> FutureResult { +impl UserCloudService for UserHttpCloudService { + fn sign_up(&self, params: SignUpParams) -> FutureResult { let url = self.config.sign_up_url(); FutureResult::new(async move { let resp = user_sign_up_request(params, &url).await?; @@ -26,7 +27,7 @@ impl UserHttpCloudService { }) } - pub fn sign_in(&self, params: SignInParams) -> FutureResult { + fn sign_in(&self, params: SignInParams) -> FutureResult { let url = self.config.sign_in_url(); FutureResult::new(async move { let resp = user_sign_in_request(params, &url).await?; @@ -34,7 +35,7 @@ impl UserHttpCloudService { }) } - pub fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> { + fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.sign_out_url(); FutureResult::new(async move { @@ -43,7 +44,7 @@ impl UserHttpCloudService { }) } - pub fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> { + fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.user_profile_url(); FutureResult::new(async move { @@ -52,7 +53,7 @@ impl UserHttpCloudService { }) } - pub fn get_user(&self, token: &str) -> FutureResult { + fn get_user(&self, token: &str) -> FutureResult { let token = token.to_owned(); let url = self.config.user_profile_url(); FutureResult::new(async move { @@ -61,49 +62,7 @@ impl UserHttpCloudService { }) } - pub fn ws_addr(&self) -> String { self.config.ws_addr() } -} -pub struct UserLocalCloudService(); -impl UserLocalCloudService { - pub fn new(_config: &ClientServerConfiguration) -> Self { Self() } -} - -impl UserLocalCloudService { - pub fn sign_up(&self, params: SignUpParams) -> FutureResult { - let uid = uuid_string(); - FutureResult::new(async move { - Ok(SignUpResponse { - user_id: uid.clone(), - name: params.name, - email: params.email, - token: uid, - }) - }) - } - - pub fn sign_in(&self, params: SignInParams) -> FutureResult { - let user_id = uuid_string(); - FutureResult::new(async { - Ok(SignInResponse { - user_id: user_id.clone(), - name: params.name, - email: params.email, - token: user_id, - }) - }) - } - - pub fn sign_out(&self, _token: &str) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - - pub fn update_user(&self, _token: &str, _params: UpdateUserParams) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - pub fn get_user(&self, _token: &str) -> FutureResult { - FutureResult::new(async { Ok(UserProfile::default()) }) - } - - pub fn ws_addr(&self) -> String { "ws://localhost:8000/ws/".to_owned() } + fn ws_addr(&self) -> String { self.config.ws_addr() } } pub async fn user_sign_up_request(params: SignUpParams, url: &str) -> Result { diff --git a/frontend/rust-lib/flowy-net/src/lib.rs b/frontend/rust-lib/flowy-net/src/lib.rs index b3f0ce775d..f4c0be577d 100644 --- a/frontend/rust-lib/flowy-net/src/lib.rs +++ b/frontend/rust-lib/flowy-net/src/lib.rs @@ -1,7 +1,8 @@ -pub mod cloud; pub mod entities; mod event; mod handlers; +pub mod http_server; +pub mod local_server; pub mod module; pub mod protobuf; pub mod ws; diff --git a/frontend/rust-lib/flowy-net/src/local_server/mod.rs b/frontend/rust-lib/flowy-net/src/local_server/mod.rs new file mode 100644 index 0000000000..e62b0680c4 --- /dev/null +++ b/frontend/rust-lib/flowy-net/src/local_server/mod.rs @@ -0,0 +1,27 @@ +use backend_service::configuration::ClientServerConfiguration; +use tokio::sync::{broadcast, mpsc}; + +mod persistence; +mod server; +mod ws; + +pub use server::*; +pub use ws::*; + +pub struct LocalServerContext { + pub local_ws: LocalWebSocket, + pub local_server: LocalServer, +} + +pub fn build_server(_config: &ClientServerConfiguration) -> LocalServerContext { + let (client_ws_sender, server_ws_receiver) = mpsc::unbounded_channel(); + let (server_ws_sender, _) = broadcast::channel(16); + + // server_ws_sender -> client_ws_receiver + // server_ws_receiver <- client_ws_sender + let local_ws = LocalWebSocket::new(server_ws_receiver, server_ws_sender.clone()); + let client_ws_receiver = server_ws_sender; + let local_server = LocalServer::new(client_ws_sender, client_ws_receiver); + + LocalServerContext { local_ws, local_server } +} diff --git a/frontend/rust-lib/flowy-net/src/ws/local/persistence.rs b/frontend/rust-lib/flowy-net/src/local_server/persistence.rs similarity index 90% rename from frontend/rust-lib/flowy-net/src/ws/local/persistence.rs rename to frontend/rust-lib/flowy-net/src/local_server/persistence.rs index da6b41fc3e..7c41e622b3 100644 --- a/frontend/rust-lib/flowy-net/src/ws/local/persistence.rs +++ b/frontend/rust-lib/flowy-net/src/local_server/persistence.rs @@ -1,5 +1,3 @@ -use crate::ws::local::DocumentCloudStorage; - use flowy_collaboration::{ entities::doc::DocumentInfo, errors::CollaborateError, @@ -14,6 +12,21 @@ use std::{ sync::Arc, }; +pub trait DocumentCloudStorage: Send + Sync { + fn set_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; + fn get_revisions( + &self, + doc_id: &str, + rev_ids: Option>, + ) -> BoxResultFuture; + + fn reset_document( + &self, + doc_id: &str, + repeated_revision: RepeatedRevisionPB, + ) -> BoxResultFuture<(), CollaborateError>; +} + pub(crate) struct LocalDocumentCloudPersistence { // For the moment, we use memory to cache the data, it will be implemented with other storage. // Like the Firestore,Dropbox.etc. diff --git a/frontend/rust-lib/flowy-net/src/local_server/server.rs b/frontend/rust-lib/flowy-net/src/local_server/server.rs new file mode 100644 index 0000000000..1fe722fbc6 --- /dev/null +++ b/frontend/rust-lib/flowy-net/src/local_server/server.rs @@ -0,0 +1,372 @@ +use crate::local_server::persistence::LocalDocumentCloudPersistence; +use async_stream::stream; +use bytes::Bytes; +use flowy_collaboration::{ + client_document::default::initial_delta_string, + entities::{ + doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams}, + ws::{DocumentClientWSData, DocumentClientWSDataType}, + }, + errors::CollaborateError, + protobuf::DocumentClientWSData as DocumentClientWSDataPB, + server_document::*, +}; +use flowy_core::module::WorkspaceCloudService; +use flowy_error::{internal_error, FlowyError}; +use futures_util::stream::StreamExt; +use lib_ws::{WSModule, WebSocketRawMessage}; +use parking_lot::RwLock; +use std::{ + convert::{TryFrom, TryInto}, + fmt::Debug, + sync::Arc, +}; +use tokio::sync::{broadcast, mpsc, mpsc::UnboundedSender}; + +pub struct LocalServer { + doc_manager: Arc, + stop_tx: RwLock>>, + client_ws_sender: mpsc::UnboundedSender, + client_ws_receiver: broadcast::Sender, +} + +impl LocalServer { + pub fn new( + client_ws_sender: mpsc::UnboundedSender, + client_ws_receiver: broadcast::Sender, + ) -> Self { + let persistence = Arc::new(LocalDocumentCloudPersistence::default()); + let doc_manager = Arc::new(ServerDocumentManager::new(persistence)); + let stop_tx = RwLock::new(None); + + LocalServer { + doc_manager, + stop_tx, + client_ws_sender, + client_ws_receiver, + } + } + + pub async fn stop(&self) { + if let Some(stop_tx) = self.stop_tx.read().clone() { + let _ = stop_tx.send(()).await; + } + } + + pub fn run(&self) { + let (stop_tx, stop_rx) = mpsc::channel(1); + *self.stop_tx.write() = Some(stop_tx); + let runner = LocalWebSocketRunner { + doc_manager: self.doc_manager.clone(), + stop_rx: Some(stop_rx), + client_ws_sender: self.client_ws_sender.clone(), + client_ws_receiver: Some(self.client_ws_receiver.subscribe()), + }; + tokio::spawn(runner.run()); + } +} + +struct LocalWebSocketRunner { + doc_manager: Arc, + stop_rx: Option>, + client_ws_sender: mpsc::UnboundedSender, + client_ws_receiver: Option>, +} + +impl LocalWebSocketRunner { + pub async fn run(mut self) { + let mut stop_rx = self.stop_rx.take().expect("Only run once"); + let mut client_ws_receiver = self.client_ws_receiver.take().expect("Only run once"); + let stream = stream! { + loop { + tokio::select! { + result = client_ws_receiver.recv() => { + match result { + Ok(msg) => yield msg, + Err(_e) => {}, + } + }, + _ = stop_rx.recv() => { + tracing::trace!("[LocalWebSocketRunner] stop"); + break + }, + }; + } + }; + stream + .for_each(|message| async { + match self.handle_message(message).await { + Ok(_) => {}, + Err(e) => tracing::error!("[LocalWebSocketRunner]: {}", e), + } + }) + .await; + } + + async fn handle_message(&self, message: WebSocketRawMessage) -> Result<(), FlowyError> { + let bytes = Bytes::from(message.data); + let client_data = DocumentClientWSData::try_from(bytes).map_err(internal_error)?; + let _ = self.handle_client_data(client_data, "".to_owned()).await?; + Ok(()) + } + + pub async fn handle_client_data( + &self, + client_data: DocumentClientWSData, + user_id: String, + ) -> Result<(), CollaborateError> { + tracing::trace!( + "[LocalDocumentServer] receive: {}:{}-{:?} ", + client_data.doc_id, + client_data.id(), + client_data.ty, + ); + let client_ws_sender = self.client_ws_sender.clone(); + let user = Arc::new(LocalDocumentUser { + user_id, + client_ws_sender, + }); + let ty = client_data.ty.clone(); + let document_client_data: DocumentClientWSDataPB = client_data.try_into().unwrap(); + match ty { + DocumentClientWSDataType::ClientPushRev => { + let _ = self + .doc_manager + .handle_client_revisions(user, document_client_data) + .await?; + }, + DocumentClientWSDataType::ClientPing => { + let _ = self.doc_manager.handle_client_ping(user, document_client_data).await?; + }, + } + Ok(()) + } +} + +#[derive(Debug)] +struct LocalDocumentUser { + user_id: String, + client_ws_sender: mpsc::UnboundedSender, +} + +impl RevisionUser for LocalDocumentUser { + fn user_id(&self) -> String { self.user_id.clone() } + + fn receive(&self, resp: SyncResponse) { + let sender = self.client_ws_sender.clone(); + let send_fn = |sender: UnboundedSender, msg: WebSocketRawMessage| match sender.send(msg) { + Ok(_) => {}, + Err(e) => { + tracing::error!("LocalDocumentUser send message failed: {}", e); + }, + }; + + tokio::spawn(async move { + match resp { + SyncResponse::Pull(data) => { + let bytes: Bytes = data.try_into().unwrap(); + let msg = WebSocketRawMessage { + module: WSModule::Doc, + data: bytes.to_vec(), + }; + send_fn(sender, msg); + }, + SyncResponse::Push(data) => { + let bytes: Bytes = data.try_into().unwrap(); + let msg = WebSocketRawMessage { + module: WSModule::Doc, + data: bytes.to_vec(), + }; + send_fn(sender, msg); + }, + SyncResponse::Ack(data) => { + let bytes: Bytes = data.try_into().unwrap(); + let msg = WebSocketRawMessage { + module: WSModule::Doc, + data: bytes.to_vec(), + }; + send_fn(sender, msg); + }, + } + }); + } +} + +use flowy_core_data_model::entities::{ + app::{App, AppId, CreateAppParams, RepeatedApp, UpdateAppParams}, + trash::{RepeatedTrash, RepeatedTrashId}, + view::{CreateViewParams, RepeatedView, RepeatedViewId, UpdateViewParams, View, ViewId}, + workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId}, +}; +use flowy_document::DocumentCloudService; +use flowy_user::module::UserCloudService; +use flowy_user_data_model::entities::{ + SignInParams, + SignInResponse, + SignUpParams, + SignUpResponse, + UpdateUserParams, + UserProfile, +}; +use lib_infra::{future::FutureResult, timestamp, uuid_string}; + +impl WorkspaceCloudService for LocalServer { + fn init(&self) {} + + fn create_workspace(&self, _token: &str, params: CreateWorkspaceParams) -> FutureResult { + let time = timestamp(); + let workspace = Workspace { + id: uuid_string(), + name: params.name, + desc: params.desc, + apps: RepeatedApp::default(), + modified_time: time, + create_time: time, + }; + + FutureResult::new(async { Ok(workspace) }) + } + + fn read_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult { + FutureResult::new(async { + let repeated_workspace = RepeatedWorkspace { items: vec![] }; + Ok(repeated_workspace) + }) + } + + fn update_workspace(&self, _token: &str, _params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + fn delete_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + fn create_view(&self, _token: &str, params: CreateViewParams) -> FutureResult { + let time = timestamp(); + let view = View { + id: params.view_id, + belong_to_id: params.belong_to_id, + name: params.name, + desc: params.desc, + view_type: params.view_type, + version: 0, + belongings: RepeatedView::default(), + modified_time: time, + create_time: time, + }; + FutureResult::new(async { Ok(view) }) + } + + fn read_view(&self, _token: &str, _params: ViewId) -> FutureResult, FlowyError> { + FutureResult::new(async { Ok(None) }) + } + + fn delete_view(&self, _token: &str, _params: RepeatedViewId) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + fn update_view(&self, _token: &str, _params: UpdateViewParams) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + fn create_app(&self, _token: &str, params: CreateAppParams) -> FutureResult { + let time = timestamp(); + let app = App { + id: uuid_string(), + workspace_id: params.workspace_id, + name: params.name, + desc: params.desc, + belongings: RepeatedView::default(), + version: 0, + modified_time: time, + create_time: time, + }; + FutureResult::new(async { Ok(app) }) + } + + fn read_app(&self, _token: &str, _params: AppId) -> FutureResult, FlowyError> { + FutureResult::new(async { Ok(None) }) + } + + fn update_app(&self, _token: &str, _params: UpdateAppParams) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + fn delete_app(&self, _token: &str, _params: AppId) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + fn create_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + fn delete_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + fn read_trash(&self, _token: &str) -> FutureResult { + FutureResult::new(async { + let repeated_trash = RepeatedTrash { items: vec![] }; + Ok(repeated_trash) + }) + } +} + +impl UserCloudService for LocalServer { + fn sign_up(&self, params: SignUpParams) -> FutureResult { + let uid = uuid_string(); + FutureResult::new(async move { + Ok(SignUpResponse { + user_id: uid.clone(), + name: params.name, + email: params.email, + token: uid, + }) + }) + } + + fn sign_in(&self, params: SignInParams) -> FutureResult { + let user_id = uuid_string(); + FutureResult::new(async { + Ok(SignInResponse { + user_id: user_id.clone(), + name: params.name, + email: params.email, + token: user_id, + }) + }) + } + + fn sign_out(&self, _token: &str) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } + + fn update_user(&self, _token: &str, _params: UpdateUserParams) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + fn get_user(&self, _token: &str) -> FutureResult { + FutureResult::new(async { Ok(UserProfile::default()) }) + } + + fn ws_addr(&self) -> String { "ws://localhost:8000/ws/".to_owned() } +} + +impl DocumentCloudService for LocalServer { + fn create_document(&self, _token: &str, _params: CreateDocParams) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + fn read_document(&self, _token: &str, params: DocumentId) -> FutureResult, FlowyError> { + let doc = DocumentInfo { + doc_id: params.doc_id, + text: initial_delta_string(), + rev_id: 0, + base_rev_id: 0, + }; + FutureResult::new(async { Ok(Some(doc)) }) + } + + fn update_document(&self, _token: &str, _params: ResetDocumentParams) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } +} diff --git a/frontend/rust-lib/flowy-net/src/local_server/ws.rs b/frontend/rust-lib/flowy-net/src/local_server/ws.rs new file mode 100644 index 0000000000..da1ebac84e --- /dev/null +++ b/frontend/rust-lib/flowy-net/src/local_server/ws.rs @@ -0,0 +1,82 @@ +use crate::ws::connection::{FlowyRawWebSocket, FlowyWSSender}; +use dashmap::DashMap; +use flowy_error::FlowyError; +use lib_infra::future::FutureResult; +use lib_ws::{WSConnectState, WSMessageReceiver, WSModule, WebSocketRawMessage}; +use parking_lot::RwLock; +use std::sync::Arc; +use tokio::sync::{broadcast, broadcast::Receiver, mpsc::UnboundedReceiver}; + +pub struct LocalWebSocket { + user_id: Arc>>, + receivers: Arc>>, + state_sender: broadcast::Sender, + server_ws_receiver: RwLock>>, + server_ws_sender: broadcast::Sender, +} + +impl LocalWebSocket { + pub fn new( + server_ws_receiver: UnboundedReceiver, + server_ws_sender: broadcast::Sender, + ) -> Self { + let user_id = Arc::new(RwLock::new(None)); + let receivers = Arc::new(DashMap::new()); + let server_ws_receiver = RwLock::new(Some(server_ws_receiver)); + let (state_sender, _) = broadcast::channel(16); + LocalWebSocket { + user_id, + receivers, + state_sender, + server_ws_receiver, + server_ws_sender, + } + } +} + +impl FlowyRawWebSocket for LocalWebSocket { + fn initialize(&self) -> FutureResult<(), FlowyError> { + let mut server_ws_receiver = self.server_ws_receiver.write().take().expect("Only take once"); + let receivers = self.receivers.clone(); + tokio::spawn(async move { + while let Some(message) = server_ws_receiver.recv().await { + match receivers.get(&message.module) { + None => tracing::error!("Can't find any handler for message: {:?}", message), + Some(receiver) => receiver.receive_message(message.clone()), + } + } + }); + FutureResult::new(async { Ok(()) }) + } + + fn start_connect(&self, _addr: String, user_id: String) -> FutureResult<(), FlowyError> { + *self.user_id.write() = Some(user_id); + FutureResult::new(async { Ok(()) }) + } + + fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } + + fn subscribe_connect_state(&self) -> Receiver { self.state_sender.subscribe() } + + fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } + + fn add_receiver(&self, receiver: Arc) -> Result<(), FlowyError> { + self.receivers.insert(receiver.source(), receiver); + Ok(()) + } + + fn sender(&self) -> Result, FlowyError> { + let ws = LocalWebSocketAdaptor(self.server_ws_sender.clone()); + Ok(Arc::new(ws)) + } +} + +#[derive(Clone)] +struct LocalWebSocketAdaptor(broadcast::Sender); + +impl FlowyWSSender for LocalWebSocketAdaptor { + fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> { + let _ = self.0.send(msg); + Ok(()) + } +} diff --git a/frontend/rust-lib/flowy-net/src/ws/connection.rs b/frontend/rust-lib/flowy-net/src/ws/connection.rs index adbfd98fbb..9e9b89627a 100644 --- a/frontend/rust-lib/flowy-net/src/ws/connection.rs +++ b/frontend/rust-lib/flowy-net/src/ws/connection.rs @@ -4,6 +4,7 @@ pub use flowy_error::FlowyError; use lib_infra::future::FutureResult; pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage}; +use lib_ws::WSController; use parking_lot::RwLock; use std::sync::Arc; use tokio::sync::broadcast; @@ -30,7 +31,18 @@ pub struct FlowyWebSocketConnect { } impl FlowyWebSocketConnect { - pub fn new(addr: String, ws: Arc) -> Self { + pub fn new(addr: String) -> Self { + let ws = Arc::new(Arc::new(WSController::new())); + let (status_notifier, _) = broadcast::channel(10); + FlowyWebSocketConnect { + inner: ws, + connect_type: RwLock::new(NetworkType::default()), + status_notifier, + addr, + } + } + + pub fn from_local(addr: String, ws: Arc) -> Self { let (status_notifier, _) = broadcast::channel(10); FlowyWebSocketConnect { inner: ws, diff --git a/frontend/rust-lib/flowy-net/src/ws/http/mod.rs b/frontend/rust-lib/flowy-net/src/ws/http/mod.rs deleted file mode 100644 index 5616b3a3e6..0000000000 --- a/frontend/rust-lib/flowy-net/src/ws/http/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub use http_ws::*; - -mod http_ws; diff --git a/frontend/rust-lib/flowy-net/src/ws/http/http_ws.rs b/frontend/rust-lib/flowy-net/src/ws/http_ws.rs similarity index 100% rename from frontend/rust-lib/flowy-net/src/ws/http/http_ws.rs rename to frontend/rust-lib/flowy-net/src/ws/http_ws.rs diff --git a/frontend/rust-lib/flowy-net/src/ws/local/local_server.rs b/frontend/rust-lib/flowy-net/src/ws/local/local_server.rs deleted file mode 100644 index 4a4e0822da..0000000000 --- a/frontend/rust-lib/flowy-net/src/ws/local/local_server.rs +++ /dev/null @@ -1,104 +0,0 @@ -use crate::ws::local::persistence::LocalDocumentCloudPersistence; -use bytes::Bytes; -use flowy_collaboration::{ - entities::ws::{DocumentClientWSData, DocumentClientWSDataType}, - errors::CollaborateError, - protobuf::DocumentClientWSData as DocumentClientWSDataPB, - server_document::*, -}; -use lib_ws::{WSModule, WebSocketRawMessage}; -use std::{convert::TryInto, fmt::Debug, sync::Arc}; -use tokio::sync::{mpsc, mpsc::UnboundedSender}; - -pub struct LocalDocumentServer { - doc_manager: Arc, - sender: mpsc::UnboundedSender, -} - -impl LocalDocumentServer { - pub fn new(sender: mpsc::UnboundedSender) -> Self { - let persistence = Arc::new(LocalDocumentCloudPersistence::default()); - let doc_manager = Arc::new(ServerDocumentManager::new(persistence)); - LocalDocumentServer { doc_manager, sender } - } - - pub async fn handle_client_data( - &self, - client_data: DocumentClientWSData, - user_id: String, - ) -> Result<(), CollaborateError> { - tracing::trace!( - "[LocalDocumentServer] receive: {}:{}-{:?} ", - client_data.doc_id, - client_data.id(), - client_data.ty, - ); - let user = Arc::new(LocalDocumentUser { - user_id, - ws_sender: self.sender.clone(), - }); - let ty = client_data.ty.clone(); - let document_client_data: DocumentClientWSDataPB = client_data.try_into().unwrap(); - match ty { - DocumentClientWSDataType::ClientPushRev => { - let _ = self - .doc_manager - .handle_client_revisions(user, document_client_data) - .await?; - }, - DocumentClientWSDataType::ClientPing => { - let _ = self.doc_manager.handle_client_ping(user, document_client_data).await?; - }, - } - Ok(()) - } -} - -#[derive(Debug)] -struct LocalDocumentUser { - user_id: String, - ws_sender: mpsc::UnboundedSender, -} - -impl RevisionUser for LocalDocumentUser { - fn user_id(&self) -> String { self.user_id.clone() } - - fn receive(&self, resp: SyncResponse) { - let sender = self.ws_sender.clone(); - let send_fn = |sender: UnboundedSender, msg: WebSocketRawMessage| match sender.send(msg) { - Ok(_) => {}, - Err(e) => { - tracing::error!("LocalDocumentUser send message failed: {}", e); - }, - }; - - tokio::spawn(async move { - match resp { - SyncResponse::Pull(data) => { - let bytes: Bytes = data.try_into().unwrap(); - let msg = WebSocketRawMessage { - module: WSModule::Doc, - data: bytes.to_vec(), - }; - send_fn(sender, msg); - }, - SyncResponse::Push(data) => { - let bytes: Bytes = data.try_into().unwrap(); - let msg = WebSocketRawMessage { - module: WSModule::Doc, - data: bytes.to_vec(), - }; - send_fn(sender, msg); - }, - SyncResponse::Ack(data) => { - let bytes: Bytes = data.try_into().unwrap(); - let msg = WebSocketRawMessage { - module: WSModule::Doc, - data: bytes.to_vec(), - }; - send_fn(sender, msg); - }, - } - }); - } -} diff --git a/frontend/rust-lib/flowy-net/src/ws/local/local_ws.rs b/frontend/rust-lib/flowy-net/src/ws/local/local_ws.rs deleted file mode 100644 index a8ccecf2fa..0000000000 --- a/frontend/rust-lib/flowy-net/src/ws/local/local_ws.rs +++ /dev/null @@ -1,166 +0,0 @@ -use crate::ws::{ - connection::{FlowyRawWebSocket, FlowyWSSender}, - local::local_server::LocalDocumentServer, -}; -use bytes::Bytes; -use dashmap::DashMap; -use flowy_collaboration::entities::ws::*; -use flowy_error::{internal_error, FlowyError}; -use lib_infra::future::FutureResult; -use lib_ws::{WSConnectState, WSMessageReceiver, WSModule, WebSocketRawMessage}; -use parking_lot::RwLock; -use std::{convert::TryFrom, sync::Arc}; -use tokio::sync::{broadcast, broadcast::Receiver, mpsc, mpsc::UnboundedReceiver}; - -pub struct LocalWebSocket { - receivers: Arc>>, - state_sender: broadcast::Sender, - // LocalWSSender uses the mpsc::channel sender to simulate the web socket. It spawns a receiver that uses the - // LocalDocumentServer to handle the message. The server will send the WebSocketRawMessage messages that will - // be handled by the WebSocketRawMessage receivers. - ws_sender: LocalWSSender, - local_server: Arc, - local_server_rx: RwLock>>, - local_server_stop_tx: RwLock>>, - user_id: Arc>>, -} - -impl std::default::Default for LocalWebSocket { - fn default() -> Self { - let (state_sender, _) = broadcast::channel(16); - let ws_sender = LocalWSSender::default(); - let receivers = Arc::new(DashMap::new()); - - let (server_tx, server_rx) = mpsc::unbounded_channel(); - let local_server = Arc::new(LocalDocumentServer::new(server_tx)); - let local_server_rx = RwLock::new(Some(server_rx)); - let local_server_stop_tx = RwLock::new(None); - let user_id = Arc::new(RwLock::new(None)); - - LocalWebSocket { - receivers, - state_sender, - ws_sender, - local_server, - local_server_rx, - local_server_stop_tx, - user_id, - } - } -} - -impl LocalWebSocket { - fn cancel_pre_spawn_client(&self) { - if let Some(stop_tx) = self.local_server_stop_tx.read().clone() { - tokio::spawn(async move { - let _ = stop_tx.send(()).await; - }); - } - } - - fn spawn_client_ws_receiver(&self, _addr: String) { - let mut ws_receiver = self.ws_sender.subscribe(); - let local_server = self.local_server.clone(); - let user_id = self.user_id.clone(); - let _ = self.cancel_pre_spawn_client(); - let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1); - *self.local_server_stop_tx.write() = Some(stop_tx); - tokio::spawn(async move { - loop { - tokio::select! { - result = ws_receiver.recv() => { - match result { - Ok(message) => { - let user_id = user_id.read().clone(); - handle_ws_raw_message(user_id, &local_server, message).await; - }, - Err(e) => tracing::error!("[LocalWebSocket] error: {}", e), - } - } - _ = stop_rx.recv() => { - break - }, - } - } - }); - } -} - -async fn handle_ws_raw_message( - user_id: Option, - local_server: &Arc, - message: WebSocketRawMessage, -) { - let f = || async { - match user_id { - None => Ok(()), - Some(user_id) => { - let bytes = Bytes::from(message.data); - let client_data = DocumentClientWSData::try_from(bytes).map_err(internal_error)?; - let _ = local_server.handle_client_data(client_data, user_id).await?; - Ok::<(), FlowyError>(()) - }, - } - }; - if let Err(e) = f().await { - tracing::error!("[LocalWebSocket] error: {:?}", e); - } -} - -impl FlowyRawWebSocket for LocalWebSocket { - fn initialize(&self) -> FutureResult<(), FlowyError> { - let mut server_rx = self.local_server_rx.write().take().expect("Only take once"); - let receivers = self.receivers.clone(); - tokio::spawn(async move { - while let Some(message) = server_rx.recv().await { - match receivers.get(&message.module) { - None => tracing::error!("Can't find any handler for message: {:?}", message), - Some(receiver) => receiver.receive_message(message.clone()), - } - } - }); - FutureResult::new(async { Ok(()) }) - } - - fn start_connect(&self, addr: String, user_id: String) -> FutureResult<(), FlowyError> { - *self.user_id.write() = Some(user_id); - self.spawn_client_ws_receiver(addr); - FutureResult::new(async { Ok(()) }) - } - - fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - - fn subscribe_connect_state(&self) -> Receiver { self.state_sender.subscribe() } - - fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - - fn add_receiver(&self, receiver: Arc) -> Result<(), FlowyError> { - self.receivers.insert(receiver.source(), receiver); - Ok(()) - } - - fn sender(&self) -> Result, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) } -} - -#[derive(Clone)] -struct LocalWSSender(broadcast::Sender); - -impl std::default::Default for LocalWSSender { - fn default() -> Self { - let (tx, _) = broadcast::channel(16); - Self(tx) - } -} - -impl FlowyWSSender for LocalWSSender { - fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> { - let _ = self.0.send(msg); - Ok(()) - } -} - -impl std::ops::Deref for LocalWSSender { - type Target = broadcast::Sender; - - fn deref(&self) -> &Self::Target { &self.0 } -} diff --git a/frontend/rust-lib/flowy-net/src/ws/local/mod.rs b/frontend/rust-lib/flowy-net/src/ws/local/mod.rs deleted file mode 100644 index 29bb454a03..0000000000 --- a/frontend/rust-lib/flowy-net/src/ws/local/mod.rs +++ /dev/null @@ -1,24 +0,0 @@ -mod local_server; -mod local_ws; -mod persistence; - -use flowy_collaboration::errors::CollaborateError; -pub use local_ws::*; - -use flowy_collaboration::protobuf::RepeatedRevision as RepeatedRevisionPB; -use lib_infra::future::BoxResultFuture; - -pub trait DocumentCloudStorage: Send + Sync { - fn set_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; - fn get_revisions( - &self, - doc_id: &str, - rev_ids: Option>, - ) -> BoxResultFuture; - - fn reset_document( - &self, - doc_id: &str, - repeated_revision: RepeatedRevisionPB, - ) -> BoxResultFuture<(), CollaborateError>; -} diff --git a/frontend/rust-lib/flowy-net/src/ws/mod.rs b/frontend/rust-lib/flowy-net/src/ws/mod.rs index b161c5ae8b..f16abe75e3 100644 --- a/frontend/rust-lib/flowy-net/src/ws/mod.rs +++ b/frontend/rust-lib/flowy-net/src/ws/mod.rs @@ -1,3 +1,2 @@ pub mod connection; -pub mod http; -pub mod local; +pub mod http_ws; diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs index ccbd578a1b..2f180ed747 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs @@ -2,34 +2,17 @@ use backend_service::configuration::ClientServerConfiguration; use flowy_core::{ errors::FlowyError, module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser}, - prelude::{ - App, - AppId, - CreateAppParams, - CreateViewParams, - CreateWorkspaceParams, - RepeatedTrash, - RepeatedTrashId, - RepeatedViewId, - RepeatedWorkspace, - UpdateAppParams, - UpdateViewParams, - UpdateWorkspaceParams, - View, - ViewId, - Workspace, - WorkspaceId, - }, }; use flowy_database::ConnectionPool; -use flowy_net::cloud::core::{CoreHttpCloudService, CoreLocalCloudService}; +use flowy_net::{http_server::core::CoreHttpCloudService, local_server::LocalServer}; use flowy_user::services::UserSession; -use lib_infra::future::FutureResult; + use std::sync::Arc; pub struct CoreDepsResolver(); impl CoreDepsResolver { pub fn resolve( + local_server: Option>, user_session: Arc, server_config: &ClientServerConfiguration, ) -> ( @@ -39,7 +22,10 @@ impl CoreDepsResolver { ) { let user: Arc = Arc::new(WorkspaceUserImpl(user_session.clone())); let database: Arc = Arc::new(WorkspaceDatabaseImpl(user_session)); - let cloud_service = make_core_cloud_service(server_config); + let cloud_service: Arc = match local_server { + None => Arc::new(CoreHttpCloudService::new(server_config.clone())), + Some(local_server) => local_server, + }; (user, database, cloud_service) } } @@ -57,144 +43,3 @@ impl WorkspaceUser for WorkspaceUserImpl { fn token(&self) -> Result { self.0.token().map_err(|e| FlowyError::internal().context(e)) } } - -fn make_core_cloud_service(config: &ClientServerConfiguration) -> Arc { - if cfg!(feature = "http_server") { - Arc::new(CoreHttpCloudServiceAdaptor::new(config)) - } else { - Arc::new(CoreLocalCloudServiceAdaptor::new(config)) - } -} - -struct CoreHttpCloudServiceAdaptor(CoreHttpCloudService); -impl CoreHttpCloudServiceAdaptor { - fn new(config: &ClientServerConfiguration) -> Self { Self(CoreHttpCloudService::new(config.clone())) } -} -impl WorkspaceCloudService for CoreHttpCloudServiceAdaptor { - fn init(&self) { self.0.init() } - - fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult { - self.0.create_workspace(token, params) - } - - fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult { - self.0.read_workspace(token, params) - } - - fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> { - self.0.update_workspace(token, params) - } - - fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> { - self.0.delete_workspace(token, params) - } - - fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult { - self.0.create_view(token, params) - } - - fn read_view(&self, token: &str, params: ViewId) -> FutureResult, FlowyError> { - self.0.read_view(token, params) - } - - fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> { - self.0.delete_view(token, params) - } - - fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> { - self.0.update_view(token, params) - } - - fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult { - self.0.create_app(token, params) - } - - fn read_app(&self, token: &str, params: AppId) -> FutureResult, FlowyError> { - self.0.read_app(token, params) - } - - fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> { - self.0.update_app(token, params) - } - - fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> { - self.0.delete_app(token, params) - } - - fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { - self.0.create_trash(token, params) - } - - fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { - self.0.delete_trash(token, params) - } - - fn read_trash(&self, token: &str) -> FutureResult { self.0.read_trash(token) } -} - -struct CoreLocalCloudServiceAdaptor(CoreLocalCloudService); -impl CoreLocalCloudServiceAdaptor { - fn new(config: &ClientServerConfiguration) -> Self { Self(CoreLocalCloudService::new(config)) } -} - -impl WorkspaceCloudService for CoreLocalCloudServiceAdaptor { - fn init(&self) { self.0.init() } - - fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult { - self.0.create_workspace(token, params) - } - - fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult { - self.0.read_workspace(token, params) - } - - fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> { - self.0.update_workspace(token, params) - } - - fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> { - self.0.delete_workspace(token, params) - } - - fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult { - self.0.create_view(token, params) - } - - fn read_view(&self, token: &str, params: ViewId) -> FutureResult, FlowyError> { - self.0.read_view(token, params) - } - - fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> { - self.0.delete_view(token, params) - } - - fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> { - self.0.update_view(token, params) - } - - fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult { - self.0.create_app(token, params) - } - - fn read_app(&self, token: &str, params: AppId) -> FutureResult, FlowyError> { - self.0.read_app(token, params) - } - - fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> { - self.0.update_app(token, params) - } - - fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> { - self.0.delete_app(token, params) - } - - fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { - self.0.create_trash(token, params) - } - - fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { - self.0.delete_trash(token, params) - } - - fn read_trash(&self, token: &str) -> FutureResult { self.0.read_trash(token) } -} diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 7a77c9632b..ba5d84fbab 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -1,9 +1,6 @@ use backend_service::configuration::ClientServerConfiguration; use bytes::Bytes; -use flowy_collaboration::entities::{ - doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams}, - ws::DocumentClientWSData, -}; +use flowy_collaboration::entities::ws::DocumentClientWSData; use flowy_database::ConnectionPool; use flowy_document::{ context::DocumentUser, @@ -12,11 +9,12 @@ use flowy_document::{ DocumentCloudService, }; use flowy_net::{ - cloud::document::{DocumentHttpCloudService, DocumentLocalCloudService}, + http_server::document::DocumentHttpCloudService, + local_server::LocalServer, ws::connection::FlowyWebSocketConnect, }; use flowy_user::services::UserSession; -use lib_infra::future::FutureResult; + use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage}; use std::{convert::TryInto, path::Path, sync::Arc}; @@ -30,6 +28,7 @@ pub struct DocumentDependencies { pub struct DocumentDepsResolver(); impl DocumentDepsResolver { pub fn resolve( + local_server: Option>, ws_conn: Arc, user_session: Arc, server_config: &ClientServerConfiguration, @@ -39,7 +38,12 @@ impl DocumentDepsResolver { let ws_receivers = Arc::new(DocumentWSReceivers::new()); let receiver = Arc::new(WSMessageReceiverImpl(ws_receivers.clone())); ws_conn.add_ws_message_receiver(receiver).unwrap(); - let cloud_service = make_document_cloud_service(server_config); + + let cloud_service: Arc = match local_server { + None => Arc::new(DocumentHttpCloudService::new(server_config.clone())), + Some(local_server) => local_server, + }; + DocumentDependencies { user, ws_receivers, @@ -94,49 +98,3 @@ impl WSMessageReceiver for WSMessageReceiverImpl { }); } } - -fn make_document_cloud_service(server_config: &ClientServerConfiguration) -> Arc { - if cfg!(feature = "http_server") { - Arc::new(DocumentHttpCloudServiceAdaptor::new(server_config.clone())) - } else { - Arc::new(DocumentLocalCloudServiceAdaptor::new()) - } -} - -struct DocumentHttpCloudServiceAdaptor(DocumentHttpCloudService); -impl DocumentHttpCloudServiceAdaptor { - fn new(config: ClientServerConfiguration) -> Self { - DocumentHttpCloudServiceAdaptor(DocumentHttpCloudService::new(config)) - } -} -impl DocumentCloudService for DocumentHttpCloudServiceAdaptor { - fn create_document(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> { - self.0.create_document_request(token, params) - } - - fn read_document(&self, token: &str, params: DocumentId) -> FutureResult, FlowyError> { - self.0.read_document_request(token, params) - } - - fn update_document(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> { - self.0.update_document_request(token, params) - } -} - -struct DocumentLocalCloudServiceAdaptor(DocumentLocalCloudService); -impl DocumentLocalCloudServiceAdaptor { - fn new() -> Self { Self(DocumentLocalCloudService {}) } -} -impl DocumentCloudService for DocumentLocalCloudServiceAdaptor { - fn create_document(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> { - self.0.create_document_request(token, params) - } - - fn read_document(&self, token: &str, params: DocumentId) -> FutureResult, FlowyError> { - self.0.read_document_request(token, params) - } - - fn update_document(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> { - self.0.update_document_request(token, params) - } -} diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/user_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/user_deps.rs index db1e790a83..12328b02c3 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/user_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/user_deps.rs @@ -1,58 +1,18 @@ -use crate::FlowyError; use backend_service::configuration::ClientServerConfiguration; -use flowy_net::cloud::user::{UserHttpCloudService, UserLocalCloudService}; -use flowy_user::{ - entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile}, - module::UserCloudService, -}; -use lib_infra::future::FutureResult; +use flowy_net::{http_server::user::UserHttpCloudService, local_server::LocalServer}; +use flowy_user::module::UserCloudService; + use std::sync::Arc; pub struct UserDepsResolver(); impl UserDepsResolver { - pub fn resolve(server_config: &ClientServerConfiguration) -> Arc { - make_user_cloud_service(server_config) + pub fn resolve( + local_server: &Option>, + server_config: &ClientServerConfiguration, + ) -> Arc { + match local_server.clone() { + None => Arc::new(UserHttpCloudService::new(server_config)), + Some(local_server) => local_server, + } } } - -fn make_user_cloud_service(config: &ClientServerConfiguration) -> Arc { - if cfg!(feature = "http_server") { - Arc::new(UserHttpCloudServiceAdaptor(UserHttpCloudService::new(config))) - } else { - Arc::new(UserLocalCloudServiceAdaptor(UserLocalCloudService::new(config))) - } -} - -struct UserHttpCloudServiceAdaptor(UserHttpCloudService); -impl UserCloudService for UserHttpCloudServiceAdaptor { - fn sign_up(&self, params: SignUpParams) -> FutureResult { self.0.sign_up(params) } - - fn sign_in(&self, params: SignInParams) -> FutureResult { self.0.sign_in(params) } - - fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> { self.0.sign_out(token) } - - fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> { - self.0.update_user(token, params) - } - - fn get_user(&self, token: &str) -> FutureResult { self.0.get_user(token) } - - fn ws_addr(&self) -> String { self.0.ws_addr() } -} - -struct UserLocalCloudServiceAdaptor(UserLocalCloudService); -impl UserCloudService for UserLocalCloudServiceAdaptor { - fn sign_up(&self, params: SignUpParams) -> FutureResult { self.0.sign_up(params) } - - fn sign_in(&self, params: SignInParams) -> FutureResult { self.0.sign_in(params) } - - fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> { self.0.sign_out(token) } - - fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> { - self.0.update_user(token, params) - } - - fn get_user(&self, token: &str) -> FutureResult { self.0.get_user(token) } - - fn ws_addr(&self) -> String { self.0.ws_addr() } -} diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index cb97e299e2..197a24ca6b 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -6,14 +6,12 @@ use flowy_core::{context::CoreContext, errors::FlowyError, module::init_core}; use flowy_document::context::DocumentContext; use flowy_net::{ entities::NetworkType, - ws::{ - connection::{listen_on_websocket, FlowyRawWebSocket, FlowyWebSocketConnect}, - local::LocalWebSocket, - }, + local_server::LocalServer, + ws::connection::{listen_on_websocket, FlowyWebSocketConnect}, }; use flowy_user::services::{notifier::UserStatus, UserSession, UserSessionConfig}; use lib_dispatch::prelude::*; -use lib_ws::WSController; + use module::mk_modules; pub use module::*; use std::{ @@ -88,6 +86,7 @@ pub struct FlowySDK { pub core: Arc, pub dispatcher: Arc, pub ws_conn: Arc, + pub local_server: Option>, } impl FlowySDK { @@ -96,18 +95,25 @@ impl FlowySDK { init_kv(&config.root); tracing::debug!("🔥 {:?}", config); - let ws_conn = Arc::new(FlowyWebSocketConnect::new( - config.server_config.ws_addr(), - default_web_socket(), - )); - let user_session = mk_user_session(&config, &config.server_config); - let flowy_document = mk_document(&ws_conn, &user_session, &config.server_config); - let core_ctx = mk_core_context(&user_session, &flowy_document, &config.server_config); + let ws_addr = config.server_config.ws_addr(); + let (local_server, ws_conn) = if cfg!(feature = "http_server") { + let ws_conn = Arc::new(FlowyWebSocketConnect::new(ws_addr)); + (None, ws_conn) + } else { + let context = flowy_net::local_server::build_server(&config.server_config); + let local_ws = Arc::new(context.local_ws); + let ws_conn = Arc::new(FlowyWebSocketConnect::from_local(ws_addr, local_ws)); + (Some(Arc::new(context.local_server)), ws_conn) + }; + + let user_session = mk_user_session(&config, &local_server, &config.server_config); + let flowy_document = mk_document(&local_server, &ws_conn, &user_session, &config.server_config); + let core_ctx = mk_core_context(&local_server, &user_session, &flowy_document, &config.server_config); // let modules = mk_modules(&ws_conn, &core_ctx, &user_session); let dispatcher = Arc::new(EventDispatcher::construct(|| modules)); - _init(&dispatcher, &ws_conn, &user_session, &core_ctx); + _init(&local_server, &dispatcher, &ws_conn, &user_session, &core_ctx); Self { config, @@ -116,6 +122,7 @@ impl FlowySDK { core: core_ctx, dispatcher, ws_conn, + local_server, } } @@ -123,6 +130,7 @@ impl FlowySDK { } fn _init( + local_server: &Option>, dispatch: &EventDispatcher, ws_conn: &Arc, user_session: &Arc, @@ -134,8 +142,13 @@ fn _init( let cloned_core = core.clone(); let user_session = user_session.clone(); let ws_conn = ws_conn.clone(); + let local_server = local_server.clone(); dispatch.spawn(async move { + if let Some(local_server) = local_server.as_ref() { + local_server.run(); + } + user_session.init(); ws_conn.init().await; listen_on_websocket(ws_conn.clone()); @@ -206,36 +219,40 @@ fn init_log(config: &FlowySDKConfig) { } } -fn mk_user_session(config: &FlowySDKConfig, server_config: &ClientServerConfiguration) -> Arc { +fn mk_user_session( + config: &FlowySDKConfig, + local_server: &Option>, + server_config: &ClientServerConfiguration, +) -> Arc { let session_cache_key = format!("{}_session_cache", &config.name); let user_config = UserSessionConfig::new(&config.root, &session_cache_key); - let cloud_service = UserDepsResolver::resolve(server_config); + let cloud_service = UserDepsResolver::resolve(local_server, server_config); Arc::new(UserSession::new(user_config, cloud_service)) } fn mk_core_context( + local_server: &Option>, user_session: &Arc, flowy_document: &Arc, server_config: &ClientServerConfiguration, ) -> Arc { - let (user, database, cloud_service) = CoreDepsResolver::resolve(user_session.clone(), server_config); + let (user, database, cloud_service) = + CoreDepsResolver::resolve(local_server.clone(), user_session.clone(), server_config); init_core(user, database, flowy_document.clone(), cloud_service) } -fn default_web_socket() -> Arc { - if cfg!(feature = "http_server") { - Arc::new(Arc::new(WSController::new())) - } else { - Arc::new(LocalWebSocket::default()) - } -} - pub fn mk_document( + local_server: &Option>, ws_conn: &Arc, user_session: &Arc, server_config: &ClientServerConfiguration, ) -> Arc { - let dependencies = DocumentDepsResolver::resolve(ws_conn.clone(), user_session.clone(), server_config); + let dependencies = DocumentDepsResolver::resolve( + local_server.clone(), + ws_conn.clone(), + user_session.clone(), + server_config, + ); Arc::new(DocumentContext::new( dependencies.user, dependencies.ws_receivers, diff --git a/frontend/rust-lib/flowy-user/Cargo.toml b/frontend/rust-lib/flowy-user/Cargo.toml index 01fea2fc7d..78092c9925 100644 --- a/frontend/rust-lib/flowy-user/Cargo.toml +++ b/frontend/rust-lib/flowy-user/Cargo.toml @@ -17,7 +17,6 @@ lib-dispatch = { path = "../lib-dispatch" } flowy-error = { path = "../flowy-error", features = ["db", "backend"] } lib-sqlite = { path = "../lib-sqlite" } - tracing = { version = "0.1", features = ["log"] } bytes = "1.0" serde = { version = "1.0", features = ["derive"] }