From 0e74ade7d799d5c09836b15e57642b6d91c76f28 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 5 Jun 2024 19:59:50 +0000 Subject: [PATCH] isolate axum shutdown in router; minor run-cycle/signalling tweaks Signed-off-by: Jason Volk --- Cargo.lock | 1 - src/core/Cargo.toml | 1 - src/core/server.rs | 30 +++++++++++----------------- src/main/main.rs | 43 ++++++++++++++++++++++------------------ src/main/mods.rs | 2 +- src/router/request.rs | 4 ++-- src/router/run.rs | 44 ++++++++++++++++------------------------- src/service/services.rs | 4 ++-- 8 files changed, 57 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 23457c4e..2f5d12b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -658,7 +658,6 @@ version = "0.4.1" dependencies = [ "argon2", "axum 0.7.5", - "axum-server", "bytes", "either", "figment", diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml index f2ccf43e..c90a03b3 100644 --- a/src/core/Cargo.toml +++ b/src/core/Cargo.toml @@ -61,7 +61,6 @@ sentry_telemetry = [] [dependencies] argon2.workspace = true -axum-server.workspace = true axum.workspace = true bytes.workspace = true either.workspace = true diff --git a/src/core/server.rs b/src/core/server.rs index 9e4497ae..6cf477cb 100644 --- a/src/core/server.rs +++ b/src/core/server.rs @@ -1,8 +1,5 @@ use std::{ - sync::{ - atomic::{AtomicBool, AtomicU32}, - Mutex, - }, + sync::atomic::{AtomicBool, AtomicU32}, time::SystemTime, }; @@ -18,24 +15,20 @@ pub struct Server { /// Timestamp server was started; used for uptime. pub started: SystemTime, - /// Reload/shutdown signal channel. Called from the signal handler or admin - /// command to initiate shutdown. - pub shutdown: Mutex>, - - /// Reload/shutdown signal - pub signal: broadcast::Sender<&'static str>, + /// Reload/shutdown pending indicator; server is shutting down. This is an + /// observable used on shutdown and should not be modified. + pub stopping: AtomicBool, /// Reload/shutdown desired indicator; when false, shutdown is desired. This /// is an observable used on shutdown and modifying is not recommended. - pub reload: AtomicBool, - - /// Reload/shutdown pending indicator; server is shutting down. This is an - /// observable used on shutdown and should not be modified. - pub interrupt: AtomicBool, + pub reloading: AtomicBool, /// Handle to the runtime pub runtime: Option, + /// Reload/shutdown signal + pub signal: broadcast::Sender<&'static str>, + /// Log level reload handles. pub tracing_reload_handle: LogLevelReloadHandles, @@ -53,11 +46,10 @@ impl Server { Self { config, started: SystemTime::now(), - shutdown: Mutex::new(None), - signal: broadcast::channel::<&'static str>(1).0, - reload: AtomicBool::new(false), - interrupt: AtomicBool::new(false), + stopping: AtomicBool::new(false), + reloading: AtomicBool::new(false), runtime, + signal: broadcast::channel::<&'static str>(1).0, tracing_reload_handle, requests_spawn_active: AtomicU32::new(0), requests_spawn_finished: AtomicU32::new(0), diff --git a/src/main/main.rs b/src/main/main.rs index 73e6e690..7933538d 100644 --- a/src/main/main.rs +++ b/src/main/main.rs @@ -6,10 +6,9 @@ extern crate conduit_core as conduit; use std::{cmp, sync::Arc, time::Duration}; -use conduit::{debug_error, debug_info, error, utils::available_parallelism, warn, Error, Result}; +use conduit::{debug_error, debug_info, error, trace, utils::available_parallelism, warn, Error, Result}; use server::Server; use tokio::{runtime, signal}; -use tracing::debug; const WORKER_NAME: &str = "conduwuit:worker"; const WORKER_MIN: usize = 2; @@ -97,33 +96,39 @@ async fn async_main(server: &Arc) -> Result<(), Error> { Ok(()) } +#[cfg(unix)] #[tracing::instrument(skip_all)] async fn signal(server: Arc) { - let (mut term, mut quit); - #[cfg(unix)] - { - use signal::unix; - quit = unix::signal(unix::SignalKind::quit()).expect("SIGQUIT handler"); - term = unix::signal(unix::SignalKind::terminate()).expect("SIGTERM handler"); - }; - + use signal::unix; + let mut quit = unix::signal(unix::SignalKind::quit()).expect("SIGQUIT handler"); + let mut term = unix::signal(unix::SignalKind::terminate()).expect("SIGTERM handler"); loop { - debug!("Installed signal handlers"); + trace!("Installed signal handlers"); let sig: &'static str; - #[cfg(unix)] tokio::select! { + _ = signal::ctrl_c() => { sig = "SIGINT"; }, + _ = quit.recv() => { sig = "SIGQUIT"; }, _ = term.recv() => { sig = "SIGTERM"; }, - _ = quit.recv() => { sig = "Ctrl+\\"; }, - _ = signal::ctrl_c() => { sig = "Ctrl+C"; }, - } - #[cfg(not(unix))] - tokio::select! { - _ = signal::ctrl_c() => { sig = "Ctrl+C"; }, } - warn!("Received signal {}", sig); + warn!("Received {sig}"); if let Err(e) = server.server.signal.send(sig) { debug_error!("signal channel: {e}"); } } } + +#[cfg(not(unix))] +#[tracing::instrument(skip_all)] +async fn signal(server: Arc) { + loop { + tokio::select! { + _ = signal::ctrl_c() => { + warn!("Received Ctrl+C"); + if let Err(e) = server.server.signal.send("SIGINT") { + debug_error!("signal channel: {e}"); + } + }, + } + } +} diff --git a/src/main/mods.rs b/src/main/mods.rs index 90a25f97..125cd54a 100644 --- a/src/main/mods.rs +++ b/src/main/mods.rs @@ -44,7 +44,7 @@ pub(crate) async fn run(server: &Arc, starts: bool) -> Result<(bool, boo error!("Running server: {error}"); return Err(error); } - let reloads = server.server.reload.swap(false, Ordering::AcqRel); + let reloads = server.server.reloading.swap(false, Ordering::AcqRel); let stops = !reloads || stale(server).await? <= restart_thresh(); let starts = reloads && stops; if stops { diff --git a/src/router/request.rs b/src/router/request.rs index 25bdd98e..0693e671 100644 --- a/src/router/request.rs +++ b/src/router/request.rs @@ -13,7 +13,7 @@ use tracing::{debug, error, trace}; pub(crate) async fn spawn( State(server): State>, req: http::Request, next: axum::middleware::Next, ) -> Result { - if server.interrupt.load(Ordering::Relaxed) { + if server.stopping.load(Ordering::Relaxed) { debug_warn!("unavailable pending shutdown"); return Err(StatusCode::SERVICE_UNAVAILABLE); } @@ -35,7 +35,7 @@ pub(crate) async fn spawn( pub(crate) async fn handle( State(server): State>, req: http::Request, next: axum::middleware::Next, ) -> Result { - if server.interrupt.load(Ordering::Relaxed) { + if server.stopping.load(Ordering::Relaxed) { debug_warn!( method = %req.method(), uri = %req.uri(), diff --git a/src/router/run.rs b/src/router/run.rs index 4d06442f..85e058cf 100644 --- a/src/router/run.rs +++ b/src/router/run.rs @@ -25,16 +25,12 @@ pub(crate) async fn run(server: Arc) -> Result<(), Error> { _ = services().admin.handle.lock().await.insert(admin::handle); // Setup shutdown/signal handling + server.stopping.store(false, Ordering::Release); let handle = ServerHandle::new(); - _ = server - .shutdown - .lock() - .expect("locked") - .insert(handle.clone()); - - server.interrupt.store(false, Ordering::Release); let (tx, _) = broadcast::channel::<()>(1); - let sigs = server.runtime().spawn(signal(server.clone(), tx.clone())); + let sigs = server + .runtime() + .spawn(signal(server.clone(), tx.clone(), handle.clone())); // Serve clients let res = serve::serve(&server, app, handle, tx.subscribe()).await; @@ -43,10 +39,6 @@ pub(crate) async fn run(server: Arc) -> Result<(), Error> { sigs.abort(); _ = sigs.await; - // Reset the axum handle instance; this should be reusable and might be - // reload-survivable but better to be safe than sorry. - _ = server.shutdown.lock().expect("locked").take(); - // Remove the admin room callback _ = services().admin.handle.lock().await.take(); @@ -76,7 +68,7 @@ pub(crate) async fn stop(_server: Arc) -> Result<(), Error> { // Wait for all completions before dropping or we'll lose them to the module // unload and explode. - services().shutdown().await; + services().stop().await; // Deactivate services(). Any further use will panic the caller. service::fini(); @@ -90,7 +82,7 @@ pub(crate) async fn stop(_server: Arc) -> Result<(), Error> { } #[tracing::instrument(skip_all)] -async fn signal(server: Arc, tx: Sender<()>) { +async fn signal(server: Arc, tx: Sender<()>, handle: axum_server::Handle) { let sig: &'static str = server .signal .subscribe() @@ -99,26 +91,24 @@ async fn signal(server: Arc, tx: Sender<()>) { .expect("channel error"); debug!("Received signal {}", sig); - if sig == "Ctrl+C" { + if sig == "SIGINT" { let reload = cfg!(unix) && cfg!(debug_assertions); - server.reload.store(reload, Ordering::Release); + server.reloading.store(reload, Ordering::Release); } - server.interrupt.store(true, Ordering::Release); + server.stopping.store(true, Ordering::Release); services().globals.rotate.fire(); if let Err(e) = tx.send(()) { error!("failed sending shutdown transaction to channel: {e}"); } - if let Some(handle) = server.shutdown.lock().expect("locked").as_ref() { - let pending = server.requests_spawn_active.load(Ordering::Relaxed); - if pending > 0 { - let timeout = Duration::from_secs(36); - trace!(pending, ?timeout, "Notifying for graceful shutdown"); - handle.graceful_shutdown(Some(timeout)); - } else { - debug!(pending, "Notifying for immediate shutdown"); - handle.shutdown(); - } + let pending = server.requests_spawn_active.load(Ordering::Relaxed); + if pending > 0 { + let timeout = Duration::from_secs(36); + trace!(pending, ?timeout, "Notifying for graceful shutdown"); + handle.graceful_shutdown(Some(timeout)); + } else { + debug!(pending, "Notifying for immediate shutdown"); + handle.shutdown(); } } diff --git a/src/service/services.rs b/src/service/services.rs index 4883d574..a62fa7e1 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -301,7 +301,7 @@ bad_signature_ratelimiter: {bad_signature_ratelimiter} pub async fn interrupt(&self) { trace!("Interrupting services..."); - self.server.interrupt.store(true, atomic::Ordering::Release); + self.server.stopping.store(true, atomic::Ordering::Release); self.globals.rotate.fire(); self.sending.interrupt(); @@ -312,7 +312,7 @@ bad_signature_ratelimiter: {bad_signature_ratelimiter} } #[tracing::instrument(skip_all)] - pub async fn shutdown(&self) { + pub async fn stop(&self) { info!("Shutting down services"); self.interrupt().await;