2023-10-02 17:22:22 +08:00
|
|
|
use std::collections::HashMap;
|
2023-08-28 13:28:24 +08:00
|
|
|
use std::future::Future;
|
|
|
|
|
use std::iter::Take;
|
|
|
|
|
use std::pin::Pin;
|
2023-07-29 09:46:24 +08:00
|
|
|
use std::str::FromStr;
|
2023-08-20 14:13:54 +08:00
|
|
|
use std::sync::{Arc, Weak};
|
2023-08-28 13:28:24 +08:00
|
|
|
use std::time::Duration;
|
2023-07-29 09:46:24 +08:00
|
|
|
|
|
|
|
|
use anyhow::Error;
|
2023-08-22 00:19:15 +08:00
|
|
|
use collab::core::collab::MutexCollab;
|
|
|
|
|
use collab::core::origin::CollabOrigin;
|
2023-09-13 22:11:51 +08:00
|
|
|
use collab_define::{CollabObject, CollabType};
|
2023-08-20 14:13:54 +08:00
|
|
|
use parking_lot::RwLock;
|
|
|
|
|
use serde_json::Value;
|
2023-08-14 12:57:59 +08:00
|
|
|
use tokio::sync::oneshot::channel;
|
2023-08-28 13:28:24 +08:00
|
|
|
use tokio_retry::strategy::FixedInterval;
|
|
|
|
|
use tokio_retry::{Action, RetryIf};
|
2023-07-29 09:46:24 +08:00
|
|
|
use uuid::Uuid;
|
|
|
|
|
|
2023-08-22 00:19:15 +08:00
|
|
|
use flowy_folder_deps::cloud::{Folder, Workspace};
|
2023-07-29 09:46:24 +08:00
|
|
|
use flowy_user_deps::cloud::*;
|
|
|
|
|
use flowy_user_deps::entities::*;
|
2023-08-07 22:24:04 +08:00
|
|
|
use flowy_user_deps::DEFAULT_USER_NAME;
|
2023-07-29 09:46:24 +08:00
|
|
|
use lib_infra::box_any::BoxAny;
|
|
|
|
|
use lib_infra::future::FutureResult;
|
2023-08-22 00:19:15 +08:00
|
|
|
use lib_infra::util::timestamp;
|
2023-07-29 09:46:24 +08:00
|
|
|
|
2023-08-31 16:40:40 +08:00
|
|
|
use crate::response::ExtendedResponse;
|
2023-08-28 13:28:24 +08:00
|
|
|
use crate::supabase::api::request::{
|
|
|
|
|
get_updates_from_server, FetchObjectUpdateAction, RetryCondition,
|
|
|
|
|
};
|
2023-08-20 14:13:54 +08:00
|
|
|
use crate::supabase::api::util::{
|
2023-08-31 16:40:40 +08:00
|
|
|
InsertParamsBuilder, RealtimeBinaryColumnDecoder, SupabaseBinaryColumnDecoder,
|
2023-08-20 14:13:54 +08:00
|
|
|
};
|
2023-08-28 13:28:24 +08:00
|
|
|
use crate::supabase::api::{flush_collab_with_update, PostgresWrapper, SupabaseServerService};
|
2023-07-29 09:46:24 +08:00
|
|
|
use crate::supabase::define::*;
|
|
|
|
|
use crate::supabase::entities::UserProfileResponse;
|
2023-08-20 14:13:54 +08:00
|
|
|
use crate::supabase::entities::{GetUserProfileParams, RealtimeUserEvent};
|
|
|
|
|
use crate::supabase::entities::{RealtimeCollabUpdateEvent, RealtimeEvent, UidResponse};
|
|
|
|
|
use crate::supabase::CollabUpdateSenderByOid;
|
|
|
|
|
use crate::AppFlowyEncryption;
|
2023-07-29 09:46:24 +08:00
|
|
|
|
2023-08-14 12:57:59 +08:00
|
|
|
pub struct SupabaseUserServiceImpl<T> {
|
2023-07-29 09:46:24 +08:00
|
|
|
server: T,
|
2023-08-20 14:13:54 +08:00
|
|
|
realtime_event_handlers: Vec<Box<dyn RealtimeEventHandler>>,
|
|
|
|
|
user_update_tx: Option<UserUpdateSender>,
|
2023-07-29 09:46:24 +08:00
|
|
|
}
|
|
|
|
|
|
2023-08-14 12:57:59 +08:00
|
|
|
impl<T> SupabaseUserServiceImpl<T> {
|
2023-08-20 14:13:54 +08:00
|
|
|
pub fn new(
|
|
|
|
|
server: T,
|
|
|
|
|
realtime_event_handlers: Vec<Box<dyn RealtimeEventHandler>>,
|
|
|
|
|
user_update_tx: Option<UserUpdateSender>,
|
|
|
|
|
) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
server,
|
|
|
|
|
realtime_event_handlers,
|
|
|
|
|
user_update_tx,
|
|
|
|
|
}
|
2023-07-29 09:46:24 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-08-24 14:00:34 +08:00
|
|
|
impl<T> UserCloudService for SupabaseUserServiceImpl<T>
|
2023-07-29 09:46:24 +08:00
|
|
|
where
|
|
|
|
|
T: SupabaseServerService,
|
|
|
|
|
{
|
2023-10-02 17:22:22 +08:00
|
|
|
fn sign_up(&self, params: BoxAny) -> FutureResult<AuthResponse, Error> {
|
2023-07-29 09:46:24 +08:00
|
|
|
let try_get_postgrest = self.server.try_get_postgrest();
|
|
|
|
|
FutureResult::new(async move {
|
|
|
|
|
let postgrest = try_get_postgrest?;
|
2023-10-02 17:22:22 +08:00
|
|
|
let params = oauth_params_from_box_any(params)?;
|
2023-07-29 09:46:24 +08:00
|
|
|
let is_new_user = postgrest
|
|
|
|
|
.from(USER_TABLE)
|
|
|
|
|
.select("uid")
|
|
|
|
|
.eq("uuid", params.uuid.to_string())
|
|
|
|
|
.execute()
|
|
|
|
|
.await?
|
|
|
|
|
.get_value::<Vec<UidResponse>>()
|
|
|
|
|
.await?
|
|
|
|
|
.is_empty();
|
|
|
|
|
|
|
|
|
|
// Insert the user if it's a new user. After the user is inserted, we can query the user profile
|
|
|
|
|
// and workspaces. The profile and workspaces are created by the database trigger.
|
|
|
|
|
if is_new_user {
|
|
|
|
|
let insert_params = InsertParamsBuilder::new()
|
|
|
|
|
.insert(USER_UUID, params.uuid.to_string())
|
|
|
|
|
.insert(USER_EMAIL, params.email)
|
|
|
|
|
.build();
|
|
|
|
|
let resp = postgrest
|
|
|
|
|
.from(USER_TABLE)
|
|
|
|
|
.insert(insert_params)
|
|
|
|
|
.execute()
|
|
|
|
|
.await?
|
|
|
|
|
.success_with_body()
|
|
|
|
|
.await?;
|
|
|
|
|
tracing::debug!("Create user response: {:?}", resp);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Query the user profile and workspaces
|
2023-08-20 14:13:54 +08:00
|
|
|
tracing::debug!(
|
|
|
|
|
"user uuid: {}, device_id: {}",
|
|
|
|
|
params.uuid,
|
|
|
|
|
params.device_id
|
|
|
|
|
);
|
2023-07-29 09:46:24 +08:00
|
|
|
let user_profile =
|
|
|
|
|
get_user_profile(postgrest.clone(), GetUserProfileParams::Uuid(params.uuid))
|
|
|
|
|
.await?
|
|
|
|
|
.unwrap();
|
|
|
|
|
let user_workspaces = get_user_workspaces(postgrest.clone(), user_profile.uid).await?;
|
|
|
|
|
let latest_workspace = user_workspaces
|
|
|
|
|
.iter()
|
|
|
|
|
.find(|user_workspace| user_workspace.id == user_profile.latest_workspace_id)
|
|
|
|
|
.cloned();
|
|
|
|
|
|
2023-08-07 22:24:04 +08:00
|
|
|
let user_name = if user_profile.name.is_empty() {
|
|
|
|
|
DEFAULT_USER_NAME()
|
|
|
|
|
} else {
|
|
|
|
|
user_profile.name
|
|
|
|
|
};
|
|
|
|
|
|
2023-10-02 17:22:22 +08:00
|
|
|
Ok(AuthResponse {
|
2023-07-29 09:46:24 +08:00
|
|
|
user_id: user_profile.uid,
|
2023-08-07 22:24:04 +08:00
|
|
|
name: user_name,
|
2023-07-29 09:46:24 +08:00
|
|
|
latest_workspace: latest_workspace.unwrap(),
|
|
|
|
|
user_workspaces,
|
2023-08-17 23:46:39 +08:00
|
|
|
is_new_user,
|
2023-07-29 09:46:24 +08:00
|
|
|
email: Some(user_profile.email),
|
|
|
|
|
token: None,
|
2023-08-12 17:36:31 +08:00
|
|
|
device_id: params.device_id,
|
2023-08-17 23:46:39 +08:00
|
|
|
encryption_type: EncryptionType::from_sign(&user_profile.encryption_sign),
|
2023-07-29 09:46:24 +08:00
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
2023-10-02 17:22:22 +08:00
|
|
|
fn sign_in(&self, params: BoxAny) -> FutureResult<AuthResponse, Error> {
|
2023-07-29 09:46:24 +08:00
|
|
|
let try_get_postgrest = self.server.try_get_postgrest();
|
|
|
|
|
FutureResult::new(async move {
|
|
|
|
|
let postgrest = try_get_postgrest?;
|
2023-10-02 17:22:22 +08:00
|
|
|
let params = oauth_params_from_box_any(params)?;
|
2023-07-29 09:46:24 +08:00
|
|
|
let uuid = params.uuid;
|
2023-08-17 23:46:39 +08:00
|
|
|
let response = get_user_profile(postgrest.clone(), GetUserProfileParams::Uuid(uuid))
|
2023-07-29 09:46:24 +08:00
|
|
|
.await?
|
|
|
|
|
.unwrap();
|
2023-08-17 23:46:39 +08:00
|
|
|
let user_workspaces = get_user_workspaces(postgrest.clone(), response.uid).await?;
|
2023-07-29 09:46:24 +08:00
|
|
|
let latest_workspace = user_workspaces
|
|
|
|
|
.iter()
|
2023-08-17 23:46:39 +08:00
|
|
|
.find(|user_workspace| user_workspace.id == response.latest_workspace_id)
|
2023-07-29 09:46:24 +08:00
|
|
|
.cloned();
|
2023-08-07 22:24:04 +08:00
|
|
|
|
2023-10-02 17:22:22 +08:00
|
|
|
Ok(AuthResponse {
|
2023-08-17 23:46:39 +08:00
|
|
|
user_id: response.uid,
|
2023-08-07 22:24:04 +08:00
|
|
|
name: DEFAULT_USER_NAME(),
|
2023-07-29 09:46:24 +08:00
|
|
|
latest_workspace: latest_workspace.unwrap(),
|
|
|
|
|
user_workspaces,
|
2023-10-02 17:22:22 +08:00
|
|
|
is_new_user: false,
|
2023-07-29 09:46:24 +08:00
|
|
|
email: None,
|
|
|
|
|
token: None,
|
2023-08-12 17:36:31 +08:00
|
|
|
device_id: params.device_id,
|
2023-08-17 23:46:39 +08:00
|
|
|
encryption_type: EncryptionType::from_sign(&response.encryption_sign),
|
2023-07-29 09:46:24 +08:00
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn sign_out(&self, _token: Option<String>) -> FutureResult<(), Error> {
|
|
|
|
|
FutureResult::new(async { Ok(()) })
|
|
|
|
|
}
|
|
|
|
|
|
2023-10-07 09:58:44 +08:00
|
|
|
fn generate_sign_in_url_with_email(&self, _email: &str) -> FutureResult<String, Error> {
|
2023-10-02 17:22:22 +08:00
|
|
|
FutureResult::new(async {
|
|
|
|
|
Err(anyhow::anyhow!(
|
|
|
|
|
"Can't generate callback url when using supabase"
|
|
|
|
|
))
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
2023-10-07 09:58:44 +08:00
|
|
|
fn generate_oauth_url_with_provider(&self, _provider: &str) -> FutureResult<String, Error> {
|
|
|
|
|
FutureResult::new(async {
|
|
|
|
|
Err(anyhow::anyhow!(
|
|
|
|
|
"Can't generate oauth url when using supabase"
|
|
|
|
|
))
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-29 09:46:24 +08:00
|
|
|
fn update_user(
|
|
|
|
|
&self,
|
|
|
|
|
_credential: UserCredentials,
|
|
|
|
|
params: UpdateUserProfileParams,
|
|
|
|
|
) -> FutureResult<(), Error> {
|
|
|
|
|
let try_get_postgrest = self.server.try_get_postgrest();
|
|
|
|
|
FutureResult::new(async move {
|
|
|
|
|
let postgrest = try_get_postgrest?;
|
|
|
|
|
update_user_profile(postgrest, params).await?;
|
|
|
|
|
Ok(())
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn get_user_profile(
|
|
|
|
|
&self,
|
|
|
|
|
credential: UserCredentials,
|
|
|
|
|
) -> FutureResult<Option<UserProfile>, Error> {
|
|
|
|
|
let try_get_postgrest = self.server.try_get_postgrest();
|
|
|
|
|
let uid = credential
|
|
|
|
|
.uid
|
|
|
|
|
.ok_or(anyhow::anyhow!("uid is required"))
|
|
|
|
|
.unwrap();
|
|
|
|
|
FutureResult::new(async move {
|
|
|
|
|
let postgrest = try_get_postgrest?;
|
|
|
|
|
let user_profile_resp = get_user_profile(postgrest, GetUserProfileParams::Uid(uid)).await?;
|
|
|
|
|
match user_profile_resp {
|
|
|
|
|
None => Ok(None),
|
2023-08-17 23:46:39 +08:00
|
|
|
Some(response) => Ok(Some(UserProfile {
|
|
|
|
|
uid: response.uid,
|
|
|
|
|
email: response.email,
|
|
|
|
|
name: response.name,
|
2023-07-29 09:46:24 +08:00
|
|
|
token: "".to_string(),
|
|
|
|
|
icon_url: "".to_string(),
|
|
|
|
|
openai_key: "".to_string(),
|
2023-10-09 23:14:24 +08:00
|
|
|
stability_ai_key: "".to_string(),
|
2023-08-17 23:46:39 +08:00
|
|
|
workspace_id: response.latest_workspace_id,
|
2023-07-29 09:46:24 +08:00
|
|
|
auth_type: AuthType::Supabase,
|
2023-08-17 23:46:39 +08:00
|
|
|
encryption_type: EncryptionType::from_sign(&response.encryption_sign),
|
2023-07-29 09:46:24 +08:00
|
|
|
})),
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn get_user_workspaces(&self, uid: i64) -> FutureResult<Vec<UserWorkspace>, Error> {
|
|
|
|
|
let try_get_postgrest = self.server.try_get_postgrest();
|
|
|
|
|
FutureResult::new(async move {
|
|
|
|
|
let postgrest = try_get_postgrest?;
|
|
|
|
|
let user_workspaces = get_user_workspaces(postgrest, uid).await?;
|
|
|
|
|
Ok(user_workspaces)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn check_user(&self, credential: UserCredentials) -> FutureResult<(), Error> {
|
|
|
|
|
let try_get_postgrest = self.server.try_get_postgrest();
|
|
|
|
|
let uuid = credential.uuid.and_then(|uuid| Uuid::from_str(&uuid).ok());
|
|
|
|
|
let uid = credential.uid;
|
|
|
|
|
FutureResult::new(async move {
|
|
|
|
|
let postgrest = try_get_postgrest?;
|
|
|
|
|
check_user(postgrest, uid, uuid).await?;
|
|
|
|
|
Ok(())
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn add_workspace_member(
|
|
|
|
|
&self,
|
|
|
|
|
_user_email: String,
|
|
|
|
|
_workspace_id: String,
|
|
|
|
|
) -> FutureResult<(), Error> {
|
|
|
|
|
todo!()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn remove_workspace_member(
|
|
|
|
|
&self,
|
|
|
|
|
_user_email: String,
|
|
|
|
|
_workspace_id: String,
|
|
|
|
|
) -> FutureResult<(), Error> {
|
|
|
|
|
todo!()
|
|
|
|
|
}
|
2023-08-14 12:57:59 +08:00
|
|
|
|
|
|
|
|
fn get_user_awareness_updates(&self, uid: i64) -> FutureResult<Vec<Vec<u8>>, Error> {
|
|
|
|
|
let try_get_postgrest = self.server.try_get_weak_postgrest();
|
|
|
|
|
let awareness_id = uid.to_string();
|
|
|
|
|
let (tx, rx) = channel();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
tx.send(
|
|
|
|
|
async move {
|
|
|
|
|
let postgrest = try_get_postgrest?;
|
|
|
|
|
let action =
|
|
|
|
|
FetchObjectUpdateAction::new(awareness_id, CollabType::UserAwareness, postgrest);
|
2023-08-17 23:46:39 +08:00
|
|
|
action.run_with_fix_interval(3, 3).await
|
2023-08-14 12:57:59 +08:00
|
|
|
}
|
|
|
|
|
.await,
|
|
|
|
|
)
|
|
|
|
|
});
|
|
|
|
|
FutureResult::new(async { rx.await? })
|
|
|
|
|
}
|
2023-08-18 15:13:34 +08:00
|
|
|
|
2023-08-20 14:13:54 +08:00
|
|
|
fn receive_realtime_event(&self, json: Value) {
|
|
|
|
|
match serde_json::from_value::<RealtimeEvent>(json) {
|
|
|
|
|
Ok(event) => {
|
|
|
|
|
tracing::trace!("Realtime event: {}", event);
|
|
|
|
|
for handler in &self.realtime_event_handlers {
|
|
|
|
|
if event.table.as_str().starts_with(handler.table_name()) {
|
|
|
|
|
handler.handler_event(&event);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
Err(e) => {
|
|
|
|
|
tracing::error!("parser realtime event error: {}", e);
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn subscribe_user_update(&self) -> Option<UserUpdateReceiver> {
|
|
|
|
|
self.user_update_tx.as_ref().map(|tx| tx.subscribe())
|
|
|
|
|
}
|
|
|
|
|
|
2023-08-22 00:19:15 +08:00
|
|
|
fn reset_workspace(&self, collab_object: CollabObject) -> FutureResult<(), Error> {
|
2023-08-28 13:28:24 +08:00
|
|
|
let collab_object = collab_object;
|
2023-08-22 00:19:15 +08:00
|
|
|
|
|
|
|
|
let try_get_postgrest = self.server.try_get_weak_postgrest();
|
|
|
|
|
let (tx, rx) = channel();
|
|
|
|
|
let init_update = empty_workspace_update(&collab_object);
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
tx.send(
|
|
|
|
|
async move {
|
|
|
|
|
let postgrest = try_get_postgrest?
|
|
|
|
|
.upgrade()
|
|
|
|
|
.ok_or(anyhow::anyhow!("postgrest is not available"))?;
|
|
|
|
|
|
2023-09-17 17:14:34 +08:00
|
|
|
let updates = get_updates_from_server(
|
|
|
|
|
&collab_object.object_id,
|
|
|
|
|
&collab_object.collab_type,
|
|
|
|
|
&postgrest,
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
2023-08-22 00:19:15 +08:00
|
|
|
|
|
|
|
|
flush_collab_with_update(
|
|
|
|
|
&collab_object,
|
|
|
|
|
updates,
|
|
|
|
|
&postgrest,
|
|
|
|
|
init_update,
|
|
|
|
|
postgrest.secret(),
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
.await,
|
|
|
|
|
)
|
|
|
|
|
});
|
|
|
|
|
FutureResult::new(async { rx.await? })
|
|
|
|
|
}
|
|
|
|
|
|
2023-08-18 15:13:34 +08:00
|
|
|
fn create_collab_object(
|
|
|
|
|
&self,
|
|
|
|
|
collab_object: &CollabObject,
|
2023-08-28 13:28:24 +08:00
|
|
|
update: Vec<u8>,
|
2023-08-18 15:13:34 +08:00
|
|
|
) -> FutureResult<(), Error> {
|
|
|
|
|
let try_get_postgrest = self.server.try_get_weak_postgrest();
|
|
|
|
|
let cloned_collab_object = collab_object.clone();
|
|
|
|
|
let (tx, rx) = channel();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
tx.send(
|
|
|
|
|
async move {
|
2023-08-28 13:28:24 +08:00
|
|
|
CreateCollabAction::new(cloned_collab_object, try_get_postgrest?, update)
|
|
|
|
|
.run()
|
|
|
|
|
.await?;
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
.await,
|
|
|
|
|
)
|
|
|
|
|
});
|
|
|
|
|
FutureResult::new(async { rx.await? })
|
|
|
|
|
}
|
|
|
|
|
}
|
2023-08-18 15:13:34 +08:00
|
|
|
|
2023-08-28 13:28:24 +08:00
|
|
|
pub struct CreateCollabAction {
|
|
|
|
|
collab_object: CollabObject,
|
|
|
|
|
postgrest: Weak<PostgresWrapper>,
|
|
|
|
|
update: Vec<u8>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl CreateCollabAction {
|
|
|
|
|
pub fn new(
|
|
|
|
|
collab_object: CollabObject,
|
|
|
|
|
postgrest: Weak<PostgresWrapper>,
|
|
|
|
|
update: Vec<u8>,
|
|
|
|
|
) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
collab_object,
|
|
|
|
|
postgrest,
|
|
|
|
|
update,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn run(self) -> RetryIf<Take<FixedInterval>, CreateCollabAction, RetryCondition> {
|
|
|
|
|
let postgrest = self.postgrest.clone();
|
|
|
|
|
let retry_strategy = FixedInterval::new(Duration::from_secs(2)).take(3);
|
|
|
|
|
RetryIf::spawn(retry_strategy, self, RetryCondition(postgrest))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Action for CreateCollabAction {
|
|
|
|
|
type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send>>;
|
|
|
|
|
type Item = ();
|
|
|
|
|
type Error = anyhow::Error;
|
2023-08-18 15:13:34 +08:00
|
|
|
|
2023-08-28 13:28:24 +08:00
|
|
|
fn run(&mut self) -> Self::Future {
|
|
|
|
|
let weak_postgres = self.postgrest.clone();
|
|
|
|
|
let cloned_collab_object = self.collab_object.clone();
|
|
|
|
|
let cloned_update = self.update.clone();
|
|
|
|
|
Box::pin(async move {
|
|
|
|
|
match weak_postgres.upgrade() {
|
|
|
|
|
None => Ok(()),
|
|
|
|
|
Some(postgrest) => {
|
|
|
|
|
let secret = postgrest.secret();
|
|
|
|
|
flush_collab_with_update(
|
2023-08-18 15:13:34 +08:00
|
|
|
&cloned_collab_object,
|
2023-08-28 13:28:24 +08:00
|
|
|
vec![],
|
2023-08-18 15:13:34 +08:00
|
|
|
&postgrest,
|
2023-08-28 13:28:24 +08:00
|
|
|
cloned_update,
|
|
|
|
|
secret,
|
2023-08-18 15:13:34 +08:00
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
Ok(())
|
2023-08-28 13:28:24 +08:00
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
})
|
2023-08-18 15:13:34 +08:00
|
|
|
}
|
2023-07-29 09:46:24 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn get_user_profile(
|
|
|
|
|
postgrest: Arc<PostgresWrapper>,
|
|
|
|
|
params: GetUserProfileParams,
|
|
|
|
|
) -> Result<Option<UserProfileResponse>, Error> {
|
|
|
|
|
let mut builder = postgrest
|
|
|
|
|
.from(USER_PROFILE_VIEW)
|
2023-08-17 23:46:39 +08:00
|
|
|
.select("uid, email, name, encryption_sign, latest_workspace_id");
|
2023-07-29 09:46:24 +08:00
|
|
|
|
|
|
|
|
match params {
|
|
|
|
|
GetUserProfileParams::Uid(uid) => builder = builder.eq("uid", uid.to_string()),
|
|
|
|
|
GetUserProfileParams::Uuid(uuid) => builder = builder.eq("uuid", uuid.to_string()),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let mut profiles = builder
|
|
|
|
|
.execute()
|
|
|
|
|
.await?
|
|
|
|
|
.error_for_status()?
|
|
|
|
|
.get_value::<Vec<UserProfileResponse>>()
|
|
|
|
|
.await?;
|
|
|
|
|
match profiles.len() {
|
|
|
|
|
0 => Ok(None),
|
|
|
|
|
1 => Ok(Some(profiles.swap_remove(0))),
|
2023-08-17 23:46:39 +08:00
|
|
|
_ => {
|
|
|
|
|
tracing::error!("multiple user profile found");
|
|
|
|
|
Ok(None)
|
|
|
|
|
},
|
2023-07-29 09:46:24 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn get_user_workspaces(
|
|
|
|
|
postgrest: Arc<PostgresWrapper>,
|
|
|
|
|
uid: i64,
|
|
|
|
|
) -> Result<Vec<UserWorkspace>, Error> {
|
|
|
|
|
postgrest
|
|
|
|
|
.from(WORKSPACE_TABLE)
|
|
|
|
|
.select("id:workspace_id, name:workspace_name, created_at, database_storage_id")
|
|
|
|
|
.eq("owner_uid", uid.to_string())
|
|
|
|
|
.execute()
|
|
|
|
|
.await?
|
|
|
|
|
.error_for_status()?
|
|
|
|
|
.get_value::<Vec<UserWorkspace>>()
|
|
|
|
|
.await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn update_user_profile(
|
|
|
|
|
postgrest: Arc<PostgresWrapper>,
|
|
|
|
|
params: UpdateUserProfileParams,
|
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
|
if params.is_empty() {
|
|
|
|
|
anyhow::bail!("no params to update");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// check if user exists
|
|
|
|
|
let exists = !postgrest
|
|
|
|
|
.from(USER_TABLE)
|
|
|
|
|
.select("uid")
|
2023-08-17 23:46:39 +08:00
|
|
|
.eq("uid", params.uid.to_string())
|
2023-07-29 09:46:24 +08:00
|
|
|
.execute()
|
|
|
|
|
.await?
|
|
|
|
|
.error_for_status()?
|
|
|
|
|
.get_value::<Vec<UidResponse>>()
|
|
|
|
|
.await?
|
|
|
|
|
.is_empty();
|
|
|
|
|
if !exists {
|
2023-08-17 23:46:39 +08:00
|
|
|
anyhow::bail!("user uid {} does not exist", params.uid);
|
2023-07-29 09:46:24 +08:00
|
|
|
}
|
|
|
|
|
let mut update_params = serde_json::Map::new();
|
|
|
|
|
if let Some(name) = params.name {
|
|
|
|
|
update_params.insert("name".to_string(), serde_json::json!(name));
|
|
|
|
|
}
|
|
|
|
|
if let Some(email) = params.email {
|
|
|
|
|
update_params.insert("email".to_string(), serde_json::json!(email));
|
|
|
|
|
}
|
2023-08-17 23:46:39 +08:00
|
|
|
if let Some(encrypt_sign) = params.encryption_sign {
|
|
|
|
|
update_params.insert(
|
|
|
|
|
"encryption_sign".to_string(),
|
|
|
|
|
serde_json::json!(encrypt_sign),
|
|
|
|
|
);
|
|
|
|
|
}
|
2023-07-29 09:46:24 +08:00
|
|
|
|
2023-08-17 23:46:39 +08:00
|
|
|
let update_payload = serde_json::to_string(&update_params).unwrap();
|
2023-07-29 09:46:24 +08:00
|
|
|
let resp = postgrest
|
|
|
|
|
.from(USER_TABLE)
|
|
|
|
|
.update(update_payload)
|
2023-08-17 23:46:39 +08:00
|
|
|
.eq("uid", params.uid.to_string())
|
2023-07-29 09:46:24 +08:00
|
|
|
.execute()
|
|
|
|
|
.await?
|
|
|
|
|
.success_with_body()
|
|
|
|
|
.await?;
|
|
|
|
|
|
2023-08-17 23:46:39 +08:00
|
|
|
tracing::trace!("update user profile resp: {:?}", resp);
|
2023-07-29 09:46:24 +08:00
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn check_user(
|
|
|
|
|
postgrest: Arc<PostgresWrapper>,
|
|
|
|
|
uid: Option<i64>,
|
|
|
|
|
uuid: Option<Uuid>,
|
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
|
let mut builder = postgrest.from(USER_TABLE);
|
|
|
|
|
|
|
|
|
|
if let Some(uid) = uid {
|
|
|
|
|
builder = builder.eq("uid", uid.to_string());
|
|
|
|
|
} else if let Some(uuid) = uuid {
|
|
|
|
|
builder = builder.eq("uuid", uuid.to_string());
|
|
|
|
|
} else {
|
|
|
|
|
anyhow::bail!("uid or uuid is required");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let exists = !builder
|
|
|
|
|
.execute()
|
|
|
|
|
.await?
|
|
|
|
|
.error_for_status()?
|
|
|
|
|
.get_value::<Vec<UidResponse>>()
|
|
|
|
|
.await?
|
|
|
|
|
.is_empty();
|
|
|
|
|
if !exists {
|
|
|
|
|
anyhow::bail!("user does not exist, uid: {:?}, uuid: {:?}", uid, uuid);
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
2023-08-20 14:13:54 +08:00
|
|
|
|
|
|
|
|
pub trait RealtimeEventHandler: Send + Sync + 'static {
|
|
|
|
|
fn table_name(&self) -> &str;
|
|
|
|
|
|
|
|
|
|
fn handler_event(&self, event: &RealtimeEvent);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub struct RealtimeUserHandler(pub UserUpdateSender);
|
|
|
|
|
impl RealtimeEventHandler for RealtimeUserHandler {
|
|
|
|
|
fn table_name(&self) -> &str {
|
|
|
|
|
"af_user"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn handler_event(&self, event: &RealtimeEvent) {
|
|
|
|
|
if let Ok(user_event) = serde_json::from_value::<RealtimeUserEvent>(event.new.clone()) {
|
|
|
|
|
let _ = self.0.send(UserUpdate {
|
|
|
|
|
uid: user_event.uid,
|
|
|
|
|
name: user_event.name,
|
|
|
|
|
email: user_event.email,
|
|
|
|
|
encryption_sign: user_event.encryption_sign,
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub struct RealtimeCollabUpdateHandler {
|
|
|
|
|
sender_by_oid: Weak<CollabUpdateSenderByOid>,
|
|
|
|
|
device_id: Arc<RwLock<String>>,
|
|
|
|
|
encryption: Weak<dyn AppFlowyEncryption>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl RealtimeCollabUpdateHandler {
|
|
|
|
|
pub fn new(
|
|
|
|
|
sender_by_oid: Weak<CollabUpdateSenderByOid>,
|
|
|
|
|
device_id: Arc<RwLock<String>>,
|
|
|
|
|
encryption: Weak<dyn AppFlowyEncryption>,
|
|
|
|
|
) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
sender_by_oid,
|
|
|
|
|
device_id,
|
|
|
|
|
encryption,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
impl RealtimeEventHandler for RealtimeCollabUpdateHandler {
|
|
|
|
|
fn table_name(&self) -> &str {
|
|
|
|
|
"af_collab_update"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn handler_event(&self, event: &RealtimeEvent) {
|
|
|
|
|
if let Ok(collab_update) =
|
|
|
|
|
serde_json::from_value::<RealtimeCollabUpdateEvent>(event.new.clone())
|
|
|
|
|
{
|
|
|
|
|
if let Some(sender_by_oid) = self.sender_by_oid.upgrade() {
|
|
|
|
|
if let Some(sender) = sender_by_oid.read().get(collab_update.oid.as_str()) {
|
|
|
|
|
tracing::trace!(
|
|
|
|
|
"current device: {}, event device: {}",
|
|
|
|
|
self.device_id.read(),
|
|
|
|
|
collab_update.did.as_str()
|
|
|
|
|
);
|
|
|
|
|
if *self.device_id.read() != collab_update.did.as_str() {
|
|
|
|
|
let encryption_secret = self
|
|
|
|
|
.encryption
|
|
|
|
|
.upgrade()
|
|
|
|
|
.and_then(|encryption| encryption.get_secret());
|
|
|
|
|
|
|
|
|
|
tracing::trace!(
|
|
|
|
|
"Parse collab update with len: {}, encrypt: {}",
|
|
|
|
|
collab_update.value.len(),
|
|
|
|
|
collab_update.encrypt,
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
match SupabaseBinaryColumnDecoder::decode::<_, RealtimeBinaryColumnDecoder>(
|
|
|
|
|
collab_update.value.as_str(),
|
|
|
|
|
collab_update.encrypt,
|
|
|
|
|
&encryption_secret,
|
|
|
|
|
) {
|
|
|
|
|
Ok(value) => {
|
|
|
|
|
if let Err(e) = sender.send(value) {
|
|
|
|
|
tracing::debug!("send realtime update error: {}", e);
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
Err(err) => {
|
|
|
|
|
tracing::error!("decode collab update error: {}", err);
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2023-08-22 00:19:15 +08:00
|
|
|
|
|
|
|
|
fn empty_workspace_update(collab_object: &CollabObject) -> Vec<u8> {
|
|
|
|
|
let workspace_id = collab_object.object_id.clone();
|
|
|
|
|
let collab = Arc::new(MutexCollab::new(
|
|
|
|
|
CollabOrigin::Empty,
|
|
|
|
|
&collab_object.object_id,
|
|
|
|
|
vec![],
|
|
|
|
|
));
|
|
|
|
|
let folder = Folder::create(collab.clone(), None, None);
|
|
|
|
|
folder.workspaces.create_workspace(Workspace {
|
|
|
|
|
id: workspace_id.clone(),
|
|
|
|
|
name: "My workspace".to_string(),
|
|
|
|
|
child_views: Default::default(),
|
|
|
|
|
created_at: timestamp(),
|
|
|
|
|
});
|
|
|
|
|
folder.set_current_workspace(&workspace_id);
|
|
|
|
|
collab.encode_as_update_v1().0
|
|
|
|
|
}
|
2023-10-02 17:22:22 +08:00
|
|
|
|
|
|
|
|
fn oauth_params_from_box_any(any: BoxAny) -> Result<SupabaseOAuthParams, Error> {
|
|
|
|
|
let map: HashMap<String, String> = any.unbox_or_error()?;
|
|
|
|
|
let uuid = uuid_from_map(&map)?;
|
|
|
|
|
let email = map.get("email").cloned().unwrap_or_default();
|
|
|
|
|
let device_id = map.get("device_id").cloned().unwrap_or_default();
|
|
|
|
|
Ok(SupabaseOAuthParams {
|
|
|
|
|
uuid,
|
|
|
|
|
email,
|
|
|
|
|
device_id,
|
|
|
|
|
})
|
|
|
|
|
}
|