From ca1c77d76bffa65867040579cac1e085cb1a7ed8 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 1 Apr 2024 20:48:40 -0700 Subject: [PATCH] refactor presence to not involve rooms. Signed-off-by: Jason Volk --- src/api/client_server/presence.rs | 21 +--- src/api/client_server/profile.rs | 20 ++-- src/api/client_server/sync.rs | 32 +++-- src/api/server_server.rs | 17 ++- src/config/mod.rs | 6 +- src/database/key_value/presence.rs | 182 +++++++++++------------------ src/database/mod.rs | 42 ++----- src/service/mod.rs | 6 +- src/service/presence/data.rs | 16 +-- src/service/presence/mod.rs | 161 ++++++++++++++++--------- src/service/sending/mod.rs | 41 ++++--- 11 files changed, 263 insertions(+), 281 deletions(-) diff --git a/src/api/client_server/presence.rs b/src/api/client_server/presence.rs index 41939294..424fe105 100644 --- a/src/api/client_server/presence.rs +++ b/src/api/client_server/presence.rs @@ -16,18 +16,9 @@ pub async fn set_presence_route(body: Ruma) -> Result } let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - for room_id in services().rooms.state_cache.rooms_joined(sender_user) { - let room_id = room_id?; - - services().presence.set_presence( - &room_id, - sender_user, - body.presence.clone(), - None, - None, - body.status_msg.clone(), - )?; - } + services() + .presence + .set_presence(sender_user, &body.presence, None, None, body.status_msg.clone())?; Ok(set_presence::v3::Response {}) } @@ -46,14 +37,12 @@ pub async fn get_presence_route(body: Ruma) -> Result let mut presence_event = None; - for room_id in services() + for _room_id in services() .rooms .user .get_shared_rooms(vec![sender_user.clone(), body.user_id.clone()])? { - let room_id = room_id?; - - if let Some(presence) = services().presence.get_presence(&room_id, sender_user)? { + if let Some(presence) = services().presence.get_presence(sender_user)? { presence_event = Some(presence); break; } diff --git a/src/api/client_server/profile.rs b/src/api/client_server/profile.rs index 29202c84..083a3073 100644 --- a/src/api/client_server/profile.rs +++ b/src/api/client_server/profile.rs @@ -86,10 +86,12 @@ pub async fn set_displayname_route( .await; } - // Presence update - services() - .presence - .ping_presence(sender_user, PresenceState::Online)?; + if services().globals.allow_local_presence() { + // Presence update + services() + .presence + .ping_presence(sender_user, &PresenceState::Online)?; + } Ok(set_display_name::v3::Response {}) } @@ -224,10 +226,12 @@ pub async fn set_avatar_url_route(body: Ruma) -> Re .await; } - // Presence update - services() - .presence - .ping_presence(sender_user, PresenceState::Online)?; + if services().globals.allow_local_presence() { + // Presence update + services() + .presence + .ping_presence(sender_user, &PresenceState::Online)?; + } Ok(set_avatar_url::v3::Response {}) } diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index a8d1f7f4..2f4fded8 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -171,9 +171,11 @@ async fn sync_helper( // bool = caching allowed ) -> Result<(sync_events::v3::Response, bool), Error> { // Presence update - services() - .presence - .ping_presence(&sender_user, body.set_presence)?; + if services().globals.allow_local_presence() { + services() + .presence + .ping_presence(&sender_user, &body.set_presence)?; + } // Setup watchers, so if there's no response, we can wait for them let watcher = services().globals.watch(&sender_user, &sender_device); @@ -222,6 +224,10 @@ async fn sync_helper( .filter_map(Result::ok), ); + if services().globals.allow_local_presence() { + process_presence_updates(&mut presence_updates, since, &sender_user).await?; + } + let all_joined_rooms = services() .rooms .state_cache @@ -252,10 +258,6 @@ async fn sync_helper( if !joined_room.is_empty() { joined_rooms.insert(room_id.clone(), joined_room); } - - if services().globals.allow_local_presence() { - process_room_presence_updates(&mut presence_updates, &room_id, since).await?; - } } } @@ -522,11 +524,19 @@ async fn sync_helper( } } -async fn process_room_presence_updates( - presence_updates: &mut HashMap, room_id: &RoomId, since: u64, +async fn process_presence_updates( + presence_updates: &mut HashMap, since: u64, syncing_user: &OwnedUserId, ) -> Result<()> { - // Take presence updates from this room - for (user_id, _, presence_event) in services().presence.presence_since(room_id, since) { + // Take presence updates + for (user_id, _, presence_event) in services().presence.presence_since(since) { + if !services() + .rooms + .state_cache + .user_sees_user(syncing_user, &user_id)? + { + continue; + } + match presence_updates.entry(user_id) { Entry::Vacant(slot) => { slot.insert(presence_event); diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 881f0990..b73c8726 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -338,16 +338,13 @@ pub async fn send_transaction_message_route( } for update in presence.push { - for room_id in services().rooms.state_cache.rooms_joined(&update.user_id) { - services().presence.set_presence( - &room_id?, - &update.user_id, - update.presence.clone(), - Some(update.currently_active), - Some(update.last_active_ago), - update.status_msg.clone(), - )?; - } + services().presence.set_presence( + &update.user_id, + &update.presence, + Some(update.currently_active), + Some(update.last_active_ago), + update.status_msg.clone(), + )?; } }, Edu::Receipt(receipt) => { diff --git a/src/config/mod.rs b/src/config/mod.rs index ee500571..730740e8 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -213,6 +213,8 @@ pub struct Config { pub presence_idle_timeout_s: u64, #[serde(default = "default_presence_offline_timeout_s")] pub presence_offline_timeout_s: u64, + #[serde(default = "true_fn")] + pub presence_timeout_remote_users: bool, #[serde(default = "true_fn")] pub allow_incoming_read_receipts: bool, @@ -718,9 +720,9 @@ fn default_notification_push_path() -> String { "/_matrix/push/v1/notify".to_own fn default_turn_ttl() -> u64 { 60 * 60 * 24 } -fn default_presence_idle_timeout_s() -> u64 { 2 * 60 } +fn default_presence_idle_timeout_s() -> u64 { 5 * 60 } -fn default_presence_offline_timeout_s() -> u64 { 15 * 60 } +fn default_presence_offline_timeout_s() -> u64 { 30 * 60 } fn default_typing_federation_timeout_s() -> u64 { 30 } diff --git a/src/database/key_value/presence.rs b/src/database/key_value/presence.rs index 23f5c43c..153c5e0b 100644 --- a/src/database/key_value/presence.rs +++ b/src/database/key_value/presence.rs @@ -1,7 +1,5 @@ -use std::time::Duration; - -use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId}; -use tracing::error; +use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, UInt, UserId}; +use tracing::debug; use crate::{ database::KeyValueDatabase, @@ -12,149 +10,98 @@ use crate::{ }; impl service::presence::Data for KeyValueDatabase { - fn get_presence(&self, room_id: &RoomId, user_id: &UserId) -> Result> { - let key = presence_key(room_id, user_id); + fn get_presence(&self, user_id: &UserId) -> Result> { + if let Some(count_bytes) = self.userid_presenceid.get(user_id.as_bytes())? { + let count = utils::u64_from_bytes(&count_bytes) + .map_err(|_e| Error::bad_database("No 'count' bytes in presence key"))?; - self.roomuserid_presence - .get(&key)? - .map(|presence_bytes| -> Result { - Presence::from_json_bytes(&presence_bytes)?.to_presence_event(user_id) - }) - .transpose() - } - - fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()> { - let Some(ref tx) = *self.presence_timer_sender else { - return Ok(()); - }; - - let now = utils::millis_since_unix_epoch(); - let mut state_changed = false; - - for room_id in services().rooms.state_cache.rooms_joined(user_id) { - let key = presence_key(&room_id?, user_id); - - let presence_bytes = self.roomuserid_presence.get(&key)?; - - if let Some(presence_bytes) = presence_bytes { - let presence = Presence::from_json_bytes(&presence_bytes)?; - if presence.state != new_state { - state_changed = true; - break; - } - } - } - - let count = if state_changed { - services().globals.next_count()? + let key = presenceid_key(count, user_id); + self.presenceid_presence + .get(&key)? + .map(|presence_bytes| -> Result<(u64, PresenceEvent)> { + Ok((count, Presence::from_json_bytes(&presence_bytes)?.to_presence_event(user_id)?)) + }) + .transpose() } else { - services().globals.current_count()? - }; - - for room_id in services().rooms.state_cache.rooms_joined(user_id) { - let key = presence_key(&room_id?, user_id); - - let presence_bytes = self.roomuserid_presence.get(&key)?; - - let new_presence = match presence_bytes { - Some(presence_bytes) => { - let mut presence = Presence::from_json_bytes(&presence_bytes)?; - presence.state = new_state.clone(); - presence.currently_active = presence.state == PresenceState::Online; - presence.last_active_ts = now; - presence.last_count = count; - - presence - }, - None => Presence::new(new_state.clone(), new_state == PresenceState::Online, now, count, None), - }; - - self.roomuserid_presence - .insert(&key, &new_presence.to_json_bytes()?)?; + Ok(None) } - - let timeout = match new_state { - PresenceState::Online => services().globals.config.presence_idle_timeout_s, - _ => services().globals.config.presence_offline_timeout_s, - }; - - tx.send((user_id.to_owned(), Duration::from_secs(timeout))) - .map_err(|e| { - error!("Failed to add presence timer: {}", e); - Error::bad_database("Failed to add presence timer") - }) } fn set_presence( - &self, room_id: &RoomId, user_id: &UserId, presence_state: PresenceState, currently_active: Option, + &self, user_id: &UserId, presence_state: &PresenceState, currently_active: Option, last_active_ago: Option, status_msg: Option, ) -> Result<()> { - let Some(ref tx) = *self.presence_timer_sender else { - return Ok(()); + let last_presence = self.get_presence(user_id)?; + let state_changed = match last_presence { + None => true, + Some(ref presence) => presence.1.content.presence != *presence_state, }; let now = utils::millis_since_unix_epoch(); - let last_active_ts = match last_active_ago { - Some(last_active_ago) => now.saturating_sub(last_active_ago.into()), - None => now, + let last_last_active_ts = match last_presence { + None => 0, + Some((_, ref presence)) => now.saturating_sub(presence.content.last_active_ago.unwrap_or_default().into()), }; - let key = presence_key(room_id, user_id); + let last_active_ts = match last_active_ago { + None => now, + Some(last_active_ago) => now.saturating_sub(last_active_ago.into()), + }; + + // tighten for state flicker? + if !state_changed && last_active_ts <= last_last_active_ts { + debug!( + "presence spam {:?} last_active_ts:{:?} <= {:?}", + user_id, last_active_ts, last_last_active_ts + ); + return Ok(()); + } let presence = Presence::new( - presence_state, + presence_state.to_owned(), currently_active.unwrap_or(false), last_active_ts, - services().globals.next_count()?, status_msg, ); + let count = services().globals.next_count()?; + let key = presenceid_key(count, user_id); - let timeout = match presence.state { - PresenceState::Online => services().globals.config.presence_idle_timeout_s, - _ => services().globals.config.presence_offline_timeout_s, - }; - - tx.send((user_id.to_owned(), Duration::from_secs(timeout))) - .map_err(|e| { - error!("Failed to add presence timer: {}", e); - Error::bad_database("Failed to add presence timer") - })?; - - self.roomuserid_presence + self.presenceid_presence .insert(&key, &presence.to_json_bytes()?)?; + self.userid_presenceid + .insert(user_id.as_bytes(), &count.to_be_bytes())?; + + if let Some((last_count, _)) = last_presence { + let key = presenceid_key(last_count, user_id); + self.presenceid_presence.remove(&key)?; + } + Ok(()) } fn remove_presence(&self, user_id: &UserId) -> Result<()> { - for room_id in services().rooms.state_cache.rooms_joined(user_id) { - let key = presence_key(&room_id?, user_id); - - self.roomuserid_presence.remove(&key)?; + if let Some(count_bytes) = self.userid_presenceid.get(user_id.as_bytes())? { + let count = utils::u64_from_bytes(&count_bytes) + .map_err(|_e| Error::bad_database("No 'count' bytes in presence key"))?; + let key = presenceid_key(count, user_id); + self.presenceid_presence.remove(&key)?; + self.userid_presenceid.remove(user_id.as_bytes())?; } Ok(()) } - fn presence_since<'a>( - &'a self, room_id: &RoomId, since: u64, - ) -> Box + 'a> { - let prefix = [room_id.as_bytes(), &[0xFF]].concat(); - + fn presence_since<'a>(&'a self, since: u64) -> Box + 'a> { Box::new( - self.roomuserid_presence - .scan_prefix(prefix) + self.presenceid_presence + .iter() .flat_map(|(key, presence_bytes)| -> Result<(OwnedUserId, u64, PresenceEvent)> { - let user_id = user_id_from_bytes( - key.rsplit(|byte| *byte == 0xFF) - .next() - .ok_or_else(|| Error::bad_database("No UserID bytes in presence key"))?, - )?; - + let (count, user_id) = presenceid_parse(&key)?; let presence = Presence::from_json_bytes(&presence_bytes)?; let presence_event = presence.to_presence_event(&user_id)?; - Ok((user_id, presence.last_count, presence_event)) + Ok((user_id, count, presence_event)) }) .filter(move |(_, count, _)| *count > since), ) @@ -162,6 +109,15 @@ impl service::presence::Data for KeyValueDatabase { } #[inline] -fn presence_key(room_id: &RoomId, user_id: &UserId) -> Vec { - [room_id.as_bytes(), &[0xFF], user_id.as_bytes()].concat() +fn presenceid_key(count: u64, user_id: &UserId) -> Vec { + [count.to_be_bytes().to_vec(), user_id.as_bytes().to_vec()].concat() +} + +#[inline] +fn presenceid_parse(key: &[u8]) -> Result<(u64, OwnedUserId)> { + let (count, user_id) = key.split_at(8); + let user_id = user_id_from_bytes(user_id)?; + let count = utils::u64_from_bytes(count).unwrap(); + + Ok((count, user_id)) } diff --git a/src/database/mod.rs b/src/database/mod.rs index d2293c14..86f499b2 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -28,16 +28,10 @@ use ruma::{ use serde::Deserialize; #[cfg(unix)] use tokio::signal::unix::{signal, SignalKind}; -use tokio::{ - sync::mpsc, - time::{interval, Instant}, -}; +use tokio::time::{interval, Instant}; use tracing::{debug, error, info, warn}; -use crate::{ - service::{presence::presence_handler, rooms::timeline::PduCount}, - services, utils, Config, Error, PduEvent, Result, Services, SERVICES, -}; +use crate::{service::rooms::timeline::PduCount, services, utils, Config, Error, PduEvent, Result, Services, SERVICES}; pub struct KeyValueDatabase { db: Arc, @@ -65,8 +59,9 @@ pub struct KeyValueDatabase { pub(super) userid_usersigningkeyid: Arc, pub(super) userfilterid_filter: Arc, // UserFilterId = UserId + FilterId - - pub(super) todeviceid_events: Arc, // ToDeviceId = UserId + DeviceId + Count + pub(super) todeviceid_events: Arc, // ToDeviceId = UserId + DeviceId + Count + pub(super) userid_presenceid: Arc, // UserId => Count + pub(super) presenceid_presence: Arc, // Count + UserId => Presence //pub uiaa: uiaa::Uiaa, pub(super) userdevicesessionid_uiaainfo: Arc, // User-interactive authentication @@ -77,7 +72,6 @@ pub struct KeyValueDatabase { pub(super) readreceiptid_readreceipt: Arc, // ReadReceiptId = RoomId + Count + UserId pub(super) roomuserid_privateread: Arc, // RoomUserId = Room + User, PrivateRead = Count pub(super) roomuserid_lastprivatereadupdate: Arc, // LastPrivateReadUpdate = Count - pub(super) roomuserid_presence: Arc, //pub rooms: rooms::Rooms, pub(super) pduid_pdu: Arc, // PduId = ShortRoomId + Count @@ -185,7 +179,6 @@ pub struct KeyValueDatabase { pub(super) our_real_users_cache: RwLock>>>, pub(super) appservice_in_room_cache: RwLock>>, pub(super) lasttimelinecount_cache: Mutex>, - pub(super) presence_timer_sender: Arc>>, } #[derive(Deserialize)] @@ -275,14 +268,6 @@ impl KeyValueDatabase { }, }; - let presence_sender = if config.allow_local_presence { - let (presence_sender, presence_receiver) = mpsc::unbounded_channel(); - Self::start_presence_handler(presence_receiver).await; - Some(presence_sender) - } else { - None - }; - let db_raw = Box::new(Self { db: builder.clone(), userid_password: builder.open_tree("userid_password")?, @@ -302,13 +287,14 @@ impl KeyValueDatabase { userid_usersigningkeyid: builder.open_tree("userid_usersigningkeyid")?, userfilterid_filter: builder.open_tree("userfilterid_filter")?, todeviceid_events: builder.open_tree("todeviceid_events")?, + userid_presenceid: builder.open_tree("userid_presenceid")?, + presenceid_presence: builder.open_tree("presenceid_presence")?, userdevicesessionid_uiaainfo: builder.open_tree("userdevicesessionid_uiaainfo")?, userdevicesessionid_uiaarequest: RwLock::new(BTreeMap::new()), readreceiptid_readreceipt: builder.open_tree("readreceiptid_readreceipt")?, roomuserid_privateread: builder.open_tree("roomuserid_privateread")?, // "Private" read receipt roomuserid_lastprivatereadupdate: builder.open_tree("roomuserid_lastprivatereadupdate")?, - roomuserid_presence: builder.open_tree("roomuserid_presence")?, pduid_pdu: builder.open_tree("pduid_pdu")?, eventid_pduid: builder.open_tree("eventid_pduid")?, roomid_pduleaves: builder.open_tree("roomid_pduleaves")?, @@ -404,7 +390,6 @@ impl KeyValueDatabase { our_real_users_cache: RwLock::new(HashMap::new()), appservice_in_room_cache: RwLock::new(HashMap::new()), lasttimelinecount_cache: Mutex::new(HashMap::new()), - presence_timer_sender: Arc::new(presence_sender), }); let db = Box::leak(db_raw); @@ -1059,6 +1044,10 @@ impl KeyValueDatabase { services().sending.start_handler(); + if config.allow_local_presence { + services().presence.start_handler(); + } + Self::start_cleanup_task().await; if services().globals.allow_check_for_updates() { Self::start_check_for_updates_task().await; @@ -1180,15 +1169,6 @@ impl KeyValueDatabase { } }); } - - async fn start_presence_handler(presence_timer_receiver: mpsc::UnboundedReceiver<(OwnedUserId, Duration)>) { - tokio::spawn(async move { - match presence_handler(presence_timer_receiver).await { - Ok(()) => warn!("Presence maintenance task finished"), - Err(e) => error!("Presence maintenance task finished with error: {e}"), - } - }); - } } /// Sets the emergency password and push rules for the @conduit account in case diff --git a/src/service/mod.rs b/src/service/mod.rs index 1af75846..1e29eacf 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -31,7 +31,7 @@ pub struct Services<'a> { pub uiaa: uiaa::Service, pub users: users::Service, pub account_data: account_data::Service, - pub presence: presence::Service, + pub presence: Arc, pub admin: Arc, pub globals: globals::Service<'a>, pub key_backups: key_backups::Service, @@ -155,9 +155,7 @@ impl Services<'_> { account_data: account_data::Service { db, }, - presence: presence::Service { - db, - }, + presence: presence::Service::build(db, config), admin: admin::Service::build(), key_backups: key_backups::Service { db, diff --git a/src/service/presence/data.rs b/src/service/presence/data.rs index 6b7ad4c2..649601a6 100644 --- a/src/service/presence/data.rs +++ b/src/service/presence/data.rs @@ -1,18 +1,14 @@ -use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId}; +use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, UInt, UserId}; use crate::Result; pub trait Data: Send + Sync { - /// Returns the latest presence event for the given user in the given room. - fn get_presence(&self, room_id: &RoomId, user_id: &UserId) -> Result>; - - /// Pings the presence of the given user in the given room, setting the - /// specified state. - fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()>; + /// Returns the latest presence event for the given user. + fn get_presence(&self, user_id: &UserId) -> Result>; /// Adds a presence event which will be saved until a new event replaces it. fn set_presence( - &self, room_id: &RoomId, user_id: &UserId, presence_state: PresenceState, currently_active: Option, + &self, user_id: &UserId, presence_state: &PresenceState, currently_active: Option, last_active_ago: Option, status_msg: Option, ) -> Result<()>; @@ -21,7 +17,5 @@ pub trait Data: Send + Sync { /// Returns the most recent presence updates that happened after the event /// with id `since`. - fn presence_since<'a>( - &'a self, room_id: &RoomId, since: u64, - ) -> Box + 'a>; + fn presence_since<'a>(&'a self, since: u64) -> Box + 'a>; } diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs index a31ba53a..534894cc 100644 --- a/src/service/presence/mod.rs +++ b/src/service/presence/mod.rs @@ -1,19 +1,22 @@ mod data; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; pub use data::Data; use futures_util::{stream::FuturesUnordered, StreamExt}; use ruma::{ events::presence::{PresenceEvent, PresenceEventContent}, presence::PresenceState, - OwnedUserId, RoomId, UInt, UserId, + OwnedUserId, UInt, UserId, }; use serde::{Deserialize, Serialize}; -use tokio::{sync::mpsc, time::sleep}; -use tracing::debug; +use tokio::{ + sync::{mpsc, Mutex}, + time::sleep, +}; +use tracing::{debug, error}; -use crate::{services, utils, Error, Result}; +use crate::{services, utils, Config, Error, Result}; /// Represents data required to be kept in order to implement the presence /// specification. @@ -22,19 +25,15 @@ pub struct Presence { pub state: PresenceState, pub currently_active: bool, pub last_active_ts: u64, - pub last_count: u64, pub status_msg: Option, } impl Presence { - pub fn new( - state: PresenceState, currently_active: bool, last_active_ts: u64, last_count: u64, status_msg: Option, - ) -> Self { + pub fn new(state: PresenceState, currently_active: bool, last_active_ts: u64, status_msg: Option) -> Self { Self { state, currently_active, last_active_ts, - last_count, status_msg, } } @@ -72,27 +71,94 @@ impl Presence { pub struct Service { pub db: &'static dyn Data, + pub timer_sender: mpsc::UnboundedSender<(OwnedUserId, Duration)>, + timer_receiver: Mutex>, + timeout_remote_users: bool, } impl Service { - /// Returns the latest presence event for the given user in the given room. - pub fn get_presence(&self, room_id: &RoomId, user_id: &UserId) -> Result> { - self.db.get_presence(room_id, user_id) + pub fn build(db: &'static dyn Data, config: &Config) -> Arc { + let (timer_sender, timer_receiver) = mpsc::unbounded_channel(); + + Arc::new(Self { + db, + timer_sender, + timer_receiver: Mutex::new(timer_receiver), + timeout_remote_users: config.presence_timeout_remote_users, + }) + } + + pub fn start_handler(self: &Arc) { + let self_ = Arc::clone(self); + tokio::spawn(async move { + self_ + .handler() + .await + .expect("Failed to start presence handler"); + }); + } + + /// Returns the latest presence event for the given user. + pub fn get_presence(&self, user_id: &UserId) -> Result> { + if let Some((_, presence)) = self.db.get_presence(user_id)? { + Ok(Some(presence)) + } else { + Ok(None) + } } /// Pings the presence of the given user in the given room, setting the /// specified state. - pub fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()> { - self.db.ping_presence(user_id, new_state) + pub fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result<()> { + let last_presence = self.db.get_presence(user_id)?; + let state_changed = match last_presence { + None => true, + Some((_, ref presence)) => presence.content.presence != *new_state, + }; + + let last_last_active_ago = match last_presence { + None => 0_u64, + Some((_, ref presence)) => presence.content.last_active_ago.unwrap_or_default().into(), + }; + + const REFRESH_TIMEOUT: u64 = 60 * 25 * 1000; + if !state_changed && last_last_active_ago < REFRESH_TIMEOUT { + return Ok(()); + } + + let status_msg = match last_presence { + Some((_, ref presence)) => presence.content.status_msg.clone(), + None => Some(String::new()), + }; + + let last_active_ago = UInt::new(0); + let currently_active = *new_state == PresenceState::Online; + self.set_presence(user_id, new_state, Some(currently_active), last_active_ago, status_msg) } /// Adds a presence event which will be saved until a new event replaces it. pub fn set_presence( - &self, room_id: &RoomId, user_id: &UserId, presence_state: PresenceState, currently_active: Option, + &self, user_id: &UserId, presence_state: &PresenceState, currently_active: Option, last_active_ago: Option, status_msg: Option, ) -> Result<()> { self.db - .set_presence(room_id, user_id, presence_state, currently_active, last_active_ago, status_msg) + .set_presence(user_id, presence_state, currently_active, last_active_ago, status_msg)?; + + if self.timeout_remote_users || user_id.server_name() == services().globals.server_name() { + let timeout = match presence_state { + PresenceState::Online => services().globals.config.presence_idle_timeout_s, + _ => services().globals.config.presence_offline_timeout_s, + }; + + self.timer_sender + .send((user_id.to_owned(), Duration::from_secs(timeout))) + .map_err(|e| { + error!("Failed to add presence timer: {}", e); + Error::bad_database("Failed to add presence timer") + })?; + } + + Ok(()) } /// Removes the presence record for the given user from the database. @@ -100,29 +166,23 @@ impl Service { /// Returns the most recent presence updates that happened after the event /// with id `since`. - pub fn presence_since( - &self, room_id: &RoomId, since: u64, - ) -> Box> { - self.db.presence_since(room_id, since) + pub fn presence_since(&self, since: u64) -> Box> { + self.db.presence_since(since) } -} -pub async fn presence_handler( - mut presence_timer_receiver: mpsc::UnboundedReceiver<(OwnedUserId, Duration)>, -) -> Result<()> { - let mut presence_timers = FuturesUnordered::new(); + async fn handler(&self) -> Result<()> { + let mut presence_timers = FuturesUnordered::new(); + let mut receiver = self.timer_receiver.lock().await; + loop { + tokio::select! { + Some((user_id, timeout)) = receiver.recv() => { + debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len()); + presence_timers.push(presence_timer(user_id, timeout)); + } - loop { - debug!("Number of presence timers: {}", presence_timers.len()); - - tokio::select! { - Some((user_id, timeout)) = presence_timer_receiver.recv() => { - debug!("Adding timer for user '{user_id}': Timeout {timeout:?}"); - presence_timers.push(presence_timer(user_id, timeout)); - } - - Some(user_id) = presence_timers.next() => { - process_presence_timer(&user_id)?; + Some(user_id) = presence_timers.next() => { + process_presence_timer(&user_id)?; + } } } } @@ -142,16 +202,12 @@ fn process_presence_timer(user_id: &OwnedUserId) -> Result<()> { let mut last_active_ago = None; let mut status_msg = None; - for room_id in services().rooms.state_cache.rooms_joined(user_id) { - let presence_event = services().presence.get_presence(&room_id?, user_id)?; + let presence_event = services().presence.get_presence(user_id)?; - if let Some(presence_event) = presence_event { - presence_state = presence_event.content.presence; - last_active_ago = presence_event.content.last_active_ago; - status_msg = presence_event.content.status_msg; - - break; - } + if let Some(presence_event) = presence_event { + presence_state = presence_event.content.presence; + last_active_ago = presence_event.content.last_active_ago; + status_msg = presence_event.content.status_msg; } let new_state = match (&presence_state, last_active_ago.map(u64::from)) { @@ -163,16 +219,9 @@ fn process_presence_timer(user_id: &OwnedUserId) -> Result<()> { debug!("Processed presence timer for user '{user_id}': Old state = {presence_state}, New state = {new_state:?}"); if let Some(new_state) = new_state { - for room_id in services().rooms.state_cache.rooms_joined(user_id) { - services().presence.set_presence( - &room_id?, - user_id, - new_state.clone(), - Some(false), - last_active_ago, - status_msg.clone(), - )?; - } + services() + .presence + .set_presence(user_id, &new_state, Some(false), last_active_ago, status_msg)?; } Ok(()) diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index eeeab84d..c666bc94 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -1,4 +1,5 @@ use std::{ + cmp, collections::{BTreeMap, HashMap, HashSet}, fmt::Debug, sync::Arc, @@ -413,7 +414,7 @@ impl Service { // Fail if a request has failed recently (exponential backoff) const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24); let mut min_elapsed_duration = Duration::from_secs(self.timeout) * (*tries) * (*tries); - min_elapsed_duration = std::cmp::min(min_elapsed_duration, MAX_DURATION); + min_elapsed_duration = cmp::min(min_elapsed_duration, MAX_DURATION); if time.elapsed() < min_elapsed_duration { allow = false; } else { @@ -448,9 +449,6 @@ impl Service { .filter_map(Result::ok) .filter(|user_id| user_id.server_name() == services().globals.server_name()), ); - if !select_edus_presence(&room_id, since, &mut max_edu_count, &mut events)? { - break; - } if !select_edus_receipts(&room_id, since, &mut max_edu_count, &mut events)? { break; } @@ -472,29 +470,36 @@ impl Service { events.push(serde_json::to_vec(&edu).expect("json can be serialized")); } + if services().globals.allow_outgoing_presence() { + select_edus_presence(server_name, since, &mut max_edu_count, &mut events)?; + } + Ok((events, max_edu_count)) } } -/// Look for presence [in this room] <--- XXX -#[tracing::instrument(skip(room_id, since, max_edu_count, events))] +/// Look for presence +#[tracing::instrument(skip(server_name, since, max_edu_count, events))] pub fn select_edus_presence( - room_id: &RoomId, since: u64, max_edu_count: &mut u64, events: &mut Vec>, + server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec>, ) -> Result { - if !services().globals.allow_outgoing_presence() { - return Ok(true); - } - - // Look for presence updates in this room + // Look for presence updates for this server let mut presence_updates = Vec::new(); - for (user_id, count, presence_event) in services().presence.presence_since(room_id, since) { - if count > *max_edu_count { - *max_edu_count = count; - } + for (user_id, count, presence_event) in services().presence.presence_since(since) { + *max_edu_count = cmp::max(count, *max_edu_count); + if user_id.server_name() != services().globals.server_name() { continue; } + if !services() + .rooms + .state_cache + .server_sees_user(server_name, &user_id)? + { + continue; + } + presence_updates.push(PresenceUpdate { user_id, presence: presence_event.content.presence, @@ -524,10 +529,8 @@ pub fn select_edus_receipts( .readreceipts_since(room_id, since) { let (user_id, count, read_receipt) = r?; + *max_edu_count = cmp::max(count, *max_edu_count); - if count > *max_edu_count { - *max_edu_count = count; - } if user_id.server_name() != services().globals.server_name() { continue; }