diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index d3079d61..46f4fc83 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -28,7 +28,7 @@ use ruma::{ uint, DeviceId, EventId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId, }; use tokio::sync::watch::Sender; -use tracing::{debug, error}; +use tracing::{debug, error, Instrument as _, Span}; use crate::{ service::{pdu::EventHash, rooms::timeline::PduCount}, @@ -271,164 +271,17 @@ async fn sync_helper( .rooms_left(&sender_user) .collect(); for result in all_left_rooms { - let (room_id, _) = result?; - - { - // Get and drop the lock to wait for remaining operations to finish - let mutex_insert = Arc::clone( - services() - .globals - .roomid_mutex_insert - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().await; - drop(insert_lock); - }; - - let left_count = services() - .rooms - .state_cache - .get_left_count(&room_id, &sender_user)?; - - // Left before last sync - if Some(since) >= left_count { - continue; - } - - if !services().rooms.metadata.exists(&room_id)? { - // This is just a rejected invite, not a room we know - // Insert a leave event anyways - let event = PduEvent { - event_id: EventId::new(services().globals.server_name()).into(), - sender: sender_user.clone(), - origin: None, - origin_server_ts: utils::millis_since_unix_epoch() - .try_into() - .expect("Timestamp is valid js_int value"), - kind: TimelineEventType::RoomMember, - content: serde_json::from_str(r#"{"membership":"leave"}"#).expect("this is valid JSON"), - state_key: Some(sender_user.to_string()), - unsigned: None, - // The following keys are dropped on conversion - room_id: room_id.clone(), - prev_events: vec![], - depth: uint!(1), - auth_events: vec![], - redacts: None, - hashes: EventHash { - sha256: String::new(), - }, - signatures: None, - }; - - left_rooms.insert( - room_id, - LeftRoom { - account_data: RoomAccountData { - events: Vec::new(), - }, - timeline: Timeline { - limited: false, - prev_batch: Some(next_batch_string.clone()), - events: Vec::new(), - }, - state: State { - events: vec![event.to_sync_state_event()], - }, - }, - ); - continue; - } - - let mut left_state_events = Vec::new(); - - let since_shortstatehash = services() - .rooms - .user - .get_token_shortstatehash(&room_id, since)?; - - let since_state_ids = match since_shortstatehash { - Some(s) => services().rooms.state_accessor.state_full_ids(s).await?, - None => HashMap::new(), - }; - - let Some(left_event_id) = services().rooms.state_accessor.room_state_get_id( - &room_id, - &StateEventType::RoomMember, - sender_user.as_str(), - )? - else { - error!("Left room but no left state event"); - continue; - }; - - let Some(left_shortstatehash) = services() - .rooms - .state_accessor - .pdu_shortstatehash(&left_event_id)? - else { - error!("Leave event has no state"); - continue; - }; - - let mut left_state_ids = services() - .rooms - .state_accessor - .state_full_ids(left_shortstatehash) - .await?; - - let leave_shortstatekey = services() - .rooms - .short - .get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?; - - left_state_ids.insert(leave_shortstatekey, left_event_id); - - let mut i = 0; - for (key, id) in left_state_ids { - if full_state || since_state_ids.get(&key) != Some(&id) { - let (event_type, state_key) = services().rooms.short.get_statekey_from_short(key)?; - - if !lazy_load_enabled - || event_type != StateEventType::RoomMember - || full_state - // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565 - || (cfg!(feature = "element_hacks") && *sender_user == state_key) - { - let Some(pdu) = services().rooms.timeline.get_pdu(&id)? else { - error!("Pdu in state not found: {}", id); - continue; - }; - - left_state_events.push(pdu.to_sync_state_event()); - - i += 1; - if i % 100 == 0 { - tokio::task::yield_now().await; - } - } - } - } - - left_rooms.insert( - room_id.clone(), - LeftRoom { - account_data: RoomAccountData { - events: Vec::new(), - }, - timeline: Timeline { - limited: false, - prev_batch: Some(next_batch_string.clone()), - events: Vec::new(), - }, - state: State { - events: left_state_events, - }, - }, - ); + handle_left_room( + since, + &result?.0, + &sender_user, + &mut left_rooms, + &next_batch_string, + full_state, + lazy_load_enabled, + ) + .instrument(Span::current()) + .await?; } let mut invited_rooms = BTreeMap::new(); @@ -567,6 +420,170 @@ async fn sync_helper( } } +#[tracing::instrument(skip_all, fields(user_id = %sender_user, room_id = %room_id))] +async fn handle_left_room( + since: u64, room_id: &RoomId, sender_user: &UserId, left_rooms: &mut BTreeMap, + next_batch_string: &str, full_state: bool, lazy_load_enabled: bool, +) -> Result<()> { + { + // Get and drop the lock to wait for remaining operations to finish + let mutex_insert = Arc::clone( + services() + .globals + .roomid_mutex_insert + .write() + .await + .entry(room_id.to_owned()) + .or_default(), + ); + let insert_lock = mutex_insert.lock().await; + drop(insert_lock); + }; + + let left_count = services() + .rooms + .state_cache + .get_left_count(room_id, sender_user)?; + + // Left before last sync + if Some(since) >= left_count { + return Ok(()); + } + + if !services().rooms.metadata.exists(room_id)? { + // This is just a rejected invite, not a room we know + // Insert a leave event anyways + let event = PduEvent { + event_id: EventId::new(services().globals.server_name()).into(), + sender: sender_user.to_owned(), + origin: None, + origin_server_ts: utils::millis_since_unix_epoch() + .try_into() + .expect("Timestamp is valid js_int value"), + kind: TimelineEventType::RoomMember, + content: serde_json::from_str(r#"{"membership":"leave"}"#).expect("this is valid JSON"), + state_key: Some(sender_user.to_string()), + unsigned: None, + // The following keys are dropped on conversion + room_id: room_id.to_owned(), + prev_events: vec![], + depth: uint!(1), + auth_events: vec![], + redacts: None, + hashes: EventHash { + sha256: String::new(), + }, + signatures: None, + }; + + left_rooms.insert( + room_id.to_owned(), + LeftRoom { + account_data: RoomAccountData { + events: Vec::new(), + }, + timeline: Timeline { + limited: false, + prev_batch: Some(next_batch_string.to_owned()), + events: Vec::new(), + }, + state: State { + events: vec![event.to_sync_state_event()], + }, + }, + ); + return Ok(()); + } + + let mut left_state_events = Vec::new(); + + let since_shortstatehash = services() + .rooms + .user + .get_token_shortstatehash(room_id, since)?; + + let since_state_ids = match since_shortstatehash { + Some(s) => services().rooms.state_accessor.state_full_ids(s).await?, + None => HashMap::new(), + }; + + let Some(left_event_id) = services().rooms.state_accessor.room_state_get_id( + room_id, + &StateEventType::RoomMember, + sender_user.as_str(), + )? + else { + error!("Left room but no left state event"); + return Ok(()); + }; + + let Some(left_shortstatehash) = services() + .rooms + .state_accessor + .pdu_shortstatehash(&left_event_id)? + else { + error!(event_id = %left_event_id, "Leave event has no state"); + return Ok(()); + }; + + let mut left_state_ids = services() + .rooms + .state_accessor + .state_full_ids(left_shortstatehash) + .await?; + + let leave_shortstatekey = services() + .rooms + .short + .get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?; + + left_state_ids.insert(leave_shortstatekey, left_event_id); + + let mut i = 0; + for (key, id) in left_state_ids { + if full_state || since_state_ids.get(&key) != Some(&id) { + let (event_type, state_key) = services().rooms.short.get_statekey_from_short(key)?; + + if !lazy_load_enabled + || event_type != StateEventType::RoomMember + || full_state + // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565 + || (cfg!(feature = "element_hacks") && *sender_user == state_key) + { + let Some(pdu) = services().rooms.timeline.get_pdu(&id)? else { + error!("Pdu in state not found: {}", id); + continue; + }; + + left_state_events.push(pdu.to_sync_state_event()); + + i += 1; + if i % 100 == 0 { + tokio::task::yield_now().await; + } + } + } + } + + left_rooms.insert( + room_id.to_owned(), + LeftRoom { + account_data: RoomAccountData { + events: Vec::new(), + }, + timeline: Timeline { + limited: false, + prev_batch: Some(next_batch_string.to_owned()), + events: Vec::new(), + }, + state: State { + events: left_state_events, + }, + }, + ); + Ok(()) +} + async fn process_presence_updates( presence_updates: &mut HashMap, since: u64, syncing_user: &OwnedUserId, ) -> Result<()> {