generalize log capture to all admin commands; simplify handler

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-08-03 05:34:38 +00:00
parent eded585f79
commit 94b805de0b
3 changed files with 87 additions and 86 deletions

View file

@ -1,6 +1,9 @@
use service::Services;
use std::time::SystemTime;
use conduit_service::Services;
pub(crate) struct Command<'a> {
pub(crate) services: &'a Services,
pub(crate) body: &'a [&'a str],
pub(crate) timer: SystemTime,
}

View file

@ -1,16 +1,12 @@
use std::{
collections::{BTreeMap, HashMap},
fmt::Write,
sync::{Arc, Mutex},
sync::Arc,
time::{Instant, SystemTime},
};
use api::client::validate_and_add_event_id;
use conduit::{
debug, debug_error, err, info, log,
log::{capture, Capture},
utils, warn, Error, PduEvent, Result,
};
use conduit::{debug, debug_error, err, info, trace, utils, warn, Error, PduEvent, Result};
use ruma::{
api::{client::error::ErrorKind, federation::event::get_room_state},
events::room::message::RoomMessageEventContent,
@ -717,30 +713,14 @@ pub(super) async fn resolve_true_destination(
));
}
let filter: &capture::Filter = &|data| {
data.level() <= log::Level::DEBUG
&& data.mod_name().starts_with("conduit")
&& matches!(data.span_name(), "actual" | "well-known" | "srv")
};
let state = &self.services.server.log.capture;
let logs = Arc::new(Mutex::new(String::new()));
let capture = Capture::new(state, Some(filter), capture::fmt_markdown(logs.clone()));
let capture_scope = capture.start();
let actual = self
.services
.resolver
.resolve_actual_dest(&server_name, !no_cache)
.await?;
drop(capture_scope);
let msg = format!(
"{}\nDestination: {}\nHostname URI: {}",
logs.lock().expect("locked"),
actual.dest,
actual.host,
);
let msg = format!("Destination: {}\nHostname URI: {}", actual.dest, actual.host,);
Ok(RoomMessageEventContent::text_markdown(msg))
}

View file

