Compare commits
1 commit
next
...
update-dev
Author | SHA1 | Date | |
---|---|---|---|
|
aaee4d273b |
8 changed files with 142 additions and 25 deletions
|
@ -17,8 +17,8 @@ pub async fn get_devices_route(
|
||||||
|
|
||||||
let devices: Vec<device::Device> = services()
|
let devices: Vec<device::Device> = services()
|
||||||
.users
|
.users
|
||||||
.all_devices_metadata(sender_user)
|
.all_user_devices_metadata(sender_user)
|
||||||
.filter_map(|r| r.ok()) // Filter out buggy devices
|
.await
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
Ok(get_devices::v3::Response { devices })
|
Ok(get_devices::v3::Response { devices })
|
||||||
|
|
|
@ -87,7 +87,12 @@ where
|
||||||
if let Some(reg_info) = services().appservice.find_from_token(token).await {
|
if let Some(reg_info) = services().appservice.find_from_token(token).await {
|
||||||
Token::Appservice(Box::new(reg_info.clone()))
|
Token::Appservice(Box::new(reg_info.clone()))
|
||||||
} else if let Some((user_id, device_id)) = services().users.find_from_token(token)? {
|
} else if let Some((user_id, device_id)) = services().users.find_from_token(token)? {
|
||||||
Token::User((user_id, OwnedDeviceId::from(device_id)))
|
services()
|
||||||
|
.users
|
||||||
|
.update_device_last_seen(device_id.clone())
|
||||||
|
.await;
|
||||||
|
|
||||||
|
Token::User((user_id, device_id))
|
||||||
} else {
|
} else {
|
||||||
Token::Invalid
|
Token::Invalid
|
||||||
}
|
}
|
||||||
|
|
|
@ -1778,8 +1778,8 @@ pub async fn get_devices_route(
|
||||||
.expect("version will not grow that large"),
|
.expect("version will not grow that large"),
|
||||||
devices: services()
|
devices: services()
|
||||||
.users
|
.users
|
||||||
.all_devices_metadata(&body.user_id)
|
.all_user_devices_metadata(&body.user_id)
|
||||||
.filter_map(|r| r.ok())
|
.await
|
||||||
.filter_map(|metadata| {
|
.filter_map(|metadata| {
|
||||||
Some(UserDevice {
|
Some(UserDevice {
|
||||||
keys: services()
|
keys: services()
|
||||||
|
|
|
@ -41,7 +41,7 @@ impl service::users::Data for KeyValueDatabase {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Find out which user an access token belongs to.
|
/// Find out which user an access token belongs to.
|
||||||
fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, String)>> {
|
fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, OwnedDeviceId)>> {
|
||||||
self.token_userdeviceid
|
self.token_userdeviceid
|
||||||
.get(token.as_bytes())?
|
.get(token.as_bytes())?
|
||||||
.map_or(Ok(None), |bytes| {
|
.map_or(Ok(None), |bytes| {
|
||||||
|
@ -60,9 +60,11 @@ impl service::users::Data for KeyValueDatabase {
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
Error::bad_database("User ID in token_userdeviceid is invalid.")
|
Error::bad_database("User ID in token_userdeviceid is invalid.")
|
||||||
})?,
|
})?,
|
||||||
utils::string_from_bytes(device_bytes).map_err(|_| {
|
utils::string_from_bytes(device_bytes)
|
||||||
Error::bad_database("Device ID in token_userdeviceid is invalid.")
|
.map_err(|_| {
|
||||||
})?,
|
Error::bad_database("Device ID in token_userdeviceid is invalid.")
|
||||||
|
})?
|
||||||
|
.into(),
|
||||||
)))
|
)))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -896,7 +898,7 @@ impl service::users::Data for KeyValueDatabase {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn all_devices_metadata<'a>(
|
fn all_user_devices_metadata<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
) -> Box<dyn Iterator<Item = Result<Device>> + 'a> {
|
) -> Box<dyn Iterator<Item = Result<Device>> + 'a> {
|
||||||
|
@ -914,6 +916,46 @@ impl service::users::Data for KeyValueDatabase {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn all_devices_metadata<'a>(
|
||||||
|
&'a self,
|
||||||
|
) -> Box<dyn Iterator<Item = Result<(OwnedUserId, Device)>> + 'a> {
|
||||||
|
Box::new(self.userdeviceid_metadata.iter().map(|(user, bytes)| {
|
||||||
|
let binding = user.split(|&b| b == 0xff).collect::<Vec<_>>();
|
||||||
|
let user = binding
|
||||||
|
.first()
|
||||||
|
.ok_or_else(|| Error::bad_database("User ID in token_userdeviceid is invalid."))?
|
||||||
|
.to_vec();
|
||||||
|
|
||||||
|
if let (Ok(Ok(u)), Ok(d)) = (
|
||||||
|
utils::string_from_bytes(&user).map(UserId::parse),
|
||||||
|
serde_json::from_slice::<Device>(&bytes),
|
||||||
|
) {
|
||||||
|
Ok((u, d))
|
||||||
|
} else {
|
||||||
|
Err(Error::bad_database(
|
||||||
|
"Device and/or User in userdeviceid_metadata is invalid.",
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_devices_last_seen<'a>(
|
||||||
|
&'a self,
|
||||||
|
devices: &'a BTreeMap<OwnedDeviceId, MilliSecondsSinceUnixEpoch>,
|
||||||
|
) -> Box<dyn Iterator<Item = Result<()>> + 'a> {
|
||||||
|
Box::new(
|
||||||
|
self.all_devices_metadata()
|
||||||
|
.filter_map(Result::ok)
|
||||||
|
.filter_map(|(u, mut d)| {
|
||||||
|
d.last_seen_ts = Some(*devices.get(&d.device_id)?);
|
||||||
|
Some((u, d))
|
||||||
|
})
|
||||||
|
.map(|(user, device)| {
|
||||||
|
self.update_device_metadata(&user, &device.device_id, &device)
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
/// Creates a new sync filter. Returns the filter id.
|
/// Creates a new sync filter. Returns the filter id.
|
||||||
fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String> {
|
fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String> {
|
||||||
let filter_id = utils::random_string(4);
|
let filter_id = utils::random_string(4);
|
||||||
|
|
|
@ -988,6 +988,7 @@ impl KeyValueDatabase {
|
||||||
services().sending.start_handler();
|
services().sending.start_handler();
|
||||||
|
|
||||||
Self::start_cleanup_task().await;
|
Self::start_cleanup_task().await;
|
||||||
|
Self::start_device_last_seen_update_task();
|
||||||
if services().globals.allow_check_for_updates() {
|
if services().globals.allow_check_for_updates() {
|
||||||
Self::start_check_for_updates_task();
|
Self::start_check_for_updates_task();
|
||||||
}
|
}
|
||||||
|
@ -1006,6 +1007,24 @@ impl KeyValueDatabase {
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument]
|
||||||
|
pub fn start_device_last_seen_update_task() {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let timer_interval = Duration::from_secs(60);
|
||||||
|
let mut i = interval(timer_interval);
|
||||||
|
loop {
|
||||||
|
i.tick().await;
|
||||||
|
let _ = Self::try_update_device_last_seen().await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn try_update_device_last_seen() {
|
||||||
|
for error in services().users.write_cached_last_seen().await {
|
||||||
|
warn!("Error writing last seen timestamp of device to database: {error}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[tracing::instrument]
|
#[tracing::instrument]
|
||||||
pub fn start_check_for_updates_task() {
|
pub fn start_check_for_updates_task() {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|
|
@ -114,6 +114,7 @@ impl Services {
|
||||||
users: users::Service {
|
users: users::Service {
|
||||||
db,
|
db,
|
||||||
connections: StdMutex::new(BTreeMap::new()),
|
connections: StdMutex::new(BTreeMap::new()),
|
||||||
|
device_last_seen: Mutex::new(BTreeMap::new()),
|
||||||
},
|
},
|
||||||
account_data: account_data::Service { db },
|
account_data: account_data::Service { db },
|
||||||
admin: admin::Service::build(),
|
admin: admin::Service::build(),
|
||||||
|
|
|
@ -4,8 +4,8 @@ use ruma::{
|
||||||
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
|
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
|
||||||
events::AnyToDeviceEvent,
|
events::AnyToDeviceEvent,
|
||||||
serde::Raw,
|
serde::Raw,
|
||||||
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, OwnedDeviceId, OwnedDeviceKeyId, OwnedMxcUri,
|
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, OwnedDeviceId,
|
||||||
OwnedUserId, UInt, UserId,
|
OwnedDeviceKeyId, OwnedMxcUri, OwnedUserId, UInt, UserId,
|
||||||
};
|
};
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
@ -20,7 +20,7 @@ pub trait Data: Send + Sync {
|
||||||
fn count(&self) -> Result<usize>;
|
fn count(&self) -> Result<usize>;
|
||||||
|
|
||||||
/// Find out which user an access token belongs to.
|
/// Find out which user an access token belongs to.
|
||||||
fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, String)>>;
|
fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, OwnedDeviceId)>>;
|
||||||
|
|
||||||
/// Returns an iterator over all users on this homeserver.
|
/// Returns an iterator over all users on this homeserver.
|
||||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a>;
|
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a>;
|
||||||
|
@ -202,11 +202,20 @@ pub trait Data: Send + Sync {
|
||||||
|
|
||||||
fn get_devicelist_version(&self, user_id: &UserId) -> Result<Option<u64>>;
|
fn get_devicelist_version(&self, user_id: &UserId) -> Result<Option<u64>>;
|
||||||
|
|
||||||
fn all_devices_metadata<'a>(
|
fn all_user_devices_metadata<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
) -> Box<dyn Iterator<Item = Result<Device>> + 'a>;
|
) -> Box<dyn Iterator<Item = Result<Device>> + 'a>;
|
||||||
|
|
||||||
|
fn all_devices_metadata<'a>(
|
||||||
|
&'a self,
|
||||||
|
) -> Box<dyn Iterator<Item = Result<(OwnedUserId, Device)>> + 'a>;
|
||||||
|
|
||||||
|
fn set_devices_last_seen<'a>(
|
||||||
|
&'a self,
|
||||||
|
devices: &'a BTreeMap<OwnedDeviceId, MilliSecondsSinceUnixEpoch>,
|
||||||
|
) -> Box<dyn Iterator<Item = Result<()>> + 'a>;
|
||||||
|
|
||||||
/// Creates a new sync filter. Returns the filter id.
|
/// Creates a new sync filter. Returns the filter id.
|
||||||
fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String>;
|
fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String>;
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@ mod data;
|
||||||
use std::{
|
use std::{
|
||||||
collections::{BTreeMap, BTreeSet},
|
collections::{BTreeMap, BTreeSet},
|
||||||
mem,
|
mem,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex as StdMutex},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub use data::Data;
|
pub use data::Data;
|
||||||
|
@ -19,9 +19,10 @@ use ruma::{
|
||||||
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
|
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
|
||||||
events::AnyToDeviceEvent,
|
events::AnyToDeviceEvent,
|
||||||
serde::Raw,
|
serde::Raw,
|
||||||
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, OwnedDeviceId, OwnedDeviceKeyId, OwnedMxcUri,
|
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, OwnedDeviceId,
|
||||||
OwnedRoomId, OwnedUserId, RoomAliasId, UInt, UserId,
|
OwnedDeviceKeyId, OwnedMxcUri, OwnedRoomId, OwnedUserId, RoomAliasId, UInt, UserId,
|
||||||
};
|
};
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
use crate::{services, Error, Result};
|
use crate::{services, Error, Result};
|
||||||
|
|
||||||
|
@ -36,7 +37,8 @@ pub struct Service {
|
||||||
pub db: &'static dyn Data,
|
pub db: &'static dyn Data,
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
pub connections:
|
pub connections:
|
||||||
Mutex<BTreeMap<(OwnedUserId, OwnedDeviceId, String), Arc<Mutex<SlidingSyncCache>>>>,
|
StdMutex<BTreeMap<(OwnedUserId, OwnedDeviceId, String), Arc<StdMutex<SlidingSyncCache>>>>,
|
||||||
|
pub device_last_seen: Mutex<BTreeMap<OwnedDeviceId, MilliSecondsSinceUnixEpoch>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
|
@ -72,7 +74,7 @@ impl Service {
|
||||||
cache
|
cache
|
||||||
.entry((user_id, device_id, conn_id))
|
.entry((user_id, device_id, conn_id))
|
||||||
.or_insert_with(|| {
|
.or_insert_with(|| {
|
||||||
Arc::new(Mutex::new(SlidingSyncCache {
|
Arc::new(StdMutex::new(SlidingSyncCache {
|
||||||
lists: BTreeMap::new(),
|
lists: BTreeMap::new(),
|
||||||
subscriptions: BTreeMap::new(),
|
subscriptions: BTreeMap::new(),
|
||||||
known_rooms: BTreeMap::new(),
|
known_rooms: BTreeMap::new(),
|
||||||
|
@ -200,7 +202,7 @@ impl Service {
|
||||||
cache
|
cache
|
||||||
.entry((user_id, device_id, conn_id))
|
.entry((user_id, device_id, conn_id))
|
||||||
.or_insert_with(|| {
|
.or_insert_with(|| {
|
||||||
Arc::new(Mutex::new(SlidingSyncCache {
|
Arc::new(StdMutex::new(SlidingSyncCache {
|
||||||
lists: BTreeMap::new(),
|
lists: BTreeMap::new(),
|
||||||
subscriptions: BTreeMap::new(),
|
subscriptions: BTreeMap::new(),
|
||||||
known_rooms: BTreeMap::new(),
|
known_rooms: BTreeMap::new(),
|
||||||
|
@ -228,7 +230,7 @@ impl Service {
|
||||||
cache
|
cache
|
||||||
.entry((user_id, device_id, conn_id))
|
.entry((user_id, device_id, conn_id))
|
||||||
.or_insert_with(|| {
|
.or_insert_with(|| {
|
||||||
Arc::new(Mutex::new(SlidingSyncCache {
|
Arc::new(StdMutex::new(SlidingSyncCache {
|
||||||
lists: BTreeMap::new(),
|
lists: BTreeMap::new(),
|
||||||
subscriptions: BTreeMap::new(),
|
subscriptions: BTreeMap::new(),
|
||||||
known_rooms: BTreeMap::new(),
|
known_rooms: BTreeMap::new(),
|
||||||
|
@ -289,7 +291,7 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Find out which user an access token belongs to.
|
/// Find out which user an access token belongs to.
|
||||||
pub fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, String)>> {
|
pub fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, OwnedDeviceId)>> {
|
||||||
self.db.find_from_token(token)
|
self.db.find_from_token(token)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -563,11 +565,25 @@ impl Service {
|
||||||
self.db.get_devicelist_version(user_id)
|
self.db.get_devicelist_version(user_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn all_devices_metadata<'a>(
|
pub async fn all_user_devices_metadata<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
) -> impl Iterator<Item = Result<Device>> + 'a {
|
) -> impl Iterator<Item = Device> + 'a {
|
||||||
self.db.all_devices_metadata(user_id)
|
let all_devices: Vec<_> = self
|
||||||
|
.db
|
||||||
|
.all_user_devices_metadata(user_id)
|
||||||
|
.filter_map(Result::ok)
|
||||||
|
// RumaHandler trait complains if we don't collect
|
||||||
|
.collect();
|
||||||
|
let device_last_seen = self.device_last_seen.lock().await;
|
||||||
|
|
||||||
|
// Updates the timestamps with the cached ones
|
||||||
|
all_devices.into_iter().map(move |mut d| {
|
||||||
|
if let Some(ts) = device_last_seen.get(&d.device_id) {
|
||||||
|
d.last_seen_ts = Some(*ts);
|
||||||
|
};
|
||||||
|
d
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Deactivate account
|
/// Deactivate account
|
||||||
|
@ -608,6 +624,31 @@ impl Service {
|
||||||
pub fn find_from_openid_token(&self, token: &str) -> Result<Option<OwnedUserId>> {
|
pub fn find_from_openid_token(&self, token: &str) -> Result<Option<OwnedUserId>> {
|
||||||
self.db.find_from_openid_token(token)
|
self.db.find_from_openid_token(token)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sets the device_last_seen timestamp of a given device to now
|
||||||
|
pub async fn update_device_last_seen(&self, device_id: OwnedDeviceId) {
|
||||||
|
self.device_last_seen
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.insert(device_id, MilliSecondsSinceUnixEpoch::now());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Writes all the currently cached last seen timestamps of devices to the database,
|
||||||
|
/// clearing the cache in the process
|
||||||
|
pub async fn write_cached_last_seen(&self) -> Vec<Error> {
|
||||||
|
let mut map = self.device_last_seen.lock().await;
|
||||||
|
if !map.is_empty() {
|
||||||
|
let result = self
|
||||||
|
.db
|
||||||
|
.set_devices_last_seen(&map)
|
||||||
|
.filter_map(Result::err)
|
||||||
|
.collect();
|
||||||
|
map.clear();
|
||||||
|
result
|
||||||
|
} else {
|
||||||
|
Vec::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ensure that a user only sees signatures from themselves and the target user
|
/// Ensure that a user only sees signatures from themselves and the target user
|
||||||
|
|
Loading…
Add table
Reference in a new issue