diff --git a/src/api/client/sync.rs b/src/api/client/sync.rs index 9b0facc0..544fe8b6 100644 --- a/src/api/client/sync.rs +++ b/src/api/client/sync.rs @@ -7,12 +7,13 @@ use std::{ use axum::extract::State; use conduit::{ - error, + debug, error, utils::math::{ruma_from_u64, ruma_from_usize, usize_from_ruma, usize_from_u64_truncated}, warn, Err, PduCount, }; use ruma::{ api::client::{ + error::ErrorKind, filter::{FilterDefinition, LazyLoadOptions}, sync::sync_events::{ self, @@ -1081,7 +1082,7 @@ fn share_encrypted_room( /// Sliding Sync endpoint (future endpoint: `/_matrix/client/v4/sync`) pub(crate) async fn sync_events_v4_route( State(services): State, body: Ruma, -) -> Result> { +) -> Result { let sender_user = body.sender_user.expect("user is authenticated"); let sender_device = body.sender_device.expect("user is authenticated"); let mut body = body.body; @@ -1101,6 +1102,19 @@ pub(crate) async fn sync_events_v4_route( .and_then(|string| string.parse().ok()) .unwrap_or(0); + if globalsince != 0 + && !services + .users + .remembered(sender_user.clone(), sender_device.clone(), conn_id.clone()) + { + debug!("Restarting sync stream because it was gone from the database"); + return Err(Error::Request( + ErrorKind::UnknownPos, + "Connection data lost since last time".into(), + http::StatusCode::BAD_REQUEST, + )); + } + if globalsince == 0 { services .users diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index bdb07310..35f30a61 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -68,6 +68,13 @@ impl Service { #[inline] pub fn exists(&self, user_id: &UserId) -> Result { self.db.exists(user_id) } + pub fn remembered(&self, user_id: OwnedUserId, device_id: OwnedDeviceId, conn_id: String) -> bool { + self.connections + .lock() + .unwrap() + .contains_key(&(user_id, device_id, conn_id)) + } + pub fn forget_sync_request_connection(&self, user_id: OwnedUserId, device_id: OwnedDeviceId, conn_id: String) { self.connections .lock()