AppFlowy/rust-lib/flowy-workspace/src/services/workspace_controller.rs

315 lines
12 KiB
Rust
Raw Normal View History

2021-07-20 14:03:21 +08:00
use crate::{
entities::{app::App, workspace::*},
errors::*,
module::{WorkspaceDatabase, WorkspaceUser},
2021-09-07 23:30:43 +08:00
observable::observable,
services::{helper::spawn, server::Server, AppController},
sql_tables::workspace::{WorkspaceTable, WorkspaceTableChangeset, WorkspaceTableSql},
2021-07-20 14:03:21 +08:00
};
2021-09-04 15:12:53 +08:00
2021-09-03 12:44:48 +08:00
use flowy_infra::kv::KV;
2021-09-02 17:43:10 +08:00
2021-09-07 23:30:43 +08:00
use crate::{
entities::app::RepeatedApp,
observable::WorkspaceObservable,
sql_tables::{
app::{AppTable, AppTableSql},
view::{ViewTable, ViewTableSql},
},
};
use flowy_database::SqliteConnection;
2021-09-02 17:43:10 +08:00
use std::sync::Arc;
2021-07-13 23:08:20 +08:00
2021-09-01 22:50:22 +08:00
pub(crate) struct WorkspaceController {
pub user: Arc<dyn WorkspaceUser>,
pub workspace_sql: Arc<WorkspaceTableSql>,
2021-09-07 23:30:43 +08:00
pub app_sql: Arc<AppTableSql>,
pub view_sql: Arc<ViewTableSql>,
pub database: Arc<dyn WorkspaceDatabase>,
2021-07-20 14:03:21 +08:00
pub app_controller: Arc<AppController>,
2021-09-01 22:50:22 +08:00
server: Server,
2021-07-13 23:08:20 +08:00
}
impl WorkspaceController {
2021-09-01 22:50:22 +08:00
pub(crate) fn new(
2021-07-20 14:03:21 +08:00
user: Arc<dyn WorkspaceUser>,
database: Arc<dyn WorkspaceDatabase>,
app_controller: Arc<AppController>,
2021-09-01 22:50:22 +08:00
server: Server,
2021-07-20 14:03:21 +08:00
) -> Self {
2021-09-07 23:30:43 +08:00
let workspace_sql = Arc::new(WorkspaceTableSql {});
let app_sql = Arc::new(AppTableSql {});
let view_sql = Arc::new(ViewTableSql {});
2021-07-20 14:03:21 +08:00
Self {
user,
2021-09-07 23:30:43 +08:00
workspace_sql,
app_sql,
view_sql,
database,
2021-07-20 14:03:21 +08:00
app_controller,
2021-09-01 22:50:22 +08:00
server,
2021-07-20 14:03:21 +08:00
}
}
2021-07-13 23:08:20 +08:00
2021-09-01 22:50:22 +08:00
pub(crate) async fn create_workspace(&self, params: CreateWorkspaceParams) -> Result<Workspace, WorkspaceError> {
2021-09-02 19:57:19 +08:00
let workspace = self.create_workspace_on_server(params.clone()).await?;
let user_id = self.user.user_id()?;
2021-09-02 19:57:19 +08:00
let workspace_table = WorkspaceTable::new(workspace.clone(), &user_id);
let conn = &*self.database.db_connection()?;
//[[immediate_transaction]]
// https://sqlite.org/lang_transaction.html
// IMMEDIATE cause the database connection to start a new write immediately,
// without waiting for a write statement. The BEGIN IMMEDIATE might fail
// with SQLITE_BUSY if another write transaction is already active on another
// database connection.
//
// EXCLUSIVE is similar to IMMEDIATE in that a write transaction is started
// immediately. EXCLUSIVE and IMMEDIATE are the same in WAL mode, but in
// other journaling modes, EXCLUSIVE prevents other database connections from
// reading the database while the transaction is underway.
(conn).immediate_transaction::<_, WorkspaceError, _>(|| {
self.workspace_sql.create_workspace(workspace_table, conn)?;
let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?;
2021-09-07 23:30:43 +08:00
observable(&user_id, WorkspaceObservable::UserCreateWorkspace)
.payload(repeated_workspace)
.build();
Ok(())
})?;
2021-09-07 17:12:03 +08:00
Ok(workspace)
2021-07-13 23:08:20 +08:00
}
2021-09-02 19:57:19 +08:00
pub(crate) async fn update_workspace(&self, params: UpdateWorkspaceParams) -> Result<(), WorkspaceError> {
let _ = self.update_workspace_on_server(params.clone()).await?;
let changeset = WorkspaceTableChangeset::new(params);
2021-07-21 22:41:44 +08:00
let workspace_id = changeset.id.clone();
let conn = &*self.database.db_connection()?;
(conn).immediate_transaction::<_, WorkspaceError, _>(|| {
let _ = self.workspace_sql.update_workspace(changeset, conn)?;
let user_id = self.user.user_id()?;
let workspace = self.read_local_workspace(workspace_id.clone(), &user_id, conn)?;
2021-09-07 23:30:43 +08:00
observable(&workspace_id, WorkspaceObservable::WorkspaceUpdated)
.payload(workspace)
.build();
Ok(())
})?;
2021-09-07 17:12:03 +08:00
2021-07-20 14:03:21 +08:00
Ok(())
}
2021-09-02 19:57:19 +08:00
pub(crate) async fn delete_workspace(&self, workspace_id: &str) -> Result<(), WorkspaceError> {
let user_id = self.user.user_id()?;
2021-09-02 19:57:19 +08:00
let _ = self.delete_workspace_on_server(workspace_id).await?;
let conn = &*self.database.db_connection()?;
(conn).immediate_transaction::<_, WorkspaceError, _>(|| {
let _ = self.workspace_sql.delete_workspace(workspace_id, conn)?;
let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?;
2021-09-07 23:30:43 +08:00
observable(&user_id, WorkspaceObservable::UserDeleteWorkspace)
.payload(repeated_workspace)
.build();
Ok(())
})?;
2021-09-07 17:12:03 +08:00
Ok(())
2021-07-20 14:03:21 +08:00
}
2021-09-04 09:00:15 +08:00
pub(crate) async fn open_workspace(&self, params: QueryWorkspaceParams) -> Result<Workspace, WorkspaceError> {
2021-08-27 23:53:53 +08:00
let user_id = self.user.user_id()?;
let conn = self.database.db_connection()?;
2021-09-04 09:00:15 +08:00
if let Some(workspace_id) = params.workspace_id.clone() {
let workspace = self.read_local_workspace(workspace_id, &user_id, &*conn)?;
2021-09-07 17:12:03 +08:00
set_current_workspace(&workspace.id);
Ok(workspace)
2021-09-04 09:00:15 +08:00
} else {
return Err(ErrorBuilder::new(ErrorCode::WorkspaceIdInvalid)
.msg("Opened workspace id should not be empty")
.build());
2021-08-27 23:53:53 +08:00
}
}
pub(crate) async fn read_workspaces(&self, params: QueryWorkspaceParams) -> Result<RepeatedWorkspace, WorkspaceError> {
2021-08-27 23:53:53 +08:00
let user_id = self.user.user_id()?;
let _ = self.read_workspaces_on_server(user_id.clone(), params.clone()).await;
let conn = self.database.db_connection()?;
let workspaces = self.read_local_workspaces(params.workspace_id.clone(), &user_id, &*conn)?;
2021-09-07 17:12:03 +08:00
Ok(workspaces)
}
pub(crate) async fn read_cur_workspace(&self) -> Result<Workspace, WorkspaceError> {
let workspace_id = get_current_workspace()?;
let user_id = self.user.user_id()?;
let params = QueryWorkspaceParams {
workspace_id: Some(workspace_id.clone()),
};
let _ = self.read_workspaces_on_server(user_id.clone(), params).await?;
let conn = self.database.db_connection()?;
let workspace = self.read_local_workspace(workspace_id, &user_id, &*conn)?;
2021-09-07 17:12:03 +08:00
Ok(workspace)
}
pub(crate) async fn read_workspace_apps(&self) -> Result<RepeatedApp, WorkspaceError> {
let workspace_id = get_current_workspace()?;
let conn = self.database.db_connection()?;
let apps = self.read_local_apps(&workspace_id, &*conn)?;
2021-09-07 17:12:03 +08:00
// TODO: read from server
Ok(RepeatedApp { items: apps })
}
#[tracing::instrument(level = "debug", skip(self, conn), err)]
fn read_local_workspaces(
&self,
workspace_id: Option<String>,
user_id: &str,
conn: &SqliteConnection,
) -> Result<RepeatedWorkspace, WorkspaceError> {
2021-09-07 17:12:03 +08:00
let workspace_id = workspace_id.to_owned();
let workspace_tables = self.workspace_sql.read_workspaces(workspace_id, user_id, conn)?;
2021-09-07 17:12:03 +08:00
2021-08-27 23:53:53 +08:00
let mut workspaces = vec![];
2021-08-28 23:08:12 +08:00
for table in workspace_tables {
let apps = self.read_local_apps(&table.id, conn)?;
2021-08-28 23:08:12 +08:00
let mut workspace: Workspace = table.into();
workspace.apps.items = apps;
workspaces.push(workspace);
}
2021-08-27 23:53:53 +08:00
Ok(RepeatedWorkspace { items: workspaces })
2021-07-20 14:03:21 +08:00
}
fn read_local_workspace(&self, workspace_id: String, user_id: &str, conn: &SqliteConnection) -> Result<Workspace, WorkspaceError> {
2021-09-07 17:12:03 +08:00
// Opti: fetch single workspace from local db
let mut repeated_workspace = self.read_local_workspaces(Some(workspace_id), user_id, conn)?;
2021-08-28 23:08:12 +08:00
if repeated_workspace.is_empty() {
2021-08-30 22:44:17 +08:00
return Err(ErrorBuilder::new(ErrorCode::RecordNotFound).build());
2021-08-28 23:08:12 +08:00
}
debug_assert_eq!(repeated_workspace.len(), 1);
2021-09-01 22:50:22 +08:00
let workspace = repeated_workspace.drain(..1).collect::<Vec<Workspace>>().pop().unwrap();
2021-08-28 23:08:12 +08:00
Ok(workspace)
}
#[tracing::instrument(level = "debug", skip(self, conn), err)]
fn read_local_apps(&self, workspace_id: &str, conn: &SqliteConnection) -> Result<Vec<App>, WorkspaceError> {
2021-07-20 14:03:21 +08:00
let apps = self
.workspace_sql
.read_apps_belong_to_workspace(workspace_id, conn)?
2021-07-20 14:03:21 +08:00
.into_iter()
.map(|app_table| app_table.into())
.collect::<Vec<App>>();
Ok(apps)
}
2021-07-13 23:08:20 +08:00
}
2021-08-24 21:38:53 +08:00
impl WorkspaceController {
2021-09-02 19:57:19 +08:00
fn token_with_server(&self) -> Result<(String, Server), WorkspaceError> {
let token = self.user.token()?;
let server = self.server.clone();
Ok((token, server))
}
2021-08-28 23:08:12 +08:00
2021-09-05 13:50:23 +08:00
#[tracing::instrument(skip(self), err)]
2021-09-02 19:57:19 +08:00
async fn create_workspace_on_server(&self, params: CreateWorkspaceParams) -> Result<Workspace, WorkspaceError> {
let token = self.user.token()?;
2021-09-02 19:57:19 +08:00
let workspace = self.server.create_workspace(&token, params).await?;
Ok(workspace)
2021-08-28 23:08:12 +08:00
}
2021-09-05 13:50:23 +08:00
#[tracing::instrument(skip(self), err)]
2021-09-02 19:57:19 +08:00
async fn update_workspace_on_server(&self, params: UpdateWorkspaceParams) -> Result<(), WorkspaceError> {
let (token, server) = self.token_with_server()?;
spawn(async move {
match server.update_workspace(&token, params).await {
Ok(_) => {},
Err(e) => {
// TODO: retry?
log::error!("Update workspace failed: {:?}", e);
},
}
});
Ok(())
}
2021-08-24 21:38:53 +08:00
2021-09-05 13:50:23 +08:00
#[tracing::instrument(skip(self), err)]
2021-09-02 19:57:19 +08:00
async fn delete_workspace_on_server(&self, workspace_id: &str) -> Result<(), WorkspaceError> {
let params = DeleteWorkspaceParams {
workspace_id: workspace_id.to_string(),
};
let (token, server) = self.token_with_server()?;
spawn(async move {
match server.delete_workspace(&token, params).await {
Ok(_) => {},
Err(e) => {
// TODO: retry?
log::error!("Delete workspace failed: {:?}", e);
},
}
});
Ok(())
}
2021-08-25 17:34:20 +08:00
2021-09-05 13:50:23 +08:00
#[tracing::instrument(skip(self), err)]
2021-09-06 16:18:34 +08:00
async fn read_workspaces_on_server(&self, user_id: String, params: QueryWorkspaceParams) -> Result<(), WorkspaceError> {
2021-09-02 19:57:19 +08:00
let (token, server) = self.token_with_server()?;
2021-09-07 23:30:43 +08:00
let workspace_sql = self.workspace_sql.clone();
let app_sql = self.app_sql.clone();
let view_sql = self.view_sql.clone();
let conn = self.database.db_connection()?;
spawn(async move {
2021-09-07 17:12:03 +08:00
// Opti: retry?
let workspaces = server.read_workspace(&token, params).await?;
// TODO: rollback if fail
2021-09-07 17:12:03 +08:00
let _ = (&*conn).immediate_transaction::<_, WorkspaceError, _>(|| {
2021-09-07 23:30:43 +08:00
log::debug!("Save {} workspace", workspaces.len());
2021-09-07 17:12:03 +08:00
for workspace in &workspaces.items {
let mut m_workspace = workspace.clone();
let apps = m_workspace.apps.take_items();
2021-09-07 17:12:03 +08:00
let workspace_table = WorkspaceTable::new(m_workspace, &user_id);
2021-09-07 23:30:43 +08:00
let _ = workspace_sql.create_workspace(workspace_table, &*conn)?;
log::debug!("Save {} apps", apps.len());
for mut app in apps {
let views = app.belongings.take_items();
match app_sql.create_app(AppTable::new(app), &*conn) {
Ok(_) => {},
Err(e) => log::error!("create app failed: {:?}", e),
}
2021-09-07 23:30:43 +08:00
log::debug!("Save {} views", views.len());
for view in views {
match view_sql.create_view(ViewTable::new(view), &*conn) {
Ok(_) => {},
Err(e) => log::error!("create view failed: {:?}", e),
}
}
}
2021-09-07 17:12:03 +08:00
}
Ok(())
})?;
2021-09-07 23:30:43 +08:00
observable(&user_id, WorkspaceObservable::WorkspaceListUpdated)
2021-09-07 17:12:03 +08:00
.payload(workspaces)
.build();
Result::<(), WorkspaceError>::Ok(())
});
2021-09-06 16:18:34 +08:00
Ok(())
2021-08-25 17:34:20 +08:00
}
}
const CURRENT_WORKSPACE_ID: &str = "current_workspace_id";
2021-08-25 17:34:20 +08:00
2021-09-03 12:44:48 +08:00
fn set_current_workspace(workspace: &str) { KV::set_str(CURRENT_WORKSPACE_ID, workspace.to_owned()); }
2021-08-26 17:58:59 +08:00
fn get_current_workspace() -> Result<String, WorkspaceError> {
2021-09-03 12:44:48 +08:00
match KV::get_str(CURRENT_WORKSPACE_ID) {
None => Err(ErrorBuilder::new(ErrorCode::CurrentWorkspaceNotFound).build()),
Some(workspace_id) => Ok(workspace_id),
}
2021-08-26 17:58:59 +08:00
}