2022-12-09 09:19:47 +08:00
use crate ::editor ::{ initial_document_content , AppFlowyDocumentEditor , DocumentRevisionMergeable } ;
2022-10-22 21:57:44 +08:00
use crate ::entities ::{ DocumentVersionPB , EditParams } ;
2022-12-09 09:19:47 +08:00
use crate ::old_editor ::editor ::{ DeltaDocumentEditor , DeltaDocumentRevisionMergeable } ;
2023-02-06 21:42:01 +08:00
use crate ::old_editor ::snapshot ::DeltaDocumentSnapshotPersistence ;
2023-01-12 22:31:39 +08:00
use crate ::services ::rev_sqlite ::{
2023-02-13 09:29:49 +08:00
SQLiteDeltaDocumentRevisionPersistence , SQLiteDocumentRevisionPersistence ,
SQLiteDocumentRevisionSnapshotPersistence ,
2023-01-12 22:31:39 +08:00
} ;
2022-10-22 21:57:44 +08:00
use crate ::services ::DocumentPersistence ;
2022-10-20 11:35:11 +08:00
use crate ::{ errors ::FlowyError , DocumentCloudService } ;
2021-11-09 16:00:09 +08:00
use bytes ::Bytes ;
2023-01-30 11:11:19 +08:00
use document_model ::document ::DocumentId ;
use flowy_client_sync ::client_document ::initial_delta_document_content ;
2022-03-19 16:52:28 +08:00
use flowy_error ::FlowyResult ;
2022-06-09 20:58:56 +08:00
use flowy_revision ::{
2023-02-13 09:29:49 +08:00
RevisionCloudService , RevisionManager , RevisionPersistence , RevisionPersistenceConfiguration ,
RevisionWebSocket ,
2022-06-09 20:58:56 +08:00
} ;
2023-01-31 08:28:31 +08:00
use flowy_sqlite ::ConnectionPool ;
2022-12-09 09:19:47 +08:00
use lib_infra ::async_trait ::async_trait ;
2021-12-13 13:55:44 +08:00
use lib_infra ::future ::FutureResult ;
2022-11-08 11:32:07 +08:00
use lib_infra ::ref_map ::{ RefCountHashMap , RefCountValue } ;
2023-01-30 11:11:19 +08:00
use lib_infra ::util ::md5 ;
2022-10-20 11:35:11 +08:00
use lib_ws ::WSConnectState ;
2023-01-30 11:11:19 +08:00
use revision_model ::Revision ;
2022-10-20 11:35:11 +08:00
use std ::any ::Any ;
2022-12-30 11:16:47 +08:00
use std ::convert ::TryFrom ;
use std ::sync ::Arc ;
2022-11-08 11:32:07 +08:00
use tokio ::sync ::RwLock ;
2023-01-30 11:11:19 +08:00
use ws_model ::ws_revision ::ServerRevisionWSData ;
2021-07-23 14:37:18 +08:00
2022-10-13 23:29:37 +08:00
pub trait DocumentUser : Send + Sync {
2023-02-13 09:29:49 +08:00
fn user_dir ( & self ) -> Result < String , FlowyError > ;
2023-04-04 08:41:16 +08:00
fn user_id ( & self ) -> Result < i64 , FlowyError > ;
2023-02-13 09:29:49 +08:00
fn token ( & self ) -> Result < String , FlowyError > ;
2022-10-22 21:57:44 +08:00
}
pub trait DocumentDatabase : Send + Sync {
2023-02-13 09:29:49 +08:00
fn db_pool ( & self ) -> Result < Arc < ConnectionPool > , FlowyError > ;
2022-01-22 18:48:43 +08:00
}
2022-12-09 09:19:47 +08:00
#[ async_trait ]
2022-10-20 11:35:11 +08:00
pub trait DocumentEditor : Send + Sync {
2023-02-13 09:29:49 +08:00
/// Called when the document get closed
async fn close ( & self ) ;
2022-10-20 11:35:11 +08:00
2023-02-13 09:29:49 +08:00
/// Exports the document content. The content is encoded in the corresponding
/// editor data format.
fn export ( & self ) -> FutureResult < String , FlowyError > ;
2022-10-22 21:57:44 +08:00
2023-02-13 09:29:49 +08:00
/// Duplicate the document inner data into String
fn duplicate ( & self ) -> FutureResult < String , FlowyError > ;
2022-10-22 21:57:44 +08:00
2023-02-13 09:29:49 +08:00
fn receive_ws_data ( & self , data : ServerRevisionWSData ) -> FutureResult < ( ) , FlowyError > ;
2022-10-22 21:57:44 +08:00
2023-02-13 09:29:49 +08:00
fn receive_ws_state ( & self , state : & WSConnectState ) ;
2022-10-20 11:35:11 +08:00
2023-02-13 09:29:49 +08:00
/// Receives the local operations made by the user input. The operations are encoded
/// in binary format.
fn compose_local_operations ( & self , data : Bytes ) -> FutureResult < ( ) , FlowyError > ;
2022-10-22 21:57:44 +08:00
2023-02-13 09:29:49 +08:00
/// Returns the `Any` reference that can be used to downcast back to the original,
/// concrete type.
///
/// The indirection through `as_any` is because using `downcast_ref`
/// on `Box<A>` *directly* only lets us downcast back to `&A` again. You can take a look at [this](https://stackoverflow.com/questions/33687447/how-to-get-a-reference-to-a-concrete-type-from-a-trait-object)
/// for more information.
///
///
fn as_any ( & self ) -> & dyn Any ;
2022-10-20 11:35:11 +08:00
}
#[ derive(Clone, Debug) ]
pub struct DocumentConfig {
2023-02-13 09:29:49 +08:00
pub version : DocumentVersionPB ,
2022-10-22 21:57:44 +08:00
}
impl std ::default ::Default for DocumentConfig {
2023-02-13 09:29:49 +08:00
fn default ( ) -> Self {
Self {
version : DocumentVersionPB ::V1 ,
2022-10-22 21:57:44 +08:00
}
2023-02-13 09:29:49 +08:00
}
2022-10-20 11:35:11 +08:00
}
2022-10-13 23:29:37 +08:00
pub struct DocumentManager {
2023-02-13 09:29:49 +08:00
cloud_service : Arc < dyn DocumentCloudService > ,
rev_web_socket : Arc < dyn RevisionWebSocket > ,
editor_map : Arc < RwLock < RefCountHashMap < RefCountDocumentHandler > > > ,
user : Arc < dyn DocumentUser > ,
persistence : Arc < DocumentPersistence > ,
#[ allow(dead_code) ]
config : DocumentConfig ,
2021-07-23 14:37:18 +08:00
}
2022-10-13 23:29:37 +08:00
impl DocumentManager {
2023-02-13 09:29:49 +08:00
pub fn new (
cloud_service : Arc < dyn DocumentCloudService > ,
document_user : Arc < dyn DocumentUser > ,
database : Arc < dyn DocumentDatabase > ,
rev_web_socket : Arc < dyn RevisionWebSocket > ,
config : DocumentConfig ,
) -> Self {
Self {
cloud_service ,
rev_web_socket ,
editor_map : Arc ::new ( RwLock ::new ( RefCountHashMap ::new ( ) ) ) ,
user : document_user ,
persistence : Arc ::new ( DocumentPersistence ::new ( database ) ) ,
config ,
2022-01-01 14:23:58 +08:00
}
2023-02-13 09:29:49 +08:00
}
/// Called immediately after the application launched with the user sign in/sign up.
#[ tracing::instrument(level = " trace " , skip_all, err) ]
2023-04-04 08:41:16 +08:00
pub async fn initialize ( & self , user_id : i64 ) -> FlowyResult < ( ) > {
2023-02-13 09:29:49 +08:00
self . persistence . initialize ( user_id ) ? ;
listen_ws_state_changed ( self . rev_web_socket . clone ( ) , self . editor_map . clone ( ) ) ;
Ok ( ( ) )
}
2023-04-04 08:41:16 +08:00
pub async fn initialize_with_new_user ( & self , _user_id : i64 , _token : & str ) -> FlowyResult < ( ) > {
2023-02-13 09:29:49 +08:00
Ok ( ( ) )
}
#[ tracing::instrument(level = " trace " , skip_all, fields(document_id), err) ]
pub async fn open_document_editor < T : AsRef < str > > (
& self ,
document_id : T ,
) -> Result < Arc < dyn DocumentEditor > , FlowyError > {
let document_id = document_id . as_ref ( ) ;
tracing ::Span ::current ( ) . record ( " document_id " , document_id ) ;
self . init_document_editor ( document_id ) . await
}
#[ tracing::instrument(level = " trace " , skip(self, editor_id), fields(editor_id), err) ]
pub async fn close_document_editor < T : AsRef < str > > ( & self , editor_id : T ) -> Result < ( ) , FlowyError > {
let editor_id = editor_id . as_ref ( ) ;
tracing ::Span ::current ( ) . record ( " editor_id " , editor_id ) ;
self . editor_map . write ( ) . await . remove ( editor_id ) . await ;
Ok ( ( ) )
}
pub async fn apply_edit ( & self , params : EditParams ) -> FlowyResult < ( ) > {
let editor = self . get_document_editor ( & params . doc_id ) . await ? ;
editor
. compose_local_operations ( Bytes ::from ( params . operations ) )
. await ? ;
Ok ( ( ) )
}
pub async fn create_document < T : AsRef < str > > (
& self ,
doc_id : T ,
revisions : Vec < Revision > ,
) -> FlowyResult < ( ) > {
let doc_id = doc_id . as_ref ( ) . to_owned ( ) ;
let db_pool = self . persistence . database . db_pool ( ) ? ;
// Maybe we could save the document to disk without creating the RevisionManager
let rev_manager = self . make_rev_manager ( & doc_id , db_pool ) ? ;
rev_manager . reset_object ( revisions ) . await ? ;
Ok ( ( ) )
}
pub async fn receive_ws_data ( & self , data : Bytes ) {
let result : Result < ServerRevisionWSData , serde_json ::Error > =
ServerRevisionWSData ::try_from ( data ) ;
match result {
Ok ( data ) = > match self . editor_map . read ( ) . await . get ( & data . object_id ) {
None = > tracing ::error! (
" Can't find any source handler for {:?}-{:?} " ,
data . object_id ,
data . payload
) ,
Some ( handler ) = > match handler . 0. receive_ws_data ( data ) . await {
Ok ( _ ) = > { } ,
Err ( e ) = > tracing ::error! ( " {} " , e ) ,
} ,
} ,
Err ( e ) = > {
tracing ::error! ( " Document ws data parser failed: {:?} " , e ) ;
} ,
2022-01-14 20:52:03 +08:00
}
2023-02-13 09:29:49 +08:00
}
2022-10-20 11:35:11 +08:00
2023-02-13 09:29:49 +08:00
pub fn initial_document_content ( & self ) -> String {
match self . config . version {
DocumentVersionPB ::V0 = > initial_delta_document_content ( ) ,
DocumentVersionPB ::V1 = > initial_document_content ( ) ,
2022-10-20 11:35:11 +08:00
}
2023-02-13 09:29:49 +08:00
}
2022-01-14 20:52:03 +08:00
}
2022-10-13 23:29:37 +08:00
impl DocumentManager {
2023-02-13 09:29:49 +08:00
/// Returns the `DocumentEditor`
///
/// # Arguments
///
/// * `doc_id`: the id of the document
///
/// returns: Result<Arc<DocumentEditor>, FlowyError>
///
async fn get_document_editor ( & self , doc_id : & str ) -> FlowyResult < Arc < dyn DocumentEditor > > {
match self . editor_map . read ( ) . await . get ( doc_id ) {
None = > {
//
tracing ::warn! ( " Should call init_document_editor first " ) ;
self . init_document_editor ( doc_id ) . await
} ,
Some ( handler ) = > Ok ( handler . 0. clone ( ) ) ,
2021-12-08 21:51:06 +08:00
}
2023-02-13 09:29:49 +08:00
}
/// Initializes a document editor with the doc_id
///
/// # Arguments
///
/// * `doc_id`: the id of the document
/// * `pool`: sqlite connection pool
///
/// returns: Result<Arc<DocumentEditor>, FlowyError>
///
#[ tracing::instrument(level = " trace " , skip(self), err) ]
pub async fn init_document_editor (
& self ,
doc_id : & str ,
) -> Result < Arc < dyn DocumentEditor > , FlowyError > {
let pool = self . persistence . database . db_pool ( ) ? ;
let user = self . user . clone ( ) ;
let token = self . user . token ( ) ? ;
let cloud_service = Arc ::new ( DocumentRevisionCloudService {
token ,
server : self . cloud_service . clone ( ) ,
} ) ;
2021-12-08 21:51:06 +08:00
2023-02-13 09:29:49 +08:00
match self . config . version {
DocumentVersionPB ::V0 = > {
let rev_manager = self . make_delta_document_rev_manager ( doc_id , pool . clone ( ) ) ? ;
let editor : Arc < dyn DocumentEditor > = Arc ::new (
DeltaDocumentEditor ::new (
2022-10-22 21:57:44 +08:00
doc_id ,
2023-02-13 09:29:49 +08:00
user ,
rev_manager ,
self . rev_web_socket . clone ( ) ,
cloud_service ,
)
. await ? ,
) ;
self
. editor_map
. write ( )
. await
. insert ( doc_id . to_string ( ) , RefCountDocumentHandler ( editor . clone ( ) ) ) ;
Ok ( editor )
} ,
DocumentVersionPB ::V1 = > {
let rev_manager = self . make_document_rev_manager ( doc_id , pool . clone ( ) ) ? ;
let editor : Arc < dyn DocumentEditor > =
Arc ::new ( AppFlowyDocumentEditor ::new ( doc_id , user , rev_manager , cloud_service ) . await ? ) ;
self
. editor_map
. write ( )
. await
. insert ( doc_id . to_string ( ) , RefCountDocumentHandler ( editor . clone ( ) ) ) ;
Ok ( editor )
} ,
2022-10-22 21:57:44 +08:00
}
2023-02-13 09:29:49 +08:00
}
fn make_rev_manager (
& self ,
doc_id : & str ,
pool : Arc < ConnectionPool > ,
) -> Result < RevisionManager < Arc < ConnectionPool > > , FlowyError > {
match self . config . version {
DocumentVersionPB ::V0 = > self . make_delta_document_rev_manager ( doc_id , pool ) ,
DocumentVersionPB ::V1 = > self . make_document_rev_manager ( doc_id , pool ) ,
2021-07-23 14:37:18 +08:00
}
2023-02-13 09:29:49 +08:00
}
fn make_document_rev_manager (
& self ,
doc_id : & str ,
pool : Arc < ConnectionPool > ,
) -> Result < RevisionManager < Arc < ConnectionPool > > , FlowyError > {
2023-04-04 08:41:16 +08:00
let disk_cache = SQLiteDocumentRevisionPersistence ::new ( pool . clone ( ) ) ;
2023-02-13 09:29:49 +08:00
let configuration = RevisionPersistenceConfiguration ::new ( 200 , true ) ;
2023-04-04 08:41:16 +08:00
let rev_persistence = RevisionPersistence ::new ( doc_id , disk_cache , configuration ) ;
2023-02-13 09:29:49 +08:00
let snapshot_persistence = SQLiteDocumentRevisionSnapshotPersistence ::new ( doc_id , pool ) ;
Ok ( RevisionManager ::new (
doc_id ,
rev_persistence ,
DocumentRevisionMergeable ( ) ,
snapshot_persistence ,
) )
}
fn make_delta_document_rev_manager (
& self ,
doc_id : & str ,
pool : Arc < ConnectionPool > ,
) -> Result < RevisionManager < Arc < ConnectionPool > > , FlowyError > {
2023-04-04 08:41:16 +08:00
let disk_cache = SQLiteDeltaDocumentRevisionPersistence ::new ( pool ) ;
2023-02-13 09:29:49 +08:00
let configuration = RevisionPersistenceConfiguration ::new ( 100 , true ) ;
2023-04-04 08:41:16 +08:00
let rev_persistence = RevisionPersistence ::new ( doc_id , disk_cache , configuration ) ;
2023-02-13 09:29:49 +08:00
Ok ( RevisionManager ::new (
doc_id ,
rev_persistence ,
DeltaDocumentRevisionMergeable ( ) ,
DeltaDocumentSnapshotPersistence ( ) ,
) )
}
2021-09-21 15:07:07 +08:00
}
2021-09-26 16:39:57 +08:00
2022-10-13 23:29:37 +08:00
struct DocumentRevisionCloudService {
2023-02-13 09:29:49 +08:00
token : String ,
server : Arc < dyn DocumentCloudService > ,
2021-10-02 17:19:54 +08:00
}
2022-10-13 23:29:37 +08:00
impl RevisionCloudService for DocumentRevisionCloudService {
2023-02-13 09:29:49 +08:00
#[ tracing::instrument(level = " trace " , skip(self)) ]
fn fetch_object (
& self ,
user_id : & str ,
object_id : & str ,
) -> FutureResult < Vec < Revision > , FlowyError > {
let params : DocumentId = object_id . to_string ( ) . into ( ) ;
let server = self . server . clone ( ) ;
let token = self . token . clone ( ) ;
FutureResult ::new ( async move {
match server . fetch_document ( & token , params ) . await ? {
None = > Err ( FlowyError ::record_not_found ( ) . context ( " Remote doesn't have this document " ) ) ,
Some ( payload ) = > {
let bytes = Bytes ::from ( payload . data . clone ( ) ) ;
let doc_md5 = md5 ( & bytes ) ;
let revision = Revision ::new (
& payload . doc_id ,
payload . base_rev_id ,
payload . rev_id ,
bytes ,
doc_md5 ,
) ;
Ok ( vec! [ revision ] )
} ,
}
} )
}
2021-10-02 17:19:54 +08:00
}
2022-11-08 11:32:07 +08:00
/// Newtype around a shared editor so it can be stored in the
/// `RefCountHashMap` of open editors.
#[derive(Clone)]
struct RefCountDocumentHandler(Arc<dyn DocumentEditor>);
2021-12-10 11:05:23 +08:00
2022-12-09 09:19:47 +08:00
#[ async_trait ]
2022-11-08 11:32:07 +08:00
impl RefCountValue for RefCountDocumentHandler {
2023-02-13 09:29:49 +08:00
async fn did_remove ( & self ) {
self . 0. close ( ) . await ;
}
2022-11-08 11:32:07 +08:00
}
2021-12-10 11:05:23 +08:00
2022-11-08 11:32:07 +08:00
impl std ::ops ::Deref for RefCountDocumentHandler {
2023-02-13 09:29:49 +08:00
type Target = Arc < dyn DocumentEditor > ;
2021-12-10 11:05:23 +08:00
2023-02-13 09:29:49 +08:00
fn deref ( & self ) -> & Self ::Target {
& self . 0
}
2021-09-26 16:39:57 +08:00
}
2021-12-10 11:05:23 +08:00
2022-02-25 22:27:44 +08:00
#[ tracing::instrument(level = " trace " , skip(web_socket, handlers)) ]
2022-11-08 11:32:07 +08:00
fn listen_ws_state_changed (
2023-02-13 09:29:49 +08:00
web_socket : Arc < dyn RevisionWebSocket > ,
handlers : Arc < RwLock < RefCountHashMap < RefCountDocumentHandler > > > ,
2022-11-08 11:32:07 +08:00
) {
2023-02-13 09:29:49 +08:00
tokio ::spawn ( async move {
let mut notify = web_socket . subscribe_state_changed ( ) . await ;
while let Ok ( state ) = notify . recv ( ) . await {
handlers . read ( ) . await . values ( ) . iter ( ) . for_each ( | handler | {
handler . receive_ws_state ( & state ) ;
} )
}
} ) ;
2021-12-25 21:44:45 +08:00
}