propagate error from database options building

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-08-04 06:16:17 +00:00
parent 992f725362
commit f69f4c479c
2 changed files with 32 additions and 18 deletions

View file

@ -49,7 +49,7 @@ impl Engine {
let mut db_env = Env::new().or_else(or_else)?; let mut db_env = Env::new().or_else(or_else)?;
let row_cache = Cache::new_lru_cache(row_cache_capacity_bytes); let row_cache = Cache::new_lru_cache(row_cache_capacity_bytes);
let db_opts = db_options(config, &mut db_env, &row_cache, col_cache.get("primary").expect("cache")); let db_opts = db_options(config, &mut db_env, &row_cache, col_cache.get("primary").expect("cache"))?;
let load_time = std::time::Instant::now(); let load_time = std::time::Instant::now();
if config.rocksdb_repair { if config.rocksdb_repair {
@ -63,9 +63,15 @@ impl Engine {
.collect::<BTreeSet<_>>(); .collect::<BTreeSet<_>>();
debug!("Opening {} column family descriptors in database", cfs.len()); debug!("Opening {} column family descriptors in database", cfs.len());
let cfopts = cfs
.iter()
.map(|name| cf_options(config, name, db_opts.clone(), &mut col_cache))
.collect::<Result<Vec<_>>>()?;
let cfds = cfs let cfds = cfs
.iter() .iter()
.map(|name| ColumnFamilyDescriptor::new(name, cf_options(config, name, db_opts.clone(), &mut col_cache))) .zip(cfopts.into_iter())
.map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
debug!("Opening database..."); debug!("Opening database...");
@ -102,7 +108,7 @@ impl Engine {
debug!("Creating new column family in database: {name}"); debug!("Creating new column family in database: {name}");
let mut col_cache = self.col_cache.write().expect("locked"); let mut col_cache = self.col_cache.write().expect("locked");
let opts = cf_options(&self.server.config, name, self.opts.clone(), &mut col_cache); let opts = cf_options(&self.server.config, name, self.opts.clone(), &mut col_cache)?;
if let Err(e) = self.db.create_cf(name, &opts) { if let Err(e) = self.db.create_cf(name, &opts) {
error!(?name, "Failed to create new column family: {e}"); error!(?name, "Failed to create new column family: {e}");
return or_else(e); return or_else(e);

View file

@ -1,6 +1,6 @@
use std::{cmp, collections::HashMap}; use std::{cmp, collections::HashMap, convert::TryFrom};
use conduit::{utils, Config}; use conduit::{utils, Config, Result};
use rocksdb::{ use rocksdb::{
statistics::StatsLevel, BlockBasedOptions, Cache, DBCompactionStyle, DBCompressionType, DBRecoveryMode, Env, statistics::StatsLevel, BlockBasedOptions, Cache, DBCompactionStyle, DBCompressionType, DBRecoveryMode, Env,
LogLevel, Options, UniversalCompactOptions, UniversalCompactionStopStyle, LogLevel, Options, UniversalCompactOptions, UniversalCompactionStopStyle,
@ -11,8 +11,7 @@ use rocksdb::{
/// resulting value. Note that we require special per-column options on some /// resulting value. Note that we require special per-column options on some
/// columns, therefor columns should only be opened after passing this result /// columns, therefor columns should only be opened after passing this result
/// through cf_options(). /// through cf_options().
pub(crate) fn db_options(config: &Config, env: &mut Env, row_cache: &Cache, col_cache: &Cache) -> Options { pub(crate) fn db_options(config: &Config, env: &mut Env, row_cache: &Cache, col_cache: &Cache) -> Result<Options> {
const MIN_PARALLELISM: usize = 2;
const DEFAULT_STATS_LEVEL: StatsLevel = if cfg!(debug_assertions) { const DEFAULT_STATS_LEVEL: StatsLevel = if cfg!(debug_assertions) {
StatsLevel::ExceptDetailedTimers StatsLevel::ExceptDetailedTimers
} else { } else {
@ -25,14 +24,9 @@ pub(crate) fn db_options(config: &Config, env: &mut Env, row_cache: &Cache, col_
set_logging_defaults(&mut opts, config); set_logging_defaults(&mut opts, config);
// Processing // Processing
let threads = if config.rocksdb_parallelism_threads == 0 { opts.set_enable_pipelined_write(true);
cmp::max(MIN_PARALLELISM, utils::available_parallelism()) opts.set_max_background_jobs(num_threads::<i32>(config)?);
} else { opts.set_max_subcompactions(num_threads::<u32>(config)?);
cmp::max(MIN_PARALLELISM, config.rocksdb_parallelism_threads)
};
opts.set_max_background_jobs(threads.try_into().unwrap());
opts.set_max_subcompactions(threads.try_into().unwrap());
opts.set_max_file_opening_threads(0); opts.set_max_file_opening_threads(0);
if config.rocksdb_compaction_prio_idle { if config.rocksdb_compaction_prio_idle {
env.lower_thread_pool_cpu_priority(); env.lower_thread_pool_cpu_priority();
@ -100,13 +94,15 @@ pub(crate) fn db_options(config: &Config, env: &mut Env, row_cache: &Cache, col_
}); });
opts.set_env(env); opts.set_env(env);
opts Ok(opts)
} }
/// Adjust options for the specific column by name. Provide the result of /// Adjust options for the specific column by name. Provide the result of
/// db_options() as the argument to this function and use the return value in /// db_options() as the argument to this function and use the return value in
/// the arguments to open the specific column. /// the arguments to open the specific column.
pub(crate) fn cf_options(cfg: &Config, name: &str, mut opts: Options, cache: &mut HashMap<String, Cache>) -> Options { pub(crate) fn cf_options(
cfg: &Config, name: &str, mut opts: Options, cache: &mut HashMap<String, Cache>,
) -> Result<Options> {
// Columns with non-default compaction options // Columns with non-default compaction options
match name { match name {
"backupid_algorithm" "backupid_algorithm"
@ -169,7 +165,7 @@ pub(crate) fn cf_options(cfg: &Config, name: &str, mut opts: Options, cache: &mu
&_ => {}, &_ => {},
} }
opts Ok(opts)
} }
fn set_logging_defaults(opts: &mut Options, config: &Config) { fn set_logging_defaults(opts: &mut Options, config: &Config) {
@ -346,3 +342,15 @@ fn table_options(_config: &Config) -> BlockBasedOptions {
opts opts
} }
fn num_threads<T: TryFrom<usize>>(config: &Config) -> Result<T> {
const MIN_PARALLELISM: usize = 2;
let requested = if config.rocksdb_parallelism_threads != 0 {
config.rocksdb_parallelism_threads
} else {
utils::available_parallelism()
};
utils::math::try_into::<T, usize>(cmp::max(MIN_PARALLELISM, requested))
}