@ -1,7 +1,21 @@
use std::{panic::AssertUnwindSafe, sync::Arc, time::Instant};
use std::{
panic::AssertUnwindSafe,
sync::{Arc, Mutex},
time::SystemTime,
};
use clap::{CommandFactory, Parser};
use conduit::{checked, error, trace, utils::string::common_prefix, Error, Result};
use conduit::{
debug, error,
log::{
capture,
capture::Capture,
fmt::{markdown_table, markdown_table_head},
},
trace,
utils::string::{collect_stream, common_prefix},
Error, Result,
};
use futures_util::future::FutureExt;
use ruma::{
events::{
@ -14,6 +28,7 @@ use service::{
admin::{CommandInput, CommandOutput, HandlerFuture, HandlerResult},
Services,
};
use tracing::Level;
use crate::{admin, admin::AdminCommand, Command};
@ -34,10 +49,21 @@ async fn handle_command(services: Arc<Services>, command: CommandInput) -> Handl
.or_else(|error| handle_panic(&error, command))
}
async fn process_command(services: Arc<Services>, command: &CommandInput) -> CommandOutput {
process(services, &command.command)
async fn process_command(services: Arc<Services>, input: &CommandInput) -> CommandOutput {
let (command, args, body) = match parse(&services, input) {
Err(error) => return error,
Ok(parsed) => parsed,
};
let context = Command {
services: &services,
body: &body,
timer: SystemTime::now(),
};
process(&context, command, &args)
.await
.and_then(|content| reply(content, command.reply_id.clone()))
.and_then(|content| reply(content, input.reply_id.clone()))
}
fn handle_panic(error: &Error, command: CommandInput) -> HandlerResult {
@ -59,69 +85,61 @@ fn reply(mut content: RoomMessageEventContent, reply_id: Option<OwnedEventId>) -
}
// Parse and process a message from the admin room
async fn process(services: Arc<Services>, msg: &str) -> CommandOutput {
let lines = msg.lines().filter(|l| !l.trim().is_empty());
let command = lines
.clone()
.next()
.expect("each string has at least one line");
let (parsed, body) = match parse_command(command) {
Ok(parsed) => parsed,
Err(error) => {
let server_name = services.globals.server_name();
let message = error.replace("server.name", server_name.as_str());
return Some(RoomMessageEventContent::notice_markdown(message));
},
async fn process(context: &Command<'_>, command: AdminCommand, args: &[String]) -> CommandOutput {
let filter: &capture::Filter =
&|data| data.level() <= Level::DEBUG && data.mod_name().starts_with("conduit") && data.scope.contains(&"admin");
let logs = Arc::new(Mutex::new(
collect_stream(|s| markdown_table_head(s)).expect("markdown table header"),
));
let capture = Capture::new(
&context.services.server.log.capture,
Some(filter),
capture::fmt(markdown_table, logs.clone()),
);
let capture_scope = capture.start();
let result = Box::pin(admin::process(command, context)).await;
drop(capture_scope);
debug!(
ok = result.is_ok(),
elapsed = ?context.timer.elapsed(),
command = ?args,
"command processed"
);
let logs = logs.lock().expect("locked");
let output = match result {
Err(error) => format!("{logs}\nEncountered an error while handling the command:\n```\n{error:#?}\n```"),
Ok(reply) => format!("{logs}\n{}", reply.body()), //TODO: content is recreated to add logs
};
let body = parse_body(AdminCommand::command(), &body, lines.skip(1).collect()).expect("trailing body parsed");
let context = Command {
services: &services,
body: &body,
};
let timer = Instant::now();
let result = Box::pin(admin::process(parsed, &context)).await;
let elapsed = timer.elapsed();
conduit::debug!(?command, ok = result.is_ok(), "command processed in {elapsed:?}");
match result {
Ok(reply) => Some(reply),
Err(error) => Some(RoomMessageEventContent::notice_markdown(format!(
"Encountered an error while handling the command:\n```\n{error:#?}\n```"
))),
}
Some(RoomMessageEventContent::notice_markdown(output))
}
// Parse chat messages from the admin room into an AdminCommand object
fn parse_command(command_line: &str) -> Result<(AdminCommand, Vec<String>), String> {
let argv = parse_line(command_line);
let com = AdminCommand::try_parse_from(&argv).map_err(|error| error.to_string())?;
Ok((com, argv))
fn parse<'a>(
services: &Arc<Services>, input: &'a CommandInput,
) -> Result<(AdminCommand, Vec<String>, Vec<&'a str>), CommandOutput> {
let lines = input.command.lines().filter(|line| !line.trim().is_empty());
let command_line = lines.clone().next().expect("command missing first line");
let body = lines.skip(1).collect();
match parse_command(command_line) {
Ok((command, args)) => Ok((command, args, body)),
Err(error) => {
let message = error
.to_string()
.replace("server.name", services.globals.server_name().as_str());
Err(Some(RoomMessageEventContent::notice_markdown(message)))
},
}
}
fn parse_body<'a>(mut cmd: clap::Command, body: &'a [String], lines: Vec<&'a str>) -> Result<Vec<&'a str>> {
let mut start = 1;
'token: for token in body.iter().skip(1) {
let cmd_ = cmd.clone();
for sub in cmd_.get_subcommands() {
if sub.get_name() == *token {
start = checked!(start + 1)?;
cmd = sub.clone();
continue 'token;
}
}
// positional arguments have to be skipped too
let num_posargs = cmd_.get_positionals().count();
start = checked!(start + num_posargs)?;
break;
}
Ok(body
.iter()
.skip(start)
.map(String::as_str)
.chain(lines)
.collect::<Vec<&'a str>>())
fn parse_command(line: &str) -> Result<(AdminCommand, Vec<String>)> {
let argv = parse_line(line);
let command = AdminCommand::try_parse_from(&argv)?;
Ok((command, argv))
}
fn complete_command(mut cmd: clap::Command, line: &str) -> String {