Merge branch 'fixjoin' into 'next'

fix: let requests continue event if client disconnects

See merge request famedly/conduit!464
This commit is contained in:
Timo Kösters 2023-03-18 07:59:36 +00:00
commit f704169aeb
2 changed files with 32 additions and 10 deletions

View file

@ -7,7 +7,7 @@
#![allow(clippy::suspicious_else_formatting)] #![allow(clippy::suspicious_else_formatting)]
#![deny(clippy::dbg_macro)] #![deny(clippy::dbg_macro)]
use std::{future::Future, io, net::SocketAddr, time::Duration}; use std::{future::Future, io, net::SocketAddr, sync::atomic, time::Duration};
use axum::{ use axum::{
extract::{DefaultBodyLimit, FromRequest, MatchedPath}, extract::{DefaultBodyLimit, FromRequest, MatchedPath},
@ -147,6 +147,7 @@ async fn run_server() -> io::Result<()> {
let middlewares = ServiceBuilder::new() let middlewares = ServiceBuilder::new()
.sensitive_headers([header::AUTHORIZATION]) .sensitive_headers([header::AUTHORIZATION])
.layer(axum::middleware::from_fn(spawn_task))
.layer( .layer(
TraceLayer::new_for_http().make_span_with(|request: &http::Request<_>| { TraceLayer::new_for_http().make_span_with(|request: &http::Request<_>| {
let path = if let Some(path) = request.extensions().get::<MatchedPath>() { let path = if let Some(path) = request.extensions().get::<MatchedPath>() {
@ -211,16 +212,21 @@ async fn run_server() -> io::Result<()> {
} }
} }
// On shutdown
info!(target: "shutdown-sync", "Received shutdown notification, notifying sync helpers...");
services().globals.rotate.fire();
#[cfg(feature = "systemd")]
let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]);
Ok(()) Ok(())
} }
async fn spawn_task<B: Send + 'static>(
req: axum::http::Request<B>,
next: axum::middleware::Next<B>,
) -> std::result::Result<axum::response::Response, StatusCode> {
if services().globals.shutdown.load(atomic::Ordering::Relaxed) {
return Err(StatusCode::SERVICE_UNAVAILABLE);
}
tokio::spawn(next.run(req))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}
async fn unrecognized_method<B>( async fn unrecognized_method<B>(
req: axum::http::Request<B>, req: axum::http::Request<B>,
next: axum::middleware::Next<B>, next: axum::middleware::Next<B>,
@ -442,6 +448,11 @@ async fn shutdown_signal(handle: ServerHandle) {
warn!("Received {}, shutting down...", sig); warn!("Received {}, shutting down...", sig);
handle.graceful_shutdown(Some(Duration::from_secs(30))); handle.graceful_shutdown(Some(Duration::from_secs(30)));
services().globals.shutdown();
#[cfg(feature = "systemd")]
let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]);
} }
async fn not_found(uri: Uri) -> impl IntoResponse { async fn not_found(uri: Uri) -> impl IntoResponse {

View file

@ -6,7 +6,7 @@ use ruma::{
use crate::api::server_server::FedDest; use crate::api::server_server::FedDest;
use crate::{Config, Error, Result}; use crate::{services, Config, Error, Result};
use ruma::{ use ruma::{
api::{ api::{
client::sync::sync_events, client::sync::sync_events,
@ -14,6 +14,7 @@ use ruma::{
}, },
DeviceId, RoomVersionId, ServerName, UserId, DeviceId, RoomVersionId, ServerName, UserId,
}; };
use std::sync::atomic::{self, AtomicBool};
use std::{ use std::{
collections::{BTreeMap, HashMap}, collections::{BTreeMap, HashMap},
fs, fs,
@ -24,7 +25,7 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::sync::{broadcast, watch::Receiver, Mutex as TokioMutex, Semaphore}; use tokio::sync::{broadcast, watch::Receiver, Mutex as TokioMutex, Semaphore};
use tracing::error; use tracing::{error, info};
use trust_dns_resolver::TokioAsyncResolver; use trust_dns_resolver::TokioAsyncResolver;
type WellKnownMap = HashMap<OwnedServerName, (FedDest, String)>; type WellKnownMap = HashMap<OwnedServerName, (FedDest, String)>;
@ -58,6 +59,8 @@ pub struct Service {
pub roomid_federationhandletime: RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>, pub roomid_federationhandletime: RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>,
pub stateres_mutex: Arc<Mutex<()>>, pub stateres_mutex: Arc<Mutex<()>>,
pub rotate: RotationHandler, pub rotate: RotationHandler,
pub shutdown: AtomicBool,
} }
/// Handles "rotation" of long-polling requests. "Rotation" in this context is similar to "rotation" of log files and the like. /// Handles "rotation" of long-polling requests. "Rotation" in this context is similar to "rotation" of log files and the like.
@ -160,6 +163,7 @@ impl Service {
stateres_mutex: Arc::new(Mutex::new(())), stateres_mutex: Arc::new(Mutex::new(())),
sync_receivers: RwLock::new(HashMap::new()), sync_receivers: RwLock::new(HashMap::new()),
rotate: RotationHandler::new(), rotate: RotationHandler::new(),
shutdown: AtomicBool::new(false),
}; };
fs::create_dir_all(s.get_media_folder())?; fs::create_dir_all(s.get_media_folder())?;
@ -341,6 +345,13 @@ impl Service {
r.push(base64::encode_config(key, base64::URL_SAFE_NO_PAD)); r.push(base64::encode_config(key, base64::URL_SAFE_NO_PAD));
r r
} }
pub fn shutdown(&self) {
self.shutdown.store(true, atomic::Ordering::Relaxed);
// On shutdown
info!(target: "shutdown-sync", "Received shutdown notification, notifying sync helpers...");
services().globals.rotate.fire();
}
} }
fn reqwest_client_builder(config: &Config) -> Result<reqwest::ClientBuilder> { fn reqwest_client_builder(config: &Config) -> Result<reqwest::ClientBuilder> {