diff --git a/src/api/client_server/presence.rs b/src/api/client_server/presence.rs index e5cd1b8e..46cad8fe 100644 --- a/src/api/client_server/presence.rs +++ b/src/api/client_server/presence.rs @@ -1,4 +1,4 @@ -use crate::{services, utils, Error, Result, Ruma}; +use crate::{services, Error, Result, Ruma}; use ruma::api::client::{ error::ErrorKind, presence::{get_presence, set_presence}, @@ -12,28 +12,16 @@ pub async fn set_presence_route( body: Ruma, ) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - for room_id in services().rooms.state_cache.rooms_joined(sender_user) { let room_id = room_id?; - services().rooms.edus.presence.update_presence( - sender_user, + services().rooms.edus.presence.set_presence( &room_id, - ruma::events::presence::PresenceEvent { - content: ruma::events::presence::PresenceEventContent { - avatar_url: services().users.avatar_url(sender_user)?, - currently_active: None, - displayname: services().users.displayname(sender_user)?, - last_active_ago: Some( - utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"), - ), - presence: body.presence.clone(), - status_msg: body.status_msg.clone(), - }, - sender: sender_user.clone(), - }, + sender_user, + body.presence.clone(), + None, + None, + body.status_msg.clone(), )?; } @@ -63,7 +51,7 @@ pub async fn get_presence_route( .rooms .edus .presence - .get_last_presence_event(sender_user, &room_id)? + .get_presence(&room_id, sender_user)? { presence_event = Some(presence); break; diff --git a/src/api/client_server/profile.rs b/src/api/client_server/profile.rs index cf1db2d7..00d52eb0 100644 --- a/src/api/client_server/profile.rs +++ b/src/api/client_server/profile.rs @@ -1,4 +1,4 @@ -use crate::{service::pdu::PduBuilder, services, utils, Error, Result, Ruma}; +use crate::{service::pdu::PduBuilder, services, Error, Result, Ruma}; use ruma::{ api::{ client::{ @@ -10,6 +10,7 @@ use ruma::{ federation::{self, query::get_profile_information::v1::ProfileField}, }, events::{room::member::RoomMemberEventContent, StateEventType, TimelineEventType}, + presence::PresenceState, }; use serde_json::value::to_raw_value; use std::sync::Arc; @@ -89,29 +90,15 @@ pub async fn set_displayname_route( .timeline .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock) .await; - - // Presence update - services().rooms.edus.presence.update_presence( - sender_user, - &room_id, - ruma::events::presence::PresenceEvent { - content: ruma::events::presence::PresenceEventContent { - avatar_url: services().users.avatar_url(sender_user)?, - currently_active: None, - displayname: services().users.displayname(sender_user)?, - last_active_ago: Some( - utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"), - ), - presence: ruma::presence::PresenceState::Online, - status_msg: None, - }, - sender: sender_user.clone(), - }, - )?; } + // Presence update + services() + .rooms + .edus + .presence + .ping_presence(sender_user, PresenceState::Online)?; + Ok(set_display_name::v3::Response {}) } @@ -224,29 +211,15 @@ pub async fn set_avatar_url_route( .timeline .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock) .await; - - // Presence update - services().rooms.edus.presence.update_presence( - sender_user, - &room_id, - ruma::events::presence::PresenceEvent { - content: ruma::events::presence::PresenceEventContent { - avatar_url: services().users.avatar_url(sender_user)?, - currently_active: None, - displayname: services().users.displayname(sender_user)?, - last_active_ago: Some( - utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"), - ), - presence: ruma::presence::PresenceState::Online, - status_msg: None, - }, - sender: sender_user.clone(), - }, - )?; } + // Presence update + services() + .rooms + .edus + .presence + .ping_presence(sender_user, PresenceState::Online)?; + Ok(set_avatar_url::v3::Response {}) } diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index e0c6e0b9..7b7224c3 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -174,8 +174,12 @@ async fn sync_helper( body: sync_events::v3::Request, // bool = caching allowed ) -> Result<(sync_events::v3::Response, bool), Error> { - // TODO: match body.set_presence { - services().rooms.edus.presence.ping_presence(&sender_user)?; + // Presence update + services() + .rooms + .edus + .presence + .ping_presence(&sender_user, body.set_presence)?; // Setup watchers, so if there's no response, we can wait for them let watcher = services().globals.watch(&sender_user, &sender_device); @@ -252,36 +256,36 @@ async fn sync_helper( } // Take presence updates from this room - for (user_id, presence) in services() + for presence_data in services() .rooms .edus .presence - .presence_since(&room_id, since)? + .presence_since(&room_id, since) { + let (user_id, _, presence_event) = presence_data?; + match presence_updates.entry(user_id) { - Entry::Vacant(v) => { - v.insert(presence); + Entry::Vacant(slot) => { + slot.insert(presence_event); } - Entry::Occupied(mut o) => { - let p = o.get_mut(); + Entry::Occupied(mut slot) => { + let curr_event = slot.get_mut(); + let curr_content = &mut curr_event.content; + let new_content = presence_event.content; // Update existing presence event with more info - p.content.presence = presence.content.presence; - if let Some(status_msg) = presence.content.status_msg { - p.content.status_msg = Some(status_msg); - } - if let Some(last_active_ago) = presence.content.last_active_ago { - p.content.last_active_ago = Some(last_active_ago); - } - if let Some(displayname) = presence.content.displayname { - p.content.displayname = Some(displayname); - } - if let Some(avatar_url) = presence.content.avatar_url { - p.content.avatar_url = Some(avatar_url); - } - if let Some(currently_active) = presence.content.currently_active { - p.content.currently_active = Some(currently_active); - } + curr_content.presence = new_content.presence; + curr_content.status_msg = + curr_content.status_msg.clone().or(new_content.status_msg); + curr_content.last_active_ago = + curr_content.last_active_ago.or(new_content.last_active_ago); + curr_content.displayname = + curr_content.displayname.clone().or(new_content.displayname); + curr_content.avatar_url = + curr_content.avatar_url.clone().or(new_content.avatar_url); + curr_content.currently_active = curr_content + .currently_active + .or(new_content.currently_active); } } } diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 6ca352b8..178d0c99 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -777,7 +777,20 @@ pub async fn send_transaction_message_route( .filter_map(|edu| serde_json::from_str::(edu.json().get()).ok()) { match edu { - Edu::Presence(_) => {} + Edu::Presence(presence) => { + for update in presence.push { + for room_id in services().rooms.state_cache.rooms_joined(&update.user_id) { + services().rooms.edus.presence.set_presence( + &room_id?, + &update.user_id, + update.presence.clone(), + Some(update.currently_active), + Some(update.last_active_ago), + update.status_msg.clone(), + )?; + } + } + } Edu::Receipt(receipt) => { for (room_id, room_updates) in receipt.receipts { for (user_id, user_updates) in room_updates.read { diff --git a/src/config/mod.rs b/src/config/mod.rs index 652b3a4c..bef5ebf8 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -83,6 +83,13 @@ pub struct Config { pub emergency_password: Option, + #[serde(default = "false_fn")] + pub allow_presence: bool, + #[serde(default = "default_presence_idle_timeout_s")] + pub presence_idle_timeout_s: u64, + #[serde(default = "default_presence_offline_timeout_s")] + pub presence_offline_timeout_s: u64, + #[serde(flatten)] pub catchall: BTreeMap, } @@ -302,6 +309,14 @@ fn default_turn_ttl() -> u64 { 60 * 60 * 24 } +fn default_presence_idle_timeout_s() -> u64 { + 5 * 60 +} + +fn default_presence_offline_timeout_s() -> u64 { + 15 * 60 +} + // I know, it's a great name pub fn default_default_room_version() -> RoomVersionId { RoomVersionId::V10 diff --git a/src/database/key_value/rooms/edus/presence.rs b/src/database/key_value/rooms/edus/presence.rs index 904b1c44..a2c8ee44 100644 --- a/src/database/key_value/rooms/edus/presence.rs +++ b/src/database/key_value/rooms/edus/presence.rs @@ -1,152 +1,190 @@ -use std::collections::HashMap; +use std::{iter, time::Duration}; use ruma::{ events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId, }; -use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; +use crate::{ + database::KeyValueDatabase, + service::{self, rooms::edus::presence::Presence}, + services, + utils::{self, user_id_from_bytes}, + Error, Result, +}; impl service::rooms::edus::presence::Data for KeyValueDatabase { - fn update_presence( - &self, - user_id: &UserId, - room_id: &RoomId, - presence: PresenceEvent, - ) -> Result<()> { - // TODO: Remove old entry? Or maybe just wipe completely from time to time? + fn get_presence(&self, room_id: &RoomId, user_id: &UserId) -> Result> { + if !services().globals.config.allow_presence { + return Ok(None); + } - let count = services().globals.next_count()?.to_be_bytes(); + let key = presence_key(room_id, user_id); - let mut presence_id = room_id.as_bytes().to_vec(); - presence_id.push(0xff); - presence_id.extend_from_slice(&count); - presence_id.push(0xff); - presence_id.extend_from_slice(presence.sender.as_bytes()); - - self.presenceid_presence.insert( - &presence_id, - &serde_json::to_vec(&presence).expect("PresenceEvent can be serialized"), - )?; - - self.userid_lastpresenceupdate.insert( - user_id.as_bytes(), - &utils::millis_since_unix_epoch().to_be_bytes(), - )?; - - Ok(()) - } - - fn ping_presence(&self, user_id: &UserId) -> Result<()> { - self.userid_lastpresenceupdate.insert( - user_id.as_bytes(), - &utils::millis_since_unix_epoch().to_be_bytes(), - )?; - - Ok(()) - } - - fn last_presence_update(&self, user_id: &UserId) -> Result> { - self.userid_lastpresenceupdate - .get(user_id.as_bytes())? - .map(|bytes| { - utils::u64_from_bytes(&bytes).map_err(|_| { - Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") - }) + self.roomuserid_presence + .get(&key)? + .map(|presence_bytes| -> Result { + Presence::from_json_bytes(&presence_bytes)?.to_presence_event(user_id) }) .transpose() } - fn get_presence_event( + fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()> { + if !services().globals.config.allow_presence { + return Ok(()); + } + + let now = utils::millis_since_unix_epoch(); + let mut state_changed = false; + + for room_id in services().rooms.state_cache.rooms_joined(user_id) { + let key = presence_key(&room_id?, user_id); + + let presence_bytes = self.roomuserid_presence.get(&key)?; + + if let Some(presence_bytes) = presence_bytes { + let presence = Presence::from_json_bytes(&presence_bytes)?; + if presence.state != new_state { + state_changed = true; + break; + } + } + } + + let count = if state_changed { + services().globals.next_count()? + } else { + services().globals.current_count()? + }; + + for room_id in services().rooms.state_cache.rooms_joined(user_id) { + let key = presence_key(&room_id?, user_id); + + let presence_bytes = self.roomuserid_presence.get(&key)?; + + let new_presence = match presence_bytes { + Some(presence_bytes) => { + let mut presence = Presence::from_json_bytes(&presence_bytes)?; + presence.state = new_state.clone(); + presence.currently_active = presence.state == PresenceState::Online; + presence.last_active_ts = now; + presence.last_count = count; + + presence + } + None => Presence::new( + new_state.clone(), + new_state == PresenceState::Online, + now, + count, + None, + ), + }; + + self.roomuserid_presence + .insert(&key, &new_presence.to_json_bytes()?)?; + } + + let timeout = match new_state { + PresenceState::Online => services().globals.config.presence_idle_timeout_s, + _ => services().globals.config.presence_offline_timeout_s, + }; + + self.presence_timer_sender + .send((user_id.to_owned(), Duration::from_secs(timeout))) + .map_err(|_| Error::bad_database("Failed to add presence timer")) + } + + fn set_presence( &self, room_id: &RoomId, user_id: &UserId, - count: u64, - ) -> Result> { - let mut presence_id = room_id.as_bytes().to_vec(); - presence_id.push(0xff); - presence_id.extend_from_slice(&count.to_be_bytes()); - presence_id.push(0xff); - presence_id.extend_from_slice(user_id.as_bytes()); - - self.presenceid_presence - .get(&presence_id)? - .map(|value| parse_presence_event(&value)) - .transpose() - } - - fn presence_since( - &self, - room_id: &RoomId, - since: u64, - ) -> Result> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - let mut first_possible_edu = prefix.clone(); - first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since - let mut hashmap = HashMap::new(); - - for (key, value) in self - .presenceid_presence - .iter_from(&first_possible_edu, false) - .take_while(|(key, _)| key.starts_with(&prefix)) - { - let user_id = UserId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| Error::bad_database("Invalid UserId bytes in presenceid_presence."))?, - ) - .map_err(|_| Error::bad_database("Invalid UserId in presenceid_presence."))?; - - let presence = parse_presence_event(&value)?; - - hashmap.insert(user_id, presence); + presence_state: PresenceState, + currently_active: Option, + last_active_ago: Option, + status_msg: Option, + ) -> Result<()> { + if !services().globals.config.allow_presence { + return Ok(()); } - Ok(hashmap) + let now = utils::millis_since_unix_epoch(); + let last_active_ts = match last_active_ago { + Some(last_active_ago) => now.saturating_sub(last_active_ago.into()), + None => now, + }; + + let key = presence_key(room_id, user_id); + + let presence = Presence::new( + presence_state, + currently_active.unwrap_or(false), + last_active_ts, + services().globals.next_count()?, + status_msg, + ); + + let timeout = match presence.state { + PresenceState::Online => services().globals.config.presence_idle_timeout_s, + _ => services().globals.config.presence_offline_timeout_s, + }; + + self.presence_timer_sender + .send((user_id.to_owned(), Duration::from_secs(timeout))) + .map_err(|_| Error::bad_database("Failed to add presence timer"))?; + + self.roomuserid_presence + .insert(&key, &presence.to_json_bytes()?)?; + + Ok(()) } - /* - fn presence_maintain(&self, db: Arc>) { - // TODO @M0dEx: move this to a timed tasks module - tokio::spawn(async move { - loop { - select! { - Some(user_id) = self.presence_timers.next() { - // TODO @M0dEx: would it be better to acquire the lock outside the loop? - let guard = db.read().await; + fn remove_presence(&self, user_id: &UserId) -> Result<()> { + for room_id in services().rooms.state_cache.rooms_joined(user_id) { + let key = presence_key(&room_id?, user_id); - // TODO @M0dEx: add self.presence_timers - // TODO @M0dEx: maintain presence - } - } - } - }); + self.roomuserid_presence.remove(&key)?; + } + + Ok(()) + } + + fn presence_since<'a>( + &'a self, + room_id: &RoomId, + since: u64, + ) -> Box> + 'a> { + if !services().globals.config.allow_presence { + return Box::new(iter::empty()); + } + + let prefix = [room_id.as_bytes(), &[0xff]].concat(); + + Box::new( + self.roomuserid_presence + .scan_prefix(prefix) + .map( + |(key, presence_bytes)| -> Result<(OwnedUserId, u64, PresenceEvent)> { + let user_id = user_id_from_bytes( + key.rsplit(|byte| *byte == 0xff).next().ok_or_else(|| { + Error::bad_database("No UserID bytes in presence key") + })?, + )?; + + let presence = Presence::from_json_bytes(&presence_bytes)?; + let presence_event = presence.to_presence_event(&user_id)?; + + Ok((user_id, presence.last_count, presence_event)) + }, + ) + .filter(move |presence_data| match presence_data { + Ok((_, count, _)) => *count > since, + Err(_) => false, + }), + ) } - */ } -fn parse_presence_event(bytes: &[u8]) -> Result { - let mut presence: PresenceEvent = serde_json::from_slice(bytes) - .map_err(|_| Error::bad_database("Invalid presence event in db."))?; - - let current_timestamp: UInt = utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"); - - if presence.content.presence == PresenceState::Online { - // Don't set last_active_ago when the user is online - presence.content.last_active_ago = None; - } else { - // Convert from timestamp to duration - presence.content.last_active_ago = presence - .content - .last_active_ago - .map(|timestamp| current_timestamp - timestamp); - } - - Ok(presence) +#[inline] +fn presence_key(room_id: &RoomId, user_id: &UserId) -> Vec { + [room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat() } diff --git a/src/database/mod.rs b/src/database/mod.rs index 8d1b1913..7d7dad7c 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -2,8 +2,8 @@ pub mod abstraction; pub mod key_value; use crate::{ - service::rooms::timeline::PduCount, services, utils, Config, Error, PduEvent, Result, Services, - SERVICES, + service::rooms::{edus::presence::presence_handler, timeline::PduCount}, + services, utils, Config, Error, PduEvent, Result, Services, SERVICES, }; use abstraction::{KeyValueDatabaseEngine, KvTree}; use directories::ProjectDirs; @@ -29,7 +29,7 @@ use std::{ sync::{Arc, Mutex, RwLock}, time::Duration, }; -use tokio::time::interval; +use tokio::{sync::mpsc, time::interval}; use tracing::{debug, error, info, warn}; @@ -71,8 +71,7 @@ pub struct KeyValueDatabase { pub(super) readreceiptid_readreceipt: Arc, // ReadReceiptId = RoomId + Count + UserId pub(super) roomuserid_privateread: Arc, // RoomUserId = Room + User, PrivateRead = Count pub(super) roomuserid_lastprivatereadupdate: Arc, // LastPrivateReadUpdate = Count - pub(super) presenceid_presence: Arc, // PresenceId = RoomId + Count + UserId - pub(super) userid_lastpresenceupdate: Arc, // LastPresenceUpdate = Count + pub(super) roomuserid_presence: Arc, //pub rooms: rooms::Rooms, pub(super) pduid_pdu: Arc, // PduId = ShortRoomId + Count @@ -170,6 +169,7 @@ pub struct KeyValueDatabase { pub(super) our_real_users_cache: RwLock>>>, pub(super) appservice_in_room_cache: RwLock>>, pub(super) lasttimelinecount_cache: Mutex>, + pub(super) presence_timer_sender: Arc>, } impl KeyValueDatabase { @@ -273,6 +273,8 @@ impl KeyValueDatabase { error!(?config.max_request_size, "Max request size is less than 1KB. Please increase it."); } + let (presence_sender, presence_receiver) = mpsc::unbounded_channel(); + let db_raw = Box::new(Self { _db: builder.clone(), userid_password: builder.open_tree("userid_password")?, @@ -299,8 +301,7 @@ impl KeyValueDatabase { roomuserid_privateread: builder.open_tree("roomuserid_privateread")?, // "Private" read receipt roomuserid_lastprivatereadupdate: builder .open_tree("roomuserid_lastprivatereadupdate")?, - presenceid_presence: builder.open_tree("presenceid_presence")?, - userid_lastpresenceupdate: builder.open_tree("userid_lastpresenceupdate")?, + roomuserid_presence: builder.open_tree("roomuserid_presence")?, pduid_pdu: builder.open_tree("pduid_pdu")?, eventid_pduid: builder.open_tree("eventid_pduid")?, roomid_pduleaves: builder.open_tree("roomid_pduleaves")?, @@ -392,6 +393,7 @@ impl KeyValueDatabase { our_real_users_cache: RwLock::new(HashMap::new()), appservice_in_room_cache: RwLock::new(HashMap::new()), lasttimelinecount_cache: Mutex::new(HashMap::new()), + presence_timer_sender: Arc::new(presence_sender), }); let db = Box::leak(db_raw); @@ -962,9 +964,6 @@ impl KeyValueDatabase { ); } - // This data is probably outdated - db.presenceid_presence.clear()?; - services().admin.start_handler(); // Set emergency access for the conduit user @@ -989,6 +988,9 @@ impl KeyValueDatabase { if services().globals.allow_check_for_updates() { Self::start_check_for_updates_task(); } + if services().globals.config.allow_presence { + Self::start_presence_handler(presence_receiver).await; + } Ok(()) } @@ -1062,8 +1064,7 @@ impl KeyValueDatabase { pub async fn start_cleanup_task() { #[cfg(unix)] use tokio::signal::unix::{signal, SignalKind}; - - use std::time::{Duration, Instant}; + use tokio::time::Instant; let timer_interval = Duration::from_secs(services().globals.config.cleanup_second_interval as u64); @@ -1098,6 +1099,17 @@ impl KeyValueDatabase { } }); } + + pub async fn start_presence_handler( + presence_timer_receiver: mpsc::UnboundedReceiver<(OwnedUserId, Duration)>, + ) { + tokio::spawn(async move { + match presence_handler(presence_timer_receiver).await { + Ok(()) => warn!("Presence maintenance task finished"), + Err(e) => error!("Presence maintenance task finished with error: {e}"), + } + }); + } } /// Sets the emergency password and push rules for the @conduit account in case emergency password is set diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 2373a270..1f4fc382 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -353,6 +353,18 @@ impl Service { &self.config.emergency_password } + pub fn allow_presence(&self) -> bool { + self.config.allow_presence + } + + pub fn presence_idle_timeout_s(&self) -> u64 { + self.config.presence_idle_timeout_s + } + + pub fn presence_offline_timeout_s(&self) -> u64 { + self.config.presence_offline_timeout_s + } + pub fn supported_room_versions(&self) -> Vec { let mut room_versions: Vec = vec![]; room_versions.extend(self.stable_room_versions.clone()); diff --git a/src/service/rooms/edus/presence/data.rs b/src/service/rooms/edus/presence/data.rs index 53329e08..cfac45aa 100644 --- a/src/service/rooms/edus/presence/data.rs +++ b/src/service/rooms/edus/presence/data.rs @@ -1,38 +1,33 @@ -use std::collections::HashMap; - use crate::Result; -use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId}; +use ruma::{ + events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId, +}; pub trait Data: Send + Sync { + /// Returns the latest presence event for the given user in the given room. + fn get_presence(&self, room_id: &RoomId, user_id: &UserId) -> Result>; + + /// Pings the presence of the given user in the given room, setting the specified state. + fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()>; + /// Adds a presence event which will be saved until a new event replaces it. - /// - /// Note: This method takes a RoomId because presence updates are always bound to rooms to - /// make sure users outside these rooms can't see them. - fn update_presence( + fn set_presence( &self, - user_id: &UserId, room_id: &RoomId, - presence: PresenceEvent, + user_id: &UserId, + presence_state: PresenceState, + currently_active: Option, + last_active_ago: Option, + status_msg: Option, ) -> Result<()>; - /// Resets the presence timeout, so the user will stay in their current presence state. - fn ping_presence(&self, user_id: &UserId) -> Result<()>; - - /// Returns the timestamp of the last presence update of this user in millis since the unix epoch. - fn last_presence_update(&self, user_id: &UserId) -> Result>; - - /// Returns the presence event with correct last_active_ago. - fn get_presence_event( - &self, - room_id: &RoomId, - user_id: &UserId, - count: u64, - ) -> Result>; + /// Removes the presence record for the given user from the database. + fn remove_presence(&self, user_id: &UserId) -> Result<()>; /// Returns the most recent presence updates that happened after the event with id `since`. - fn presence_since( - &self, + fn presence_since<'a>( + &'a self, room_id: &RoomId, since: u64, - ) -> Result>; + ) -> Box> + 'a>; } diff --git a/src/service/rooms/edus/presence/mod.rs b/src/service/rooms/edus/presence/mod.rs index 4b929d28..6a09e08e 100644 --- a/src/service/rooms/edus/presence/mod.rs +++ b/src/service/rooms/edus/presence/mod.rs @@ -1,125 +1,211 @@ mod data; -use std::collections::HashMap; + +use std::time::Duration; pub use data::Data; -use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId}; +use futures_util::{stream::FuturesUnordered, StreamExt}; +use ruma::{ + events::presence::{PresenceEvent, PresenceEventContent}, + presence::PresenceState, + OwnedUserId, RoomId, UInt, UserId, +}; +use serde::{Deserialize, Serialize}; +use tokio::{sync::mpsc, time::sleep}; +use tracing::debug; -use crate::Result; +use crate::{services, utils, Error, Result}; + +/// Represents data required to be kept in order to implement the presence specification. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Presence { + pub state: PresenceState, + pub currently_active: bool, + pub last_active_ts: u64, + pub last_count: u64, + pub status_msg: Option, +} + +impl Presence { + pub fn new( + state: PresenceState, + currently_active: bool, + last_active_ts: u64, + last_count: u64, + status_msg: Option, + ) -> Self { + Self { + state, + currently_active, + last_active_ts, + last_count, + status_msg, + } + } + + pub fn from_json_bytes(bytes: &[u8]) -> Result { + serde_json::from_slice(bytes) + .map_err(|_| Error::bad_database("Invalid presence data in database")) + } + + pub fn to_json_bytes(&self) -> Result> { + serde_json::to_vec(self) + .map_err(|_| Error::bad_database("Could not serialize Presence to JSON")) + } + + /// Creates a PresenceEvent from available data. + pub fn to_presence_event(&self, user_id: &UserId) -> Result { + let now = utils::millis_since_unix_epoch(); + let last_active_ago = if self.currently_active { + None + } else { + Some(UInt::new_saturating( + now.saturating_sub(self.last_active_ts), + )) + }; + + Ok(PresenceEvent { + sender: user_id.to_owned(), + content: PresenceEventContent { + presence: self.state.clone(), + status_msg: self.status_msg.clone(), + currently_active: Some(self.currently_active), + last_active_ago, + displayname: services().users.displayname(user_id)?, + avatar_url: services().users.avatar_url(user_id)?, + }, + }) + } +} pub struct Service { pub db: &'static dyn Data, } impl Service { - /// Adds a presence event which will be saved until a new event replaces it. - /// - /// Note: This method takes a RoomId because presence updates are always bound to rooms to - /// make sure users outside these rooms can't see them. - pub fn update_presence( + /// Returns the latest presence event for the given user in the given room. + pub fn get_presence( &self, - _user_id: &UserId, - _room_id: &RoomId, - _presence: PresenceEvent, - ) -> Result<()> { - // self.db.update_presence(user_id, room_id, presence) - Ok(()) - } - - /// Resets the presence timeout, so the user will stay in their current presence state. - pub fn ping_presence(&self, _user_id: &UserId) -> Result<()> { - // self.db.ping_presence(user_id) - Ok(()) - } - - pub fn get_last_presence_event( - &self, - _user_id: &UserId, - _room_id: &RoomId, + room_id: &RoomId, + user_id: &UserId, ) -> Result> { - // let last_update = match self.db.last_presence_update(user_id)? { - // Some(last) => last, - // None => return Ok(None), - // }; - - // self.db.get_presence_event(room_id, user_id, last_update) - Ok(None) + self.db.get_presence(room_id, user_id) } - /* TODO - /// Sets all users to offline who have been quiet for too long. - fn _presence_maintain( + /// Pings the presence of the given user in the given room, setting the specified state. + pub fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()> { + self.db.ping_presence(user_id, new_state) + } + + /// Adds a presence event which will be saved until a new event replaces it. + pub fn set_presence( &self, - rooms: &super::Rooms, - globals: &super::super::globals::Globals, + room_id: &RoomId, + user_id: &UserId, + presence_state: PresenceState, + currently_active: Option, + last_active_ago: Option, + status_msg: Option, ) -> Result<()> { - let current_timestamp = utils::millis_since_unix_epoch(); + self.db.set_presence( + room_id, + user_id, + presence_state, + currently_active, + last_active_ago, + status_msg, + ) + } - for (user_id_bytes, last_timestamp) in self - .userid_lastpresenceupdate - .iter() - .filter_map(|(k, bytes)| { - Some(( - k, - utils::u64_from_bytes(&bytes) - .map_err(|_| { - Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") - }) - .ok()?, - )) - }) - .take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000) - // 5 Minutes - { - // Send new presence events to set the user offline - let count = globals.next_count()?.to_be_bytes(); - let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes) - .map_err(|_| { - Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.") - })? - .try_into() - .map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?; - for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) { - let mut presence_id = room_id.as_bytes().to_vec(); - presence_id.push(0xff); - presence_id.extend_from_slice(&count); - presence_id.push(0xff); - presence_id.extend_from_slice(&user_id_bytes); - - self.presenceid_presence.insert( - &presence_id, - &serde_json::to_vec(&PresenceEvent { - content: PresenceEventContent { - avatar_url: None, - currently_active: None, - displayname: None, - last_active_ago: Some( - last_timestamp.try_into().expect("time is valid"), - ), - presence: PresenceState::Offline, - status_msg: None, - }, - sender: user_id.to_owned(), - }) - .expect("PresenceEvent can be serialized"), - )?; - } - - self.userid_lastpresenceupdate.insert( - user_id.as_bytes(), - &utils::millis_since_unix_epoch().to_be_bytes(), - )?; - } - - Ok(()) - }*/ + /// Removes the presence record for the given user from the database. + pub fn remove_presence(&self, user_id: &UserId) -> Result<()> { + self.db.remove_presence(user_id) + } /// Returns the most recent presence updates that happened after the event with id `since`. - pub fn presence_since( - &self, - _room_id: &RoomId, - _since: u64, - ) -> Result> { - // self.db.presence_since(room_id, since) - Ok(HashMap::new()) + pub fn presence_since<'a>( + &'a self, + room_id: &RoomId, + since: u64, + ) -> Box> + 'a> { + self.db.presence_since(room_id, since) } } + +pub async fn presence_handler( + mut presence_timer_receiver: mpsc::UnboundedReceiver<(OwnedUserId, Duration)>, +) -> Result<()> { + let mut presence_timers = FuturesUnordered::new(); + + loop { + debug!("Number of presence timers: {}", presence_timers.len()); + + tokio::select! { + Some((user_id, timeout)) = presence_timer_receiver.recv() => { + debug!("Adding timer for user '{user_id}': Timeout {timeout:?}"); + presence_timers.push(presence_timer(user_id, timeout)); + } + + Some(user_id) = presence_timers.next() => { + process_presence_timer(user_id)?; + } + } + } +} + +async fn presence_timer(user_id: OwnedUserId, timeout: Duration) -> OwnedUserId { + sleep(timeout).await; + + user_id +} + +fn process_presence_timer(user_id: OwnedUserId) -> Result<()> { + let idle_timeout = services().globals.config.presence_idle_timeout_s * 1_000; + let offline_timeout = services().globals.config.presence_offline_timeout_s * 1_000; + + let mut presence_state = PresenceState::Offline; + let mut last_active_ago = None; + let mut status_msg = None; + + for room_id in services().rooms.state_cache.rooms_joined(&user_id) { + let presence_event = services() + .rooms + .edus + .presence + .get_presence(&room_id?, &user_id)?; + + if let Some(presence_event) = presence_event { + presence_state = presence_event.content.presence; + last_active_ago = presence_event.content.last_active_ago; + status_msg = presence_event.content.status_msg; + + break; + } + } + + let new_state = match (&presence_state, last_active_ago.map(u64::from)) { + (PresenceState::Online, Some(ago)) if ago >= idle_timeout => { + Some(PresenceState::Unavailable) + } + (PresenceState::Unavailable, Some(ago)) if ago >= offline_timeout => { + Some(PresenceState::Offline) + } + _ => None, + }; + + debug!("Processed presence timer for user '{user_id}': Old state = {presence_state}, New state = {new_state:?}"); + + if let Some(new_state) = new_state { + for room_id in services().rooms.state_cache.rooms_joined(&user_id) { + services().rooms.edus.presence.set_presence( + &room_id?, + &user_id, + new_state.clone(), + Some(false), + last_active_ago, + status_msg.clone(), + )?; + } + } + + Ok(()) +} diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index fa14f123..eb8fe849 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -26,7 +26,8 @@ use ruma::{ federation::{ self, transactions::edu::{ - DeviceListUpdateContent, Edu, ReceiptContent, ReceiptData, ReceiptMap, + DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent, + ReceiptData, ReceiptMap, }, }, OutgoingRequest, @@ -285,6 +286,39 @@ impl Service { .filter(|user_id| user_id.server_name() == services().globals.server_name()), ); + // Look for presence updates in this room + let mut presence_updates = Vec::new(); + + for presence_data in services() + .rooms + .edus + .presence + .presence_since(&room_id, since) + { + let (user_id, count, presence_event) = presence_data?; + + if count > max_edu_count { + max_edu_count = count; + } + + if user_id.server_name() != services().globals.server_name() { + continue; + } + + presence_updates.push(PresenceUpdate { + user_id, + presence: presence_event.content.presence, + currently_active: presence_event.content.currently_active.unwrap_or(false), + last_active_ago: presence_event.content.last_active_ago.unwrap_or(uint!(0)), + status_msg: presence_event.content.status_msg, + }); + } + + let presence_content = Edu::Presence(PresenceContent::new(presence_updates)); + events.push( + serde_json::to_vec(&presence_content).expect("PresenceEvent can be serialized"), + ); + // Look for read receipts in this room for r in services() .rooms diff --git a/src/utils/mod.rs b/src/utils/mod.rs index d09a1033..6c80e2bf 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,12 +1,15 @@ pub mod error; +use crate::{Error, Result}; use argon2::{Config, Variant}; -use cmp::Ordering; use rand::prelude::*; use ring::digest; -use ruma::{canonical_json::try_from_json_map, CanonicalJsonError, CanonicalJsonObject}; +use ruma::{ + canonical_json::try_from_json_map, CanonicalJsonError, CanonicalJsonObject, OwnedUserId, +}; use std::{ - cmp, fmt, + cmp::Ordering, + fmt, str::FromStr, time::{SystemTime, UNIX_EPOCH}, }; @@ -51,6 +54,15 @@ pub fn string_from_bytes(bytes: &[u8]) -> Result Result { + OwnedUserId::try_from( + string_from_bytes(bytes) + .map_err(|_| Error::bad_database("Failed to parse string from bytes"))?, + ) + .map_err(|_| Error::bad_database("Failed to parse user id from bytes")) +} + pub fn random_string(length: usize) -> String { thread_rng() .sample_iter(&rand::distributions::Alphanumeric)