2023-08-12 17:36:31 +08:00
use std ::collections ::HashMap ;
2023-08-17 23:46:39 +08:00
use std ::sync ::{ Arc , Weak } ;
2023-05-21 18:53:59 +08:00
2023-10-10 19:05:55 +08:00
use collab_entity ::CollabObject ;
2023-09-13 22:11:51 +08:00
use collab_plugins ::cloud_storage ::{ RemoteCollabStorage , RemoteUpdateSender } ;
2023-08-20 14:13:54 +08:00
use parking_lot ::RwLock ;
2023-05-21 18:53:59 +08:00
2023-07-29 09:46:24 +08:00
use flowy_database_deps ::cloud ::DatabaseCloudService ;
use flowy_document_deps ::cloud ::DocumentCloudService ;
use flowy_folder_deps ::cloud ::FolderCloudService ;
use flowy_server_config ::supabase_config ::SupabaseConfiguration ;
2023-09-01 22:27:29 +08:00
use flowy_storage ::FileStorageService ;
2023-08-24 14:00:34 +08:00
use flowy_user_deps ::cloud ::UserCloudService ;
2023-05-21 18:53:59 +08:00
2023-07-29 09:46:24 +08:00
use crate ::supabase ::api ::{
2023-08-20 14:13:54 +08:00
RESTfulPostgresServer , RealtimeCollabUpdateHandler , RealtimeEventHandler , RealtimeUserHandler ,
SupabaseCollabStorageImpl , SupabaseDatabaseServiceImpl , SupabaseDocumentServiceImpl ,
SupabaseFolderServiceImpl , SupabaseServerServiceImpl , SupabaseUserServiceImpl ,
2023-07-05 20:57:09 +08:00
} ;
2023-08-31 16:40:40 +08:00
use crate ::supabase ::file_storage ::core ::SupabaseFileStorage ;
2023-09-01 22:27:29 +08:00
use crate ::supabase ::file_storage ::FileStoragePlanImpl ;
2023-08-17 23:46:39 +08:00
use crate ::{ AppFlowyEncryption , AppFlowyServer } ;
2023-05-21 18:53:59 +08:00
2023-07-29 09:46:24 +08:00
/// Pooling mode of the PgBouncer instance that sits in front of the Supabase database.
///
/// Reference: <https://www.pgbouncer.org/features.html>
///
/// Only session mode is supported for now.
///
/// # Session mode
/// When a new client connects, a connection is assigned to that client until it disconnects.
/// Afterwards the connection is returned to the pool. All PostgreSQL features can be used with
/// this option. At the time of writing, the default pgbouncer pool size in supabase is 15 in
/// session mode, which means that we can have 15 concurrent connections to the database.
///
/// # Transaction mode
/// This is the suggested option for serverless functions. The connection is only assigned to the
/// client for the duration of a transaction and is returned to the pool once the transaction is
/// done. Two consecutive transactions from the same client may run over two different
/// connections. Some session-based PostgreSQL features, such as prepared statements, are not
/// available with this option. A more comprehensive list of incompatible features can be found
/// on the PgBouncer features page referenced above.
///
/// In most cases session mode is faster than transaction mode, because transaction mode has no
/// statement cache (<https://github.com/supabase/supavisor/issues/69>) and queues transactions.
/// Transaction mode is nevertheless better suited to serverless functions since it reduces the
/// number of concurrent connections to the database.
///
/// TODO(nathan): fix prepared statement error when using transaction mode.
/// <https://github.com/prisma/prisma/issues/11643>
#[derive(Clone, Debug, Default)]
pub enum PgPoolMode {
  /// One pooled connection per client session (the default).
  #[default]
  Session,
  /// One pooled connection per transaction.
  Transaction,
}

