From e493b3a60d20ec5f8876106b15bcadc5d700fee2 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 12 Apr 2024 17:24:31 -0700 Subject: [PATCH] split prev_event loop body; fetch state; dedup room version procurement. Signed-off-by: Jason Volk --- src/service/rooms/event_handler/mod.rs | 785 +++++++++++++------------ 1 file changed, 407 insertions(+), 378 deletions(-) diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index da119db5..58aac009 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -1,4 +1,5 @@ use std::{ + cmp, collections::{hash_map, HashSet}, pin::Pin, time::{Duration, Instant}, @@ -73,41 +74,36 @@ impl Service { value: BTreeMap, is_timeline_event: bool, pub_key_map: &'a RwLock>>, ) -> Result>> { - // 0. Check the server is in the room + // 1. Skip the PDU if we already have it as a timeline event + if let Some(pdu_id) = services().rooms.timeline.get_pdu_id(event_id)? { + return Ok(Some(pdu_id)); + } + + // 1.1 Check the server is in the room if !services().rooms.metadata.exists(room_id)? { return Err(Error::BadRequest(ErrorKind::NotFound, "Room is unknown to this server")); } + // 1.2 Check if the room is disabled if services().rooms.metadata.is_disabled(room_id)? { - info!( - "Federaton of room {room_id} is currently disabled on this server. Request by origin {origin} and \ - event ID {event_id}" - ); return Err(Error::BadRequest( ErrorKind::forbidden(), "Federation of this room is currently disabled on this server.", )); } + // 1.3 Check room ACL services().rooms.event_handler.acl_check(origin, room_id)?; - // 1. Skip the PDU if we already have it as a timeline event - if let Some(pdu_id) = services().rooms.timeline.get_pdu_id(event_id)? { - return Ok(Some(pdu_id)); - } - + // Fetch create event let create_event = services() .rooms .state_accessor .room_state_get(room_id, &StateEventType::RoomCreate, "")? .ok_or_else(|| Error::bad_database("Failed to find create event in db."))?; - let create_event_content: RoomCreateEventContent = - serde_json::from_str(create_event.content.get()).map_err(|e| { - error!("Invalid create event: {}", e); - Error::BadDatabase("Invalid create event in db") - })?; - let room_version_id = &create_event_content.room_version; + // Procure the room version + let room_version_id = self.get_room_version_id(&create_event)?; let first_pdu_in_room = services() .rooms @@ -118,13 +114,13 @@ impl Service { let (incoming_pdu, val) = self .handle_outlier_pdu(origin, &create_event, event_id, room_id, value, false, pub_key_map) .await?; + self.check_room_id(room_id, &incoming_pdu)?; // 8. if not timeline event: stop if !is_timeline_event { return Ok(None); } - // Skip old events if incoming_pdu.origin_server_ts < first_pdu_in_room.origin_server_ts { return Ok(None); @@ -133,88 +129,33 @@ impl Service { // 9. Fetch any missing prev events doing all checks listed here starting at 1. // These are timeline events let (sorted_prev_events, mut eventid_info) = self - .fetch_unknown_prev_events( + .fetch_prev( origin, &create_event, room_id, - room_version_id, + &room_version_id, pub_key_map, incoming_pdu.prev_events.clone(), ) .await?; - let mut errors = 0; debug!(events = ?sorted_prev_events, "Got previous events"); for prev_id in sorted_prev_events { - // Check for disabled again because it might have changed - if services().rooms.metadata.is_disabled(room_id)? { - info!( - "Federaton of room {room_id} is currently disabled on this server. Request by origin {origin} and \ - event ID {event_id}" - ); - return Err(Error::BadRequest( - ErrorKind::forbidden(), - "Federation of this room is currently disabled on this server.", - )); - } - - if let Some((time, tries)) = services() - .globals - .bad_event_ratelimiter - .read() + match self + .handle_prev_pdu( + origin, + event_id, + room_id, + pub_key_map, + &mut eventid_info, + &create_event, + &first_pdu_in_room, + &prev_id, + ) .await - .get(&*prev_id) { - // Exponential backoff - let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries); - if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { - min_elapsed_duration = Duration::from_secs(60 * 60 * 24); - } - - if time.elapsed() < min_elapsed_duration { - info!("Backing off from {}", prev_id); - continue; - } - } - - if errors >= 5 { - // Timeout other events - match services() - .globals - .bad_event_ratelimiter - .write() - .await - .entry((*prev_id).to_owned()) - { - hash_map::Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); - }, - hash_map::Entry::Occupied(mut e) => { - *e.get_mut() = (Instant::now(), e.get().1 + 1); - }, - } - continue; - } - - if let Some((pdu, json)) = eventid_info.remove(&*prev_id) { - // Skip old events - if pdu.origin_server_ts < first_pdu_in_room.origin_server_ts { - continue; - } - - let start_time = Instant::now(); - services() - .globals - .roomid_federationhandletime - .write() - .await - .insert(room_id.to_owned(), ((*prev_id).to_owned(), start_time)); - - if let Err(e) = self - .upgrade_outlier_to_timeline_pdu(pdu, json, &create_event, origin, room_id, pub_key_map) - .await - { - errors += 1; + Ok(()) => continue, + Err(e) => { warn!("Prev event {} failed: {}", prev_id, e); match services() .globals @@ -229,26 +170,12 @@ impl Service { hash_map::Entry::Occupied(mut e) => { *e.get_mut() = (Instant::now(), e.get().1 + 1); }, - } - } - let elapsed = start_time.elapsed(); - services() - .globals - .roomid_federationhandletime - .write() - .await - .remove(&room_id.to_owned()); - debug!( - "Handling prev event {} took {}m{}s", - prev_id, - elapsed.as_secs() / 60, - elapsed.as_secs() % 60 - ); + }; + }, } } // Done with prev events, now handling the incoming event - let start_time = Instant::now(); services() .globals @@ -256,11 +183,13 @@ impl Service { .write() .await .insert(room_id.to_owned(), (event_id.to_owned(), start_time)); + let r = services() .rooms .event_handler .upgrade_outlier_to_timeline_pdu(incoming_pdu, val, &create_event, origin, room_id, pub_key_map) .await; + services() .globals .roomid_federationhandletime @@ -271,6 +200,84 @@ impl Service { r } + #[allow(clippy::type_complexity)] + #[allow(clippy::too_many_arguments)] + #[tracing::instrument( + skip(self, origin, event_id, room_id, pub_key_map, eventid_info, create_event, first_pdu_in_room), + name = "prev" + )] + pub(crate) async fn handle_prev_pdu<'a>( + &self, origin: &'a ServerName, event_id: &'a EventId, room_id: &'a RoomId, + pub_key_map: &'a RwLock>>, + eventid_info: &mut HashMap, (Arc, BTreeMap)>, + create_event: &Arc, first_pdu_in_room: &Arc, prev_id: &EventId, + ) -> Result<()> { + // Check for disabled again because it might have changed + if services().rooms.metadata.is_disabled(room_id)? { + debug!( + "Federaton of room {room_id} is currently disabled on this server. Request by origin {origin} and \ + event ID {event_id}" + ); + return Err(Error::BadRequest( + ErrorKind::forbidden(), + "Federation of this room is currently disabled on this server.", + )); + } + + if let Some((time, tries)) = services() + .globals + .bad_event_ratelimiter + .read() + .await + .get(prev_id) + { + // Exponential backoff + const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24); + let min_duration = cmp::min(MAX_DURATION, Duration::from_secs(5 * 60) * (*tries) * (*tries)); + let duration = time.elapsed(); + if duration < min_duration { + debug!( + duration = ?duration, + min_duration = ?min_duration, + "Backing off from prev_event" + ); + return Ok(()); + } + } + + if let Some((pdu, json)) = eventid_info.remove(prev_id) { + // Skip old events + if pdu.origin_server_ts < first_pdu_in_room.origin_server_ts { + return Ok(()); + } + + let start_time = Instant::now(); + services() + .globals + .roomid_federationhandletime + .write() + .await + .insert(room_id.to_owned(), ((*prev_id).to_owned(), start_time)); + + self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id, pub_key_map) + .await?; + + services() + .globals + .roomid_federationhandletime + .write() + .await + .remove(&room_id.to_owned()); + + debug!( + elapsed = ?start_time.elapsed(), + "Handled prev_event", + ); + } + + Ok(()) + } + #[allow(clippy::too_many_arguments)] fn handle_outlier_pdu<'a>( &'a self, origin: &'a ServerName, create_event: &'a PduEvent, event_id: &'a EventId, room_id: &'a RoomId, @@ -285,17 +292,10 @@ impl Service { // 2. Check signatures, otherwise drop // 3. check content hash, redact if doesn't match - let create_event_content: RoomCreateEventContent = serde_json::from_str(create_event.content.get()) - .map_err(|e| { - error!("Invalid create event: {}", e); - Error::BadDatabase("Invalid create event in db") - })?; - - let room_version_id = &create_event_content.room_version; - let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); + let room_version_id = self.get_room_version_id(create_event)?; let guard = pub_key_map.read().await; - let mut val = match ruma::signatures::verify_event(&guard, &value, room_version_id) { + let mut val = match ruma::signatures::verify_event(&guard, &value, &room_version_id) { Err(e) => { // Drop warn!("Dropping bad event {}: {}", event_id, e,); @@ -304,7 +304,7 @@ impl Service { Ok(ruma::signatures::Verified::Signatures) => { // Redact warn!("Calculated hash does not match: {}", event_id); - let Ok(obj) = ruma::canonical_json::redact(value, room_version_id, None) else { + let Ok(obj) = ruma::canonical_json::redact(value, &room_version_id, None) else { return Err(Error::BadRequest(ErrorKind::InvalidParam, "Redaction failed")); }; @@ -339,7 +339,7 @@ impl Service { // 5. Reject "due to auth events" if can't get all the auth events or some of // the auth events are also rejected "due to auth events" // NOTE: Step 5 is not applied anymore because it failed too often - debug!(event_id = ?incoming_pdu.event_id, "Fetching auth events"); + debug!("Fetching auth events"); self.fetch_and_handle_outliers( origin, &incoming_pdu @@ -349,7 +349,7 @@ impl Service { .collect::>(), create_event, room_id, - room_version_id, + &room_version_id, pub_key_map, ) .await; @@ -357,8 +357,7 @@ impl Service { // 6. Reject "due to auth events" if the event doesn't pass auth based on the // auth events - debug!("Auth check for {} based on auth events", incoming_pdu.event_id); - + debug!("Checking based on auth events"); // Build map of auth events let mut auth_events = HashMap::new(); for id in &incoming_pdu.auth_events { @@ -402,7 +401,7 @@ impl Service { } if !state_res::event_auth::auth_check( - &room_version, + &self.to_room_version(&room_version_id), &incoming_pdu, None::, // TODO: third party invite |k, s| auth_events.get(&(k.to_string().into(), s.to_owned())), @@ -443,257 +442,39 @@ impl Service { return Err(Error::BadRequest(ErrorKind::InvalidParam, "Event has been soft failed")); } - debug!("Upgrading {} to timeline pdu", incoming_pdu.event_id); + debug!("Upgrading to timeline pdu"); let timer = tokio::time::Instant::now(); - - let create_event_content: RoomCreateEventContent = - serde_json::from_str(create_event.content.get()).map_err(|e| { - warn!("Invalid create event: {}", e); - Error::BadDatabase("Invalid create event in db") - })?; - - let room_version_id = &create_event_content.room_version; - let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); + let room_version_id = self.get_room_version_id(create_event)?; // 10. Fetch missing state and auth chain events by calling /state_ids at // backwards extremities doing all the checks in this list starting at 1. // These are not timeline events. - // TODO: if we know the prev_events of the incoming event we can avoid the - // request and build the state from a known point and resolve if > 1 prev_event - - debug!("Requesting state at event"); - let mut state_at_incoming_event = None; - - if incoming_pdu.prev_events.len() == 1 { - let prev_event = &*incoming_pdu.prev_events[0]; - let prev_event_sstatehash = services() - .rooms - .state_accessor - .pdu_shortstatehash(prev_event)?; - - let state = if let Some(shortstatehash) = prev_event_sstatehash { - Some( - services() - .rooms - .state_accessor - .state_full_ids(shortstatehash) - .await, - ) - } else { - None - }; - - if let Some(Ok(mut state)) = state { - debug!("Using cached state"); - let prev_pdu = services() - .rooms - .timeline - .get_pdu(prev_event) - .ok() - .flatten() - .ok_or_else(|| Error::bad_database("Could not find prev event, but we know the state."))?; - - if let Some(state_key) = &prev_pdu.state_key { - let shortstatekey = services() - .rooms - .short - .get_or_create_shortstatekey(&prev_pdu.kind.to_string().into(), state_key)?; - - state.insert(shortstatekey, Arc::from(prev_event)); - // Now it's the state after the pdu - } - - state_at_incoming_event = Some(state); - } + debug!("Resolving state at event"); + let mut state_at_incoming_event = if incoming_pdu.prev_events.len() == 1 { + self.state_at_incoming_degree_one(&incoming_pdu).await? } else { - debug!("Calculating state at event using state res"); - let mut extremity_sstatehashes = HashMap::new(); - - let mut okay = true; - for prev_eventid in &incoming_pdu.prev_events { - let Ok(Some(prev_event)) = services().rooms.timeline.get_pdu(prev_eventid) else { - okay = false; - break; - }; - - let Ok(Some(sstatehash)) = services() - .rooms - .state_accessor - .pdu_shortstatehash(prev_eventid) - else { - okay = false; - break; - }; - - extremity_sstatehashes.insert(sstatehash, prev_event); - } - - if okay { - let mut fork_states = Vec::with_capacity(extremity_sstatehashes.len()); - let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len()); - - for (sstatehash, prev_event) in extremity_sstatehashes { - let mut leaf_state: HashMap<_, _> = services() - .rooms - .state_accessor - .state_full_ids(sstatehash) - .await?; - - if let Some(state_key) = &prev_event.state_key { - let shortstatekey = services() - .rooms - .short - .get_or_create_shortstatekey(&prev_event.kind.to_string().into(), state_key)?; - leaf_state.insert(shortstatekey, Arc::from(&*prev_event.event_id)); - // Now it's the state after the pdu - } - - let mut state = StateMap::with_capacity(leaf_state.len()); - let mut starting_events = Vec::with_capacity(leaf_state.len()); - - for (k, id) in leaf_state { - if let Ok((ty, st_key)) = services().rooms.short.get_statekey_from_short(k) { - // FIXME: Undo .to_string().into() when StateMap - // is updated to use StateEventType - state.insert((ty.to_string().into(), st_key), id.clone()); - } else { - warn!("Failed to get_statekey_from_short."); - } - starting_events.push(id); - } - - auth_chain_sets.push( - services() - .rooms - .auth_chain - .event_ids_iter(room_id, starting_events) - .await? - .collect(), - ); - - fork_states.push(state); - } - - let lock = services().globals.stateres_mutex.lock(); - - let result = state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| { - let res = services().rooms.timeline.get_pdu(id); - if let Err(e) = &res { - error!("Failed to fetch event: {}", e); - } - res.ok().flatten() - }); - drop(lock); - - state_at_incoming_event = match result { - Ok(new_state) => Some( - new_state - .into_iter() - .map(|((event_type, state_key), event_id)| { - let shortstatekey = services() - .rooms - .short - .get_or_create_shortstatekey(&event_type.to_string().into(), &state_key)?; - Ok((shortstatekey, event_id)) - }) - .collect::>()?, - ), - Err(e) => { - warn!( - "State resolution on prev events failed, either an event could not be found or \ - deserialization: {}", - e - ); - None - }, - } - } - } + self.state_at_incoming_resolved(&incoming_pdu, room_id, &room_version_id) + .await? + }; if state_at_incoming_event.is_none() { - debug!("Calling /state_ids"); - // Call /state_ids to find out what the state at this pdu is. We trust the - // server's response to some extend, but we still do a lot of checks on the - // events - match services() - .sending - .send_federation_request( + state_at_incoming_event = self + .fetch_state( origin, - get_room_state_ids::v1::Request { - room_id: room_id.to_owned(), - event_id: (*incoming_pdu.event_id).to_owned(), - }, + create_event, + room_id, + &room_version_id, + pub_key_map, + &incoming_pdu.event_id, ) - .await - { - Ok(res) => { - debug!("Fetching state events at event."); - - let collect = res - .pdu_ids - .iter() - .map(|x| Arc::from(&**x)) - .collect::>(); - - let state_vec = self - .fetch_and_handle_outliers( - origin, - &collect, - create_event, - room_id, - room_version_id, - pub_key_map, - ) - .await; - - let mut state: HashMap<_, Arc> = HashMap::new(); - for (pdu, _) in state_vec { - let state_key = pdu - .state_key - .clone() - .ok_or_else(|| Error::bad_database("Found non-state pdu in state events."))?; - - let shortstatekey = services() - .rooms - .short - .get_or_create_shortstatekey(&pdu.kind.to_string().into(), &state_key)?; - - match state.entry(shortstatekey) { - hash_map::Entry::Vacant(v) => { - v.insert(Arc::from(&*pdu.event_id)); - }, - hash_map::Entry::Occupied(_) => { - return Err(Error::bad_database( - "State event's type and state_key combination exists multiple times.", - )) - }, - } - } - - // The original create event must still be in the state - let create_shortstatekey = services() - .rooms - .short - .get_shortstatekey(&StateEventType::RoomCreate, "")? - .expect("Room exists"); - - if state.get(&create_shortstatekey).map(AsRef::as_ref) != Some(&create_event.event_id) { - return Err(Error::bad_database("Incoming event refers to wrong create event.")); - } - - state_at_incoming_event = Some(state); - }, - Err(e) => { - warn!("Fetching state for event failed: {}", e); - return Err(e); - }, - }; + .await?; } let state_at_incoming_event = state_at_incoming_event.expect("we always set this to some above"); + let room_version = self.to_room_version(&room_version_id); - debug!("Starting auth check"); + debug!("Performing auth check"); // 11. Check the auth of the event passes based on the state of the event let check_result = state_res::event_auth::auth_check( &room_version, @@ -715,9 +496,8 @@ impl Service { if !check_result { return Err(Error::bad_database("Event has failed auth check with state at the event.")); } - debug!("Auth check succeeded"); - // Soft fail check before doing state res + debug!("Gathering auth events"); let auth_events = services().rooms.state.get_auth_events( room_id, &incoming_pdu.kind, @@ -726,6 +506,8 @@ impl Service { &incoming_pdu.content, )?; + // Soft fail check before doing state res + debug!("Performing soft-fail check"); let soft_fail = !state_res::event_auth::auth_check(&room_version, &incoming_pdu, None::, |k, s| { auth_events.get(&(k.clone(), s.to_owned())) }) @@ -743,13 +525,15 @@ impl Service { .entry(room_id.to_owned()) .or_default(), ); + + debug!("Locking the room"); let state_lock = mutex_state.lock().await; // Now we calculate the set of extremities this room has after the incoming // event has been applied. We start with the previous extremities (aka leaves) debug!("Calculating extremities"); let mut extremities = services().rooms.state.get_forward_extremities(room_id)?; - debug!("Amount of forward extremities in room {room_id}: {extremities:?}"); + debug!("Calculated {} extremities", extremities.len()); // Remove any forward extremities that are referenced by this incoming event's // prev_events @@ -769,8 +553,7 @@ impl Service { Ok(true) ) }); - - debug!("Compressing state at event"); + debug!("Retained {} extremities. Compressing state", extremities.len()); let state_ids_compressed = Arc::new( state_at_incoming_event .iter() @@ -784,7 +567,7 @@ impl Service { ); if incoming_pdu.state_key.is_some() { - debug!("Preparing for stateres to derive new room state"); + debug!("Event is a state-event. Deriving new room state"); // We also add state after incoming event to the fork states let mut state_after = state_at_incoming_event.clone(); @@ -798,12 +581,11 @@ impl Service { } let new_room_state = self - .resolve_state(room_id, room_version_id, state_after) + .resolve_state(room_id, &room_version_id, state_after) .await?; // Set the new room state to the resolved state debug!("Forcing new room state"); - let (sstatehash, new, removed) = services() .rooms .state_compressor @@ -818,9 +600,8 @@ impl Service { // 14. Check if the event passes auth based on the "current state" of the room, // if not soft fail it - debug!("Starting soft fail auth check"); - if soft_fail { + debug!("Soft failing event"); services() .rooms .timeline @@ -840,6 +621,7 @@ impl Service { .rooms .pdu_metadata .mark_event_soft_failed(&incoming_pdu.event_id)?; + return Err(Error::BadRequest(ErrorKind::InvalidParam, "Event has been soft failed")); } @@ -849,7 +631,6 @@ impl Service { // Now that the event has passed all auth it is added into the timeline. // We use the `state_at_event` instead of `state_after` so we accurately // represent the state for this event. - let pdu_id = services() .rooms .timeline @@ -863,14 +644,13 @@ impl Service { ) .await?; - let elapsed = timer.elapsed(); + // Event has passed all auth/stateres checks + drop(state_lock); debug!( - elapsed = ?elapsed, + elapsed = ?timer.elapsed(), "Appended incoming pdu", ); - // Event has passed all auth/stateres checks - drop(state_lock); Ok(pdu_id) } @@ -905,7 +685,6 @@ impl Service { } debug!("Loading fork states"); - let fork_states: Vec<_> = fork_states .into_iter() .map(|map| { @@ -922,9 +701,9 @@ impl Service { }) .collect(); - debug!("Resolving state"); - let lock = services().globals.stateres_mutex.lock(); + + debug!("Resolving state"); let state_resolve = state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| { let res = services().rooms.timeline.get_pdu(id); if let Err(e) = &res { @@ -946,7 +725,6 @@ impl Service { drop(lock); debug!("State resolution done. Compressing state"); - let new_room_state = state .into_iter() .map(|((event_type, state_key), event_id)| { @@ -964,6 +742,240 @@ impl Service { Ok(Arc::new(new_room_state)) } + // TODO: if we know the prev_events of the incoming event we can avoid the + // request and build the state from a known point and resolve if > 1 prev_event + #[tracing::instrument(skip_all, name = "state")] + pub async fn state_at_incoming_degree_one( + &self, incoming_pdu: &Arc, + ) -> Result>>> { + let prev_event = &*incoming_pdu.prev_events[0]; + let prev_event_sstatehash = services() + .rooms + .state_accessor + .pdu_shortstatehash(prev_event)?; + + let state = if let Some(shortstatehash) = prev_event_sstatehash { + Some( + services() + .rooms + .state_accessor + .state_full_ids(shortstatehash) + .await, + ) + } else { + None + }; + + if let Some(Ok(mut state)) = state { + debug!("Using cached state"); + let prev_pdu = services() + .rooms + .timeline + .get_pdu(prev_event) + .ok() + .flatten() + .ok_or_else(|| Error::bad_database("Could not find prev event, but we know the state."))?; + + if let Some(state_key) = &prev_pdu.state_key { + let shortstatekey = services() + .rooms + .short + .get_or_create_shortstatekey(&prev_pdu.kind.to_string().into(), state_key)?; + + state.insert(shortstatekey, Arc::from(prev_event)); + // Now it's the state after the pdu + } + + return Ok(Some(state)); + } + + Ok(None) + } + + #[tracing::instrument(skip_all, name = "state")] + pub async fn state_at_incoming_resolved( + &self, incoming_pdu: &Arc, room_id: &RoomId, room_version_id: &RoomVersionId, + ) -> Result>>> { + debug!("Calculating state at event using state res"); + let mut extremity_sstatehashes = HashMap::new(); + + let mut okay = true; + for prev_eventid in &incoming_pdu.prev_events { + let Ok(Some(prev_event)) = services().rooms.timeline.get_pdu(prev_eventid) else { + okay = false; + break; + }; + + let Ok(Some(sstatehash)) = services() + .rooms + .state_accessor + .pdu_shortstatehash(prev_eventid) + else { + okay = false; + break; + }; + + extremity_sstatehashes.insert(sstatehash, prev_event); + } + + if !okay { + return Ok(None); + } + + let mut fork_states = Vec::with_capacity(extremity_sstatehashes.len()); + let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len()); + + for (sstatehash, prev_event) in extremity_sstatehashes { + let mut leaf_state: HashMap<_, _> = services() + .rooms + .state_accessor + .state_full_ids(sstatehash) + .await?; + + if let Some(state_key) = &prev_event.state_key { + let shortstatekey = services() + .rooms + .short + .get_or_create_shortstatekey(&prev_event.kind.to_string().into(), state_key)?; + leaf_state.insert(shortstatekey, Arc::from(&*prev_event.event_id)); + // Now it's the state after the pdu + } + + let mut state = StateMap::with_capacity(leaf_state.len()); + let mut starting_events = Vec::with_capacity(leaf_state.len()); + + for (k, id) in leaf_state { + if let Ok((ty, st_key)) = services().rooms.short.get_statekey_from_short(k) { + // FIXME: Undo .to_string().into() when StateMap + // is updated to use StateEventType + state.insert((ty.to_string().into(), st_key), id.clone()); + } else { + warn!("Failed to get_statekey_from_short."); + } + starting_events.push(id); + } + + auth_chain_sets.push( + services() + .rooms + .auth_chain + .event_ids_iter(room_id, starting_events) + .await? + .collect(), + ); + + fork_states.push(state); + } + + let lock = services().globals.stateres_mutex.lock(); + let result = state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| { + let res = services().rooms.timeline.get_pdu(id); + if let Err(e) = &res { + error!("Failed to fetch event: {}", e); + } + res.ok().flatten() + }); + drop(lock); + + Ok(match result { + Ok(new_state) => Some( + new_state + .into_iter() + .map(|((event_type, state_key), event_id)| { + let shortstatekey = services() + .rooms + .short + .get_or_create_shortstatekey(&event_type.to_string().into(), &state_key)?; + Ok((shortstatekey, event_id)) + }) + .collect::>()?, + ), + Err(e) => { + warn!( + "State resolution on prev events failed, either an event could not be found or deserialization: {}", + e + ); + None + }, + }) + } + + /// Call /state_ids to find out what the state at this pdu is. We trust the + /// server's response to some extend (sic), but we still do a lot of checks + /// on the events + #[tracing::instrument(skip_all)] + async fn fetch_state( + &self, origin: &ServerName, create_event: &PduEvent, room_id: &RoomId, room_version_id: &RoomVersionId, + pub_key_map: &RwLock>>, event_id: &EventId, + ) -> Result>>> { + debug!("Fetching state ids"); + match services() + .sending + .send_federation_request( + origin, + get_room_state_ids::v1::Request { + room_id: room_id.to_owned(), + event_id: (*event_id).to_owned(), + }, + ) + .await + { + Ok(res) => { + debug!("Fetching state events"); + let collect = res + .pdu_ids + .iter() + .map(|x| Arc::from(&**x)) + .collect::>(); + + let state_vec = self + .fetch_and_handle_outliers(origin, &collect, create_event, room_id, room_version_id, pub_key_map) + .await; + + let mut state: HashMap<_, Arc> = HashMap::new(); + for (pdu, _) in state_vec { + let state_key = pdu + .state_key + .clone() + .ok_or_else(|| Error::bad_database("Found non-state pdu in state events."))?; + + let shortstatekey = services() + .rooms + .short + .get_or_create_shortstatekey(&pdu.kind.to_string().into(), &state_key)?; + + match state.entry(shortstatekey) { + hash_map::Entry::Vacant(v) => { + v.insert(Arc::from(&*pdu.event_id)); + }, + hash_map::Entry::Occupied(_) => { + return Err(Error::bad_database( + "State event's type and state_key combination exists multiple times.", + )) + }, + } + } + + // The original create event must still be in the state + let create_shortstatekey = services() + .rooms + .short + .get_shortstatekey(&StateEventType::RoomCreate, "")? + .expect("Room exists"); + + if state.get(&create_shortstatekey).map(AsRef::as_ref) != Some(&create_event.event_id) { + return Err(Error::bad_database("Incoming event refers to wrong create event.")); + } + + Ok(Some(state)) + }, + Err(e) => { + warn!("Fetching state for event failed: {}", e); + Err(e) + }, + } + } + /// Find the event and auth it. Once the event is validated (steps 1 - 8) /// it is appended to the outliers Tree. /// @@ -1045,7 +1057,7 @@ impl Service { continue; } - info!("Fetching {} over federation.", next_id); + debug!("Fetching {} over federation.", next_id); match services() .sending .send_federation_request( @@ -1057,7 +1069,7 @@ impl Service { .await { Ok(res) => { - info!("Got {} over federation", next_id); + debug!("Got {} over federation", next_id); let Ok((calculated_event_id, value)) = pdu::gen_event_id_canonical_json(&res.pdu, room_version_id) else { @@ -1135,7 +1147,7 @@ impl Service { } if time.elapsed() < min_elapsed_duration { - info!("Backing off from {}", next_id); + debug!("Backing off from {}", next_id); continue; } } @@ -1160,7 +1172,9 @@ impl Service { }) } - async fn fetch_unknown_prev_events( + #[allow(clippy::type_complexity)] + #[tracing::instrument(skip_all)] + async fn fetch_prev( &self, origin: &ServerName, create_event: &PduEvent, room_id: &RoomId, room_version_id: &RoomVersionId, pub_key_map: &RwLock>>, initial_set: Vec>, ) -> Result<( @@ -1196,7 +1210,7 @@ impl Service { if amount > services().globals.max_fetch_prev_events() { // Max limit reached - info!( + debug!( "Max prev event limit reached! Limit: {}", services().globals.max_fetch_prev_events() ); @@ -1259,6 +1273,7 @@ impl Service { } /// Returns Ok if the acl allows the server + #[tracing::instrument(skip_all)] pub fn acl_check(&self, server_name: &ServerName, room_id: &RoomId) -> Result<()> { let acl_event = if let Some(acl) = services() @@ -1306,4 +1321,18 @@ impl Service { } Ok(()) } + + fn get_room_version_id(&self, create_event: &PduEvent) -> Result { + let create_event_content: RoomCreateEventContent = + serde_json::from_str(create_event.content.get()).map_err(|e| { + error!("Invalid create event: {}", e); + Error::BadDatabase("Invalid create event in db") + })?; + + Ok(create_event_content.room_version) + } + + fn to_room_version(&self, room_version_id: &RoomVersionId) -> RoomVersion { + RoomVersion::new(room_version_id).expect("room version is supported") + } }