Add show_cache_usage admin room command

Signed-off-by: Jonas Zohren <git-pbkyr@jzohren.de>
This commit is contained in:
Jonas Zohren 2021-08-26 15:17:17 +02:00
parent 5aa56b92ee
commit e534199195
3 changed files with 196 additions and 82 deletions

View file

@ -1,3 +1,34 @@
use std::{
collections::{BTreeMap, HashMap, HashSet},
convert::{TryFrom, TryInto},
fs::{self, remove_dir_all},
io::Write,
mem::size_of,
ops::Deref,
path::Path,
sync::{Arc, Mutex, RwLock},
};
use std::hash::Hash;
use directories::ProjectDirs;
use lru_cache::LruCache;
use rocket::{
futures::{channel::mpsc, stream::FuturesUnordered, StreamExt},
outcome::{IntoOutcome, try_outcome},
request::{FromRequest, Request},
Shutdown, State,
};
use ruma::{DeviceId, EventId, RoomId, ServerName, UserId};
use serde::{de::IgnoredAny, Deserialize};
use tokio::sync::{OwnedRwLockReadGuard, RwLock as TokioRwLock, Semaphore};
use tracing::{debug, error, warn};
use abstraction::DatabaseEngine;
use crate::{Error, Result, utils};
use self::proxy::ProxyConfig;
pub mod abstraction;
pub mod account_data;
@ -14,33 +45,6 @@ pub mod transaction_ids;
pub mod uiaa;
pub mod users;
use crate::{utils, Error, Result};
use abstraction::DatabaseEngine;
use directories::ProjectDirs;
use lru_cache::LruCache;
use rocket::{
futures::{channel::mpsc, stream::FuturesUnordered, StreamExt},
outcome::{try_outcome, IntoOutcome},
request::{FromRequest, Request},
Shutdown, State,
};
use ruma::{DeviceId, EventId, RoomId, ServerName, UserId};
use serde::{de::IgnoredAny, Deserialize};
use std::{
collections::{BTreeMap, HashMap, HashSet},
convert::{TryFrom, TryInto},
fs::{self, remove_dir_all},
io::Write,
mem::size_of,
ops::Deref,
path::Path,
sync::{Arc, Mutex, RwLock},
};
use tokio::sync::{OwnedRwLockReadGuard, RwLock as TokioRwLock, Semaphore};
use tracing::{debug, error, warn};
use self::proxy::ProxyConfig;
#[derive(Clone, Debug, Deserialize)]
pub struct Config {
server_name: Box<ServerName>,
@ -132,6 +136,17 @@ pub type Engine = abstraction::sqlite::Engine;
#[cfg(feature = "heed")]
pub type Engine = abstraction::heed::Engine;
// for each key: (memory_usage in bytes, items in cache, capacity)
pub struct CacheUsageStatistics {
pdu_cache: (usize, usize, usize),
auth_chain_cache: (usize, usize, usize),
shorteventid_cache: (usize, usize, usize),
eventidshort_cache: (usize, usize, usize),
statekeyshort_cache: (usize, usize, usize),
shortstatekey_cache: (usize, usize, usize),
stateinfo_cache: (usize, usize, usize),
}
pub struct Database {
_db: Arc<Engine>,
pub globals: globals::Globals,
@ -163,27 +178,27 @@ impl Database {
fn check_sled_or_sqlite_db(config: &Config) -> Result<()> {
#[cfg(feature = "backend_sqlite")]
{
let path = Path::new(&config.database_path);
{
let path = Path::new(&config.database_path);
let sled_exists = path.join("db").exists();
let sqlite_exists = path.join("conduit.db").exists();
if sled_exists {
if sqlite_exists {
// most likely an in-place directory, only warn
warn!("Both sled and sqlite databases are detected in database directory");
warn!("Currently running from the sqlite database, but consider removing sled database files to free up space")
} else {
error!(
let sled_exists = path.join("db").exists();
let sqlite_exists = path.join("conduit.db").exists();
if sled_exists {
if sqlite_exists {
// most likely an in-place directory, only warn
warn!("Both sled and sqlite databases are detected in database directory");
warn!("Currently running from the sqlite database, but consider removing sled database files to free up space")
} else {
error!(
"Sled database detected, conduit now uses sqlite for database operations"
);
error!("This database must be converted to sqlite, go to https://github.com/ShadowJonathan/conduit_toolbox#conduit_sled_to_sqlite");
return Err(Error::bad_config(
"sled database detected, migrate to sqlite",
));
error!("This database must be converted to sqlite, go to https://github.com/ShadowJonathan/conduit_toolbox#conduit_sled_to_sqlite");
return Err(Error::bad_config(
"sled database detected, migrate to sqlite",
));
}
}
}
}
Ok(())
}
@ -728,9 +743,9 @@ impl Database {
drop(guard);
#[cfg(feature = "sqlite")]
{
Self::start_wal_clean_task(Arc::clone(&db), &config).await;
}
{
Self::start_wal_clean_task(Arc::clone(&db), &config).await;
}
Ok(db)
}
@ -881,7 +896,7 @@ impl Database {
tokio::spawn(async move {
let mut i = interval(timer_interval);
#[cfg(unix)]
let mut s = signal(SignalKind::hangup()).unwrap();
let mut s = signal(SignalKind::hangup()).unwrap();
loop {
#[cfg(unix)]
@ -892,12 +907,13 @@ impl Database {
_ = s.recv() => {
info!("wal-trunc: Received SIGHUP");
}
};
#[cfg(not(unix))]
{
i.tick().await;
info!("wal-trunc: Timer ticked")
}
;
#[cfg(not(unix))]
{
i.tick().await;
info!("wal-trunc: Timer ticked")
}
let start = Instant::now();
if let Err(e) = db.read().await.flush_wal() {
@ -908,6 +924,65 @@ impl Database {
}
});
}
/// Measures memory usage in bytes and how full the caches are in percent for all caches in the Database struct.
pub fn get_cache_usage(&mut self) -> Result<CacheUsageStatistics> {
fn memory_usage_of_locked_cache<K: Eq + Hash, V>(cache: &mut Mutex<LruCache<K, V>>) -> usize {
let raw_cache = cache.lock().unwrap();
let mut cache_items_size_sum: usize = 0;
for cache_item in raw_cache.iter() {
cache_items_size_sum += std::mem::size_of_val(&cache_item);
}
cache_items_size_sum += std::mem::size_of_val(&cache);
cache_items_size_sum
}
fn items_in_locked_cache<K: Eq + Hash, V>(cache: &mut Mutex<LruCache<K, V>>) -> usize {
cache.lock().unwrap().len()
}
fn capacity_of_locked_cache<K: Eq + Hash, V>(cache: &mut Mutex<LruCache<K, V>>) -> usize {
cache.lock().unwrap().capacity()
}
return
Ok(CacheUsageStatistics {
pdu_cache: (
memory_usage_of_locked_cache(&mut self.rooms.pdu_cache),
items_in_locked_cache(&mut self.rooms.pdu_cache),
capacity_of_locked_cache(&mut self.rooms.pdu_cache)
),
auth_chain_cache: (
memory_usage_of_locked_cache(&mut self.rooms.auth_chain_cache),
items_in_locked_cache(&mut self.rooms.auth_chain_cache),
capacity_of_locked_cache(&mut self.rooms.auth_chain_cache)
),
shorteventid_cache: (
memory_usage_of_locked_cache(&mut self.rooms.shorteventid_cache),
items_in_locked_cache(&mut self.rooms.shorteventid_cache),
capacity_of_locked_cache(&mut self.rooms.shorteventid_cache)
),
eventidshort_cache: (
memory_usage_of_locked_cache(&mut self.rooms.eventidshort_cache),
items_in_locked_cache(&mut self.rooms.eventidshort_cache),
capacity_of_locked_cache(&mut self.rooms.eventidshort_cache)
),
statekeyshort_cache: (
memory_usage_of_locked_cache(&mut self.rooms.statekeyshort_cache),
items_in_locked_cache(&mut self.rooms.statekeyshort_cache),
capacity_of_locked_cache(&mut self.rooms.statekeyshort_cache)
),
shortstatekey_cache: (
memory_usage_of_locked_cache(&mut self.rooms.shortstatekey_cache),
items_in_locked_cache(&mut self.rooms.shortstatekey_cache),
capacity_of_locked_cache(&mut self.rooms.shortstatekey_cache)
),
stateinfo_cache: (
memory_usage_of_locked_cache(&mut self.rooms.stateinfo_cache),
items_in_locked_cache(&mut self.rooms.stateinfo_cache),
capacity_of_locked_cache(&mut self.rooms.stateinfo_cache)
),
});
}
}
pub struct DatabaseGuard(OwnedRwLockReadGuard<Database>);

View file

@ -3,19 +3,21 @@ use std::{
sync::Arc,
};
use crate::{pdu::PduBuilder, Database};
use rocket::futures::{channel::mpsc, stream::StreamExt};
use ruma::{
events::{room::message, EventType},
events::{EventType, room::message},
UserId,
};
use tokio::sync::{MutexGuard, RwLock, RwLockReadGuard};
use tokio::sync::{MutexGuard, RwLock, RwLockWriteGuard};
use tracing::warn;
use crate::{Database, pdu::PduBuilder};
pub enum AdminCommand {
RegisterAppservice(serde_yaml::Value),
ListAppservices,
SendMessage(message::MessageEventContent),
ShowCacheUsage,
}
#[derive(Clone)]
@ -59,7 +61,7 @@ impl Admin {
drop(guard);
let send_message = |message: message::MessageEventContent,
guard: RwLockReadGuard<'_, Database>,
guard: RwLockWriteGuard<'_, Database>,
mutex_lock: &MutexGuard<'_, ()>| {
guard
.rooms
@ -83,7 +85,7 @@ impl Admin {
loop {
tokio::select! {
Some(event) = receiver.next() => {
let guard = db.read().await;
let mut guard = db.write().await;
let mutex_state = Arc::clone(
guard.globals
.roomid_mutex_state
@ -92,6 +94,7 @@ impl Admin {
.entry(conduit_room.clone())
.or_default(),
);
let state_lock = mutex_state.lock().await;
match event {
@ -114,6 +117,37 @@ impl Admin {
AdminCommand::SendMessage(message) => {
send_message(message, guard, &state_lock);
}
AdminCommand::ShowCacheUsage => {
fn format_cache_statistics_triple(name: String, triple: (usize, usize, usize)) -> String {
let (memory_usage, item_count, capacity) = triple;
format!(
"{0} is using {1} MB ({2} bytes) of RAM at {3:.2}% utilization.",
name,
memory_usage / 100_00,
memory_usage ,
((item_count as f32 / capacity as f32) * 100.0)
)
}
if let Ok(cache_usage_statistics) = guard.get_cache_usage() {
let mut statistics_lines = Vec::with_capacity(7);
statistics_lines.push(format_cache_statistics_triple("pdu_cache".to_string(), cache_usage_statistics.pdu_cache));
statistics_lines.push(format_cache_statistics_triple("auth_chain_cache".to_string(), cache_usage_statistics.auth_chain_cache));
statistics_lines.push(format_cache_statistics_triple("shorteventid_cache".to_string(), cache_usage_statistics.shorteventid_cache));
statistics_lines.push(format_cache_statistics_triple("eventidshort_cache".to_string(), cache_usage_statistics.eventidshort_cache));
statistics_lines.push(format_cache_statistics_triple("statekeyshort_cache".to_string(), cache_usage_statistics.statekeyshort_cache));
statistics_lines.push(format_cache_statistics_triple("shortstatekey_cache".to_string(), cache_usage_statistics.shortstatekey_cache));
statistics_lines.push(format_cache_statistics_triple("stateinfo_cache".to_string(), cache_usage_statistics.stateinfo_cache));
send_message(message::MessageEventContent::text_plain(statistics_lines.join("\n")), guard, &state_lock);
} else {
let result_text = "Could not calculate database cache size";
send_message(message::MessageEventContent::text_plain(result_text), guard, &state_lock);
}
}
}
drop(state_lock);

View file

@ -1,38 +1,40 @@
mod edus;
pub use edus::RoomEdus;
use member::MembershipState;
use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result};
use lru_cache::LruCache;
use regex::Regex;
use ring::digest;
use rocket::http::RawStr;
use ruma::{
api::{client::error::ErrorKind, federation},
events::{
ignored_user_list, push_rules,
room::{
create::CreateEventContent, member, message, power_levels::PowerLevelsEventContent,
},
AnyStrippedStateEvent, AnySyncStateEvent, EventType,
},
push::{self, Action, Tweak},
serde::{CanonicalJsonObject, CanonicalJsonValue, Raw},
state_res::{self, RoomVersion, StateMap},
uint, EventId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
};
use std::{
collections::{BTreeMap, HashMap, HashSet},
convert::{TryFrom, TryInto},
mem::size_of,
sync::{Arc, Mutex},
};
use lru_cache::LruCache;
use member::MembershipState;
use regex::Regex;
use ring::digest;
use rocket::http::RawStr;
use ruma::{
api::{client::error::ErrorKind, federation},
EventId,
events::{
AnyStrippedStateEvent, AnySyncStateEvent,
EventType,
ignored_user_list, push_rules, room::{
create::CreateEventContent, member, message, power_levels::PowerLevelsEventContent,
},
},
push::{self, Action, Tweak},
RoomAliasId,
RoomId, RoomVersionId, serde::{CanonicalJsonObject, CanonicalJsonValue, Raw}, ServerName, state_res::{self, RoomVersion, StateMap}, uint, UserId,
};
use tokio::sync::MutexGuard;
use tracing::{error, warn};
pub use edus::RoomEdus;
use crate::{Database, Error, pdu::PduBuilder, PduEvent, Result, utils};
use super::{abstraction::Tree, admin::AdminCommand, pusher};
mod edus;
/// The unique identifier of each state group.
///
/// This is created when a state group is added to the database by
@ -1563,10 +1565,13 @@ impl Rooms {
));
}
}
"show_cache_usage" => {
db.admin.send(AdminCommand::ShowCacheUsage);
}
_ => {
db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain(format!(
"Unrecognized command: {}",
"Unrecognized command: {}\nAvailable commands:\n- register_appservice\n- list_appservices\n- get_pdu\n- show_cache_usage",
command
)),
));