Compare commits
1 commit
next
...
state_full
Author | SHA1 | Date | |
---|---|---|---|
|
1887b308a4 |
4 changed files with 53 additions and 28 deletions
|
@ -378,11 +378,11 @@ async fn sync_helper(
|
||||||
let mut state_events = Vec::new();
|
let mut state_events = Vec::new();
|
||||||
let mut lazy_loaded = HashSet::new();
|
let mut lazy_loaded = HashSet::new();
|
||||||
|
|
||||||
for (shortstatekey, id) in current_state_ids {
|
for (&shortstatekey, id) in current_state_ids.iter() {
|
||||||
let (event_type, state_key) = db.rooms.get_statekey_from_short(shortstatekey)?;
|
let (event_type, state_key) = db.rooms.get_statekey_from_short(shortstatekey)?;
|
||||||
|
|
||||||
if event_type != EventType::RoomMember {
|
if event_type != EventType::RoomMember {
|
||||||
let pdu = match db.rooms.get_pdu(&id)? {
|
let pdu = match db.rooms.get_pdu(id)? {
|
||||||
Some(pdu) => pdu,
|
Some(pdu) => pdu,
|
||||||
None => {
|
None => {
|
||||||
error!("Pdu in state not found: {}", id);
|
error!("Pdu in state not found: {}", id);
|
||||||
|
@ -394,7 +394,7 @@ async fn sync_helper(
|
||||||
|| body.full_state
|
|| body.full_state
|
||||||
|| timeline_users.contains(&state_key)
|
|| timeline_users.contains(&state_key)
|
||||||
{
|
{
|
||||||
let pdu = match db.rooms.get_pdu(&id)? {
|
let pdu = match db.rooms.get_pdu(id)? {
|
||||||
Some(pdu) => pdu,
|
Some(pdu) => pdu,
|
||||||
None => {
|
None => {
|
||||||
error!("Pdu in state not found: {}", id);
|
error!("Pdu in state not found: {}", id);
|
||||||
|
@ -460,9 +460,9 @@ async fn sync_helper(
|
||||||
let current_state_ids = db.rooms.state_full_ids(current_shortstatehash)?;
|
let current_state_ids = db.rooms.state_full_ids(current_shortstatehash)?;
|
||||||
let since_state_ids = db.rooms.state_full_ids(since_shortstatehash)?;
|
let since_state_ids = db.rooms.state_full_ids(since_shortstatehash)?;
|
||||||
|
|
||||||
for (key, id) in current_state_ids {
|
for (&key, id) in current_state_ids.iter() {
|
||||||
if body.full_state || since_state_ids.get(&key) != Some(&id) {
|
if body.full_state || since_state_ids.get(&key) != Some(id) {
|
||||||
let pdu = match db.rooms.get_pdu(&id)? {
|
let pdu = match db.rooms.get_pdu(id)? {
|
||||||
Some(pdu) => pdu,
|
Some(pdu) => pdu,
|
||||||
None => {
|
None => {
|
||||||
error!("Pdu in state not found: {}", id);
|
error!("Pdu in state not found: {}", id);
|
||||||
|
|
|
@ -362,6 +362,7 @@ impl Database {
|
||||||
.expect("pdu cache capacity fits into usize"),
|
.expect("pdu cache capacity fits into usize"),
|
||||||
)),
|
)),
|
||||||
auth_chain_cache: Mutex::new(LruCache::new(1_000_000)),
|
auth_chain_cache: Mutex::new(LruCache::new(1_000_000)),
|
||||||
|
state_full_ids_cache: Mutex::new(LruCache::new(30)),
|
||||||
shorteventid_cache: Mutex::new(LruCache::new(1_000_000)),
|
shorteventid_cache: Mutex::new(LruCache::new(1_000_000)),
|
||||||
eventidshort_cache: Mutex::new(LruCache::new(1_000_000)),
|
eventidshort_cache: Mutex::new(LruCache::new(1_000_000)),
|
||||||
shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)),
|
shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)),
|
||||||
|
|
|
@ -114,6 +114,7 @@ pub struct Rooms {
|
||||||
pub(super) pdu_cache: Mutex<LruCache<Box<EventId>, Arc<PduEvent>>>,
|
pub(super) pdu_cache: Mutex<LruCache<Box<EventId>, Arc<PduEvent>>>,
|
||||||
pub(super) shorteventid_cache: Mutex<LruCache<u64, Arc<EventId>>>,
|
pub(super) shorteventid_cache: Mutex<LruCache<u64, Arc<EventId>>>,
|
||||||
pub(super) auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<HashSet<u64>>>>,
|
pub(super) auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<HashSet<u64>>>>,
|
||||||
|
pub(super) state_full_ids_cache: Mutex<LruCache<u64, Arc<BTreeMap<u64, Arc<EventId>>>>>,
|
||||||
pub(super) eventidshort_cache: Mutex<LruCache<Box<EventId>, u64>>,
|
pub(super) eventidshort_cache: Mutex<LruCache<Box<EventId>, u64>>,
|
||||||
pub(super) statekeyshort_cache: Mutex<LruCache<(EventType, String), u64>>,
|
pub(super) statekeyshort_cache: Mutex<LruCache<(EventType, String), u64>>,
|
||||||
pub(super) shortstatekey_cache: Mutex<LruCache<u64, (EventType, String)>>,
|
pub(super) shortstatekey_cache: Mutex<LruCache<u64, (EventType, String)>>,
|
||||||
|
@ -138,16 +139,34 @@ impl Rooms {
|
||||||
/// Builds a StateMap by iterating over all keys that start
|
/// Builds a StateMap by iterating over all keys that start
|
||||||
/// with state_hash, this gives the full state for the given state_hash.
|
/// with state_hash, this gives the full state for the given state_hash.
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> {
|
pub fn state_full_ids(&self, shortstatehash: u64) -> Result<Arc<BTreeMap<u64, Arc<EventId>>>> {
|
||||||
|
if let Some(r) = self
|
||||||
|
.state_full_ids_cache
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.get_mut(&shortstatehash)
|
||||||
|
{
|
||||||
|
return Ok(r.clone());
|
||||||
|
}
|
||||||
|
|
||||||
let full_state = self
|
let full_state = self
|
||||||
.load_shortstatehash_info(shortstatehash)?
|
.load_shortstatehash_info(shortstatehash)?
|
||||||
.pop()
|
.pop()
|
||||||
.expect("there is always one layer")
|
.expect("there is always one layer")
|
||||||
.1;
|
.1;
|
||||||
full_state
|
let result = Arc::new(
|
||||||
.into_iter()
|
full_state
|
||||||
.map(|compressed| self.parse_compressed_state_event(compressed))
|
.into_iter()
|
||||||
.collect()
|
.map(|compressed| self.parse_compressed_state_event(compressed))
|
||||||
|
.collect::<Result<BTreeMap<_, _>>>()?,
|
||||||
|
);
|
||||||
|
|
||||||
|
self.state_full_ids_cache
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.insert(shortstatehash, result.clone());
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
|
|
|
@ -1330,7 +1330,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
||||||
let state =
|
let state =
|
||||||
prev_event_sstatehash.map(|shortstatehash| db.rooms.state_full_ids(shortstatehash));
|
prev_event_sstatehash.map(|shortstatehash| db.rooms.state_full_ids(shortstatehash));
|
||||||
|
|
||||||
if let Some(Ok(mut state)) = state {
|
if let Some(Ok(state)) = state {
|
||||||
warn!("Using cached state");
|
warn!("Using cached state");
|
||||||
let prev_pdu =
|
let prev_pdu =
|
||||||
db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| {
|
db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| {
|
||||||
|
@ -1343,11 +1343,13 @@ async fn upgrade_outlier_to_timeline_pdu(
|
||||||
.get_or_create_shortstatekey(&prev_pdu.kind, state_key, &db.globals)
|
.get_or_create_shortstatekey(&prev_pdu.kind, state_key, &db.globals)
|
||||||
.map_err(|_| "Failed to create shortstatekey.".to_owned())?;
|
.map_err(|_| "Failed to create shortstatekey.".to_owned())?;
|
||||||
|
|
||||||
|
let mut state = (*state).clone();
|
||||||
state.insert(shortstatekey, Arc::from(prev_event));
|
state.insert(shortstatekey, Arc::from(prev_event));
|
||||||
// Now it's the state after the pdu
|
// Now it's the state after the pdu
|
||||||
|
state_at_incoming_event = Some(Arc::new(state));
|
||||||
|
} else {
|
||||||
|
state_at_incoming_event = Some(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
state_at_incoming_event = Some(state);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
warn!("Calculating state at event using state res");
|
warn!("Calculating state at event using state res");
|
||||||
|
@ -1377,10 +1379,11 @@ async fn upgrade_outlier_to_timeline_pdu(
|
||||||
let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len());
|
let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len());
|
||||||
|
|
||||||
for (sstatehash, prev_event) in extremity_sstatehashes {
|
for (sstatehash, prev_event) in extremity_sstatehashes {
|
||||||
let mut leaf_state: BTreeMap<_, _> = db
|
let mut leaf_state: BTreeMap<_, _> = (*db
|
||||||
.rooms
|
.rooms
|
||||||
.state_full_ids(sstatehash)
|
.state_full_ids(sstatehash)
|
||||||
.map_err(|_| "Failed to ask db for room state.".to_owned())?;
|
.map_err(|_| "Failed to ask db for room state.".to_owned())?)
|
||||||
|
.clone();
|
||||||
|
|
||||||
if let Some(state_key) = &prev_event.state_key {
|
if let Some(state_key) = &prev_event.state_key {
|
||||||
let shortstatekey = db
|
let shortstatekey = db
|
||||||
|
@ -1424,7 +1427,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
||||||
res.ok().flatten()
|
res.ok().flatten()
|
||||||
},
|
},
|
||||||
) {
|
) {
|
||||||
Ok(new_state) => Some(
|
Ok(new_state) => Some(Arc::new(
|
||||||
new_state
|
new_state
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|((event_type, state_key), event_id)| {
|
.map(|((event_type, state_key), event_id)| {
|
||||||
|
@ -1435,7 +1438,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
||||||
Ok((shortstatekey, event_id))
|
Ok((shortstatekey, event_id))
|
||||||
})
|
})
|
||||||
.collect::<Result<_, String>>()?,
|
.collect::<Result<_, String>>()?,
|
||||||
),
|
)),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("State resolution on prev events failed, either an event could not be found or deserialization: {}", e);
|
warn!("State resolution on prev events failed, either an event could not be found or deserialization: {}", e);
|
||||||
None
|
None
|
||||||
|
@ -1511,7 +1514,7 @@ async fn upgrade_outlier_to_timeline_pdu(
|
||||||
return Err("Incoming event refers to wrong create event.".to_owned());
|
return Err("Incoming event refers to wrong create event.".to_owned());
|
||||||
}
|
}
|
||||||
|
|
||||||
state_at_incoming_event = Some(state);
|
state_at_incoming_event = Some(Arc::new(state));
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Fetching state for event failed: {}", e);
|
warn!("Fetching state for event failed: {}", e);
|
||||||
|
@ -1691,16 +1694,18 @@ async fn upgrade_outlier_to_timeline_pdu(
|
||||||
fork_states.push(current_state_ids);
|
fork_states.push(current_state_ids);
|
||||||
|
|
||||||
// We also add state after incoming event to the fork states
|
// We also add state after incoming event to the fork states
|
||||||
let mut state_after = state_at_incoming_event.clone();
|
|
||||||
if let Some(state_key) = &incoming_pdu.state_key {
|
if let Some(state_key) = &incoming_pdu.state_key {
|
||||||
|
let mut state_after = (*state_at_incoming_event).clone();
|
||||||
let shortstatekey = db
|
let shortstatekey = db
|
||||||
.rooms
|
.rooms
|
||||||
.get_or_create_shortstatekey(&incoming_pdu.kind, state_key, &db.globals)
|
.get_or_create_shortstatekey(&incoming_pdu.kind, state_key, &db.globals)
|
||||||
.map_err(|_| "Failed to create shortstatekey.".to_owned())?;
|
.map_err(|_| "Failed to create shortstatekey.".to_owned())?;
|
||||||
|
|
||||||
state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id));
|
state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id));
|
||||||
|
fork_states.push(Arc::new(state_after));
|
||||||
|
} else {
|
||||||
|
fork_states.push(state_at_incoming_event);
|
||||||
}
|
}
|
||||||
fork_states.push(state_after);
|
|
||||||
|
|
||||||
let mut update_state = false;
|
let mut update_state = false;
|
||||||
// 14. Use state resolution to find new room state
|
// 14. Use state resolution to find new room state
|
||||||
|
@ -1737,11 +1742,11 @@ async fn upgrade_outlier_to_timeline_pdu(
|
||||||
let fork_states: Vec<_> = fork_states
|
let fork_states: Vec<_> = fork_states
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|map| {
|
.map(|map| {
|
||||||
map.into_iter()
|
map.iter()
|
||||||
.filter_map(|(k, id)| {
|
.filter_map(|(&k, id)| {
|
||||||
db.rooms
|
db.rooms
|
||||||
.get_statekey_from_short(k)
|
.get_statekey_from_short(k)
|
||||||
.map(|k| (k, id))
|
.map(|k| (k, id.clone()))
|
||||||
.map_err(|e| warn!("Failed to get_statekey_from_short: {}", e))
|
.map_err(|e| warn!("Failed to get_statekey_from_short: {}", e))
|
||||||
.ok()
|
.ok()
|
||||||
})
|
})
|
||||||
|
@ -2546,10 +2551,10 @@ pub fn get_room_state_route(
|
||||||
let pdus = db
|
let pdus = db
|
||||||
.rooms
|
.rooms
|
||||||
.state_full_ids(shortstatehash)?
|
.state_full_ids(shortstatehash)?
|
||||||
.into_iter()
|
.iter()
|
||||||
.map(|(_, id)| {
|
.map(|(_, id)| {
|
||||||
PduEvent::convert_to_outgoing_federation_event(
|
PduEvent::convert_to_outgoing_federation_event(
|
||||||
db.rooms.get_pdu_json(&id).unwrap().unwrap(),
|
db.rooms.get_pdu_json(id).unwrap().unwrap(),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
@ -2611,8 +2616,8 @@ pub fn get_room_state_ids_route(
|
||||||
let pdu_ids = db
|
let pdu_ids = db
|
||||||
.rooms
|
.rooms
|
||||||
.state_full_ids(shortstatehash)?
|
.state_full_ids(shortstatehash)?
|
||||||
.into_iter()
|
.iter()
|
||||||
.map(|(_, id)| (*id).to_owned())
|
.map(|(_, id)| (**id).to_owned())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let auth_chain_ids = get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)], &db)?;
|
let auth_chain_ids = get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)], &db)?;
|
||||||
|
|
Loading…
Add table
Reference in a new issue