Add roomid_statehash tree, clean up review issues

This commit is contained in:
Devin Ragotzy 2020-08-19 17:27:24 -04:00 committed by Devin Ragotzy
parent 846a0098c1
commit d73c6aa8ad
3 changed files with 52 additions and 54 deletions

View file

@ -112,11 +112,7 @@ pub async fn join_room_by_id_route(
.room_state .room_state
.state .state
.iter() .iter()
.map(|pdu| pdu.deserialize().map(StateEvent::Full)) .map(|pdu| pdu.deserialize().map(StateEvent::Full).map(|ev| (ev.event_id(), ev)))
.map(|ev| {
let ev = ev?;
Ok::<_, serde_json::Error>((ev.event_id(), ev))
})
.collect::<Result<BTreeMap<EventId, StateEvent>, _>>() .collect::<Result<BTreeMap<EventId, StateEvent>, _>>()
.map_err(|_| Error::bad_database("Invalid PDU found in db."))?; .map_err(|_| Error::bad_database("Invalid PDU found in db."))?;
@ -140,9 +136,7 @@ pub async fn join_room_by_id_route(
for ev_id in &sorted_events_ids { for ev_id in &sorted_events_ids {
// this is a `state_res::StateEvent` that holds a `ruma::Pdu` // this is a `state_res::StateEvent` that holds a `ruma::Pdu`
let pdu = event_map.get(ev_id).ok_or_else(|| { let pdu = event_map.get(ev_id).expect("Found event_id in sorted events that is not in resolved state");
Error::Conflict("Found event_id in sorted events that is not in resolved state")
})?;
// We do not rebuild the PDU in this case only insert to DB // We do not rebuild the PDU in this case only insert to DB
db.rooms db.rooms

View file

@ -114,6 +114,7 @@ impl Database {
stateid_pduid: db.open_tree("stateid_pduid")?, stateid_pduid: db.open_tree("stateid_pduid")?,
pduid_statehash: db.open_tree("pduid_statehash")?, pduid_statehash: db.open_tree("pduid_statehash")?,
roomid_statehash: db.open_tree("roomid_statehash")?,
}, },
account_data: account_data::AccountData { account_data: account_data::AccountData {
roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata")?, roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata")?,

View file

@ -63,6 +63,8 @@ pub struct Rooms {
pub(super) pduid_statehash: sled::Tree, // PDU id -> StateHash pub(super) pduid_statehash: sled::Tree, // PDU id -> StateHash
/// Also holds the full room state minus the latest event. /// Also holds the full room state minus the latest event.
pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + (EventType, StateKey) pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + (EventType, StateKey)
/// The room_id -> the latest StateHash
pub(super) roomid_statehash: sled::Tree,
} }
impl StateStore for Rooms { impl StateStore for Rooms {
@ -93,53 +95,7 @@ impl StateStore for Rooms {
} }
} }
// These are the methods related to STATE resolution.
impl Rooms { impl Rooms {
/// Generates a new StateHash and associates it with the incoming event.
///
/// This adds all current state events (not including the incoming event)
/// to `stateid_pduid` and adds the incoming event to `pduid_statehash`.
/// The incoming event is the `pdu_id` passed to this method.
pub fn append_state_pdu(
&self,
room_id: &RoomId,
pdu_id: &[u8],
state_key: &str,
kind: &EventType,
) -> Result<StateHashId> {
let state_hash = self.new_state_hash_id(room_id)?;
let state = self.current_state_pduids(room_id)?;
let mut key = state_hash.as_bytes().to_vec();
key.push(0xff);
// TODO eventually we could avoid writing to the DB so much on every event
// by keeping track of the delta and write that every so often
for ((ev_ty, state_key), pid) in state {
let mut state_id = key.to_vec();
state_id.extend_from_slice(ev_ty.to_string().as_bytes());
key.push(0xff);
state_id.extend_from_slice(state_key.expect("state event").as_bytes());
key.push(0xff);
self.stateid_pduid.insert(&state_id, &pid)?;
}
// This event's state does not include the event itself. `current_state_pduids`
// uses `roomstateid_pduid` before the current event is inserted to the tree so the state
// will be everything up to but not including the incoming event.
self.pduid_statehash.insert(pdu_id, state_hash.as_bytes())?;
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(kind.to_string().as_bytes());
key.push(0xff);
key.extend_from_slice(state_key.as_bytes());
self.roomstateid_pduid.insert(key, pdu_id)?;
Ok(state_hash)
}
/// Builds a `StateMap` by iterating over all keys that start /// Builds a `StateMap` by iterating over all keys that start
/// with `state_hash`, this gives the full state at event "x". /// with `state_hash`, this gives the full state at event "x".
pub fn get_statemap_by_hash(&self, state_hash: StateHashId) -> Result<StateMap<EventId>> { pub fn get_statemap_by_hash(&self, state_hash: StateHashId) -> Result<StateMap<EventId>> {
@ -633,6 +589,53 @@ impl Rooms {
Ok(pdu.event_id) Ok(pdu.event_id)
} }
/// Generates a new StateHash and associates it with the incoming event.
///
/// This adds all current state events (not including the incoming event)
/// to `stateid_pduid` and adds the incoming event to `pduid_statehash`.
/// The incoming event is the `pdu_id` passed to this method.
fn append_state_pdu(
&self,
room_id: &RoomId,
pdu_id: &[u8],
state_key: &str,
kind: &EventType,
) -> Result<StateHashId> {
let state_hash = self.new_state_hash_id(room_id)?;
let state = self.current_state_pduids(room_id)?;
let mut key = state_hash.as_bytes().to_vec();
key.push(0xff);
// TODO eventually we could avoid writing to the DB so much on every event
// by keeping track of the delta and write that every so often
for ((ev_ty, state_key), pid) in state {
let mut state_id = key.to_vec();
state_id.extend_from_slice(ev_ty.to_string().as_bytes());
key.push(0xff);
state_id.extend_from_slice(state_key.expect("state event").as_bytes());
key.push(0xff);
self.stateid_pduid.insert(&state_id, &pid)?;
}
// This event's state does not include the event itself. `current_state_pduids`
// uses `roomstateid_pduid` before the current event is inserted to the tree so the state
// will be everything up to but not including the incoming event.
self.pduid_statehash.insert(pdu_id, state_hash.as_bytes())?;
self.roomid_statehash.insert(room_id.as_bytes(), state_hash.as_bytes())?;
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(kind.to_string().as_bytes());
key.push(0xff);
key.extend_from_slice(state_key.as_bytes());
self.roomstateid_pduid.insert(key, pdu_id)?;
Ok(state_hash)
}
/// Creates a new persisted data unit and adds it to a room. /// Creates a new persisted data unit and adds it to a room.
pub fn build_and_append_pdu( pub fn build_and_append_pdu(
&self, &self,