replace num_cpus dependency with available_parallelism()

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-05-25 20:58:37 +00:00 committed by June 🍓🦴
parent d2aef071bc
commit a537462d51
8 changed files with 22 additions and 13 deletions

2
Cargo.lock generated
View file

@ -581,7 +581,6 @@ dependencies = [
"console-subscriber", "console-subscriber",
"hardened_malloc-rs", "hardened_malloc-rs",
"log", "log",
"num_cpus",
"opentelemetry", "opentelemetry",
"opentelemetry-jaeger", "opentelemetry-jaeger",
"opentelemetry_sdk", "opentelemetry_sdk",
@ -712,7 +711,6 @@ dependencies = [
"futures-util", "futures-util",
"log", "log",
"lru-cache", "lru-cache",
"num_cpus",
"parking_lot", "parking_lot",
"ruma", "ruma",
"rusqlite", "rusqlite",

View file

@ -258,9 +258,6 @@ version = "0.1.80"
[workspace.dependencies.lru-cache] [workspace.dependencies.lru-cache]
version = "0.1.2" version = "0.1.2"
[workspace.dependencies.num_cpus]
version = "1.16.0"
# Used for matrix spec type definitions and helpers # Used for matrix spec type definitions and helpers
[workspace.dependencies.ruma] [workspace.dependencies.ruma]
git = "https://github.com/girlbossceo/ruwuma" git = "https://github.com/girlbossceo/ruwuma"

View file

@ -267,3 +267,15 @@ pub fn maximize_fd_limit() -> Result<(), nix::errno::Errno> {
Ok(()) Ok(())
} }
/// Get the number of threads which could execute in parallel based on the
/// hardware and administrative constraints of this system. This value should be
/// used to hint the size of thread-pools and divide-and-conquer algorithms.
///
/// * <https://doc.rust-lang.org/std/thread/fn.available_parallelism.html>
#[must_use]
pub fn available_parallelism() -> usize {
std::thread::available_parallelism()
.expect("Unable to query for available parallelism.")
.get()
}

View file

@ -56,7 +56,6 @@ conduit-core.workspace = true
futures-util.workspace = true futures-util.workspace = true
log.workspace = true log.workspace = true
lru-cache.workspace = true lru-cache.workspace = true
num_cpus.workspace = true
parking_lot.optional = true parking_lot.optional = true
parking_lot.workspace = true parking_lot.workspace = true
ruma.workspace = true ruma.workspace = true

View file

@ -1,5 +1,7 @@
#![allow(dead_code)] #![allow(dead_code)]
use std::collections::HashMap; use std::{cmp, collections::HashMap};
use conduit::utils;
use super::{ use super::{
rust_rocksdb::{ rust_rocksdb::{
@ -21,10 +23,11 @@ pub(crate) fn db_options(config: &Config, env: &mut Env, row_cache: &Cache, col_
set_logging_defaults(&mut opts, config); set_logging_defaults(&mut opts, config);
// Processing // Processing
const MIN_PARALLELISM: usize = 2;
let threads = if config.rocksdb_parallelism_threads == 0 { let threads = if config.rocksdb_parallelism_threads == 0 {
std::cmp::max(2, num_cpus::get()) // max cores if user specified 0 cmp::max(MIN_PARALLELISM, utils::available_parallelism())
} else { } else {
config.rocksdb_parallelism_threads cmp::max(MIN_PARALLELISM, config.rocksdb_parallelism_threads)
}; };
opts.set_max_background_jobs(threads.try_into().unwrap()); opts.set_max_background_jobs(threads.try_into().unwrap());

View file

@ -108,7 +108,8 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
clippy::cast_precision_loss, clippy::cast_precision_loss,
clippy::cast_sign_loss clippy::cast_sign_loss
)] )]
let cache_size_per_thread = ((config.db_cache_capacity_mb * 1024.0) / ((num_cpus::get() as f64 * 2.0) + 1.0)) as u32; let cache_size_per_thread = ((config.db_cache_capacity_mb * 1024.0)
/ ((conduit::utils::available_parallelism() as f64 * 2.0) + 1.0)) as u32;
let writer = Mutex::new(Engine::prepare_conn(&path, cache_size_per_thread)?); let writer = Mutex::new(Engine::prepare_conn(&path, cache_size_per_thread)?);

View file

@ -78,7 +78,6 @@ log.workspace = true
tracing.workspace = true tracing.workspace = true
tracing-subscriber.workspace = true tracing-subscriber.workspace = true
clap.workspace = true clap.workspace = true
num_cpus.workspace = true
opentelemetry.workspace = true opentelemetry.workspace = true
opentelemetry.optional = true opentelemetry.optional = true

View file

@ -6,7 +6,7 @@ extern crate conduit_core as conduit;
use std::{cmp, sync::Arc, time::Duration}; use std::{cmp, sync::Arc, time::Duration};
use conduit::{debug_info, error, Error, Result}; use conduit::{debug_info, error, utils::available_parallelism, Error, Result};
use server::Server; use server::Server;
use tokio::runtime; use tokio::runtime;
@ -20,7 +20,7 @@ fn main() -> Result<(), Error> {
.enable_io() .enable_io()
.enable_time() .enable_time()
.thread_name(WORKER_NAME) .thread_name(WORKER_NAME)
.worker_threads(cmp::max(WORKER_MIN, num_cpus::get())) .worker_threads(cmp::max(WORKER_MIN, available_parallelism()))
.thread_keep_alive(Duration::from_millis(WORKER_KEEPALIVE_MS)) .thread_keep_alive(Duration::from_millis(WORKER_KEEPALIVE_MS))
.build() .build()
.expect("built runtime"); .expect("built runtime");