diff --git a/src/api/client_server/presence.rs b/src/api/client_server/presence.rs index dfac3dbd..7afda962 100644 --- a/src/api/client_server/presence.rs +++ b/src/api/client_server/presence.rs @@ -60,7 +60,7 @@ pub async fn get_presence_route( .rooms .edus .presence - .get_last_presence_event(sender_user, &room_id)? + .get_presence_event(sender_user, &room_id)? { presence_event = Some(presence); break; diff --git a/src/api/server_server.rs b/src/api/server_server.rs index b7f88078..9154b3ef 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -33,6 +33,7 @@ use ruma::{ }, directory::{IncomingFilter, IncomingRoomNetwork}, events::{ + presence::{PresenceEvent, PresenceEventContent}, receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType}, room::{ join_rules::{JoinRule, RoomJoinRulesEventContent}, @@ -746,7 +747,33 @@ pub async fn send_transaction_message_route( .filter_map(|edu| serde_json::from_str::(edu.json().get()).ok()) { match edu { - Edu::Presence(_) => {} + Edu::Presence(presence) => { + for presence_update in presence.push { + let user_id = presence_update.user_id; + for room_id in services() + .rooms + .state_cache + .rooms_joined(&user_id) + .filter_map(|room_id| room_id.ok()) + { + services().rooms.edus.presence.update_presence( + &user_id, + &room_id, + PresenceEvent { + content: PresenceEventContent { + avatar_url: services().users.avatar_url(&user_id)?, + currently_active: Some(presence_update.currently_active), + displayname: services().users.displayname(&user_id)?, + last_active_ago: Some(presence_update.last_active_ago), + presence: presence_update.presence.clone(), + status_msg: presence_update.status_msg.clone(), + }, + sender: user_id.clone(), + }, + )?; + } + } + } Edu::Receipt(receipt) => { for (room_id, room_updates) in receipt.receipts { for (user_id, user_updates) in room_updates.read { diff --git a/src/config/mod.rs b/src/config/mod.rs index 6b862bb6..78724ab2 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -76,6 +76,11 @@ pub struct Config { pub emergency_password: Option, + #[serde(default = "default_presence_idle_timeout")] + pub presence_idle_timeout: u64, + #[serde(default = "default_presence_offline_timeout")] + pub presence_offline_timeout: u64, + #[serde(flatten)] pub catchall: BTreeMap, } @@ -257,6 +262,14 @@ fn default_turn_ttl() -> u64 { 60 * 60 * 24 } +fn default_presence_idle_timeout() -> u64 { + 1 * 60 as u64 +} + +fn default_presence_offline_timeout() -> u64 { + 15 * 60 as u64 +} + // I know, it's a great name pub fn default_default_room_version() -> RoomVersionId { RoomVersionId::V9 diff --git a/src/database/key_value/rooms/edus/presence.rs b/src/database/key_value/rooms/edus/presence.rs index 763572ec..dae6f759 100644 --- a/src/database/key_value/rooms/edus/presence.rs +++ b/src/database/key_value/rooms/edus/presence.rs @@ -1,5 +1,7 @@ use futures_util::{stream::FuturesUnordered, StreamExt}; +use ruma::user_id; use std::{collections::HashMap, time::Duration}; +use tracing::error; use ruma::{ events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId, @@ -7,9 +9,11 @@ use ruma::{ use tokio::{sync::mpsc, time::sleep}; use crate::{ - database::KeyValueDatabase, service, services, utils, utils::u64_from_bytes, Error, Result, + database::KeyValueDatabase, + service, services, utils, + utils::{millis_since_unix_epoch, u64_from_bytes}, + Error, Result, }; -use crate::utils::millis_since_unix_epoch; pub struct PresenceUpdate { count: u64, @@ -17,15 +21,15 @@ pub struct PresenceUpdate { } impl PresenceUpdate { - fn to_be_bytes(&self) -> &[u8] { - &*([self.count.to_be_bytes(), self.timestamp.to_be_bytes()].concat()) + fn to_be_bytes(&self) -> Vec { + [self.count.to_be_bytes(), self.timestamp.to_be_bytes()].concat() } fn from_be_bytes(bytes: &[u8]) -> Result { let (count_bytes, timestamp_bytes) = bytes.split_at(bytes.len() / 2); Ok(Self { - count: u64_from_bytes(count_bytes)?, - timestamp: u64_from_bytes(timestamp_bytes)?, + count: u64_from_bytes(count_bytes).expect("count bytes from DB are valid"), + timestamp: u64_from_bytes(timestamp_bytes).expect("timestamp bytes from DB are valid"), }) } } @@ -37,19 +41,23 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { room_id: &RoomId, presence: PresenceEvent, ) -> Result<()> { - let mut roomuser_id = [room_id.as_bytes(), 0xff, user_id.as_bytes()].concat(); + let roomuser_id = [room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat(); self.roomuserid_presenceevent.insert( &roomuser_id, - &serde_json::to_vec(&presence)?, + &serde_json::to_vec(&presence).expect("presence event from DB is valid"), )?; self.userid_presenceupdate.insert( user_id.as_bytes(), - PresenceUpdate { + &*PresenceUpdate { count: services().globals.next_count()?, - timestamp: millis_since_unix_epoch(), - }.to_be_bytes(), + timestamp: match presence.content.last_active_ago { + Some(active_ago) => millis_since_unix_epoch().saturating_sub(active_ago.into()), + None => millis_since_unix_epoch(), + }, + } + .to_be_bytes(), )?; Ok(()) @@ -58,10 +66,11 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { fn ping_presence(&self, user_id: &UserId) -> Result<()> { self.userid_presenceupdate.insert( user_id.as_bytes(), - PresenceUpdate { + &*PresenceUpdate { count: services().globals.current_count()?, timestamp: millis_since_unix_epoch(), - }.to_be_bytes() + } + .to_be_bytes(), )?; Ok(()) @@ -70,9 +79,7 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { fn last_presence_update(&self, user_id: &UserId) -> Result> { self.userid_presenceupdate .get(user_id.as_bytes())? - .map(|bytes| { - PresenceUpdate::from_be_bytes(bytes)?.timestamp - }) + .map(|bytes| PresenceUpdate::from_be_bytes(&bytes).map(|update| update.timestamp)) .transpose() } @@ -80,57 +87,131 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { &self, room_id: &RoomId, user_id: &UserId, - presence_timestamp: u64 + presence_timestamp: u64, ) -> Result> { - let mut roomuser_id = [room_id.as_bytes(), 0xff, user_id.as_bytes()].concat(); + let roomuser_id = [room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat(); self.roomuserid_presenceevent .get(&roomuser_id)? .map(|value| parse_presence_event(&value, presence_timestamp)) .transpose() } - fn presence_since( - &self, + fn presence_since<'a>( + &'a self, room_id: &RoomId, since: u64, - ) -> Result>> { + ) -> Result + 'a>> { let services = &services(); - let mut user_timestamp: HashMap = self.userid_presenceupdate + let user_timestamp: HashMap = self + .userid_presenceupdate .iter() - .map(|(user_id_bytes, update_bytes)| (UserId::parse(utils::string_from_bytes(user_id_bytes)), PresenceUpdate::from_be_bytes(update_bytes)?)) + .filter_map(|(user_id_bytes, update_bytes)| { + Some(( + OwnedUserId::from( + UserId::parse(utils::string_from_bytes(&user_id_bytes).ok()?).ok()?, + ), + PresenceUpdate::from_be_bytes(&update_bytes).ok()?, + )) + }) .filter_map(|(user_id, presence_update)| { - if presence_update.count <= since || !services.rooms.state_cache.is_joined(user_id, room_id)? { - return None + if presence_update.count <= since + || !services + .rooms + .state_cache + .is_joined(&user_id, room_id) + .ok()? + { + return None; } Some((user_id, presence_update.timestamp)) }) .collect(); - Ok( + Ok(Box::new( self.roomuserid_presenceevent .iter() - .filter_map(|user_id_bytes, presence_bytes| (UserId::parse(utils::string_from_bytes(user_id_bytes)), presence_bytes)) - .filter_map(|user_id, presence_bytes| { - let timestamp = user_timestamp.get(user_id)?; - - Some((user_id, parse_presence_event(presence_bytes, *timestamp)?)) + .filter_map(|(user_id_bytes, presence_bytes)| { + Some(( + OwnedUserId::from( + UserId::parse(utils::string_from_bytes(&user_id_bytes).ok()?).ok()?, + ), + presence_bytes, + )) }) - .into_iter() - ) + .filter_map( + move |(user_id, presence_bytes)| -> Option<(OwnedUserId, PresenceEvent)> { + let timestamp = user_timestamp.get(&user_id)?; + + Some(( + user_id, + parse_presence_event(&presence_bytes, *timestamp).ok()?, + )) + }, + ), + )) } fn presence_maintain( &self, - mut timer_receiver: mpsc::UnboundedReceiver>, + mut timer_receiver: mpsc::UnboundedReceiver, ) -> Result<()> { let mut timers = FuturesUnordered::new(); + // TODO: Get rid of this hack + timers.push(create_presence_timer( + Duration::from_secs(60), + user_id!("@test:test.com").to_owned(), + )); + tokio::spawn(async move { loop { tokio::select! { - Some(_user_id) = timers.next() => { - // TODO: Handle presence timeouts + Some(user_id) = timers.next() => { + let presence_timestamp = match services().rooms.edus.presence.last_presence_update(&user_id) { + Ok(timestamp) => match timestamp { + Some(timestamp) => timestamp, + None => continue, + }, + Err(e) => { + error!("{e}"); + continue; + } + }; + + let presence_state = determine_presence_state(presence_timestamp); + + // Continue if there is no change in state + if presence_state != PresenceState::Offline { + continue; + } + + for room_id in services() + .rooms + .state_cache + .rooms_joined(&user_id) + .filter_map(|room_id| room_id.ok()) { + let presence_event = match services().rooms.edus.presence.get_presence_event(&user_id, &room_id) { + Ok(event) => match event { + Some(event) => event, + None => continue, + }, + Err(e) => { + error!("{e}"); + continue; + } + }; + + match services().rooms.edus.presence.update_presence(&user_id, &room_id, presence_event) { + Ok(()) => (), + Err(e) => { + error!("{e}"); + continue; + } + } + + // TODO: Send event over federation + } } Some(user_id) = timer_receiver.recv() => { // Idle timeout @@ -147,7 +228,7 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { } } -async fn create_presence_timer(duration: Duration, user_id: Box) -> Box { +async fn create_presence_timer(duration: Duration, user_id: OwnedUserId) -> OwnedUserId { sleep(duration).await; user_id @@ -162,9 +243,7 @@ fn parse_presence_event(bytes: &[u8], presence_timestamp: u64) -> Result PresenceState { +fn determine_presence_state(last_active_ago: u64) -> PresenceState { let globals = &services().globals; return if last_active_ago < globals.presence_idle_timeout() { @@ -177,10 +256,7 @@ fn determine_presence_state( } /// Translates the timestamp representing last_active_ago to a diff from now. -fn translate_active_ago( - presence_event: &mut PresenceEvent, - last_active_ts: u64, -) { +fn translate_active_ago(presence_event: &mut PresenceEvent, last_active_ts: u64) { let last_active_ago = millis_since_unix_epoch().saturating_sub(last_active_ts); presence_event.content.presence = determine_presence_state(last_active_ago); diff --git a/src/database/mod.rs b/src/database/mod.rs index 0797a136..7baa512a 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -65,7 +65,7 @@ pub struct KeyValueDatabase { pub(super) roomuserid_lastprivatereadupdate: Arc, // LastPrivateReadUpdate = Count pub(super) typingid_userid: Arc, // TypingId = RoomId + TimeoutTime + Count pub(super) roomid_lasttypingupdate: Arc, // LastRoomTypingUpdate = Count - pub(super) userid_presenceupdate: Arc, // PresenceUpdate = Count + Timestamp + pub(super) userid_presenceupdate: Arc, // PresenceUpdate = Count + Timestamp pub(super) roomuserid_presenceevent: Arc, // PresenceEvent //pub rooms: rooms::Rooms, @@ -825,9 +825,6 @@ impl KeyValueDatabase { ); } - // This data is probably outdated - db.presenceid_presence.clear()?; - services().admin.start_handler(); // Set emergency access for the conduit user diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index affc0516..94e3fb97 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -286,6 +286,14 @@ impl Service { &self.config.emergency_password } + pub fn presence_idle_timeout(&self) -> u64 { + self.config.presence_idle_timeout + } + + pub fn presence_offline_timeout(&self) -> u64 { + self.config.presence_offline_timeout + } + pub fn supported_room_versions(&self) -> Vec { let mut room_versions: Vec = vec![]; room_versions.extend(self.stable_room_versions.clone()); diff --git a/src/service/rooms/edus/presence/data.rs b/src/service/rooms/edus/presence/data.rs index 216313fe..5dc4c3cb 100644 --- a/src/service/rooms/edus/presence/data.rs +++ b/src/service/rooms/edus/presence/data.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use crate::Result; use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId}; use tokio::sync::mpsc; @@ -31,12 +29,12 @@ pub trait Data: Send + Sync { ) -> Result>; /// Returns the most recent presence updates that happened after the event with id `since`. - fn presence_since( - &self, + fn presence_since<'a>( + &'a self, room_id: &RoomId, since: u64, - ) -> Result>; + ) -> Result + 'a>>; - fn presence_maintain(&self, timer_receiver: mpsc::UnboundedReceiver>) + fn presence_maintain(&self, timer_receiver: mpsc::UnboundedReceiver) -> Result<()>; } diff --git a/src/service/rooms/edus/presence/mod.rs b/src/service/rooms/edus/presence/mod.rs index 23194dd1..faac5c76 100644 --- a/src/service/rooms/edus/presence/mod.rs +++ b/src/service/rooms/edus/presence/mod.rs @@ -1,5 +1,4 @@ mod data; -use std::collections::HashMap; pub use data::Data; use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId}; @@ -11,7 +10,7 @@ pub struct Service { pub db: &'static dyn Data, // Presence timers - timer_sender: mpsc::UnboundedSender>, + timer_sender: mpsc::UnboundedSender, } impl Service { @@ -51,7 +50,11 @@ impl Service { self.db.ping_presence(user_id) } - pub fn get_last_presence_event( + pub fn last_presence_update(&self, user_id: &UserId) -> Result> { + self.db.last_presence_update(user_id) + } + + pub fn get_presence_event( &self, user_id: &UserId, room_id: &RoomId, @@ -66,86 +69,18 @@ impl Service { pub fn presence_maintain( &self, - timer_receiver: mpsc::UnboundedReceiver>, + timer_receiver: mpsc::UnboundedReceiver, ) -> Result<()> { self.db.presence_maintain(timer_receiver) } - /* TODO - /// Sets all users to offline who have been quiet for too long. - fn _presence_maintain( - &self, - rooms: &super::Rooms, - globals: &super::super::globals::Globals, - ) -> Result<()> { - let current_timestamp = utils::millis_since_unix_epoch(); - - for (user_id_bytes, last_timestamp) in self - .userid_lastpresenceupdate - .iter() - .filter_map(|(k, bytes)| { - Some(( - k, - utils::u64_from_bytes(&bytes) - .map_err(|_| { - Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") - }) - .ok()?, - )) - }) - .take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000) - // 5 Minutes - { - // Send new presence events to set the user offline - let count = globals.next_count()?.to_be_bytes(); - let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes) - .map_err(|_| { - Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.") - })? - .try_into() - .map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?; - for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) { - let mut presence_id = room_id.as_bytes().to_vec(); - presence_id.push(0xff); - presence_id.extend_from_slice(&count); - presence_id.push(0xff); - presence_id.extend_from_slice(&user_id_bytes); - - self.presenceid_presence.insert( - &presence_id, - &serde_json::to_vec(&PresenceEvent { - content: PresenceEventContent { - avatar_url: None, - currently_active: None, - displayname: None, - last_active_ago: Some( - last_timestamp.try_into().expect("time is valid"), - ), - presence: PresenceState::Offline, - status_msg: None, - }, - sender: user_id.to_owned(), - }) - .expect("PresenceEvent can be serialized"), - )?; - } - - self.userid_lastpresenceupdate.insert( - user_id.as_bytes(), - &utils::millis_since_unix_epoch().to_be_bytes(), - )?; - } - - Ok(()) - }*/ - /// Returns the most recent presence updates that happened after the event with id `since`. #[tracing::instrument(skip(self, since, room_id))] pub fn presence_since( &self, room_id: &RoomId, since: u64, - ) -> Result> { + ) -> Result>> { self.db.presence_since(room_id, since) } }