refactor: event handling code
This commit is contained in:
parent
1b0477d569
commit
dcdbcc0851
19 changed files with 373 additions and 5298 deletions
|
@ -806,36 +806,6 @@ pub(crate) async fn invite_helper<'a>(
|
||||||
);
|
);
|
||||||
let state_lock = mutex_state.lock().await;
|
let state_lock = mutex_state.lock().await;
|
||||||
|
|
||||||
let prev_events: Vec<_> = db
|
|
||||||
.rooms
|
|
||||||
.get_pdu_leaves(room_id)?
|
|
||||||
.into_iter()
|
|
||||||
.take(20)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let create_event = db
|
|
||||||
.rooms
|
|
||||||
.room_state_get(room_id, &StateEventType::RoomCreate, "")?;
|
|
||||||
|
|
||||||
let create_event_content: Option<RoomCreateEventContent> = create_event
|
|
||||||
.as_ref()
|
|
||||||
.map(|create_event| {
|
|
||||||
serde_json::from_str(create_event.content.get()).map_err(|e| {
|
|
||||||
warn!("Invalid create event: {}", e);
|
|
||||||
Error::bad_database("Invalid create event in db.")
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.transpose()?;
|
|
||||||
|
|
||||||
// If there was no create event yet, assume we are creating a room with the default
|
|
||||||
// version right now
|
|
||||||
let room_version_id = create_event_content
|
|
||||||
.map_or(db.globals.default_room_version(), |create_event| {
|
|
||||||
create_event.room_version
|
|
||||||
});
|
|
||||||
let room_version =
|
|
||||||
RoomVersion::new(&room_version_id).expect("room version is supported");
|
|
||||||
|
|
||||||
let content = to_raw_value(&RoomMemberEventContent {
|
let content = to_raw_value(&RoomMemberEventContent {
|
||||||
avatar_url: None,
|
avatar_url: None,
|
||||||
displayname: None,
|
displayname: None,
|
||||||
|
@ -851,98 +821,7 @@ pub(crate) async fn invite_helper<'a>(
|
||||||
let state_key = user_id.to_string();
|
let state_key = user_id.to_string();
|
||||||
let kind = StateEventType::RoomMember;
|
let kind = StateEventType::RoomMember;
|
||||||
|
|
||||||
let auth_events = db.rooms.get_auth_events(
|
let (pdu, pdu_json) = create_hash_and_sign_event();
|
||||||
room_id,
|
|
||||||
&kind.to_string().into(),
|
|
||||||
sender_user,
|
|
||||||
Some(&state_key),
|
|
||||||
&content,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
// Our depth is the maximum depth of prev_events + 1
|
|
||||||
let depth = prev_events
|
|
||||||
.iter()
|
|
||||||
.filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth))
|
|
||||||
.max()
|
|
||||||
.unwrap_or_else(|| uint!(0))
|
|
||||||
+ uint!(1);
|
|
||||||
|
|
||||||
let mut unsigned = BTreeMap::new();
|
|
||||||
|
|
||||||
if let Some(prev_pdu) = db.rooms.room_state_get(room_id, &kind, &state_key)? {
|
|
||||||
unsigned.insert("prev_content".to_owned(), prev_pdu.content.clone());
|
|
||||||
unsigned.insert(
|
|
||||||
"prev_sender".to_owned(),
|
|
||||||
to_raw_value(&prev_pdu.sender).expect("UserId is valid"),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
let pdu = PduEvent {
|
|
||||||
event_id: ruma::event_id!("$thiswillbefilledinlater").into(),
|
|
||||||
room_id: room_id.to_owned(),
|
|
||||||
sender: sender_user.to_owned(),
|
|
||||||
origin_server_ts: utils::millis_since_unix_epoch()
|
|
||||||
.try_into()
|
|
||||||
.expect("time is valid"),
|
|
||||||
kind: kind.to_string().into(),
|
|
||||||
content,
|
|
||||||
state_key: Some(state_key),
|
|
||||||
prev_events,
|
|
||||||
depth,
|
|
||||||
auth_events: auth_events
|
|
||||||
.iter()
|
|
||||||
.map(|(_, pdu)| pdu.event_id.clone())
|
|
||||||
.collect(),
|
|
||||||
redacts: None,
|
|
||||||
unsigned: if unsigned.is_empty() {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(to_raw_value(&unsigned).expect("to_raw_value always works"))
|
|
||||||
},
|
|
||||||
hashes: EventHash {
|
|
||||||
sha256: "aaa".to_owned(),
|
|
||||||
},
|
|
||||||
signatures: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let auth_check = state_res::auth_check(
|
|
||||||
&room_version,
|
|
||||||
&pdu,
|
|
||||||
None::<PduEvent>, // TODO: third_party_invite
|
|
||||||
|k, s| auth_events.get(&(k.clone(), s.to_owned())),
|
|
||||||
)
|
|
||||||
.map_err(|e| {
|
|
||||||
error!("{:?}", e);
|
|
||||||
Error::bad_database("Auth check failed.")
|
|
||||||
})?;
|
|
||||||
|
|
||||||
if !auth_check {
|
|
||||||
return Err(Error::BadRequest(
|
|
||||||
ErrorKind::Forbidden,
|
|
||||||
"Event is not authorized.",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Hash and sign
|
|
||||||
let mut pdu_json =
|
|
||||||
utils::to_canonical_object(&pdu).expect("event is valid, we just created it");
|
|
||||||
|
|
||||||
pdu_json.remove("event_id");
|
|
||||||
|
|
||||||
// Add origin because synapse likes that (and it's required in the spec)
|
|
||||||
pdu_json.insert(
|
|
||||||
"origin".to_owned(),
|
|
||||||
to_canonical_value(db.globals.server_name())
|
|
||||||
.expect("server name is a valid CanonicalJsonValue"),
|
|
||||||
);
|
|
||||||
|
|
||||||
ruma::signatures::hash_and_sign_event(
|
|
||||||
db.globals.server_name().as_str(),
|
|
||||||
db.globals.keypair(),
|
|
||||||
&mut pdu_json,
|
|
||||||
&room_version_id,
|
|
||||||
)
|
|
||||||
.expect("event is valid, we just created it");
|
|
||||||
|
|
||||||
let invite_room_state = db.rooms.calculate_invite_state(&pdu)?;
|
let invite_room_state = db.rooms.calculate_invite_state(&pdu)?;
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
|
@ -1,3 +1,12 @@
|
||||||
|
/// Returns the pdu from the outlier tree.
|
||||||
|
pub fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
|
||||||
|
self.eventid_outlierpdu
|
||||||
|
.get(event_id.as_bytes())?
|
||||||
|
.map_or(Ok(None), |pdu| {
|
||||||
|
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the pdu from the outlier tree.
|
/// Returns the pdu from the outlier tree.
|
||||||
pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
|
pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
|
||||||
self.eventid_outlierpdu
|
self.eventid_outlierpdu
|
||||||
|
@ -8,8 +17,6 @@
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Append the PDU as an outlier.
|
/// Append the PDU as an outlier.
|
||||||
///
|
|
||||||
/// Any event given to this will be processed (state-res) on another thread.
|
|
||||||
#[tracing::instrument(skip(self, pdu))]
|
#[tracing::instrument(skip(self, pdu))]
|
||||||
pub fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) -> Result<()> {
|
pub fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) -> Result<()> {
|
||||||
self.eventid_outlierpdu.insert(
|
self.eventid_outlierpdu.insert(
|
|
@ -1,120 +1,6 @@
|
||||||
|
pub trait Data {
|
||||||
/// Builds a StateMap by iterating over all keys that start
|
fn get_room_shortstatehash(room_id: &RoomId);
|
||||||
/// with state_hash, this gives the full state for the given state_hash.
|
}
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> {
|
|
||||||
let full_state = self
|
|
||||||
.load_shortstatehash_info(shortstatehash)?
|
|
||||||
.pop()
|
|
||||||
.expect("there is always one layer")
|
|
||||||
.1;
|
|
||||||
let mut result = BTreeMap::new();
|
|
||||||
let mut i = 0;
|
|
||||||
for compressed in full_state.into_iter() {
|
|
||||||
let parsed = self.parse_compressed_state_event(compressed)?;
|
|
||||||
result.insert(parsed.0, parsed.1);
|
|
||||||
|
|
||||||
i += 1;
|
|
||||||
if i % 100 == 0 {
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub async fn state_full(
|
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> {
|
|
||||||
let full_state = self
|
|
||||||
.load_shortstatehash_info(shortstatehash)?
|
|
||||||
.pop()
|
|
||||||
.expect("there is always one layer")
|
|
||||||
.1;
|
|
||||||
|
|
||||||
let mut result = HashMap::new();
|
|
||||||
let mut i = 0;
|
|
||||||
for compressed in full_state {
|
|
||||||
let (_, eventid) = self.parse_compressed_state_event(compressed)?;
|
|
||||||
if let Some(pdu) = self.get_pdu(&eventid)? {
|
|
||||||
result.insert(
|
|
||||||
(
|
|
||||||
pdu.kind.to_string().into(),
|
|
||||||
pdu.state_key
|
|
||||||
.as_ref()
|
|
||||||
.ok_or_else(|| Error::bad_database("State event has no state key."))?
|
|
||||||
.clone(),
|
|
||||||
),
|
|
||||||
pdu,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
i += 1;
|
|
||||||
if i % 100 == 0 {
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn state_get_id(
|
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
event_type: &StateEventType,
|
|
||||||
state_key: &str,
|
|
||||||
) -> Result<Option<Arc<EventId>>> {
|
|
||||||
let shortstatekey = match self.get_shortstatekey(event_type, state_key)? {
|
|
||||||
Some(s) => s,
|
|
||||||
None => return Ok(None),
|
|
||||||
};
|
|
||||||
let full_state = self
|
|
||||||
.load_shortstatehash_info(shortstatehash)?
|
|
||||||
.pop()
|
|
||||||
.expect("there is always one layer")
|
|
||||||
.1;
|
|
||||||
Ok(full_state
|
|
||||||
.into_iter()
|
|
||||||
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes()))
|
|
||||||
.and_then(|compressed| {
|
|
||||||
self.parse_compressed_state_event(compressed)
|
|
||||||
.ok()
|
|
||||||
.map(|(_, id)| id)
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn state_get(
|
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
event_type: &StateEventType,
|
|
||||||
state_key: &str,
|
|
||||||
) -> Result<Option<Arc<PduEvent>>> {
|
|
||||||
self.state_get_id(shortstatehash, event_type, state_key)?
|
|
||||||
.map_or(Ok(None), |event_id| self.get_pdu(&event_id))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the state hash for this pdu.
|
|
||||||
pub fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<Option<u64>> {
|
|
||||||
self.eventid_shorteventid
|
|
||||||
.get(event_id.as_bytes())?
|
|
||||||
.map_or(Ok(None), |shorteventid| {
|
|
||||||
self.shorteventid_shortstatehash
|
|
||||||
.get(&shorteventid)?
|
|
||||||
.map(|bytes| {
|
|
||||||
utils::u64_from_bytes(&bytes).map_err(|_| {
|
|
||||||
Error::bad_database(
|
|
||||||
"Invalid shortstatehash bytes in shorteventid_shortstatehash",
|
|
||||||
)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.transpose()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the last state hash key added to the db for the given room.
|
/// Returns the last state hash key added to the db for the given room.
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
|
@ -128,382 +14,3 @@
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Force the creation of a new StateHash and insert it into the db.
|
|
||||||
///
|
|
||||||
/// Whatever `state` is supplied to `force_state` becomes the new current room state snapshot.
|
|
||||||
#[tracing::instrument(skip(self, new_state_ids_compressed, db))]
|
|
||||||
pub fn force_state(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
new_state_ids_compressed: HashSet<CompressedStateEvent>,
|
|
||||||
db: &Database,
|
|
||||||
) -> Result<()> {
|
|
||||||
let previous_shortstatehash = self.current_shortstatehash(room_id)?;
|
|
||||||
|
|
||||||
let state_hash = self.calculate_hash(
|
|
||||||
&new_state_ids_compressed
|
|
||||||
.iter()
|
|
||||||
.map(|bytes| &bytes[..])
|
|
||||||
.collect::<Vec<_>>(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let (new_shortstatehash, already_existed) =
|
|
||||||
self.get_or_create_shortstatehash(&state_hash, &db.globals)?;
|
|
||||||
|
|
||||||
if Some(new_shortstatehash) == previous_shortstatehash {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let states_parents = previous_shortstatehash
|
|
||||||
.map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?;
|
|
||||||
|
|
||||||
let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last()
|
|
||||||
{
|
|
||||||
let statediffnew: HashSet<_> = new_state_ids_compressed
|
|
||||||
.difference(&parent_stateinfo.1)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let statediffremoved: HashSet<_> = parent_stateinfo
|
|
||||||
.1
|
|
||||||
.difference(&new_state_ids_compressed)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
(statediffnew, statediffremoved)
|
|
||||||
} else {
|
|
||||||
(new_state_ids_compressed, HashSet::new())
|
|
||||||
};
|
|
||||||
|
|
||||||
if !already_existed {
|
|
||||||
self.save_state_from_diff(
|
|
||||||
new_shortstatehash,
|
|
||||||
statediffnew.clone(),
|
|
||||||
statediffremoved,
|
|
||||||
2, // every state change is 2 event changes on average
|
|
||||||
states_parents,
|
|
||||||
)?;
|
|
||||||
};
|
|
||||||
|
|
||||||
for event_id in statediffnew.into_iter().filter_map(|new| {
|
|
||||||
self.parse_compressed_state_event(new)
|
|
||||||
.ok()
|
|
||||||
.map(|(_, id)| id)
|
|
||||||
}) {
|
|
||||||
let pdu = match self.get_pdu_json(&event_id)? {
|
|
||||||
Some(pdu) => pdu,
|
|
||||||
None => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
if pdu.get("type").and_then(|val| val.as_str()) != Some("m.room.member") {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let pdu: PduEvent = match serde_json::from_str(
|
|
||||||
&serde_json::to_string(&pdu).expect("CanonicalJsonObj can be serialized to JSON"),
|
|
||||||
) {
|
|
||||||
Ok(pdu) => pdu,
|
|
||||||
Err(_) => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
struct ExtractMembership {
|
|
||||||
membership: MembershipState,
|
|
||||||
}
|
|
||||||
|
|
||||||
let membership = match serde_json::from_str::<ExtractMembership>(pdu.content.get()) {
|
|
||||||
Ok(e) => e.membership,
|
|
||||||
Err(_) => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
let state_key = match pdu.state_key {
|
|
||||||
Some(k) => k,
|
|
||||||
None => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
let user_id = match UserId::parse(state_key) {
|
|
||||||
Ok(id) => id,
|
|
||||||
Err(_) => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
self.update_membership(room_id, &user_id, membership, &pdu.sender, None, db, false)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.update_joined_count(room_id, db)?;
|
|
||||||
|
|
||||||
self.roomid_shortstatehash
|
|
||||||
.insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the full room state.
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub async fn room_state_full(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> {
|
|
||||||
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
|
|
||||||
self.state_full(current_shortstatehash).await
|
|
||||||
} else {
|
|
||||||
Ok(HashMap::new())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn room_state_get_id(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
event_type: &StateEventType,
|
|
||||||
state_key: &str,
|
|
||||||
) -> Result<Option<Arc<EventId>>> {
|
|
||||||
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
|
|
||||||
self.state_get_id(current_shortstatehash, event_type, state_key)
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn room_state_get(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
event_type: &StateEventType,
|
|
||||||
state_key: &str,
|
|
||||||
) -> Result<Option<Arc<PduEvent>>> {
|
|
||||||
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
|
|
||||||
self.state_get(current_shortstatehash, event_type, state_key)
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the leaf pdus of a room.
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result<HashSet<Arc<EventId>>> {
|
|
||||||
let mut prefix = room_id.as_bytes().to_vec();
|
|
||||||
prefix.push(0xff);
|
|
||||||
|
|
||||||
self.roomid_pduleaves
|
|
||||||
.scan_prefix(prefix)
|
|
||||||
.map(|(_, bytes)| {
|
|
||||||
EventId::parse_arc(utils::string_from_bytes(&bytes).map_err(|_| {
|
|
||||||
Error::bad_database("EventID in roomid_pduleaves is invalid unicode.")
|
|
||||||
})?)
|
|
||||||
.map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid."))
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Replace the leaves of a room.
|
|
||||||
///
|
|
||||||
/// The provided `event_ids` become the new leaves, this allows a room to have multiple
|
|
||||||
/// `prev_events`.
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn replace_pdu_leaves<'a>(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
event_ids: impl IntoIterator<Item = &'a EventId> + Debug,
|
|
||||||
) -> Result<()> {
|
|
||||||
let mut prefix = room_id.as_bytes().to_vec();
|
|
||||||
prefix.push(0xff);
|
|
||||||
|
|
||||||
for (key, _) in self.roomid_pduleaves.scan_prefix(prefix.clone()) {
|
|
||||||
self.roomid_pduleaves.remove(&key)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
for event_id in event_ids {
|
|
||||||
let mut key = prefix.to_owned();
|
|
||||||
key.extend_from_slice(event_id.as_bytes());
|
|
||||||
self.roomid_pduleaves.insert(&key, event_id.as_bytes())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Generates a new StateHash and associates it with the incoming event.
|
|
||||||
///
|
|
||||||
/// This adds all current state events (not including the incoming event)
|
|
||||||
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
|
|
||||||
#[tracing::instrument(skip(self, state_ids_compressed, globals))]
|
|
||||||
pub fn set_event_state(
|
|
||||||
&self,
|
|
||||||
event_id: &EventId,
|
|
||||||
room_id: &RoomId,
|
|
||||||
state_ids_compressed: HashSet<CompressedStateEvent>,
|
|
||||||
globals: &super::globals::Globals,
|
|
||||||
) -> Result<()> {
|
|
||||||
let shorteventid = self.get_or_create_shorteventid(event_id, globals)?;
|
|
||||||
|
|
||||||
let previous_shortstatehash = self.current_shortstatehash(room_id)?;
|
|
||||||
|
|
||||||
let state_hash = self.calculate_hash(
|
|
||||||
&state_ids_compressed
|
|
||||||
.iter()
|
|
||||||
.map(|s| &s[..])
|
|
||||||
.collect::<Vec<_>>(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let (shortstatehash, already_existed) =
|
|
||||||
self.get_or_create_shortstatehash(&state_hash, globals)?;
|
|
||||||
|
|
||||||
if !already_existed {
|
|
||||||
let states_parents = previous_shortstatehash
|
|
||||||
.map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?;
|
|
||||||
|
|
||||||
let (statediffnew, statediffremoved) =
|
|
||||||
if let Some(parent_stateinfo) = states_parents.last() {
|
|
||||||
let statediffnew: HashSet<_> = state_ids_compressed
|
|
||||||
.difference(&parent_stateinfo.1)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let statediffremoved: HashSet<_> = parent_stateinfo
|
|
||||||
.1
|
|
||||||
.difference(&state_ids_compressed)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
(statediffnew, statediffremoved)
|
|
||||||
} else {
|
|
||||||
(state_ids_compressed, HashSet::new())
|
|
||||||
};
|
|
||||||
self.save_state_from_diff(
|
|
||||||
shortstatehash,
|
|
||||||
statediffnew,
|
|
||||||
statediffremoved,
|
|
||||||
1_000_000, // high number because no state will be based on this one
|
|
||||||
states_parents,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.shorteventid_shortstatehash
|
|
||||||
.insert(&shorteventid.to_be_bytes(), &shortstatehash.to_be_bytes())?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Generates a new StateHash and associates it with the incoming event.
|
|
||||||
///
|
|
||||||
/// This adds all current state events (not including the incoming event)
|
|
||||||
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
|
|
||||||
#[tracing::instrument(skip(self, new_pdu, globals))]
|
|
||||||
pub fn append_to_state(
|
|
||||||
&self,
|
|
||||||
new_pdu: &PduEvent,
|
|
||||||
globals: &super::globals::Globals,
|
|
||||||
) -> Result<u64> {
|
|
||||||
let shorteventid = self.get_or_create_shorteventid(&new_pdu.event_id, globals)?;
|
|
||||||
|
|
||||||
let previous_shortstatehash = self.current_shortstatehash(&new_pdu.room_id)?;
|
|
||||||
|
|
||||||
if let Some(p) = previous_shortstatehash {
|
|
||||||
self.shorteventid_shortstatehash
|
|
||||||
.insert(&shorteventid.to_be_bytes(), &p.to_be_bytes())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(state_key) = &new_pdu.state_key {
|
|
||||||
let states_parents = previous_shortstatehash
|
|
||||||
.map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?;
|
|
||||||
|
|
||||||
let shortstatekey = self.get_or_create_shortstatekey(
|
|
||||||
&new_pdu.kind.to_string().into(),
|
|
||||||
state_key,
|
|
||||||
globals,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
let new = self.compress_state_event(shortstatekey, &new_pdu.event_id, globals)?;
|
|
||||||
|
|
||||||
let replaces = states_parents
|
|
||||||
.last()
|
|
||||||
.map(|info| {
|
|
||||||
info.1
|
|
||||||
.iter()
|
|
||||||
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes()))
|
|
||||||
})
|
|
||||||
.unwrap_or_default();
|
|
||||||
|
|
||||||
if Some(&new) == replaces {
|
|
||||||
return Ok(previous_shortstatehash.expect("must exist"));
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: statehash with deterministic inputs
|
|
||||||
let shortstatehash = globals.next_count()?;
|
|
||||||
|
|
||||||
let mut statediffnew = HashSet::new();
|
|
||||||
statediffnew.insert(new);
|
|
||||||
|
|
||||||
let mut statediffremoved = HashSet::new();
|
|
||||||
if let Some(replaces) = replaces {
|
|
||||||
statediffremoved.insert(*replaces);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.save_state_from_diff(
|
|
||||||
shortstatehash,
|
|
||||||
statediffnew,
|
|
||||||
statediffremoved,
|
|
||||||
2,
|
|
||||||
states_parents,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
Ok(shortstatehash)
|
|
||||||
} else {
|
|
||||||
Ok(previous_shortstatehash.expect("first event in room must be a state event"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, invite_event))]
|
|
||||||
pub fn calculate_invite_state(
|
|
||||||
&self,
|
|
||||||
invite_event: &PduEvent,
|
|
||||||
) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
|
|
||||||
let mut state = Vec::new();
|
|
||||||
// Add recommended events
|
|
||||||
if let Some(e) =
|
|
||||||
self.room_state_get(&invite_event.room_id, &StateEventType::RoomCreate, "")?
|
|
||||||
{
|
|
||||||
state.push(e.to_stripped_state_event());
|
|
||||||
}
|
|
||||||
if let Some(e) =
|
|
||||||
self.room_state_get(&invite_event.room_id, &StateEventType::RoomJoinRules, "")?
|
|
||||||
{
|
|
||||||
state.push(e.to_stripped_state_event());
|
|
||||||
}
|
|
||||||
if let Some(e) = self.room_state_get(
|
|
||||||
&invite_event.room_id,
|
|
||||||
&StateEventType::RoomCanonicalAlias,
|
|
||||||
"",
|
|
||||||
)? {
|
|
||||||
state.push(e.to_stripped_state_event());
|
|
||||||
}
|
|
||||||
if let Some(e) =
|
|
||||||
self.room_state_get(&invite_event.room_id, &StateEventType::RoomAvatar, "")?
|
|
||||||
{
|
|
||||||
state.push(e.to_stripped_state_event());
|
|
||||||
}
|
|
||||||
if let Some(e) =
|
|
||||||
self.room_state_get(&invite_event.room_id, &StateEventType::RoomName, "")?
|
|
||||||
{
|
|
||||||
state.push(e.to_stripped_state_event());
|
|
||||||
}
|
|
||||||
if let Some(e) = self.room_state_get(
|
|
||||||
&invite_event.room_id,
|
|
||||||
&StateEventType::RoomMember,
|
|
||||||
invite_event.sender.as_str(),
|
|
||||||
)? {
|
|
||||||
state.push(e.to_stripped_state_event());
|
|
||||||
}
|
|
||||||
|
|
||||||
state.push(invite_event.to_stripped_state_event());
|
|
||||||
Ok(state)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn set_room_state(&self, room_id: &RoomId, shortstatehash: u64) -> Result<()> {
|
|
||||||
self.roomid_shortstatehash
|
|
||||||
.insert(room_id.as_bytes(), &shortstatehash.to_be_bytes())?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,133 +1,8 @@
|
||||||
|
pub struct Service<D: Data> {
|
||||||
|
db: D,
|
||||||
|
}
|
||||||
|
|
||||||
/// Builds a StateMap by iterating over all keys that start
|
impl Service {
|
||||||
/// with state_hash, this gives the full state for the given state_hash.
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> {
|
|
||||||
let full_state = self
|
|
||||||
.load_shortstatehash_info(shortstatehash)?
|
|
||||||
.pop()
|
|
||||||
.expect("there is always one layer")
|
|
||||||
.1;
|
|
||||||
let mut result = BTreeMap::new();
|
|
||||||
let mut i = 0;
|
|
||||||
for compressed in full_state.into_iter() {
|
|
||||||
let parsed = self.parse_compressed_state_event(compressed)?;
|
|
||||||
result.insert(parsed.0, parsed.1);
|
|
||||||
|
|
||||||
i += 1;
|
|
||||||
if i % 100 == 0 {
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub async fn state_full(
|
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> {
|
|
||||||
let full_state = self
|
|
||||||
.load_shortstatehash_info(shortstatehash)?
|
|
||||||
.pop()
|
|
||||||
.expect("there is always one layer")
|
|
||||||
.1;
|
|
||||||
|
|
||||||
let mut result = HashMap::new();
|
|
||||||
let mut i = 0;
|
|
||||||
for compressed in full_state {
|
|
||||||
let (_, eventid) = self.parse_compressed_state_event(compressed)?;
|
|
||||||
if let Some(pdu) = self.get_pdu(&eventid)? {
|
|
||||||
result.insert(
|
|
||||||
(
|
|
||||||
pdu.kind.to_string().into(),
|
|
||||||
pdu.state_key
|
|
||||||
.as_ref()
|
|
||||||
.ok_or_else(|| Error::bad_database("State event has no state key."))?
|
|
||||||
.clone(),
|
|
||||||
),
|
|
||||||
pdu,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
i += 1;
|
|
||||||
if i % 100 == 0 {
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn state_get_id(
|
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
event_type: &StateEventType,
|
|
||||||
state_key: &str,
|
|
||||||
) -> Result<Option<Arc<EventId>>> {
|
|
||||||
let shortstatekey = match self.get_shortstatekey(event_type, state_key)? {
|
|
||||||
Some(s) => s,
|
|
||||||
None => return Ok(None),
|
|
||||||
};
|
|
||||||
let full_state = self
|
|
||||||
.load_shortstatehash_info(shortstatehash)?
|
|
||||||
.pop()
|
|
||||||
.expect("there is always one layer")
|
|
||||||
.1;
|
|
||||||
Ok(full_state
|
|
||||||
.into_iter()
|
|
||||||
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes()))
|
|
||||||
.and_then(|compressed| {
|
|
||||||
self.parse_compressed_state_event(compressed)
|
|
||||||
.ok()
|
|
||||||
.map(|(_, id)| id)
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn state_get(
|
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
event_type: &StateEventType,
|
|
||||||
state_key: &str,
|
|
||||||
) -> Result<Option<Arc<PduEvent>>> {
|
|
||||||
self.state_get_id(shortstatehash, event_type, state_key)?
|
|
||||||
.map_or(Ok(None), |event_id| self.get_pdu(&event_id))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the state hash for this pdu.
|
|
||||||
pub fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<Option<u64>> {
|
|
||||||
self.eventid_shorteventid
|
|
||||||
.get(event_id.as_bytes())?
|
|
||||||
.map_or(Ok(None), |shorteventid| {
|
|
||||||
self.shorteventid_shortstatehash
|
|
||||||
.get(&shorteventid)?
|
|
||||||
.map(|bytes| {
|
|
||||||
utils::u64_from_bytes(&bytes).map_err(|_| {
|
|
||||||
Error::bad_database(
|
|
||||||
"Invalid shortstatehash bytes in shorteventid_shortstatehash",
|
|
||||||
)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.transpose()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the last state hash key added to the db for the given room.
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn current_shortstatehash(&self, room_id: &RoomId) -> Result<Option<u64>> {
|
|
||||||
self.roomid_shortstatehash
|
|
||||||
.get(room_id.as_bytes())?
|
|
||||||
.map_or(Ok(None), |bytes| {
|
|
||||||
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
|
|
||||||
Error::bad_database("Invalid shortstatehash in roomid_shortstatehash")
|
|
||||||
})?))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Force the creation of a new StateHash and insert it into the db.
|
/// Force the creation of a new StateHash and insert it into the db.
|
||||||
///
|
///
|
||||||
/// Whatever `state` is supplied to `force_state` becomes the new current room state snapshot.
|
/// Whatever `state` is supplied to `force_state` becomes the new current room state snapshot.
|
||||||
|
@ -138,7 +13,7 @@
|
||||||
new_state_ids_compressed: HashSet<CompressedStateEvent>,
|
new_state_ids_compressed: HashSet<CompressedStateEvent>,
|
||||||
db: &Database,
|
db: &Database,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let previous_shortstatehash = self.current_shortstatehash(room_id)?;
|
let previous_shortstatehash = self.d.current_shortstatehash(room_id)?;
|
||||||
|
|
||||||
let state_hash = self.calculate_hash(
|
let state_hash = self.calculate_hash(
|
||||||
&new_state_ids_compressed
|
&new_state_ids_compressed
|
||||||
|
@ -237,49 +112,6 @@
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the full room state.
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub async fn room_state_full(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> {
|
|
||||||
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
|
|
||||||
self.state_full(current_shortstatehash).await
|
|
||||||
} else {
|
|
||||||
Ok(HashMap::new())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn room_state_get_id(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
event_type: &StateEventType,
|
|
||||||
state_key: &str,
|
|
||||||
) -> Result<Option<Arc<EventId>>> {
|
|
||||||
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
|
|
||||||
self.state_get_id(current_shortstatehash, event_type, state_key)
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn room_state_get(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
event_type: &StateEventType,
|
|
||||||
state_key: &str,
|
|
||||||
) -> Result<Option<Arc<PduEvent>>> {
|
|
||||||
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
|
|
||||||
self.state_get(current_shortstatehash, event_type, state_key)
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the leaf pdus of a room.
|
/// Returns the leaf pdus of a room.
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result<HashSet<Arc<EventId>>> {
|
pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result<HashSet<Arc<EventId>>> {
|
||||||
|
@ -507,3 +339,4 @@
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
|
|
||||||
/// Builds a StateMap by iterating over all keys that start
|
/// Builds a StateMap by iterating over all keys that start
|
||||||
/// with state_hash, this gives the full state for the given state_hash.
|
/// with state_hash, this gives the full state for the given state_hash.
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
|
@ -116,127 +115,6 @@
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the last state hash key added to the db for the given room.
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn current_shortstatehash(&self, room_id: &RoomId) -> Result<Option<u64>> {
|
|
||||||
self.roomid_shortstatehash
|
|
||||||
.get(room_id.as_bytes())?
|
|
||||||
.map_or(Ok(None), |bytes| {
|
|
||||||
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
|
|
||||||
Error::bad_database("Invalid shortstatehash in roomid_shortstatehash")
|
|
||||||
})?))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Force the creation of a new StateHash and insert it into the db.
|
|
||||||
///
|
|
||||||
/// Whatever `state` is supplied to `force_state` becomes the new current room state snapshot.
|
|
||||||
#[tracing::instrument(skip(self, new_state_ids_compressed, db))]
|
|
||||||
pub fn force_state(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
new_state_ids_compressed: HashSet<CompressedStateEvent>,
|
|
||||||
db: &Database,
|
|
||||||
) -> Result<()> {
|
|
||||||
let previous_shortstatehash = self.current_shortstatehash(room_id)?;
|
|
||||||
|
|
||||||
let state_hash = self.calculate_hash(
|
|
||||||
&new_state_ids_compressed
|
|
||||||
.iter()
|
|
||||||
.map(|bytes| &bytes[..])
|
|
||||||
.collect::<Vec<_>>(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let (new_shortstatehash, already_existed) =
|
|
||||||
self.get_or_create_shortstatehash(&state_hash, &db.globals)?;
|
|
||||||
|
|
||||||
if Some(new_shortstatehash) == previous_shortstatehash {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let states_parents = previous_shortstatehash
|
|
||||||
.map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?;
|
|
||||||
|
|
||||||
let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last()
|
|
||||||
{
|
|
||||||
let statediffnew: HashSet<_> = new_state_ids_compressed
|
|
||||||
.difference(&parent_stateinfo.1)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let statediffremoved: HashSet<_> = parent_stateinfo
|
|
||||||
.1
|
|
||||||
.difference(&new_state_ids_compressed)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
(statediffnew, statediffremoved)
|
|
||||||
} else {
|
|
||||||
(new_state_ids_compressed, HashSet::new())
|
|
||||||
};
|
|
||||||
|
|
||||||
if !already_existed {
|
|
||||||
self.save_state_from_diff(
|
|
||||||
new_shortstatehash,
|
|
||||||
statediffnew.clone(),
|
|
||||||
statediffremoved,
|
|
||||||
2, // every state change is 2 event changes on average
|
|
||||||
states_parents,
|
|
||||||
)?;
|
|
||||||
};
|
|
||||||
|
|
||||||
for event_id in statediffnew.into_iter().filter_map(|new| {
|
|
||||||
self.parse_compressed_state_event(new)
|
|
||||||
.ok()
|
|
||||||
.map(|(_, id)| id)
|
|
||||||
}) {
|
|
||||||
let pdu = match self.get_pdu_json(&event_id)? {
|
|
||||||
Some(pdu) => pdu,
|
|
||||||
None => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
if pdu.get("type").and_then(|val| val.as_str()) != Some("m.room.member") {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let pdu: PduEvent = match serde_json::from_str(
|
|
||||||
&serde_json::to_string(&pdu).expect("CanonicalJsonObj can be serialized to JSON"),
|
|
||||||
) {
|
|
||||||
Ok(pdu) => pdu,
|
|
||||||
Err(_) => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
struct ExtractMembership {
|
|
||||||
membership: MembershipState,
|
|
||||||
}
|
|
||||||
|
|
||||||
let membership = match serde_json::from_str::<ExtractMembership>(pdu.content.get()) {
|
|
||||||
Ok(e) => e.membership,
|
|
||||||
Err(_) => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
let state_key = match pdu.state_key {
|
|
||||||
Some(k) => k,
|
|
||||||
None => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
let user_id = match UserId::parse(state_key) {
|
|
||||||
Ok(id) => id,
|
|
||||||
Err(_) => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
self.update_membership(room_id, &user_id, membership, &pdu.sender, None, db, false)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.update_joined_count(room_id, db)?;
|
|
||||||
|
|
||||||
self.roomid_shortstatehash
|
|
||||||
.insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the full room state.
|
/// Returns the full room state.
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub async fn room_state_full(
|
pub async fn room_state_full(
|
||||||
|
@ -280,230 +158,3 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the leaf pdus of a room.
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result<HashSet<Arc<EventId>>> {
|
|
||||||
let mut prefix = room_id.as_bytes().to_vec();
|
|
||||||
prefix.push(0xff);
|
|
||||||
|
|
||||||
self.roomid_pduleaves
|
|
||||||
.scan_prefix(prefix)
|
|
||||||
.map(|(_, bytes)| {
|
|
||||||
EventId::parse_arc(utils::string_from_bytes(&bytes).map_err(|_| {
|
|
||||||
Error::bad_database("EventID in roomid_pduleaves is invalid unicode.")
|
|
||||||
})?)
|
|
||||||
.map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid."))
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Replace the leaves of a room.
|
|
||||||
///
|
|
||||||
/// The provided `event_ids` become the new leaves, this allows a room to have multiple
|
|
||||||
/// `prev_events`.
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn replace_pdu_leaves<'a>(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
event_ids: impl IntoIterator<Item = &'a EventId> + Debug,
|
|
||||||
) -> Result<()> {
|
|
||||||
let mut prefix = room_id.as_bytes().to_vec();
|
|
||||||
prefix.push(0xff);
|
|
||||||
|
|
||||||
for (key, _) in self.roomid_pduleaves.scan_prefix(prefix.clone()) {
|
|
||||||
self.roomid_pduleaves.remove(&key)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
for event_id in event_ids {
|
|
||||||
let mut key = prefix.to_owned();
|
|
||||||
key.extend_from_slice(event_id.as_bytes());
|
|
||||||
self.roomid_pduleaves.insert(&key, event_id.as_bytes())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Generates a new StateHash and associates it with the incoming event.
|
|
||||||
///
|
|
||||||
/// This adds all current state events (not including the incoming event)
|
|
||||||
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
|
|
||||||
#[tracing::instrument(skip(self, state_ids_compressed, globals))]
|
|
||||||
pub fn set_event_state(
|
|
||||||
&self,
|
|
||||||
event_id: &EventId,
|
|
||||||
room_id: &RoomId,
|
|
||||||
state_ids_compressed: HashSet<CompressedStateEvent>,
|
|
||||||
globals: &super::globals::Globals,
|
|
||||||
) -> Result<()> {
|
|
||||||
let shorteventid = self.get_or_create_shorteventid(event_id, globals)?;
|
|
||||||
|
|
||||||
let previous_shortstatehash = self.current_shortstatehash(room_id)?;
|
|
||||||
|
|
||||||
let state_hash = self.calculate_hash(
|
|
||||||
&state_ids_compressed
|
|
||||||
.iter()
|
|
||||||
.map(|s| &s[..])
|
|
||||||
.collect::<Vec<_>>(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let (shortstatehash, already_existed) =
|
|
||||||
self.get_or_create_shortstatehash(&state_hash, globals)?;
|
|
||||||
|
|
||||||
if !already_existed {
|
|
||||||
let states_parents = previous_shortstatehash
|
|
||||||
.map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?;
|
|
||||||
|
|
||||||
let (statediffnew, statediffremoved) =
|
|
||||||
if let Some(parent_stateinfo) = states_parents.last() {
|
|
||||||
let statediffnew: HashSet<_> = state_ids_compressed
|
|
||||||
.difference(&parent_stateinfo.1)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let statediffremoved: HashSet<_> = parent_stateinfo
|
|
||||||
.1
|
|
||||||
.difference(&state_ids_compressed)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
(statediffnew, statediffremoved)
|
|
||||||
} else {
|
|
||||||
(state_ids_compressed, HashSet::new())
|
|
||||||
};
|
|
||||||
self.save_state_from_diff(
|
|
||||||
shortstatehash,
|
|
||||||
statediffnew,
|
|
||||||
statediffremoved,
|
|
||||||
1_000_000, // high number because no state will be based on this one
|
|
||||||
states_parents,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.shorteventid_shortstatehash
|
|
||||||
.insert(&shorteventid.to_be_bytes(), &shortstatehash.to_be_bytes())?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Generates a new StateHash and associates it with the incoming event.
|
|
||||||
///
|
|
||||||
/// This adds all current state events (not including the incoming event)
|
|
||||||
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
|
|
||||||
#[tracing::instrument(skip(self, new_pdu, globals))]
|
|
||||||
pub fn append_to_state(
|
|
||||||
&self,
|
|
||||||
new_pdu: &PduEvent,
|
|
||||||
globals: &super::globals::Globals,
|
|
||||||
) -> Result<u64> {
|
|
||||||
let shorteventid = self.get_or_create_shorteventid(&new_pdu.event_id, globals)?;
|
|
||||||
|
|
||||||
let previous_shortstatehash = self.current_shortstatehash(&new_pdu.room_id)?;
|
|
||||||
|
|
||||||
if let Some(p) = previous_shortstatehash {
|
|
||||||
self.shorteventid_shortstatehash
|
|
||||||
.insert(&shorteventid.to_be_bytes(), &p.to_be_bytes())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(state_key) = &new_pdu.state_key {
|
|
||||||
let states_parents = previous_shortstatehash
|
|
||||||
.map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?;
|
|
||||||
|
|
||||||
let shortstatekey = self.get_or_create_shortstatekey(
|
|
||||||
&new_pdu.kind.to_string().into(),
|
|
||||||
state_key,
|
|
||||||
globals,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
let new = self.compress_state_event(shortstatekey, &new_pdu.event_id, globals)?;
|
|
||||||
|
|
||||||
let replaces = states_parents
|
|
||||||
.last()
|
|
||||||
.map(|info| {
|
|
||||||
info.1
|
|
||||||
.iter()
|
|
||||||
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes()))
|
|
||||||
})
|
|
||||||
.unwrap_or_default();
|
|
||||||
|
|
||||||
if Some(&new) == replaces {
|
|
||||||
return Ok(previous_shortstatehash.expect("must exist"));
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: statehash with deterministic inputs
|
|
||||||
let shortstatehash = globals.next_count()?;
|
|
||||||
|
|
||||||
let mut statediffnew = HashSet::new();
|
|
||||||
statediffnew.insert(new);
|
|
||||||
|
|
||||||
let mut statediffremoved = HashSet::new();
|
|
||||||
if let Some(replaces) = replaces {
|
|
||||||
statediffremoved.insert(*replaces);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.save_state_from_diff(
|
|
||||||
shortstatehash,
|
|
||||||
statediffnew,
|
|
||||||
statediffremoved,
|
|
||||||
2,
|
|
||||||
states_parents,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
Ok(shortstatehash)
|
|
||||||
} else {
|
|
||||||
Ok(previous_shortstatehash.expect("first event in room must be a state event"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, invite_event))]
|
|
||||||
pub fn calculate_invite_state(
|
|
||||||
&self,
|
|
||||||
invite_event: &PduEvent,
|
|
||||||
) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
|
|
||||||
let mut state = Vec::new();
|
|
||||||
// Add recommended events
|
|
||||||
if let Some(e) =
|
|
||||||
self.room_state_get(&invite_event.room_id, &StateEventType::RoomCreate, "")?
|
|
||||||
{
|
|
||||||
state.push(e.to_stripped_state_event());
|
|
||||||
}
|
|
||||||
if let Some(e) =
|
|
||||||
self.room_state_get(&invite_event.room_id, &StateEventType::RoomJoinRules, "")?
|
|
||||||
{
|
|
||||||
state.push(e.to_stripped_state_event());
|
|
||||||
}
|
|
||||||
if let Some(e) = self.room_state_get(
|
|
||||||
&invite_event.room_id,
|
|
||||||
&StateEventType::RoomCanonicalAlias,
|
|
||||||
"",
|
|
||||||
)? {
|
|
||||||
state.push(e.to_stripped_state_event());
|
|
||||||
}
|
|
||||||
if let Some(e) =
|
|
||||||
self.room_state_get(&invite_event.room_id, &StateEventType::RoomAvatar, "")?
|
|
||||||
{
|
|
||||||
state.push(e.to_stripped_state_event());
|
|
||||||
}
|
|
||||||
if let Some(e) =
|
|
||||||
self.room_state_get(&invite_event.room_id, &StateEventType::RoomName, "")?
|
|
||||||
{
|
|
||||||
state.push(e.to_stripped_state_event());
|
|
||||||
}
|
|
||||||
if let Some(e) = self.room_state_get(
|
|
||||||
&invite_event.room_id,
|
|
||||||
&StateEventType::RoomMember,
|
|
||||||
invite_event.sender.as_str(),
|
|
||||||
)? {
|
|
||||||
state.push(e.to_stripped_state_event());
|
|
||||||
}
|
|
||||||
|
|
||||||
state.push(invite_event.to_stripped_state_event());
|
|
||||||
Ok(state)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn set_room_state(&self, room_id: &RoomId, shortstatehash: u64) -> Result<()> {
|
|
||||||
self.roomid_shortstatehash
|
|
||||||
.insert(room_id.as_bytes(), &shortstatehash.to_be_bytes())?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
|
@ -100,16 +100,6 @@
|
||||||
.transpose()
|
.transpose()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the json of a pdu.
|
|
||||||
pub fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
|
|
||||||
self.eventid_outlierpdu
|
|
||||||
.get(event_id.as_bytes())?
|
|
||||||
.map(|pdu| {
|
|
||||||
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
|
|
||||||
})
|
|
||||||
.transpose()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the json of a pdu.
|
/// Returns the json of a pdu.
|
||||||
pub fn get_non_outlier_pdu_json(
|
pub fn get_non_outlier_pdu_json(
|
||||||
&self,
|
&self,
|
||||||
|
@ -487,211 +477,6 @@
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(pdu_id)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates a new persisted data unit and adds it to a room.
|
|
||||||
#[tracing::instrument(skip(self, db, _mutex_lock))]
|
|
||||||
pub fn build_and_append_pdu(
|
|
||||||
&self,
|
|
||||||
pdu_builder: PduBuilder,
|
|
||||||
sender: &UserId,
|
|
||||||
room_id: &RoomId,
|
|
||||||
db: &Database,
|
|
||||||
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room mutex
|
|
||||||
) -> Result<Arc<EventId>> {
|
|
||||||
let PduBuilder {
|
|
||||||
event_type,
|
|
||||||
content,
|
|
||||||
unsigned,
|
|
||||||
state_key,
|
|
||||||
redacts,
|
|
||||||
} = pdu_builder;
|
|
||||||
|
|
||||||
let prev_events = self
|
|
||||||
.get_pdu_leaves(room_id)?
|
|
||||||
.into_iter()
|
|
||||||
.take(20)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let create_event = self.room_state_get(room_id, &StateEventType::RoomCreate, "")?;
|
|
||||||
|
|
||||||
let create_event_content: Option<RoomCreateEventContent> = create_event
|
|
||||||
.as_ref()
|
|
||||||
.map(|create_event| {
|
|
||||||
serde_json::from_str(create_event.content.get()).map_err(|e| {
|
|
||||||
warn!("Invalid create event: {}", e);
|
|
||||||
Error::bad_database("Invalid create event in db.")
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.transpose()?;
|
|
||||||
|
|
||||||
// If there was no create event yet, assume we are creating a room with the default
|
|
||||||
// version right now
|
|
||||||
let room_version_id = create_event_content
|
|
||||||
.map_or(db.globals.default_room_version(), |create_event| {
|
|
||||||
create_event.room_version
|
|
||||||
});
|
|
||||||
let room_version = RoomVersion::new(&room_version_id).expect("room version is supported");
|
|
||||||
|
|
||||||
let auth_events =
|
|
||||||
self.get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content)?;
|
|
||||||
|
|
||||||
// Our depth is the maximum depth of prev_events + 1
|
|
||||||
let depth = prev_events
|
|
||||||
.iter()
|
|
||||||
.filter_map(|event_id| Some(self.get_pdu(event_id).ok()??.depth))
|
|
||||||
.max()
|
|
||||||
.unwrap_or_else(|| uint!(0))
|
|
||||||
+ uint!(1);
|
|
||||||
|
|
||||||
let mut unsigned = unsigned.unwrap_or_default();
|
|
||||||
if let Some(state_key) = &state_key {
|
|
||||||
if let Some(prev_pdu) =
|
|
||||||
self.room_state_get(room_id, &event_type.to_string().into(), state_key)?
|
|
||||||
{
|
|
||||||
unsigned.insert(
|
|
||||||
"prev_content".to_owned(),
|
|
||||||
serde_json::from_str(prev_pdu.content.get()).expect("string is valid json"),
|
|
||||||
);
|
|
||||||
unsigned.insert(
|
|
||||||
"prev_sender".to_owned(),
|
|
||||||
serde_json::to_value(&prev_pdu.sender).expect("UserId::to_value always works"),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut pdu = PduEvent {
|
|
||||||
event_id: ruma::event_id!("$thiswillbefilledinlater").into(),
|
|
||||||
room_id: room_id.to_owned(),
|
|
||||||
sender: sender.to_owned(),
|
|
||||||
origin_server_ts: utils::millis_since_unix_epoch()
|
|
||||||
.try_into()
|
|
||||||
.expect("time is valid"),
|
|
||||||
kind: event_type,
|
|
||||||
content,
|
|
||||||
state_key,
|
|
||||||
prev_events,
|
|
||||||
depth,
|
|
||||||
auth_events: auth_events
|
|
||||||
.iter()
|
|
||||||
.map(|(_, pdu)| pdu.event_id.clone())
|
|
||||||
.collect(),
|
|
||||||
redacts,
|
|
||||||
unsigned: if unsigned.is_empty() {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(to_raw_value(&unsigned).expect("to_raw_value always works"))
|
|
||||||
},
|
|
||||||
hashes: EventHash {
|
|
||||||
sha256: "aaa".to_owned(),
|
|
||||||
},
|
|
||||||
signatures: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let auth_check = state_res::auth_check(
|
|
||||||
&room_version,
|
|
||||||
&pdu,
|
|
||||||
None::<PduEvent>, // TODO: third_party_invite
|
|
||||||
|k, s| auth_events.get(&(k.clone(), s.to_owned())),
|
|
||||||
)
|
|
||||||
.map_err(|e| {
|
|
||||||
error!("{:?}", e);
|
|
||||||
Error::bad_database("Auth check failed.")
|
|
||||||
})?;
|
|
||||||
|
|
||||||
if !auth_check {
|
|
||||||
return Err(Error::BadRequest(
|
|
||||||
ErrorKind::Forbidden,
|
|
||||||
"Event is not authorized.",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Hash and sign
|
|
||||||
let mut pdu_json =
|
|
||||||
utils::to_canonical_object(&pdu).expect("event is valid, we just created it");
|
|
||||||
|
|
||||||
pdu_json.remove("event_id");
|
|
||||||
|
|
||||||
// Add origin because synapse likes that (and it's required in the spec)
|
|
||||||
pdu_json.insert(
|
|
||||||
"origin".to_owned(),
|
|
||||||
CanonicalJsonValue::String(db.globals.server_name().as_ref().to_owned()),
|
|
||||||
);
|
|
||||||
|
|
||||||
match ruma::signatures::hash_and_sign_event(
|
|
||||||
db.globals.server_name().as_str(),
|
|
||||||
db.globals.keypair(),
|
|
||||||
&mut pdu_json,
|
|
||||||
&room_version_id,
|
|
||||||
) {
|
|
||||||
Ok(_) => {}
|
|
||||||
Err(e) => {
|
|
||||||
return match e {
|
|
||||||
ruma::signatures::Error::PduSize => Err(Error::BadRequest(
|
|
||||||
ErrorKind::TooLarge,
|
|
||||||
"Message is too long",
|
|
||||||
)),
|
|
||||||
_ => Err(Error::BadRequest(
|
|
||||||
ErrorKind::Unknown,
|
|
||||||
"Signing event failed",
|
|
||||||
)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Generate event id
|
|
||||||
pdu.event_id = EventId::parse_arc(format!(
|
|
||||||
"${}",
|
|
||||||
ruma::signatures::reference_hash(&pdu_json, &room_version_id)
|
|
||||||
.expect("ruma can calculate reference hashes")
|
|
||||||
))
|
|
||||||
.expect("ruma's reference hashes are valid event ids");
|
|
||||||
|
|
||||||
pdu_json.insert(
|
|
||||||
"event_id".to_owned(),
|
|
||||||
CanonicalJsonValue::String(pdu.event_id.as_str().to_owned()),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Generate short event id
|
|
||||||
let _shorteventid = self.get_or_create_shorteventid(&pdu.event_id, &db.globals)?;
|
|
||||||
|
|
||||||
// We append to state before appending the pdu, so we don't have a moment in time with the
|
|
||||||
// pdu without it's state. This is okay because append_pdu can't fail.
|
|
||||||
let statehashid = self.append_to_state(&pdu, &db.globals)?;
|
|
||||||
|
|
||||||
let pdu_id = self.append_pdu(
|
|
||||||
&pdu,
|
|
||||||
pdu_json,
|
|
||||||
// Since this PDU references all pdu_leaves we can update the leaves
|
|
||||||
// of the room
|
|
||||||
iter::once(&*pdu.event_id),
|
|
||||||
db,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
// We set the room state after inserting the pdu, so that we never have a moment in time
|
|
||||||
// where events in the current room state do not exist
|
|
||||||
self.set_room_state(room_id, statehashid)?;
|
|
||||||
|
|
||||||
let mut servers: HashSet<Box<ServerName>> =
|
|
||||||
self.room_servers(room_id).filter_map(|r| r.ok()).collect();
|
|
||||||
|
|
||||||
// In case we are kicking or banning a user, we need to inform their server of the change
|
|
||||||
if pdu.kind == RoomEventType::RoomMember {
|
|
||||||
if let Some(state_key_uid) = &pdu
|
|
||||||
.state_key
|
|
||||||
.as_ref()
|
|
||||||
.and_then(|state_key| UserId::parse(state_key.as_str()).ok())
|
|
||||||
{
|
|
||||||
servers.insert(Box::from(state_key_uid.server_name()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove our server from the server list since it will be added to it by room_servers() and/or the if statement above
|
|
||||||
servers.remove(db.globals.server_name());
|
|
||||||
|
|
||||||
db.sending.send_pdu(servers.into_iter(), &pdu_id)?;
|
|
||||||
|
|
||||||
for appservice in db.appservice.all()? {
|
for appservice in db.appservice.all()? {
|
||||||
if self.appservice_in_room(room_id, &appservice, db)? {
|
if self.appservice_in_room(room_id, &appservice, db)? {
|
||||||
db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
|
db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
|
||||||
|
@ -768,9 +553,268 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Ok(pdu_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create_hash_and_sign_event(
|
||||||
|
&self,
|
||||||
|
pdu_builder: PduBuilder,
|
||||||
|
sender: &UserId,
|
||||||
|
room_id: &RoomId,
|
||||||
|
db: &Database,
|
||||||
|
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
|
||||||
|
) -> (PduEvent, CanonicalJsonObj) {
|
||||||
|
let PduBuilder {
|
||||||
|
event_type,
|
||||||
|
content,
|
||||||
|
unsigned,
|
||||||
|
state_key,
|
||||||
|
redacts,
|
||||||
|
} = pdu_builder;
|
||||||
|
|
||||||
|
let prev_events: Vec<_> = db
|
||||||
|
.rooms
|
||||||
|
.get_pdu_leaves(room_id)?
|
||||||
|
.into_iter()
|
||||||
|
.take(20)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let create_event = db
|
||||||
|
.rooms
|
||||||
|
.room_state_get(room_id, &StateEventType::RoomCreate, "")?;
|
||||||
|
|
||||||
|
let create_event_content: Option<RoomCreateEventContent> = create_event
|
||||||
|
.as_ref()
|
||||||
|
.map(|create_event| {
|
||||||
|
serde_json::from_str(create_event.content.get()).map_err(|e| {
|
||||||
|
warn!("Invalid create event: {}", e);
|
||||||
|
Error::bad_database("Invalid create event in db.")
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
|
// If there was no create event yet, assume we are creating a room with the default
|
||||||
|
// version right now
|
||||||
|
let room_version_id = create_event_content
|
||||||
|
.map_or(db.globals.default_room_version(), |create_event| {
|
||||||
|
create_event.room_version
|
||||||
|
});
|
||||||
|
let room_version =
|
||||||
|
RoomVersion::new(&room_version_id).expect("room version is supported");
|
||||||
|
|
||||||
|
let auth_events =
|
||||||
|
self.get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content)?;
|
||||||
|
|
||||||
|
// Our depth is the maximum depth of prev_events + 1
|
||||||
|
let depth = prev_events
|
||||||
|
.iter()
|
||||||
|
.filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth))
|
||||||
|
.max()
|
||||||
|
.unwrap_or_else(|| uint!(0))
|
||||||
|
+ uint!(1);
|
||||||
|
|
||||||
|
let mut unsigned = unsigned.unwrap_or_default();
|
||||||
|
|
||||||
|
if let Some(state_key) = &state_key {
|
||||||
|
if let Some(prev_pdu) =
|
||||||
|
self.room_state_get(room_id, &event_type.to_string().into(), state_key)?
|
||||||
|
{
|
||||||
|
unsigned.insert(
|
||||||
|
"prev_content".to_owned(),
|
||||||
|
serde_json::from_str(prev_pdu.content.get()).expect("string is valid json"),
|
||||||
|
);
|
||||||
|
unsigned.insert(
|
||||||
|
"prev_sender".to_owned(),
|
||||||
|
serde_json::to_value(&prev_pdu.sender).expect("UserId::to_value always works"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let pdu = PduEvent {
|
||||||
|
event_id: ruma::event_id!("$thiswillbefilledinlater").into(),
|
||||||
|
room_id: room_id.to_owned(),
|
||||||
|
sender: sender_user.to_owned(),
|
||||||
|
origin_server_ts: utils::millis_since_unix_epoch()
|
||||||
|
.try_into()
|
||||||
|
.expect("time is valid"),
|
||||||
|
kind: event_type,
|
||||||
|
content,
|
||||||
|
state_key,
|
||||||
|
prev_events,
|
||||||
|
depth,
|
||||||
|
auth_events: auth_events
|
||||||
|
.iter()
|
||||||
|
.map(|(_, pdu)| pdu.event_id.clone())
|
||||||
|
.collect(),
|
||||||
|
redacts,
|
||||||
|
unsigned: if unsigned.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(to_raw_value(&unsigned).expect("to_raw_value always works"))
|
||||||
|
},
|
||||||
|
hashes: EventHash {
|
||||||
|
sha256: "aaa".to_owned(),
|
||||||
|
},
|
||||||
|
signatures: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let auth_check = state_res::auth_check(
|
||||||
|
&room_version,
|
||||||
|
&pdu,
|
||||||
|
None::<PduEvent>, // TODO: third_party_invite
|
||||||
|
|k, s| auth_events.get(&(k.clone(), s.to_owned())),
|
||||||
|
)
|
||||||
|
.map_err(|e| {
|
||||||
|
error!("{:?}", e);
|
||||||
|
Error::bad_database("Auth check failed.")
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if !auth_check {
|
||||||
|
return Err(Error::BadRequest(
|
||||||
|
ErrorKind::Forbidden,
|
||||||
|
"Event is not authorized.",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hash and sign
|
||||||
|
let mut pdu_json =
|
||||||
|
utils::to_canonical_object(&pdu).expect("event is valid, we just created it");
|
||||||
|
|
||||||
|
pdu_json.remove("event_id");
|
||||||
|
|
||||||
|
// Add origin because synapse likes that (and it's required in the spec)
|
||||||
|
pdu_json.insert(
|
||||||
|
"origin".to_owned(),
|
||||||
|
to_canonical_value(db.globals.server_name())
|
||||||
|
.expect("server name is a valid CanonicalJsonValue"),
|
||||||
|
);
|
||||||
|
|
||||||
|
match ruma::signatures::hash_and_sign_event(
|
||||||
|
db.globals.server_name().as_str(),
|
||||||
|
db.globals.keypair(),
|
||||||
|
&mut pdu_json,
|
||||||
|
&room_version_id,
|
||||||
|
) {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
|
return match e {
|
||||||
|
ruma::signatures::Error::PduSize => Err(Error::BadRequest(
|
||||||
|
ErrorKind::TooLarge,
|
||||||
|
"Message is too long",
|
||||||
|
)),
|
||||||
|
_ => Err(Error::BadRequest(
|
||||||
|
ErrorKind::Unknown,
|
||||||
|
"Signing event failed",
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate event id
|
||||||
|
pdu.event_id = EventId::parse_arc(format!(
|
||||||
|
"${}",
|
||||||
|
ruma::signatures::reference_hash(&pdu_json, &room_version_id)
|
||||||
|
.expect("ruma can calculate reference hashes")
|
||||||
|
))
|
||||||
|
.expect("ruma's reference hashes are valid event ids");
|
||||||
|
|
||||||
|
pdu_json.insert(
|
||||||
|
"event_id".to_owned(),
|
||||||
|
CanonicalJsonValue::String(pdu.event_id.as_str().to_owned()),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Generate short event id
|
||||||
|
let _shorteventid = self.get_or_create_shorteventid(&pdu.event_id, &db.globals)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a new persisted data unit and adds it to a room. This function takes a
|
||||||
|
/// roomid_mutex_state, meaning that only this function is able to mutate the room state.
|
||||||
|
#[tracing::instrument(skip(self, db, _mutex_lock))]
|
||||||
|
pub fn build_and_append_pdu(
|
||||||
|
&self,
|
||||||
|
pdu_builder: PduBuilder,
|
||||||
|
sender: &UserId,
|
||||||
|
room_id: &RoomId,
|
||||||
|
db: &Database,
|
||||||
|
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
|
||||||
|
) -> Result<Arc<EventId>> {
|
||||||
|
|
||||||
|
let (pdu, pdu_json) = create_hash_and_sign_event()?;
|
||||||
|
|
||||||
|
|
||||||
|
// We append to state before appending the pdu, so we don't have a moment in time with the
|
||||||
|
// pdu without it's state. This is okay because append_pdu can't fail.
|
||||||
|
let statehashid = self.append_to_state(&pdu, &db.globals)?;
|
||||||
|
|
||||||
|
let pdu_id = self.append_pdu(
|
||||||
|
&pdu,
|
||||||
|
pdu_json,
|
||||||
|
// Since this PDU references all pdu_leaves we can update the leaves
|
||||||
|
// of the room
|
||||||
|
iter::once(&*pdu.event_id),
|
||||||
|
db,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
// We set the room state after inserting the pdu, so that we never have a moment in time
|
||||||
|
// where events in the current room state do not exist
|
||||||
|
self.set_room_state(room_id, statehashid)?;
|
||||||
|
|
||||||
|
let mut servers: HashSet<Box<ServerName>> =
|
||||||
|
self.room_servers(room_id).filter_map(|r| r.ok()).collect();
|
||||||
|
|
||||||
|
// In case we are kicking or banning a user, we need to inform their server of the change
|
||||||
|
if pdu.kind == RoomEventType::RoomMember {
|
||||||
|
if let Some(state_key_uid) = &pdu
|
||||||
|
.state_key
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|state_key| UserId::parse(state_key.as_str()).ok())
|
||||||
|
{
|
||||||
|
servers.insert(Box::from(state_key_uid.server_name()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove our server from the server list since it will be added to it by room_servers() and/or the if statement above
|
||||||
|
servers.remove(db.globals.server_name());
|
||||||
|
|
||||||
|
db.sending.send_pdu(servers.into_iter(), &pdu_id)?;
|
||||||
|
|
||||||
Ok(pdu.event_id)
|
Ok(pdu.event_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Append the incoming event setting the state snapshot to the state from the
|
||||||
|
/// server that sent the event.
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
fn append_incoming_pdu<'a>(
|
||||||
|
db: &Database,
|
||||||
|
pdu: &PduEvent,
|
||||||
|
pdu_json: CanonicalJsonObject,
|
||||||
|
new_room_leaves: impl IntoIterator<Item = &'a EventId> + Clone + Debug,
|
||||||
|
state_ids_compressed: HashSet<CompressedStateEvent>,
|
||||||
|
soft_fail: bool,
|
||||||
|
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
|
||||||
|
) -> Result<Option<Vec<u8>>> {
|
||||||
|
// We append to state before appending the pdu, so we don't have a moment in time with the
|
||||||
|
// pdu without it's state. This is okay because append_pdu can't fail.
|
||||||
|
db.rooms.set_event_state(
|
||||||
|
&pdu.event_id,
|
||||||
|
&pdu.room_id,
|
||||||
|
state_ids_compressed,
|
||||||
|
&db.globals,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
if soft_fail {
|
||||||
|
db.rooms
|
||||||
|
.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?;
|
||||||
|
db.rooms.replace_pdu_leaves(&pdu.room_id, new_room_leaves)?;
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let pdu_id = db.rooms.append_pdu(pdu, pdu_json, new_room_leaves, db)?;
|
||||||
|
|
||||||
|
Ok(Some(pdu_id))
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns an iterator over all PDUs in a room.
|
/// Returns an iterator over all PDUs in a room.
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn all_pdus<'a>(
|
pub fn all_pdus<'a>(
|
Loading…
Reference in a new issue