convert rocksdb errors locally; remove from Error.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
c1712d4d8b
commit
01f4455ceb
6 changed files with 65 additions and 58 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -675,7 +675,6 @@ dependencies = [
|
|||
"reqwest",
|
||||
"ring",
|
||||
"ruma",
|
||||
"rust-rocksdb-uwu",
|
||||
"sanitize-filename",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
|
|
@ -24,14 +24,10 @@ release_max_log_level = [
|
|||
"log/max_level_trace",
|
||||
"log/release_max_level_info",
|
||||
]
|
||||
rocksdb = [
|
||||
"dep:rust-rocksdb",
|
||||
]
|
||||
jemalloc = [
|
||||
"dep:tikv-jemalloc-sys",
|
||||
"dep:tikv-jemalloc-ctl",
|
||||
"dep:tikv-jemallocator",
|
||||
"rust-rocksdb/jemalloc",
|
||||
]
|
||||
jemalloc_prof = [
|
||||
"tikv-jemalloc-sys/profiling",
|
||||
|
@ -39,18 +35,13 @@ jemalloc_prof = [
|
|||
hardened_malloc = [
|
||||
"dep:hardened_malloc-rs"
|
||||
]
|
||||
io_uring = [
|
||||
"rust-rocksdb/io-uring",
|
||||
]
|
||||
zstd_compression = [
|
||||
"rust-rocksdb/zstd",
|
||||
]
|
||||
gzip_compression = [
|
||||
"reqwest/gzip",
|
||||
]
|
||||
brotli_compression = [
|
||||
"reqwest/brotli",
|
||||
]
|
||||
zstd_compression =[]
|
||||
perf_measurements = []
|
||||
sentry_telemetry = []
|
||||
|
||||
|
@ -72,8 +63,13 @@ regex.workspace = true
|
|||
reqwest.workspace = true
|
||||
ring.workspace = true
|
||||
ruma.workspace = true
|
||||
<<<<<<< HEAD
|
||||
rust-rocksdb.optional = true
|
||||
rust-rocksdb.workspace = true
|
||||
=======
|
||||
rusqlite.optional = true
|
||||
rusqlite.workspace = true
|
||||
>>>>>>> 6963b38f (convert rocksdb errors locally; remove from Error.)
|
||||
sanitize-filename.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde_regex.workspace = true
|
||||
|
|
|
@ -25,12 +25,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
|
||||
#[derive(Error)]
|
||||
pub enum Error {
|
||||
#[cfg(feature = "rocksdb")]
|
||||
#[error("There was a problem with the connection to the rocksdb database: {source}")]
|
||||
RocksDb {
|
||||
#[from]
|
||||
source: rust_rocksdb::Error,
|
||||
},
|
||||
#[error("{0}")]
|
||||
Database(String),
|
||||
#[error("Could not generate an image: {source}")]
|
||||
Image {
|
||||
#[from]
|
||||
|
@ -114,8 +110,7 @@ impl Error {
|
|||
let db_error = String::from("Database or I/O error occurred.");
|
||||
|
||||
match self {
|
||||
#[cfg(feature = "rocksdb")]
|
||||
Self::RocksDb {
|
||||
Self::Database {
|
||||
..
|
||||
} => db_error,
|
||||
Self::Io {
|
||||
|
|
|
@ -2,7 +2,10 @@ use std::{future::Future, pin::Pin, sync::Arc};
|
|||
|
||||
use conduit::{utils, Result};
|
||||
|
||||
use super::{rust_rocksdb::WriteBatchWithTransaction, watchers::Watchers, Engine, KeyValueDatabaseEngine, KvTree};
|
||||
use super::{
|
||||
or_else, result, rust_rocksdb::WriteBatchWithTransaction, watchers::Watchers, Engine, KeyValueDatabaseEngine,
|
||||
KvTree,
|
||||
};
|
||||
|
||||
pub(crate) struct RocksDbEngineTree<'a> {
|
||||
pub(crate) db: Arc<Engine>,
|
||||
|
@ -19,7 +22,7 @@ impl KvTree for RocksDbEngineTree<'_> {
|
|||
let mut readoptions = rust_rocksdb::ReadOptions::default();
|
||||
readoptions.set_total_order_seek(true);
|
||||
|
||||
Ok(self.db.rocks.get_cf_opt(&self.cf(), key, &readoptions)?)
|
||||
result(self.db.rocks.get_cf_opt(&self.cf(), key, &readoptions))
|
||||
}
|
||||
|
||||
fn multi_get(&self, keys: &[&[u8]]) -> Result<Vec<Option<Vec<u8>>>> {
|
||||
|
@ -36,9 +39,10 @@ impl KvTree for RocksDbEngineTree<'_> {
|
|||
.rocks
|
||||
.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, &readoptions)
|
||||
{
|
||||
match res? {
|
||||
Some(res) => ret.push(Some((*res).to_vec())),
|
||||
None => ret.push(None),
|
||||
match res {
|
||||
Ok(Some(res)) => ret.push(Some((*res).to_vec())),
|
||||
Ok(None) => ret.push(None),
|
||||
Err(e) => return or_else(e),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -50,7 +54,8 @@ impl KvTree for RocksDbEngineTree<'_> {
|
|||
|
||||
self.db
|
||||
.rocks
|
||||
.put_cf_opt(&self.cf(), key, value, &writeoptions)?;
|
||||
.put_cf_opt(&self.cf(), key, value, &writeoptions)
|
||||
.or_else(or_else)?;
|
||||
|
||||
if !self.db.corked() {
|
||||
self.db.flush()?;
|
||||
|
@ -70,25 +75,25 @@ impl KvTree for RocksDbEngineTree<'_> {
|
|||
batch.put_cf(&self.cf(), key, value);
|
||||
}
|
||||
|
||||
let result = self.db.rocks.write_opt(batch, &writeoptions);
|
||||
let res = self.db.rocks.write_opt(batch, &writeoptions);
|
||||
|
||||
if !self.db.corked() {
|
||||
self.db.flush()?;
|
||||
}
|
||||
|
||||
Ok(result?)
|
||||
result(res)
|
||||
}
|
||||
|
||||
fn remove(&self, key: &[u8]) -> Result<()> {
|
||||
let writeoptions = rust_rocksdb::WriteOptions::default();
|
||||
|
||||
let result = self.db.rocks.delete_cf_opt(&self.cf(), key, &writeoptions);
|
||||
let res = self.db.rocks.delete_cf_opt(&self.cf(), key, &writeoptions);
|
||||
|
||||
if !self.db.corked() {
|
||||
self.db.flush()?;
|
||||
}
|
||||
|
||||
Ok(result?)
|
||||
result(res)
|
||||
}
|
||||
|
||||
fn remove_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
|
||||
|
@ -100,13 +105,13 @@ impl KvTree for RocksDbEngineTree<'_> {
|
|||
batch.delete_cf(&self.cf(), key);
|
||||
}
|
||||
|
||||
let result = self.db.rocks.write_opt(batch, &writeoptions);
|
||||
let res = self.db.rocks.write_opt(batch, &writeoptions);
|
||||
|
||||
if !self.db.corked() {
|
||||
self.db.flush()?;
|
||||
}
|
||||
|
||||
Ok(result?)
|
||||
result(res)
|
||||
}
|
||||
|
||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||
|
@ -151,11 +156,16 @@ impl KvTree for RocksDbEngineTree<'_> {
|
|||
readoptions.set_total_order_seek(true);
|
||||
let writeoptions = rust_rocksdb::WriteOptions::default();
|
||||
|
||||
let old = self.db.rocks.get_cf_opt(&self.cf(), key, &readoptions)?;
|
||||
let old = self
|
||||
.db
|
||||
.rocks
|
||||
.get_cf_opt(&self.cf(), key, &readoptions)
|
||||
.or_else(or_else)?;
|
||||
let new = utils::increment(old.as_deref());
|
||||
self.db
|
||||
.rocks
|
||||
.put_cf_opt(&self.cf(), key, new, &writeoptions)?;
|
||||
.put_cf_opt(&self.cf(), key, new, &writeoptions)
|
||||
.or_else(or_else)?;
|
||||
|
||||
if !self.db.corked() {
|
||||
self.db.flush()?;
|
||||
|
@ -172,12 +182,19 @@ impl KvTree for RocksDbEngineTree<'_> {
|
|||
let mut batch = WriteBatchWithTransaction::<false>::default();
|
||||
|
||||
for key in iter {
|
||||
let old = self.db.rocks.get_cf_opt(&self.cf(), &key, &readoptions)?;
|
||||
let old = self
|
||||
.db
|
||||
.rocks
|
||||
.get_cf_opt(&self.cf(), &key, &readoptions)
|
||||
.or_else(or_else)?;
|
||||
let new = utils::increment(old.as_deref());
|
||||
batch.put_cf(&self.cf(), key, new);
|
||||
}
|
||||
|
||||
self.db.rocks.write_opt(batch, &writeoptions)?;
|
||||
self.db
|
||||
.rocks
|
||||
.write_opt(batch, &writeoptions)
|
||||
.or_else(or_else)?;
|
||||
|
||||
if !self.db.corked() {
|
||||
self.db.flush()?;
|
||||
|
|
|
@ -51,7 +51,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
|
|||
let mut col_cache = HashMap::new();
|
||||
col_cache.insert("primary".to_owned(), Cache::new_lru_cache(col_cache_capacity_bytes));
|
||||
|
||||
let mut db_env = Env::new()?;
|
||||
let mut db_env = Env::new().or_else(or_else)?;
|
||||
let row_cache = Cache::new_lru_cache(row_cache_capacity_bytes);
|
||||
let db_opts = db_options(config, &mut db_env, &row_cache, col_cache.get("primary").expect("cache"));
|
||||
|
||||
|
@ -73,12 +73,13 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
|
|||
.collect::<Vec<_>>();
|
||||
|
||||
debug!("Opening database...");
|
||||
let db = if config.rocksdb_read_only {
|
||||
Db::<MultiThreaded>::open_cf_for_read_only(&db_opts, &config.database_path, cfs.clone(), false)?
|
||||
let res = if config.rocksdb_read_only {
|
||||
Db::<MultiThreaded>::open_cf_for_read_only(&db_opts, &config.database_path, cfs.clone(), false)
|
||||
} else {
|
||||
Db::<MultiThreaded>::open_cf_descriptors(&db_opts, &config.database_path, cfds)?
|
||||
Db::<MultiThreaded>::open_cf_descriptors(&db_opts, &config.database_path, cfds)
|
||||
};
|
||||
|
||||
let db = res.or_else(or_else)?;
|
||||
info!(
|
||||
"Opened database at sequence number {} in {:?}",
|
||||
db.latest_sequence_number(),
|
||||
|
@ -115,17 +116,9 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
|
|||
}))
|
||||
}
|
||||
|
||||
fn flush(&self) -> Result<()> {
|
||||
DBCommon::flush_wal(&self.rocks, false)?;
|
||||
fn flush(&self) -> Result<()> { result(DBCommon::flush_wal(&self.rocks, false)) }
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sync(&self) -> Result<()> {
|
||||
DBCommon::flush_wal(&self.rocks, true)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
fn sync(&self) -> Result<()> { result(DBCommon::flush_wal(&self.rocks, true)) }
|
||||
|
||||
fn corked(&self) -> bool { self.corks.load(std::sync::atomic::Ordering::Relaxed) > 0 }
|
||||
|
||||
|
@ -146,7 +139,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
|
|||
#[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)]
|
||||
fn memory_usage(&self) -> Result<String> {
|
||||
let mut res = String::new();
|
||||
let stats = get_memory_usage_stats(Some(&[&self.rocks]), Some(&[&self.row_cache]))?;
|
||||
let stats = get_memory_usage_stats(Some(&[&self.rocks]), Some(&[&self.row_cache])).or_else(or_else)?;
|
||||
writeln!(
|
||||
res,
|
||||
"Memory buffers: {:.2} MiB\nPending write: {:.2} MiB\nTable readers: {:.2} MiB\nRow cache: {:.2} MiB",
|
||||
|
@ -168,10 +161,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
|
|||
fn cleanup(&self) -> Result<()> {
|
||||
debug!("Running flush_opt");
|
||||
let flushoptions = rust_rocksdb::FlushOptions::default();
|
||||
|
||||
DBCommon::flush_opt(&self.rocks, &flushoptions)?;
|
||||
|
||||
Ok(())
|
||||
result(DBCommon::flush_opt(&self.rocks, &flushoptions))
|
||||
}
|
||||
|
||||
fn backup(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
@ -214,8 +204,8 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
|
|||
}
|
||||
|
||||
let mut res = String::new();
|
||||
let options = BackupEngineOptions::new(path.unwrap())?;
|
||||
let engine = BackupEngine::open(&options, &self.env)?;
|
||||
let options = BackupEngineOptions::new(path.expect("valid path")).or_else(or_else)?;
|
||||
let engine = BackupEngine::open(&options, &self.env).or_else(or_else)?;
|
||||
for info in engine.get_backup_info() {
|
||||
writeln!(
|
||||
res,
|
||||
|
@ -275,3 +265,15 @@ impl Drop for Engine {
|
|||
self.env.join_all_threads();
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn result<T>(r: std::result::Result<T, rust_rocksdb::Error>) -> Result<T, conduit::Error> {
|
||||
r.map_or_else(or_else, and_then)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn and_then<T>(t: T) -> Result<T, conduit::Error> { Ok(t) }
|
||||
|
||||
fn or_else<T>(e: rust_rocksdb::Error) -> Result<T, conduit::Error> { Err(map_err(e)) }
|
||||
|
||||
fn map_err(e: rust_rocksdb::Error) -> conduit::Error { conduit::Error::Database(e.into_string()) }
|
||||
|
|
|
@ -83,7 +83,6 @@ hardened_malloc = [
|
|||
]
|
||||
io_uring = [
|
||||
"conduit-admin/io_uring",
|
||||
"conduit-core/io_uring",
|
||||
"conduit-database/io_uring",
|
||||
]
|
||||
jemalloc = [
|
||||
|
@ -119,7 +118,6 @@ release_max_log_level = [
|
|||
]
|
||||
rocksdb = [
|
||||
"conduit-admin/rocksdb",
|
||||
"conduit-core/rocksdb",
|
||||
"conduit-database/rocksdb",
|
||||
]
|
||||
sentry_telemetry = [
|
||||
|
|
Loading…
Add table
Reference in a new issue