split service manager into unit

Signed-off-by: Jason Volk <jason@zemos.net>

parent 5be679e17b
commit 38c989a07e

3 changed files with 179 additions and 137 deletions

src/service/manager.rs (new file, 159 lines)

@@ -0,0 +1,159 @@
use std::{panic::AssertUnwindSafe, sync::Arc, time::Duration};

use conduit::{debug, debug_warn, error, trace, utils::time, warn, Error, Result, Server};
use futures_util::FutureExt;
use tokio::{
	sync::{Mutex, MutexGuard},
	task::{JoinHandle, JoinSet},
	time::sleep,
};

use crate::{service::Service, Services};

pub(crate) struct Manager {
	manager: Mutex<Option<JoinHandle<Result<()>>>>,
	workers: Mutex<Workers>,
	server: Arc<Server>,
	services: &'static Services,
}

type Workers = JoinSet<WorkerResult>;
type WorkerResult = (Arc<dyn Service>, Result<()>);
type WorkersLocked<'a> = MutexGuard<'a, Workers>;
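
// Fixed delay before a panicked worker is restarted; see handle_error() below.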
const RESTART_DELAY_MS: u64 = 2500;

impl Manager {
	pub(super) fn new(services: &Services) -> Arc<Self> {
		Arc::new(Self {
			manager: Mutex::new(None),
			workers: Mutex::new(JoinSet::new()),
			server: services.server.clone(),
			services: crate::services(),
		})
	}

	pub(super) async fn poll(&self) -> Result<()> {
		if let Some(manager) = &mut *self.manager.lock().await {
			trace!("Polling service manager...");
			return manager.await?;
		}

		Ok(())
	}
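
	// Spawns the supervision task on the server runtime, then starts a
	// worker task for each registered service.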
	pub(super) async fn start(self: Arc<Self>) -> Result<()> {
		let mut workers = self.workers.lock().await;

		debug!("Starting service manager...");
		let self_ = self.clone();
		_ = self.manager.lock().await.insert(
			self.server
				.runtime()
				.spawn(async move { self_.worker().await }),
		);

		debug!("Starting service workers...");
		for service in self.services.service.values() {
			self.start_worker(&mut workers, service).await?;
		}

		Ok(())
	}

	pub(super) async fn stop(&self) {
		if let Some(manager) = self.manager.lock().await.take() {
			debug!("Waiting for service manager...");
			if let Err(e) = manager.await {
				error!("Manager shutdown error: {e:?}");
			}
		}
	}

	async fn worker(&self) -> Result<()> {
		loop {
			let mut workers = self.workers.lock().await;
			tokio::select! {
				result = workers.join_next() => match result {
					Some(Ok(result)) => self.handle_result(&mut workers, result).await?,
					Some(Err(error)) => self.handle_abort(&mut workers, Error::from(error)).await?,
					None => break,
				}
			}
		}

		debug!("Worker manager finished");
		Ok(())
	}

	async fn handle_abort(&self, _workers: &mut WorkersLocked<'_>, error: Error) -> Result<()> {
		// not supported until service can be associated with abort
		unimplemented!("unexpected worker task abort {error:?}");
	}

	async fn handle_result(&self, workers: &mut WorkersLocked<'_>, result: WorkerResult) -> Result<()> {
		let (service, result) = result;
		match result {
			Ok(()) => self.handle_finished(workers, &service).await,
			Err(error) => self.handle_error(workers, &service, error).await,
		}
	}

	async fn handle_finished(&self, _workers: &mut WorkersLocked<'_>, service: &Arc<dyn Service>) -> Result<()> {
		debug!("service {:?} worker finished", service.name());
		Ok(())
	}
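
	// Restart policy: a panicked worker is restarted after a fixed delay,
	// while any other error is propagated out of the supervision loop.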
	async fn handle_error(
		&self, workers: &mut WorkersLocked<'_>, service: &Arc<dyn Service>, error: Error,
	) -> Result<()> {
		let name = service.name();
		error!("service {name:?} aborted: {error}");

		if !self.server.running() {
			debug_warn!("service {name:?} error ignored on shutdown.");
			return Ok(());
		}

		if !error.is_panic() {
			return Err(error);
		}

		let delay = Duration::from_millis(RESTART_DELAY_MS);
		warn!("service {name:?} worker restarting after {} delay", time::pretty(delay));
		sleep(delay).await;

		self.start_worker(workers, service).await
	}

	/// Start the worker in a task for the service.
	async fn start_worker(&self, workers: &mut WorkersLocked<'_>, service: &Arc<dyn Service>) -> Result<()> {
		if !self.server.running() {
			return Err(Error::Err(format!(
				"Service {:?} worker not starting during server shutdown.",
				service.name()
			)));
		}

		debug!("Service {:?} worker starting...", service.name());
		workers.spawn_on(worker(service.clone()), self.server.runtime());

		Ok(())
	}
}

/// Base frame for service worker. This runs in a tokio::task. All errors and
/// panics from the worker are caught and returned cleanly. The JoinHandle
/// should never error with a panic, and if so it should propagate, but it may
/// error with an Abort which the manager should handle along with results to
/// determine if the worker should be restarted.
async fn worker(service: Arc<dyn Service>) -> WorkerResult {
	let service_ = Arc::clone(&service);
	let result = AssertUnwindSafe(service_.worker())
		.catch_unwind()
		.await
		.map_err(Error::from_panic);

	// flattens JoinError for panic into worker's Error
	(service, result.unwrap_or_else(Err))
}
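
A note on the supervision idiom in worker() above: each service runs as a task
in a tokio JoinSet, and the manager loop reacts to every completion via
join_next(), which yields None once the set is empty. A self-contained sketch
of the same pattern, assuming tokio with the rt and macros features; the task
names and bodies here are illustrative, not from this codebase:

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
	let mut workers: JoinSet<(&'static str, Result<(), String>)> = JoinSet::new();

	// Spawn a few "services"; each reports its name alongside its result.
	workers.spawn(async { ("clean", Ok(())) });
	workers.spawn(async { ("faulty", Err(String::from("worker error"))) });

	// Supervision loop: join_next() yields one entry per finished task.
	while let Some(joined) = workers.join_next().await {
		match joined {
			Ok((name, Ok(()))) => println!("service {name} finished cleanly"),
			Ok((name, Err(e))) => eprintln!("service {name} failed, could restart: {e}"),
			Err(join_error) => eprintln!("task panicked or was aborted: {join_error}"),
		}
	}
	// join_next() returned None: the set is empty and supervision ends.
}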

src/service/mod.rs

@@ -1,5 +1,6 @@
 #![allow(refining_impl_trait)]
 
+mod manager;
 mod service;
 pub mod services;
 
src/service/services.rs

@@ -1,16 +1,13 @@
-use std::{collections::BTreeMap, fmt::Write, ops::DerefMut, panic::AssertUnwindSafe, sync::Arc, time::Duration};
+use std::{collections::BTreeMap, fmt::Write, sync::Arc};
 
-use conduit::{debug, debug_info, debug_warn, error, info, trace, utils::time, warn, Error, Result, Server};
+use conduit::{debug, debug_info, info, trace, Result, Server};
 use database::Database;
-use futures_util::FutureExt;
-use tokio::{
-	sync::{Mutex, MutexGuard},
-	task::{JoinHandle, JoinSet},
-	time::sleep,
-};
+use tokio::sync::Mutex;
 
 use crate::{
-	account_data, admin, appservice, globals, key_backups, media, presence, pusher, rooms, sending,
+	account_data, admin, appservice, globals, key_backups,
+	manager::Manager,
+	media, presence, pusher, rooms, sending,
 	service::{Args, Map, Service},
 	transaction_ids, uiaa, users,
 };
@@ -30,19 +27,12 @@ pub struct Services {
 	pub media: Arc<media::Service>,
 	pub sending: Arc<sending::Service>,
 
-	manager: Mutex<Option<JoinHandle<Result<()>>>>,
-	workers: Mutex<Workers>,
+	manager: Mutex<Option<Arc<Manager>>>,
 	pub(crate) service: Map,
 	pub server: Arc<Server>,
 	pub db: Arc<Database>,
 }
 
-type Workers = JoinSet<WorkerResult>;
-type WorkerResult = (Arc<dyn Service>, Result<()>);
-type WorkersLocked<'a> = MutexGuard<'a, Workers>;
-
-const RESTART_DELAY_MS: u64 = 2500;
-
 impl Services {
 	pub fn build(server: Arc<Server>, db: Arc<Database>) -> Result<Self> {
 		let mut service: Map = BTreeMap::new();
@@ -94,7 +84,6 @@ impl Services {
 			sending: build!(sending::Service),
 			globals: build!(globals::Service),
 			manager: Mutex::new(None),
-			workers: Mutex::new(JoinSet::new()),
 			service,
 			server,
 			db,
@@ -108,13 +97,13 @@ impl Services {
 		globals::migrations::migrations(&self.db, &self.globals.config).await?;
 		globals::emerg_access::init_emergency_access();
 
-		let mut workers = self.workers.lock().await;
-		for service in self.service.values() {
-			self.start_worker(&mut workers, service).await?;
-		}
-
-		debug!("Starting service manager...");
-		self.manager_start().await?;
+		self.manager
+			.lock()
+			.await
+			.insert(Manager::new(self))
+			.clone()
+			.start()
+			.await?;
 
 		if self.globals.allow_check_for_updates() {
 			let handle = globals::updates::start_check_for_updates_task();
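
The replacement startup path above stores the manager with Option::insert,
which writes the value and returns a mutable reference to it; cloning the Arc
out of that reference lets the caller start the freshly stored manager in the
same expression. A minimal sketch of that idiom, with illustrative names:

use std::sync::Arc;

fn main() {
	let mut slot: Option<Arc<&str>> = None;

	// insert() stores the value and hands back &mut to it;
	// cloning the Arc gives us a handle while the Option keeps its own.
	let manager = slot.insert(Arc::new("manager")).clone();

	assert_eq!(*manager, "manager");
	assert_eq!(Arc::strong_count(&manager), 2); // one in slot, one here
}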
@@ -135,16 +124,16 @@ impl Services {
 			_ = updates_handle.await;
 		}
 
-		debug!("Stopping service manager...");
-		self.manager_stop().await;
+		if let Some(manager) = self.manager.lock().await.as_ref() {
+			manager.stop().await;
+		}
 
 		debug_info!("Services shutdown complete.");
 	}
 
 	pub async fn poll(&self) -> Result<()> {
-		if let Some(manager) = self.manager.lock().await.deref_mut() {
-			trace!("Polling service manager...");
-			return manager.await?;
+		if let Some(manager) = self.manager.lock().await.as_ref() {
+			return manager.poll().await;
 		}
 
 		Ok(())
@@ -190,111 +179,4 @@ impl Services {
 			service.interrupt();
 		}
 	}
-
-	async fn manager_start(&self) -> Result<()> {
-		debug!("Starting service manager...");
-		self.manager.lock().await.get_or_insert_with(|| {
-			self.server
-				.runtime()
-				.spawn(async move { crate::services().manager().await })
-		});
-
-		Ok(())
-	}
-
-	async fn manager_stop(&self) {
-		if let Some(manager) = self.manager.lock().await.take() {
-			debug!("Waiting for service manager...");
-			if let Err(e) = manager.await {
-				error!("Manager shutdown error: {e:?}");
-			}
-		}
-	}
-
-	async fn manager(&self) -> Result<()> {
-		loop {
-			let mut workers = self.workers.lock().await;
-			tokio::select! {
-				result = workers.join_next() => match result {
-					Some(Ok(result)) => self.handle_result(&mut workers, result).await?,
-					Some(Err(error)) => self.handle_abort(&mut workers, Error::from(error)).await?,
-					None => break,
-				}
-			}
-		}
-
-		debug!("Worker manager finished");
-		Ok(())
-	}
-
-	async fn handle_abort(&self, _workers: &mut WorkersLocked<'_>, error: Error) -> Result<()> {
-		// not supported until service can be associated with abort
-		unimplemented!("unexpected worker task abort {error:?}");
-	}
-
-	async fn handle_result(&self, workers: &mut WorkersLocked<'_>, result: WorkerResult) -> Result<()> {
-		let (service, result) = result;
-		match result {
-			Ok(()) => self.handle_finished(workers, &service).await,
-			Err(error) => self.handle_error(workers, &service, error).await,
-		}
-	}
-
-	async fn handle_finished(&self, _workers: &mut WorkersLocked<'_>, service: &Arc<dyn Service>) -> Result<()> {
-		debug!("service {:?} worker finished", service.name());
-		Ok(())
-	}
-
-	async fn handle_error(
-		&self, workers: &mut WorkersLocked<'_>, service: &Arc<dyn Service>, error: Error,
-	) -> Result<()> {
-		let name = service.name();
-		error!("service {name:?} aborted: {error}");
-
-		if !self.server.running() {
-			debug_warn!("service {name:?} error ignored on shutdown.");
-			return Ok(());
-		}
-
-		if !error.is_panic() {
-			return Err(error);
-		}
-
-		let delay = Duration::from_millis(RESTART_DELAY_MS);
-		warn!("service {name:?} worker restarting after {} delay", time::pretty(delay));
-		sleep(delay).await;
-
-		self.start_worker(workers, service).await
-	}
-
-	/// Start the worker in a task for the service.
-	async fn start_worker(&self, workers: &mut WorkersLocked<'_>, service: &Arc<dyn Service>) -> Result<()> {
-		if !self.server.running() {
-			return Err(Error::Err(format!(
-				"Service {:?} worker not starting during server shutdown.",
-				service.name()
-			)));
-		}
-
-		debug!("Service {:?} worker starting...", service.name());
-		workers.spawn_on(worker(service.clone()), self.server.runtime());
-
-		Ok(())
-	}
 }
-
-/// Base frame for service worker. This runs in a tokio::task. All errors and
-/// panics from the worker are caught and returned cleanly. The JoinHandle
-/// should never error with a panic, and if so it should propagate, but it may
-/// error with an Abort which the manager should handle along with results to
-/// determine if the worker should be restarted.
-async fn worker(service: Arc<dyn Service>) -> WorkerResult {
-	let service_ = Arc::clone(&service);
-	let result = AssertUnwindSafe(service_.worker())
-		.catch_unwind()
-		.await
-		.map_err(Error::from_panic);
-
-	// flattens JoinError for panic into worker's Error
-	(service, result.unwrap_or_else(Err))
-}
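
The worker frame removed here (and reintroduced in manager.rs above) relies on
catching panics from the service future and collapsing the two error layers
into one Result. A standalone sketch of that flattening, assuming the tokio and
futures-util crates and a hypothetical MyError standing in for conduit::Error:

use std::panic::AssertUnwindSafe;
use futures_util::FutureExt;

// Hypothetical error type for illustration only.
#[derive(Debug)]
struct MyError(String);

async fn faulty_worker() -> Result<(), MyError> {
	panic!("worker panicked");
}

#[tokio::main]
async fn main() {
	// catch_unwind wraps the worker's own Result in another Result whose
	// Err layer is the caught panic payload.
	let nested = AssertUnwindSafe(faulty_worker()).catch_unwind().await;

	// Convert the panic into the error type, then collapse
	// Result<Result<(), MyError>, MyError> into Result<(), MyError>.
	// Err is itself a function E -> Result<T, E>, so unwrap_or_else(Err)
	// forwards the outer error and otherwise unwraps the inner result.
	let flat: Result<(), MyError> = nested
		.map_err(|panic| MyError(format!("caught panic: {panic:?}")))
		.unwrap_or_else(Err);

	assert!(flat.is_err());
}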