diff --git a/Cargo.lock b/Cargo.lock index 19f63b1a..9f62c18f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2110,7 +2110,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.8.2" -source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568" +source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b" dependencies = [ "assign", "js_int", @@ -2128,7 +2128,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.8.1" -source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568" +source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b" dependencies = [ "js_int", "ruma-common", @@ -2139,7 +2139,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.16.2" -source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568" +source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b" dependencies = [ "assign", "bytes", @@ -2156,7 +2156,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.11.3" -source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568" +source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b" dependencies = [ "base64 0.21.2", "bytes", @@ -2184,7 +2184,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.7.1" -source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568" +source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b" dependencies = [ "js_int", "ruma-common", @@ -2195,7 +2195,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.9.1" -source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568" +source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b" dependencies = [ "js_int", "thiserror", @@ -2204,7 +2204,7 @@ dependencies = [ [[package]] name = "ruma-identity-service-api" version = "0.7.1" -source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568" +source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b" dependencies = [ "js_int", "ruma-common", @@ -2214,7 +2214,7 @@ dependencies = [ [[package]] name = "ruma-macros" version = "0.11.3" -source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568" +source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b" dependencies = [ "once_cell", "proc-macro-crate", @@ -2229,7 +2229,7 @@ dependencies = [ [[package]] name = "ruma-push-gateway-api" version = "0.7.1" -source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568" +source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b" dependencies = [ "js_int", "ruma-common", @@ -2240,7 +2240,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.13.1" -source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568" +source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b" dependencies = [ "base64 0.21.2", "ed25519-dalek", @@ -2256,7 +2256,7 @@ dependencies = [ [[package]] name = "ruma-state-res" version = "0.9.1" -source = "git+https://github.com/ruma/ruma?rev=de9a5a6ecca197e59623c210bd21f53055f83568#de9a5a6ecca197e59623c210bd21f53055f83568" +source = "git+https://github.com/ruma/ruma?rev=38294bd5206498c02b1001227d65654eb548308b#38294bd5206498c02b1001227d65654eb548308b" dependencies = [ "itertools", "js_int", diff --git a/Cargo.toml b/Cargo.toml index 31648422..9698caff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ tower-http = { version = "0.4.1", features = ["add-extension", "cors", "sensitiv # Used for matrix spec type definitions and helpers #ruma = { version = "0.4.0", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } -ruma = { git = "https://github.com/ruma/ruma", rev = "de9a5a6ecca197e59623c210bd21f53055f83568", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-msc2448", "unstable-exhaustive-types", "ring-compat", "unstable-unspecified" ] } +ruma = { git = "https://github.com/ruma/ruma", rev = "38294bd5206498c02b1001227d65654eb548308b", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-msc2448", "unstable-exhaustive-types", "ring-compat", "unstable-unspecified" ] } #ruma = { git = "https://github.com/timokoesters/ruma", rev = "50c1db7e0a3a21fc794b0cce3b64285a4c750c71", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } #ruma = { path = "../ruma/crates/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index 11e37e62..ccd8d7a1 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -743,15 +743,17 @@ async fn join_room_by_id_helper( info!("Saving state from send_join"); let (statehash_before_join, new, removed) = services().rooms.state_compressor.save_state( room_id, - state - .into_iter() - .map(|(k, id)| { - services() - .rooms - .state_compressor - .compress_state_event(k, &id) - }) - .collect::>()?, + Arc::new( + state + .into_iter() + .map(|(k, id)| { + services() + .rooms + .state_compressor + .compress_state_event(k, &id) + }) + .collect::>()?, + ), )?; services() diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs index 0f0c0dc7..ad08f46e 100644 --- a/src/database/key_value/rooms/state_accessor.rs +++ b/src/database/key_value/rooms/state_accessor.rs @@ -16,7 +16,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { .1; let mut result = HashMap::new(); let mut i = 0; - for compressed in full_state.into_iter() { + for compressed in full_state.iter() { let parsed = services() .rooms .state_compressor @@ -45,7 +45,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { let mut result = HashMap::new(); let mut i = 0; - for compressed in full_state { + for compressed in full_state.iter() { let (_, eventid) = services() .rooms .state_compressor @@ -95,7 +95,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { .expect("there is always one layer") .1; Ok(full_state - .into_iter() + .iter() .find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes())) .and_then(|compressed| { services() diff --git a/src/database/key_value/rooms/state_compressor.rs b/src/database/key_value/rooms/state_compressor.rs index d0a9be48..65ea603e 100644 --- a/src/database/key_value/rooms/state_compressor.rs +++ b/src/database/key_value/rooms/state_compressor.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, mem::size_of}; +use std::{collections::HashSet, mem::size_of, sync::Arc}; use crate::{ database::KeyValueDatabase, @@ -37,20 +37,20 @@ impl service::rooms::state_compressor::Data for KeyValueDatabase { Ok(StateDiff { parent, - added, - removed, + added: Arc::new(added), + removed: Arc::new(removed), }) } fn save_statediff(&self, shortstatehash: u64, diff: StateDiff) -> Result<()> { let mut value = diff.parent.unwrap_or(0).to_be_bytes().to_vec(); - for new in &diff.added { + for new in diff.added.iter() { value.extend_from_slice(&new[..]); } if !diff.removed.is_empty() { value.extend_from_slice(&0_u64.to_be_bytes()); - for removed in &diff.removed { + for removed in diff.removed.iter() { value.extend_from_slice(&removed[..]); } } diff --git a/src/database/mod.rs b/src/database/mod.rs index 5d89d4af..4e7bda6b 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -587,8 +587,8 @@ impl KeyValueDatabase { services().rooms.state_compressor.save_state_from_diff( current_sstatehash, - statediffnew, - statediffremoved, + Arc::new(statediffnew), + Arc::new(statediffremoved), 2, // every state change is 2 event changes on average states_parents, )?; diff --git a/src/service/mod.rs b/src/service/mod.rs index 3b488101..7a2bb64c 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -90,7 +90,7 @@ impl Services { state_compressor: rooms::state_compressor::Service { db, stateinfo_cache: Mutex::new(LruCache::new( - (100.0 * config.conduit_cache_capacity_modifier) as usize, + (1000.0 * config.conduit_cache_capacity_modifier) as usize, )), }, timeline: rooms::timeline::Service { diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 800d849d..066cef48 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -774,15 +774,17 @@ impl Service { }); info!("Compressing state at event"); - let state_ids_compressed = state_at_incoming_event - .iter() - .map(|(shortstatekey, id)| { - services() - .rooms - .state_compressor - .compress_state_event(*shortstatekey, id) - }) - .collect::>()?; + let state_ids_compressed = Arc::new( + state_at_incoming_event + .iter() + .map(|(shortstatekey, id)| { + services() + .rooms + .state_compressor + .compress_state_event(*shortstatekey, id) + }) + .collect::>()?, + ); if incoming_pdu.state_key.is_some() { info!("Preparing for stateres to derive new room state"); @@ -886,7 +888,7 @@ impl Service { room_id: &RoomId, room_version_id: &RoomVersionId, incoming_state: HashMap>, - ) -> Result> { + ) -> Result>> { info!("Loading current room state ids"); let current_sstatehash = services() .rooms @@ -966,7 +968,7 @@ impl Service { }) .collect::>()?; - Ok(new_room_state) + Ok(Arc::new(new_room_state)) } /// Find the event and auth it. Once the event is validated (steps 1 - 8) diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 21ad2f9b..ca9430f1 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -32,11 +32,11 @@ impl Service { &self, room_id: &RoomId, shortstatehash: u64, - statediffnew: HashSet, - _statediffremoved: HashSet, + statediffnew: Arc>, + _statediffremoved: Arc>, state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex ) -> Result<()> { - for event_id in statediffnew.into_iter().filter_map(|new| { + for event_id in statediffnew.iter().filter_map(|new| { services() .rooms .state_compressor @@ -107,7 +107,7 @@ impl Service { &self, event_id: &EventId, room_id: &RoomId, - state_ids_compressed: HashSet, + state_ids_compressed: Arc>, ) -> Result { let shorteventid = services() .rooms @@ -152,9 +152,9 @@ impl Service { .copied() .collect(); - (statediffnew, statediffremoved) + (Arc::new(statediffnew), Arc::new(statediffremoved)) } else { - (state_ids_compressed, HashSet::new()) + (state_ids_compressed, Arc::new(HashSet::new())) }; services().rooms.state_compressor.save_state_from_diff( shortstatehash, @@ -234,8 +234,8 @@ impl Service { services().rooms.state_compressor.save_state_from_diff( shortstatehash, - statediffnew, - statediffremoved, + Arc::new(statediffnew), + Arc::new(statediffremoved), 2, states_parents, )?; @@ -396,7 +396,7 @@ impl Service { .1; Ok(full_state - .into_iter() + .iter() .filter_map(|compressed| { services() .rooms diff --git a/src/service/rooms/state_compressor/data.rs b/src/service/rooms/state_compressor/data.rs index ce164c6d..d221d576 100644 --- a/src/service/rooms/state_compressor/data.rs +++ b/src/service/rooms/state_compressor/data.rs @@ -1,12 +1,12 @@ -use std::collections::HashSet; +use std::{collections::HashSet, sync::Arc}; use super::CompressedStateEvent; use crate::Result; pub struct StateDiff { pub parent: Option, - pub added: HashSet, - pub removed: HashSet, + pub added: Arc>, + pub removed: Arc>, } pub trait Data: Send + Sync { diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index 356f32c8..d29b020b 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -20,10 +20,10 @@ pub struct Service { LruCache< u64, Vec<( - u64, // sstatehash - HashSet, // full state - HashSet, // added - HashSet, // removed + u64, // sstatehash + Arc>, // full state + Arc>, // added + Arc>, // removed )>, >, >, @@ -39,10 +39,10 @@ impl Service { shortstatehash: u64, ) -> Result< Vec<( - u64, // sstatehash - HashSet, // full state - HashSet, // added - HashSet, // removed + u64, // sstatehash + Arc>, // full state + Arc>, // added + Arc>, // removed )>, > { if let Some(r) = self @@ -62,13 +62,19 @@ impl Service { if let Some(parent) = parent { let mut response = self.load_shortstatehash_info(parent)?; - let mut state = response.last().unwrap().1.clone(); + let mut state = (*response.last().unwrap().1).clone(); state.extend(added.iter().copied()); + let removed = (*removed).clone(); for r in &removed { state.remove(r); } - response.push((shortstatehash, state, added, removed)); + response.push((shortstatehash, Arc::new(state), added, Arc::new(removed))); + + self.stateinfo_cache + .lock() + .unwrap() + .insert(shortstatehash, response.clone()); Ok(response) } else { @@ -135,14 +141,14 @@ impl Service { pub fn save_state_from_diff( &self, shortstatehash: u64, - statediffnew: HashSet, - statediffremoved: HashSet, + statediffnew: Arc>, + statediffremoved: Arc>, diff_to_sibling: usize, mut parent_states: Vec<( - u64, // sstatehash - HashSet, // full state - HashSet, // added - HashSet, // removed + u64, // sstatehash + Arc>, // full state + Arc>, // added + Arc>, // removed )>, ) -> Result<()> { let diffsum = statediffnew.len() + statediffremoved.len(); @@ -152,29 +158,29 @@ impl Service { // To many layers, we have to go deeper let parent = parent_states.pop().unwrap(); - let mut parent_new = parent.2; - let mut parent_removed = parent.3; + let mut parent_new = (*parent.2).clone(); + let mut parent_removed = (*parent.3).clone(); - for removed in statediffremoved { - if !parent_new.remove(&removed) { + for removed in statediffremoved.iter() { + if !parent_new.remove(removed) { // It was not added in the parent and we removed it - parent_removed.insert(removed); + parent_removed.insert(removed.clone()); } // Else it was added in the parent and we removed it again. We can forget this change } - for new in statediffnew { - if !parent_removed.remove(&new) { + for new in statediffnew.iter() { + if !parent_removed.remove(new) { // It was not touched in the parent and we added it - parent_new.insert(new); + parent_new.insert(new.clone()); } // Else it was removed in the parent and we added it again. We can forget this change } self.save_state_from_diff( shortstatehash, - parent_new, - parent_removed, + Arc::new(parent_new), + Arc::new(parent_removed), diffsum, parent_states, )?; @@ -205,29 +211,29 @@ impl Service { if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff { // Diff too big, we replace above layer(s) - let mut parent_new = parent.2; - let mut parent_removed = parent.3; + let mut parent_new = (*parent.2).clone(); + let mut parent_removed = (*parent.3).clone(); - for removed in statediffremoved { - if !parent_new.remove(&removed) { + for removed in statediffremoved.iter() { + if !parent_new.remove(removed) { // It was not added in the parent and we removed it - parent_removed.insert(removed); + parent_removed.insert(removed.clone()); } // Else it was added in the parent and we removed it again. We can forget this change } - for new in statediffnew { - if !parent_removed.remove(&new) { + for new in statediffnew.iter() { + if !parent_removed.remove(new) { // It was not touched in the parent and we added it - parent_new.insert(new); + parent_new.insert(new.clone()); } // Else it was removed in the parent and we added it again. We can forget this change } self.save_state_from_diff( shortstatehash, - parent_new, - parent_removed, + Arc::new(parent_new), + Arc::new(parent_removed), diffsum, parent_states, )?; @@ -250,11 +256,11 @@ impl Service { pub fn save_state( &self, room_id: &RoomId, - new_state_ids_compressed: HashSet, + new_state_ids_compressed: Arc>, ) -> Result<( u64, - HashSet, - HashSet, + Arc>, + Arc>, )> { let previous_shortstatehash = services().rooms.state.get_room_shortstatehash(room_id)?; @@ -271,7 +277,11 @@ impl Service { .get_or_create_shortstatehash(&state_hash)?; if Some(new_shortstatehash) == previous_shortstatehash { - return Ok((new_shortstatehash, HashSet::new(), HashSet::new())); + return Ok(( + new_shortstatehash, + Arc::new(HashSet::new()), + Arc::new(HashSet::new()), + )); } let states_parents = previous_shortstatehash @@ -290,9 +300,9 @@ impl Service { .copied() .collect(); - (statediffnew, statediffremoved) + (Arc::new(statediffnew), Arc::new(statediffremoved)) } else { - (new_state_ids_compressed, HashSet::new()) + (new_state_ids_compressed, Arc::new(HashSet::new())) }; if !already_existed { diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 2356a00e..56769d5b 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -946,7 +946,7 @@ impl Service { pdu: &PduEvent, pdu_json: CanonicalJsonObject, new_room_leaves: Vec, - state_ids_compressed: HashSet, + state_ids_compressed: Arc>, soft_fail: bool, state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex ) -> Result>> {