console command interruption

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-06-16 05:17:15 +00:00
parent 1d1b1644e9
commit a22524496d
2 changed files with 33 additions and 17 deletions

View file

@ -1,7 +1,8 @@
#![cfg(feature = "console")]
use std::sync::Arc;
use conduit::{error, log, trace};
use conduit::{debug, defer, error, log, trace};
use futures_util::future::{AbortHandle, Abortable};
use ruma::events::room::message::RoomMessageEventContent;
use rustyline::{error::ReadlineError, history, Editor};
use termimad::MadSkin;
@ -12,6 +13,7 @@ use crate::services;
pub struct Console {
join: Mutex<Option<JoinHandle<()>>>,
input: Mutex<Editor<(), history::MemHistory>>,
abort: std::sync::Mutex<Option<AbortHandle>>,
output: MadSkin,
}
@ -49,6 +51,7 @@ impl Console {
Arc::new(Self {
join: None.into(),
input: Mutex::new(input),
abort: None.into(),
output,
})
}
@ -62,8 +65,6 @@ impl Console {
}
}
pub fn interrupt(self: &Arc<Self>) { Self::handle_interrupt(); }
#[allow(clippy::let_underscore_must_use)]
pub async fn close(self: &Arc<Self>) {
if let Some(join) = self.join.lock().await.take() {
@ -71,8 +72,16 @@ impl Console {
}
}
pub fn interrupt(self: &Arc<Self>) {
if let Some(abort) = self.abort.lock().expect("locked").take() {
debug!("Interrupting console command...");
abort.abort();
}
}
#[tracing::instrument(skip_all, name = "console")]
async fn worker(self: Arc<Self>) {
debug!("session starting");
while services().server.running() {
let mut input = self.input.lock().await;
@ -80,25 +89,39 @@ impl Console {
let line = tokio::task::block_in_place(|| input.readline("uwu> "));
drop(suppression);
trace!(?line, "input");
match line {
Ok(string) => self.handle(string).await,
Ok(string) => self.clone().handle(string).await,
Err(e) => match e {
ReadlineError::Eof => break,
ReadlineError::Interrupted => Self::handle_interrupt(),
ReadlineError::WindowResized => Self::handle_winch(),
ReadlineError::Interrupted | ReadlineError::Eof => break,
ReadlineError::WindowResized => continue,
_ => error!("console: {e:?}"),
},
}
}
debug!("session ending");
self.join.lock().await.take();
}
async fn handle(&self, line: String) {
#[allow(clippy::let_underscore_must_use)]
async fn handle(self: Arc<Self>, line: String) {
if line.is_empty() {
return;
}
let future = self.clone().process(line);
let (abort, abort_reg) = AbortHandle::new_pair();
let future = Abortable::new(future, abort_reg);
_ = self.abort.lock().expect("locked").insert(abort);
defer! {{
_ = self.abort.lock().expect("locked").take();
}}
_ = future.await;
}
async fn process(self: Arc<Self>, line: String) {
match services().admin.command_in_place(line, None).await {
Ok(Some(content)) => self.output(content).await,
Err(e) => error!("processing command: {e}"),
@ -106,16 +129,8 @@ impl Console {
}
}
async fn output(&self, output_content: RoomMessageEventContent) {
async fn output(self: Arc<Self>, output_content: RoomMessageEventContent) {
let output = self.output.term_text(output_content.body());
println!("{output}");
}
fn handle_interrupt() {
trace!("interrupted");
}
fn handle_winch() {
trace!("winch");
}
}

View file

@ -147,6 +147,7 @@ impl Service {
async fn handle_signal(&self, #[allow(unused_variables)] sig: &'static str) {
#[cfg(feature = "console")]
if sig == "SIGINT" && services().server.running() {
self.console.interrupt();
self.console.start().await;
}
}