diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 14aac3a1..e15b1b9a 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -378,11 +378,11 @@ async fn sync_helper( let mut state_events = Vec::new(); let mut lazy_loaded = HashSet::new(); - for (shortstatekey, id) in current_state_ids { + for (&shortstatekey, id) in current_state_ids.iter() { let (event_type, state_key) = db.rooms.get_statekey_from_short(shortstatekey)?; if event_type != EventType::RoomMember { - let pdu = match db.rooms.get_pdu(&id)? { + let pdu = match db.rooms.get_pdu(id)? { Some(pdu) => pdu, None => { error!("Pdu in state not found: {}", id); @@ -394,7 +394,7 @@ async fn sync_helper( || body.full_state || timeline_users.contains(&state_key) { - let pdu = match db.rooms.get_pdu(&id)? { + let pdu = match db.rooms.get_pdu(id)? { Some(pdu) => pdu, None => { error!("Pdu in state not found: {}", id); @@ -460,9 +460,9 @@ async fn sync_helper( let current_state_ids = db.rooms.state_full_ids(current_shortstatehash)?; let since_state_ids = db.rooms.state_full_ids(since_shortstatehash)?; - for (key, id) in current_state_ids { - if body.full_state || since_state_ids.get(&key) != Some(&id) { - let pdu = match db.rooms.get_pdu(&id)? { + for (&key, id) in current_state_ids.iter() { + if body.full_state || since_state_ids.get(&key) != Some(id) { + let pdu = match db.rooms.get_pdu(id)? { Some(pdu) => pdu, None => { error!("Pdu in state not found: {}", id); diff --git a/src/database.rs b/src/database.rs index 1997dc0a..761e1a9f 100644 --- a/src/database.rs +++ b/src/database.rs @@ -362,6 +362,7 @@ impl Database { .expect("pdu cache capacity fits into usize"), )), auth_chain_cache: Mutex::new(LruCache::new(1_000_000)), + state_full_ids_cache: Mutex::new(LruCache::new(30)), shorteventid_cache: Mutex::new(LruCache::new(1_000_000)), eventidshort_cache: Mutex::new(LruCache::new(1_000_000)), shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)), diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 0ba6c9ba..96588748 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -114,6 +114,7 @@ pub struct Rooms { pub(super) pdu_cache: Mutex, Arc>>, pub(super) shorteventid_cache: Mutex>>, pub(super) auth_chain_cache: Mutex, Arc>>>, + pub(super) state_full_ids_cache: Mutex>>>>, pub(super) eventidshort_cache: Mutex, u64>>, pub(super) statekeyshort_cache: Mutex>, pub(super) shortstatekey_cache: Mutex>, @@ -138,16 +139,34 @@ impl Rooms { /// Builds a StateMap by iterating over all keys that start /// with state_hash, this gives the full state for the given state_hash. #[tracing::instrument(skip(self))] - pub fn state_full_ids(&self, shortstatehash: u64) -> Result>> { + pub fn state_full_ids(&self, shortstatehash: u64) -> Result>>> { + if let Some(r) = self + .state_full_ids_cache + .lock() + .unwrap() + .get_mut(&shortstatehash) + { + return Ok(r.clone()); + } + let full_state = self .load_shortstatehash_info(shortstatehash)? .pop() .expect("there is always one layer") .1; - full_state - .into_iter() - .map(|compressed| self.parse_compressed_state_event(compressed)) - .collect() + let result = Arc::new( + full_state + .into_iter() + .map(|compressed| self.parse_compressed_state_event(compressed)) + .collect::>>()?, + ); + + self.state_full_ids_cache + .lock() + .unwrap() + .insert(shortstatehash, result.clone()); + + Ok(result) } #[tracing::instrument(skip(self))] diff --git a/src/server_server.rs b/src/server_server.rs index 54ae0251..2438bbbc 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1330,7 +1330,7 @@ async fn upgrade_outlier_to_timeline_pdu( let state = prev_event_sstatehash.map(|shortstatehash| db.rooms.state_full_ids(shortstatehash)); - if let Some(Ok(mut state)) = state { + if let Some(Ok(state)) = state { warn!("Using cached state"); let prev_pdu = db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| { @@ -1343,11 +1343,13 @@ async fn upgrade_outlier_to_timeline_pdu( .get_or_create_shortstatekey(&prev_pdu.kind, state_key, &db.globals) .map_err(|_| "Failed to create shortstatekey.".to_owned())?; + let mut state = (*state).clone(); state.insert(shortstatekey, Arc::from(prev_event)); // Now it's the state after the pdu + state_at_incoming_event = Some(Arc::new(state)); + } else { + state_at_incoming_event = Some(state); } - - state_at_incoming_event = Some(state); } } else { warn!("Calculating state at event using state res"); @@ -1377,10 +1379,11 @@ async fn upgrade_outlier_to_timeline_pdu( let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len()); for (sstatehash, prev_event) in extremity_sstatehashes { - let mut leaf_state: BTreeMap<_, _> = db + let mut leaf_state: BTreeMap<_, _> = (*db .rooms .state_full_ids(sstatehash) - .map_err(|_| "Failed to ask db for room state.".to_owned())?; + .map_err(|_| "Failed to ask db for room state.".to_owned())?) + .clone(); if let Some(state_key) = &prev_event.state_key { let shortstatekey = db @@ -1424,7 +1427,7 @@ async fn upgrade_outlier_to_timeline_pdu( res.ok().flatten() }, ) { - Ok(new_state) => Some( + Ok(new_state) => Some(Arc::new( new_state .into_iter() .map(|((event_type, state_key), event_id)| { @@ -1435,7 +1438,7 @@ async fn upgrade_outlier_to_timeline_pdu( Ok((shortstatekey, event_id)) }) .collect::>()?, - ), + )), Err(e) => { warn!("State resolution on prev events failed, either an event could not be found or deserialization: {}", e); None @@ -1511,7 +1514,7 @@ async fn upgrade_outlier_to_timeline_pdu( return Err("Incoming event refers to wrong create event.".to_owned()); } - state_at_incoming_event = Some(state); + state_at_incoming_event = Some(Arc::new(state)); } Err(e) => { warn!("Fetching state for event failed: {}", e); @@ -1691,16 +1694,18 @@ async fn upgrade_outlier_to_timeline_pdu( fork_states.push(current_state_ids); // We also add state after incoming event to the fork states - let mut state_after = state_at_incoming_event.clone(); if let Some(state_key) = &incoming_pdu.state_key { + let mut state_after = (*state_at_incoming_event).clone(); let shortstatekey = db .rooms .get_or_create_shortstatekey(&incoming_pdu.kind, state_key, &db.globals) .map_err(|_| "Failed to create shortstatekey.".to_owned())?; state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id)); + fork_states.push(Arc::new(state_after)); + } else { + fork_states.push(state_at_incoming_event); } - fork_states.push(state_after); let mut update_state = false; // 14. Use state resolution to find new room state @@ -1737,11 +1742,11 @@ async fn upgrade_outlier_to_timeline_pdu( let fork_states: Vec<_> = fork_states .into_iter() .map(|map| { - map.into_iter() - .filter_map(|(k, id)| { + map.iter() + .filter_map(|(&k, id)| { db.rooms .get_statekey_from_short(k) - .map(|k| (k, id)) + .map(|k| (k, id.clone())) .map_err(|e| warn!("Failed to get_statekey_from_short: {}", e)) .ok() }) @@ -2546,10 +2551,10 @@ pub fn get_room_state_route( let pdus = db .rooms .state_full_ids(shortstatehash)? - .into_iter() + .iter() .map(|(_, id)| { PduEvent::convert_to_outgoing_federation_event( - db.rooms.get_pdu_json(&id).unwrap().unwrap(), + db.rooms.get_pdu_json(id).unwrap().unwrap(), ) }) .collect(); @@ -2611,8 +2616,8 @@ pub fn get_room_state_ids_route( let pdu_ids = db .rooms .state_full_ids(shortstatehash)? - .into_iter() - .map(|(_, id)| (*id).to_owned()) + .iter() + .map(|(_, id)| (**id).to_owned()) .collect(); let auth_chain_ids = get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)], &db)?;