From 6d5e54a66b96ab504eeb6cca03499fb03761dcb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sun, 18 Dec 2022 06:37:03 +0100 Subject: [PATCH] fix: jaeger support --- Cargo.lock | 31 +++++++++++++++++++ Cargo.toml | 1 + src/api/client_server/sync.rs | 2 +- .../key_value/rooms/state_accessor.rs | 9 ++---- src/main.rs | 24 +++++++++++--- src/service/rooms/auth_chain/mod.rs | 1 - src/service/rooms/event_handler/mod.rs | 10 +++--- src/service/rooms/state_accessor/data.rs | 7 ++--- src/service/rooms/state_accessor/mod.rs | 9 ++---- 9 files changed, 65 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a659dec0..bb5943a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -419,6 +419,7 @@ dependencies = [ "tower-http", "tracing", "tracing-flame", + "tracing-opentelemetry", "tracing-subscriber", "trust-dns-resolver", ] @@ -574,6 +575,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.3.2" @@ -1573,6 +1587,7 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22" dependencies = [ + "fnv", "futures-channel", "futures-util", "indexmap", @@ -1590,6 +1605,8 @@ checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113" dependencies = [ "async-trait", "crossbeam-channel", + "dashmap", + "fnv", "futures-channel", "futures-executor", "futures-util", @@ -2891,6 +2908,20 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21ebb87a95ea13271332df069020513ab70bdb5637ca42d6e492dc3bbbad48de" +dependencies = [ + "once_cell", + "opentelemetry", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", +] + [[package]] name = "tracing-subscriber" version = "0.3.16" diff --git a/Cargo.toml b/Cargo.toml index 87102c0c..737799d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } tracing-flame = "0.2.0" opentelemetry = { version = "0.18.0", features = ["rt-tokio"] } opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"] } +tracing-opentelemetry = "0.18.0" lru-cache = "0.1.2" rusqlite = { version = "0.28.0", optional = true, features = ["bundled"] } parking_lot = { version = "0.12.1", optional = true } diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 43ca238a..568a23ce 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -873,7 +873,7 @@ async fn sync_helper( let since_state_ids = match since_shortstatehash { Some(s) => services().rooms.state_accessor.state_full_ids(s).await?, - None => BTreeMap::new(), + None => HashMap::new(), }; let left_event_id = match services().rooms.state_accessor.room_state_get_id( diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs index 70e59acb..0f0c0dc7 100644 --- a/src/database/key_value/rooms/state_accessor.rs +++ b/src/database/key_value/rooms/state_accessor.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{BTreeMap, HashMap}, - sync::Arc, -}; +use std::{collections::HashMap, sync::Arc}; use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; use async_trait::async_trait; @@ -9,7 +6,7 @@ use ruma::{events::StateEventType, EventId, RoomId}; #[async_trait] impl service::rooms::state_accessor::Data for KeyValueDatabase { - async fn state_full_ids(&self, shortstatehash: u64) -> Result>> { + async fn state_full_ids(&self, shortstatehash: u64) -> Result>> { let full_state = services() .rooms .state_compressor @@ -17,7 +14,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { .pop() .expect("there is always one layer") .1; - let mut result = BTreeMap::new(); + let mut result = HashMap::new(); let mut i = 0; for compressed in full_state.into_iter() { let parsed = services() diff --git a/src/main.rs b/src/main.rs index 013c4de5..fa33c094 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,7 +26,6 @@ use http::{ header::{self, HeaderName}, Method, StatusCode, Uri, }; -use opentelemetry::trace::{FutureExt, Tracer}; use ruma::api::{ client::{ error::{Error as RumaError, ErrorBody, ErrorKind}, @@ -93,14 +92,29 @@ async fn main() { if config.allow_jaeger { opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); let tracer = opentelemetry_jaeger::new_agent_pipeline() + .with_auto_split_batch(true) + .with_service_name("conduit") .install_batch(opentelemetry::runtime::Tokio) .unwrap(); + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - let span = tracer.start("conduit"); - start.with_current_context().await; - drop(span); + let filter_layer = match EnvFilter::try_new(&config.log) { + Ok(s) => s, + Err(e) => { + eprintln!( + "It looks like your log config is invalid. The following error occurred: {}", + e + ); + EnvFilter::try_new("warn").unwrap() + } + }; - println!("exporting"); + let subscriber = tracing_subscriber::Registry::default() + .with(filter_layer) + .with(telemetry); + tracing::subscriber::set_global_default(subscriber).unwrap(); + start.await; + println!("exporting remaining spans"); opentelemetry::global::shutdown_tracer_provider(); } else { let registry = tracing_subscriber::Registry::default(); diff --git a/src/service/rooms/auth_chain/mod.rs b/src/service/rooms/auth_chain/mod.rs index d3b6e401..39636045 100644 --- a/src/service/rooms/auth_chain/mod.rs +++ b/src/service/rooms/auth_chain/mod.rs @@ -15,7 +15,6 @@ pub struct Service { } impl Service { - #[tracing::instrument(skip(self))] pub fn get_cached_eventid_authchain<'a>( &'a self, key: &[u64], diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 3c49349f..0bba61c6 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -7,7 +7,7 @@ use ruma::{ RoomVersionId, }; use std::{ - collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet}, + collections::{hash_map, BTreeMap, HashMap, HashSet}, pin::Pin, sync::{Arc, RwLock, RwLockWriteGuard}, time::{Duration, Instant, SystemTime}, @@ -553,7 +553,7 @@ impl Service { let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len()); for (sstatehash, prev_event) in extremity_sstatehashes { - let mut leaf_state: BTreeMap<_, _> = services() + let mut leaf_state: HashMap<_, _> = services() .rooms .state_accessor .state_full_ids(sstatehash) @@ -660,7 +660,7 @@ impl Service { ) .await; - let mut state: BTreeMap<_, Arc> = BTreeMap::new(); + let mut state: HashMap<_, Arc> = HashMap::new(); for (pdu, _) in state_vec { let state_key = pdu.state_key.clone().ok_or_else(|| { Error::bad_database("Found non-state pdu in state events.") @@ -672,10 +672,10 @@ impl Service { )?; match state.entry(shortstatekey) { - btree_map::Entry::Vacant(v) => { + hash_map::Entry::Vacant(v) => { v.insert(Arc::from(&*pdu.event_id)); } - btree_map::Entry::Occupied(_) => return Err( + hash_map::Entry::Occupied(_) => return Err( Error::bad_database("State event's type and state_key combination exists multiple times."), ), } diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index 340b19c3..f3ae3c21 100644 --- a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{BTreeMap, HashMap}, - sync::Arc, -}; +use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use ruma::{events::StateEventType, EventId, RoomId}; @@ -12,7 +9,7 @@ use crate::{PduEvent, Result}; pub trait Data: Send + Sync { /// Builds a StateMap by iterating over all keys that start /// with state_hash, this gives the full state for the given state_hash. - async fn state_full_ids(&self, shortstatehash: u64) -> Result>>; + async fn state_full_ids(&self, shortstatehash: u64) -> Result>>; async fn state_full( &self, diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 1a9c4a9e..87d99368 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -1,8 +1,5 @@ mod data; -use std::{ - collections::{BTreeMap, HashMap}, - sync::Arc, -}; +use std::{collections::HashMap, sync::Arc}; pub use data::Data; use ruma::{events::StateEventType, EventId, RoomId}; @@ -16,7 +13,8 @@ pub struct Service { impl Service { /// Builds a StateMap by iterating over all keys that start /// with state_hash, this gives the full state for the given state_hash. - pub async fn state_full_ids(&self, shortstatehash: u64) -> Result>> { + #[tracing::instrument(skip(self))] + pub async fn state_full_ids(&self, shortstatehash: u64) -> Result>> { self.db.state_full_ids(shortstatehash).await } @@ -39,7 +37,6 @@ impl Service { } /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). - #[tracing::instrument(skip(self))] pub fn state_get( &self, shortstatehash: u64,