From 865b5d72416e2b500db903c65010a44ed1418736 Mon Sep 17 00:00:00 2001
From: Jason Volk <jason@zemos.net>
Date: Sat, 6 Apr 2024 01:24:08 -0700
Subject: [PATCH] reorganize database crate.

split database Cork into unit.
split database migrations from mod.rs

Signed-off-by: Jason Volk <jason@zemos.net>
---
 src/database/abstraction.rs                  | 125 ---
 src/database/cork.rs                         |  32 +
 src/database/key_value/globals.rs            |   2 +-
 src/database/kvengine.rs                     |  38 +
 src/database/kvtree.rs                       |  60 ++
 src/database/migrations.rs                   | 640 +++++++++++++++
 src/database/mod.rs                          | 761 ++----------------
 .../rocksdb.rs => rocksdb/mod.rs}            |   0
 .../{abstraction/sqlite.rs => sqlite/mod.rs} |   0
 src/database/{abstraction => }/watchers.rs   |   0
 src/service/globals/data.rs                  |   2 +-
 11 files changed, 849 insertions(+), 811 deletions(-)
 delete mode 100644 src/database/abstraction.rs
 create mode 100644 src/database/cork.rs
 create mode 100644 src/database/kvengine.rs
 create mode 100644 src/database/kvtree.rs
 create mode 100644 src/database/migrations.rs
 rename src/database/{abstraction/rocksdb.rs => rocksdb/mod.rs} (100%)
 rename src/database/{abstraction/sqlite.rs => sqlite/mod.rs} (100%)
 rename src/database/{abstraction => }/watchers.rs (100%)

diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs
deleted file mode 100644
index cd3efa6f..00000000
--- a/src/database/abstraction.rs
+++ /dev/null
@@ -1,125 +0,0 @@
-use std::{error::Error, future::Future, pin::Pin, sync::Arc};
-
-use super::Config;
-use crate::Result;
-
-#[cfg(feature = "sqlite")]
-pub mod sqlite;
-
-#[cfg(feature = "rocksdb")]
-pub(crate) mod rocksdb;
-
-#[cfg(any(feature = "sqlite", feature = "rocksdb"))]
-pub(crate) mod watchers;
-
-pub(crate) trait KeyValueDatabaseEngine: Send + Sync {
-	fn open(config: &Config) -> Result<Self>
-	where
-		Self: Sized;
-	fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>>;
-	fn flush(&self) -> Result<()>;
-	#[allow(dead_code)]
-	fn sync(&self) -> Result<()> { Ok(()) }
-	fn cork(&self) -> Result<()> { Ok(()) }
-	fn uncork(&self) -> Result<()> { Ok(()) }
-	fn corked(&self) -> bool { false }
-	fn cleanup(&self) -> Result<()> { Ok(()) }
-	fn memory_usage(&self) -> Result<String> {
-		Ok("Current database engine does not support memory usage reporting.".to_owned())
-	}
-
-	#[allow(dead_code)]
-	fn clear_caches(&self) {}
-
-	fn backup(&self) -> Result<(), Box<dyn Error>> { unimplemented!() }
-
-	fn backup_list(&self) -> Result<String> { Ok(String::new()) }
-
-	fn file_list(&self) -> Result<String> { Ok(String::new()) }
-}
-
-pub(crate) trait KvTree: Send + Sync {
-	fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
-
-	#[allow(dead_code)]
-	#[cfg(feature = "rocksdb")]
-	fn multi_get(
-		&self, _iter: Vec<(&Arc<rust_rocksdb::BoundColumnFamily<'_>>, Vec<u8>)>,
-	) -> Vec<Result<Option<Vec<u8>>, rust_rocksdb::Error>> {
-		unimplemented!()
-	}
-
-	fn insert(&self, key: &[u8], value: &[u8]) -> Result<()>;
-	fn insert_batch(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
-		for (key, value) in iter {
-			self.insert(&key, &value)?;
-		}
-
-		Ok(())
-	}
-
-	fn remove(&self, key: &[u8]) -> Result<()>;
-
-	#[allow(dead_code)]
-	fn remove_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
-		for key in iter {
-			self.remove(&key)?;
-		}
-
-		Ok(())
-	}
-
-	fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
-
-	fn iter_from<'a>(&'a self, from: &[u8], backwards: bool) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
-
-	fn increment(&self, key: &[u8]) -> Result<Vec<u8>>;
-	fn increment_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
-		for key in iter {
-			self.increment(&key)?;
-		}
-
-		Ok(())
-	}
-
-	fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
-
-	fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
-
-	fn clear(&self) -> Result<()> {
-		for (key, _) in self.iter() {
-			self.remove(&key)?;
-		}
-
-		Ok(())
-	}
-}
-
-pub struct Cork {
-	db: Arc<dyn KeyValueDatabaseEngine>,
-	flush: bool,
-	sync: bool,
-}
-
-impl Cork {
-	pub(crate) fn new(db: &Arc<dyn KeyValueDatabaseEngine>, flush: bool, sync: bool) -> Self {
-		db.cork().unwrap();
-		Cork {
-			db: db.clone(),
-			flush,
-			sync,
-		}
-	}
-}
-
-impl Drop for Cork {
-	fn drop(&mut self) {
-		self.db.uncork().ok();
-		if self.flush {
-			self.db.flush().ok();
-		}
-		if self.sync {
-			self.db.sync().ok();
-		}
-	}
-}
diff --git a/src/database/cork.rs b/src/database/cork.rs
new file mode 100644
index 00000000..27b59d17
--- /dev/null
+++ b/src/database/cork.rs
@@ -0,0 +1,32 @@
+use std::sync::Arc;
+
+use super::KeyValueDatabaseEngine;
+
+pub struct Cork {
+	db: Arc<dyn KeyValueDatabaseEngine>,
+	flush: bool,
+	sync: bool,
+}
+
+impl Cork {
+	pub(crate) fn new(db: &Arc<dyn KeyValueDatabaseEngine>, flush: bool, sync: bool) -> Self {
+		db.cork().unwrap();
+		Cork {
+			db: db.clone(),
+			flush,
+			sync,
+		}
+	}
+}
+
+impl Drop for Cork {
+	fn drop(&mut self) {
+		self.db.uncork().ok();
+		if self.flush {
+			self.db.flush().ok();
+		}
+		if self.sync {
+			self.db.sync().ok();
+		}
+	}
+}
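Review note: cork.rs moves the Cork guard out of the abstraction module unchanged. Because uncork/flush/sync live in Drop, they run on every exit path, including early `?` returns. A minimal crate-internal sketch of the pattern (bulk_insert and its literal arguments are illustrative, not part of this patch):

	use std::sync::Arc;

	use crate::database::{Cork, KeyValueDatabaseEngine, KvTree};
	use crate::Result;

	fn bulk_insert(db: &Arc<dyn KeyValueDatabaseEngine>, tree: &dyn KvTree) -> Result<()> {
		// Cork the engine so the writes below can be buffered as one batch.
		let _cork = Cork::new(db, true, false); // flush on drop, no fsync
		for n in 0u64..1_000 {
			tree.insert(&n.to_be_bytes(), b"value")?;
		}
		Ok(()) // _cork drops here: uncork, then flush
	}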
diff --git a/src/database/key_value/globals.rs b/src/database/key_value/globals.rs
index 799b78f1..1986fe46 100644
--- a/src/database/key_value/globals.rs
+++ b/src/database/key_value/globals.rs
@@ -10,7 +10,7 @@ use ruma::{
 };
 
 use crate::{
-	database::{abstraction::Cork, KeyValueDatabase},
+	database::{Cork, KeyValueDatabase},
 	service, services, utils, Error, Result,
 };
 
diff --git a/src/database/kvengine.rs b/src/database/kvengine.rs
new file mode 100644
index 00000000..c67a7e98
--- /dev/null
+++ b/src/database/kvengine.rs
@@ -0,0 +1,38 @@
+use std::{error::Error, sync::Arc};
+
+use super::{Config, KvTree};
+use crate::Result;
+
+pub(crate) trait KeyValueDatabaseEngine: Send + Sync {
+	fn open(config: &Config) -> Result<Self>
+	where
+		Self: Sized;
+
+	fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>>;
+
+	fn flush(&self) -> Result<()>;
+
+	#[allow(dead_code)]
+	fn sync(&self) -> Result<()> { Ok(()) }
+
+	fn cork(&self) -> Result<()> { Ok(()) }
+
+	fn uncork(&self) -> Result<()> { Ok(()) }
+
+	fn corked(&self) -> bool { false }
+
+	fn cleanup(&self) -> Result<()> { Ok(()) }
+
+	fn memory_usage(&self) -> Result<String> {
+		Ok("Current database engine does not support memory usage reporting.".to_owned())
+	}
+
+	#[allow(dead_code)]
+	fn clear_caches(&self) {}
+
+	fn backup(&self) -> Result<(), Box<dyn Error>> { unimplemented!() }
+
+	fn backup_list(&self) -> Result<String> { Ok(String::new()) }
+
+	fn file_list(&self) -> Result<String> { Ok(String::new()) }
+}
diff --git a/src/database/kvtree.rs b/src/database/kvtree.rs
new file mode 100644
index 00000000..beda110a
--- /dev/null
+++ b/src/database/kvtree.rs
@@ -0,0 +1,60 @@
+use std::{future::Future, pin::Pin, sync::Arc};
+
+use crate::Result;
+
+pub(crate) trait KvTree: Send + Sync {
+	fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
+
+	#[allow(dead_code)]
+	#[cfg(feature = "rocksdb")]
+	fn multi_get(
+		&self, _iter: Vec<(&Arc<rust_rocksdb::BoundColumnFamily<'_>>, Vec<u8>)>,
+	) -> Vec<Result<Option<Vec<u8>>, rust_rocksdb::Error>> {
+		unimplemented!()
+	}
+
+	fn insert(&self, key: &[u8], value: &[u8]) -> Result<()>;
+	fn insert_batch(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
+		for (key, value) in iter {
+			self.insert(&key, &value)?;
+		}
+
+		Ok(())
+	}
+
+	fn remove(&self, key: &[u8]) -> Result<()>;
+
+	#[allow(dead_code)]
+	fn remove_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
+		for key in iter {
+			self.remove(&key)?;
+		}
+
+		Ok(())
+	}
+
+	fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
+
+	fn iter_from<'a>(&'a self, from: &[u8], backwards: bool) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
+
+	fn increment(&self, key: &[u8]) -> Result<Vec<u8>>;
+	fn increment_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
+		for key in iter {
+			self.increment(&key)?;
+		}
+
+		Ok(())
+	}
+
+	fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
+
+	fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
+
+	fn clear(&self) -> Result<()> {
+		for (key, _) in self.iter() {
+			self.remove(&key)?;
+		}
+
+		Ok(())
+	}
+}
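Review note: kvtree.rs keeps the iterator-based tree API whose composite keys join components with 0xFF, the same separator the migrations below split on. A sketch of how a prefix scan is typically driven (room_entries is illustrative, not part of this patch):

	use crate::database::KvTree;
	use ruma::RoomId;

	// Everything stored under one room shares the `room_id + 0xFF` prefix,
	// so scan_prefix yields exactly that key range.
	fn room_entries<'a>(tree: &'a dyn KvTree, room_id: &RoomId) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
		let mut prefix = room_id.as_bytes().to_vec();
		prefix.push(0xFF);
		tree.scan_prefix(prefix)
	}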
diff --git a/src/database/migrations.rs b/src/database/migrations.rs
new file mode 100644
index 00000000..ce46cb2d
--- /dev/null
+++ b/src/database/migrations.rs
@@ -0,0 +1,640 @@
+use std::{
+	collections::{HashMap, HashSet},
+	fs::{self},
+	io::Write,
+	mem::size_of,
+	sync::Arc,
+};
+
+use argon2::{password_hash::SaltString, PasswordHasher, PasswordVerifier};
+use itertools::Itertools;
+use rand::thread_rng;
+use ruma::{
+	events::{push_rules::PushRulesEvent, GlobalAccountDataEventType},
+	push::Ruleset,
+	EventId, OwnedRoomId, RoomId, UserId,
+};
+use tracing::{debug, error, info, warn};
+
+use super::KeyValueDatabase;
+use crate::{services, utils, Config, Error, Result};
+
+pub(crate) async fn migrations(db: &KeyValueDatabase, config: &Config) -> Result<()> {
+	// Matrix resource ownership is based on the server name; changing it
+	// requires recreating the database from scratch.
+	if services().users.count()? > 0 {
+		let conduit_user =
+			UserId::parse_with_server_name("conduit", &config.server_name).expect("@conduit:server_name is valid");
+
+		if !services().users.exists(&conduit_user)? {
+			error!("The {} server user does not exist, and the database is not new.", conduit_user);
+			return Err(Error::bad_database(
+				"Cannot reuse an existing database after changing the server name, please delete the old one first.",
+			));
+		}
+	}
+
+	// If the database has any data, perform data migrations before starting
+	// do not increment the db version if the user is not using sha256_media
+	let latest_database_version = if cfg!(feature = "sha256_media") {
+		14
+	} else {
+		13
+	};
+
+	if services().users.count()? > 0 {
+		// MIGRATIONS
+		if services().globals.database_version()? < 1 {
+			for (roomserverid, _) in db.roomserverids.iter() {
+				let mut parts = roomserverid.split(|&b| b == 0xFF);
+				let room_id = parts.next().expect("split always returns one element");
+				let Some(servername) = parts.next() else {
+					error!("Migration: Invalid roomserverid in db.");
+					continue;
+				};
+				let mut serverroomid = servername.to_vec();
+				serverroomid.push(0xFF);
+				serverroomid.extend_from_slice(room_id);
+
+				db.serverroomids.insert(&serverroomid, &[])?;
+			}
+
+			services().globals.bump_database_version(1)?;
+
+			warn!("Migration: 0 -> 1 finished");
+		}
+
+		if services().globals.database_version()? < 2 {
+			// We accidentally inserted hashed versions of "" into the db instead of just ""
+			for (userid, password) in db.userid_password.iter() {
+				let salt = SaltString::generate(thread_rng());
+				let empty_pass = services()
+					.globals
+					.argon
+					.hash_password(b"", &salt)
+					.expect("our own password to be properly hashed");
+				let empty_hashed_password = services()
+					.globals
+					.argon
+					.verify_password(&password, &empty_pass)
+					.is_ok();
+
+				if empty_hashed_password {
+					db.userid_password.insert(&userid, b"")?;
+				}
+			}
+
+			services().globals.bump_database_version(2)?;
+
+			warn!("Migration: 1 -> 2 finished");
+		}
+
+		if services().globals.database_version()? < 3 {
+			// Move media to filesystem
+			for (key, content) in db.mediaid_file.iter() {
+				if content.is_empty() {
+					continue;
+				}
+
+				#[allow(deprecated)]
+				let path = services().globals.get_media_file(&key);
+				let mut file = fs::File::create(path)?;
+				file.write_all(&content)?;
+				db.mediaid_file.insert(&key, &[])?;
+			}
+
+			services().globals.bump_database_version(3)?;
+
+			warn!("Migration: 2 -> 3 finished");
+		}
+
+		if services().globals.database_version()? < 4 {
+			// Add federated users to services() as deactivated
+			for our_user in services().users.iter() {
+				let our_user = our_user?;
+				if services().users.is_deactivated(&our_user)? {
+					continue;
+				}
+				for room in services().rooms.state_cache.rooms_joined(&our_user) {
+					for user in services().rooms.state_cache.room_members(&room?) {
+						let user = user?;
+						if user.server_name() != config.server_name {
+							info!(?user, "Migration: creating user");
+							services().users.create(&user, None)?;
+						}
+					}
+				}
+			}
+
+			services().globals.bump_database_version(4)?;
+
+			warn!("Migration: 3 -> 4 finished");
+		}
+
+		if services().globals.database_version()? < 5 {
+			// Upgrade user data store
+			for (roomuserdataid, _) in db.roomuserdataid_accountdata.iter() {
+				let mut parts = roomuserdataid.split(|&b| b == 0xFF);
+				let room_id = parts.next().unwrap();
+				let user_id = parts.next().unwrap();
+				let event_type = roomuserdataid.rsplit(|&b| b == 0xFF).next().unwrap();
+
+				let mut key = room_id.to_vec();
+				key.push(0xFF);
+				key.extend_from_slice(user_id);
+				key.push(0xFF);
+				key.extend_from_slice(event_type);
+
+				db.roomusertype_roomuserdataid
+					.insert(&key, &roomuserdataid)?;
+			}
+
+			services().globals.bump_database_version(5)?;
+
+			warn!("Migration: 4 -> 5 finished");
+		}
+
+		if services().globals.database_version()? < 6 {
+			// Set room member count
+			for (roomid, _) in db.roomid_shortstatehash.iter() {
+				let string = utils::string_from_bytes(&roomid).unwrap();
+				let room_id = <&RoomId>::try_from(string.as_str()).unwrap();
+				services().rooms.state_cache.update_joined_count(room_id)?;
+			}
+
+			services().globals.bump_database_version(6)?;
+
+			warn!("Migration: 5 -> 6 finished");
+		}
+
+		if services().globals.database_version()? < 7 {
+			// Upgrade state store
+			let mut last_roomstates: HashMap<OwnedRoomId, u64> = HashMap::new();
+			let mut current_sstatehash: Option<u64> = None;
+			let mut current_room = None;
+			let mut current_state = HashSet::new();
+			let mut counter = 0;
+
+			let mut handle_state = |current_sstatehash: u64,
+			                        current_room: &RoomId,
+			                        current_state: HashSet<_>,
+			                        last_roomstates: &mut HashMap<_, _>| {
+				counter += 1;
+				let last_roomsstatehash = last_roomstates.get(current_room);
+
+				let states_parents = last_roomsstatehash.map_or_else(
+					|| Ok(Vec::new()),
+					|&last_roomsstatehash| {
+						services()
+							.rooms
+							.state_compressor
+							.load_shortstatehash_info(last_roomsstatehash)
+					},
+				)?;
+
+				let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last() {
+					let statediffnew = current_state
+						.difference(&parent_stateinfo.1)
+						.copied()
+						.collect::<HashSet<_>>();
+
+					let statediffremoved = parent_stateinfo
+						.1
+						.difference(&current_state)
+						.copied()
+						.collect::<HashSet<_>>();
+
+					(statediffnew, statediffremoved)
+				} else {
+					(current_state, HashSet::new())
+				};
+
+				services().rooms.state_compressor.save_state_from_diff(
+					current_sstatehash,
+					Arc::new(statediffnew),
+					Arc::new(statediffremoved),
+					2, // every state change is 2 event changes on average
+					states_parents,
+				)?;
+
+				/*
+				let mut tmp = services().rooms.load_shortstatehash_info(&current_sstatehash)?;
+				let state = tmp.pop().unwrap();
+				println!(
+					"{}\t{}{:?}: {:?} + {:?} - {:?}",
+					current_room,
+					" ".repeat(tmp.len()),
+					utils::u64_from_bytes(&current_sstatehash).unwrap(),
+					tmp.last().map(|b| utils::u64_from_bytes(&b.0).unwrap()),
+					state
+						.2
+						.iter()
+						.map(|b| utils::u64_from_bytes(&b[size_of::<u64>()..]).unwrap())
+						.collect::<Vec<_>>(),
+					state
+						.3
+						.iter()
+						.map(|b| utils::u64_from_bytes(&b[size_of::<u64>()..]).unwrap())
+						.collect::<Vec<_>>()
+				);
+				*/
+
+				Ok::<_, Error>(())
+			};
+
+			for (k, seventid) in db.db.open_tree("stateid_shorteventid")?.iter() {
+				let sstatehash = utils::u64_from_bytes(&k[0..size_of::<u64>()]).expect("number of bytes is correct");
+				let sstatekey = k[size_of::<u64>()..].to_vec();
+				if Some(sstatehash) != current_sstatehash {
+					if let Some(current_sstatehash) = current_sstatehash {
+						handle_state(
+							current_sstatehash,
+							current_room.as_deref().unwrap(),
+							current_state,
+							&mut last_roomstates,
+						)?;
+						last_roomstates.insert(current_room.clone().unwrap(), current_sstatehash);
+					}
+					current_state = HashSet::new();
+					current_sstatehash = Some(sstatehash);
+
+					let event_id = db.shorteventid_eventid.get(&seventid).unwrap().unwrap();
+					let string = utils::string_from_bytes(&event_id).unwrap();
+					let event_id = <&EventId>::try_from(string.as_str()).unwrap();
+					let pdu = services()
+						.rooms
+						.timeline
+						.get_pdu(event_id)
+						.unwrap()
+						.unwrap();
+
+					if Some(&pdu.room_id) != current_room.as_ref() {
+						current_room = Some(pdu.room_id.clone());
+					}
+				}
+
+				let mut val = sstatekey;
+				val.extend_from_slice(&seventid);
+				current_state.insert(val.try_into().expect("size is correct"));
+			}
+
+			if let Some(current_sstatehash) = current_sstatehash {
+				handle_state(
+					current_sstatehash,
+					current_room.as_deref().unwrap(),
+					current_state,
+					&mut last_roomstates,
+				)?;
+			}
+
+			services().globals.bump_database_version(7)?;
+
+			warn!("Migration: 6 -> 7 finished");
+		}
+
+		if services().globals.database_version()? < 8 {
+			// Generate short room ids for all rooms
+			for (room_id, _) in db.roomid_shortstatehash.iter() {
+				let shortroomid = services().globals.next_count()?.to_be_bytes();
+				db.roomid_shortroomid.insert(&room_id, &shortroomid)?;
+				info!("Migration: 8");
+			}
+			// Update pduids db layout
+			let mut batch = db.pduid_pdu.iter().filter_map(|(key, v)| {
+				if !key.starts_with(b"!") {
+					return None;
+				}
+				let mut parts = key.splitn(2, |&b| b == 0xFF);
+				let room_id = parts.next().unwrap();
+				let count = parts.next().unwrap();
+
+				let short_room_id = db
+					.roomid_shortroomid
+					.get(room_id)
+					.unwrap()
+					.expect("shortroomid should exist");
+
+				let mut new_key = short_room_id;
+				new_key.extend_from_slice(count);
+
+				Some((new_key, v))
+			});
+
+			db.pduid_pdu.insert_batch(&mut batch)?;
+
+			let mut batch2 = db.eventid_pduid.iter().filter_map(|(k, value)| {
+				if !value.starts_with(b"!") {
+					return None;
+				}
+				let mut parts = value.splitn(2, |&b| b == 0xFF);
+				let room_id = parts.next().unwrap();
+				let count = parts.next().unwrap();
+
+				let short_room_id = db
+					.roomid_shortroomid
+					.get(room_id)
+					.unwrap()
+					.expect("shortroomid should exist");
+
+				let mut new_value = short_room_id;
+				new_value.extend_from_slice(count);
+
+				Some((k, new_value))
+			});
+
+			db.eventid_pduid.insert_batch(&mut batch2)?;
+
+			services().globals.bump_database_version(8)?;
+
+			warn!("Migration: 7 -> 8 finished");
+		}
+
+		if services().globals.database_version()? < 9 {
+			// Update tokenids db layout
+			let mut iter = db
+				.tokenids
+				.iter()
+				.filter_map(|(key, _)| {
+					if !key.starts_with(b"!") {
+						return None;
+					}
+					let mut parts = key.splitn(4, |&b| b == 0xFF);
+					let room_id = parts.next().unwrap();
+					let word = parts.next().unwrap();
+					let _pdu_id_room = parts.next().unwrap();
+					let pdu_id_count = parts.next().unwrap();
+
+					let short_room_id = db
+						.roomid_shortroomid
+						.get(room_id)
+						.unwrap()
+						.expect("shortroomid should exist");
+					let mut new_key = short_room_id;
+					new_key.extend_from_slice(word);
+					new_key.push(0xFF);
+					new_key.extend_from_slice(pdu_id_count);
+					Some((new_key, Vec::new()))
+				})
+				.peekable();
+
+			while iter.peek().is_some() {
+				db.tokenids.insert_batch(&mut iter.by_ref().take(1000))?;
+				debug!("Inserted smaller batch");
+			}
+
+			info!("Deleting starts");
+
+			let batch2: Vec<_> = db
+				.tokenids
+				.iter()
+				.filter_map(|(key, _)| {
+					if key.starts_with(b"!") {
+						Some(key)
+					} else {
+						None
+					}
+				})
+				.collect();
+
+			for key in batch2 {
+				db.tokenids.remove(&key)?;
+			}
+
+			services().globals.bump_database_version(9)?;
+
+			warn!("Migration: 8 -> 9 finished");
+		}
+
+		if services().globals.database_version()? < 10 {
+			// Add other direction for shortstatekeys
+			for (statekey, shortstatekey) in db.statekey_shortstatekey.iter() {
+				db.shortstatekey_statekey
+					.insert(&shortstatekey, &statekey)?;
+			}
+
+			// Force E2EE device list updates so we can send them over federation
+			for user_id in services().users.iter().filter_map(Result::ok) {
+				services().users.mark_device_key_update(&user_id)?;
+			}
+
+			services().globals.bump_database_version(10)?;
+
+			warn!("Migration: 9 -> 10 finished");
+		}
+
+		if services().globals.database_version()? < 11 {
+			db.db
+				.open_tree("userdevicesessionid_uiaarequest")?
+				.clear()?;
+			services().globals.bump_database_version(11)?;
+
+			warn!("Migration: 10 -> 11 finished");
+		}
+
+		if services().globals.database_version()? < 12 {
+			for username in services().users.list_local_users()? {
+				let user = match UserId::parse_with_server_name(username.clone(), &config.server_name) {
+					Ok(u) => u,
+					Err(e) => {
+						warn!("Invalid username {username}: {e}");
+						continue;
+					},
+				};
+
+				let raw_rules_list = services()
+					.account_data
+					.get(None, &user, GlobalAccountDataEventType::PushRules.to_string().into())
+					.unwrap()
+					.expect("Username is invalid");
+
+				let mut account_data = serde_json::from_str::<PushRulesEvent>(raw_rules_list.get()).unwrap();
+				let rules_list = &mut account_data.content.global;
+
+				//content rule
+				{
+					let content_rule_transformation = [".m.rules.contains_user_name", ".m.rule.contains_user_name"];
+
+					let rule = rules_list.content.get(content_rule_transformation[0]);
+					if rule.is_some() {
+						let mut rule = rule.unwrap().clone();
+						content_rule_transformation[1].clone_into(&mut rule.rule_id);
+						rules_list
+							.content
+							.shift_remove(content_rule_transformation[0]);
+						rules_list.content.insert(rule);
+					}
+				}
+
+				//underride rules
+				{
+					let underride_rule_transformation = [
+						[".m.rules.call", ".m.rule.call"],
+						[".m.rules.room_one_to_one", ".m.rule.room_one_to_one"],
+						[".m.rules.encrypted_room_one_to_one", ".m.rule.encrypted_room_one_to_one"],
+						[".m.rules.message", ".m.rule.message"],
+						[".m.rules.encrypted", ".m.rule.encrypted"],
+					];
+
+					for transformation in underride_rule_transformation {
+						let rule = rules_list.underride.get(transformation[0]);
+						if let Some(rule) = rule {
+							let mut rule = rule.clone();
+							transformation[1].clone_into(&mut rule.rule_id);
+							rules_list.underride.shift_remove(transformation[0]);
+							rules_list.underride.insert(rule);
+						}
+					}
+				}
+
+				services().account_data.update(
+					None,
+					&user,
+					GlobalAccountDataEventType::PushRules.to_string().into(),
+					&serde_json::to_value(account_data).expect("to json value always works"),
+				)?;
+			}
+
+			services().globals.bump_database_version(12)?;
+
+			warn!("Migration: 11 -> 12 finished");
+		}
+
+		// This migration can be reused as-is anytime the server-default rules are
+		// updated.
+		if services().globals.database_version()? < 13 {
+			for username in services().users.list_local_users()? {
+				let user = match UserId::parse_with_server_name(username.clone(), &config.server_name) {
+					Ok(u) => u,
+					Err(e) => {
+						warn!("Invalid username {username}: {e}");
+						continue;
+					},
+				};
+
+				let raw_rules_list = services()
+					.account_data
+					.get(None, &user, GlobalAccountDataEventType::PushRules.to_string().into())
+					.unwrap()
+					.expect("Username is invalid");
+
+				let mut account_data = serde_json::from_str::<PushRulesEvent>(raw_rules_list.get()).unwrap();
+
+				let user_default_rules = Ruleset::server_default(&user);
+				account_data
+					.content
+					.global
+					.update_with_server_default(user_default_rules);
+
+				services().account_data.update(
+					None,
+					&user,
+					GlobalAccountDataEventType::PushRules.to_string().into(),
+					&serde_json::to_value(account_data).expect("to json value always works"),
+				)?;
+			}
+
+			services().globals.bump_database_version(13)?;
+
+			warn!("Migration: 12 -> 13 finished");
+		}
+
+		#[cfg(feature = "sha256_media")]
+		{
+			if services().globals.database_version()? < 14 && cfg!(feature = "sha256_media") {
+				warn!("sha256_media feature flag is enabled, migrating legacy base64 file names to sha256 file names");
+				// Move old media files to new names
+				for (key, _) in db.mediaid_file.iter() {
+					let old_path = services().globals.get_media_file(&key);
+					debug!("Old file path: {old_path:?}");
+					let path = services().globals.get_media_file_new(&key);
+					debug!("New file path: {path:?}");
+					// move the file to the new location
+					if old_path.exists() {
+						tokio::fs::rename(&old_path, &path).await?;
+					}
+				}
+
+				services().globals.bump_database_version(14)?;
+
+				warn!("Migration: 13 -> 14 finished");
+			}
+		}
+
+		assert_eq!(
+			services().globals.database_version().unwrap(),
+			latest_database_version,
+			"Failed asserting local database version {} is equal to known latest conduwuit database version {}",
+			services().globals.database_version().unwrap(),
+			latest_database_version
+		);
+
+		{
+			let patterns = &config.forbidden_usernames;
+			if !patterns.is_empty() {
+				for user_id in services()
+					.users
+					.iter()
+					.filter_map(Result::ok)
+					.filter(|user| !services().users.is_deactivated(user).unwrap_or(true))
+					.filter(|user| user.server_name() == config.server_name)
+				{
+					let matches = patterns.matches(user_id.localpart());
+					if matches.matched_any() {
+						warn!(
+							"User {} matches the following forbidden username patterns: {}",
+							user_id.to_string(),
+							matches
+								.into_iter()
+								.map(|x| &patterns.patterns()[x])
+								.join(", ")
+						);
+					}
+				}
+			}
+		}
+
+		{
+			let patterns = &config.forbidden_alias_names;
+			if !patterns.is_empty() {
+				for address in services().rooms.metadata.iter_ids() {
+					let room_id = address?;
+					let room_aliases = services().rooms.alias.local_aliases_for_room(&room_id);
+					for room_alias_result in room_aliases {
+						let room_alias = room_alias_result?;
+						let matches = patterns.matches(room_alias.alias());
+						if matches.matched_any() {
+							warn!(
+								"Room with alias {} ({}) matches the following forbidden room name patterns: {}",
+								room_alias,
+								&room_id,
+								matches
+									.into_iter()
+									.map(|x| &patterns.patterns()[x])
+									.join(", ")
+							);
+						}
+					}
+				}
+			}
+		}
+
+		info!(
+			"Loaded {} database with version {}",
+			config.database_backend, latest_database_version
+		);
+	} else {
+		services()
+			.globals
+			.bump_database_version(latest_database_version)?;
+
+		// Create the admin room and server user on first run
+		services().admin.create_admin_room().await?;
+
+		warn!(
+			"Created new {} database with version {}",
+			config.database_backend, latest_database_version
+		);
+	}
+
+	Ok(())
+}
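Review note: every step in migrations.rs follows the same version-gated shape, so the file now documents the upgrade path in one place. A hypothetical next step (version 15 does not exist in this patch; latest_database_version would be raised alongside it) would slot in as:

	if services().globals.database_version()? < 15 {
		// ... rewrite the affected trees here ...

		services().globals.bump_database_version(15)?;

		warn!("Migration: 14 -> 15 finished");
	}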
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 2a8af795..41e8d5b8 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -1,37 +1,48 @@
-pub(crate) mod abstraction;
+pub(crate) mod cork;
 pub(crate) mod key_value;
+pub(crate) mod kvengine;
+pub(crate) mod kvtree;
+mod migrations;
+
+#[cfg(feature = "rocksdb")]
+pub(crate) mod rocksdb;
+
+#[cfg(feature = "sqlite")]
+pub mod sqlite;
+
+#[cfg(any(feature = "sqlite", feature = "rocksdb"))]
+pub(crate) mod watchers;
 
 use std::{
 	collections::{BTreeMap, HashMap, HashSet},
 	fs::{self},
-	io::Write,
-	mem::size_of,
 	path::Path,
 	sync::{Arc, Mutex, RwLock},
 	time::Duration,
 };
 
-use abstraction::{KeyValueDatabaseEngine, KvTree};
-use argon2::{password_hash::SaltString, PasswordHasher, PasswordVerifier};
-use itertools::Itertools;
+pub(crate) use cork::Cork;
+pub(crate) use kvengine::KeyValueDatabaseEngine;
+pub(crate) use kvtree::KvTree;
 use lru_cache::LruCache;
-use rand::thread_rng;
 use ruma::{
 	events::{
-		push_rules::{PushRulesEvent, PushRulesEventContent},
-		room::message::RoomMessageEventContent,
-		GlobalAccountDataEvent, GlobalAccountDataEventType, StateEventType,
+		push_rules::PushRulesEventContent, room::message::RoomMessageEventContent, GlobalAccountDataEvent,
+		GlobalAccountDataEventType, StateEventType,
 	},
 	push::Ruleset,
-	CanonicalJsonValue, EventId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
+	CanonicalJsonValue, EventId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedUserId, UserId,
 };
 use serde::Deserialize;
 #[cfg(unix)]
 use tokio::signal::unix::{signal, SignalKind};
 use tokio::time::{interval, Instant};
-use tracing::{debug, error, info, warn};
+use tracing::{debug, error, warn};
 
-use crate::{service::rooms::timeline::PduCount, services, utils, Config, Error, PduEvent, Result, Services, SERVICES};
+use crate::{
+	database::migrations::migrations, service::rooms::timeline::PduCount, services, Config, Error, PduEvent, Result,
+	Services, SERVICES,
+};
 
 pub struct KeyValueDatabase {
 	db: Arc<dyn KeyValueDatabaseEngine>,
@@ -193,42 +204,6 @@ struct CheckForUpdatesResponse {
 }
 
 impl KeyValueDatabase {
-	fn check_db_setup(config: &Config) -> Result<()> {
-		let path = Path::new(&config.database_path);
-
-		let sqlite_exists = path.join("conduit.db").exists();
-		let rocksdb_exists = path.join("IDENTITY").exists();
-
-		let mut count = 0;
-
-		if sqlite_exists {
-			count += 1;
-		}
-
-		if rocksdb_exists {
-			count += 1;
-		}
-
-		if count > 1 {
-			warn!("Multiple databases at database_path detected");
-			return Ok(());
-		}
-
-		if sqlite_exists && config.database_backend != "sqlite" {
-			return Err(Error::bad_config(
-				"Found sqlite at database_path, but is not specified in config.",
-			));
-		}
-
-		if rocksdb_exists && config.database_backend != "rocksdb" {
-			return Err(Error::bad_config(
-				"Found rocksdb at database_path, but is not specified in config.",
-			));
-		}
-
-		Ok(())
-	}
-
 	/// Load an existing database or create a new one.
 	#[allow(clippy::too_many_lines)]
 	pub async fn load_or_create(config: Config) -> Result<()> {
@@ -252,14 +227,14 @@ impl KeyValueDatabase {
 				#[cfg(not(feature = "sqlite"))]
 				return Err(Error::bad_config("Database backend not found."));
 				#[cfg(feature = "sqlite")]
-				Arc::new(Arc::<abstraction::sqlite::Engine>::open(&config)?)
+				Arc::new(Arc::<sqlite::Engine>::open(&config)?)
 			},
 			"rocksdb" => {
 				debug!("Got rocksdb database backend");
 				#[cfg(not(feature = "rocksdb"))]
 				return Err(Error::bad_config("Database backend not found."));
 				#[cfg(feature = "rocksdb")]
-				Arc::new(Arc::<abstraction::rocksdb::Engine>::open(&config)?)
+				Arc::new(Arc::<rocksdb::Engine>::open(&config)?)
 			},
 			_ => {
 				return Err(Error::bad_config(
@@ -399,625 +374,7 @@ impl KeyValueDatabase {
 
 		// This is the first and only time we initialize the SERVICE static
 		*SERVICES.write().unwrap() = Some(Box::leak(services_raw));
 
-		// Matrix resource ownership is based on the server name; changing it
-		// requires recreating the database from scratch.
-		if services().users.count()? > 0 {
-			let conduit_user =
-				UserId::parse_with_server_name("conduit", &config.server_name).expect("@conduit:server_name is valid");
-
-			if !services().users.exists(&conduit_user)? {
-				error!("The {} server user does not exist, and the database is not new.", conduit_user);
-				return Err(Error::bad_database(
-					"Cannot reuse an existing database after changing the server name, please delete the old one \
-					 first.",
-				));
-			}
-		}
-
-		// If the database has any data, perform data migrations before starting
-		// do not increment the db version if the user is not using sha256_media
-		let latest_database_version = if cfg!(feature = "sha256_media") {
-			14
-		} else {
-			13
-		};
-
-		if services().users.count()? > 0 {
-			// MIGRATIONS
-			if services().globals.database_version()? < 1 {
-				for (roomserverid, _) in db.roomserverids.iter() {
-					let mut parts = roomserverid.split(|&b| b == 0xFF);
-					let room_id = parts.next().expect("split always returns one element");
-					let Some(servername) = parts.next() else {
-						error!("Migration: Invalid roomserverid in db.");
-						continue;
-					};
-					let mut serverroomid = servername.to_vec();
-					serverroomid.push(0xFF);
-					serverroomid.extend_from_slice(room_id);
-
-					db.serverroomids.insert(&serverroomid, &[])?;
-				}
-
-				services().globals.bump_database_version(1)?;
-
-				warn!("Migration: 0 -> 1 finished");
-			}
-
-			if services().globals.database_version()? < 2 {
-				// We accidentally inserted hashed versions of "" into the db instead of just ""
-				for (userid, password) in db.userid_password.iter() {
-					let salt = SaltString::generate(thread_rng());
-					let empty_pass = services()
-						.globals
-						.argon
-						.hash_password(b"", &salt)
-						.expect("our own password to be properly hashed");
-					let empty_hashed_password = services()
-						.globals
-						.argon
-						.verify_password(&password, &empty_pass)
-						.is_ok();
-
-					if empty_hashed_password {
-						db.userid_password.insert(&userid, b"")?;
-					}
-				}
-
-				services().globals.bump_database_version(2)?;
-
-				warn!("Migration: 1 -> 2 finished");
-			}
-
-			if services().globals.database_version()? < 3 {
-				// Move media to filesystem
-				for (key, content) in db.mediaid_file.iter() {
-					if content.is_empty() {
-						continue;
-					}
-
-					#[allow(deprecated)]
-					let path = services().globals.get_media_file(&key);
-					let mut file = fs::File::create(path)?;
-					file.write_all(&content)?;
-					db.mediaid_file.insert(&key, &[])?;
-				}
-
-				services().globals.bump_database_version(3)?;
-
-				warn!("Migration: 2 -> 3 finished");
-			}
-
-			if services().globals.database_version()? < 4 {
-				// Add federated users to services() as deactivated
-				for our_user in services().users.iter() {
-					let our_user = our_user?;
-					if services().users.is_deactivated(&our_user)? {
-						continue;
-					}
-					for room in services().rooms.state_cache.rooms_joined(&our_user) {
-						for user in services().rooms.state_cache.room_members(&room?) {
-							let user = user?;
-							if user.server_name() != config.server_name {
-								info!(?user, "Migration: creating user");
-								services().users.create(&user, None)?;
-							}
-						}
-					}
-				}
-
-				services().globals.bump_database_version(4)?;
-
-				warn!("Migration: 3 -> 4 finished");
-			}
-
-			if services().globals.database_version()? < 5 {
-				// Upgrade user data store
-				for (roomuserdataid, _) in db.roomuserdataid_accountdata.iter() {
-					let mut parts = roomuserdataid.split(|&b| b == 0xFF);
-					let room_id = parts.next().unwrap();
-					let user_id = parts.next().unwrap();
-					let event_type = roomuserdataid.rsplit(|&b| b == 0xFF).next().unwrap();
-
-					let mut key = room_id.to_vec();
-					key.push(0xFF);
-					key.extend_from_slice(user_id);
-					key.push(0xFF);
-					key.extend_from_slice(event_type);
-
-					db.roomusertype_roomuserdataid
-						.insert(&key, &roomuserdataid)?;
-				}
-
-				services().globals.bump_database_version(5)?;
-
-				warn!("Migration: 4 -> 5 finished");
-			}
-
-			if services().globals.database_version()? < 6 {
-				// Set room member count
-				for (roomid, _) in db.roomid_shortstatehash.iter() {
-					let string = utils::string_from_bytes(&roomid).unwrap();
-					let room_id = <&RoomId>::try_from(string.as_str()).unwrap();
-					services().rooms.state_cache.update_joined_count(room_id)?;
-				}
-
-				services().globals.bump_database_version(6)?;
-
-				warn!("Migration: 5 -> 6 finished");
-			}
-
-			if services().globals.database_version()? < 7 {
-				// Upgrade state store
-				let mut last_roomstates: HashMap<OwnedRoomId, u64> = HashMap::new();
-				let mut current_sstatehash: Option<u64> = None;
-				let mut current_room = None;
-				let mut current_state = HashSet::new();
-				let mut counter = 0;
-
-				let mut handle_state = |current_sstatehash: u64,
-				                        current_room: &RoomId,
-				                        current_state: HashSet<_>,
-				                        last_roomstates: &mut HashMap<_, _>| {
-					counter += 1;
-					let last_roomsstatehash = last_roomstates.get(current_room);
-
-					let states_parents = last_roomsstatehash.map_or_else(
-						|| Ok(Vec::new()),
-						|&last_roomsstatehash| {
-							services()
-								.rooms
-								.state_compressor
-								.load_shortstatehash_info(last_roomsstatehash)
-						},
-					)?;
-
-					let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last() {
-						let statediffnew = current_state
-							.difference(&parent_stateinfo.1)
-							.copied()
-							.collect::<HashSet<_>>();
-
-						let statediffremoved = parent_stateinfo
-							.1
-							.difference(&current_state)
-							.copied()
-							.collect::<HashSet<_>>();
-
-						(statediffnew, statediffremoved)
-					} else {
-						(current_state, HashSet::new())
-					};
-
-					services().rooms.state_compressor.save_state_from_diff(
-						current_sstatehash,
-						Arc::new(statediffnew),
-						Arc::new(statediffremoved),
-						2, // every state change is 2 event changes on average
-						states_parents,
-					)?;
-
-					/*
-					let mut tmp = services().rooms.load_shortstatehash_info(&current_sstatehash)?;
-					let state = tmp.pop().unwrap();
-					println!(
-						"{}\t{}{:?}: {:?} + {:?} - {:?}",
-						current_room,
-						" ".repeat(tmp.len()),
-						utils::u64_from_bytes(&current_sstatehash).unwrap(),
-						tmp.last().map(|b| utils::u64_from_bytes(&b.0).unwrap()),
-						state
-							.2
-							.iter()
-							.map(|b| utils::u64_from_bytes(&b[size_of::<u64>()..]).unwrap())
-							.collect::<Vec<_>>(),
-						state
-							.3
-							.iter()
-							.map(|b| utils::u64_from_bytes(&b[size_of::<u64>()..]).unwrap())
-							.collect::<Vec<_>>()
-					);
-					*/
-
-					Ok::<_, Error>(())
-				};
-
-				for (k, seventid) in db.db.open_tree("stateid_shorteventid")?.iter() {
-					let sstatehash =
-						utils::u64_from_bytes(&k[0..size_of::<u64>()]).expect("number of bytes is correct");
-					let sstatekey = k[size_of::<u64>()..].to_vec();
-					if Some(sstatehash) != current_sstatehash {
-						if let Some(current_sstatehash) = current_sstatehash {
-							handle_state(
-								current_sstatehash,
-								current_room.as_deref().unwrap(),
-								current_state,
-								&mut last_roomstates,
-							)?;
-							last_roomstates.insert(current_room.clone().unwrap(), current_sstatehash);
-						}
-						current_state = HashSet::new();
-						current_sstatehash = Some(sstatehash);
-
-						let event_id = db.shorteventid_eventid.get(&seventid).unwrap().unwrap();
-						let string = utils::string_from_bytes(&event_id).unwrap();
-						let event_id = <&EventId>::try_from(string.as_str()).unwrap();
-						let pdu = services()
-							.rooms
-							.timeline
-							.get_pdu(event_id)
-							.unwrap()
-							.unwrap();
-
-						if Some(&pdu.room_id) != current_room.as_ref() {
-							current_room = Some(pdu.room_id.clone());
-						}
-					}
-
-					let mut val = sstatekey;
-					val.extend_from_slice(&seventid);
-					current_state.insert(val.try_into().expect("size is correct"));
-				}
-
-				if let Some(current_sstatehash) = current_sstatehash {
-					handle_state(
-						current_sstatehash,
-						current_room.as_deref().unwrap(),
-						current_state,
-						&mut last_roomstates,
-					)?;
-				}
-
-				services().globals.bump_database_version(7)?;
-
-				warn!("Migration: 6 -> 7 finished");
-			}
-
-			if services().globals.database_version()? < 8 {
-				// Generate short room ids for all rooms
-				for (room_id, _) in db.roomid_shortstatehash.iter() {
-					let shortroomid = services().globals.next_count()?.to_be_bytes();
-					db.roomid_shortroomid.insert(&room_id, &shortroomid)?;
-					info!("Migration: 8");
-				}
-				// Update pduids db layout
-				let mut batch = db.pduid_pdu.iter().filter_map(|(key, v)| {
-					if !key.starts_with(b"!") {
-						return None;
-					}
-					let mut parts = key.splitn(2, |&b| b == 0xFF);
-					let room_id = parts.next().unwrap();
-					let count = parts.next().unwrap();
-
-					let short_room_id = db
-						.roomid_shortroomid
-						.get(room_id)
-						.unwrap()
-						.expect("shortroomid should exist");
-
-					let mut new_key = short_room_id;
-					new_key.extend_from_slice(count);
-
-					Some((new_key, v))
-				});
-
-				db.pduid_pdu.insert_batch(&mut batch)?;
-
-				let mut batch2 = db.eventid_pduid.iter().filter_map(|(k, value)| {
-					if !value.starts_with(b"!") {
-						return None;
-					}
-					let mut parts = value.splitn(2, |&b| b == 0xFF);
-					let room_id = parts.next().unwrap();
-					let count = parts.next().unwrap();
-
-					let short_room_id = db
-						.roomid_shortroomid
-						.get(room_id)
-						.unwrap()
-						.expect("shortroomid should exist");
-
-					let mut new_value = short_room_id;
-					new_value.extend_from_slice(count);
-
-					Some((k, new_value))
-				});
-
-				db.eventid_pduid.insert_batch(&mut batch2)?;
-
-				services().globals.bump_database_version(8)?;
-
-				warn!("Migration: 7 -> 8 finished");
-			}
-
-			if services().globals.database_version()? < 9 {
-				// Update tokenids db layout
-				let mut iter = db
-					.tokenids
-					.iter()
-					.filter_map(|(key, _)| {
-						if !key.starts_with(b"!") {
-							return None;
-						}
-						let mut parts = key.splitn(4, |&b| b == 0xFF);
-						let room_id = parts.next().unwrap();
-						let word = parts.next().unwrap();
-						let _pdu_id_room = parts.next().unwrap();
-						let pdu_id_count = parts.next().unwrap();
-
-						let short_room_id = db
-							.roomid_shortroomid
-							.get(room_id)
-							.unwrap()
-							.expect("shortroomid should exist");
-						let mut new_key = short_room_id;
-						new_key.extend_from_slice(word);
-						new_key.push(0xFF);
-						new_key.extend_from_slice(pdu_id_count);
-						Some((new_key, Vec::new()))
-					})
-					.peekable();
-
-				while iter.peek().is_some() {
-					db.tokenids.insert_batch(&mut iter.by_ref().take(1000))?;
-					debug!("Inserted smaller batch");
-				}
-
-				info!("Deleting starts");
-
-				let batch2: Vec<_> = db
-					.tokenids
-					.iter()
-					.filter_map(|(key, _)| {
-						if key.starts_with(b"!") {
-							Some(key)
-						} else {
-							None
-						}
-					})
-					.collect();
-
-				for key in batch2 {
-					db.tokenids.remove(&key)?;
-				}
-
-				services().globals.bump_database_version(9)?;
-
-				warn!("Migration: 8 -> 9 finished");
-			}
-
-			if services().globals.database_version()? < 10 {
-				// Add other direction for shortstatekeys
-				for (statekey, shortstatekey) in db.statekey_shortstatekey.iter() {
-					db.shortstatekey_statekey
-						.insert(&shortstatekey, &statekey)?;
-				}
-
-				// Force E2EE device list updates so we can send them over federation
-				for user_id in services().users.iter().filter_map(Result::ok) {
-					services().users.mark_device_key_update(&user_id)?;
-				}
-
-				services().globals.bump_database_version(10)?;
-
-				warn!("Migration: 9 -> 10 finished");
-			}
-
-			if services().globals.database_version()? < 11 {
-				db.db
-					.open_tree("userdevicesessionid_uiaarequest")?
-					.clear()?;
-				services().globals.bump_database_version(11)?;
-
-				warn!("Migration: 10 -> 11 finished");
-			}
-
-			if services().globals.database_version()? < 12 {
-				for username in services().users.list_local_users()? {
-					let user = match UserId::parse_with_server_name(username.clone(), &config.server_name) {
-						Ok(u) => u,
-						Err(e) => {
-							warn!("Invalid username {username}: {e}");
-							continue;
-						},
-					};
-
-					let raw_rules_list = services()
-						.account_data
-						.get(None, &user, GlobalAccountDataEventType::PushRules.to_string().into())
-						.unwrap()
-						.expect("Username is invalid");
-
-					let mut account_data = serde_json::from_str::<PushRulesEvent>(raw_rules_list.get()).unwrap();
-					let rules_list = &mut account_data.content.global;
-
-					//content rule
-					{
-						let content_rule_transformation = [".m.rules.contains_user_name", ".m.rule.contains_user_name"];
-
-						let rule = rules_list.content.get(content_rule_transformation[0]);
-						if rule.is_some() {
-							let mut rule = rule.unwrap().clone();
-							content_rule_transformation[1].clone_into(&mut rule.rule_id);
-							rules_list
-								.content
-								.shift_remove(content_rule_transformation[0]);
-							rules_list.content.insert(rule);
-						}
-					}
-
-					//underride rules
-					{
-						let underride_rule_transformation = [
-							[".m.rules.call", ".m.rule.call"],
-							[".m.rules.room_one_to_one", ".m.rule.room_one_to_one"],
-							[".m.rules.encrypted_room_one_to_one", ".m.rule.encrypted_room_one_to_one"],
-							[".m.rules.message", ".m.rule.message"],
-							[".m.rules.encrypted", ".m.rule.encrypted"],
-						];
-
-						for transformation in underride_rule_transformation {
-							let rule = rules_list.underride.get(transformation[0]);
-							if let Some(rule) = rule {
-								let mut rule = rule.clone();
-								transformation[1].clone_into(&mut rule.rule_id);
-								rules_list.underride.shift_remove(transformation[0]);
-								rules_list.underride.insert(rule);
-							}
-						}
-					}
-
-					services().account_data.update(
-						None,
-						&user,
-						GlobalAccountDataEventType::PushRules.to_string().into(),
-						&serde_json::to_value(account_data).expect("to json value always works"),
-					)?;
-				}
-
-				services().globals.bump_database_version(12)?;
-
-				warn!("Migration: 11 -> 12 finished");
-			}
-
-			// This migration can be reused as-is anytime the server-default rules are
-			// updated.
-			if services().globals.database_version()? < 13 {
-				for username in services().users.list_local_users()? {
-					let user = match UserId::parse_with_server_name(username.clone(), &config.server_name) {
-						Ok(u) => u,
-						Err(e) => {
-							warn!("Invalid username {username}: {e}");
-							continue;
-						},
-					};
-
-					let raw_rules_list = services()
-						.account_data
-						.get(None, &user, GlobalAccountDataEventType::PushRules.to_string().into())
-						.unwrap()
-						.expect("Username is invalid");
-
-					let mut account_data = serde_json::from_str::<PushRulesEvent>(raw_rules_list.get()).unwrap();
-
-					let user_default_rules = Ruleset::server_default(&user);
-					account_data
-						.content
-						.global
-						.update_with_server_default(user_default_rules);
-
-					services().account_data.update(
-						None,
-						&user,
-						GlobalAccountDataEventType::PushRules.to_string().into(),
-						&serde_json::to_value(account_data).expect("to json value always works"),
-					)?;
-				}
-
-				services().globals.bump_database_version(13)?;
-
-				warn!("Migration: 12 -> 13 finished");
-			}
-
-			#[cfg(feature = "sha256_media")]
-			{
-				if services().globals.database_version()? < 14 && cfg!(feature = "sha256_media") {
-					warn!(
-						"sha256_media feature flag is enabled, migrating legacy base64 file names to sha256 file names"
-					);
-					// Move old media files to new names
-					for (key, _) in db.mediaid_file.iter() {
-						let old_path = services().globals.get_media_file(&key);
-						debug!("Old file path: {old_path:?}");
-						let path = services().globals.get_media_file_new(&key);
-						debug!("New file path: {path:?}");
-						// move the file to the new location
-						if old_path.exists() {
-							tokio::fs::rename(&old_path, &path).await?;
-						}
-					}
-
-					services().globals.bump_database_version(14)?;
-
-					warn!("Migration: 13 -> 14 finished");
-				}
-			}
-
-			assert_eq!(
-				services().globals.database_version().unwrap(),
-				latest_database_version,
-				"Failed asserting local database version {} is equal to known latest conduwuit database version {}",
-				services().globals.database_version().unwrap(),
-				latest_database_version
-			);
-
-			{
-				let patterns = &config.forbidden_usernames;
-				if !patterns.is_empty() {
-					for user_id in services()
-						.users
-						.iter()
-						.filter_map(Result::ok)
-						.filter(|user| !services().users.is_deactivated(user).unwrap_or(true))
-						.filter(|user| user.server_name() == config.server_name)
-					{
-						let matches = patterns.matches(user_id.localpart());
-						if matches.matched_any() {
-							warn!(
-								"User {} matches the following forbidden username patterns: {}",
-								user_id.to_string(),
-								matches
-									.into_iter()
-									.map(|x| &patterns.patterns()[x])
-									.join(", ")
-							);
-						}
-					}
-				}
-			}
-
-			{
-				let patterns = &config.forbidden_alias_names;
-				if !patterns.is_empty() {
-					for address in services().rooms.metadata.iter_ids() {
-						let room_id = address?;
-						let room_aliases = services().rooms.alias.local_aliases_for_room(&room_id);
-						for room_alias_result in room_aliases {
-							let room_alias = room_alias_result?;
-							let matches = patterns.matches(room_alias.alias());
-							if matches.matched_any() {
-								warn!(
-									"Room with alias {} ({}) matches the following forbidden room name patterns: {}",
-									room_alias,
-									&room_id,
-									matches
-										.into_iter()
-										.map(|x| &patterns.patterns()[x])
-										.join(", ")
-								);
-							}
-						}
-					}
-				}
-			}
-
-			info!(
-				"Loaded {} database with version {}",
-				config.database_backend, latest_database_version
-			);
-		} else {
-			services()
-				.globals
-				.bump_database_version(latest_database_version)?;
-
-			// Create the admin room and server user on first run
-			services().admin.create_admin_room().await?;
-
-			warn!(
-				"Created new {} database with version {}",
-				config.database_backend, latest_database_version
-			);
-		}
+		migrations(db, &config).await?;
 
 		services().admin.start_handler();
 
@@ -1056,14 +413,40 @@ impl KeyValueDatabase {
 		Ok(())
 	}
 
-	pub fn flush(&self) -> Result<()> {
-		let start = std::time::Instant::now();
+	fn check_db_setup(config: &Config) -> Result<()> {
+		let path = Path::new(&config.database_path);
 
-		let res = self.db.flush();
+		let sqlite_exists = path.join("conduit.db").exists();
+		let rocksdb_exists = path.join("IDENTITY").exists();
 
-		debug!("flush: took {:?}", start.elapsed());
+		let mut count = 0;
 
-		res
+		if sqlite_exists {
+			count += 1;
+		}
+
+		if rocksdb_exists {
+			count += 1;
+		}
+
+		if count > 1 {
+			warn!("Multiple databases at database_path detected");
+			return Ok(());
+		}
+
+		if sqlite_exists && config.database_backend != "sqlite" {
+			return Err(Error::bad_config(
+				"Found sqlite at database_path, but is not specified in config.",
+			));
+		}
+
+		if rocksdb_exists && config.database_backend != "rocksdb" {
+			return Err(Error::bad_config(
+				"Found rocksdb at database_path, but is not specified in config.",
+			));
+		}
+
+		Ok(())
 	}
 
 	#[tracing::instrument]
@@ -1119,15 +502,6 @@ impl KeyValueDatabase {
 		Ok(())
 	}
 
-	fn perform_cleanup() {
-		let start = Instant::now();
-		if let Err(e) = services().globals.cleanup() {
-			error!(target: "database-cleanup", "Ran into an error during cleanup: {}", e);
-		} else {
-			debug!(target: "database-cleanup", "Finished cleanup in {:#?}.", start.elapsed());
-		}
-	}
-
 	#[tracing::instrument]
 	async fn start_cleanup_task() {
 		let timer_interval = Duration::from_secs(u64::from(services().globals.config.cleanup_second_interval));
@@ -1169,6 +543,25 @@ impl KeyValueDatabase {
 			}
 		});
 	}
+
+	fn perform_cleanup() {
+		let start = Instant::now();
+		if let Err(e) = services().globals.cleanup() {
+			error!(target: "database-cleanup", "Ran into an error during cleanup: {}", e);
+		} else {
+			debug!(target: "database-cleanup", "Finished cleanup in {:#?}.", start.elapsed());
+		}
+	}
+
+	pub fn flush(&self) -> Result<()> {
+		let start = std::time::Instant::now();
+
+		let res = self.db.flush();
+
+		debug!("flush: took {:?}", start.elapsed());
+
+		res
+	}
 }
 
 /// Sets the emergency password and push rules for the @conduit account in case
diff --git a/src/database/abstraction/rocksdb.rs b/src/database/rocksdb/mod.rs
similarity index 100%
rename from src/database/abstraction/rocksdb.rs
rename to src/database/rocksdb/mod.rs
diff --git a/src/database/abstraction/sqlite.rs b/src/database/sqlite/mod.rs
similarity index 100%
rename from src/database/abstraction/sqlite.rs
rename to src/database/sqlite/mod.rs
diff --git a/src/database/abstraction/watchers.rs b/src/database/watchers.rs
similarity index 100%
rename from src/database/abstraction/watchers.rs
rename to src/database/watchers.rs
diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs
index 51cfc6c8..15c29094 100644
--- a/src/service/globals/data.rs
+++ b/src/service/globals/data.rs
@@ -7,7 +7,7 @@ use ruma::{
 	DeviceId, OwnedServerSigningKeyId, ServerName, UserId,
 };
 
-use crate::{database::abstraction::Cork, Result};
+use crate::{database::Cork, Result};
 
 #[async_trait]
 pub trait Data: Send + Sync {
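Review note: the short-id migration above stores counters via to_be_bytes(), and KvTree::increment follows the same convention in this codebase. Big-endian is what makes byte-wise key order equal numeric order, which iter_from and scan_prefix rely on; a quick check of the invariant:

	// Lexicographic comparison of the encoded bytes matches numeric order.
	assert!(1u64.to_be_bytes() < 2u64.to_be_bytes());
	assert!(255u64.to_be_bytes() < 256u64.to_be_bytes()); // to_le_bytes() would fail here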