add rocksdb configurable options and tweaks, logging improvements, exp. room v11 support
split out the spinning-disk-focused options into a configurable option; current conduwuit users are on NVMe/SSDs anyway, so those options were only hindering performance.

rocksdb logging builds up over time with no cleanup or rotation. this adds support for configuring the amount of logging, the size of log files, log rotation, and so on. fixes https://gitlab.com/girlbossceo/conduwuit/-/issues/1

misc conduit logging improvements, to help with debugging issues and maybe a future feature.

experimental room version 11 support from https://gitlab.com/famedly/conduit/-/merge_requests/562

Signed-off-by: strawberry <strawberry@puppygock.gay>
parent 56e4166ee8
commit f62f641545

10 changed files with 326 additions and 107 deletions
@@ -23,7 +23,7 @@ use ruma::{
     },
     int,
     serde::JsonObject,
-    CanonicalJsonObject, OwnedRoomAliasId, RoomAliasId, RoomId,
+    CanonicalJsonObject, OwnedRoomAliasId, RoomAliasId, RoomId, RoomVersionId,
 };
 use serde_json::{json, value::to_raw_value};
 use std::{cmp::max, collections::BTreeMap, sync::Arc};
@@ -127,12 +127,28 @@ pub async fn create_room_route(
     let mut content = content
         .deserialize_as::<CanonicalJsonObject>()
         .expect("Invalid creation content");
-    content.insert(
-        "creator".into(),
-        json!(&sender_user).try_into().map_err(|_| {
-            Error::BadRequest(ErrorKind::BadJson, "Invalid creation content")
-        })?,
-    );
+    match room_version {
+        RoomVersionId::V1
+        | RoomVersionId::V2
+        | RoomVersionId::V3
+        | RoomVersionId::V4
+        | RoomVersionId::V5
+        | RoomVersionId::V6
+        | RoomVersionId::V7
+        | RoomVersionId::V8
+        | RoomVersionId::V9
+        | RoomVersionId::V10 => {
+            content.insert(
+                "creator".into(),
+                json!(&sender_user).try_into().map_err(|e| {
+                    info!("Invalid creation content: {e}");
+                    Error::BadRequest(ErrorKind::BadJson, "Invalid creation content")
+                })?,
+            );
+        }
+        _ => {} // V11 removed the "creator" key
+    }
+
     content.insert(
         "room_version".into(),
         json!(room_version.as_str()).try_into().map_err(|_| {
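Aside (editor's note, not part of the diff): this version split recurs below in upgrade_room_route, the admin-room bootstrap, and the redaction path, and it always reduces to the same question. A minimal sketch of that predicate, using only the RoomVersionId variants matched in the diff; the helper name is illustrative:

use ruma::RoomVersionId;

// Illustrative helper, not in the commit: room versions 1-10 carry a
// "creator" key in m.room.create content; v11 drops it and derives the
// creator from the event's sender instead.
fn content_has_creator(version: &RoomVersionId) -> bool {
    matches!(
        version,
        RoomVersionId::V1
            | RoomVersionId::V2
            | RoomVersionId::V3
            | RoomVersionId::V4
            | RoomVersionId::V5
            | RoomVersionId::V6
            | RoomVersionId::V7
            | RoomVersionId::V8
            | RoomVersionId::V9
            | RoomVersionId::V10
    )
}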
@@ -143,8 +159,21 @@ pub async fn create_room_route(
         }
         None => {
             // TODO: Add correct value for v11
+            let content = match room_version {
+                RoomVersionId::V1
+                | RoomVersionId::V2
+                | RoomVersionId::V3
+                | RoomVersionId::V4
+                | RoomVersionId::V5
+                | RoomVersionId::V6
+                | RoomVersionId::V7
+                | RoomVersionId::V8
+                | RoomVersionId::V9
+                | RoomVersionId::V10 => RoomCreateEventContent::new_v1(sender_user.clone()),
+                _ => RoomCreateEventContent::new_v11(),
+            };
             let mut content = serde_json::from_str::<CanonicalJsonObject>(
-                to_raw_value(&RoomCreateEventContent::new_v1(sender_user.clone()))
+                to_raw_value(&content)
                     .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid creation content"))?
                     .get(),
             )
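A hedged illustration of what the two ruma constructors produce, assuming the ruma version this commit builds against serializes `creator` only when it is set:

use ruma::{events::room::create::RoomCreateEventContent, owned_user_id};

fn main() {
    let creator = owned_user_id!("@alice:example.org");

    // v1-v10 content embeds the creator explicitly...
    let v1 = serde_json::to_value(RoomCreateEventContent::new_v1(creator)).unwrap();
    assert_eq!(v1["creator"], "@alice:example.org");

    // ...v11 content does not; clients read the event's `sender` instead.
    let v11 = serde_json::to_value(RoomCreateEventContent::new_v11()).unwrap();
    assert!(v11.get("creator").is_none());
}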
@@ -619,12 +648,31 @@ pub async fn upgrade_room_route(
     ));

     // Send a m.room.create event containing a predecessor field and the applicable room_version
-    create_event_content.insert(
-        "creator".into(),
-        json!(&sender_user)
-            .try_into()
-            .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Error forming creation event"))?,
-    );
+    match body.new_version {
+        RoomVersionId::V1
+        | RoomVersionId::V2
+        | RoomVersionId::V3
+        | RoomVersionId::V4
+        | RoomVersionId::V5
+        | RoomVersionId::V6
+        | RoomVersionId::V7
+        | RoomVersionId::V8
+        | RoomVersionId::V9
+        | RoomVersionId::V10 => {
+            create_event_content.insert(
+                "creator".into(),
+                json!(&sender_user).try_into().map_err(|e| {
+                    info!("Error forming creation event: {e}");
+                    Error::BadRequest(ErrorKind::BadJson, "Error forming creation event")
+                })?,
+            );
+        }
+        _ => {
+            // "creator" key no longer exists in V11 rooms
+            create_event_content.remove("creator");
+        }
+    }

     create_event_content.insert(
         "room_version".into(),
         json!(&body.new_version)
@@ -90,6 +90,16 @@ pub struct Config {
     #[serde(default = "default_turn_ttl")]
     pub turn_ttl: u64,

+    pub rocksdb_log_path: Option<PathBuf>,
+    #[serde(default = "default_rocksdb_log_level")]
+    pub rocksdb_log_level: String,
+    #[serde(default = "default_rocksdb_max_log_file_size")]
+    pub rocksdb_max_log_file_size: usize,
+    #[serde(default = "default_rocksdb_log_time_to_roll")]
+    pub rocksdb_log_time_to_roll: usize,
+    #[serde(default = "false_fn")]
+    pub rocksdb_optimize_for_spinning_disks: bool,
+
     pub emergency_password: Option<String>,

     #[serde(default = "default_notification_push_path")]
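Mechanically, every new key is optional: `#[serde(default = "...")]` falls back to the named function when the key is absent, so existing config files keep parsing unchanged. A minimal self-contained sketch of the pattern (the struct here is illustrative, not the real Config; serde_json stands in for the actual config deserializer):

use serde::Deserialize;

fn default_rocksdb_log_level() -> String {
    "info".to_owned()
}

#[derive(Debug, Deserialize)]
struct LogConfig {
    // Falls back to default_rocksdb_log_level() when the key is missing.
    #[serde(default = "default_rocksdb_log_level")]
    rocksdb_log_level: String,
}

fn main() {
    // An empty document still deserializes, picking up the default.
    let cfg: LogConfig = serde_json::from_str("{}").unwrap();
    assert_eq!(cfg.rocksdb_log_level, "info");
}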
@@ -257,6 +267,22 @@ impl fmt::Display for Config {
             "zstd Response Body Compression",
             &self.zstd_compression.to_string(),
         ),
+        (
+            "RocksDB database log level",
+            &self.rocksdb_log_level.to_string(),
+        ),
+        (
+            "RocksDB database log time-to-roll",
+            &self.rocksdb_log_time_to_roll.to_string(),
+        ),
+        (
+            "RocksDB database max log file size",
+            &self.rocksdb_max_log_file_size.to_string(),
+        ),
+        (
+            "RocksDB database optimize for spinning disks",
+            &self.rocksdb_optimize_for_spinning_disks.to_string(),
+        ),
     ];

     let mut msg: String = "Active config values:\n\n".to_owned();
@@ -290,7 +316,7 @@ fn default_unix_socket_perms() -> u32 {
 }

 fn default_database_backend() -> String {
-    "sqlite".to_owned()
+    "rocksdb".to_owned()
 }

 fn default_db_cache_capacity_mb() -> f64 {
@@ -330,7 +356,7 @@ fn default_trusted_servers() -> Vec<OwnedServerName> {
 }

 fn default_log() -> String {
-    "warn,state_res=warn,_=off,sled=off".to_owned()
+    "warn,state_res=warn".to_owned()
 }

 fn default_notification_push_path() -> String {
@@ -349,7 +375,20 @@ fn default_presence_offline_timeout_s() -> u64 {
     15 * 60
 }

+fn default_rocksdb_log_level() -> String {
+    "info".to_owned()
+}
+
+fn default_rocksdb_log_time_to_roll() -> usize {
+    0
+}
+
 // I know, it's a great name
 pub fn default_default_room_version() -> RoomVersionId {
     RoomVersionId::V10
 }
+
+pub fn default_rocksdb_max_log_file_size() -> usize {
+    // 4 megabytes
+    4 * 1024 * 1024
+}
@@ -1,11 +1,13 @@
 use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree};
-use crate::{utils, Result};
+use crate::{services, utils, Result};
 use std::{
     future::Future,
     pin::Pin,
     sync::{Arc, RwLock},
 };

+use rocksdb::LogLevel::{Debug, Error, Fatal, Info, Warn};
+
 pub struct Engine {
     rocks: rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>,
     max_open_files: i32,
@@ -21,7 +23,9 @@ pub struct RocksDbEngineTree<'a> {
 }

 fn db_options(max_open_files: i32, rocksdb_cache: &rocksdb::Cache) -> rocksdb::Options {
+    // block-based options: https://docs.rs/rocksdb/latest/rocksdb/struct.BlockBasedOptions.html#
     let mut block_based_options = rocksdb::BlockBasedOptions::default();
+
     block_based_options.set_block_cache(rocksdb_cache);

     // "Difference of spinning disk"
@@ -29,18 +33,40 @@ fn db_options(max_open_files: i32, rocksdb_cache: &rocksdb::Cache) -> rocksdb::Options {
     block_based_options.set_block_size(64 * 1024);
     block_based_options.set_cache_index_and_filter_blocks(true);

+    // database options: https://docs.rs/rocksdb/latest/rocksdb/struct.Options.html#
     let mut db_opts = rocksdb::Options::default();

+    let rocksdb_log_level = match services().globals.rocksdb_log_level().as_str() {
+        "debug" => Debug,
+        "info" => Info,
+        "warn" => Warn,
+        "error" => Error,
+        "fatal" => Fatal,
+        _ => Info,
+    };
+
+    db_opts.set_log_level(rocksdb_log_level);
+    db_opts.set_max_log_file_size(services().globals.rocksdb_max_log_file_size());
+    db_opts.set_log_file_time_to_roll(services().globals.rocksdb_log_time_to_roll());
+
+    if services().globals.rocksdb_optimize_for_spinning_disks() {
+        // useful for hard drives but on literally any half-decent SSD this is not useful
+        // and the benefits of improved compaction based on up-to-date stats are good.
+        // current conduwuit users have NVMe/SSDs.
+        db_opts.set_skip_stats_update_on_db_open(true);
+
+        db_opts.set_compaction_readahead_size(2 * 1024 * 1024); // default compaction_readahead_size is 0 which is good for SSDs
+        db_opts.set_target_file_size_base(256 * 1024 * 1024); // default target_file_size is 64MB which is good for SSDs
+        db_opts.set_optimize_filters_for_hits(true); // doesn't really seem useful for fast storage
+    } else {
+        db_opts.set_skip_stats_update_on_db_open(false);
+        db_opts.set_max_bytes_for_level_base(512 * 1024 * 1024);
+        db_opts.set_use_direct_reads(true);
+        db_opts.set_use_direct_io_for_flush_and_compaction(true);
+    }
+
     db_opts.set_block_based_table_factory(&block_based_options);
-    db_opts.set_optimize_filters_for_hits(true);
-    db_opts.set_skip_stats_update_on_db_open(true);
     db_opts.set_level_compaction_dynamic_level_bytes(true);
-    db_opts.set_target_file_size_base(256 * 1024 * 1024);
-    // defaults to 1MB
-    //db_opts.set_writable_file_max_buffer_size(1024 * 1024);
-    // defaults to 2MB now
-    //db_opts.set_compaction_readahead_size(2 * 1024 * 1024);
-    db_opts.set_use_direct_reads(true);
-    db_opts.set_use_direct_io_for_flush_and_compaction(true);
     db_opts.create_if_missing(true);
     db_opts.increase_parallelism(num_cpus::get() as i32);
     db_opts.set_max_open_files(max_open_files);
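For readers who want to poke at these knobs outside conduwuit, a minimal standalone sketch against the rust-rocksdb crate (the path and values are illustrative, and error handling is reduced to expect for brevity):

use rocksdb::{DBWithThreadMode, LogLevel, MultiThreaded, Options};

fn main() {
    let mut opts = Options::default();

    // Same string-to-enum mapping the commit adds, with Info as the fallback.
    let configured = "warn";
    let level = match configured {
        "debug" => LogLevel::Debug,
        "warn" => LogLevel::Warn,
        "error" => LogLevel::Error,
        "fatal" => LogLevel::Fatal,
        _ => LogLevel::Info,
    };
    opts.set_log_level(level);
    opts.set_max_log_file_size(4 * 1024 * 1024); // rotate LOG once it hits 4 MiB
    opts.set_log_file_time_to_roll(0); // 0 = roll on size only, never on age
    opts.create_if_missing(true);

    // "/tmp/rocksdb-demo" is an illustrative path.
    let db = DBWithThreadMode::<MultiThreaded>::open(&opts, "/tmp/rocksdb-demo")
        .expect("open rocksdb");
    db.put(b"key", b"value").expect("put");
}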
@@ -240,7 +240,9 @@ impl KeyValueDatabase {

     if !Path::new(&config.database_path).exists() {
         std::fs::create_dir_all(&config.database_path)
-            .map_err(|_| Error::BadConfig("Database folder doesn't exists and couldn't be created (e.g. due to missing permissions). Please create the database folder yourself."))?;
+            .map_err(|e| {
+                error!("Failed to create database path: {e}");
+                Error::BadConfig("Database folder doesn't exists and couldn't be created (e.g. due to missing permissions). Please create the database folder yourself.")})?;
     }

     let builder: Arc<dyn KeyValueDatabaseEngine> = match &*config.database_backend {
@@ -292,6 +292,8 @@ enum DebugCommand {
         /// An event ID (a $ followed by the base64 reference hash)
         event_id: Box<EventId>,
     },
+
+    ForceDeviceListUpdates,
 }

 #[cfg_attr(test, derive(Debug))]
@@ -1263,6 +1265,15 @@ impl Service {
                 None => RoomMessageEventContent::text_plain("PDU not found."),
             }
         }
+        DebugCommand::ForceDeviceListUpdates => {
+            // Force E2EE device list updates for all users
+            for user_id in services().users.iter().filter_map(|r| r.ok()) {
+                services().users.mark_device_key_update(&user_id)?;
+            }
+            RoomMessageEventContent::text_plain(
+                "Marked all devices for all users as having new keys to update",
+            )
+        }
     },
 };
@@ -1394,10 +1405,24 @@ impl Service {

     services().users.create(&conduit_user, None)?;

-    let mut content = RoomCreateEventContent::new_v1(conduit_user.clone());
+    let room_version = services().globals.default_room_version();
+    let mut content = match room_version {
+        RoomVersionId::V1
+        | RoomVersionId::V2
+        | RoomVersionId::V3
+        | RoomVersionId::V4
+        | RoomVersionId::V5
+        | RoomVersionId::V6
+        | RoomVersionId::V7
+        | RoomVersionId::V8
+        | RoomVersionId::V9
+        | RoomVersionId::V10 => RoomCreateEventContent::new_v1(conduit_user.clone()),
+        _ => RoomCreateEventContent::new_v11(),
+    };
+
     content.federate = true;
     content.predecessor = None;
-    content.room_version = services().globals.default_room_version();
+    content.room_version = room_version;

     // 1. The room create event
     services()
@@ -395,6 +395,22 @@ impl Service {
         self.config.presence_offline_timeout_s
     }

+    pub fn rocksdb_log_level(&self) -> &String {
+        &self.config.rocksdb_log_level
+    }
+
+    pub fn rocksdb_max_log_file_size(&self) -> usize {
+        self.config.rocksdb_max_log_file_size
+    }
+
+    pub fn rocksdb_log_time_to_roll(&self) -> usize {
+        self.config.rocksdb_log_time_to_roll
+    }
+
+    pub fn rocksdb_optimize_for_spinning_disks(&self) -> bool {
+        self.config.rocksdb_optimize_for_spinning_disks
+    }
+
     pub fn supported_room_versions(&self) -> Vec<RoomVersionId> {
         let mut room_versions: Vec<RoomVersionId> = vec![];
         room_versions.extend(self.stable_room_versions.clone());
@@ -1,5 +1,6 @@
 use crate::Error;
 use ruma::{
+    canonical_json::redact_content_in_place,
     events::{
         room::member::RoomMemberEventContent, space::child::HierarchySpaceChildEvent,
         AnyEphemeralRoomEvent, AnyMessageLikeEvent, AnyStateEvent, AnyStrippedStateEvent,
@@ -49,44 +50,23 @@ pub struct PduEvent {

 impl PduEvent {
     #[tracing::instrument(skip(self))]
-    pub fn redact(&mut self, reason: &PduEvent) -> crate::Result<()> {
+    pub fn redact(
+        &mut self,
+        room_version_id: RoomVersionId,
+        reason: &PduEvent,
+    ) -> crate::Result<()> {
         self.unsigned = None;

-        let allowed: &[&str] = match self.kind {
-            TimelineEventType::RoomMember => &["join_authorised_via_users_server", "membership"],
-            TimelineEventType::RoomCreate => &["creator"],
-            TimelineEventType::RoomJoinRules => &["join_rule"],
-            TimelineEventType::RoomPowerLevels => &[
-                "ban",
-                "events",
-                "events_default",
-                "kick",
-                "redact",
-                "state_default",
-                "users",
-                "users_default",
-            ],
-            TimelineEventType::RoomHistoryVisibility => &["history_visibility"],
-            _ => &[],
-        };
-
-        let mut old_content: BTreeMap<String, serde_json::Value> =
-            serde_json::from_str(self.content.get())
-                .map_err(|_| Error::bad_database("PDU in db has invalid content."))?;
-
-        let mut new_content = serde_json::Map::new();
-
-        for key in allowed {
-            if let Some(value) = old_content.remove(*key) {
-                new_content.insert((*key).to_owned(), value);
-            }
-        }
+        let mut content = serde_json::from_str(self.content.get())
+            .map_err(|_| Error::bad_database("PDU in db has invalid content."))?;
+        redact_content_in_place(&mut content, &room_version_id, self.kind.to_string())
+            .map_err(|e| Error::RedactionError(self.sender.server_name().to_owned(), e))?;

         self.unsigned = Some(to_raw_value(&json!({
             "redacted_because": serde_json::to_value(reason).expect("to_value(PduEvent) always works")
         })).expect("to string always works"));

-        self.content = to_raw_value(&new_content).expect("to string always works");
+        self.content = to_raw_value(&content).expect("to string always works");

         Ok(())
     }
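The payoff of the rewrite above is that the hand-rolled allow-lists disappear: ruma already encodes the per-version redaction algorithm. A hedged sketch of the call, assuming ruma's canonical_json API as imported in the diff (under v10, m.room.create redaction keeps only "creator"):

use ruma::{canonical_json::redact_content_in_place, CanonicalJsonObject, RoomVersionId};

fn main() {
    // m.room.create content with one key that survives v10 redaction and one that doesn't.
    let mut content: CanonicalJsonObject =
        serde_json::from_str(r#"{"creator": "@alice:example.org", "custom": "stripped"}"#)
            .unwrap();

    redact_content_in_place(&mut content, &RoomVersionId::V10, "m.room.create")
        .expect("redaction rules exist for v10");

    assert!(content.contains_key("creator"));
    assert!(!content.contains_key("custom"));
}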
@@ -799,6 +799,7 @@ impl Service {
     // applied. We start with the previous extremities (aka leaves)
     debug!("Calculating extremities");
     let mut extremities = services().rooms.state.get_forward_extremities(room_id)?;
+    debug!("Amount of forward extremities in room {room_id}: {extremities:?}");

     // Remove any forward extremities that are referenced by this incoming event's prev_events
     for prev_event in &incoming_pdu.prev_events {

@@ -1142,8 +1143,8 @@ impl Service {
         events_in_reverse_order.push((next_id.clone(), value));
         events_all.insert(next_id);
     }
-    Err(_) => {
-        warn!("Failed to fetch event: {}", next_id);
+    Err(e) => {
+        warn!("Failed to fetch event {} | {e}", next_id);
         back_off((*next_id).to_owned());
     }
 }
@@ -1268,7 +1269,10 @@ impl Service {

     if amount > services().globals.max_fetch_prev_events() {
         // Max limit reached
-        info!("Max prev event limit reached!");
+        info!(
+            "Max prev event limit reached! Limit: {}",
+            services().globals.max_fetch_prev_events()
+        );
         graph.insert(prev_event_id.clone(), HashSet::new());
         continue;
     }

@@ -1322,7 +1326,10 @@ impl Service {
             ),
         ))
     })
-    .map_err(|_| Error::bad_database("Error sorting prev events"))?;
+    .map_err(|e| {
+        error!("Error sorting prev events: {e}");
+        Error::bad_database("Error sorting prev events")
+    })?;

     Ok((sorted, eventid_info))
 }
@@ -1339,6 +1346,7 @@ impl Service {
     let mut server_key_ids = HashMap::new();

     for event in events.into_iter() {
+        debug!("Fetching keys for event: {event:?}");
         for (signature_server, signature) in event
             .get("signatures")
             .ok_or(Error::BadServerResponse(

@@ -1364,6 +1372,7 @@ impl Service {

     if server_key_ids.is_empty() {
         // Nothing to do, can exit early
+        debug!("server_key_ids is empty, not fetching any keys");
         return Ok(());
     }

@@ -1382,7 +1391,8 @@ impl Service {
     let signature_server2 = signature_server.clone();
     let fetch_res = self
         .fetch_signing_keys_for_server(
-            signature_server2.as_str().try_into().map_err(|_| {
+            signature_server2.as_str().try_into().map_err(|e| {
+                info!("Invalid servername in signatures of server response pdu: {e}");
                 (
                     signature_server.clone(),
                     Error::BadServerResponse(
@@ -1397,7 +1407,7 @@ impl Service {
     match fetch_res {
         Ok(keys) => Ok((signature_server, keys)),
         Err(e) => {
-            warn!("Signature verification failed: Could not fetch signing key.",);
+            warn!("Signature verification failed: Could not fetch signing key for {signature_server}: {e}",);
             Err((signature_server, e))
         }
     }

@@ -1483,7 +1493,8 @@ impl Service {
         signature_ids.iter().all(|id| keys.contains_key(id))
     };

-    let origin = <&ServerName>::try_from(signature_server.as_str()).map_err(|_| {
+    let origin = <&ServerName>::try_from(signature_server.as_str()).map_err(|e| {
+        info!("Invalid servername in signatures of server response pdu: {e}");
         Error::BadServerResponse("Invalid servername in signatures of server response pdu.")
     })?;
@@ -1491,7 +1502,7 @@ impl Service {
         continue;
     }

-    trace!("Loading signing keys for {}", origin);
+    debug!("Loading signing keys for {}", origin);

     let result: BTreeMap<_, _> = services()
         .globals

@@ -1501,7 +1512,7 @@ impl Service {
         .collect();

     if !contains_all_ids(&result) {
-        trace!("Signing key not loaded for {}", origin);
+        debug!("Signing key not loaded for {}", origin);
         servers.insert(origin.to_owned(), BTreeMap::new());
     }

@@ -1540,7 +1551,7 @@ impl Service {
     }

     if servers.is_empty() {
-        info!("We had all keys locally");
+        info!("server is empty, we had all keys locally, not fetching any keys");
         return Ok(());
     }

@@ -1556,7 +1567,7 @@ impl Service {
         )
         .await
     {
-        trace!("Got signing keys: {:?}", keys);
+        debug!("Got signing keys: {:?}", keys);
         let mut pkm = pub_key_map
             .write()
             .map_err(|_| Error::bad_database("RwLock is poisoned."))?;

@@ -1588,7 +1599,7 @@ impl Service {
     }

     if servers.is_empty() {
-        info!("Trusted server supplied all signing keys");
+        info!("Trusted server supplied all signing keys, no more keys to fetch");
         return Ok(());
     }
 }
@@ -1639,25 +1650,36 @@ impl Service {
         &StateEventType::RoomServerAcl,
         "",
     )? {
-        Some(acl) => acl,
-        None => return Ok(()),
+        Some(acl) => {
+            debug!("ACL event found: {acl:?}");
+            acl
+        }
+        None => {
+            info!("No ACL event found");
+            return Ok(());
+        }
     };

     let acl_event_content: RoomServerAclEventContent =
         match serde_json::from_str(acl_event.content.get()) {
-            Ok(content) => content,
-            Err(_) => {
-                warn!("Invalid ACL event");
+            Ok(content) => {
+                debug!("Found ACL event contents: {content:?}");
+                content
+            }
+            Err(e) => {
+                warn!("Invalid ACL event: {e}");
                 return Ok(());
             }
         };

     if acl_event_content.allow.is_empty() {
+        warn!("Ignoring broken ACL event (allow key is empty)");
         // Ignore broken acl events
         return Ok(());
     }

     if acl_event_content.is_allowed(server_name) {
+        debug!("server {server_name} is allowed by ACL");
         Ok(())
     } else {
         info!(
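For context on what `is_allowed` checks, a small sketch against ruma's server ACL content type (the server names are illustrative); an empty `allow` list is exactly the "broken ACL" case the handler above now warns about and skips:

use ruma::{events::room::server_acl::RoomServerAclEventContent, server_name};

fn main() {
    // Deny-by-default ACL that only admits example.org.
    let acl: RoomServerAclEventContent = serde_json::from_str(
        r#"{"allow": ["example.org"], "deny": ["evil.example.org"], "allow_ip_literals": false}"#,
    )
    .unwrap();

    assert!(acl.is_allowed(server_name!("example.org")));
    assert!(!acl.is_allowed(server_name!("evil.example.org")));
}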
@@ -1737,7 +1759,7 @@ impl Service {
         }
     }

-    trace!("Loading signing keys for {}", origin);
+    debug!("Loading signing keys for {}", origin);

     let mut result: BTreeMap<_, _> = services()
         .globals

@@ -1792,7 +1814,7 @@ impl Service {
     MilliSecondsSinceUnixEpoch::from_system_time(
         SystemTime::now()
             .checked_add(Duration::from_secs(3600))
-            .expect("SystemTime to large"),
+            .expect("SystemTime too large"),
     )
     .expect("time is valid"),
     ),

@@ -1806,7 +1828,7 @@ impl Service {
         .collect::<Vec<_>>()
     })
 {
-    trace!("Got signing keys: {:?}", server_keys);
+    debug!("Got signing keys: {:?}", server_keys);
     for k in server_keys {
         services().globals.add_signing_key(origin, k.clone())?;
         result.extend(
@@ -28,7 +28,7 @@ use ruma::{
     state_res,
     state_res::{Event, RoomVersion},
     uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
-    OwnedServerName, RoomAliasId, RoomId, ServerName, UserId,
+    OwnedServerName, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
 };
 use serde::Deserialize;
 use serde_json::value::{to_raw_value, RawValue as RawJsonValue};

@@ -139,6 +139,27 @@ impl Service {
     }
     */

+    /// Returns the version of a room, if known
+    pub fn get_room_version(&self, room_id: &RoomId) -> Result<Option<RoomVersionId>> {
+        let create_event = services().rooms.state_accessor.room_state_get(
+            room_id,
+            &StateEventType::RoomCreate,
+            "",
+        )?;
+
+        let create_event_content: Option<RoomCreateEventContent> = create_event
+            .as_ref()
+            .map(|create_event| {
+                serde_json::from_str(create_event.content.get()).map_err(|e| {
+                    warn!("Invalid create event: {}", e);
+                    Error::bad_database("Invalid create event in db.")
+                })
+            })
+            .transpose()?;
+
+        Ok(create_event_content.map(|content| content.room_version))
+    }
+
     /// Returns the json of a pdu.
     pub fn get_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
         self.db.get_pdu_json(event_id)
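Side note on the Option/Result juggling in get_room_version: `.map(...).transpose()?` turns `Option<Result<T, E>>` into `Result<Option<T>, E>`, so a missing create event yields `Ok(None)` while a corrupt one still bubbles the error. The same pattern in isolation (parsing an integer stands in for parsing the create event):

use std::num::ParseIntError;

// Absent input is fine; present-but-invalid input is an error.
fn parse_opt(raw: Option<&str>) -> Result<Option<i32>, ParseIntError> {
    raw.map(str::parse::<i32>).transpose()
}

fn main() {
    assert_eq!(parse_opt(None), Ok(None)); // no create event: version unknown
    assert_eq!(parse_opt(Some("11")), Ok(Some(11))); // parsed fine
    assert!(parse_opt(Some("nope")).is_err()); // invalid: error bubbles up
}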
@@ -340,8 +361,10 @@ impl Service {
         GlobalAccountDataEventType::PushRules.to_string().into(),
     )?
     .map(|event| {
-        serde_json::from_str::<PushRulesEvent>(event.get())
-            .map_err(|_| Error::bad_database("Invalid push rules event in db."))
+        serde_json::from_str::<PushRulesEvent>(event.get()).map_err(|e| {
+            warn!("Invalid push rules event in db for user ID {user}: {e}");
+            Error::bad_database("Invalid push rules event in db.")
+        })
     })
     .transpose()?
     .map(|ev: PushRulesEvent| ev.content.global)
@@ -384,9 +407,39 @@ impl Service {

     match pdu.kind {
         TimelineEventType::RoomRedaction => {
-            if let Some(redact_id) = &pdu.redacts {
-                self.redact_pdu(redact_id, pdu)?;
-            }
+            let room_version_id = self
+                .get_room_version(&pdu.room_id)?
+                .expect("Got RoomRedaction in a room of unknown version");
+            match room_version_id {
+                RoomVersionId::V1
+                | RoomVersionId::V2
+                | RoomVersionId::V3
+                | RoomVersionId::V4
+                | RoomVersionId::V5
+                | RoomVersionId::V6
+                | RoomVersionId::V7
+                | RoomVersionId::V8
+                | RoomVersionId::V9
+                | RoomVersionId::V10 => {
+                    if let Some(redact_id) = &pdu.redacts {
+                        self.redact_pdu(redact_id, pdu)?;
+                    }
+                }
+                _ => {
+                    #[derive(Deserialize)]
+                    struct Redaction {
+                        redacts: Option<OwnedEventId>,
+                    }
+                    let content = serde_json::from_str::<Redaction>(pdu.content.get())
+                        .map_err(|e| {
+                            warn!("Invalid content in redaction pdu: {e}");
+                            Error::bad_database("Invalid content in redaction pdu.")
+                        })?;
+                    if let Some(redact_id) = &content.redacts {
+                        self.redact_pdu(redact_id, pdu)?;
+                    }
+                }
+            };
+        }
         TimelineEventType::SpaceChild => {
             if let Some(_state_key) = &pdu.state_key {
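The `_` arm exists because v11 moved the redaction target from the event-level `redacts` key into the event content. A minimal sketch of the two content shapes (plain `String` stands in for `OwnedEventId` to keep it dependency-light):

use serde::Deserialize;

#[derive(Deserialize)]
struct RedactionContent {
    // v11 carries the target in content; pre-v11 content has no such key.
    redacts: Option<String>,
}

fn main() {
    // Pre-v11: the target lives on the event itself, content is empty.
    let pre_v11: RedactionContent = serde_json::from_str("{}").unwrap();
    assert!(pre_v11.redacts.is_none());

    // v11: the target moved into the event content.
    let v11: RedactionContent = serde_json::from_str(r#"{"redacts": "$abc123"}"#).unwrap();
    assert_eq!(v11.redacts.as_deref(), Some("$abc123"));
}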
@@ -650,28 +703,28 @@ impl Service {
         .take(20)
         .collect();

-    let create_event = services().rooms.state_accessor.room_state_get(
-        room_id,
-        &StateEventType::RoomCreate,
-        "",
-    )?;
-
-    let create_event_content: Option<RoomCreateEventContent> = create_event
-        .as_ref()
-        .map(|create_event| {
-            serde_json::from_str(create_event.content.get()).map_err(|e| {
-                warn!("Invalid database create event: {}", e);
-                Error::bad_database("Invalid create event in db.")
-            })
-        })
-        .transpose()?;
-
-    // If there was no create event yet, assume we are creating a room with the default
-    // version right now
-    let room_version_id = create_event_content
-        .map_or(services().globals.default_room_version(), |create_event| {
-            create_event.room_version
-        });
+    let room_version_id = self
+        .get_room_version(room_id)?
+        .or_else(|| {
+            if event_type == TimelineEventType::RoomCreate {
+                #[derive(Deserialize)]
+                struct RoomCreate {
+                    room_version: RoomVersionId,
+                }
+                let content = serde_json::from_str::<RoomCreate>(content.get())
+                    .expect("Invalid content in RoomCreate pdu.");
+                Some(content.room_version)
+            } else {
+                None
+            }
+        })
+        .ok_or_else(|| {
+            Error::InconsistentRoomState(
+                "non-create event for room of unknown version",
+                room_id.to_owned(),
+            )
+        })?;
     let room_version = RoomVersion::new(&room_version_id).expect("room version is supported");

     let auth_events = services().rooms.state.get_auth_events(
@@ -1041,7 +1094,11 @@ impl Service {
     let mut pdu = self
         .get_pdu_from_id(&pdu_id)?
        .ok_or_else(|| Error::bad_database("PDU ID points to invalid PDU."))?;
-    pdu.redact(reason)?;
+
+    let room_version_id = self.get_room_version(&pdu.room_id)?.ok_or_else(|| {
+        Error::bad_database("Trying to redact PDU in in room of unknown version")
+    })?;
+    pdu.redact(room_version_id, reason)?;
     self.replace_pdu(
         &pdu_id,
         &utils::to_canonical_object(&pdu).expect("PDU is an object"),
@@ -80,6 +80,10 @@ pub enum Error {
     #[cfg(feature = "conduit_bin")]
     #[error("{0}")]
     PathError(#[from] axum::extract::rejection::PathRejection),
+    #[error("from {0}: {1}")]
+    RedactionError(OwnedServerName, ruma::canonical_json::RedactionError),
+    #[error("{0} in {1}")]
+    InconsistentRoomState(&'static str, ruma::OwnedRoomId),
 }

 impl Error {