From f69f4c479ced451f0547dabb0615287e0c9c4328 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 4 Aug 2024 06:16:17 +0000 Subject: [PATCH] propagate error from database options building Signed-off-by: Jason Volk --- src/database/engine.rs | 12 +++++++++--- src/database/opts.rs | 38 +++++++++++++++++++++++--------------- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/src/database/engine.rs b/src/database/engine.rs index 3975d3d9..6c5b5bd2 100644 --- a/src/database/engine.rs +++ b/src/database/engine.rs @@ -49,7 +49,7 @@ impl Engine { let mut db_env = Env::new().or_else(or_else)?; let row_cache = Cache::new_lru_cache(row_cache_capacity_bytes); - let db_opts = db_options(config, &mut db_env, &row_cache, col_cache.get("primary").expect("cache")); + let db_opts = db_options(config, &mut db_env, &row_cache, col_cache.get("primary").expect("cache"))?; let load_time = std::time::Instant::now(); if config.rocksdb_repair { @@ -63,9 +63,15 @@ impl Engine { .collect::>(); debug!("Opening {} column family descriptors in database", cfs.len()); + let cfopts = cfs + .iter() + .map(|name| cf_options(config, name, db_opts.clone(), &mut col_cache)) + .collect::>>()?; + let cfds = cfs .iter() - .map(|name| ColumnFamilyDescriptor::new(name, cf_options(config, name, db_opts.clone(), &mut col_cache))) + .zip(cfopts.into_iter()) + .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)) .collect::>(); debug!("Opening database..."); @@ -102,7 +108,7 @@ impl Engine { debug!("Creating new column family in database: {name}"); let mut col_cache = self.col_cache.write().expect("locked"); - let opts = cf_options(&self.server.config, name, self.opts.clone(), &mut col_cache); + let opts = cf_options(&self.server.config, name, self.opts.clone(), &mut col_cache)?; if let Err(e) = self.db.create_cf(name, &opts) { error!(?name, "Failed to create new column family: {e}"); return or_else(e); diff --git a/src/database/opts.rs b/src/database/opts.rs index 8791e778..890e0188 100644 --- a/src/database/opts.rs +++ b/src/database/opts.rs @@ -1,6 +1,6 @@ -use std::{cmp, collections::HashMap}; +use std::{cmp, collections::HashMap, convert::TryFrom}; -use conduit::{utils, Config}; +use conduit::{utils, Config, Result}; use rocksdb::{ statistics::StatsLevel, BlockBasedOptions, Cache, DBCompactionStyle, DBCompressionType, DBRecoveryMode, Env, LogLevel, Options, UniversalCompactOptions, UniversalCompactionStopStyle, @@ -11,8 +11,7 @@ use rocksdb::{ /// resulting value. Note that we require special per-column options on some /// columns, therefor columns should only be opened after passing this result /// through cf_options(). -pub(crate) fn db_options(config: &Config, env: &mut Env, row_cache: &Cache, col_cache: &Cache) -> Options { - const MIN_PARALLELISM: usize = 2; +pub(crate) fn db_options(config: &Config, env: &mut Env, row_cache: &Cache, col_cache: &Cache) -> Result { const DEFAULT_STATS_LEVEL: StatsLevel = if cfg!(debug_assertions) { StatsLevel::ExceptDetailedTimers } else { @@ -25,14 +24,9 @@ pub(crate) fn db_options(config: &Config, env: &mut Env, row_cache: &Cache, col_ set_logging_defaults(&mut opts, config); // Processing - let threads = if config.rocksdb_parallelism_threads == 0 { - cmp::max(MIN_PARALLELISM, utils::available_parallelism()) - } else { - cmp::max(MIN_PARALLELISM, config.rocksdb_parallelism_threads) - }; - - opts.set_max_background_jobs(threads.try_into().unwrap()); - opts.set_max_subcompactions(threads.try_into().unwrap()); + opts.set_enable_pipelined_write(true); + opts.set_max_background_jobs(num_threads::(config)?); + opts.set_max_subcompactions(num_threads::(config)?); opts.set_max_file_opening_threads(0); if config.rocksdb_compaction_prio_idle { env.lower_thread_pool_cpu_priority(); @@ -100,13 +94,15 @@ pub(crate) fn db_options(config: &Config, env: &mut Env, row_cache: &Cache, col_ }); opts.set_env(env); - opts + Ok(opts) } /// Adjust options for the specific column by name. Provide the result of /// db_options() as the argument to this function and use the return value in /// the arguments to open the specific column. -pub(crate) fn cf_options(cfg: &Config, name: &str, mut opts: Options, cache: &mut HashMap) -> Options { +pub(crate) fn cf_options( + cfg: &Config, name: &str, mut opts: Options, cache: &mut HashMap, +) -> Result { // Columns with non-default compaction options match name { "backupid_algorithm" @@ -169,7 +165,7 @@ pub(crate) fn cf_options(cfg: &Config, name: &str, mut opts: Options, cache: &mu &_ => {}, } - opts + Ok(opts) } fn set_logging_defaults(opts: &mut Options, config: &Config) { @@ -346,3 +342,15 @@ fn table_options(_config: &Config) -> BlockBasedOptions { opts } + +fn num_threads>(config: &Config) -> Result { + const MIN_PARALLELISM: usize = 2; + + let requested = if config.rocksdb_parallelism_threads != 0 { + config.rocksdb_parallelism_threads + } else { + utils::available_parallelism() + }; + + utils::math::try_into::(cmp::max(MIN_PARALLELISM, requested)) +}