From aa34021b274cb3f87b4c319d9ad670fd809eb4e1 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 11 Jun 2024 01:26:31 +0000 Subject: [PATCH] tracing capture interface Signed-off-by: Jason Volk --- Cargo.lock | 1 + Cargo.toml | 2 + src/admin/debug/debug_commands.rs | 12 +---- src/core/Cargo.toml | 1 + src/core/log/capture/data.rs | 31 ++++++++++++ src/core/log/capture/guard.rs | 12 +++++ src/core/log/capture/layer.rs | 82 +++++++++++++++++++++++++++++++ src/core/log/capture/mod.rs | 50 +++++++++++++++++++ src/core/log/capture/state.rs | 35 +++++++++++++ src/core/log/capture/util.rs | 19 +++++++ src/core/log/fmt.rs | 2 +- src/core/log/mod.rs | 16 ++++-- src/core/log/server.rs | 14 ++++++ src/core/server.rs | 10 ++-- src/main/server.rs | 24 ++++++--- 15 files changed, 284 insertions(+), 27 deletions(-) create mode 100644 src/core/log/capture/data.rs create mode 100644 src/core/log/capture/guard.rs create mode 100644 src/core/log/capture/layer.rs create mode 100644 src/core/log/capture/mod.rs create mode 100644 src/core/log/capture/state.rs create mode 100644 src/core/log/capture/util.rs create mode 100644 src/core/log/server.rs diff --git a/Cargo.lock b/Cargo.lock index 64768b17..5b2b9b05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -701,6 +701,7 @@ dependencies = [ "tikv-jemallocator", "tokio", "tracing", + "tracing-core", "tracing-subscriber", "url", ] diff --git a/Cargo.toml b/Cargo.toml index 63528ef4..217e2434 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -159,6 +159,8 @@ default-features = false [workspace.dependencies.tracing-subscriber] version = "0.3.18" features = ["env-filter"] +[workspace.dependencies.tracing-core] +version = "0.1.32" # for URL previews [workspace.dependencies.webpage] diff --git a/src/admin/debug/debug_commands.rs b/src/admin/debug/debug_commands.rs index bd15ba50..21a1751c 100644 --- a/src/admin/debug/debug_commands.rs +++ b/src/admin/debug/debug_commands.rs @@ -344,11 +344,7 @@ pub(crate) async fn change_log_level( }, }; - match services() - .server - .tracing_reload_handle - .reload(&old_filter_layer) - { + match services().server.log.reload.reload(&old_filter_layer) { Ok(()) => { return Ok(RoomMessageEventContent::text_plain(format!( "Successfully changed log level back to config value {}", @@ -373,11 +369,7 @@ pub(crate) async fn change_log_level( }, }; - match services() - .server - .tracing_reload_handle - .reload(&new_filter_layer) - { + match services().server.log.reload.reload(&new_filter_layer) { Ok(()) => { return Ok(RoomMessageEventContent::text_plain("Successfully changed log level")); }, diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml index c90a03b3..4a694d90 100644 --- a/src/core/Cargo.toml +++ b/src/core/Cargo.toml @@ -97,6 +97,7 @@ tikv-jemalloc-ctl.workspace = true tikv-jemalloc-sys.optional = true tikv-jemalloc-sys.workspace = true tokio.workspace = true +tracing-core.workspace = true tracing-subscriber.workspace = true tracing.workspace = true url.workspace = true diff --git a/src/core/log/capture/data.rs b/src/core/log/capture/data.rs new file mode 100644 index 00000000..ea104f57 --- /dev/null +++ b/src/core/log/capture/data.rs @@ -0,0 +1,31 @@ +use tracing::Level; +use tracing_core::{span::Current, Event}; + +use super::layer::Value; + +pub struct Data<'a> { + pub event: &'a Event<'a>, + pub current: &'a Current, + pub values: Option<&'a mut [Value]>, +} + +impl Data<'_> { + #[must_use] + pub fn level(&self) -> Level { *self.event.metadata().level() } + + #[must_use] + pub fn mod_name(&self) -> &str { self.event.metadata().module_path().unwrap_or_default() } + + #[must_use] + pub fn span_name(&self) -> &str { self.current.metadata().map_or("", |s| s.name()) } + + #[must_use] + pub fn message(&self) -> &str { + self.values + .as_ref() + .expect("values are not composed for a filter") + .iter() + .find(|(k, _)| *k == "message") + .map_or("", |(_, v)| v.as_str()) + } +} diff --git a/src/core/log/capture/guard.rs b/src/core/log/capture/guard.rs new file mode 100644 index 00000000..a126fedc --- /dev/null +++ b/src/core/log/capture/guard.rs @@ -0,0 +1,12 @@ +use std::sync::Arc; + +use super::Capture; + +/// Capture instance scope guard. +pub struct Guard { + pub(super) capture: Arc, +} + +impl Drop for Guard { + fn drop(&mut self) { self.capture.stop(); } +} diff --git a/src/core/log/capture/layer.rs b/src/core/log/capture/layer.rs new file mode 100644 index 00000000..48b125da --- /dev/null +++ b/src/core/log/capture/layer.rs @@ -0,0 +1,82 @@ +use std::{fmt, sync::Arc}; + +use tracing::field::{Field, Visit}; +use tracing_core::{Event, Subscriber}; +use tracing_subscriber::{layer::Context, registry::LookupSpan}; + +use super::{Capture, Data, State}; + +pub type Value = (&'static str, String); + +pub struct Layer { + state: Arc, +} + +struct Visitor { + values: Vec, +} + +impl Layer { + pub fn new(state: &Arc) -> Self { + Self { + state: state.clone(), + } + } +} + +impl fmt::Debug for Layer { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.debug_struct("capture::Layer").finish() + } +} + +impl tracing_subscriber::Layer for Layer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { + self.state + .active + .read() + .expect("shared lock") + .iter() + .filter(|capture| filter(capture, event, &ctx)) + .for_each(|capture| handle(capture, event, &ctx)); + } +} + +fn handle(capture: &Capture, event: &Event<'_>, ctx: &Context<'_, S>) +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + let mut visitor = Visitor { + values: Vec::new(), + }; + event.record(&mut visitor); + + let mut closure = capture.closure.lock().expect("exclusive lock"); + closure(Data { + event, + current: &ctx.current_span(), + values: Some(&mut visitor.values), + }); +} + +fn filter(capture: &Capture, event: &Event<'_>, ctx: &Context<'_, S>) -> bool +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + capture.filter.as_ref().map_or(true, |filter| { + filter(Data { + event, + current: &ctx.current_span(), + values: None, + }) + }) +} + +impl Visit for Visitor { + fn record_debug(&mut self, f: &Field, v: &dyn fmt::Debug) { self.values.push((f.name(), format!("{v:?}"))); } + + fn record_str(&mut self, f: &Field, v: &str) { self.values.push((f.name(), v.to_owned())); } +} diff --git a/src/core/log/capture/mod.rs b/src/core/log/capture/mod.rs new file mode 100644 index 00000000..f0495ad9 --- /dev/null +++ b/src/core/log/capture/mod.rs @@ -0,0 +1,50 @@ +pub mod data; +mod guard; +pub mod layer; +pub mod state; +pub mod util; + +use std::sync::{Arc, Mutex}; + +pub use data::Data; +use guard::Guard; +pub use layer::{Layer, Value}; +pub use state::State; +pub use util::to_html; + +pub type Filter = dyn Fn(Data<'_>) -> bool + Send + Sync + 'static; +pub type Closure = dyn FnMut(Data<'_>) + Send + Sync + 'static; + +/// Capture instance state. +pub struct Capture { + state: Arc, + filter: Option>, + closure: Mutex>, +} + +impl Capture { + /// Construct a new capture instance. Capture does not start until the Guard + /// is in scope. + #[must_use] + pub fn new(state: &Arc, filter: Option, closure: C) -> Arc + where + F: Fn(Data<'_>) -> bool + Send + Sync + 'static, + C: FnMut(Data<'_>) + Send + Sync + 'static, + { + Arc::new(Self { + state: state.clone(), + filter: filter.map(|p| -> Box { Box::new(p) }), + closure: Mutex::new(Box::new(closure)), + }) + } + + #[must_use] + pub fn start(self: &Arc) -> Guard { + self.state.add(self); + Guard { + capture: self.clone(), + } + } + + pub fn stop(self: &Arc) { self.state.del(self); } +} diff --git a/src/core/log/capture/state.rs b/src/core/log/capture/state.rs new file mode 100644 index 00000000..f2c401cd --- /dev/null +++ b/src/core/log/capture/state.rs @@ -0,0 +1,35 @@ +use std::sync::{Arc, RwLock}; + +use super::Capture; + +/// Capture layer state. +pub struct State { + pub(super) active: RwLock>>, +} + +impl Default for State { + fn default() -> Self { Self::new() } +} + +impl State { + #[must_use] + pub fn new() -> Self { + Self { + active: RwLock::new(Vec::new()), + } + } + + pub(super) fn add(&self, capture: &Arc) { + self.active + .write() + .expect("locked for writing") + .push(capture.clone()); + } + + pub(super) fn del(&self, capture: &Arc) { + let mut vec = self.active.write().expect("locked for writing"); + if let Some(pos) = vec.iter().position(|v| Arc::ptr_eq(v, capture)) { + vec.swap_remove(pos); + } + } +} diff --git a/src/core/log/capture/util.rs b/src/core/log/capture/util.rs new file mode 100644 index 00000000..54a19b6a --- /dev/null +++ b/src/core/log/capture/util.rs @@ -0,0 +1,19 @@ +use std::sync::{Arc, Mutex}; + +use super::{super::fmt, Closure}; + +pub fn to_html(out: &Arc>) -> Box +where + S: std::fmt::Write + Send + 'static, +{ + let out = out.clone(); + Box::new(move |data| { + fmt::html( + &mut *out.lock().expect("locked"), + &data.level(), + data.span_name(), + data.message(), + ) + .expect("log line appended"); + }) +} diff --git a/src/core/log/fmt.rs b/src/core/log/fmt.rs index d21f3dbc..a2e3a6f7 100644 --- a/src/core/log/fmt.rs +++ b/src/core/log/fmt.rs @@ -11,7 +11,7 @@ where let level = level.as_str().to_uppercase(); write!( out, - "{level:>5} {span:<12} {msg}
" + "{level:>5} {span:^12} {msg}
" )?; Ok(()) diff --git a/src/core/log/mod.rs b/src/core/log/mod.rs index eae82b50..556bf2f8 100644 --- a/src/core/log/mod.rs +++ b/src/core/log/mod.rs @@ -1,13 +1,19 @@ +pub mod capture; pub mod color; pub mod fmt; mod reload; +mod server; -pub use reload::ReloadHandle; -pub use reload::LogLevelReloadHandles; +pub use capture::Capture; +pub use reload::{LogLevelReloadHandles, ReloadHandle}; +pub use server::Server; +pub use tracing::Level; +pub use tracing_core::{Event, Metadata}; -// Wraps for logging macros. Use these macros rather than extern tracing:: or log:: crates in -// project code. ::log and ::tracing can still be used if necessary but discouraged. Remember -// debug_ log macros are also exported to the crate namespace like these. +// Wraps for logging macros. Use these macros rather than extern tracing:: or +// log:: crates in project code. ::log and ::tracing can still be used if +// necessary but discouraged. Remember debug_ log macros are also exported to +// the crate namespace like these. #[macro_export] macro_rules! error { diff --git a/src/core/log/server.rs b/src/core/log/server.rs new file mode 100644 index 00000000..644cecd1 --- /dev/null +++ b/src/core/log/server.rs @@ -0,0 +1,14 @@ +use std::sync::Arc; + +use super::{capture, reload::LogLevelReloadHandles}; + +/// Logging subsystem. This is a singleton member of super::Server which holds +/// all logging and tracing related state rather than shoving it all in +/// super::Server directly. +pub struct Server { + /// General log level reload handles. + pub reload: LogLevelReloadHandles, + + /// Tracing capture state for ephemeral/oneshot uses. + pub capture: Arc, +} diff --git a/src/core/server.rs b/src/core/server.rs index 7be6fa58..95eb7e01 100644 --- a/src/core/server.rs +++ b/src/core/server.rs @@ -5,7 +5,7 @@ use std::{ use tokio::{runtime, sync::broadcast}; -use crate::{config::Config, log::LogLevelReloadHandles}; +use crate::{config::Config, log}; /// Server runtime state; public portion pub struct Server { @@ -29,8 +29,8 @@ pub struct Server { /// Reload/shutdown signal pub signal: broadcast::Sender<&'static str>, - /// Log level reload handles. - pub tracing_reload_handle: LogLevelReloadHandles, + /// Logging subsystem state + pub log: log::Server, /// TODO: move stats pub requests_spawn_active: AtomicU32, @@ -42,7 +42,7 @@ pub struct Server { impl Server { #[must_use] - pub fn new(config: Config, runtime: Option, tracing_reload_handle: LogLevelReloadHandles) -> Self { + pub fn new(config: Config, runtime: Option, log: log::Server) -> Self { Self { config, started: SystemTime::now(), @@ -50,7 +50,7 @@ impl Server { reloading: AtomicBool::new(false), runtime, signal: broadcast::channel::<&'static str>(1).0, - tracing_reload_handle, + log, requests_spawn_active: AtomicU32::new(0), requests_spawn_finished: AtomicU32::new(0), requests_handle_active: AtomicU32::new(0), diff --git a/src/main/server.rs b/src/main/server.rs index b1bccfe9..ac93da17 100644 --- a/src/main/server.rs +++ b/src/main/server.rs @@ -4,7 +4,7 @@ use conduit::{ config, config::Config, info, - log::{LogLevelReloadHandles, ReloadHandle}, + log::{self, capture, LogLevelReloadHandles, ReloadHandle}, utils::{hash, sys}, Error, Result, }; @@ -34,7 +34,7 @@ impl Server { #[cfg(feature = "sentry_telemetry")] let sentry_guard = init_sentry(&config); - let (tracing_reload_handle, tracing_flame_guard) = init_tracing(&config); + let (tracing_reload_handle, tracing_flame_guard, capture) = init_tracing(&config); config.check()?; #[cfg(unix)] @@ -50,7 +50,14 @@ impl Server { ); Ok(Arc::new(Self { - server: Arc::new(conduit::Server::new(config, runtime.cloned(), tracing_reload_handle)), + server: Arc::new(conduit::Server::new( + config, + runtime.cloned(), + log::Server { + reload: tracing_reload_handle, + capture, + }, + )), _tracing_flame_guard: tracing_flame_guard, @@ -100,7 +107,7 @@ type TracingFlameGuard = (); // clippy thinks the filter_layer clones are redundant if the next usage is // behind a disabled feature. #[allow(clippy::redundant_clone)] -fn init_tracing(config: &Config) -> (LogLevelReloadHandles, TracingFlameGuard) { +fn init_tracing(config: &Config) -> (LogLevelReloadHandles, TracingFlameGuard, Arc) { let registry = Registry::default(); let fmt_layer = tracing_subscriber::fmt::Layer::new(); let filter_layer = match EnvFilter::try_new(&config.log) { @@ -122,7 +129,12 @@ fn init_tracing(config: &Config) -> (LogLevelReloadHandles, TracingFlameGuard) { let (fmt_reload_filter, fmt_reload_handle) = reload::Layer::new(filter_layer.clone()); reload_handles.push(Box::new(fmt_reload_handle)); - let subscriber = subscriber.with(fmt_layer.with_filter(fmt_reload_filter)); + + let cap_state = Arc::new(capture::State::new()); + let cap_layer = capture::Layer::new(&cap_state); + let subscriber = subscriber + .with(fmt_layer.with_filter(fmt_reload_filter)) + .with(cap_layer); #[cfg(feature = "sentry_telemetry")] let subscriber = { @@ -185,5 +197,5 @@ fn init_tracing(config: &Config) -> (LogLevelReloadHandles, TracingFlameGuard) { needs access to trace-level events. 'release_max_log_level' must be disabled to use tokio-console." ); - (LogLevelReloadHandles::new(reload_handles), flame_guard) + (LogLevelReloadHandles::new(reload_handles), flame_guard, cap_state) }