chore: sql

This commit is contained in:
Nathan 2025-04-19 21:03:50 +08:00
parent 102087537a
commit 81f63bebe6
13 changed files with 294 additions and 120 deletions

View File

@ -3086,7 +3086,6 @@ dependencies = [
"lazy_static",
"lib-dispatch",
"lib-infra",
"nanoid",
"protobuf",
"quickcheck",
"quickcheck_macros",
@ -3110,16 +3109,16 @@ dependencies = [
name = "flowy-user-pub"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"base64 0.21.5",
"chrono",
"client-api",
"collab",
"collab-entity",
"collab-folder",
"diesel",
"flowy-error",
"flowy-folder-pub",
"flowy-sqlite",
"lib-infra",
"serde",
"serde_json",

View File

@ -1,13 +1,17 @@
#![allow(unused_variables)]
use client_api::entity::GotrueTokenResponse;
use collab::core::origin::CollabOrigin;
use collab::preclude::Collab;
use collab_entity::CollabObject;
use collab_user::core::UserAwareness;
use lazy_static::lazy_static;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;
use crate::af_cloud::define::LoggedUser;
use crate::local_server::uid::UserIDGenerator;
use flowy_error::FlowyError;
use flowy_user_pub::cloud::{UserCloudService, UserCollabParams};
use flowy_user_pub::entities::*;
@ -16,14 +20,14 @@ use lib_infra::async_trait::async_trait;
use lib_infra::box_any::BoxAny;
use lib_infra::util::timestamp;
use crate::local_server::uid::UserIDGenerator;
lazy_static! {
//FIXME: seriously, userID generation should work using lock-free algorithm
static ref ID_GEN: Mutex<UserIDGenerator> = Mutex::new(UserIDGenerator::new(1));
}
pub(crate) struct LocalServerUserServiceImpl;
pub(crate) struct LocalServerUserServiceImpl {
pub user: Arc<dyn LoggedUser>,
}
#[async_trait]
impl UserCloudService for LocalServerUserServiceImpl {
async fn sign_up(&self, params: BoxAny) -> Result<AuthResponse, FlowyError> {
@ -128,7 +132,13 @@ impl UserCloudService for LocalServerUserServiceImpl {
)
}
async fn get_all_workspace(&self, _uid: i64) -> Result<Vec<UserWorkspace>, FlowyError> {
async fn get_all_workspace(&self, uid: i64) -> Result<Vec<UserWorkspace>, FlowyError> {
let conn = self.user.get_sqlite_db(uid)?;
select_all_user_workspaces(&conn).await.map_err(|e| {
FlowyError::internal().with_context(format!("Failed to get all workspaces: {}", e))
})?;
Ok(vec![])
}
@ -145,17 +155,11 @@ impl UserCloudService for LocalServerUserServiceImpl {
new_workspace_name: Option<String>,
new_workspace_icon: Option<String>,
) -> Result<(), FlowyError> {
Err(
FlowyError::local_version_not_support()
.with_context("local server doesn't support multiple workspaces"),
)
Ok(())
}
async fn delete_workspace(&self, workspace_id: &Uuid) -> Result<(), FlowyError> {
Err(
FlowyError::local_version_not_support()
.with_context("local server doesn't support multiple workspaces"),
)
Ok(())
}
async fn get_user_awareness_doc_state(
@ -188,10 +192,7 @@ impl UserCloudService for LocalServerUserServiceImpl {
workspace_id: &Uuid,
objects: Vec<UserCollabParams>,
) -> Result<(), FlowyError> {
Err(
FlowyError::local_version_not_support()
.with_context("local server doesn't support batch create collab object"),
)
Ok(())
}
}

View File

@ -41,7 +41,9 @@ impl LocalServer {
impl AppFlowyServer for LocalServer {
fn user_service(&self) -> Arc<dyn UserCloudService> {
Arc::new(LocalServerUserServiceImpl)
Arc::new(LocalServerUserServiceImpl {
user: self.user.clone(),
})
}
fn folder_service(&self) -> Arc<dyn FolderCloudService> {

View File

@ -15,7 +15,6 @@ collab-entity = { workspace = true }
serde_json.workspace = true
serde_repr.workspace = true
chrono = { workspace = true, default-features = false, features = ["clock", "serde"] }
anyhow.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio-stream = "0.1.14"
flowy-folder-pub.workspace = true
@ -23,4 +22,5 @@ collab-folder = { workspace = true }
tracing.workspace = true
base64 = "0.21"
client-api = { workspace = true }
arc-swap = "1.7.1"
flowy-sqlite.workspace = true
diesel.workspace = true

View File

@ -293,7 +293,7 @@ pub trait UserCloudService: Send + Sync + 'static {
async fn get_workspace_subscriptions(
&self,
) -> Result<Vec<WorkspaceSubscriptionStatus>, FlowyError> {
Err(FlowyError::not_support())
Ok(vec![])
}
/// Get the workspace subscriptions for a workspace
@ -301,7 +301,7 @@ pub trait UserCloudService: Send + Sync + 'static {
&self,
workspace_id: &Uuid,
) -> Result<Vec<WorkspaceSubscriptionStatus>, FlowyError> {
Err(FlowyError::not_support())
Ok(vec![])
}
async fn cancel_workspace_subscription(
@ -310,14 +310,14 @@ pub trait UserCloudService: Send + Sync + 'static {
plan: SubscriptionPlan,
reason: Option<String>,
) -> Result<(), FlowyError> {
Err(FlowyError::not_support())
Ok(())
}
async fn get_workspace_plan(
&self,
workspace_id: Uuid,
) -> Result<Vec<SubscriptionPlan>, FlowyError> {
Err(FlowyError::not_support())
Ok(vec![])
}
async fn get_workspace_usage(

View File

@ -164,7 +164,7 @@ impl UserWorkspace {
workspace_database_id: Uuid::new_v4().to_string(),
icon: "".to_string(),
member_count: 1,
role: None,
role: Some(Role::Owner),
}
}
}

View File

@ -1,6 +1,7 @@
pub mod cloud;
pub mod entities;
pub mod session;
mod sql;
pub mod workspace_service;
pub const DEFAULT_USER_NAME: fn() -> String = || "Me".to_string();

View File

@ -0,0 +1 @@
pub mod workspace_sql;

View File

@ -0,0 +1,188 @@
use crate::entities::{AuthType, UserWorkspace};
use chrono::{TimeZone, Utc};
use diesel::{RunQueryDsl, SqliteConnection};
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::schema::user_workspace_table;
use flowy_sqlite::DBConnection;
use flowy_sqlite::{query_dsl::*, ExpressionMethods};
use tracing::{info, trace, warn};
#[derive(Clone, Default, Queryable, Identifiable, Insertable)]
#[diesel(table_name = user_workspace_table)]
pub struct UserWorkspaceTable {
pub id: String,
pub name: String,
pub uid: i64,
pub created_at: i64,
pub database_storage_id: String,
pub icon: String,
pub member_count: i64,
pub role: Option<i32>,
pub auth_type: i32,
}
#[derive(AsChangeset, Identifiable, Default, Debug)]
#[diesel(table_name = user_workspace_table)]
pub struct UserWorkspaceChangeset {
pub id: String,
pub name: Option<String>,
pub icon: Option<String>,
}
impl UserWorkspaceTable {
pub fn from_workspace(
uid: i64,
workspace: &UserWorkspace,
auth_type: AuthType,
) -> Result<Self, FlowyError> {
if workspace.id.is_empty() {
return Err(FlowyError::invalid_data().with_context("The id is empty"));
}
if workspace.workspace_database_id.is_empty() {
return Err(FlowyError::invalid_data().with_context("The database storage id is empty"));
}
Ok(Self {
id: workspace.id.clone(),
name: workspace.name.clone(),
uid,
created_at: workspace.created_at.timestamp(),
database_storage_id: workspace.workspace_database_id.clone(),
icon: workspace.icon.clone(),
member_count: workspace.member_count,
role: workspace.role.clone().map(|v| v as i32),
auth_type: auth_type as i32,
})
}
}
pub fn select_user_workspace(
workspace_id: &str,
mut conn: DBConnection,
) -> Option<UserWorkspaceTable> {
user_workspace_table::dsl::user_workspace_table
.filter(user_workspace_table::id.eq(workspace_id))
.first::<UserWorkspaceTable>(&mut *conn)
.ok()
}
pub fn select_all_user_workspace(
user_id: i64,
mut conn: DBConnection,
) -> Result<Vec<UserWorkspace>, FlowyError> {
let rows = user_workspace_table::dsl::user_workspace_table
.filter(user_workspace_table::uid.eq(user_id))
.load::<UserWorkspaceTable>(&mut *conn)?;
Ok(rows.into_iter().map(UserWorkspace::from).collect())
}
pub fn update_user_workspace(
mut conn: DBConnection,
changeset: UserWorkspaceChangeset,
) -> Result<(), FlowyError> {
diesel::update(user_workspace_table::dsl::user_workspace_table)
.filter(user_workspace_table::id.eq(changeset.id.clone()))
.set(changeset)
.execute(&mut conn)?;
Ok(())
}
pub fn upsert_user_workspace(
uid: i64,
auth_type: AuthType,
user_workspace: UserWorkspace,
conn: &mut SqliteConnection,
) -> Result<(), FlowyError> {
let new_record = UserWorkspaceTable::from_workspace(uid, &user_workspace, auth_type)?;
diesel::insert_into(user_workspace_table::table)
.values(new_record.clone())
.on_conflict(user_workspace_table::id)
.do_update()
.set((
user_workspace_table::name.eq(new_record.name),
user_workspace_table::uid.eq(new_record.uid),
user_workspace_table::created_at.eq(new_record.created_at),
user_workspace_table::database_storage_id.eq(new_record.database_storage_id),
user_workspace_table::icon.eq(new_record.icon),
user_workspace_table::member_count.eq(new_record.member_count),
user_workspace_table::role.eq(new_record.role),
user_workspace_table::auth_type.eq(new_record.auth_type),
))
.execute(conn)?;
Ok(())
}
pub fn delete_user_workspace(mut conn: DBConnection, workspace_id: &str) -> FlowyResult<()> {
let n = conn.immediate_transaction(|conn| {
let rows_affected: usize =
diesel::delete(user_workspace_table::table.filter(user_workspace_table::id.eq(workspace_id)))
.execute(conn)?;
Ok::<usize, FlowyError>(rows_affected)
})?;
if n != 1 {
warn!("expected to delete 1 row, but deleted {} rows", n);
}
Ok(())
}
impl From<UserWorkspaceTable> for UserWorkspace {
fn from(value: UserWorkspaceTable) -> Self {
Self {
id: value.id,
name: value.name,
created_at: Utc
.timestamp_opt(value.created_at, 0)
.single()
.unwrap_or_default(),
workspace_database_id: value.database_storage_id,
icon: value.icon,
member_count: value.member_count,
role: value.role.map(|v| v.into()),
}
}
}
/// Delete all user workspaces for the given user and auth type.
pub fn delete_user_all_workspace(
uid: i64,
auth_type: AuthType,
conn: &mut SqliteConnection,
) -> FlowyResult<()> {
let n = diesel::delete(
user_workspace_table::dsl::user_workspace_table
.filter(user_workspace_table::uid.eq(uid))
.filter(user_workspace_table::auth_type.eq(auth_type as i32)),
)
.execute(conn)?;
info!(
"Delete {} workspaces for user {} and auth type {:?}",
n, uid, auth_type
);
Ok(())
}
/// Delete all user workspaces for the given user and auth type, then insert the provided user workspaces.
pub fn delete_all_then_insert_user_workspaces(
uid: i64,
mut conn: DBConnection,
auth_type: AuthType,
user_workspaces: &[UserWorkspace],
) -> FlowyResult<()> {
conn.immediate_transaction(|conn| {
delete_user_all_workspace(uid, auth_type, conn)?;
info!(
"Insert {} workspaces for user {} and auth type {:?}",
user_workspaces.len(),
uid,
auth_type
);
for user_workspace in user_workspaces {
upsert_user_workspace(uid, auth_type, user_workspace.clone(), conn)?;
}
Ok::<(), FlowyError>(())
})
}

View File

@ -48,7 +48,6 @@ validator = { workspace = true, features = ["derive"] }
rayon = "1.10.0"
[dev-dependencies]
nanoid = "0.4.0"
fake = "2.0.0"
rand = "0.8.4"
quickcheck = "1.0.3"

View File

@ -1,10 +1,11 @@
use chrono::{TimeZone, Utc};
use diesel::RunQueryDsl;
use flowy_error::FlowyError;
use diesel::{RunQueryDsl, SqliteConnection};
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::schema::user_workspace_table;
use flowy_sqlite::DBConnection;
use flowy_sqlite::{query_dsl::*, ExpressionMethods};
use flowy_user_pub::entities::{AuthType, UserWorkspace};
use tracing::{info, trace, warn};
#[derive(Clone, Default, Queryable, Identifiable, Insertable)]
#[diesel(table_name = user_workspace_table)]
@ -91,10 +92,9 @@ pub fn upsert_user_workspace(
uid: i64,
auth_type: AuthType,
user_workspace: UserWorkspace,
conn: &mut DBConnection,
conn: &mut SqliteConnection,
) -> Result<(), FlowyError> {
let new_record = UserWorkspaceTable::from_workspace(uid, &user_workspace, auth_type)?;
diesel::insert_into(user_workspace_table::table)
.values(new_record.clone())
.on_conflict(user_workspace_table::id)
@ -114,6 +114,20 @@ pub fn upsert_user_workspace(
Ok(())
}
pub fn delete_user_workspace(mut conn: DBConnection, workspace_id: &str) -> FlowyResult<()> {
let n = conn.immediate_transaction(|conn| {
let rows_affected: usize =
diesel::delete(user_workspace_table::table.filter(user_workspace_table::id.eq(workspace_id)))
.execute(conn)?;
Ok::<usize, FlowyError>(rows_affected)
})?;
if n != 1 {
warn!("expected to delete 1 row, but deleted {} rows", n);
}
Ok(())
}
impl From<UserWorkspaceTable> for UserWorkspace {
fn from(value: UserWorkspaceTable) -> Self {
Self {
@ -130,3 +144,45 @@ impl From<UserWorkspaceTable> for UserWorkspace {
}
}
}
/// Delete all user workspaces for the given user and auth type.
pub fn delete_user_all_workspace(
uid: i64,
auth_type: AuthType,
conn: &mut SqliteConnection,
) -> FlowyResult<()> {
let n = diesel::delete(
user_workspace_table::dsl::user_workspace_table
.filter(user_workspace_table::uid.eq(uid))
.filter(user_workspace_table::auth_type.eq(auth_type as i32)),
)
.execute(conn)?;
info!(
"Delete {} workspaces for user {} and auth type {:?}",
n, uid, auth_type
);
Ok(())
}
/// Delete all user workspaces for the given user and auth type, then insert the provided user workspaces.
pub fn delete_all_then_insert_user_workspaces(
uid: i64,
mut conn: DBConnection,
auth_type: AuthType,
user_workspaces: &[UserWorkspace],
) -> FlowyResult<()> {
conn.immediate_transaction(|conn| {
delete_user_all_workspace(uid, auth_type, conn)?;
info!(
"Insert {} workspaces for user {} and auth type {:?}",
user_workspaces.len(),
uid,
auth_type
);
for user_workspace in user_workspaces {
upsert_user_workspace(uid, auth_type, user_workspace.clone(), conn)?;
}
Ok::<(), FlowyError>(())
})
}

View File

@ -41,8 +41,9 @@ use crate::services::collab_interact::{DefaultCollabInteract, UserReminder};
use crate::migrations::doc_key_with_workspace::CollabDocKeyWithWorkspaceIdMigration;
use crate::services::sqlite_sql::user_sql::{select_user_profile, UserTable, UserTableChangeset};
use crate::services::sqlite_sql::workspace_sql::upsert_user_workspace;
use crate::user_manager::manager_user_workspace::save_all_user_workspaces;
use crate::services::sqlite_sql::workspace_sql::{
delete_all_then_insert_user_workspaces, upsert_user_workspace,
};
use crate::user_manager::user_login_state::UserAuthProcess;
use crate::{errors::FlowyError, notification::*};
use flowy_user_pub::session::Session;
@ -758,12 +759,13 @@ impl UserManager {
) -> Result<(), FlowyError> {
let user_profile = UserProfile::from((response, &auth_type));
let uid = user_profile.uid;
if auth_type.is_local() {
event!(tracing::Level::DEBUG, "Save new anon user: {:?}", uid);
self.set_anon_user(session);
}
save_all_user_workspaces(
delete_all_then_insert_user_workspaces(
uid,
self.db_connection(uid)?,
auth_type,

View File

@ -23,21 +23,21 @@ use crate::services::sqlite_sql::workspace_setting_sql::{
WorkspaceSettingsChangeset, WorkspaceSettingsTable,
};
use crate::services::sqlite_sql::workspace_sql::{
select_all_user_workspace, select_user_workspace, update_user_workspace, upsert_user_workspace,
UserWorkspaceChangeset, UserWorkspaceTable,
delete_all_then_insert_user_workspaces, delete_user_workspace, select_all_user_workspace,
select_user_workspace, update_user_workspace, upsert_user_workspace, UserWorkspaceChangeset,
UserWorkspaceTable,
};
use crate::user_manager::UserManager;
use collab_integrate::CollabKVDB;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_folder_pub::entities::{ImportFrom, ImportedCollabData, ImportedFolderData};
use flowy_sqlite::schema::user_workspace_table;
use flowy_sqlite::{query_dsl::*, ConnectionPool, DBConnection, ExpressionMethods};
use flowy_sqlite::{ConnectionPool, DBConnection, ExpressionMethods};
use flowy_user_pub::cloud::{UserCloudService, UserCloudServiceProvider};
use flowy_user_pub::entities::{
AuthType, Role, UserWorkspace, WorkspaceInvitation, WorkspaceInvitationStatus, WorkspaceMember,
};
use flowy_user_pub::session::Session;
use tracing::{error, info, instrument, trace, warn};
use tracing::{error, info, instrument, trace};
use uuid::Uuid;
impl UserManager {
@ -282,7 +282,7 @@ impl UserManager {
// delete workspace from local sqlite db
let uid = self.user_id()?;
let conn = self.db_connection(uid)?;
delete_user_workspaces(conn, workspace_id.to_string().as_str())?;
delete_user_workspace(conn, workspace_id.to_string().as_str())?;
self
.user_workspace_service
@ -300,7 +300,7 @@ impl UserManager {
.await?;
let uid = self.user_id()?;
let conn = self.db_connection(uid)?;
delete_user_workspaces(conn, workspace_id.to_string().as_str())?;
delete_user_workspace(conn, workspace_id.to_string().as_str())?;
self
.user_workspace_service
@ -417,7 +417,8 @@ impl UserManager {
tokio::spawn(async move {
if let Ok(new_user_workspaces) = service.get_all_workspace(uid).await {
if let Ok(conn) = pool.get() {
let _ = save_all_user_workspaces(uid, conn, auth_type, &new_user_workspaces);
let _ =
delete_all_then_insert_user_workspaces(uid, conn, auth_type, &new_user_workspaces);
let repeated_workspace_pbs =
RepeatedUserWorkspacePB::from((auth_type, new_user_workspaces));
send_notification(&uid.to_string(), UserNotification::DidUpdateUserWorkspaces)
@ -696,82 +697,6 @@ impl UserManager {
}
}
/// This method is used to save the user workspaces (plural) to the SQLite database
///
/// The workspaces provided in [user_workspaces] will override the existing workspaces in the database.
///
/// Consider using [upsert_user_workspace] if you only need to save a single workspace.
///
pub fn save_all_user_workspaces(
uid: i64,
mut conn: DBConnection,
auth_type: AuthType,
user_workspaces: &[UserWorkspace],
) -> FlowyResult<()> {
let user_workspaces = user_workspaces
.iter()
.map(|user_workspace| UserWorkspaceTable::from_workspace(uid, user_workspace, auth_type))
.collect::<Result<Vec<_>, _>>()?;
conn.immediate_transaction(|conn| {
let existing_ids = user_workspace_table::dsl::user_workspace_table
.select(user_workspace_table::id)
.load::<String>(conn)?;
let new_ids: Vec<String> = user_workspaces.iter().map(|w| w.id.clone()).collect();
let ids_to_delete: Vec<String> = existing_ids
.into_iter()
.filter(|id| !new_ids.contains(id))
.collect();
// insert or update the user workspaces
for user_workspace in &user_workspaces {
let affected_rows = diesel::update(
user_workspace_table::dsl::user_workspace_table
.filter(user_workspace_table::id.eq(&user_workspace.id)),
)
.set((
user_workspace_table::name.eq(&user_workspace.name),
user_workspace_table::created_at.eq(&user_workspace.created_at),
user_workspace_table::database_storage_id.eq(&user_workspace.database_storage_id),
user_workspace_table::icon.eq(&user_workspace.icon),
user_workspace_table::member_count.eq(&user_workspace.member_count),
user_workspace_table::role.eq(&user_workspace.role),
))
.execute(conn)?;
if affected_rows == 0 {
diesel::insert_into(user_workspace_table::table)
.values(user_workspace)
.execute(conn)?;
}
}
// delete the user workspaces that are not in the new list
if !ids_to_delete.is_empty() {
diesel::delete(
user_workspace_table::dsl::user_workspace_table
.filter(user_workspace_table::id.eq_any(ids_to_delete)),
)
.execute(conn)?;
}
Ok::<(), FlowyError>(())
})
}
pub fn delete_user_workspaces(mut conn: DBConnection, workspace_id: &str) -> FlowyResult<()> {
let n = conn.immediate_transaction(|conn| {
let rows_affected: usize =
diesel::delete(user_workspace_table::table.filter(user_workspace_table::id.eq(workspace_id)))
.execute(conn)?;
Ok::<usize, FlowyError>(rows_affected)
})?;
if n != 1 {
warn!("expected to delete 1 row, but deleted {} rows", n);
}
Ok(())
}
fn is_older_than_n_minutes(updated_at: NaiveDateTime, minutes: i64) -> bool {
let current_time: NaiveDateTime = Utc::now().naive_utc();
match current_time.checked_sub_signed(Duration::minutes(minutes)) {