async update user profile

This commit is contained in:
appflowy 2021-09-04 17:26:04 +08:00
parent 9771066d80
commit ee34ee4b5b
6 changed files with 86 additions and 69 deletions

View File

@ -3,13 +3,10 @@ macro_rules! diesel_update_table {
( (
$table_name:ident, $table_name:ident,
$changeset:ident, $changeset:ident,
$connection:ident $connection:expr
) => { ) => {
let filter = let filter = $table_name::dsl::$table_name.filter($table_name::dsl::id.eq($changeset.id.clone()));
$table_name::dsl::$table_name.filter($table_name::dsl::id.eq($changeset.id.clone())); let affected_row = diesel::update(filter).set($changeset).execute(&*$connection)?;
let affected_row = diesel::update(filter)
.set($changeset)
.execute(&*$connection)?;
debug_assert_eq!(affected_row, 1); debug_assert_eq!(affected_row, 1);
}; };
} }

View File

@ -12,11 +12,9 @@ pub struct EditorDatabaseImpl {
impl DocumentDatabase for EditorDatabaseImpl { impl DocumentDatabase for EditorDatabaseImpl {
fn db_connection(&self) -> Result<DBConnection, DocError> { fn db_connection(&self) -> Result<DBConnection, DocError> {
self.user_session.get_db_connection().map_err(|e| { self.user_session
ErrorBuilder::new(DocErrorCode::EditorDBConnFailed) .db()
.error(e) .map_err(|e| ErrorBuilder::new(DocErrorCode::EditorDBConnFailed).error(e).build())
.build()
})
} }
} }
@ -26,11 +24,10 @@ pub struct EditorUserImpl {
impl DocumentUser for EditorUserImpl { impl DocumentUser for EditorUserImpl {
fn user_doc_dir(&self) -> Result<String, DocError> { fn user_doc_dir(&self) -> Result<String, DocError> {
let dir = self.user_session.user_dir().map_err(|e| { let dir = self
ErrorBuilder::new(DocErrorCode::EditorUserNotLoginYet) .user_session
.error(e) .user_dir()
.build() .map_err(|e| ErrorBuilder::new(DocErrorCode::EditorUserNotLoginYet).error(e).build())?;
})?;
let doc_dir = format!("{}/doc", dir); let doc_dir = format!("{}/doc", dir);
if !Path::new(&doc_dir).exists() { if !Path::new(&doc_dir).exists() {

View File

@ -13,19 +13,15 @@ pub struct WorkspaceUserImpl {
impl WorkspaceUser for WorkspaceUserImpl { impl WorkspaceUser for WorkspaceUserImpl {
fn user_id(&self) -> Result<String, WorkspaceError> { fn user_id(&self) -> Result<String, WorkspaceError> {
self.user_session.user_id().map_err(|e| { self.user_session
ErrorBuilder::new(ErrorCode::UserInternalError) .user_id()
.error(e) .map_err(|e| ErrorBuilder::new(ErrorCode::UserInternalError).error(e).build())
.build()
})
} }
fn token(&self) -> Result<String, WorkspaceError> { fn token(&self) -> Result<String, WorkspaceError> {
self.user_session.token().map_err(|e| { self.user_session
ErrorBuilder::new(ErrorCode::UserInternalError) .token()
.error(e) .map_err(|e| ErrorBuilder::new(ErrorCode::UserInternalError).error(e).build())
.build()
})
} }
} }
@ -35,10 +31,8 @@ pub struct WorkspaceDatabaseImpl {
impl WorkspaceDatabase for WorkspaceDatabaseImpl { impl WorkspaceDatabase for WorkspaceDatabaseImpl {
fn db_connection(&self) -> Result<DBConnection, WorkspaceError> { fn db_connection(&self) -> Result<DBConnection, WorkspaceError> {
self.user_session.get_db_connection().map_err(|e| { self.user_session
ErrorBuilder::new(ErrorCode::DatabaseConnectionFail) .db()
.error(e) .map_err(|e| ErrorBuilder::new(ErrorCode::DatabaseConnectionFail).error(e).build())
.build()
})
} }
} }

View File

@ -0,0 +1,9 @@
use std::future::Future;
pub fn spawn<F>(f: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
tokio::spawn(f);
}

View File

@ -1,2 +1,3 @@
mod helper;
mod server; mod server;
pub mod user; pub mod user;

View File

@ -19,6 +19,7 @@ use flowy_sqlite::ConnectionPool;
use parking_lot::RwLock; use parking_lot::RwLock;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::services::helper::spawn;
use std::sync::Arc; use std::sync::Arc;
pub struct UserSessionConfig { pub struct UserSessionConfig {
@ -53,7 +54,7 @@ impl UserSession {
} }
} }
pub fn get_db_connection(&self) -> Result<DBConnection, UserError> { pub fn db(&self) -> Result<DBConnection, UserError> {
let user_id = self.get_session()?.user_id; let user_id = self.get_session()?.user_id;
self.database.get_connection(&user_id) self.database.get_connection(&user_id)
} }
@ -64,7 +65,7 @@ impl UserSession {
// //
// let pool = self.db_connection_pool()?; // let pool = self.db_connection_pool()?;
// let conn: PooledConnection<ConnectionManager> = pool.get()?; // let conn: PooledConnection<ConnectionManager> = pool.get()?;
pub fn db_connection_pool(&self) -> Result<Arc<ConnectionPool>, UserError> { pub fn db_pool(&self) -> Result<Arc<ConnectionPool>, UserError> {
let user_id = self.get_session()?.user_id; let user_id = self.get_session()?.user_id;
self.database.get_pool(&user_id) self.database.get_pool(&user_id)
} }
@ -97,58 +98,31 @@ impl UserSession {
pub async fn sign_out(&self) -> Result<(), UserError> { pub async fn sign_out(&self) -> Result<(), UserError> {
let session = self.get_session()?; let session = self.get_session()?;
match self.server.sign_out(&session.token).await { let _ = diesel::delete(dsl::user_table.filter(dsl::id.eq(&session.user_id))).execute(&*(self.db()?))?;
Ok(_) => {},
Err(e) => log::error!("Sign out failed: {:?}", e),
}
let conn = self.get_db_connection()?;
let _ = diesel::delete(dsl::user_table.filter(dsl::id.eq(&session.user_id))).execute(&*conn)?;
let _ = self.server.sign_out(&session.token);
let _ = self.database.close_user_db(&session.user_id)?; let _ = self.database.close_user_db(&session.user_id)?;
let _ = self.set_session(None)?; let _ = self.set_session(None)?;
let _ = self.sign_out_on_server(&session.token).await?;
Ok(()) Ok(())
} }
pub async fn update_user(&self, params: UpdateUserParams) -> Result<(), UserError> { pub async fn update_user(&self, params: UpdateUserParams) -> Result<(), UserError> {
let session = self.get_session()?; let session = self.get_session()?;
match self.server.update_user(&session.token, params.clone()).await { let changeset = UserTableChangeset::new(params.clone());
Ok(_) => {}, diesel_update_table!(user_table, changeset, self.db()?);
Err(e) => {
// TODO: retry?
log::error!("update user profile failed: {:?}", e);
},
}
let changeset = UserTableChangeset::new(params); let _ = self.update_user_on_server(&session.token, params).await?;
let conn = self.get_db_connection()?;
diesel_update_table!(user_table, changeset, conn);
Ok(()) Ok(())
} }
pub async fn user_profile(&self) -> Result<UserProfile, UserError> { pub async fn user_profile(&self) -> Result<UserProfile, UserError> {
let session = self.get_session()?; let session = self.get_session()?;
let token = session.token; let token = session.token;
let server = self.server.clone();
tokio::spawn(async move {
match server.get_user(&token).await {
Ok(profile) => {
//
log::info!("{:?}", profile);
},
Err(e) => {
//
log::info!("{:?}", e);
},
}
})
.await;
let user = dsl::user_table let user = dsl::user_table
.filter(user_table::id.eq(&session.user_id)) .filter(user_table::id.eq(&session.user_id))
.first::<UserTable>(&*(self.get_db_connection()?))?; .first::<UserTable>(&*(self.db()?))?;
let _ = self.read_user_profile_on_server(&token).await?;
Ok(UserProfile::from(user)) Ok(UserProfile::from(user))
} }
@ -163,8 +137,53 @@ impl UserSession {
} }
impl UserSession { impl UserSession {
async fn read_user_profile_on_server(&self, token: &str) -> Result<(), UserError> {
let server = self.server.clone();
let token = token.to_owned();
spawn(async move {
match server.get_user(&token).await {
Ok(profile) => {
//
log::info!("{:?}", profile);
},
Err(e) => {
//
log::info!("{:?}", e);
},
}
});
Ok(())
}
async fn update_user_on_server(&self, token: &str, params: UpdateUserParams) -> Result<(), UserError> {
let server = self.server.clone();
let token = token.to_owned();
spawn(async move {
match server.update_user(&token, params).await {
Ok(_) => {},
Err(e) => {
// TODO: retry?
log::error!("update user profile failed: {:?}", e);
},
}
});
Ok(())
}
async fn sign_out_on_server(&self, token: &str) -> Result<(), UserError> {
let server = self.server.clone();
let token = token.to_owned();
spawn(async move {
match server.sign_out(&token).await {
Ok(_) => {},
Err(e) => log::error!("Sign out failed: {:?}", e),
}
});
Ok(())
}
async fn save_user(&self, user: UserTable) -> Result<UserTable, UserError> { async fn save_user(&self, user: UserTable) -> Result<UserTable, UserError> {
let conn = self.get_db_connection()?; let conn = self.db()?;
let _ = diesel::insert_into(user_table::table).values(user.clone()).execute(&*conn)?; let _ = diesel::insert_into(user_table::table).values(user.clone()).execute(&*conn)?;
Ok(user) Ok(user)
@ -214,7 +233,7 @@ pub async fn update_user(_server: Server, pool: Arc<ConnectionPool>, params: Upd
} }
impl UserDatabaseConnection for UserSession { impl UserDatabaseConnection for UserSession {
fn get_connection(&self) -> Result<DBConnection, String> { self.get_db_connection().map_err(|e| format!("{:?}", e)) } fn get_connection(&self) -> Result<DBConnection, String> { self.db().map_err(|e| format!("{:?}", e)) }
} }
const SESSION_CACHE_KEY: &str = "session_cache_key"; const SESSION_CACHE_KEY: &str = "session_cache_key";