eliminate RotationHandler
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
0e74ade7d7
commit
6e59135a7d
5 changed files with 12 additions and 37 deletions
|
@ -1,5 +1,5 @@
|
||||||
use std::{
|
use std::{
|
||||||
sync::atomic::{AtomicBool, AtomicU32},
|
sync::atomic::{AtomicBool, AtomicU32, Ordering},
|
||||||
time::SystemTime,
|
time::SystemTime,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -65,4 +65,7 @@ impl Server {
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.expect("runtime handle available in Server")
|
.expect("runtime handle available in Server")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn running(&self) -> bool { !self.stopping.load(Ordering::Acquire) }
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,7 +97,6 @@ async fn signal(server: Arc<Server>, tx: Sender<()>, handle: axum_server::Handle
|
||||||
}
|
}
|
||||||
|
|
||||||
server.stopping.store(true, Ordering::Release);
|
server.stopping.store(true, Ordering::Release);
|
||||||
services().globals.rotate.fire();
|
|
||||||
if let Err(e) = tx.send(()) {
|
if let Err(e) = tx.send(()) {
|
||||||
error!("failed sending shutdown transaction to channel: {e}");
|
error!("failed sending shutdown transaction to channel: {e}");
|
||||||
}
|
}
|
||||||
|
|
|
@ -159,7 +159,13 @@ impl Data for KeyValueDatabase {
|
||||||
// One time keys
|
// One time keys
|
||||||
futures.push(self.userid_lastonetimekeyupdate.watch_prefix(&userid_bytes));
|
futures.push(self.userid_lastonetimekeyupdate.watch_prefix(&userid_bytes));
|
||||||
|
|
||||||
futures.push(Box::pin(services().globals.rotate.watch()));
|
futures.push(Box::pin(async move {
|
||||||
|
let _result = services().server.signal.subscribe().recv().await;
|
||||||
|
}));
|
||||||
|
|
||||||
|
if !services().server.running() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
// Wait until one of them finds something
|
// Wait until one of them finds something
|
||||||
trace!(futures = futures.len(), "watch started");
|
trace!(futures = futures.len(), "watch started");
|
||||||
|
|
|
@ -8,7 +8,6 @@ pub(super) mod updates;
|
||||||
use std::{
|
use std::{
|
||||||
collections::{BTreeMap, HashMap},
|
collections::{BTreeMap, HashMap},
|
||||||
fs,
|
fs,
|
||||||
future::Future,
|
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::Instant,
|
time::Instant,
|
||||||
|
@ -29,7 +28,7 @@ use ruma::{
|
||||||
ServerName, UserId,
|
ServerName, UserId,
|
||||||
};
|
};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{broadcast, Mutex, RwLock},
|
sync::{Mutex, RwLock},
|
||||||
task::JoinHandle,
|
task::JoinHandle,
|
||||||
};
|
};
|
||||||
use tracing::{error, trace};
|
use tracing::{error, trace};
|
||||||
|
@ -59,36 +58,6 @@ pub struct Service {
|
||||||
pub roomid_federationhandletime: RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>,
|
pub roomid_federationhandletime: RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>,
|
||||||
pub updates_handle: Mutex<Option<JoinHandle<()>>>,
|
pub updates_handle: Mutex<Option<JoinHandle<()>>>,
|
||||||
pub stateres_mutex: Arc<Mutex<()>>,
|
pub stateres_mutex: Arc<Mutex<()>>,
|
||||||
pub rotate: RotationHandler,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handles "rotation" of long-polling requests. "Rotation" in this context is
|
|
||||||
/// similar to "rotation" of log files and the like.
|
|
||||||
///
|
|
||||||
/// This is utilized to have sync workers return early and release read locks on
|
|
||||||
/// the database.
|
|
||||||
pub struct RotationHandler(broadcast::Sender<()>, ());
|
|
||||||
|
|
||||||
impl RotationHandler {
|
|
||||||
fn new() -> Self {
|
|
||||||
let (s, _r) = broadcast::channel(1);
|
|
||||||
Self(s, ())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn watch(&self) -> impl Future<Output = ()> {
|
|
||||||
let mut r = self.0.subscribe();
|
|
||||||
#[allow(clippy::let_underscore_must_use)]
|
|
||||||
async move {
|
|
||||||
_ = r.recv().await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(clippy::let_underscore_must_use)]
|
|
||||||
pub fn fire(&self) { _ = self.0.send(()); }
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for RotationHandler {
|
|
||||||
fn default() -> Self { Self::new() }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
|
@ -149,7 +118,6 @@ impl Service {
|
||||||
roomid_federationhandletime: RwLock::new(HashMap::new()),
|
roomid_federationhandletime: RwLock::new(HashMap::new()),
|
||||||
updates_handle: Mutex::new(None),
|
updates_handle: Mutex::new(None),
|
||||||
stateres_mutex: Arc::new(Mutex::new(())),
|
stateres_mutex: Arc::new(Mutex::new(())),
|
||||||
rotate: RotationHandler::new(),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
fs::create_dir_all(s.get_media_folder())?;
|
fs::create_dir_all(s.get_media_folder())?;
|
||||||
|
|
|
@ -303,7 +303,6 @@ bad_signature_ratelimiter: {bad_signature_ratelimiter}
|
||||||
trace!("Interrupting services...");
|
trace!("Interrupting services...");
|
||||||
self.server.stopping.store(true, atomic::Ordering::Release);
|
self.server.stopping.store(true, atomic::Ordering::Release);
|
||||||
|
|
||||||
self.globals.rotate.fire();
|
|
||||||
self.sending.interrupt();
|
self.sending.interrupt();
|
||||||
self.presence.interrupt();
|
self.presence.interrupt();
|
||||||
self.admin.interrupt();
|
self.admin.interrupt();
|
||||||
|
|
Loading…
Add table
Reference in a new issue