tokio metrics

Signed-off-by: Jason Volk <jason@zemos.net>
Jason Volk 2024-06-25 05:05:02 +00:00
parent 5ec49b3f62
commit cce270d938
13 changed files with 157 additions and 33 deletions

Cargo.lock (generated)

@@ -694,6 +694,7 @@ dependencies = [
  "tikv-jemalloc-sys",
  "tikv-jemallocator",
  "tokio",
+ "tokio-metrics",
  "tracing",
  "tracing-core",
  "tracing-subscriber",
@@ -4058,6 +4059,7 @@ checksum = "eace09241d62c98b7eeb1107d4c5c64ca3bd7da92e8c218c153ab3a78f9be112"
 dependencies = [
  "futures-util",
  "pin-project-lite",
+ "tokio",
  "tokio-stream",
 ]
 


@@ -200,6 +200,9 @@ features = [
 	"io-util",
 ]
 
+[workspace.dependencies.tokio-metrics]
+version = "0.3.1"
+
 [workspace.dependencies.libloading]
 version = "0.8.3"
 
@@ -382,10 +385,6 @@ version = "0.5.4"
 default-features = false
 features = ["use_std"]
 
-[workspace.dependencies.tokio-metrics]
-version = "0.3.1"
-default-features = false
-
 [workspace.dependencies.console-subscriber]
 version = "0.3"
 


@@ -653,3 +653,37 @@ pub(super) fn memory_stats() -> RoomMessageEventContent {
 		html_body.expect("string result"),
 	)
 }
+
+#[cfg(tokio_unstable)]
+pub(super) async fn runtime_metrics(_body: Vec<&str>) -> Result<RoomMessageEventContent> {
+	let out = services().server.metrics.runtime_metrics().map_or_else(
+		|| "Runtime metrics are not available.".to_owned(),
+		|metrics| format!("```rs\n{metrics:#?}\n```"),
+	);
+
+	Ok(RoomMessageEventContent::text_markdown(out))
+}
+
+#[cfg(not(tokio_unstable))]
+pub(super) async fn runtime_metrics(_body: Vec<&str>) -> Result<RoomMessageEventContent> {
+	Ok(RoomMessageEventContent::text_markdown(
+		"Runtime metrics require building with `tokio_unstable`.",
+	))
+}
+
+#[cfg(tokio_unstable)]
+pub(super) async fn runtime_interval(_body: Vec<&str>) -> Result<RoomMessageEventContent> {
+	let out = services().server.metrics.runtime_interval().map_or_else(
+		|| "Runtime metrics are not available.".to_owned(),
+		|metrics| format!("```rs\n{metrics:#?}\n```"),
+	);
+
+	Ok(RoomMessageEventContent::text_markdown(out))
+}
+
+#[cfg(not(tokio_unstable))]
+pub(super) async fn runtime_interval(_body: Vec<&str>) -> Result<RoomMessageEventContent> {
+	Ok(RoomMessageEventContent::text_markdown(
+		"Runtime metrics require building with `tokio_unstable`.",
+	))
+}


@@ -160,6 +160,13 @@ pub(super) enum DebugCommand {
 	/// - Print extended memory usage
 	MemoryStats,
 
+	/// - Print general tokio runtime metric totals.
+	RuntimeMetrics,
+
+	/// - Print detailed tokio runtime metrics accumulated since last command
+	///   invocation.
+	RuntimeInterval,
+
 	/// - Developer test stubs
 	#[command(subcommand)]
 	Tester(TesterCommand),
@@ -213,6 +220,8 @@ pub(super) async fn process(command: DebugCommand, body: Vec<&str>) -> Result<Ro
 			no_cache,
 		} => resolve_true_destination(body, server_name, no_cache).await?,
 		DebugCommand::MemoryStats => memory_stats(),
+		DebugCommand::RuntimeMetrics => runtime_metrics(body).await?,
+		DebugCommand::RuntimeInterval => runtime_interval(body).await?,
 		DebugCommand::Tester(command) => tester::process(command, body).await?,
 	})
 }
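
Aside (not part of this diff): clap's derive renames enum variants to kebab-case by default, so these new variants would presumably surface as `runtime-metrics` and `runtime-interval` subcommands of the admin `debug` command. A minimal, self-contained illustration of that naming behavior, assuming clap 4 with the `derive` feature:

```rs
// Hedged illustration: clap derive maps `RuntimeMetrics` -> "runtime-metrics".
use clap::Parser;

#[derive(Parser, Debug)]
enum DemoDebugCommand {
	MemoryStats,
	RuntimeMetrics,
	RuntimeInterval,
}

fn main() {
	// Parsing the kebab-case name selects the corresponding variant.
	let cmd = DemoDebugCommand::parse_from(["debug", "runtime-metrics"]);
	assert!(matches!(cmd, DemoDebugCommand::RuntimeMetrics));
}
```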


@@ -82,6 +82,7 @@ tikv-jemalloc-ctl.workspace = true
 tikv-jemalloc-sys.optional = true
 tikv-jemalloc-sys.workspace = true
 tokio.workspace = true
+tokio-metrics.workspace = true
 tracing-core.workspace = true
 tracing-subscriber.workspace = true
 tracing.workspace = true

src/core/metrics/mod.rs (new file)

@@ -0,0 +1,72 @@
+use std::sync::atomic::AtomicU32;
+
+use tokio::runtime;
+use tokio_metrics::TaskMonitor;
+#[cfg(tokio_unstable)]
+use tokio_metrics::{RuntimeIntervals, RuntimeMonitor};
+
+pub struct Metrics {
+	_runtime: Option<runtime::Handle>,
+
+	runtime_metrics: Option<runtime::RuntimeMetrics>,
+
+	task_monitor: Option<TaskMonitor>,
+
+	#[cfg(tokio_unstable)]
+	_runtime_monitor: Option<RuntimeMonitor>,
+
+	#[cfg(tokio_unstable)]
+	runtime_intervals: std::sync::Mutex<Option<RuntimeIntervals>>,
+
+	// TODO: move stats
+	pub requests_spawn_active: AtomicU32,
+	pub requests_spawn_finished: AtomicU32,
+	pub requests_handle_active: AtomicU32,
+	pub requests_handle_finished: AtomicU32,
+	pub requests_panic: AtomicU32,
+}
+
+impl Metrics {
+	#[must_use]
+	pub fn new(runtime: Option<runtime::Handle>) -> Self {
+		#[cfg(tokio_unstable)]
+		let runtime_monitor = runtime.as_ref().map(RuntimeMonitor::new);
+
+		#[cfg(tokio_unstable)]
+		let runtime_intervals = runtime_monitor.as_ref().map(RuntimeMonitor::intervals);
+
+		Self {
+			_runtime: runtime.clone(),
+			runtime_metrics: runtime.as_ref().map(runtime::Handle::metrics),
+			task_monitor: runtime.map(|_| TaskMonitor::new()),
+			#[cfg(tokio_unstable)]
+			_runtime_monitor: runtime_monitor,
+			#[cfg(tokio_unstable)]
+			runtime_intervals: std::sync::Mutex::new(runtime_intervals),
+			requests_spawn_active: AtomicU32::new(0),
+			requests_spawn_finished: AtomicU32::new(0),
+			requests_handle_active: AtomicU32::new(0),
+			requests_handle_finished: AtomicU32::new(0),
+			requests_panic: AtomicU32::new(0),
+		}
+	}
+
+	#[cfg(tokio_unstable)]
+	pub fn runtime_interval(&self) -> Option<tokio_metrics::RuntimeMetrics> {
+		self.runtime_intervals
+			.lock()
+			.expect("locked")
+			.as_mut()
+			.map(Iterator::next)
+			.expect("next interval")
+	}
+
+	pub fn task_root(&self) -> Option<&TaskMonitor> { self.task_monitor.as_ref() }
+
+	pub fn runtime_metrics(&self) -> Option<&runtime::RuntimeMetrics> { self.runtime_metrics.as_ref() }
+}
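
For context, the two data sources wrapped here can be exercised on their own. The following is a hedged, standalone sketch (not code from this commit), assuming tokio-metrics 0.3 with its `rt` feature and, for the interval part, a build with `RUSTFLAGS="--cfg tokio_unstable"`:

```rs
// Sketch of the sources behind Metrics::runtime_metrics() and Metrics::runtime_interval():
// tokio's own Handle::metrics() snapshot, and tokio-metrics' RuntimeMonitor intervals.
use tokio::runtime::Handle;

#[tokio::main]
async fn main() {
	let handle = Handle::current();

	// Stable counters exposed directly by tokio.
	let rt_metrics = handle.metrics();
	println!("worker threads: {}", rt_metrics.num_workers());

	// Interval-based deltas from tokio-metrics; each next() returns the metrics
	// accumulated since the previous call, which is what `runtime_interval()` yields.
	#[cfg(tokio_unstable)]
	{
		let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
		let mut intervals = monitor.intervals();
		let delta = intervals.next().expect("interval iterator is unbounded");
		println!("parks since last sample: {}", delta.total_park_count);
	}
}
```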


@@ -3,6 +3,7 @@ pub mod config;
 pub mod debug;
 pub mod error;
 pub mod log;
+pub mod metrics;
 pub mod mods;
 pub mod pdu;
 pub mod server;


@@ -1,11 +1,11 @@
 use std::{
-	sync::atomic::{AtomicBool, AtomicU32, Ordering},
+	sync::atomic::{AtomicBool, Ordering},
 	time::SystemTime,
 };
 
 use tokio::{runtime, sync::broadcast};
 
-use crate::{config::Config, log, Err, Result};
+use crate::{config::Config, log::Log, metrics::Metrics, Err, Result};
 
 /// Server runtime state; public portion
 pub struct Server {
@@ -33,33 +33,25 @@ pub struct Server {
 	pub signal: broadcast::Sender<&'static str>,
 
 	/// Logging subsystem state
-	pub log: log::Log,
+	pub log: Log,
 
-	/// TODO: move stats
-	pub requests_spawn_active: AtomicU32,
-	pub requests_spawn_finished: AtomicU32,
-	pub requests_handle_active: AtomicU32,
-	pub requests_handle_finished: AtomicU32,
-	pub requests_panic: AtomicU32,
+	/// Metrics subsystem state
+	pub metrics: Metrics,
 }
 
 impl Server {
 	#[must_use]
-	pub fn new(config: Config, runtime: Option<runtime::Handle>, log: log::Log) -> Self {
+	pub fn new(config: Config, runtime: Option<runtime::Handle>, log: Log) -> Self {
 		Self {
 			config,
 			started: SystemTime::now(),
 			stopping: AtomicBool::new(false),
 			reloading: AtomicBool::new(false),
 			restarting: AtomicBool::new(false),
-			runtime,
+			runtime: runtime.clone(),
 			signal: broadcast::channel::<&'static str>(1).0,
 			log,
-			requests_spawn_active: AtomicU32::new(0),
-			requests_spawn_finished: AtomicU32::new(0),
-			requests_handle_active: AtomicU32::new(0),
-			requests_handle_finished: AtomicU32::new(0),
-			requests_panic: AtomicU32::new(0),
+			metrics: Metrics::new(runtime),
 		}
 	}
 


@@ -20,7 +20,7 @@ use tokio::runtime;
 
 const WORKER_NAME: &str = "conduwuit:worker";
 const WORKER_MIN: usize = 2;
-const WORKER_KEEPALIVE_MS: u64 = 2500;
+const WORKER_KEEPALIVE: u64 = 36;
 
 fn main() -> Result<(), Error> {
 	let args = clap::parse();
@@ -29,7 +29,7 @@ fn main() -> Result<(), Error> {
 		.enable_time()
 		.thread_name(WORKER_NAME)
 		.worker_threads(cmp::max(WORKER_MIN, available_parallelism()))
-		.thread_keep_alive(Duration::from_millis(WORKER_KEEPALIVE_MS))
+		.thread_keep_alive(Duration::from_secs(WORKER_KEEPALIVE))
 		.build()
 		.expect("built runtime");
 


@@ -153,6 +153,7 @@ fn body_limit_layer(server: &Server) -> DefaultBodyLimit { DefaultBodyLimit::max
 
 fn catch_panic(err: Box<dyn Any + Send + 'static>) -> http::Response<http_body_util::Full<bytes::Bytes>> {
 	conduit_service::services()
 		.server
+		.metrics
 		.requests_panic
 		.fetch_add(1, std::sync::atomic::Ordering::Release);


@@ -17,11 +17,14 @@ pub(crate) async fn spawn(
 		return Err(StatusCode::SERVICE_UNAVAILABLE);
 	}
 
-	let active = server.requests_spawn_active.fetch_add(1, Ordering::Relaxed);
+	let active = server
+		.metrics
+		.requests_spawn_active
+		.fetch_add(1, Ordering::Relaxed);
 	trace!(active, "enter");
 	defer! {{
-		let active = server.requests_spawn_active.fetch_sub(1, Ordering::Relaxed);
-		let finished = server.requests_spawn_finished.fetch_add(1, Ordering::Relaxed);
+		let active = server.metrics.requests_spawn_active.fetch_sub(1, Ordering::Relaxed);
+		let finished = server.metrics.requests_spawn_finished.fetch_add(1, Ordering::Relaxed);
 		trace!(active, finished, "leave");
 	}};
 
@@ -45,12 +48,13 @@ pub(crate) async fn handle(
 	}
 
 	let active = server
+		.metrics
 		.requests_handle_active
 		.fetch_add(1, Ordering::Relaxed);
 	trace!(active, "enter");
 	defer! {{
-		let active = server.requests_handle_active.fetch_sub(1, Ordering::Relaxed);
-		let finished = server.requests_handle_finished.fetch_add(1, Ordering::Relaxed);
+		let active = server.metrics.requests_handle_active.fetch_sub(1, Ordering::Relaxed);
+		let finished = server.metrics.requests_handle_finished.fetch_add(1, Ordering::Relaxed);
 		trace!(active, finished, "leave");
 	}};
 
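
Tangentially, the `task_root()` monitor carried by `Metrics` is a tokio-metrics `TaskMonitor`, which this diff introduces but does not yet wire into request handling. A hedged sketch of how such a monitor is typically used (assuming tokio-metrics 0.3, not code from this commit):

```rs
// Futures are wrapped with instrument(); per-task stats are drained from intervals().
use tokio_metrics::TaskMonitor;

#[tokio::main]
async fn main() {
	let monitor = TaskMonitor::new();

	// Account this task's polls to the monitor.
	let task = tokio::spawn(monitor.instrument(async {
		tokio::time::sleep(std::time::Duration::from_millis(10)).await;
	}));
	task.await.expect("task joined");

	// Each next() yields TaskMetrics accumulated since the previous call.
	let mut intervals = monitor.intervals();
	let metrics = intervals.next().expect("interval iterator is unbounded");
	println!("instrumented polls: {}", metrics.total_poll_count);
}
```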


@@ -108,7 +108,7 @@ async fn handle_shutdown(server: &Arc<Server>, tx: &Sender<()>, handle: &axum_se
 		error!("failed sending shutdown transaction to channel: {e}");
 	}
 
-	let pending = server.requests_spawn_active.load(Ordering::Relaxed);
+	let pending = server.metrics.requests_spawn_active.load(Ordering::Relaxed);
 	if pending > 0 {
 		let timeout = Duration::from_secs(36);
 		trace!(pending, ?timeout, "Notifying for graceful shutdown");


@@ -21,12 +21,21 @@ pub(super) async fn serve(
 	info!("Listening on {addrs:?}");
 	while join_set.join_next().await.is_some() {}
 
-	let spawn_active = server.requests_spawn_active.load(Ordering::Relaxed);
-	let handle_active = server.requests_handle_active.load(Ordering::Relaxed);
+	let spawn_active = server.metrics.requests_spawn_active.load(Ordering::Relaxed);
+	let handle_active = server
+		.metrics
+		.requests_handle_active
+		.load(Ordering::Relaxed);
 	debug_info!(
-		spawn_finished = server.requests_spawn_finished.load(Ordering::Relaxed),
-		handle_finished = server.requests_handle_finished.load(Ordering::Relaxed),
-		panics = server.requests_panic.load(Ordering::Relaxed),
+		spawn_finished = server
+			.metrics
+			.requests_spawn_finished
+			.load(Ordering::Relaxed),
+		handle_finished = server
+			.metrics
+			.requests_handle_finished
+			.load(Ordering::Relaxed),
+		panics = server.metrics.requests_panic.load(Ordering::Relaxed),
 		spawn_active,
 		handle_active,
 		"Stopped listening on {addrs:?}",