diff --git a/src/api/client_server/device.rs b/src/api/client_server/device.rs index 9a42f048..c5b52f05 100644 --- a/src/api/client_server/device.rs +++ b/src/api/client_server/device.rs @@ -17,8 +17,8 @@ pub async fn get_devices_route( let devices: Vec = services() .users - .all_devices_metadata(sender_user) - .filter_map(|r| r.ok()) // Filter out buggy devices + .all_user_devices_metadata(sender_user) + .await .collect(); Ok(get_devices::v3::Response { devices }) diff --git a/src/api/ruma_wrapper/axum.rs b/src/api/ruma_wrapper/axum.rs index 1ad2794d..d2e886bc 100644 --- a/src/api/ruma_wrapper/axum.rs +++ b/src/api/ruma_wrapper/axum.rs @@ -87,7 +87,12 @@ where if let Some(reg_info) = services().appservice.find_from_token(token).await { Token::Appservice(Box::new(reg_info.clone())) } else if let Some((user_id, device_id)) = services().users.find_from_token(token)? { - Token::User((user_id, OwnedDeviceId::from(device_id))) + services() + .users + .update_device_last_seen(device_id.clone()) + .await; + + Token::User((user_id, device_id)) } else { Token::Invalid } diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 13a6d648..15b7c421 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -1778,8 +1778,8 @@ pub async fn get_devices_route( .expect("version will not grow that large"), devices: services() .users - .all_devices_metadata(&body.user_id) - .filter_map(|r| r.ok()) + .all_user_devices_metadata(&body.user_id) + .await .filter_map(|metadata| { Some(UserDevice { keys: services() diff --git a/src/database/key_value/users.rs b/src/database/key_value/users.rs index 63321a40..3719b130 100644 --- a/src/database/key_value/users.rs +++ b/src/database/key_value/users.rs @@ -41,7 +41,7 @@ impl service::users::Data for KeyValueDatabase { } /// Find out which user an access token belongs to. - fn find_from_token(&self, token: &str) -> Result> { + fn find_from_token(&self, token: &str) -> Result> { self.token_userdeviceid .get(token.as_bytes())? .map_or(Ok(None), |bytes| { @@ -60,9 +60,11 @@ impl service::users::Data for KeyValueDatabase { .map_err(|_| { Error::bad_database("User ID in token_userdeviceid is invalid.") })?, - utils::string_from_bytes(device_bytes).map_err(|_| { - Error::bad_database("Device ID in token_userdeviceid is invalid.") - })?, + utils::string_from_bytes(device_bytes) + .map_err(|_| { + Error::bad_database("Device ID in token_userdeviceid is invalid.") + })? + .into(), ))) }) } @@ -896,7 +898,7 @@ impl service::users::Data for KeyValueDatabase { }) } - fn all_devices_metadata<'a>( + fn all_user_devices_metadata<'a>( &'a self, user_id: &UserId, ) -> Box> + 'a> { @@ -914,6 +916,46 @@ impl service::users::Data for KeyValueDatabase { ) } + fn all_devices_metadata<'a>( + &'a self, + ) -> Box> + 'a> { + Box::new(self.userdeviceid_metadata.iter().map(|(user, bytes)| { + let binding = user.split(|&b| b == 0xff).collect::>(); + let user = binding + .first() + .ok_or_else(|| Error::bad_database("User ID in token_userdeviceid is invalid."))? + .to_vec(); + + if let (Ok(Ok(u)), Ok(d)) = ( + utils::string_from_bytes(&user).map(UserId::parse), + serde_json::from_slice::(&bytes), + ) { + Ok((u, d)) + } else { + Err(Error::bad_database( + "Device and/or User in userdeviceid_metadata is invalid.", + )) + } + })) + } + + fn set_devices_last_seen<'a>( + &'a self, + devices: &'a BTreeMap, + ) -> Box> + 'a> { + Box::new( + self.all_devices_metadata() + .filter_map(Result::ok) + .filter_map(|(u, mut d)| { + d.last_seen_ts = Some(*devices.get(&d.device_id)?); + Some((u, d)) + }) + .map(|(user, device)| { + self.update_device_metadata(&user, &device.device_id, &device) + }), + ) + } + /// Creates a new sync filter. Returns the filter id. fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result { let filter_id = utils::random_string(4); diff --git a/src/database/mod.rs b/src/database/mod.rs index f4740ff4..a2c419cc 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -988,6 +988,7 @@ impl KeyValueDatabase { services().sending.start_handler(); Self::start_cleanup_task().await; + Self::start_device_last_seen_update_task(); if services().globals.allow_check_for_updates() { Self::start_check_for_updates_task(); } @@ -1006,6 +1007,24 @@ impl KeyValueDatabase { res } + #[tracing::instrument] + pub fn start_device_last_seen_update_task() { + tokio::spawn(async move { + let timer_interval = Duration::from_secs(60); + let mut i = interval(timer_interval); + loop { + i.tick().await; + let _ = Self::try_update_device_last_seen().await; + } + }); + } + + async fn try_update_device_last_seen() { + for error in services().users.write_cached_last_seen().await { + warn!("Error writing last seen timestamp of device to database: {error}"); + } + } + #[tracing::instrument] pub fn start_check_for_updates_task() { tokio::spawn(async move { diff --git a/src/service/mod.rs b/src/service/mod.rs index 4c11bc18..ef7335c9 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -114,6 +114,7 @@ impl Services { users: users::Service { db, connections: StdMutex::new(BTreeMap::new()), + device_last_seen: Mutex::new(BTreeMap::new()), }, account_data: account_data::Service { db }, admin: admin::Service::build(), diff --git a/src/service/users/data.rs b/src/service/users/data.rs index 4566c36d..ad194b60 100644 --- a/src/service/users/data.rs +++ b/src/service/users/data.rs @@ -4,8 +4,8 @@ use ruma::{ encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, events::AnyToDeviceEvent, serde::Raw, - DeviceId, DeviceKeyAlgorithm, DeviceKeyId, OwnedDeviceId, OwnedDeviceKeyId, OwnedMxcUri, - OwnedUserId, UInt, UserId, + DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, + OwnedDeviceKeyId, OwnedMxcUri, OwnedUserId, UInt, UserId, }; use std::collections::BTreeMap; @@ -20,7 +20,7 @@ pub trait Data: Send + Sync { fn count(&self) -> Result; /// Find out which user an access token belongs to. - fn find_from_token(&self, token: &str) -> Result>; + fn find_from_token(&self, token: &str) -> Result>; /// Returns an iterator over all users on this homeserver. fn iter<'a>(&'a self) -> Box> + 'a>; @@ -202,11 +202,20 @@ pub trait Data: Send + Sync { fn get_devicelist_version(&self, user_id: &UserId) -> Result>; - fn all_devices_metadata<'a>( + fn all_user_devices_metadata<'a>( &'a self, user_id: &UserId, ) -> Box> + 'a>; + fn all_devices_metadata<'a>( + &'a self, + ) -> Box> + 'a>; + + fn set_devices_last_seen<'a>( + &'a self, + devices: &'a BTreeMap, + ) -> Box> + 'a>; + /// Creates a new sync filter. Returns the filter id. fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result; diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index c3799586..c4b37977 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -2,7 +2,7 @@ mod data; use std::{ collections::{BTreeMap, BTreeSet}, mem, - sync::{Arc, Mutex}, + sync::{Arc, Mutex as StdMutex}, }; pub use data::Data; @@ -19,9 +19,10 @@ use ruma::{ encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, events::AnyToDeviceEvent, serde::Raw, - DeviceId, DeviceKeyAlgorithm, DeviceKeyId, OwnedDeviceId, OwnedDeviceKeyId, OwnedMxcUri, - OwnedRoomId, OwnedUserId, RoomAliasId, UInt, UserId, + DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, + OwnedDeviceKeyId, OwnedMxcUri, OwnedRoomId, OwnedUserId, RoomAliasId, UInt, UserId, }; +use tokio::sync::Mutex; use crate::{services, Error, Result}; @@ -36,7 +37,8 @@ pub struct Service { pub db: &'static dyn Data, #[allow(clippy::type_complexity)] pub connections: - Mutex>>>, + StdMutex>>>, + pub device_last_seen: Mutex>, } impl Service { @@ -72,7 +74,7 @@ impl Service { cache .entry((user_id, device_id, conn_id)) .or_insert_with(|| { - Arc::new(Mutex::new(SlidingSyncCache { + Arc::new(StdMutex::new(SlidingSyncCache { lists: BTreeMap::new(), subscriptions: BTreeMap::new(), known_rooms: BTreeMap::new(), @@ -200,7 +202,7 @@ impl Service { cache .entry((user_id, device_id, conn_id)) .or_insert_with(|| { - Arc::new(Mutex::new(SlidingSyncCache { + Arc::new(StdMutex::new(SlidingSyncCache { lists: BTreeMap::new(), subscriptions: BTreeMap::new(), known_rooms: BTreeMap::new(), @@ -228,7 +230,7 @@ impl Service { cache .entry((user_id, device_id, conn_id)) .or_insert_with(|| { - Arc::new(Mutex::new(SlidingSyncCache { + Arc::new(StdMutex::new(SlidingSyncCache { lists: BTreeMap::new(), subscriptions: BTreeMap::new(), known_rooms: BTreeMap::new(), @@ -289,7 +291,7 @@ impl Service { } /// Find out which user an access token belongs to. - pub fn find_from_token(&self, token: &str) -> Result> { + pub fn find_from_token(&self, token: &str) -> Result> { self.db.find_from_token(token) } @@ -563,11 +565,25 @@ impl Service { self.db.get_devicelist_version(user_id) } - pub fn all_devices_metadata<'a>( + pub async fn all_user_devices_metadata<'a>( &'a self, user_id: &UserId, - ) -> impl Iterator> + 'a { - self.db.all_devices_metadata(user_id) + ) -> impl Iterator + 'a { + let all_devices: Vec<_> = self + .db + .all_user_devices_metadata(user_id) + .filter_map(Result::ok) + // RumaHandler trait complains if we don't collect + .collect(); + let device_last_seen = self.device_last_seen.lock().await; + + // Updates the timestamps with the cached ones + all_devices.into_iter().map(move |mut d| { + if let Some(ts) = device_last_seen.get(&d.device_id) { + d.last_seen_ts = Some(*ts); + }; + d + }) } /// Deactivate account @@ -608,6 +624,31 @@ impl Service { pub fn find_from_openid_token(&self, token: &str) -> Result> { self.db.find_from_openid_token(token) } + + /// Sets the device_last_seen timestamp of a given device to now + pub async fn update_device_last_seen(&self, device_id: OwnedDeviceId) { + self.device_last_seen + .lock() + .await + .insert(device_id, MilliSecondsSinceUnixEpoch::now()); + } + + /// Writes all the currently cached last seen timestamps of devices to the database, + /// clearing the cache in the process + pub async fn write_cached_last_seen(&self) -> Vec { + let mut map = self.device_last_seen.lock().await; + if !map.is_empty() { + let result = self + .db + .set_devices_last_seen(&map) + .filter_map(Result::err) + .collect(); + map.clear(); + result + } else { + Vec::new() + } + } } /// Ensure that a user only sees signatures from themselves and the target user