fix(presence): fix issues found when testing

Jakub Kubík 2022-11-18 22:32:50 +01:00
parent b6541d207d
commit b18b228c7c
No known key found for this signature in database
GPG key ID: D3A0D5D60F3A173F
8 changed files with 161 additions and 96 deletions

View file

@@ -1,5 +1,5 @@
-use crate::{services, utils, Result, Ruma};
-use ruma::api::client::presence::{get_presence, set_presence};
+use crate::{services, Result, Ruma};
+use ruma::{api::client::presence::{get_presence, set_presence}, uint, presence::PresenceState};
 use std::time::Duration;
 /// # `PUT /_matrix/client/r0/presence/{userId}/status`
@@ -21,16 +21,13 @@ pub async fn set_presence_route(
                     avatar_url: services().users.avatar_url(sender_user)?,
                     currently_active: None,
                     displayname: services().users.displayname(sender_user)?,
-                    last_active_ago: Some(
-                        utils::millis_since_unix_epoch()
-                            .try_into()
-                            .expect("time is valid"),
-                    ),
+                    last_active_ago: Some(uint!(0)),
                     presence: body.presence.clone(),
                     status_msg: body.status_msg.clone(),
                 },
                 sender: sender_user.clone(),
             },
+            true
         )?;
     }
@@ -69,7 +66,6 @@ pub async fn get_presence_route(
     if let Some(presence) = presence_event {
         Ok(get_presence::v3::Response {
-            // TODO: Should ruma just use the presenceeventcontent type here?
             status_msg: presence.content.status_msg,
             currently_active: presence.content.currently_active,
             last_active_ago: presence
@@ -79,6 +75,11 @@ pub async fn get_presence_route(
             presence: presence.content.presence,
         })
     } else {
-        todo!();
+        Ok(get_presence::v3::Response {
+            status_msg: None,
+            currently_active: None,
+            last_active_ago: None,
+            presence: PresenceState::Offline,
+        })
     }
 }

View file

@@ -109,6 +109,7 @@ pub async fn set_displayname_route(
                 },
                 sender: sender_user.clone(),
             },
+            true
         )?;
     }
@@ -244,6 +245,7 @@ pub async fn set_avatar_url_route(
                 },
                 sender: sender_user.clone(),
             },
+            true
        )?;
     }

View file

@@ -166,7 +166,16 @@ async fn sync_helper(
     };
     // TODO: match body.set_presence {
-    services().rooms.edus.presence.ping_presence(&sender_user)?;
+    services()
+        .rooms
+        .edus
+        .presence
+        .ping_presence(
+            &sender_user,
+            false,
+            true,
+            true
+        )?;
     // Setup watchers, so if there's no response, we can wait for them
     let watcher = services().globals.watch(&sender_user, &sender_device);
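The three booleans follow the service-level signature introduced later in this commit, ping_presence(user_id, update_count, update_timestamp, spawn_timer). An annotated reading of the call above (the comments are editorial; the real arguments are positional):

    // Sync keep-alive: leave the presence counter alone, refresh the
    // latest-activity timestamp, and (re)arm the idle/offline timers.
    services().rooms.edus.presence.ping_presence(
        &sender_user,
        false, // update_count
        true,  // update_timestamp
        true,  // spawn_timer
    )?;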

View file

@@ -770,6 +770,7 @@ pub async fn send_transaction_message_route(
                         },
                         sender: user_id.clone(),
                     },
+                    true
                 )?;
             }
         }

View file

@@ -1,7 +1,7 @@
 use futures_util::{stream::FuturesUnordered, StreamExt};
 use ruma::user_id;
-use std::{collections::HashMap, time::Duration};
-use tracing::error;
+use std::{collections::{HashMap, hash_map::Entry}, time::Duration, mem};
+use tracing::{error, info};
 use ruma::{
     events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId,
@@ -17,19 +17,22 @@ use crate::{
 pub struct PresenceUpdate {
     count: u64,
-    timestamp: u64,
+    prev_timestamp: u64,
+    curr_timestamp: u64,
 }
 impl PresenceUpdate {
     fn to_be_bytes(&self) -> Vec<u8> {
-        [self.count.to_be_bytes(), self.timestamp.to_be_bytes()].concat()
+        [self.count.to_be_bytes(), self.prev_timestamp.to_be_bytes(), self.curr_timestamp.to_be_bytes()].concat()
     }
     fn from_be_bytes(bytes: &[u8]) -> Result<Self> {
-        let (count_bytes, timestamp_bytes) = bytes.split_at(bytes.len() / 2);
+        let (count_bytes, timestamps_bytes) = bytes.split_at(mem::size_of::<u64>());
+        let (prev_timestamp_bytes, curr_timestamp_bytes) = timestamps_bytes.split_at(mem::size_of::<u64>());
         Ok(Self {
             count: u64_from_bytes(count_bytes).expect("count bytes from DB are valid"),
-            timestamp: u64_from_bytes(timestamp_bytes).expect("timestamp bytes from DB are valid"),
+            prev_timestamp: u64_from_bytes(prev_timestamp_bytes).expect("timestamp bytes from DB are valid"),
+            curr_timestamp: u64_from_bytes(curr_timestamp_bytes).expect("timestamp bytes from DB are valid"),
         })
     }
 }
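With the extra field, the stored value grows from 16 to 24 bytes: three big-endian u64s (count, prev_timestamp, curr_timestamp). A standalone sketch of the round trip, using plain std in place of the project's u64_from_bytes helper and skipping its error handling:

    // Sketch only: mirrors the encoding above, no error handling.
    struct PresenceUpdate {
        count: u64,          // global counter at the time of the update
        prev_timestamp: u64, // previous ping, millis since the unix epoch
        curr_timestamp: u64, // latest ping, millis since the unix epoch
    }

    fn to_be_bytes(u: &PresenceUpdate) -> Vec<u8> {
        [
            u.count.to_be_bytes(),
            u.prev_timestamp.to_be_bytes(),
            u.curr_timestamp.to_be_bytes(),
        ]
        .concat() // always 24 bytes
    }

    fn from_be_bytes(bytes: &[u8]) -> PresenceUpdate {
        // Peel off one u64 at a time, exactly like the decoder above.
        let (count, rest) = bytes.split_at(std::mem::size_of::<u64>());
        let (prev, curr) = rest.split_at(std::mem::size_of::<u64>());
        PresenceUpdate {
            count: u64::from_be_bytes(count.try_into().unwrap()),
            prev_timestamp: u64::from_be_bytes(prev.try_into().unwrap()),
            curr_timestamp: u64::from_be_bytes(curr.try_into().unwrap()),
        }
    }

    fn main() {
        let bytes = to_be_bytes(&PresenceUpdate { count: 7, prev_timestamp: 1_000, curr_timestamp: 2_500 });
        assert_eq!(bytes.len(), 24);
        assert_eq!(from_be_bytes(&bytes).curr_timestamp, 2_500);
    }

The fixed-width split at size_of::<u64>() is also why userid_presenceupdate gets flushed further down in this commit: values written in the old 16-byte layout can no longer be decoded.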
@@ -48,14 +51,17 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
             &serde_json::to_vec(&presence).expect("presence event from DB is valid"),
         )?;
+        let timestamp = match presence.content.last_active_ago {
+            Some(active_ago) => millis_since_unix_epoch().saturating_sub(active_ago.into()),
+            None => millis_since_unix_epoch(),
+        };
         self.userid_presenceupdate.insert(
             user_id.as_bytes(),
             &*PresenceUpdate {
                 count: services().globals.next_count()?,
-                timestamp: match presence.content.last_active_ago {
-                    Some(active_ago) => millis_since_unix_epoch().saturating_sub(active_ago.into()),
-                    None => millis_since_unix_epoch(),
-                },
+                prev_timestamp: timestamp,
+                curr_timestamp: timestamp,
             }
             .to_be_bytes(),
         )?;
@@ -63,23 +69,41 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
         Ok(())
     }
-    fn ping_presence(&self, user_id: &UserId) -> Result<()> {
+    fn ping_presence(&self, user_id: &UserId, update_count: bool, update_timestamp: bool) -> Result<()> {
+        let now = millis_since_unix_epoch();
+        let presence = self.userid_presenceupdate
+            .get(user_id.as_bytes())?
+            .map(|presence_bytes| PresenceUpdate::from_be_bytes(&presence_bytes))
+            .transpose()?;
+        let new_presence = match presence {
+            Some(presence) => {
+                PresenceUpdate {
+                    count: if update_count { services().globals.next_count()? } else { presence.count },
+                    prev_timestamp: if update_timestamp { presence.curr_timestamp } else { presence.prev_timestamp },
+                    curr_timestamp: if update_timestamp { now } else { presence.curr_timestamp }
+                }
+            },
+            None => PresenceUpdate {
+                count: services().globals.current_count()?,
+                prev_timestamp: now,
+                curr_timestamp: now,
+            }
+        };
         self.userid_presenceupdate.insert(
             user_id.as_bytes(),
-            &*PresenceUpdate {
-                count: services().globals.current_count()?,
-                timestamp: millis_since_unix_epoch(),
-            }
-            .to_be_bytes(),
+            &*new_presence.to_be_bytes(),
         )?;
         Ok(())
     }
-    fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>> {
+    fn last_presence_update(&self, user_id: &UserId) -> Result<Option<(u64, u64)>> {
         self.userid_presenceupdate
             .get(user_id.as_bytes())?
-            .map(|bytes| PresenceUpdate::from_be_bytes(&bytes).map(|update| update.timestamp))
+            .map(|bytes| PresenceUpdate::from_be_bytes(&bytes).map(|update| (update.prev_timestamp, update.curr_timestamp)))
             .transpose()
     }
@@ -101,21 +125,22 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
         room_id: &RoomId,
         since: u64,
     ) -> Result<Box<dyn Iterator<Item = (OwnedUserId, PresenceEvent)> + 'a>> {
-        let services = &services();
         let user_timestamp: HashMap<OwnedUserId, u64> = self
             .userid_presenceupdate
             .iter()
             .filter_map(|(user_id_bytes, update_bytes)| {
                 Some((
-                    OwnedUserId::from(
-                        UserId::parse(utils::string_from_bytes(&user_id_bytes).ok()?).ok()?,
-                    ),
-                    PresenceUpdate::from_be_bytes(&update_bytes).ok()?,
+                    UserId::parse(
+                        utils::string_from_bytes(&user_id_bytes)
+                            .expect("UserID bytes are a valid string")
+                    ).expect("UserID bytes from database are a valid UserID"),
+                    PresenceUpdate::from_be_bytes(&update_bytes)
+                        .expect("PresenceUpdate bytes from database are a valid PresenceUpdate"),
                 ))
             })
             .filter_map(|(user_id, presence_update)| {
                 if presence_update.count <= since
-                    || !services
+                    || !services()
                         .rooms
                         .state_cache
                         .is_joined(&user_id, room_id)
@@ -124,18 +149,20 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
                     return None;
                 }
-                Some((user_id, presence_update.timestamp))
+                Some((user_id, presence_update.curr_timestamp))
             })
             .collect();
         Ok(Box::new(
             self.roomuserid_presenceevent
-                .iter()
-                .filter_map(|(user_id_bytes, presence_bytes)| {
+                .scan_prefix(room_id.as_bytes().to_vec())
+                .filter_map(|(roomuserid_bytes, presence_bytes)| {
+                    let user_id_bytes = roomuserid_bytes.split(|byte| *byte == 0xff as u8).last()?;
                     Some((
-                        OwnedUserId::from(
-                            UserId::parse(utils::string_from_bytes(&user_id_bytes).ok()?).ok()?,
-                        ),
+                        UserId::parse(
+                            utils::string_from_bytes(&user_id_bytes)
+                                .expect("UserID bytes are a valid string")
+                        ).expect("UserID bytes from database are a valid UserID").to_owned(),
                         presence_bytes,
                     ))
                 })
@@ -145,7 +172,8 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
                         Some((
                             user_id,
-                            parse_presence_event(&presence_bytes, *timestamp).ok()?,
+                            parse_presence_event(&presence_bytes, *timestamp)
+                                .expect("PresenceEvent bytes from database are a valid PresenceEvent"),
                         ))
                     },
                 ),
@@ -157,6 +185,7 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
         mut timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>,
     ) -> Result<()> {
         let mut timers = FuturesUnordered::new();
+        let mut timers_timestamp: HashMap<OwnedUserId, u64> = HashMap::new();
         // TODO: Get rid of this hack
         timers.push(create_presence_timer(
@@ -168,9 +197,10 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
         loop {
             tokio::select! {
                 Some(user_id) = timers.next() => {
-                    let presence_timestamp = match services().rooms.edus.presence.last_presence_update(&user_id) {
-                        Ok(timestamp) => match timestamp {
-                            Some(timestamp) => timestamp,
+                    info!("Processing timer for user '{}' ({})", user_id.clone(), timers.len());
+                    let (prev_timestamp, curr_timestamp) = match services().rooms.edus.presence.last_presence_update(&user_id) {
+                        Ok(timestamp_tuple) => match timestamp_tuple {
+                            Some(timestamp_tuple) => timestamp_tuple,
                             None => continue,
                         },
                         Err(e) => {
@@ -179,46 +209,49 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
                         }
                     };
-                    let presence_state = determine_presence_state(presence_timestamp);
+                    let prev_presence_state = determine_presence_state(prev_timestamp);
+                    let curr_presence_state = determine_presence_state(curr_timestamp);
                     // Continue if there is no change in state
-                    if presence_state != PresenceState::Offline {
+                    if prev_presence_state == curr_presence_state {
                         continue;
                     }
-                    for room_id in services()
-                        .rooms
-                        .state_cache
-                        .rooms_joined(&user_id)
-                        .filter_map(|room_id| room_id.ok()) {
-                        let presence_event = match services().rooms.edus.presence.get_presence_event(&user_id, &room_id) {
-                            Ok(event) => match event {
-                                Some(event) => event,
-                                None => continue,
-                            },
-                            Err(e) => {
-                                error!("{e}");
-                                continue;
-                            }
-                        };
-                        match services().rooms.edus.presence.update_presence(&user_id, &room_id, presence_event) {
-                            Ok(()) => (),
-                            Err(e) => {
-                                error!("{e}");
-                                continue;
-                            }
-                        }
-                        // TODO: Send event over federation
-                    }
-                }
-                Some(user_id) = timer_receiver.recv() => {
+                    match services().rooms.edus.presence.ping_presence(&user_id, true, false, false) {
+                        Ok(_) => (),
+                        Err(e) => error!("{e}")
+                    }
+                    // TODO: Notify federation sender
+                }
+                Some(user_id) = timer_receiver.recv() => {
+                    let now = millis_since_unix_epoch();
+                    let should_send = match timers_timestamp.entry(user_id.to_owned()) {
+                        Entry::Occupied(mut entry) => {
+                            if now - entry.get() > 15 * 1000 {
+                                entry.insert(now);
+                                true
+                            } else {
+                                false
+                            }
+                        },
+                        Entry::Vacant(entry) => {
+                            entry.insert(now);
+                            true
+                        }
+                    };
+                    if !should_send {
+                        continue;
+                    }
                     // Idle timeout
                     timers.push(create_presence_timer(Duration::from_secs(60), user_id.clone()));
                     // Offline timeout
-                    timers.push(create_presence_timer(Duration::from_secs(60*15) , user_id));
+                    timers.push(create_presence_timer(Duration::from_secs(60*15) , user_id.clone()));
+                    info!("Added timers for user '{}' ({})", user_id, timers.len());
                 }
             }
         }
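The new timers_timestamp map debounces timer registration: per user, the idle/offline timers are re-armed at most once every 15 seconds. A std-only sketch of that gate (not project code; String stands in for OwnedUserId):

    use std::collections::{hash_map::Entry, HashMap};

    // Returns true when the request should go through and re-arm the timers.
    fn should_send(timers_timestamp: &mut HashMap<String, u64>, user_id: String, now: u64) -> bool {
        match timers_timestamp.entry(user_id) {
            // Known user: only pass if the last accepted request is older than 15 000 ms.
            Entry::Occupied(mut entry) => {
                if now - entry.get() > 15 * 1000 {
                    entry.insert(now);
                    true
                } else {
                    false
                }
            }
            // First request for this user always goes through.
            Entry::Vacant(entry) => {
                entry.insert(now);
                true
            }
        }
    }

    fn main() {
        let mut seen = HashMap::new();
        assert!(should_send(&mut seen, "@alice:example.org".into(), 0));
        assert!(!should_send(&mut seen, "@alice:example.org".into(), 10_000));
        assert!(should_send(&mut seen, "@alice:example.org".into(), 20_001));
    }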

View file

@@ -825,6 +825,9 @@ impl KeyValueDatabase {
             );
         }
+        // Flush old presence data
+        db.userid_presenceupdate.clear()?;
         services().admin.start_handler();
         // Set emergency access for the conduit user

View file

@@ -15,10 +15,10 @@ pub trait Data: Send + Sync {
     ) -> Result<()>;
     /// Resets the presence timeout, so the user will stay in their current presence state.
-    fn ping_presence(&self, user_id: &UserId) -> Result<()>;
+    fn ping_presence(&self, user_id: &UserId, update_count: bool, update_timestamp: bool) -> Result<()>;
     /// Returns the timestamp of the last presence update of this user in millis since the unix epoch.
-    fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>>;
+    fn last_presence_update(&self, user_id: &UserId) -> Result<Option<(u64, u64)>>;
     /// Returns the presence event with correct last_active_ago.
     fn get_presence_event(

View file

@@ -14,6 +14,7 @@ pub struct Service {
 }
 impl Service {
+    /// Builds the service and initialized the presence_maintain task
     pub fn build(db: &'static dyn Data) -> Result<Self> {
         let (sender, receiver) = mpsc::unbounded_channel();
         let service = Self {
@@ -26,6 +27,15 @@ impl Service {
         Ok(service)
     }
+    /// Resets the presence timeout, so the user will stay in their current presence state.
+    pub fn ping_presence(&self, user_id: &UserId, update_count: bool, update_timestamp: bool, spawn_timer: bool) -> Result<()> {
+        if spawn_timer {
+            self.spawn_timer(user_id)?;
+        }
+        self.db.ping_presence(user_id, update_count, update_timestamp)
+    }
     /// Adds a presence event which will be saved until a new event replaces it.
     ///
     /// Note: This method takes a RoomId because presence updates are always bound to rooms to
@@ -35,45 +45,34 @@ impl Service {
         user_id: &UserId,
         room_id: &RoomId,
         presence: PresenceEvent,
+        spawn_timer: bool
     ) -> Result<()> {
-        self.timer_sender
-            .send(user_id.into())
-            .map_err(|_| Error::bad_database("Sender errored out"))?;
+        if spawn_timer {
+            self.spawn_timer(user_id)?;
+        }
         self.db.update_presence(user_id, room_id, presence)
     }
-    /// Resets the presence timeout, so the user will stay in their current presence state.
-    pub fn ping_presence(&self, user_id: &UserId) -> Result<()> {
-        self.timer_sender
-            .send(user_id.into())
-            .map_err(|_| Error::bad_database("Sender errored out"))?;
-        self.db.ping_presence(user_id)
-    }
-    pub fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>> {
+    /// Returns the timestamp of when the presence was last updated for the specified user.
+    pub fn last_presence_update(&self, user_id: &UserId) -> Result<Option<(u64, u64)>> {
         self.db.last_presence_update(user_id)
     }
+    /// Returns the saved presence event for this user with actual last_active_ago.
     pub fn get_presence_event(
         &self,
         user_id: &UserId,
         room_id: &RoomId,
     ) -> Result<Option<PresenceEvent>> {
         let last_update = match self.db.last_presence_update(user_id)? {
-            Some(last) => last,
+            Some(last) => last.1,
             None => return Ok(None),
         };
         self.db.get_presence_event(room_id, user_id, last_update)
     }
-    pub fn presence_maintain(
-        &self,
-        timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>,
-    ) -> Result<()> {
-        self.db.presence_maintain(timer_receiver)
-    }
     /// Returns the most recent presence updates that happened after the event with id `since`.
     #[tracing::instrument(skip(self, since, room_id))]
     pub fn presence_since(
@@ -83,4 +82,21 @@
     ) -> Result<Box<dyn Iterator<Item = (OwnedUserId, PresenceEvent)>>> {
         self.db.presence_since(room_id, since)
     }
+    /// Spawns a task maintaining presence data
+    fn presence_maintain(
+        &self,
+        timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>,
+    ) -> Result<()> {
+        self.db.presence_maintain(timer_receiver)
+    }
+    /// Spawns a timer for the user used by the maintenance task
+    fn spawn_timer(&self, user_id: &UserId) -> Result<()> {
+        self.timer_sender
+            .send(user_id.into())
+            .map_err(|_| Error::bad_database("Sender errored out"))?;
+        Ok(())
+    }
 }
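Taken together, the reworked service API looks roughly like this from a caller's point of view. The signatures match the diff above, but the wrapper function, imports, and error handling here are assumed for illustration:

    // Assumed context: inside the conduit crate, with its Result, services(),
    // UserId, RoomId and PresenceEvent in scope.
    fn presence_walkthrough(user: &UserId, room: &RoomId, event: PresenceEvent) -> Result<()> {
        let presence = &services().rooms.edus.presence;

        // Store a fresh presence event and arm the idle/offline timers (spawn_timer = true).
        presence.update_presence(user, room, event, true)?;

        // Lightweight keep-alive, e.g. from /sync: keep the count, move the timestamp, re-arm timers.
        presence.ping_presence(user, false, true, true)?;

        // Both timestamps are now available: (previous ping, latest ping).
        if let Some((prev, curr)) = presence.last_presence_update(user)? {
            debug_assert!(prev <= curr);
        }
        Ok(())
    }

presence_maintain and spawn_timer stay private: the maintenance task is started from build, and timers are only armed through the public methods shown above.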