add write buffer corking using rocksdb manual_wal_flush.

Signed-off-by: Jason Volk <jason@zemos.net>
Jason Volk 2024-03-19 09:56:36 -07:00 committed by June
parent 3969b667ba
commit 3f60365cc6
4 changed files with 98 additions and 6 deletions
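Not part of the diff: a rough usage sketch of the new cork guard, assuming a `globals` handle exposing the `service::globals::Data` trait below and a `tree` implementing `KvTree` (both handles, the `events` collection, and the loop are illustrative). While the guard is alive, the per-write flush in the RocksDB tree is skipped; dropping it uncorks and then flushes the write buffer/WAL once for the whole burst.

	// Sketch only: `globals`, `tree`, and `events` are assumed handles/data, not defined by this commit.
	let _cork = globals.cork_and_flush()?;   // bump the cork counter; flush when the guard drops
	for (key, value) in &events {
		tree.insert(key, value)?;            // corked() is true, so the per-write flush is skipped
	}
	drop(_cork);                             // uncork, then a single flush covering the batch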


@@ -20,6 +20,9 @@ pub(crate) trait KeyValueDatabaseEngine: Send + Sync {
 	fn flush(&self) -> Result<()>;
 	#[allow(dead_code)]
 	fn sync(&self) -> Result<()> { Ok(()) }
+	fn cork(&self) -> Result<()> { Ok(()) }
+	fn uncork(&self) -> Result<()> { Ok(()) }
+	fn corked(&self) -> bool { false }
 	fn cleanup(&self) -> Result<()> { Ok(()) }
 	fn memory_usage(&self) -> Result<String> {
 		Ok("Current database engine does not support memory usage reporting.".to_owned())
@@ -89,3 +92,32 @@ pub(crate) trait KvTree: Send + Sync {
 		Ok(())
 	}
 }
+
+pub struct Cork {
+	db: Arc<dyn KeyValueDatabaseEngine>,
+	flush: bool,
+	sync: bool,
+}
+
+impl Cork {
+	pub(crate) fn new(db: &Arc<dyn KeyValueDatabaseEngine>, flush: bool, sync: bool) -> Self {
+		db.cork().unwrap();
+		Cork {
+			db: db.clone(),
+			flush,
+			sync,
+		}
+	}
+}
+
+impl Drop for Cork {
+	fn drop(&mut self) {
+		self.db.uncork().ok();
+		if self.flush {
+			self.db.flush().ok();
+		}
+		if self.sync {
+			self.db.sync().ok();
+		}
+	}
+}


@@ -1,7 +1,7 @@
 use std::{
 	future::Future,
 	pin::Pin,
-	sync::{Arc, RwLock},
+	sync::{atomic::AtomicU32, Arc, RwLock},
 };

 use chrono::{DateTime, Utc};
@@ -23,6 +23,7 @@ pub(crate) struct Engine {
 	opts: rust_rocksdb::Options,
 	env: rust_rocksdb::Env,
 	config: Config,
+	corks: AtomicU32,
 }

 struct RocksDbEngineTree<'a> {
@@ -62,6 +63,7 @@ fn db_options(
 	db_opts.set_max_subcompactions(threads.try_into().unwrap());

 	// IO
+	db_opts.set_manual_wal_flush(true);
 	db_opts.set_use_direct_reads(true);
 	db_opts.set_use_direct_io_for_flush_and_compaction(true);
 	if config.rocksdb_optimize_for_spinning_disks {
@@ -163,6 +165,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
 			opts: db_opts,
 			env: db_env,
 			config: config.clone(),
+			corks: AtomicU32::new(0),
 		}))
 	}
@@ -193,6 +196,20 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
 		Ok(())
 	}

+	fn corked(&self) -> bool { self.corks.load(std::sync::atomic::Ordering::Relaxed) > 0 }
+
+	fn cork(&self) -> Result<()> {
+		self.corks.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+
+		Ok(())
+	}
+
+	fn uncork(&self) -> Result<()> {
+		self.corks.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+
+		Ok(())
+	}
+
 	fn memory_usage(&self) -> Result<String> {
 		let stats = rust_rocksdb::perf::get_memory_usage_stats(
 			Some(&[&self.rocks]),
@@ -320,6 +337,10 @@ impl KvTree for RocksDbEngineTree<'_> {
 		drop(lock);

+		if !self.db.corked() {
+			self.db.flush()?;
+		}
+
 		self.watchers.wake(key);

 		Ok(())
@@ -334,13 +355,25 @@ impl KvTree for RocksDbEngineTree<'_> {
 			batch.put_cf(&self.cf(), key, value);
 		}

-		Ok(self.db.rocks.write_opt(batch, &writeoptions)?)
+		let result = self.db.rocks.write_opt(batch, &writeoptions);
+
+		if !self.db.corked() {
+			self.db.flush()?;
+		}
+
+		Ok(result?)
 	}

 	fn remove(&self, key: &[u8]) -> Result<()> {
 		let writeoptions = rust_rocksdb::WriteOptions::default();

-		Ok(self.db.rocks.delete_cf_opt(&self.cf(), key, &writeoptions)?)
+		let result = self.db.rocks.delete_cf_opt(&self.cf(), key, &writeoptions);
+
+		if !self.db.corked() {
+			self.db.flush()?;
+		}
+
+		Ok(result?)
 	}

 	fn remove_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
@@ -352,7 +385,13 @@ impl KvTree for RocksDbEngineTree<'_> {
 			batch.delete_cf(&self.cf(), key);
 		}

-		Ok(self.db.rocks.write_opt(batch, &writeoptions)?)
+		let result = self.db.rocks.write_opt(batch, &writeoptions);
+
+		if !self.db.corked() {
+			self.db.flush()?;
+		}
+
+		Ok(result?)
 	}

 	fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
@@ -404,6 +443,11 @@ impl KvTree for RocksDbEngineTree<'_> {
 		self.db.rocks.put_cf_opt(&self.cf(), key, &new, &writeoptions)?;

 		drop(lock);

+		if !self.db.corked() {
+			self.db.flush()?;
+		}
+
 		Ok(new)
 	}
@@ -426,6 +470,10 @@ impl KvTree for RocksDbEngineTree<'_> {
 		drop(lock);

+		if !self.db.corked() {
+			self.db.flush()?;
+		}
+
 		Ok(())
 	}


@@ -9,7 +9,10 @@ use ruma::{
 	DeviceId, MilliSecondsSinceUnixEpoch, OwnedServerSigningKeyId, ServerName, UserId,
 };

-use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
+use crate::{
+	database::{abstraction::Cork, KeyValueDatabase},
+	service, services, utils, Error, Result,
+};

 const COUNTER: &[u8] = b"c";
 const LAST_CHECK_FOR_UPDATES_COUNT: &[u8] = b"u";
@@ -119,6 +122,12 @@ impl service::globals::Data for KeyValueDatabase {
 	fn flush(&self) -> Result<()> { self.db.flush() }

+	fn cork(&self) -> Result<Cork> { Ok(Cork::new(&self.db, false, false)) }
+
+	fn cork_and_flush(&self) -> Result<Cork> { Ok(Cork::new(&self.db, true, false)) }
+
+	fn cork_and_sync(&self) -> Result<Cork> { Ok(Cork::new(&self.db, true, true)) }
+
 	fn memory_usage(&self) -> String {
 		let pdu_cache = self.pdu_cache.lock().unwrap().len();
 		let shorteventid_cache = self.shorteventid_cache.lock().unwrap().len();


@@ -7,7 +7,7 @@ use ruma::{
 	DeviceId, OwnedServerSigningKeyId, ServerName, UserId,
 };

-use crate::Result;
+use crate::{database::abstraction::Cork, Result};

 #[async_trait]
 pub trait Data: Send + Sync {
@@ -19,6 +19,9 @@ pub trait Data: Send + Sync {
 	async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result<()>;
 	fn cleanup(&self) -> Result<()>;
 	fn flush(&self) -> Result<()>;
+	fn cork(&self) -> Result<Cork>;
+	fn cork_and_flush(&self) -> Result<Cork>;
+	fn cork_and_sync(&self) -> Result<Cork>;
 	fn memory_usage(&self) -> String;
 	fn clear_caches(&self, amount: u32);
 	fn load_keypair(&self) -> Result<Ed25519KeyPair>;