146 lines
5.1 KiB
Rust
Raw Normal View History

2022-03-10 22:27:19 +08:00
use crate::disk::RevisionRecord;
use crate::REVISION_WRITE_INTERVAL_IN_MILLIS;
2021-12-13 22:46:35 +08:00
use dashmap::DashMap;
2021-12-18 18:35:45 +08:00
use flowy_error::{FlowyError, FlowyResult};
2022-03-19 16:52:28 +08:00
use flowy_sync::entities::revision::RevisionRange;
2022-01-02 10:34:42 +08:00
use std::{borrow::Cow, sync::Arc, time::Duration};
2021-12-18 18:35:45 +08:00
use tokio::{sync::RwLock, task::JoinHandle};
2021-12-13 22:46:35 +08:00
2021-12-18 18:35:45 +08:00
pub(crate) trait RevisionMemoryCacheDelegate: Send + Sync {
2022-01-01 16:16:06 +08:00
fn checkpoint_tick(&self, records: Vec<RevisionRecord>) -> FlowyResult<()>;
2022-01-14 15:23:21 +08:00
fn receive_ack(&self, object_id: &str, rev_id: i64);
2021-12-18 00:23:26 +08:00
}
2022-01-14 15:23:21 +08:00
pub(crate) struct RevisionMemoryCache {
object_id: String,
2021-12-13 22:46:35 +08:00
revs_map: Arc<DashMap<i64, RevisionRecord>>,
2021-12-18 18:35:45 +08:00
delegate: Arc<dyn RevisionMemoryCacheDelegate>,
pending_write_revs: Arc<RwLock<Vec<i64>>>,
defer_save: RwLock<Option<JoinHandle<()>>>,
2021-12-13 22:46:35 +08:00
}
2022-01-14 15:23:21 +08:00
impl RevisionMemoryCache {
pub(crate) fn new(object_id: &str, delegate: Arc<dyn RevisionMemoryCacheDelegate>) -> Self {
RevisionMemoryCache {
object_id: object_id.to_owned(),
2021-12-13 22:46:35 +08:00
revs_map: Arc::new(DashMap::new()),
2021-12-18 18:35:45 +08:00
delegate,
pending_write_revs: Arc::new(RwLock::new(vec![])),
defer_save: RwLock::new(None),
2021-12-13 22:46:35 +08:00
}
}
2022-01-23 12:14:00 +08:00
pub(crate) fn contains(&self, rev_id: &i64) -> bool {
self.revs_map.contains_key(rev_id)
}
2021-12-13 22:46:35 +08:00
2022-01-02 10:34:42 +08:00
pub(crate) async fn add<'a>(&'a self, record: Cow<'a, RevisionRecord>) {
let record = match record {
Cow::Borrowed(record) => record.clone(),
Cow::Owned(record) => record,
};
2022-01-07 17:37:11 +08:00
let rev_id = record.revision.rev_id;
2022-01-26 23:29:18 +08:00
self.revs_map.insert(rev_id, record);
2022-01-07 17:37:11 +08:00
let mut write_guard = self.pending_write_revs.write().await;
if !write_guard.contains(&rev_id) {
write_guard.push(rev_id);
drop(write_guard);
2022-01-26 23:29:18 +08:00
self.make_checkpoint().await;
2021-12-13 22:46:35 +08:00
}
2021-12-18 00:23:26 +08:00
}
2021-12-13 22:46:35 +08:00
2022-01-01 16:16:06 +08:00
pub(crate) async fn ack(&self, rev_id: &i64) {
2021-12-18 18:35:45 +08:00
match self.revs_map.get_mut(rev_id) {
2022-01-23 12:14:00 +08:00
None => {}
2021-12-18 18:35:45 +08:00
Some(mut record) => record.ack(),
2021-12-13 22:46:35 +08:00
}
2021-12-18 18:35:45 +08:00
2022-01-26 23:29:18 +08:00
if self.pending_write_revs.read().await.contains(rev_id) {
self.make_checkpoint().await;
} else {
2021-12-18 18:35:45 +08:00
// The revision must be saved on disk if the pending_write_revs
// doesn't contains the rev_id.
2022-01-14 15:23:21 +08:00
self.delegate.receive_ack(&self.object_id, *rev_id);
2021-12-18 18:35:45 +08:00
}
}
2022-01-01 16:16:06 +08:00
pub(crate) async fn get(&self, rev_id: &i64) -> Option<RevisionRecord> {
2022-01-23 12:14:00 +08:00
self.revs_map.get(rev_id).map(|r| r.value().clone())
2021-12-13 22:46:35 +08:00
}
2022-01-26 23:29:18 +08:00
pub(crate) fn remove(&self, rev_id: &i64) {
let _ = self.revs_map.remove(rev_id);
}
pub(crate) fn remove_with_range(&self, range: &RevisionRange) {
for rev_id in range.iter() {
self.remove(&rev_id);
}
}
2022-01-01 16:16:06 +08:00
pub(crate) async fn get_with_range(&self, range: &RevisionRange) -> Result<Vec<RevisionRecord>, FlowyError> {
2021-12-13 22:46:35 +08:00
let revs = range
.iter()
2021-12-18 00:23:26 +08:00
.flat_map(|rev_id| self.revs_map.get(&rev_id).map(|record| record.clone()))
.collect::<Vec<RevisionRecord>>();
2021-12-18 18:35:45 +08:00
Ok(revs)
}
2021-12-13 22:46:35 +08:00
2022-01-26 23:29:18 +08:00
pub(crate) async fn reset_with_revisions(&self, revision_records: Vec<RevisionRecord>) {
2022-01-02 10:34:42 +08:00
self.revs_map.clear();
if let Some(handler) = self.defer_save.write().await.take() {
handler.abort();
}
2022-01-07 17:37:11 +08:00
let mut write_guard = self.pending_write_revs.write().await;
write_guard.clear();
for record in revision_records {
write_guard.push(record.revision.rev_id);
2022-01-26 23:29:18 +08:00
self.revs_map.insert(record.revision.rev_id, record);
2022-01-07 17:37:11 +08:00
}
drop(write_guard);
self.make_checkpoint().await;
2022-01-02 10:34:42 +08:00
}
2021-12-18 18:35:45 +08:00
async fn make_checkpoint(&self) {
// https://github.com/async-graphql/async-graphql/blob/ed8449beec3d9c54b94da39bab33cec809903953/src/dataloader/mod.rs#L362
if let Some(handler) = self.defer_save.write().await.take() {
handler.abort();
}
if self.pending_write_revs.read().await.is_empty() {
return;
2021-12-13 22:46:35 +08:00
}
2021-12-18 18:35:45 +08:00
let rev_map = self.revs_map.clone();
let pending_write_revs = self.pending_write_revs.clone();
let delegate = self.delegate.clone();
*self.defer_save.write().await = Some(tokio::spawn(async move {
2022-01-22 18:48:43 +08:00
tokio::time::sleep(Duration::from_millis(REVISION_WRITE_INTERVAL_IN_MILLIS)).await;
2021-12-18 18:35:45 +08:00
let mut revs_write_guard = pending_write_revs.write().await;
// It may cause performance issues because we hold the write lock of the
// rev_order and the lock will be released after the checkpoint has been written
// to the disk.
//
// Use saturating_sub and split_off ?
// https://stackoverflow.com/questions/28952411/what-is-the-idiomatic-way-to-pop-the-last-n-elements-in-a-mutable-vec
let mut save_records: Vec<RevisionRecord> = vec![];
revs_write_guard.iter().for_each(|rev_id| match rev_map.get(rev_id) {
2022-01-23 12:14:00 +08:00
None => {}
2021-12-18 18:35:45 +08:00
Some(value) => {
save_records.push(value.value().clone());
2022-01-23 12:14:00 +08:00
}
2021-12-18 18:35:45 +08:00
});
2022-01-01 16:16:06 +08:00
if delegate.checkpoint_tick(save_records).is_ok() {
2021-12-18 18:35:45 +08:00
revs_write_guard.clear();
drop(revs_write_guard);
}
}));
2021-12-13 22:46:35 +08:00
}
}