tracing capture interface

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-06-11 01:26:31 +00:00
parent 1bb4021b90
commit aa34021b27
15 changed files with 284 additions and 27 deletions

1
Cargo.lock generated
View file

@ -701,6 +701,7 @@ dependencies = [
"tikv-jemallocator", "tikv-jemallocator",
"tokio", "tokio",
"tracing", "tracing",
"tracing-core",
"tracing-subscriber", "tracing-subscriber",
"url", "url",
] ]

View file

@ -159,6 +159,8 @@ default-features = false
[workspace.dependencies.tracing-subscriber] [workspace.dependencies.tracing-subscriber]
version = "0.3.18" version = "0.3.18"
features = ["env-filter"] features = ["env-filter"]
[workspace.dependencies.tracing-core]
version = "0.1.32"
# for URL previews # for URL previews
[workspace.dependencies.webpage] [workspace.dependencies.webpage]

View file

@ -344,11 +344,7 @@ pub(crate) async fn change_log_level(
}, },
}; };
match services() match services().server.log.reload.reload(&old_filter_layer) {
.server
.tracing_reload_handle
.reload(&old_filter_layer)
{
Ok(()) => { Ok(()) => {
return Ok(RoomMessageEventContent::text_plain(format!( return Ok(RoomMessageEventContent::text_plain(format!(
"Successfully changed log level back to config value {}", "Successfully changed log level back to config value {}",
@ -373,11 +369,7 @@ pub(crate) async fn change_log_level(
}, },
}; };
match services() match services().server.log.reload.reload(&new_filter_layer) {
.server
.tracing_reload_handle
.reload(&new_filter_layer)
{
Ok(()) => { Ok(()) => {
return Ok(RoomMessageEventContent::text_plain("Successfully changed log level")); return Ok(RoomMessageEventContent::text_plain("Successfully changed log level"));
}, },

View file

@ -97,6 +97,7 @@ tikv-jemalloc-ctl.workspace = true
tikv-jemalloc-sys.optional = true tikv-jemalloc-sys.optional = true
tikv-jemalloc-sys.workspace = true tikv-jemalloc-sys.workspace = true
tokio.workspace = true tokio.workspace = true
tracing-core.workspace = true
tracing-subscriber.workspace = true tracing-subscriber.workspace = true
tracing.workspace = true tracing.workspace = true
url.workspace = true url.workspace = true

View file

@ -0,0 +1,31 @@
use tracing::Level;
use tracing_core::{span::Current, Event};
use super::layer::Value;
pub struct Data<'a> {
pub event: &'a Event<'a>,
pub current: &'a Current,
pub values: Option<&'a mut [Value]>,
}
impl Data<'_> {
#[must_use]
pub fn level(&self) -> Level { *self.event.metadata().level() }
#[must_use]
pub fn mod_name(&self) -> &str { self.event.metadata().module_path().unwrap_or_default() }
#[must_use]
pub fn span_name(&self) -> &str { self.current.metadata().map_or("", |s| s.name()) }
#[must_use]
pub fn message(&self) -> &str {
self.values
.as_ref()
.expect("values are not composed for a filter")
.iter()
.find(|(k, _)| *k == "message")
.map_or("", |(_, v)| v.as_str())
}
}

View file

@ -0,0 +1,12 @@
use std::sync::Arc;
use super::Capture;
/// Capture instance scope guard.
pub struct Guard {
pub(super) capture: Arc<Capture>,
}
impl Drop for Guard {
fn drop(&mut self) { self.capture.stop(); }
}

View file

@ -0,0 +1,82 @@
use std::{fmt, sync::Arc};
use tracing::field::{Field, Visit};
use tracing_core::{Event, Subscriber};
use tracing_subscriber::{layer::Context, registry::LookupSpan};
use super::{Capture, Data, State};
pub type Value = (&'static str, String);
pub struct Layer {
state: Arc<State>,
}
struct Visitor {
values: Vec<Value>,
}
impl Layer {
pub fn new(state: &Arc<State>) -> Self {
Self {
state: state.clone(),
}
}
}
impl fmt::Debug for Layer {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.debug_struct("capture::Layer").finish()
}
}
impl<S> tracing_subscriber::Layer<S> for Layer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
self.state
.active
.read()
.expect("shared lock")
.iter()
.filter(|capture| filter(capture, event, &ctx))
.for_each(|capture| handle(capture, event, &ctx));
}
}
fn handle<S>(capture: &Capture, event: &Event<'_>, ctx: &Context<'_, S>)
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
let mut visitor = Visitor {
values: Vec::new(),
};
event.record(&mut visitor);
let mut closure = capture.closure.lock().expect("exclusive lock");
closure(Data {
event,
current: &ctx.current_span(),
values: Some(&mut visitor.values),
});
}
fn filter<S>(capture: &Capture, event: &Event<'_>, ctx: &Context<'_, S>) -> bool
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
capture.filter.as_ref().map_or(true, |filter| {
filter(Data {
event,
current: &ctx.current_span(),
values: None,
})
})
}
impl Visit for Visitor {
fn record_debug(&mut self, f: &Field, v: &dyn fmt::Debug) { self.values.push((f.name(), format!("{v:?}"))); }
fn record_str(&mut self, f: &Field, v: &str) { self.values.push((f.name(), v.to_owned())); }
}

View file

@ -0,0 +1,50 @@
pub mod data;
mod guard;
pub mod layer;
pub mod state;
pub mod util;
use std::sync::{Arc, Mutex};
pub use data::Data;
use guard::Guard;
pub use layer::{Layer, Value};
pub use state::State;
pub use util::to_html;
pub type Filter = dyn Fn(Data<'_>) -> bool + Send + Sync + 'static;
pub type Closure = dyn FnMut(Data<'_>) + Send + Sync + 'static;
/// Capture instance state.
pub struct Capture {
state: Arc<State>,
filter: Option<Box<Filter>>,
closure: Mutex<Box<Closure>>,
}
impl Capture {
/// Construct a new capture instance. Capture does not start until the Guard
/// is in scope.
#[must_use]
pub fn new<F, C>(state: &Arc<State>, filter: Option<F>, closure: C) -> Arc<Self>
where
F: Fn(Data<'_>) -> bool + Send + Sync + 'static,
C: FnMut(Data<'_>) + Send + Sync + 'static,
{
Arc::new(Self {
state: state.clone(),
filter: filter.map(|p| -> Box<Filter> { Box::new(p) }),
closure: Mutex::new(Box::new(closure)),
})
}
#[must_use]
pub fn start(self: &Arc<Self>) -> Guard {
self.state.add(self);
Guard {
capture: self.clone(),
}
}
pub fn stop(self: &Arc<Self>) { self.state.del(self); }
}

View file

@ -0,0 +1,35 @@
use std::sync::{Arc, RwLock};
use super::Capture;
/// Capture layer state.
pub struct State {
pub(super) active: RwLock<Vec<Arc<Capture>>>,
}
impl Default for State {
fn default() -> Self { Self::new() }
}
impl State {
#[must_use]
pub fn new() -> Self {
Self {
active: RwLock::new(Vec::new()),
}
}
pub(super) fn add(&self, capture: &Arc<Capture>) {
self.active
.write()
.expect("locked for writing")
.push(capture.clone());
}
pub(super) fn del(&self, capture: &Arc<Capture>) {
let mut vec = self.active.write().expect("locked for writing");
if let Some(pos) = vec.iter().position(|v| Arc::ptr_eq(v, capture)) {
vec.swap_remove(pos);
}
}
}

View file

@ -0,0 +1,19 @@
use std::sync::{Arc, Mutex};
use super::{super::fmt, Closure};
pub fn to_html<S>(out: &Arc<Mutex<S>>) -> Box<Closure>
where
S: std::fmt::Write + Send + 'static,
{
let out = out.clone();
Box::new(move |data| {
fmt::html(
&mut *out.lock().expect("locked"),
&data.level(),
data.span_name(),
data.message(),
)
.expect("log line appended");
})
}

View file

@ -11,7 +11,7 @@ where
let level = level.as_str().to_uppercase(); let level = level.as_str().to_uppercase();
write!( write!(
out, out,
"<font data-mx-color=\"{color}\"><code>{level:>5}</code></font> <code>{span:<12}</code> <code>{msg}</code><br>" "<font data-mx-color=\"{color}\"><code>{level:>5}</code></font> <code>{span:^12}</code> <code>{msg}</code><br>"
)?; )?;
Ok(()) Ok(())

View file

@ -1,13 +1,19 @@
pub mod capture;
pub mod color; pub mod color;
pub mod fmt; pub mod fmt;
mod reload; mod reload;
mod server;
pub use reload::ReloadHandle; pub use capture::Capture;
pub use reload::LogLevelReloadHandles; pub use reload::{LogLevelReloadHandles, ReloadHandle};
pub use server::Server;
pub use tracing::Level;
pub use tracing_core::{Event, Metadata};
// Wraps for logging macros. Use these macros rather than extern tracing:: or log:: crates in // Wraps for logging macros. Use these macros rather than extern tracing:: or
// project code. ::log and ::tracing can still be used if necessary but discouraged. Remember // log:: crates in project code. ::log and ::tracing can still be used if
// debug_ log macros are also exported to the crate namespace like these. // necessary but discouraged. Remember debug_ log macros are also exported to
// the crate namespace like these.
#[macro_export] #[macro_export]
macro_rules! error { macro_rules! error {

14
src/core/log/server.rs Normal file
View file

@ -0,0 +1,14 @@
use std::sync::Arc;
use super::{capture, reload::LogLevelReloadHandles};
/// Logging subsystem. This is a singleton member of super::Server which holds
/// all logging and tracing related state rather than shoving it all in
/// super::Server directly.
pub struct Server {
/// General log level reload handles.
pub reload: LogLevelReloadHandles,
/// Tracing capture state for ephemeral/oneshot uses.
pub capture: Arc<capture::State>,
}

View file

@ -5,7 +5,7 @@ use std::{
use tokio::{runtime, sync::broadcast}; use tokio::{runtime, sync::broadcast};
use crate::{config::Config, log::LogLevelReloadHandles}; use crate::{config::Config, log};
/// Server runtime state; public portion /// Server runtime state; public portion
pub struct Server { pub struct Server {
@ -29,8 +29,8 @@ pub struct Server {
/// Reload/shutdown signal /// Reload/shutdown signal
pub signal: broadcast::Sender<&'static str>, pub signal: broadcast::Sender<&'static str>,
/// Log level reload handles. /// Logging subsystem state
pub tracing_reload_handle: LogLevelReloadHandles, pub log: log::Server,
/// TODO: move stats /// TODO: move stats
pub requests_spawn_active: AtomicU32, pub requests_spawn_active: AtomicU32,
@ -42,7 +42,7 @@ pub struct Server {
impl Server { impl Server {
#[must_use] #[must_use]
pub fn new(config: Config, runtime: Option<runtime::Handle>, tracing_reload_handle: LogLevelReloadHandles) -> Self { pub fn new(config: Config, runtime: Option<runtime::Handle>, log: log::Server) -> Self {
Self { Self {
config, config,
started: SystemTime::now(), started: SystemTime::now(),
@ -50,7 +50,7 @@ impl Server {
reloading: AtomicBool::new(false), reloading: AtomicBool::new(false),
runtime, runtime,
signal: broadcast::channel::<&'static str>(1).0, signal: broadcast::channel::<&'static str>(1).0,
tracing_reload_handle, log,
requests_spawn_active: AtomicU32::new(0), requests_spawn_active: AtomicU32::new(0),
requests_spawn_finished: AtomicU32::new(0), requests_spawn_finished: AtomicU32::new(0),
requests_handle_active: AtomicU32::new(0), requests_handle_active: AtomicU32::new(0),

View file

@ -4,7 +4,7 @@ use conduit::{
config, config,
config::Config, config::Config,
info, info,
log::{LogLevelReloadHandles, ReloadHandle}, log::{self, capture, LogLevelReloadHandles, ReloadHandle},
utils::{hash, sys}, utils::{hash, sys},
Error, Result, Error, Result,
}; };
@ -34,7 +34,7 @@ impl Server {
#[cfg(feature = "sentry_telemetry")] #[cfg(feature = "sentry_telemetry")]
let sentry_guard = init_sentry(&config); let sentry_guard = init_sentry(&config);
let (tracing_reload_handle, tracing_flame_guard) = init_tracing(&config); let (tracing_reload_handle, tracing_flame_guard, capture) = init_tracing(&config);
config.check()?; config.check()?;
#[cfg(unix)] #[cfg(unix)]
@ -50,7 +50,14 @@ impl Server {
); );
Ok(Arc::new(Self { Ok(Arc::new(Self {
server: Arc::new(conduit::Server::new(config, runtime.cloned(), tracing_reload_handle)), server: Arc::new(conduit::Server::new(
config,
runtime.cloned(),
log::Server {
reload: tracing_reload_handle,
capture,
},
)),
_tracing_flame_guard: tracing_flame_guard, _tracing_flame_guard: tracing_flame_guard,
@ -100,7 +107,7 @@ type TracingFlameGuard = ();
// clippy thinks the filter_layer clones are redundant if the next usage is // clippy thinks the filter_layer clones are redundant if the next usage is
// behind a disabled feature. // behind a disabled feature.
#[allow(clippy::redundant_clone)] #[allow(clippy::redundant_clone)]
fn init_tracing(config: &Config) -> (LogLevelReloadHandles, TracingFlameGuard) { fn init_tracing(config: &Config) -> (LogLevelReloadHandles, TracingFlameGuard, Arc<capture::State>) {
let registry = Registry::default(); let registry = Registry::default();
let fmt_layer = tracing_subscriber::fmt::Layer::new(); let fmt_layer = tracing_subscriber::fmt::Layer::new();
let filter_layer = match EnvFilter::try_new(&config.log) { let filter_layer = match EnvFilter::try_new(&config.log) {
@ -122,7 +129,12 @@ fn init_tracing(config: &Config) -> (LogLevelReloadHandles, TracingFlameGuard) {
let (fmt_reload_filter, fmt_reload_handle) = reload::Layer::new(filter_layer.clone()); let (fmt_reload_filter, fmt_reload_handle) = reload::Layer::new(filter_layer.clone());
reload_handles.push(Box::new(fmt_reload_handle)); reload_handles.push(Box::new(fmt_reload_handle));
let subscriber = subscriber.with(fmt_layer.with_filter(fmt_reload_filter));
let cap_state = Arc::new(capture::State::new());
let cap_layer = capture::Layer::new(&cap_state);
let subscriber = subscriber
.with(fmt_layer.with_filter(fmt_reload_filter))
.with(cap_layer);
#[cfg(feature = "sentry_telemetry")] #[cfg(feature = "sentry_telemetry")]
let subscriber = { let subscriber = {
@ -185,5 +197,5 @@ fn init_tracing(config: &Config) -> (LogLevelReloadHandles, TracingFlameGuard) {
needs access to trace-level events. 'release_max_log_level' must be disabled to use tokio-console." needs access to trace-level events. 'release_max_log_level' must be disabled to use tokio-console."
); );
(LogLevelReloadHandles::new(reload_handles), flame_guard) (LogLevelReloadHandles::new(reload_handles), flame_guard, cap_state)
} }