From 5aee03d14adc6470e27be3f881ab7cf92ea255e2 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 16 Jun 2024 19:42:16 +0000 Subject: [PATCH] switch to crate rustyline_async improve console signal and interrupt stack Signed-off-by: Jason Volk --- Cargo.lock | 77 +++++------------ Cargo.toml | 5 +- src/service/Cargo.toml | 6 +- src/service/admin/console.rs | 158 ++++++++++++++++++++++++----------- src/service/admin/mod.rs | 6 +- 5 files changed, 137 insertions(+), 115 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 00d23a74..71540f8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -510,12 +510,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "cfg_aliases" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" - [[package]] name = "cfg_aliases" version = "0.2.1" @@ -580,15 +574,6 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" -[[package]] -name = "clipboard-win" -version = "5.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79f4473f5144e20d9aceaf2972478f06ddf687831eafeeb434fbaf0acc4144ad" -dependencies = [ - "error-code", -] - [[package]] name = "color_quant" version = "1.1.0" @@ -696,7 +681,7 @@ dependencies = [ "itertools 0.13.0", "libloading", "log", - "nix 0.29.0", + "nix", "parking_lot", "rand", "regex", @@ -796,7 +781,7 @@ dependencies = [ "reqwest", "ruma", "ruma-identifiers-validation", - "rustyline", + "rustyline-async", "serde", "serde_json", "serde_yaml", @@ -990,6 +975,7 @@ checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" dependencies = [ "bitflags 2.5.0", "crossterm_winapi", + "futures-core", "libc", "mio", "parking_lot", @@ -1166,12 +1152,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" -[[package]] -name = "error-code" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0474425d51df81997e2f90a21591180b38eccf27292d755f3e30750225c175b" - [[package]] name = "fallible-iterator" version = "0.3.0" @@ -2341,18 +2321,6 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" -[[package]] -name = "nix" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" -dependencies = [ - "bitflags 2.5.0", - "cfg-if", - "cfg_aliases 0.1.1", - "libc", -] - [[package]] name = "nix" version = "0.29.0" @@ -2361,7 +2329,7 @@ checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ "bitflags 2.5.0", "cfg-if", - "cfg_aliases 0.2.1", + "cfg_aliases", "libc", ] @@ -3420,22 +3388,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" [[package]] -name = "rustyline" -version = "14.0.0" +name = "rustyline-async" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7803e8936da37efd9b6d4478277f4b2b9bb5cdb37a113e8d63222e58da647e63" +checksum = "8b6eb06391513b2184f0a5405c11a4a0a5302e8be442f4c5c35267187c2b37d5" dependencies = [ - "bitflags 2.5.0", - "cfg-if", - "clipboard-win", - "libc", - "log", - "memchr", - "nix 0.28.0", + "crossterm", + "futures-channel", + "futures-util", + "pin-project", + "thingbuf", + "thiserror", "unicode-segmentation", "unicode-width", - "utf8parse", - "windows-sys 0.52.0", ] [[package]] @@ -4024,6 +3989,16 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "thingbuf" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "662b54ef6f7b4e71f683dadc787bbb2d8e8ef2f91b682ebed3164a5a7abca905" +dependencies = [ + "parking_lot", + "pin-project", +] + [[package]] name = "thiserror" version = "1.0.61" @@ -4635,12 +4610,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" -[[package]] -name = "utf8parse" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" - [[package]] name = "uuid" version = "1.8.0" diff --git a/Cargo.toml b/Cargo.toml index 0c6c6e25..70599fcf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -415,9 +415,8 @@ features = [ "light", ] -[workspace.dependencies.rustyline] -version = "14.0.0" -default-features = false +[workspace.dependencies.rustyline-async] +version = "0.4.2" [workspace.dependencies.termimad] version = "0.29.3" diff --git a/src/service/Cargo.toml b/src/service/Cargo.toml index 8a254ef8..80335181 100644 --- a/src/service/Cargo.toml +++ b/src/service/Cargo.toml @@ -21,7 +21,7 @@ brotli_compression = [ "reqwest/brotli", ] console = [ - "dep:rustyline", + "dep:rustyline-async", "dep:termimad", ] dev_release_log_level = [] @@ -61,8 +61,8 @@ regex.workspace = true reqwest.workspace = true ruma-identifiers-validation.workspace = true ruma.workspace = true -rustyline.workspace = true -rustyline.optional = true +rustyline-async.workspace = true +rustyline-async.optional = true serde_json.workspace = true serde.workspace = true serde_yaml.workspace = true diff --git a/src/service/admin/console.rs b/src/service/admin/console.rs index ba0d7ecc..dc967d00 100644 --- a/src/service/admin/console.rs +++ b/src/service/admin/console.rs @@ -1,44 +1,35 @@ #![cfg(feature = "console")] -use std::sync::Arc; +use std::{ + collections::VecDeque, + sync::{Arc, Mutex}, +}; -use conduit::{debug, defer, error, log, trace}; +use conduit::{debug, defer, error, log}; use futures_util::future::{AbortHandle, Abortable}; use ruma::events::room::message::RoomMessageEventContent; -use rustyline::{error::ReadlineError, history, Editor}; +use rustyline_async::{Readline, ReadlineError, ReadlineEvent}; use termimad::MadSkin; -use tokio::{sync::Mutex, task::JoinHandle}; +use tokio::task::JoinHandle; use crate::services; pub struct Console { - join: Mutex>>, - input: Mutex>, - abort: std::sync::Mutex>, + worker_join: Mutex>>, + input_abort: Mutex>, + command_abort: Mutex>, + history: Mutex>, output: MadSkin, } +const PROMPT: &str = "uwu> "; +const HISTORY_LIMIT: usize = 48; + impl Console { #[must_use] pub fn new() -> Arc { - use rustyline::config::{Behavior, BellStyle}; use termimad::{crossterm::style::Color, Alignment, CompoundStyle, LineStyle}; - let config = rustyline::Config::builder() - .enable_signals(false) - .behavior(Behavior::PreferTerm) - .bell_style(BellStyle::Visible) - .auto_add_history(true) - .max_history_size(100) - .expect("valid history size") - .indent_size(4) - .tab_stop(4) - .build(); - - let history = history::MemHistory::with_config(config); - let input = Editor::with_history(config, history).expect("builder configuration succeeded"); - let mut output = MadSkin::default_dark(); - let code_style = CompoundStyle::with_fgbg(Color::AnsiValue(40), Color::AnsiValue(234)); output.inline_code = code_style.clone(); output.code_block = LineStyle { @@ -49,33 +40,63 @@ impl Console { }; Arc::new(Self { - join: None.into(), - input: Mutex::new(input), - abort: None.into(), + worker_join: None.into(), + input_abort: None.into(), + command_abort: None.into(), + history: VecDeque::with_capacity(HISTORY_LIMIT).into(), output, }) } + pub(super) async fn handle_signal(self: &Arc, sig: &'static str) { + if !services().server.running() { + self.interrupt(); + } else if sig == "SIGINT" { + self.interrupt_command(); + self.start().await; + } + } + #[allow(clippy::let_underscore_must_use)] pub async fn start(self: &Arc) { - let mut join = self.join.lock().await; - if join.is_none() { + let mut worker_join = self.worker_join.lock().expect("locked"); + if worker_join.is_none() { let self_ = Arc::clone(self); - _ = join.insert(services().server.runtime().spawn(self_.worker())); + _ = worker_join.insert(services().server.runtime().spawn(self_.worker())); } } #[allow(clippy::let_underscore_must_use)] pub async fn close(self: &Arc) { - if let Some(join) = self.join.lock().await.take() { - _ = join.await; - } + self.interrupt(); + let Some(worker_join) = self.worker_join.lock().expect("locked").take() else { + return; + }; + + _ = worker_join.await; } pub fn interrupt(self: &Arc) { - if let Some(abort) = self.abort.lock().expect("locked").take() { + self.interrupt_command(); + self.interrupt_readline(); + self.worker_join + .lock() + .expect("locked") + .as_ref() + .map(JoinHandle::abort); + } + + pub fn interrupt_readline(self: &Arc) { + if let Some(input_abort) = self.input_abort.lock().expect("locked").take() { + debug!("Interrupting console readline..."); + input_abort.abort(); + } + } + + pub fn interrupt_command(self: &Arc) { + if let Some(command_abort) = self.command_abort.lock().expect("locked").take() { debug!("Interrupting console command..."); - abort.abort(); + command_abort.abort(); } } @@ -83,25 +104,48 @@ impl Console { async fn worker(self: Arc) { debug!("session starting"); while services().server.running() { - let mut input = self.input.lock().await; - - let suppression = log::Suppress::new(&services().server); - let line = tokio::task::block_in_place(|| input.readline("uwu> ")); - drop(suppression); - - trace!(?line, "input"); - match line { - Ok(string) => self.clone().handle(string).await, - Err(e) => match e { - ReadlineError::Interrupted | ReadlineError::Eof => break, - ReadlineError::WindowResized => continue, - _ => error!("console: {e:?}"), + match self.readline().await { + Ok(event) => match event { + ReadlineEvent::Line(string) => self.clone().handle(string).await, + ReadlineEvent::Interrupted => continue, + ReadlineEvent::Eof => break, + }, + Err(error) => match error { + ReadlineError::Closed => break, + ReadlineError::IO(error) => { + error!("console I/O: {error:?}"); + break; + }, }, } } debug!("session ending"); - self.join.lock().await.take(); + self.worker_join.lock().expect("locked").take(); + } + + #[allow(clippy::let_underscore_must_use)] + async fn readline(self: &Arc) -> Result { + let _suppression = log::Suppress::new(&services().server); + + let (mut readline, _writer) = Readline::new(PROMPT.to_owned())?; + self.set_history(&mut readline); + + let future = readline.readline(); + + let (abort, abort_reg) = AbortHandle::new_pair(); + let future = Abortable::new(future, abort_reg); + _ = self.input_abort.lock().expect("locked").insert(abort); + defer! {{ + _ = self.input_abort.lock().expect("locked").take(); + }} + + let Ok(result) = future.await else { + return Ok(ReadlineEvent::Eof); + }; + + readline.flush()?; + result } #[allow(clippy::let_underscore_must_use)] @@ -110,12 +154,13 @@ impl Console { return; } + self.add_history(line.clone()); let future = self.clone().process(line); let (abort, abort_reg) = AbortHandle::new_pair(); let future = Abortable::new(future, abort_reg); - _ = self.abort.lock().expect("locked").insert(abort); + _ = self.command_abort.lock().expect("locked").insert(abort); defer! {{ - _ = self.abort.lock().expect("locked").take(); + _ = self.command_abort.lock().expect("locked").take(); }} _ = future.await; @@ -133,4 +178,17 @@ impl Console { let output = self.output.term_text(output_content.body()); println!("{output}"); } + + fn set_history(&self, readline: &mut Readline) { + let history = self.history.lock().expect("locked"); + for entry in history.iter().rev() { + readline.add_history_entry(entry.clone()); + } + } + + fn add_history(&self, line: String) { + let mut history = self.history.lock().expect("locked"); + history.push_front(line); + history.truncate(HISTORY_LIMIT); + } } diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 3f983b75..730163ac 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -130,7 +130,6 @@ impl Service { let receiver = self.receiver.lock().await; let mut signals = services().server.signal.subscribe(); loop { - debug_assert!(!receiver.is_closed(), "channel closed"); tokio::select! { command = receiver.recv_async() => match command { Ok(command) => self.handle_command(command).await, @@ -146,10 +145,7 @@ impl Service { async fn handle_signal(&self, #[allow(unused_variables)] sig: &'static str) { #[cfg(feature = "console")] - if sig == "SIGINT" && services().server.running() { - self.console.interrupt(); - self.console.start().await; - } + self.console.handle_signal(sig).await; } async fn handle_command(&self, command: Command) {