From bb5f2556c30782c8377388d5a83d0531bda9b199 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 28 Aug 2024 04:09:46 +0000 Subject: [PATCH] improve admin command error propagation Signed-off-by: Jason Volk --- src/admin/command.rs | 2 + src/admin/debug/mod.rs | 1 + src/admin/debug/tester.rs | 18 ++++++- src/admin/processor.rs | 58 ++++++++++++---------- src/core/config/mod.rs | 6 +++ src/service/admin/console.rs | 33 +++++++++++-- src/service/admin/mod.rs | 82 ++++++++++++++++++++----------- src/service/admin/startup.rs | 67 +++++++++++++++++++------ src/service/rooms/timeline/mod.rs | 3 +- 9 files changed, 192 insertions(+), 78 deletions(-) diff --git a/src/admin/command.rs b/src/admin/command.rs index a26564fc..c594736d 100644 --- a/src/admin/command.rs +++ b/src/admin/command.rs @@ -1,9 +1,11 @@ use std::time::SystemTime; use conduit_service::Services; +use ruma::EventId; pub(crate) struct Command<'a> { pub(crate) services: &'a Services, pub(crate) body: &'a [&'a str], pub(crate) timer: SystemTime, + pub(crate) reply_id: Option<&'a EventId>, } diff --git a/src/admin/debug/mod.rs b/src/admin/debug/mod.rs index 1f51a35e..20ddbf2f 100644 --- a/src/admin/debug/mod.rs +++ b/src/admin/debug/mod.rs @@ -195,5 +195,6 @@ pub(super) enum DebugCommand { /// - Developer test stubs #[command(subcommand)] #[allow(non_snake_case)] + #[clap(hide(true))] Tester(TesterCommand), } diff --git a/src/admin/debug/tester.rs b/src/admin/debug/tester.rs index af4ea2dc..c11f893e 100644 --- a/src/admin/debug/tester.rs +++ b/src/admin/debug/tester.rs @@ -1,3 +1,4 @@ +use conduit::Err; use ruma::events::room::message::RoomMessageEventContent; use crate::{admin_command, admin_command_dispatch, Result}; @@ -5,13 +6,28 @@ use crate::{admin_command, admin_command_dispatch, Result}; #[admin_command_dispatch] #[derive(Debug, clap::Subcommand)] pub(crate) enum TesterCommand { + Panic, + Failure, Tester, Timer, } +#[rustfmt::skip] +#[admin_command] +async fn panic(&self) -> Result { + + panic!("panicked") +} + +#[rustfmt::skip] +#[admin_command] +async fn failure(&self) -> Result { + + Err!("failed") +} + #[inline(never)] #[rustfmt::skip] -#[allow(unused_variables)] #[admin_command] async fn tester(&self) -> Result { diff --git a/src/admin/processor.rs b/src/admin/processor.rs index 2e4330c8..67548f49 100644 --- a/src/admin/processor.rs +++ b/src/admin/processor.rs @@ -23,7 +23,7 @@ use ruma::{ relation::InReplyTo, room::message::{Relation::Reply, RoomMessageEventContent}, }, - OwnedEventId, + EventId, }; use service::{ admin::{CommandInput, CommandOutput, ProcessorFuture, ProcessorResult}, @@ -48,12 +48,12 @@ async fn handle_command(services: Arc, command: CommandInput) -> Proce .catch_unwind() .await .map_err(Error::from_panic) - .or_else(|error| handle_panic(&error, command)) + .unwrap_or_else(|error| handle_panic(&error, &command)) } -async fn process_command(services: Arc, input: &CommandInput) -> CommandOutput { +async fn process_command(services: Arc, input: &CommandInput) -> ProcessorResult { let (command, args, body) = match parse(&services, input) { - Err(error) => return error, + Err(error) => return Err(error), Ok(parsed) => parsed, }; @@ -61,33 +61,22 @@ async fn process_command(services: Arc, input: &CommandInput) -> Comma services: &services, body: &body, timer: SystemTime::now(), + reply_id: input.reply_id.as_deref(), }; - process(&context, command, &args) - .await - .and_then(|content| reply(content, input.reply_id.clone())) + process(&context, command, &args).await } -fn handle_panic(error: &Error, command: CommandInput) -> ProcessorResult { +fn handle_panic(error: &Error, command: &CommandInput) -> ProcessorResult { let link = "Please submit a [bug report](https://github.com/girlbossceo/conduwuit/issues/new). 🥺"; let msg = format!("Panic occurred while processing command:\n```\n{error:#?}\n```\n{link}"); let content = RoomMessageEventContent::notice_markdown(msg); error!("Panic while processing command: {error:?}"); - Ok(reply(content, command.reply_id)) -} - -fn reply(mut content: RoomMessageEventContent, reply_id: Option) -> Option { - content.relates_to = reply_id.map(|event_id| Reply { - in_reply_to: InReplyTo { - event_id, - }, - }); - - Some(content) + Err(reply(content, command.reply_id.as_deref())) } // Parse and process a message from the admin room -async fn process(context: &Command<'_>, command: AdminCommand, args: &[String]) -> CommandOutput { +async fn process(context: &Command<'_>, command: AdminCommand, args: &[String]) -> ProcessorResult { let (capture, logs) = capture_create(context); let capture_scope = capture.start(); @@ -112,13 +101,15 @@ async fn process(context: &Command<'_>, command: AdminCommand, args: &[String]) match result { Ok(content) => { - write!(&mut output, "{}", content.body()).expect("failed to format command result to output"); + write!(&mut output, "{0}", content.body()).expect("failed to format command result to output buffer"); + Ok(Some(reply(RoomMessageEventContent::notice_markdown(output), context.reply_id))) }, - Err(error) => write!(&mut output, "Command failed with error:\n```\n{error:#?}\n```") - .expect("failed to format error to command output"), - }; - - Some(RoomMessageEventContent::notice_markdown(output)) + Err(error) => { + write!(&mut output, "Command failed with error:\n```\n{error:#?}\n```") + .expect("failed to format command result to output"); + Err(reply(RoomMessageEventContent::notice_markdown(output), context.reply_id)) + }, + } } fn capture_create(context: &Command<'_>) -> (Arc, Arc>) { @@ -158,7 +149,10 @@ fn parse<'a>( let message = error .to_string() .replace("server.name", services.globals.server_name().as_str()); - Err(Some(RoomMessageEventContent::notice_markdown(message))) + Err(reply( + RoomMessageEventContent::notice_markdown(message), + input.reply_id.as_deref(), + )) }, } } @@ -255,3 +249,13 @@ fn parse_line(command_line: &str) -> Vec { trace!(?command_line, ?argv, "parse"); argv } + +fn reply(mut content: RoomMessageEventContent, reply_id: Option<&EventId>) -> RoomMessageEventContent { + content.relates_to = reply_id.map(|event_id| Reply { + in_reply_to: InReplyTo { + event_id: event_id.to_owned(), + }, + }); + + content +} diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 6acb814a..1f78dea6 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -338,6 +338,8 @@ pub struct Config { pub admin_console_automatic: bool, #[serde(default)] pub admin_execute: Vec, + #[serde(default)] + pub admin_execute_errors_ignore: bool, #[serde(default = "default_admin_log_capture")] pub admin_log_capture: String, @@ -601,6 +603,10 @@ impl fmt::Display for Config { &self.admin_console_automatic.to_string(), ); line("Execute admin commands after startup", &self.admin_execute.join(", ")); + line( + "Continue startup even if some commands fail", + &self.admin_execute_errors_ignore.to_string(), + ); line("Filter for admin command log capture", &self.admin_log_capture); line("Allow outgoing federated typing", &self.allow_outgoing_typing.to_string()); line("Allow incoming federated typing", &self.allow_incoming_typing.to_string()); diff --git a/src/service/admin/console.rs b/src/service/admin/console.rs index 7f3a6fae..55bae365 100644 --- a/src/service/admin/console.rs +++ b/src/service/admin/console.rs @@ -158,13 +158,18 @@ impl Console { async fn process(self: Arc, line: String) { match self.admin.command_in_place(line, None).await { - Ok(Some(content)) => self.output(content).await, - Err(e) => error!("processing command: {e}"), - _ => (), + Ok(Some(ref content)) => self.output(content), + Err(ref content) => self.output_err(content), + _ => unreachable!(), } } - async fn output(self: Arc, output_content: RoomMessageEventContent) { + fn output_err(self: Arc, output_content: &RoomMessageEventContent) { + let output = configure_output_err(self.output.clone()); + output.print_text(output_content.body()); + } + + fn output(self: Arc, output_content: &RoomMessageEventContent) { self.output.print_text(output_content.body()); } @@ -194,12 +199,32 @@ impl Console { } } +/// Standalone/static markdown printer for errors. +pub fn print_err(markdown: &str) { + let output = configure_output_err(MadSkin::default_dark()); + output.print_text(markdown); +} /// Standalone/static markdown printer. pub fn print(markdown: &str) { let output = configure_output(MadSkin::default_dark()); output.print_text(markdown); } +fn configure_output_err(mut output: MadSkin) -> MadSkin { + use termimad::{crossterm::style::Color, Alignment, CompoundStyle, LineStyle}; + + let code_style = CompoundStyle::with_fgbg(Color::AnsiValue(196), Color::AnsiValue(234)); + output.inline_code = code_style.clone(); + output.code_block = LineStyle { + left_margin: 0, + right_margin: 0, + align: Alignment::Left, + compound_style: code_style, + }; + + output +} + fn configure_output(mut output: MadSkin) -> MadSkin { use termimad::{crossterm::style::Color, Alignment, CompoundStyle, LineStyle}; diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 9e27cdf2..bd89d5a3 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -10,7 +10,7 @@ use std::{ }; use async_trait::async_trait; -use conduit::{debug, error, error::default_log, pdu::PduBuilder, Err, Error, PduEvent, Result, Server}; +use conduit::{debug, err, error, error::default_log, pdu::PduBuilder, Error, PduEvent, Result, Server}; pub use create::create_admin_room; use loole::{Receiver, Sender}; use ruma::{ @@ -45,18 +45,34 @@ struct Services { services: StdRwLock>>, } +/// Inputs to a command are a multi-line string and optional reply_id. #[derive(Debug)] pub struct CommandInput { pub command: String, pub reply_id: Option, } +/// Prototype of the tab-completer. The input is buffered text when tab +/// asserted; the output will fully replace the input buffer. pub type Completer = fn(&str) -> String; -pub type Processor = fn(Arc, CommandInput) -> ProcessorFuture; -pub type ProcessorFuture = Pin + Send>>; -pub type ProcessorResult = Result; -pub type CommandOutput = Option; +/// Prototype of the command processor. This is a callback supplied by the +/// reloadable admin module. +pub type Processor = fn(Arc, CommandInput) -> ProcessorFuture; + +/// Return type of the processor +pub type ProcessorFuture = Pin + Send>>; + +/// Result wrapping of a command's handling. Both variants are complete message +/// events which have digested any prior errors. The wrapping preserves whether +/// the command failed without interpreting the text. Ok(None) outputs are +/// dropped to produce no response. +pub type ProcessorResult = Result, CommandOutput>; + +/// Alias for the output structure. +pub type CommandOutput = RoomMessageEventContent; + +/// Maximum number of commands which can be queued for dispatch. const COMMAND_QUEUE_LIMIT: usize = 512; #[async_trait] @@ -86,7 +102,7 @@ impl crate::Service for Service { let receiver = self.receiver.lock().await; let mut signals = self.services.server.signal.subscribe(); - self.startup_execute().await; + self.startup_execute().await?; self.console_auto_start().await; loop { @@ -120,11 +136,15 @@ impl crate::Service for Service { } impl Service { + /// Sends markdown message (not an m.notice for notification reasons) to the + /// admin room as the admin user. pub async fn send_text(&self, body: &str) { self.send_message(RoomMessageEventContent::text_markdown(body)) .await; } + /// Sends a message to the admin room as the admin user (see send_text() for + /// convenience). pub async fn send_message(&self, message_content: RoomMessageEventContent) { if let Ok(Some(room_id)) = self.get_admin_room() { let user_id = &self.services.globals.server_user; @@ -133,14 +153,20 @@ impl Service { } } - pub async fn command(&self, command: String, reply_id: Option) { - self.send(CommandInput { - command, - reply_id, - }) - .await; + /// Posts a command to the command processor queue and returns. Processing + /// will take place on the service worker's task asynchronously. Errors if + /// the queue is full. + pub fn command(&self, command: String, reply_id: Option) -> Result<()> { + self.sender + .send(CommandInput { + command, + reply_id, + }) + .map_err(|e| err!("Failed to enqueue admin command: {e:?}")) } + /// Dispatches a comamnd to the processor on the current task and waits for + /// completion. pub async fn command_in_place(&self, command: String, reply_id: Option) -> ProcessorResult { self.process_command(CommandInput { command, @@ -149,6 +175,8 @@ impl Service { .await } + /// Invokes the tab-completer to complete the command. When unavailable, + /// None is returned. pub fn complete_command(&self, command: &str) -> Option { self.complete .read() @@ -156,11 +184,6 @@ impl Service { .map(|complete| complete(command)) } - async fn send(&self, message: CommandInput) { - debug_assert!(!self.sender.is_closed(), "channel closed"); - self.sender.send_async(message).await.expect("message sent"); - } - async fn handle_signal(&self, #[allow(unused_variables)] sig: &'static str) { #[cfg(feature = "console")] self.console.handle_signal(sig).await; @@ -168,29 +191,28 @@ impl Service { async fn handle_command(&self, command: CommandInput) { match self.process_command(command).await { - Ok(Some(output)) => self.handle_response(output).await, + Ok(Some(output)) | Err(output) => self.handle_response(output).await, Ok(None) => debug!("Command successful with no response"), - Err(e) => error!("Command processing error: {e}"), } } async fn process_command(&self, command: CommandInput) -> ProcessorResult { - let Some(services) = self + let handle = &self + .handle + .read() + .await + .expect("Admin module is not loaded"); + + let services = self .services .services .read() .expect("locked") .as_ref() .and_then(Weak::upgrade) - else { - return Err!("Services self-reference not initialized."); - }; + .expect("Services self-reference not initialized."); - if let Some(handle) = self.handle.read().await.as_ref() { - handle(services, command).await - } else { - Err!("Admin module is not loaded.") - } + handle(services, command).await } /// Checks whether a given user is an admin of this server @@ -233,6 +255,10 @@ impl Service { }; let Ok(Some(pdu)) = self.services.timeline.get_pdu(&in_reply_to.event_id) else { + error!( + event_id = ?in_reply_to.event_id, + "Missing admin command in_reply_to event" + ); return; }; diff --git a/src/service/admin/startup.rs b/src/service/admin/startup.rs index 8a4a3d63..b7bc8d44 100644 --- a/src/service/admin/startup.rs +++ b/src/service/admin/startup.rs @@ -1,9 +1,7 @@ -use conduit::{debug, debug_info, error, implement, info}; +use conduit::{debug, debug_info, error, implement, info, Err, Result}; use ruma::events::room::message::RoomMessageEventContent; use tokio::time::{sleep, Duration}; -use super::console; - /// Possibly spawn the terminal console at startup if configured. #[implement(super::Service)] pub(super) async fn console_auto_start(&self) { @@ -24,45 +22,82 @@ pub(super) async fn console_auto_stop(&self) { /// Execute admin commands after startup #[implement(super::Service)] -pub(super) async fn startup_execute(&self) { - sleep(Duration::from_millis(500)).await; //TODO: remove this after run-states are broadcast - for (i, command) in self.services.server.config.admin_execute.iter().enumerate() { - self.startup_execute_command(i, command.clone()).await; +pub(super) async fn startup_execute(&self) -> Result<()> { + // List of comamnds to execute + let commands = &self.services.server.config.admin_execute; + + // Determine if we're running in smoketest-mode which will change some behaviors + let smoketest = self.services.server.config.test.contains("smoke"); + + // When true, errors are ignored and startup continues. + let errors = !smoketest && self.services.server.config.admin_execute_errors_ignore; + + //TODO: remove this after run-states are broadcast + sleep(Duration::from_millis(500)).await; + + for (i, command) in commands.iter().enumerate() { + if let Err(e) = self.startup_execute_command(i, command.clone()).await { + if !errors { + return Err(e); + } + } + tokio::task::yield_now().await; } // The smoketest functionality is placed here for now and simply initiates // shutdown after all commands have executed. - if self.services.server.config.test.contains("smoke") { + if smoketest { debug_info!("Smoketest mode. All commands complete. Shutting down now..."); self.services .server .shutdown() - .unwrap_or_else(error::default_log); + .inspect_err(error::inspect_log) + .expect("Error shutting down from smoketest"); } + + Ok(()) } /// Execute one admin command after startup #[implement(super::Service)] -async fn startup_execute_command(&self, i: usize, command: String) { +async fn startup_execute_command(&self, i: usize, command: String) -> Result<()> { debug!("Startup command #{i}: executing {command:?}"); match self.command_in_place(command, None).await { - Err(e) => error!("Startup command #{i} failed: {e:?}"), - Ok(None) => info!("Startup command #{i} completed (no output)."), Ok(Some(output)) => Self::startup_command_output(i, &output), + Err(output) => Self::startup_command_error(i, &output), + Ok(None) => { + info!("Startup command #{i} completed (no output)."); + Ok(()) + }, } } #[cfg(feature = "console")] #[implement(super::Service)] -fn startup_command_output(i: usize, content: &RoomMessageEventContent) { - info!("Startup command #{i} completed:"); - console::print(content.body()); +fn startup_command_output(i: usize, content: &RoomMessageEventContent) -> Result<()> { + debug_info!("Startup command #{i} completed:"); + super::console::print(content.body()); + Ok(()) +} + +#[cfg(feature = "console")] +#[implement(super::Service)] +fn startup_command_error(i: usize, content: &RoomMessageEventContent) -> Result<()> { + super::console::print_err(content.body()); + Err!(debug_error!("Startup command #{i} failed.")) } #[cfg(not(feature = "console"))] #[implement(super::Service)] -fn startup_command_output(i: usize, content: &RoomMessageEventContent) { +fn startup_command_output(i: usize, content: &RoomMessageEventContent) -> Result<()> { info!("Startup command #{i} completed:\n{:#?}", content.body()); + Ok(()) +} + +#[cfg(not(feature = "console"))] +#[implement(super::Service)] +fn startup_command_error(i: usize, content: &RoomMessageEventContent) -> Result<()> { + Err!(error!("Startup command #{i} failed:\n{:#?}", content.body())) } diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 983bef70..4f2352f8 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -523,8 +523,7 @@ impl Service { if self.services.admin.is_admin_command(pdu, &body).await { self.services .admin - .command(body, Some((*pdu.event_id).into())) - .await; + .command(body, Some((*pdu.event_id).into()))?; } } },