isolate axum shutdown in router; minor run-cycle/signalling tweaks
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
e4aa20ebeb
commit
0e74ade7d7
8 changed files with 57 additions and 72 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -658,7 +658,6 @@ version = "0.4.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"argon2",
|
"argon2",
|
||||||
"axum 0.7.5",
|
"axum 0.7.5",
|
||||||
"axum-server",
|
|
||||||
"bytes",
|
"bytes",
|
||||||
"either",
|
"either",
|
||||||
"figment",
|
"figment",
|
||||||
|
|
|
@ -61,7 +61,6 @@ sentry_telemetry = []
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
argon2.workspace = true
|
argon2.workspace = true
|
||||||
axum-server.workspace = true
|
|
||||||
axum.workspace = true
|
axum.workspace = true
|
||||||
bytes.workspace = true
|
bytes.workspace = true
|
||||||
either.workspace = true
|
either.workspace = true
|
||||||
|
|
|
@ -1,8 +1,5 @@
|
||||||
use std::{
|
use std::{
|
||||||
sync::{
|
sync::atomic::{AtomicBool, AtomicU32},
|
||||||
atomic::{AtomicBool, AtomicU32},
|
|
||||||
Mutex,
|
|
||||||
},
|
|
||||||
time::SystemTime,
|
time::SystemTime,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -18,24 +15,20 @@ pub struct Server {
|
||||||
/// Timestamp server was started; used for uptime.
|
/// Timestamp server was started; used for uptime.
|
||||||
pub started: SystemTime,
|
pub started: SystemTime,
|
||||||
|
|
||||||
/// Reload/shutdown signal channel. Called from the signal handler or admin
|
/// Reload/shutdown pending indicator; server is shutting down. This is an
|
||||||
/// command to initiate shutdown.
|
/// observable used on shutdown and should not be modified.
|
||||||
pub shutdown: Mutex<Option<axum_server::Handle>>,
|
pub stopping: AtomicBool,
|
||||||
|
|
||||||
/// Reload/shutdown signal
|
|
||||||
pub signal: broadcast::Sender<&'static str>,
|
|
||||||
|
|
||||||
/// Reload/shutdown desired indicator; when false, shutdown is desired. This
|
/// Reload/shutdown desired indicator; when false, shutdown is desired. This
|
||||||
/// is an observable used on shutdown and modifying is not recommended.
|
/// is an observable used on shutdown and modifying is not recommended.
|
||||||
pub reload: AtomicBool,
|
pub reloading: AtomicBool,
|
||||||
|
|
||||||
/// Reload/shutdown pending indicator; server is shutting down. This is an
|
|
||||||
/// observable used on shutdown and should not be modified.
|
|
||||||
pub interrupt: AtomicBool,
|
|
||||||
|
|
||||||
/// Handle to the runtime
|
/// Handle to the runtime
|
||||||
pub runtime: Option<runtime::Handle>,
|
pub runtime: Option<runtime::Handle>,
|
||||||
|
|
||||||
|
/// Reload/shutdown signal
|
||||||
|
pub signal: broadcast::Sender<&'static str>,
|
||||||
|
|
||||||
/// Log level reload handles.
|
/// Log level reload handles.
|
||||||
pub tracing_reload_handle: LogLevelReloadHandles,
|
pub tracing_reload_handle: LogLevelReloadHandles,
|
||||||
|
|
||||||
|
@ -53,11 +46,10 @@ impl Server {
|
||||||
Self {
|
Self {
|
||||||
config,
|
config,
|
||||||
started: SystemTime::now(),
|
started: SystemTime::now(),
|
||||||
shutdown: Mutex::new(None),
|
stopping: AtomicBool::new(false),
|
||||||
signal: broadcast::channel::<&'static str>(1).0,
|
reloading: AtomicBool::new(false),
|
||||||
reload: AtomicBool::new(false),
|
|
||||||
interrupt: AtomicBool::new(false),
|
|
||||||
runtime,
|
runtime,
|
||||||
|
signal: broadcast::channel::<&'static str>(1).0,
|
||||||
tracing_reload_handle,
|
tracing_reload_handle,
|
||||||
requests_spawn_active: AtomicU32::new(0),
|
requests_spawn_active: AtomicU32::new(0),
|
||||||
requests_spawn_finished: AtomicU32::new(0),
|
requests_spawn_finished: AtomicU32::new(0),
|
||||||
|
|
|
@ -6,10 +6,9 @@ extern crate conduit_core as conduit;
|
||||||
|
|
||||||
use std::{cmp, sync::Arc, time::Duration};
|
use std::{cmp, sync::Arc, time::Duration};
|
||||||
|
|
||||||
use conduit::{debug_error, debug_info, error, utils::available_parallelism, warn, Error, Result};
|
use conduit::{debug_error, debug_info, error, trace, utils::available_parallelism, warn, Error, Result};
|
||||||
use server::Server;
|
use server::Server;
|
||||||
use tokio::{runtime, signal};
|
use tokio::{runtime, signal};
|
||||||
use tracing::debug;
|
|
||||||
|
|
||||||
const WORKER_NAME: &str = "conduwuit:worker";
|
const WORKER_NAME: &str = "conduwuit:worker";
|
||||||
const WORKER_MIN: usize = 2;
|
const WORKER_MIN: usize = 2;
|
||||||
|
@ -97,33 +96,39 @@ async fn async_main(server: &Arc<Server>) -> Result<(), Error> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
async fn signal(server: Arc<Server>) {
|
async fn signal(server: Arc<Server>) {
|
||||||
let (mut term, mut quit);
|
|
||||||
#[cfg(unix)]
|
|
||||||
{
|
|
||||||
use signal::unix;
|
use signal::unix;
|
||||||
quit = unix::signal(unix::SignalKind::quit()).expect("SIGQUIT handler");
|
let mut quit = unix::signal(unix::SignalKind::quit()).expect("SIGQUIT handler");
|
||||||
term = unix::signal(unix::SignalKind::terminate()).expect("SIGTERM handler");
|
let mut term = unix::signal(unix::SignalKind::terminate()).expect("SIGTERM handler");
|
||||||
};
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
debug!("Installed signal handlers");
|
trace!("Installed signal handlers");
|
||||||
let sig: &'static str;
|
let sig: &'static str;
|
||||||
#[cfg(unix)]
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
_ = signal::ctrl_c() => { sig = "SIGINT"; },
|
||||||
|
_ = quit.recv() => { sig = "SIGQUIT"; },
|
||||||
_ = term.recv() => { sig = "SIGTERM"; },
|
_ = term.recv() => { sig = "SIGTERM"; },
|
||||||
_ = quit.recv() => { sig = "Ctrl+\\"; },
|
|
||||||
_ = signal::ctrl_c() => { sig = "Ctrl+C"; },
|
|
||||||
}
|
|
||||||
#[cfg(not(unix))]
|
|
||||||
tokio::select! {
|
|
||||||
_ = signal::ctrl_c() => { sig = "Ctrl+C"; },
|
|
||||||
}
|
}
|
||||||
|
|
||||||
warn!("Received signal {}", sig);
|
warn!("Received {sig}");
|
||||||
if let Err(e) = server.server.signal.send(sig) {
|
if let Err(e) = server.server.signal.send(sig) {
|
||||||
debug_error!("signal channel: {e}");
|
debug_error!("signal channel: {e}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(unix))]
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
async fn signal(server: Arc<Server>) {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = signal::ctrl_c() => {
|
||||||
|
warn!("Received Ctrl+C");
|
||||||
|
if let Err(e) = server.server.signal.send("SIGINT") {
|
||||||
|
debug_error!("signal channel: {e}");
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -44,7 +44,7 @@ pub(crate) async fn run(server: &Arc<Server>, starts: bool) -> Result<(bool, boo
|
||||||
error!("Running server: {error}");
|
error!("Running server: {error}");
|
||||||
return Err(error);
|
return Err(error);
|
||||||
}
|
}
|
||||||
let reloads = server.server.reload.swap(false, Ordering::AcqRel);
|
let reloads = server.server.reloading.swap(false, Ordering::AcqRel);
|
||||||
let stops = !reloads || stale(server).await? <= restart_thresh();
|
let stops = !reloads || stale(server).await? <= restart_thresh();
|
||||||
let starts = reloads && stops;
|
let starts = reloads && stops;
|
||||||
if stops {
|
if stops {
|
||||||
|
|
|
@ -13,7 +13,7 @@ use tracing::{debug, error, trace};
|
||||||
pub(crate) async fn spawn(
|
pub(crate) async fn spawn(
|
||||||
State(server): State<Arc<Server>>, req: http::Request<axum::body::Body>, next: axum::middleware::Next,
|
State(server): State<Arc<Server>>, req: http::Request<axum::body::Body>, next: axum::middleware::Next,
|
||||||
) -> Result<axum::response::Response, StatusCode> {
|
) -> Result<axum::response::Response, StatusCode> {
|
||||||
if server.interrupt.load(Ordering::Relaxed) {
|
if server.stopping.load(Ordering::Relaxed) {
|
||||||
debug_warn!("unavailable pending shutdown");
|
debug_warn!("unavailable pending shutdown");
|
||||||
return Err(StatusCode::SERVICE_UNAVAILABLE);
|
return Err(StatusCode::SERVICE_UNAVAILABLE);
|
||||||
}
|
}
|
||||||
|
@ -35,7 +35,7 @@ pub(crate) async fn spawn(
|
||||||
pub(crate) async fn handle(
|
pub(crate) async fn handle(
|
||||||
State(server): State<Arc<Server>>, req: http::Request<axum::body::Body>, next: axum::middleware::Next,
|
State(server): State<Arc<Server>>, req: http::Request<axum::body::Body>, next: axum::middleware::Next,
|
||||||
) -> Result<axum::response::Response, StatusCode> {
|
) -> Result<axum::response::Response, StatusCode> {
|
||||||
if server.interrupt.load(Ordering::Relaxed) {
|
if server.stopping.load(Ordering::Relaxed) {
|
||||||
debug_warn!(
|
debug_warn!(
|
||||||
method = %req.method(),
|
method = %req.method(),
|
||||||
uri = %req.uri(),
|
uri = %req.uri(),
|
||||||
|
|
|
@ -25,16 +25,12 @@ pub(crate) async fn run(server: Arc<Server>) -> Result<(), Error> {
|
||||||
_ = services().admin.handle.lock().await.insert(admin::handle);
|
_ = services().admin.handle.lock().await.insert(admin::handle);
|
||||||
|
|
||||||
// Setup shutdown/signal handling
|
// Setup shutdown/signal handling
|
||||||
|
server.stopping.store(false, Ordering::Release);
|
||||||
let handle = ServerHandle::new();
|
let handle = ServerHandle::new();
|
||||||
_ = server
|
|
||||||
.shutdown
|
|
||||||
.lock()
|
|
||||||
.expect("locked")
|
|
||||||
.insert(handle.clone());
|
|
||||||
|
|
||||||
server.interrupt.store(false, Ordering::Release);
|
|
||||||
let (tx, _) = broadcast::channel::<()>(1);
|
let (tx, _) = broadcast::channel::<()>(1);
|
||||||
let sigs = server.runtime().spawn(signal(server.clone(), tx.clone()));
|
let sigs = server
|
||||||
|
.runtime()
|
||||||
|
.spawn(signal(server.clone(), tx.clone(), handle.clone()));
|
||||||
|
|
||||||
// Serve clients
|
// Serve clients
|
||||||
let res = serve::serve(&server, app, handle, tx.subscribe()).await;
|
let res = serve::serve(&server, app, handle, tx.subscribe()).await;
|
||||||
|
@ -43,10 +39,6 @@ pub(crate) async fn run(server: Arc<Server>) -> Result<(), Error> {
|
||||||
sigs.abort();
|
sigs.abort();
|
||||||
_ = sigs.await;
|
_ = sigs.await;
|
||||||
|
|
||||||
// Reset the axum handle instance; this should be reusable and might be
|
|
||||||
// reload-survivable but better to be safe than sorry.
|
|
||||||
_ = server.shutdown.lock().expect("locked").take();
|
|
||||||
|
|
||||||
// Remove the admin room callback
|
// Remove the admin room callback
|
||||||
_ = services().admin.handle.lock().await.take();
|
_ = services().admin.handle.lock().await.take();
|
||||||
|
|
||||||
|
@ -76,7 +68,7 @@ pub(crate) async fn stop(_server: Arc<Server>) -> Result<(), Error> {
|
||||||
|
|
||||||
// Wait for all completions before dropping or we'll lose them to the module
|
// Wait for all completions before dropping or we'll lose them to the module
|
||||||
// unload and explode.
|
// unload and explode.
|
||||||
services().shutdown().await;
|
services().stop().await;
|
||||||
// Deactivate services(). Any further use will panic the caller.
|
// Deactivate services(). Any further use will panic the caller.
|
||||||
service::fini();
|
service::fini();
|
||||||
|
|
||||||
|
@ -90,7 +82,7 @@ pub(crate) async fn stop(_server: Arc<Server>) -> Result<(), Error> {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
async fn signal(server: Arc<Server>, tx: Sender<()>) {
|
async fn signal(server: Arc<Server>, tx: Sender<()>, handle: axum_server::Handle) {
|
||||||
let sig: &'static str = server
|
let sig: &'static str = server
|
||||||
.signal
|
.signal
|
||||||
.subscribe()
|
.subscribe()
|
||||||
|
@ -99,18 +91,17 @@ async fn signal(server: Arc<Server>, tx: Sender<()>) {
|
||||||
.expect("channel error");
|
.expect("channel error");
|
||||||
|
|
||||||
debug!("Received signal {}", sig);
|
debug!("Received signal {}", sig);
|
||||||
if sig == "Ctrl+C" {
|
if sig == "SIGINT" {
|
||||||
let reload = cfg!(unix) && cfg!(debug_assertions);
|
let reload = cfg!(unix) && cfg!(debug_assertions);
|
||||||
server.reload.store(reload, Ordering::Release);
|
server.reloading.store(reload, Ordering::Release);
|
||||||
}
|
}
|
||||||
|
|
||||||
server.interrupt.store(true, Ordering::Release);
|
server.stopping.store(true, Ordering::Release);
|
||||||
services().globals.rotate.fire();
|
services().globals.rotate.fire();
|
||||||
if let Err(e) = tx.send(()) {
|
if let Err(e) = tx.send(()) {
|
||||||
error!("failed sending shutdown transaction to channel: {e}");
|
error!("failed sending shutdown transaction to channel: {e}");
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(handle) = server.shutdown.lock().expect("locked").as_ref() {
|
|
||||||
let pending = server.requests_spawn_active.load(Ordering::Relaxed);
|
let pending = server.requests_spawn_active.load(Ordering::Relaxed);
|
||||||
if pending > 0 {
|
if pending > 0 {
|
||||||
let timeout = Duration::from_secs(36);
|
let timeout = Duration::from_secs(36);
|
||||||
|
@ -120,5 +111,4 @@ async fn signal(server: Arc<Server>, tx: Sender<()>) {
|
||||||
debug!(pending, "Notifying for immediate shutdown");
|
debug!(pending, "Notifying for immediate shutdown");
|
||||||
handle.shutdown();
|
handle.shutdown();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -301,7 +301,7 @@ bad_signature_ratelimiter: {bad_signature_ratelimiter}
|
||||||
|
|
||||||
pub async fn interrupt(&self) {
|
pub async fn interrupt(&self) {
|
||||||
trace!("Interrupting services...");
|
trace!("Interrupting services...");
|
||||||
self.server.interrupt.store(true, atomic::Ordering::Release);
|
self.server.stopping.store(true, atomic::Ordering::Release);
|
||||||
|
|
||||||
self.globals.rotate.fire();
|
self.globals.rotate.fire();
|
||||||
self.sending.interrupt();
|
self.sending.interrupt();
|
||||||
|
@ -312,7 +312,7 @@ bad_signature_ratelimiter: {bad_signature_ratelimiter}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
pub async fn shutdown(&self) {
|
pub async fn stop(&self) {
|
||||||
info!("Shutting down services");
|
info!("Shutting down services");
|
||||||
self.interrupt().await;
|
self.interrupt().await;
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue