diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 0fabb3ce..45e62b6b 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -20,6 +20,9 @@ pub(crate) trait KeyValueDatabaseEngine: Send + Sync { fn flush(&self) -> Result<()>; #[allow(dead_code)] fn sync(&self) -> Result<()> { Ok(()) } + fn cork(&self) -> Result<()> { Ok(()) } + fn uncork(&self) -> Result<()> { Ok(()) } + fn corked(&self) -> bool { false } fn cleanup(&self) -> Result<()> { Ok(()) } fn memory_usage(&self) -> Result { Ok("Current database engine does not support memory usage reporting.".to_owned()) @@ -89,3 +92,32 @@ pub(crate) trait KvTree: Send + Sync { Ok(()) } } + +pub struct Cork { + db: Arc, + flush: bool, + sync: bool, +} + +impl Cork { + pub(crate) fn new(db: &Arc, flush: bool, sync: bool) -> Self { + db.cork().unwrap(); + Cork { + db: db.clone(), + flush, + sync, + } + } +} + +impl Drop for Cork { + fn drop(&mut self) { + self.db.uncork().ok(); + if self.flush { + self.db.flush().ok(); + } + if self.sync { + self.db.sync().ok(); + } + } +} diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs index ac97a5ad..90e5183d 100644 --- a/src/database/abstraction/rocksdb.rs +++ b/src/database/abstraction/rocksdb.rs @@ -1,7 +1,7 @@ use std::{ future::Future, pin::Pin, - sync::{Arc, RwLock}, + sync::{atomic::AtomicU32, Arc, RwLock}, }; use chrono::{DateTime, Utc}; @@ -23,6 +23,7 @@ pub(crate) struct Engine { opts: rust_rocksdb::Options, env: rust_rocksdb::Env, config: Config, + corks: AtomicU32, } struct RocksDbEngineTree<'a> { @@ -62,6 +63,7 @@ fn db_options( db_opts.set_max_subcompactions(threads.try_into().unwrap()); // IO + db_opts.set_manual_wal_flush(true); db_opts.set_use_direct_reads(true); db_opts.set_use_direct_io_for_flush_and_compaction(true); if config.rocksdb_optimize_for_spinning_disks { @@ -163,6 +165,7 @@ impl KeyValueDatabaseEngine for Arc { opts: db_opts, env: db_env, config: config.clone(), + corks: AtomicU32::new(0), })) } @@ -193,6 +196,20 @@ impl KeyValueDatabaseEngine for Arc { Ok(()) } + fn corked(&self) -> bool { self.corks.load(std::sync::atomic::Ordering::Relaxed) > 0 } + + fn cork(&self) -> Result<()> { + self.corks.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + Ok(()) + } + + fn uncork(&self) -> Result<()> { + self.corks.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + + Ok(()) + } + fn memory_usage(&self) -> Result { let stats = rust_rocksdb::perf::get_memory_usage_stats( Some(&[&self.rocks]), @@ -320,6 +337,10 @@ impl KvTree for RocksDbEngineTree<'_> { drop(lock); + if !self.db.corked() { + self.db.flush()?; + } + self.watchers.wake(key); Ok(()) @@ -334,13 +355,25 @@ impl KvTree for RocksDbEngineTree<'_> { batch.put_cf(&self.cf(), key, value); } - Ok(self.db.rocks.write_opt(batch, &writeoptions)?) + let result = self.db.rocks.write_opt(batch, &writeoptions); + + if !self.db.corked() { + self.db.flush()?; + } + + Ok(result?) } fn remove(&self, key: &[u8]) -> Result<()> { let writeoptions = rust_rocksdb::WriteOptions::default(); - Ok(self.db.rocks.delete_cf_opt(&self.cf(), key, &writeoptions)?) + let result = self.db.rocks.delete_cf_opt(&self.cf(), key, &writeoptions); + + if !self.db.corked() { + self.db.flush()?; + } + + Ok(result?) } fn remove_batch(&self, iter: &mut dyn Iterator>) -> Result<()> { @@ -352,7 +385,13 @@ impl KvTree for RocksDbEngineTree<'_> { batch.delete_cf(&self.cf(), key); } - Ok(self.db.rocks.write_opt(batch, &writeoptions)?) + let result = self.db.rocks.write_opt(batch, &writeoptions); + + if !self.db.corked() { + self.db.flush()?; + } + + Ok(result?) } fn iter<'a>(&'a self) -> Box, Vec)> + 'a> { @@ -404,6 +443,11 @@ impl KvTree for RocksDbEngineTree<'_> { self.db.rocks.put_cf_opt(&self.cf(), key, &new, &writeoptions)?; drop(lock); + + if !self.db.corked() { + self.db.flush()?; + } + Ok(new) } @@ -426,6 +470,10 @@ impl KvTree for RocksDbEngineTree<'_> { drop(lock); + if !self.db.corked() { + self.db.flush()?; + } + Ok(()) } diff --git a/src/database/key_value/globals.rs b/src/database/key_value/globals.rs index 55ebed4a..3b5a4730 100644 --- a/src/database/key_value/globals.rs +++ b/src/database/key_value/globals.rs @@ -9,7 +9,10 @@ use ruma::{ DeviceId, MilliSecondsSinceUnixEpoch, OwnedServerSigningKeyId, ServerName, UserId, }; -use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; +use crate::{ + database::{abstraction::Cork, KeyValueDatabase}, + service, services, utils, Error, Result, +}; const COUNTER: &[u8] = b"c"; const LAST_CHECK_FOR_UPDATES_COUNT: &[u8] = b"u"; @@ -119,6 +122,12 @@ impl service::globals::Data for KeyValueDatabase { fn flush(&self) -> Result<()> { self.db.flush() } + fn cork(&self) -> Result { Ok(Cork::new(&self.db, false, false)) } + + fn cork_and_flush(&self) -> Result { Ok(Cork::new(&self.db, true, false)) } + + fn cork_and_sync(&self) -> Result { Ok(Cork::new(&self.db, true, true)) } + fn memory_usage(&self) -> String { let pdu_cache = self.pdu_cache.lock().unwrap().len(); let shorteventid_cache = self.shorteventid_cache.lock().unwrap().len(); diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs index 8322c1aa..1615b5a0 100644 --- a/src/service/globals/data.rs +++ b/src/service/globals/data.rs @@ -7,7 +7,7 @@ use ruma::{ DeviceId, OwnedServerSigningKeyId, ServerName, UserId, }; -use crate::Result; +use crate::{database::abstraction::Cork, Result}; #[async_trait] pub trait Data: Send + Sync { @@ -19,6 +19,9 @@ pub trait Data: Send + Sync { async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result<()>; fn cleanup(&self) -> Result<()>; fn flush(&self) -> Result<()>; + fn cork(&self) -> Result; + fn cork_and_flush(&self) -> Result; + fn cork_and_sync(&self) -> Result; fn memory_usage(&self) -> String; fn clear_caches(&self, amount: u32); fn load_keypair(&self) -> Result;