impl PgPoolMode {
  /// Returns `true` when cached prepared statements can be used with this pool mode.
  ///
  /// Only [PgPoolMode::Session] supports them.
  pub fn support_prepare_cached(&self) -> bool {
    // Exhaustive on purpose: adding a variant forces a decision here.
    match self {
      PgPoolMode::Session => true,
      PgPoolMode::Transaction => false,
    }
  }
}
2023-08-20 14:13:54 +08:00
/// Lock-protected map from a collab object id to the sender half of that object's update channel.
/// Entries are inserted by `collab_storage`, keyed by `CollabObject::object_id`; the matching
/// receiver is handed to the collab storage created in the same call.
pub type CollabUpdateSenderByOid = RwLock < HashMap < String , RemoteUpdateSender > > ;
2023-07-05 20:57:09 +08:00
/// Supabase server is used to provide the implementation of the [AppFlowyServer] trait.
/// It contains the configuration of the supabase server and the postgres server.
pub struct SupabaseServer {
#[ allow(dead_code) ]
config : SupabaseConfiguration ,
2023-08-18 15:13:34 +08:00
/// did represents as the device id is used to identify the device that is currently using the app.
2023-08-20 14:13:54 +08:00
device_id : Arc < RwLock < String > > ,
2023-09-01 22:27:29 +08:00
uid : Arc < RwLock < Option < i64 > > > ,
2023-08-20 14:13:54 +08:00
collab_update_sender : Arc < CollabUpdateSenderByOid > ,
2023-07-29 09:46:24 +08:00
restful_postgres : Arc < RwLock < Option < Arc < RESTfulPostgresServer > > > > ,
2023-08-31 16:40:40 +08:00
file_storage : Arc < RwLock < Option < Arc < SupabaseFileStorage > > > > ,
2023-08-17 23:46:39 +08:00
encryption : Weak < dyn AppFlowyEncryption > ,
2023-07-05 20:57:09 +08:00
}
impl SupabaseServer {
2023-08-17 23:46:39 +08:00
pub fn new (
2023-09-01 22:27:29 +08:00
uid : Arc < RwLock < Option < i64 > > > ,
2023-08-17 23:46:39 +08:00
config : SupabaseConfiguration ,
enable_sync : bool ,
2023-08-20 14:13:54 +08:00
device_id : Arc < RwLock < String > > ,
2023-08-17 23:46:39 +08:00
encryption : Weak < dyn AppFlowyEncryption > ,
) -> Self {
2023-08-20 14:13:54 +08:00
let collab_update_sender = Default ::default ( ) ;
2023-08-17 23:46:39 +08:00
let restful_postgres = if enable_sync {
Some ( Arc ::new ( RESTfulPostgresServer ::new (
config . clone ( ) ,
encryption . clone ( ) ,
) ) )
2023-07-14 13:37:13 +08:00
} else {
None
} ;
2023-08-31 16:40:40 +08:00
let file_storage = if enable_sync {
2023-09-01 22:27:29 +08:00
let plan = FileStoragePlanImpl ::new (
Arc ::downgrade ( & uid ) ,
restful_postgres . as_ref ( ) . map ( Arc ::downgrade ) ,
) ;
Some ( Arc ::new (
SupabaseFileStorage ::new ( & config , encryption . clone ( ) , Arc ::new ( plan ) ) . unwrap ( ) ,
) )
2023-08-31 16:40:40 +08:00
} else {
None
} ;
2023-07-05 20:57:09 +08:00
Self {
config ,
2023-08-20 14:13:54 +08:00
device_id ,
collab_update_sender ,
2023-07-29 09:46:24 +08:00
restful_postgres : Arc ::new ( RwLock ::new ( restful_postgres ) ) ,
2023-08-31 16:40:40 +08:00
file_storage : Arc ::new ( RwLock ::new ( file_storage ) ) ,
2023-08-17 23:46:39 +08:00
encryption ,
2023-09-01 22:27:29 +08:00
uid ,
2023-07-14 13:37:13 +08:00
}
}
2023-09-01 22:27:29 +08:00
}
impl AppFlowyServer for SupabaseServer {
fn set_enable_sync ( & self , uid : i64 , enable : bool ) {
tracing ::info! ( " {} supabase sync: {} " , uid , enable ) ;
2023-07-14 13:37:13 +08:00
if enable {
2023-09-01 22:27:29 +08:00
if self . restful_postgres . read ( ) . is_none ( ) {
let postgres = RESTfulPostgresServer ::new ( self . config . clone ( ) , self . encryption . clone ( ) ) ;
* self . restful_postgres . write ( ) = Some ( Arc ::new ( postgres ) ) ;
}
if self . file_storage . read ( ) . is_none ( ) {
let plan = FileStoragePlanImpl ::new (
Arc ::downgrade ( & self . uid ) ,
self . restful_postgres . read ( ) . as_ref ( ) . map ( Arc ::downgrade ) ,
) ;
let file_storage =
SupabaseFileStorage ::new ( & self . config , self . encryption . clone ( ) , Arc ::new ( plan ) ) . unwrap ( ) ;
* self . file_storage . write ( ) = Some ( Arc ::new ( file_storage ) ) ;
2023-07-14 13:37:13 +08:00
}
} else {
2023-07-29 09:46:24 +08:00
* self . restful_postgres . write ( ) = None ;
2023-09-01 22:27:29 +08:00
* self . file_storage . write ( ) = None ;
2023-07-05 20:57:09 +08:00
}
}
2023-07-14 13:37:13 +08:00
2023-08-24 14:00:34 +08:00
fn user_service ( & self ) -> Arc < dyn UserCloudService > {
2023-08-20 14:13:54 +08:00
// handle the realtime collab update event.
2023-11-14 14:01:46 +08:00
let ( user_update_tx , user_update_rx ) = tokio ::sync ::mpsc ::channel ( 1 ) ;
2023-08-20 14:13:54 +08:00
let collab_update_handler = Box ::new ( RealtimeCollabUpdateHandler ::new (
Arc ::downgrade ( & self . collab_update_sender ) ,
self . device_id . clone ( ) ,
self . encryption . clone ( ) ,
) ) ;
// handle the realtime user event.
let user_handler = Box ::new ( RealtimeUserHandler ( user_update_tx . clone ( ) ) ) ;
let handlers : Vec < Box < dyn RealtimeEventHandler > > = vec! [ collab_update_handler , user_handler ] ;
Arc ::new ( SupabaseUserServiceImpl ::new (
SupabaseServerServiceImpl ( self . restful_postgres . clone ( ) ) ,
handlers ,
2023-11-14 14:01:46 +08:00
Some ( user_update_rx ) ,
2023-08-20 14:13:54 +08:00
) )
2023-07-05 20:57:09 +08:00
}
fn folder_service ( & self ) -> Arc < dyn FolderCloudService > {
2023-08-05 15:02:05 +08:00
Arc ::new ( SupabaseFolderServiceImpl ::new ( SupabaseServerServiceImpl (
self . restful_postgres . clone ( ) ,
) ) )
2023-07-05 20:57:09 +08:00
}
fn database_service ( & self ) -> Arc < dyn DatabaseCloudService > {
2023-08-05 15:02:05 +08:00
Arc ::new ( SupabaseDatabaseServiceImpl ::new ( SupabaseServerServiceImpl (
self . restful_postgres . clone ( ) ,
) ) )
2023-07-05 20:57:09 +08:00
}
fn document_service ( & self ) -> Arc < dyn DocumentCloudService > {
2023-08-05 15:02:05 +08:00
Arc ::new ( SupabaseDocumentServiceImpl ::new ( SupabaseServerServiceImpl (
self . restful_postgres . clone ( ) ,
) ) )
2023-05-21 18:53:59 +08:00
}
2023-05-23 23:55:21 +08:00
2023-08-12 17:36:31 +08:00
fn collab_storage ( & self , collab_object : & CollabObject ) -> Option < Arc < dyn RemoteCollabStorage > > {
let ( tx , rx ) = tokio ::sync ::mpsc ::unbounded_channel ( ) ;
self
2023-08-20 14:13:54 +08:00
. collab_update_sender
2023-08-12 17:36:31 +08:00
. write ( )
. insert ( collab_object . object_id . clone ( ) , tx ) ;
2023-08-20 14:13:54 +08:00
2023-08-05 15:02:05 +08:00
Some ( Arc ::new ( SupabaseCollabStorageImpl ::new (
2023-07-29 09:46:24 +08:00
SupabaseServerServiceImpl ( self . restful_postgres . clone ( ) ) ,
2023-08-12 17:36:31 +08:00
Some ( rx ) ,
2023-08-17 23:46:39 +08:00
self . encryption . clone ( ) ,
2023-07-14 13:37:13 +08:00
) ) )
}
2023-08-31 16:40:40 +08:00
fn file_storage ( & self ) -> Option < Arc < dyn FileStorageService > > {
self
. file_storage
. read ( )
. clone ( )
. map ( | s | s as Arc < dyn FileStorageService > )
}
2023-07-14 13:37:13 +08:00
}