Compare commits

...

1 commit

Author SHA1 Message Date
Timo Kösters
1887b308a4
improvement: state_full_ids_cache 2022-01-18 14:05:19 +01:00
4 changed files with 53 additions and 28 deletions

View file

@ -378,11 +378,11 @@ async fn sync_helper(
let mut state_events = Vec::new(); let mut state_events = Vec::new();
let mut lazy_loaded = HashSet::new(); let mut lazy_loaded = HashSet::new();
for (shortstatekey, id) in current_state_ids { for (&shortstatekey, id) in current_state_ids.iter() {
let (event_type, state_key) = db.rooms.get_statekey_from_short(shortstatekey)?; let (event_type, state_key) = db.rooms.get_statekey_from_short(shortstatekey)?;
if event_type != EventType::RoomMember { if event_type != EventType::RoomMember {
let pdu = match db.rooms.get_pdu(&id)? { let pdu = match db.rooms.get_pdu(id)? {
Some(pdu) => pdu, Some(pdu) => pdu,
None => { None => {
error!("Pdu in state not found: {}", id); error!("Pdu in state not found: {}", id);
@ -394,7 +394,7 @@ async fn sync_helper(
|| body.full_state || body.full_state
|| timeline_users.contains(&state_key) || timeline_users.contains(&state_key)
{ {
let pdu = match db.rooms.get_pdu(&id)? { let pdu = match db.rooms.get_pdu(id)? {
Some(pdu) => pdu, Some(pdu) => pdu,
None => { None => {
error!("Pdu in state not found: {}", id); error!("Pdu in state not found: {}", id);
@ -460,9 +460,9 @@ async fn sync_helper(
let current_state_ids = db.rooms.state_full_ids(current_shortstatehash)?; let current_state_ids = db.rooms.state_full_ids(current_shortstatehash)?;
let since_state_ids = db.rooms.state_full_ids(since_shortstatehash)?; let since_state_ids = db.rooms.state_full_ids(since_shortstatehash)?;
for (key, id) in current_state_ids { for (&key, id) in current_state_ids.iter() {
if body.full_state || since_state_ids.get(&key) != Some(&id) { if body.full_state || since_state_ids.get(&key) != Some(id) {
let pdu = match db.rooms.get_pdu(&id)? { let pdu = match db.rooms.get_pdu(id)? {
Some(pdu) => pdu, Some(pdu) => pdu,
None => { None => {
error!("Pdu in state not found: {}", id); error!("Pdu in state not found: {}", id);

View file

@ -362,6 +362,7 @@ impl Database {
.expect("pdu cache capacity fits into usize"), .expect("pdu cache capacity fits into usize"),
)), )),
auth_chain_cache: Mutex::new(LruCache::new(1_000_000)), auth_chain_cache: Mutex::new(LruCache::new(1_000_000)),
state_full_ids_cache: Mutex::new(LruCache::new(30)),
shorteventid_cache: Mutex::new(LruCache::new(1_000_000)), shorteventid_cache: Mutex::new(LruCache::new(1_000_000)),
eventidshort_cache: Mutex::new(LruCache::new(1_000_000)), eventidshort_cache: Mutex::new(LruCache::new(1_000_000)),
shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)), shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)),

View file

@ -114,6 +114,7 @@ pub struct Rooms {
pub(super) pdu_cache: Mutex<LruCache<Box<EventId>, Arc<PduEvent>>>, pub(super) pdu_cache: Mutex<LruCache<Box<EventId>, Arc<PduEvent>>>,
pub(super) shorteventid_cache: Mutex<LruCache<u64, Arc<EventId>>>, pub(super) shorteventid_cache: Mutex<LruCache<u64, Arc<EventId>>>,
pub(super) auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<HashSet<u64>>>>, pub(super) auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<HashSet<u64>>>>,
pub(super) state_full_ids_cache: Mutex<LruCache<u64, Arc<BTreeMap<u64, Arc<EventId>>>>>,
pub(super) eventidshort_cache: Mutex<LruCache<Box<EventId>, u64>>, pub(super) eventidshort_cache: Mutex<LruCache<Box<EventId>, u64>>,
pub(super) statekeyshort_cache: Mutex<LruCache<(EventType, String), u64>>, pub(super) statekeyshort_cache: Mutex<LruCache<(EventType, String), u64>>,
pub(super) shortstatekey_cache: Mutex<LruCache<u64, (EventType, String)>>, pub(super) shortstatekey_cache: Mutex<LruCache<u64, (EventType, String)>>,
@ -138,16 +139,34 @@ impl Rooms {
/// Builds a StateMap by iterating over all keys that start /// Builds a StateMap by iterating over all keys that start
/// with state_hash, this gives the full state for the given state_hash. /// with state_hash, this gives the full state for the given state_hash.
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> { pub fn state_full_ids(&self, shortstatehash: u64) -> Result<Arc<BTreeMap<u64, Arc<EventId>>>> {
if let Some(r) = self
.state_full_ids_cache
.lock()
.unwrap()
.get_mut(&shortstatehash)
{
return Ok(r.clone());
}
let full_state = self let full_state = self
.load_shortstatehash_info(shortstatehash)? .load_shortstatehash_info(shortstatehash)?
.pop() .pop()
.expect("there is always one layer") .expect("there is always one layer")
.1; .1;
full_state let result = Arc::new(
.into_iter() full_state
.map(|compressed| self.parse_compressed_state_event(compressed)) .into_iter()
.collect() .map(|compressed| self.parse_compressed_state_event(compressed))
.collect::<Result<BTreeMap<_, _>>>()?,
);
self.state_full_ids_cache
.lock()
.unwrap()
.insert(shortstatehash, result.clone());
Ok(result)
} }
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]

View file

@ -1330,7 +1330,7 @@ async fn upgrade_outlier_to_timeline_pdu(
let state = let state =
prev_event_sstatehash.map(|shortstatehash| db.rooms.state_full_ids(shortstatehash)); prev_event_sstatehash.map(|shortstatehash| db.rooms.state_full_ids(shortstatehash));
if let Some(Ok(mut state)) = state { if let Some(Ok(state)) = state {
warn!("Using cached state"); warn!("Using cached state");
let prev_pdu = let prev_pdu =
db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| { db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| {
@ -1343,11 +1343,13 @@ async fn upgrade_outlier_to_timeline_pdu(
.get_or_create_shortstatekey(&prev_pdu.kind, state_key, &db.globals) .get_or_create_shortstatekey(&prev_pdu.kind, state_key, &db.globals)
.map_err(|_| "Failed to create shortstatekey.".to_owned())?; .map_err(|_| "Failed to create shortstatekey.".to_owned())?;
let mut state = (*state).clone();
state.insert(shortstatekey, Arc::from(prev_event)); state.insert(shortstatekey, Arc::from(prev_event));
// Now it's the state after the pdu // Now it's the state after the pdu
state_at_incoming_event = Some(Arc::new(state));
} else {
state_at_incoming_event = Some(state);
} }
state_at_incoming_event = Some(state);
} }
} else { } else {
warn!("Calculating state at event using state res"); warn!("Calculating state at event using state res");
@ -1377,10 +1379,11 @@ async fn upgrade_outlier_to_timeline_pdu(
let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len()); let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len());
for (sstatehash, prev_event) in extremity_sstatehashes { for (sstatehash, prev_event) in extremity_sstatehashes {
let mut leaf_state: BTreeMap<_, _> = db let mut leaf_state: BTreeMap<_, _> = (*db
.rooms .rooms
.state_full_ids(sstatehash) .state_full_ids(sstatehash)
.map_err(|_| "Failed to ask db for room state.".to_owned())?; .map_err(|_| "Failed to ask db for room state.".to_owned())?)
.clone();
if let Some(state_key) = &prev_event.state_key { if let Some(state_key) = &prev_event.state_key {
let shortstatekey = db let shortstatekey = db
@ -1424,7 +1427,7 @@ async fn upgrade_outlier_to_timeline_pdu(
res.ok().flatten() res.ok().flatten()
}, },
) { ) {
Ok(new_state) => Some( Ok(new_state) => Some(Arc::new(
new_state new_state
.into_iter() .into_iter()
.map(|((event_type, state_key), event_id)| { .map(|((event_type, state_key), event_id)| {
@ -1435,7 +1438,7 @@ async fn upgrade_outlier_to_timeline_pdu(
Ok((shortstatekey, event_id)) Ok((shortstatekey, event_id))
}) })
.collect::<Result<_, String>>()?, .collect::<Result<_, String>>()?,
), )),
Err(e) => { Err(e) => {
warn!("State resolution on prev events failed, either an event could not be found or deserialization: {}", e); warn!("State resolution on prev events failed, either an event could not be found or deserialization: {}", e);
None None
@ -1511,7 +1514,7 @@ async fn upgrade_outlier_to_timeline_pdu(
return Err("Incoming event refers to wrong create event.".to_owned()); return Err("Incoming event refers to wrong create event.".to_owned());
} }
state_at_incoming_event = Some(state); state_at_incoming_event = Some(Arc::new(state));
} }
Err(e) => { Err(e) => {
warn!("Fetching state for event failed: {}", e); warn!("Fetching state for event failed: {}", e);
@ -1691,16 +1694,18 @@ async fn upgrade_outlier_to_timeline_pdu(
fork_states.push(current_state_ids); fork_states.push(current_state_ids);
// We also add state after incoming event to the fork states // We also add state after incoming event to the fork states
let mut state_after = state_at_incoming_event.clone();
if let Some(state_key) = &incoming_pdu.state_key { if let Some(state_key) = &incoming_pdu.state_key {
let mut state_after = (*state_at_incoming_event).clone();
let shortstatekey = db let shortstatekey = db
.rooms .rooms
.get_or_create_shortstatekey(&incoming_pdu.kind, state_key, &db.globals) .get_or_create_shortstatekey(&incoming_pdu.kind, state_key, &db.globals)
.map_err(|_| "Failed to create shortstatekey.".to_owned())?; .map_err(|_| "Failed to create shortstatekey.".to_owned())?;
state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id)); state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id));
fork_states.push(Arc::new(state_after));
} else {
fork_states.push(state_at_incoming_event);
} }
fork_states.push(state_after);
let mut update_state = false; let mut update_state = false;
// 14. Use state resolution to find new room state // 14. Use state resolution to find new room state
@ -1737,11 +1742,11 @@ async fn upgrade_outlier_to_timeline_pdu(
let fork_states: Vec<_> = fork_states let fork_states: Vec<_> = fork_states
.into_iter() .into_iter()
.map(|map| { .map(|map| {
map.into_iter() map.iter()
.filter_map(|(k, id)| { .filter_map(|(&k, id)| {
db.rooms db.rooms
.get_statekey_from_short(k) .get_statekey_from_short(k)
.map(|k| (k, id)) .map(|k| (k, id.clone()))
.map_err(|e| warn!("Failed to get_statekey_from_short: {}", e)) .map_err(|e| warn!("Failed to get_statekey_from_short: {}", e))
.ok() .ok()
}) })
@ -2546,10 +2551,10 @@ pub fn get_room_state_route(
let pdus = db let pdus = db
.rooms .rooms
.state_full_ids(shortstatehash)? .state_full_ids(shortstatehash)?
.into_iter() .iter()
.map(|(_, id)| { .map(|(_, id)| {
PduEvent::convert_to_outgoing_federation_event( PduEvent::convert_to_outgoing_federation_event(
db.rooms.get_pdu_json(&id).unwrap().unwrap(), db.rooms.get_pdu_json(id).unwrap().unwrap(),
) )
}) })
.collect(); .collect();
@ -2611,8 +2616,8 @@ pub fn get_room_state_ids_route(
let pdu_ids = db let pdu_ids = db
.rooms .rooms
.state_full_ids(shortstatehash)? .state_full_ids(shortstatehash)?
.into_iter() .iter()
.map(|(_, id)| (*id).to_owned()) .map(|(_, id)| (**id).to_owned())
.collect(); .collect();
let auth_chain_ids = get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)], &db)?; let auth_chain_ids = get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)], &db)?;