Compare commits

...

1 commit

Author SHA1 Message Date
Matthias Ahouansou
aaee4d273b
feat(devices): update the device last seen timestamp on usage 2024-05-31 08:37:52 +01:00
8 changed files with 142 additions and 25 deletions

View file

@ -17,8 +17,8 @@ pub async fn get_devices_route(
let devices: Vec<device::Device> = services()
.users
.all_devices_metadata(sender_user)
.filter_map(|r| r.ok()) // Filter out buggy devices
.all_user_devices_metadata(sender_user)
.await
.collect();
Ok(get_devices::v3::Response { devices })

View file

@ -87,7 +87,12 @@ where
if let Some(reg_info) = services().appservice.find_from_token(token).await {
Token::Appservice(Box::new(reg_info.clone()))
} else if let Some((user_id, device_id)) = services().users.find_from_token(token)? {
Token::User((user_id, OwnedDeviceId::from(device_id)))
services()
.users
.update_device_last_seen(device_id.clone())
.await;
Token::User((user_id, device_id))
} else {
Token::Invalid
}

View file

@ -1778,8 +1778,8 @@ pub async fn get_devices_route(
.expect("version will not grow that large"),
devices: services()
.users
.all_devices_metadata(&body.user_id)
.filter_map(|r| r.ok())
.all_user_devices_metadata(&body.user_id)
.await
.filter_map(|metadata| {
Some(UserDevice {
keys: services()

View file

@ -41,7 +41,7 @@ impl service::users::Data for KeyValueDatabase {
}
/// Find out which user an access token belongs to.
fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, String)>> {
fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, OwnedDeviceId)>> {
self.token_userdeviceid
.get(token.as_bytes())?
.map_or(Ok(None), |bytes| {
@ -60,9 +60,11 @@ impl service::users::Data for KeyValueDatabase {
.map_err(|_| {
Error::bad_database("User ID in token_userdeviceid is invalid.")
})?,
utils::string_from_bytes(device_bytes).map_err(|_| {
Error::bad_database("Device ID in token_userdeviceid is invalid.")
})?,
utils::string_from_bytes(device_bytes)
.map_err(|_| {
Error::bad_database("Device ID in token_userdeviceid is invalid.")
})?
.into(),
)))
})
}
@ -896,7 +898,7 @@ impl service::users::Data for KeyValueDatabase {
})
}
fn all_devices_metadata<'a>(
fn all_user_devices_metadata<'a>(
&'a self,
user_id: &UserId,
) -> Box<dyn Iterator<Item = Result<Device>> + 'a> {
@ -914,6 +916,46 @@ impl service::users::Data for KeyValueDatabase {
)
}
fn all_devices_metadata<'a>(
&'a self,
) -> Box<dyn Iterator<Item = Result<(OwnedUserId, Device)>> + 'a> {
Box::new(self.userdeviceid_metadata.iter().map(|(user, bytes)| {
let binding = user.split(|&b| b == 0xff).collect::<Vec<_>>();
let user = binding
.first()
.ok_or_else(|| Error::bad_database("User ID in token_userdeviceid is invalid."))?
.to_vec();
if let (Ok(Ok(u)), Ok(d)) = (
utils::string_from_bytes(&user).map(UserId::parse),
serde_json::from_slice::<Device>(&bytes),
) {
Ok((u, d))
} else {
Err(Error::bad_database(
"Device and/or User in userdeviceid_metadata is invalid.",
))
}
}))
}
fn set_devices_last_seen<'a>(
&'a self,
devices: &'a BTreeMap<OwnedDeviceId, MilliSecondsSinceUnixEpoch>,
) -> Box<dyn Iterator<Item = Result<()>> + 'a> {
Box::new(
self.all_devices_metadata()
.filter_map(Result::ok)
.filter_map(|(u, mut d)| {
d.last_seen_ts = Some(*devices.get(&d.device_id)?);
Some((u, d))
})
.map(|(user, device)| {
self.update_device_metadata(&user, &device.device_id, &device)
}),
)
}
/// Creates a new sync filter. Returns the filter id.
fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String> {
let filter_id = utils::random_string(4);

View file

@ -988,6 +988,7 @@ impl KeyValueDatabase {
services().sending.start_handler();
Self::start_cleanup_task().await;
Self::start_device_last_seen_update_task();
if services().globals.allow_check_for_updates() {
Self::start_check_for_updates_task();
}
@ -1006,6 +1007,24 @@ impl KeyValueDatabase {
res
}
#[tracing::instrument]
pub fn start_device_last_seen_update_task() {
tokio::spawn(async move {
let timer_interval = Duration::from_secs(60);
let mut i = interval(timer_interval);
loop {
i.tick().await;
let _ = Self::try_update_device_last_seen().await;
}
});
}
async fn try_update_device_last_seen() {
for error in services().users.write_cached_last_seen().await {
warn!("Error writing last seen timestamp of device to database: {error}");
}
}
#[tracing::instrument]
pub fn start_check_for_updates_task() {
tokio::spawn(async move {

View file

@ -114,6 +114,7 @@ impl Services {
users: users::Service {
db,
connections: StdMutex::new(BTreeMap::new()),
device_last_seen: Mutex::new(BTreeMap::new()),
},
account_data: account_data::Service { db },
admin: admin::Service::build(),

View file

@ -4,8 +4,8 @@ use ruma::{
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
events::AnyToDeviceEvent,
serde::Raw,
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, OwnedDeviceId, OwnedDeviceKeyId, OwnedMxcUri,
OwnedUserId, UInt, UserId,
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, OwnedDeviceId,
OwnedDeviceKeyId, OwnedMxcUri, OwnedUserId, UInt, UserId,
};
use std::collections::BTreeMap;
@ -20,7 +20,7 @@ pub trait Data: Send + Sync {
fn count(&self) -> Result<usize>;
/// Find out which user an access token belongs to.
fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, String)>>;
fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, OwnedDeviceId)>>;
/// Returns an iterator over all users on this homeserver.
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a>;
@ -202,11 +202,20 @@ pub trait Data: Send + Sync {
fn get_devicelist_version(&self, user_id: &UserId) -> Result<Option<u64>>;
fn all_devices_metadata<'a>(
fn all_user_devices_metadata<'a>(
&'a self,
user_id: &UserId,
) -> Box<dyn Iterator<Item = Result<Device>> + 'a>;
fn all_devices_metadata<'a>(
&'a self,
) -> Box<dyn Iterator<Item = Result<(OwnedUserId, Device)>> + 'a>;
fn set_devices_last_seen<'a>(
&'a self,
devices: &'a BTreeMap<OwnedDeviceId, MilliSecondsSinceUnixEpoch>,
) -> Box<dyn Iterator<Item = Result<()>> + 'a>;
/// Creates a new sync filter. Returns the filter id.
fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String>;

View file

@ -2,7 +2,7 @@ mod data;
use std::{
collections::{BTreeMap, BTreeSet},
mem,
sync::{Arc, Mutex},
sync::{Arc, Mutex as StdMutex},
};
pub use data::Data;
@ -19,9 +19,10 @@ use ruma::{
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
events::AnyToDeviceEvent,
serde::Raw,
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, OwnedDeviceId, OwnedDeviceKeyId, OwnedMxcUri,
OwnedRoomId, OwnedUserId, RoomAliasId, UInt, UserId,
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, OwnedDeviceId,
OwnedDeviceKeyId, OwnedMxcUri, OwnedRoomId, OwnedUserId, RoomAliasId, UInt, UserId,
};
use tokio::sync::Mutex;
use crate::{services, Error, Result};
@ -36,7 +37,8 @@ pub struct Service {
pub db: &'static dyn Data,
#[allow(clippy::type_complexity)]
pub connections:
Mutex<BTreeMap<(OwnedUserId, OwnedDeviceId, String), Arc<Mutex<SlidingSyncCache>>>>,
StdMutex<BTreeMap<(OwnedUserId, OwnedDeviceId, String), Arc<StdMutex<SlidingSyncCache>>>>,
pub device_last_seen: Mutex<BTreeMap<OwnedDeviceId, MilliSecondsSinceUnixEpoch>>,
}
impl Service {
@ -72,7 +74,7 @@ impl Service {
cache
.entry((user_id, device_id, conn_id))
.or_insert_with(|| {
Arc::new(Mutex::new(SlidingSyncCache {
Arc::new(StdMutex::new(SlidingSyncCache {
lists: BTreeMap::new(),
subscriptions: BTreeMap::new(),
known_rooms: BTreeMap::new(),
@ -200,7 +202,7 @@ impl Service {
cache
.entry((user_id, device_id, conn_id))
.or_insert_with(|| {
Arc::new(Mutex::new(SlidingSyncCache {
Arc::new(StdMutex::new(SlidingSyncCache {
lists: BTreeMap::new(),
subscriptions: BTreeMap::new(),
known_rooms: BTreeMap::new(),
@ -228,7 +230,7 @@ impl Service {
cache
.entry((user_id, device_id, conn_id))
.or_insert_with(|| {
Arc::new(Mutex::new(SlidingSyncCache {
Arc::new(StdMutex::new(SlidingSyncCache {
lists: BTreeMap::new(),
subscriptions: BTreeMap::new(),
known_rooms: BTreeMap::new(),
@ -289,7 +291,7 @@ impl Service {
}
/// Find out which user an access token belongs to.
pub fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, String)>> {
pub fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, OwnedDeviceId)>> {
self.db.find_from_token(token)
}
@ -563,11 +565,25 @@ impl Service {
self.db.get_devicelist_version(user_id)
}
pub fn all_devices_metadata<'a>(
pub async fn all_user_devices_metadata<'a>(
&'a self,
user_id: &UserId,
) -> impl Iterator<Item = Result<Device>> + 'a {
self.db.all_devices_metadata(user_id)
) -> impl Iterator<Item = Device> + 'a {
let all_devices: Vec<_> = self
.db
.all_user_devices_metadata(user_id)
.filter_map(Result::ok)
// RumaHandler trait complains if we don't collect
.collect();
let device_last_seen = self.device_last_seen.lock().await;
// Updates the timestamps with the cached ones
all_devices.into_iter().map(move |mut d| {
if let Some(ts) = device_last_seen.get(&d.device_id) {
d.last_seen_ts = Some(*ts);
};
d
})
}
/// Deactivate account
@ -608,6 +624,31 @@ impl Service {
pub fn find_from_openid_token(&self, token: &str) -> Result<Option<OwnedUserId>> {
self.db.find_from_openid_token(token)
}
/// Sets the device_last_seen timestamp of a given device to now
pub async fn update_device_last_seen(&self, device_id: OwnedDeviceId) {
self.device_last_seen
.lock()
.await
.insert(device_id, MilliSecondsSinceUnixEpoch::now());
}
/// Writes all the currently cached last seen timestamps of devices to the database,
/// clearing the cache in the process
pub async fn write_cached_last_seen(&self) -> Vec<Error> {
let mut map = self.device_last_seen.lock().await;
if !map.is_empty() {
let result = self
.db
.set_devices_last_seen(&map)
.filter_map(Result::err)
.collect();
map.clear();
result
} else {
Vec::new()
}
}
}
/// Ensure that a user only sees signatures from themselves and the target user