use crate::{ errors::FlowyError, services::doc::revision::RevisionRecord, sql_tables::{ doc::RevisionTable, mk_revision_record_from_table, RevTableType, RevisionChangeset, RevisionTableState, }, }; use diesel::update; use flowy_collaboration::entities::revision::RevisionRange; use flowy_database::{insert_or_ignore_into, prelude::*, schema::rev_table::dsl, SqliteConnection}; pub struct RevisionTableSql {} impl RevisionTableSql { pub(crate) fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { // Batch insert: https://diesel.rs/guides/all-about-inserts.html let records = revision_records .into_iter() .map(|record| { let rev_state: RevisionTableState = record.state.into(); ( dsl::doc_id.eq(record.revision.doc_id), dsl::base_rev_id.eq(record.revision.base_rev_id), dsl::rev_id.eq(record.revision.rev_id), dsl::data.eq(record.revision.delta_data), dsl::state.eq(rev_state), dsl::ty.eq(RevTableType::Local), ) }) .collect::>(); let _ = insert_or_ignore_into(dsl::rev_table).values(&records).execute(conn)?; Ok(()) } pub(crate) fn update(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> { let filter = dsl::rev_table .filter(dsl::rev_id.eq(changeset.rev_id.as_ref())) .filter(dsl::doc_id.eq(changeset.doc_id)); let _ = update(filter).set(dsl::state.eq(changeset.state)).execute(conn)?; tracing::debug!("Set {} to {:?}", changeset.rev_id, changeset.state); Ok(()) } pub(crate) fn read( user_id: &str, doc_id: &str, rev_ids: Option>, conn: &SqliteConnection, ) -> Result, FlowyError> { let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(doc_id)).into_boxed(); if let Some(rev_ids) = rev_ids { sql = sql.filter(dsl::rev_id.eq_any(rev_ids)); } let rows = sql.order(dsl::rev_id.asc()).load::(conn)?; let records = rows .into_iter() .map(|row| mk_revision_record_from_table(user_id, row)) .collect::>(); Ok(records) } pub(crate) fn read_with_range( user_id: &str, doc_id: &str, range: RevisionRange, conn: &SqliteConnection, ) -> Result, FlowyError> { let rev_tables = dsl::rev_table .filter(dsl::rev_id.ge(range.start)) .filter(dsl::rev_id.le(range.end)) .filter(dsl::doc_id.eq(doc_id)) .order(dsl::rev_id.asc()) .load::(conn)?; let revisions = rev_tables .into_iter() .map(|table| mk_revision_record_from_table(user_id, table)) .collect::>(); Ok(revisions) } pub(crate) fn delete(doc_id: &str, rev_ids: Option>, conn: &SqliteConnection) -> Result<(), FlowyError> { let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(doc_id)).into_boxed(); if let Some(rev_ids) = rev_ids { sql = sql.filter(dsl::rev_id.eq_any(rev_ids)); } let affected_row = sql.execute(conn)?; tracing::debug!("Delete {} revision rows", affected_row); Ok(()) } }