feat(presence): implement presence functionality

This commit is contained in:
Jakub Kubík 2023-07-10 16:41:00 +02:00 committed by girlbossceo
parent 22eff2d29c
commit ba03edfae9
12 changed files with 545 additions and 354 deletions

View file

@ -1,5 +1,8 @@
use crate::{services, utils, Result, Ruma}; use crate::{services, Error, Result, Ruma};
use ruma::api::client::presence::{get_presence, set_presence}; use ruma::api::client::{
error::ErrorKind,
presence::{get_presence, set_presence},
};
use std::time::Duration; use std::time::Duration;
/// # `PUT /_matrix/client/r0/presence/{userId}/status` /// # `PUT /_matrix/client/r0/presence/{userId}/status`
@ -9,28 +12,16 @@ pub async fn set_presence_route(
body: Ruma<set_presence::v3::Request>, body: Ruma<set_presence::v3::Request>,
) -> Result<set_presence::v3::Response> { ) -> Result<set_presence::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_user = body.sender_user.as_ref().expect("user is authenticated");
for room_id in services().rooms.state_cache.rooms_joined(sender_user) { for room_id in services().rooms.state_cache.rooms_joined(sender_user) {
let room_id = room_id?; let room_id = room_id?;
services().rooms.edus.presence.update_presence( services().rooms.edus.presence.set_presence(
sender_user,
&room_id, &room_id,
ruma::events::presence::PresenceEvent { sender_user,
content: ruma::events::presence::PresenceEventContent { body.presence.clone(),
avatar_url: services().users.avatar_url(sender_user)?, None,
currently_active: None, None,
displayname: services().users.displayname(sender_user)?, body.status_msg.clone(),
last_active_ago: Some(
utils::millis_since_unix_epoch()
.try_into()
.expect("time is valid"),
),
presence: body.presence.clone(),
status_msg: body.status_msg.clone(),
},
sender: sender_user.clone(),
},
)?; )?;
} }
@ -60,7 +51,7 @@ pub async fn get_presence_route(
.rooms .rooms
.edus .edus
.presence .presence
.get_last_presence_event(sender_user, &room_id)? .get_presence(&room_id, sender_user)?
{ {
presence_event = Some(presence); presence_event = Some(presence);
break; break;
@ -79,6 +70,9 @@ pub async fn get_presence_route(
presence: presence.content.presence, presence: presence.content.presence,
}) })
} else { } else {
todo!(); Err(Error::BadRequest(
ErrorKind::NotFound,
"Presence state for this used was not found",
))
} }
} }

View file

@ -1,4 +1,4 @@
use crate::{service::pdu::PduBuilder, services, utils, Error, Result, Ruma}; use crate::{service::pdu::PduBuilder, services, Error, Result, Ruma};
use ruma::{ use ruma::{
api::{ api::{
client::{ client::{
@ -10,6 +10,7 @@ use ruma::{
federation::{self, query::get_profile_information::v1::ProfileField}, federation::{self, query::get_profile_information::v1::ProfileField},
}, },
events::{room::member::RoomMemberEventContent, StateEventType, TimelineEventType}, events::{room::member::RoomMemberEventContent, StateEventType, TimelineEventType},
presence::PresenceState,
}; };
use serde_json::value::to_raw_value; use serde_json::value::to_raw_value;
use std::sync::Arc; use std::sync::Arc;
@ -89,29 +90,15 @@ pub async fn set_displayname_route(
&room_id, &room_id,
&state_lock, &state_lock,
); );
// Presence update
services().rooms.edus.presence.update_presence(
sender_user,
&room_id,
ruma::events::presence::PresenceEvent {
content: ruma::events::presence::PresenceEventContent {
avatar_url: services().users.avatar_url(sender_user)?,
currently_active: None,
displayname: services().users.displayname(sender_user)?,
last_active_ago: Some(
utils::millis_since_unix_epoch()
.try_into()
.expect("time is valid"),
),
presence: ruma::presence::PresenceState::Online,
status_msg: None,
},
sender: sender_user.clone(),
},
)?;
} }
// Presence update
services()
.rooms
.edus
.presence
.ping_presence(sender_user, PresenceState::Online)?;
Ok(set_display_name::v3::Response {}) Ok(set_display_name::v3::Response {})
} }
@ -224,29 +211,15 @@ pub async fn set_avatar_url_route(
&room_id, &room_id,
&state_lock, &state_lock,
); );
// Presence update
services().rooms.edus.presence.update_presence(
sender_user,
&room_id,
ruma::events::presence::PresenceEvent {
content: ruma::events::presence::PresenceEventContent {
avatar_url: services().users.avatar_url(sender_user)?,
currently_active: None,
displayname: services().users.displayname(sender_user)?,
last_active_ago: Some(
utils::millis_since_unix_epoch()
.try_into()
.expect("time is valid"),
),
presence: ruma::presence::PresenceState::Online,
status_msg: None,
},
sender: sender_user.clone(),
},
)?;
} }
// Presence update
services()
.rooms
.edus
.presence
.ping_presence(sender_user, PresenceState::Online)?;
Ok(set_avatar_url::v3::Response {}) Ok(set_avatar_url::v3::Response {})
} }

View file

@ -170,8 +170,12 @@ async fn sync_helper(
body: sync_events::v3::Request, body: sync_events::v3::Request,
// bool = caching allowed // bool = caching allowed
) -> Result<(sync_events::v3::Response, bool), Error> { ) -> Result<(sync_events::v3::Response, bool), Error> {
// TODO: match body.set_presence { // Presence update
services().rooms.edus.presence.ping_presence(&sender_user)?; services()
.rooms
.edus
.presence
.ping_presence(&sender_user, body.set_presence)?;
// Setup watchers, so if there's no response, we can wait for them // Setup watchers, so if there's no response, we can wait for them
let watcher = services().globals.watch(&sender_user, &sender_device); let watcher = services().globals.watch(&sender_user, &sender_device);
@ -248,36 +252,36 @@ async fn sync_helper(
} }
// Take presence updates from this room // Take presence updates from this room
for (user_id, presence) in services() for presence_data in services()
.rooms .rooms
.edus .edus
.presence .presence
.presence_since(&room_id, since)? .presence_since(&room_id, since)
{ {
let (user_id, _, presence_event) = presence_data?;
match presence_updates.entry(user_id) { match presence_updates.entry(user_id) {
Entry::Vacant(v) => { Entry::Vacant(slot) => {
v.insert(presence); slot.insert(presence_event);
} }
Entry::Occupied(mut o) => { Entry::Occupied(mut slot) => {
let p = o.get_mut(); let curr_event = slot.get_mut();
let curr_content = &mut curr_event.content;
let new_content = presence_event.content;
// Update existing presence event with more info // Update existing presence event with more info
p.content.presence = presence.content.presence; curr_content.presence = new_content.presence;
if let Some(status_msg) = presence.content.status_msg { curr_content.status_msg =
p.content.status_msg = Some(status_msg); curr_content.status_msg.clone().or(new_content.status_msg);
} curr_content.last_active_ago =
if let Some(last_active_ago) = presence.content.last_active_ago { curr_content.last_active_ago.or(new_content.last_active_ago);
p.content.last_active_ago = Some(last_active_ago); curr_content.displayname =
} curr_content.displayname.clone().or(new_content.displayname);
if let Some(displayname) = presence.content.displayname { curr_content.avatar_url =
p.content.displayname = Some(displayname); curr_content.avatar_url.clone().or(new_content.avatar_url);
} curr_content.currently_active = curr_content
if let Some(avatar_url) = presence.content.avatar_url { .currently_active
p.content.avatar_url = Some(avatar_url); .or(new_content.currently_active);
}
if let Some(currently_active) = presence.content.currently_active {
p.content.currently_active = Some(currently_active);
}
} }
} }
} }

View file

@ -838,7 +838,20 @@ pub async fn send_transaction_message_route(
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok()) .filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
{ {
match edu { match edu {
Edu::Presence(_) => {} Edu::Presence(presence) => {
for update in presence.push {
for room_id in services().rooms.state_cache.rooms_joined(&update.user_id) {
services().rooms.edus.presence.set_presence(
&room_id?,
&update.user_id,
update.presence.clone(),
Some(update.currently_active),
Some(update.last_active_ago),
update.status_msg.clone(),
)?;
}
}
}
Edu::Receipt(receipt) => { Edu::Receipt(receipt) => {
for (room_id, room_updates) in receipt.receipts { for (room_id, room_updates) in receipt.receipts {
for (user_id, user_updates) in room_updates.read { for (user_id, user_updates) in room_updates.read {

View file

@ -92,6 +92,13 @@ pub struct Config {
pub emergency_password: Option<String>, pub emergency_password: Option<String>,
#[serde(default = "false_fn")]
pub allow_presence: bool,
#[serde(default = "default_presence_idle_timeout_s")]
pub presence_idle_timeout_s: u64,
#[serde(default = "default_presence_offline_timeout_s")]
pub presence_offline_timeout_s: u64,
#[serde(flatten)] #[serde(flatten)]
pub catchall: BTreeMap<String, IgnoredAny>, pub catchall: BTreeMap<String, IgnoredAny>,
} }
@ -310,6 +317,14 @@ fn default_turn_ttl() -> u64 {
60 * 60 * 24 60 * 60 * 24
} }
fn default_presence_idle_timeout_s() -> u64 {
5 * 60
}
fn default_presence_offline_timeout_s() -> u64 {
15 * 60
}
// I know, it's a great name // I know, it's a great name
pub fn default_default_room_version() -> RoomVersionId { pub fn default_default_room_version() -> RoomVersionId {
RoomVersionId::V10 RoomVersionId::V10

View file

@ -1,152 +1,190 @@
use std::collections::HashMap; use std::{iter, time::Duration};
use ruma::{ use ruma::{
events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId,
}; };
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; use crate::{
database::KeyValueDatabase,
service::{self, rooms::edus::presence::Presence},
services,
utils::{self, user_id_from_bytes},
Error, Result,
};
impl service::rooms::edus::presence::Data for KeyValueDatabase { impl service::rooms::edus::presence::Data for KeyValueDatabase {
fn update_presence( fn get_presence(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<PresenceEvent>> {
&self, if !services().globals.config.allow_presence {
user_id: &UserId, return Ok(None);
room_id: &RoomId, }
presence: PresenceEvent,
) -> Result<()> {
// TODO: Remove old entry? Or maybe just wipe completely from time to time?
let count = services().globals.next_count()?.to_be_bytes(); let key = presence_key(room_id, user_id);
let mut presence_id = room_id.as_bytes().to_vec(); self.roomuserid_presence
presence_id.push(0xff); .get(&key)?
presence_id.extend_from_slice(&count); .map(|presence_bytes| -> Result<PresenceEvent> {
presence_id.push(0xff); Presence::from_json_bytes(&presence_bytes)?.to_presence_event(user_id)
presence_id.extend_from_slice(presence.sender.as_bytes());
self.presenceid_presence.insert(
&presence_id,
&serde_json::to_vec(&presence).expect("PresenceEvent can be serialized"),
)?;
self.userid_lastpresenceupdate.insert(
user_id.as_bytes(),
&utils::millis_since_unix_epoch().to_be_bytes(),
)?;
Ok(())
}
fn ping_presence(&self, user_id: &UserId) -> Result<()> {
self.userid_lastpresenceupdate.insert(
user_id.as_bytes(),
&utils::millis_since_unix_epoch().to_be_bytes(),
)?;
Ok(())
}
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>> {
self.userid_lastpresenceupdate
.get(user_id.as_bytes())?
.map(|bytes| {
utils::u64_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.")
})
}) })
.transpose() .transpose()
} }
fn get_presence_event( fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()> {
if !services().globals.config.allow_presence {
return Ok(());
}
let now = utils::millis_since_unix_epoch();
let mut state_changed = false;
for room_id in services().rooms.state_cache.rooms_joined(user_id) {
let key = presence_key(&room_id?, user_id);
let presence_bytes = self.roomuserid_presence.get(&key)?;
if let Some(presence_bytes) = presence_bytes {
let presence = Presence::from_json_bytes(&presence_bytes)?;
if presence.state != new_state {
state_changed = true;
break;
}
}
}
let count = if state_changed {
services().globals.next_count()?
} else {
services().globals.current_count()?
};
for room_id in services().rooms.state_cache.rooms_joined(user_id) {
let key = presence_key(&room_id?, user_id);
let presence_bytes = self.roomuserid_presence.get(&key)?;
let new_presence = match presence_bytes {
Some(presence_bytes) => {
let mut presence = Presence::from_json_bytes(&presence_bytes)?;
presence.state = new_state.clone();
presence.currently_active = presence.state == PresenceState::Online;
presence.last_active_ts = now;
presence.last_count = count;
presence
}
None => Presence::new(
new_state.clone(),
new_state == PresenceState::Online,
now,
count,
None,
),
};
self.roomuserid_presence
.insert(&key, &new_presence.to_json_bytes()?)?;
}
let timeout = match new_state {
PresenceState::Online => services().globals.config.presence_idle_timeout_s,
_ => services().globals.config.presence_offline_timeout_s,
};
self.presence_timer_sender
.send((user_id.to_owned(), Duration::from_secs(timeout)))
.map_err(|_| Error::bad_database("Failed to add presence timer"))
}
fn set_presence(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
user_id: &UserId, user_id: &UserId,
count: u64, presence_state: PresenceState,
) -> Result<Option<PresenceEvent>> { currently_active: Option<bool>,
let mut presence_id = room_id.as_bytes().to_vec(); last_active_ago: Option<UInt>,
presence_id.push(0xff); status_msg: Option<String>,
presence_id.extend_from_slice(&count.to_be_bytes()); ) -> Result<()> {
presence_id.push(0xff); if !services().globals.config.allow_presence {
presence_id.extend_from_slice(user_id.as_bytes()); return Ok(());
self.presenceid_presence
.get(&presence_id)?
.map(|value| parse_presence_event(&value))
.transpose()
}
fn presence_since(
&self,
room_id: &RoomId,
since: u64,
) -> Result<HashMap<OwnedUserId, PresenceEvent>> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
let mut first_possible_edu = prefix.clone();
first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since
let mut hashmap = HashMap::new();
for (key, value) in self
.presenceid_presence
.iter_from(&first_possible_edu, false)
.take_while(|(key, _)| key.starts_with(&prefix))
{
let user_id = UserId::parse(
utils::string_from_bytes(
key.rsplit(|&b| b == 0xff)
.next()
.expect("rsplit always returns an element"),
)
.map_err(|_| Error::bad_database("Invalid UserId bytes in presenceid_presence."))?,
)
.map_err(|_| Error::bad_database("Invalid UserId in presenceid_presence."))?;
let presence = parse_presence_event(&value)?;
hashmap.insert(user_id, presence);
} }
Ok(hashmap) let now = utils::millis_since_unix_epoch();
let last_active_ts = match last_active_ago {
Some(last_active_ago) => now.saturating_sub(last_active_ago.into()),
None => now,
};
let key = presence_key(room_id, user_id);
let presence = Presence::new(
presence_state,
currently_active.unwrap_or(false),
last_active_ts,
services().globals.next_count()?,
status_msg,
);
let timeout = match presence.state {
PresenceState::Online => services().globals.config.presence_idle_timeout_s,
_ => services().globals.config.presence_offline_timeout_s,
};
self.presence_timer_sender
.send((user_id.to_owned(), Duration::from_secs(timeout)))
.map_err(|_| Error::bad_database("Failed to add presence timer"))?;
self.roomuserid_presence
.insert(&key, &presence.to_json_bytes()?)?;
Ok(())
} }
/* fn remove_presence(&self, user_id: &UserId) -> Result<()> {
fn presence_maintain(&self, db: Arc<TokioRwLock<Database>>) { for room_id in services().rooms.state_cache.rooms_joined(user_id) {
// TODO @M0dEx: move this to a timed tasks module let key = presence_key(&room_id?, user_id);
tokio::spawn(async move {
loop {
select! {
Some(user_id) = self.presence_timers.next() {
// TODO @M0dEx: would it be better to acquire the lock outside the loop?
let guard = db.read().await;
// TODO @M0dEx: add self.presence_timers self.roomuserid_presence.remove(&key)?;
// TODO @M0dEx: maintain presence }
}
} Ok(())
} }
});
fn presence_since<'a>(
&'a self,
room_id: &RoomId,
since: u64,
) -> Box<dyn Iterator<Item = Result<(OwnedUserId, u64, PresenceEvent)>> + 'a> {
if !services().globals.config.allow_presence {
return Box::new(iter::empty());
}
let prefix = [room_id.as_bytes(), &[0xff]].concat();
Box::new(
self.roomuserid_presence
.scan_prefix(prefix)
.map(
|(key, presence_bytes)| -> Result<(OwnedUserId, u64, PresenceEvent)> {
let user_id = user_id_from_bytes(
key.rsplit(|byte| *byte == 0xff).next().ok_or_else(|| {
Error::bad_database("No UserID bytes in presence key")
})?,
)?;
let presence = Presence::from_json_bytes(&presence_bytes)?;
let presence_event = presence.to_presence_event(&user_id)?;
Ok((user_id, presence.last_count, presence_event))
},
)
.filter(move |presence_data| match presence_data {
Ok((_, count, _)) => *count > since,
Err(_) => false,
}),
)
} }
*/
} }
fn parse_presence_event(bytes: &[u8]) -> Result<PresenceEvent> { #[inline]
let mut presence: PresenceEvent = serde_json::from_slice(bytes) fn presence_key(room_id: &RoomId, user_id: &UserId) -> Vec<u8> {
.map_err(|_| Error::bad_database("Invalid presence event in db."))?; [room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat()
let current_timestamp: UInt = utils::millis_since_unix_epoch()
.try_into()
.expect("time is valid");
if presence.content.presence == PresenceState::Online {
// Don't set last_active_ago when the user is online
presence.content.last_active_ago = None;
} else {
// Convert from timestamp to duration
presence.content.last_active_ago = presence
.content
.last_active_ago
.map(|timestamp| current_timestamp - timestamp);
}
Ok(presence)
} }

View file

@ -2,8 +2,8 @@ pub mod abstraction;
pub mod key_value; pub mod key_value;
use crate::{ use crate::{
service::rooms::timeline::PduCount, services, utils, Config, Error, PduEvent, Result, Services, service::rooms::{edus::presence::presence_handler, timeline::PduCount},
SERVICES, services, utils, Config, Error, PduEvent, Result, Services, SERVICES,
}; };
use abstraction::{KeyValueDatabaseEngine, KvTree}; use abstraction::{KeyValueDatabaseEngine, KvTree};
use directories::ProjectDirs; use directories::ProjectDirs;
@ -28,7 +28,7 @@ use std::{
sync::{Arc, Mutex, RwLock}, sync::{Arc, Mutex, RwLock},
time::Duration, time::Duration,
}; };
use tokio::time::interval; use tokio::{sync::mpsc, time::interval};
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
@ -72,8 +72,7 @@ pub struct KeyValueDatabase {
pub(super) roomuserid_lastprivatereadupdate: Arc<dyn KvTree>, // LastPrivateReadUpdate = Count pub(super) roomuserid_lastprivatereadupdate: Arc<dyn KvTree>, // LastPrivateReadUpdate = Count
pub(super) typingid_userid: Arc<dyn KvTree>, // TypingId = RoomId + TimeoutTime + Count pub(super) typingid_userid: Arc<dyn KvTree>, // TypingId = RoomId + TimeoutTime + Count
pub(super) roomid_lasttypingupdate: Arc<dyn KvTree>, // LastRoomTypingUpdate = Count pub(super) roomid_lasttypingupdate: Arc<dyn KvTree>, // LastRoomTypingUpdate = Count
pub(super) presenceid_presence: Arc<dyn KvTree>, // PresenceId = RoomId + Count + UserId pub(super) roomuserid_presence: Arc<dyn KvTree>,
pub(super) userid_lastpresenceupdate: Arc<dyn KvTree>, // LastPresenceUpdate = Count
//pub rooms: rooms::Rooms, //pub rooms: rooms::Rooms,
pub(super) pduid_pdu: Arc<dyn KvTree>, // PduId = ShortRoomId + Count pub(super) pduid_pdu: Arc<dyn KvTree>, // PduId = ShortRoomId + Count
@ -172,6 +171,7 @@ pub struct KeyValueDatabase {
pub(super) our_real_users_cache: RwLock<HashMap<OwnedRoomId, Arc<HashSet<OwnedUserId>>>>, pub(super) our_real_users_cache: RwLock<HashMap<OwnedRoomId, Arc<HashSet<OwnedUserId>>>>,
pub(super) appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>, pub(super) appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>,
pub(super) lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>, pub(super) lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>,
pub(super) presence_timer_sender: Arc<mpsc::UnboundedSender<(OwnedUserId, Duration)>>,
} }
impl KeyValueDatabase { impl KeyValueDatabase {
@ -275,6 +275,8 @@ impl KeyValueDatabase {
error!(?config.max_request_size, "Max request size is less than 1KB. Please increase it."); error!(?config.max_request_size, "Max request size is less than 1KB. Please increase it.");
} }
let (presence_sender, presence_receiver) = mpsc::unbounded_channel();
let db_raw = Box::new(Self { let db_raw = Box::new(Self {
_db: builder.clone(), _db: builder.clone(),
userid_password: builder.open_tree("userid_password")?, userid_password: builder.open_tree("userid_password")?,
@ -303,8 +305,7 @@ impl KeyValueDatabase {
.open_tree("roomuserid_lastprivatereadupdate")?, .open_tree("roomuserid_lastprivatereadupdate")?,
typingid_userid: builder.open_tree("typingid_userid")?, typingid_userid: builder.open_tree("typingid_userid")?,
roomid_lasttypingupdate: builder.open_tree("roomid_lasttypingupdate")?, roomid_lasttypingupdate: builder.open_tree("roomid_lasttypingupdate")?,
presenceid_presence: builder.open_tree("presenceid_presence")?, roomuserid_presence: builder.open_tree("roomuserid_presence")?,
userid_lastpresenceupdate: builder.open_tree("userid_lastpresenceupdate")?,
pduid_pdu: builder.open_tree("pduid_pdu")?, pduid_pdu: builder.open_tree("pduid_pdu")?,
eventid_pduid: builder.open_tree("eventid_pduid")?, eventid_pduid: builder.open_tree("eventid_pduid")?,
roomid_pduleaves: builder.open_tree("roomid_pduleaves")?, roomid_pduleaves: builder.open_tree("roomid_pduleaves")?,
@ -397,6 +398,7 @@ impl KeyValueDatabase {
our_real_users_cache: RwLock::new(HashMap::new()), our_real_users_cache: RwLock::new(HashMap::new()),
appservice_in_room_cache: RwLock::new(HashMap::new()), appservice_in_room_cache: RwLock::new(HashMap::new()),
lasttimelinecount_cache: Mutex::new(HashMap::new()), lasttimelinecount_cache: Mutex::new(HashMap::new()),
presence_timer_sender: Arc::new(presence_sender),
}); });
let db = Box::leak(db_raw); let db = Box::leak(db_raw);
@ -965,9 +967,6 @@ impl KeyValueDatabase {
); );
} }
// This data is probably outdated
db.presenceid_presence.clear()?;
services().admin.start_handler(); services().admin.start_handler();
// Set emergency access for the conduit user // Set emergency access for the conduit user
@ -992,6 +991,9 @@ impl KeyValueDatabase {
if services().globals.allow_check_for_updates() { if services().globals.allow_check_for_updates() {
Self::start_check_for_updates_task(); Self::start_check_for_updates_task();
} }
if services().globals.config.allow_presence {
Self::start_presence_handler(presence_receiver).await;
}
Ok(()) Ok(())
} }
@ -1065,8 +1067,7 @@ impl KeyValueDatabase {
pub async fn start_cleanup_task() { pub async fn start_cleanup_task() {
#[cfg(unix)] #[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind}; use tokio::signal::unix::{signal, SignalKind};
use tokio::time::Instant;
use std::time::{Duration, Instant};
let timer_interval = let timer_interval =
Duration::from_secs(services().globals.config.cleanup_second_interval as u64); Duration::from_secs(services().globals.config.cleanup_second_interval as u64);
@ -1114,6 +1115,17 @@ impl KeyValueDatabase {
} }
}); });
} }
pub async fn start_presence_handler(
presence_timer_receiver: mpsc::UnboundedReceiver<(OwnedUserId, Duration)>,
) {
tokio::spawn(async move {
match presence_handler(presence_timer_receiver).await {
Ok(()) => warn!("Presence maintenance task finished"),
Err(e) => error!("Presence maintenance task finished with error: {e}"),
}
});
}
} }
/// Sets the emergency password and push rules for the @conduit account in case emergency password is set /// Sets the emergency password and push rules for the @conduit account in case emergency password is set

View file

@ -367,6 +367,18 @@ impl Service {
&self.config.emergency_password &self.config.emergency_password
} }
pub fn allow_presence(&self) -> bool {
self.config.allow_presence
}
pub fn presence_idle_timeout_s(&self) -> u64 {
self.config.presence_idle_timeout_s
}
pub fn presence_offline_timeout_s(&self) -> u64 {
self.config.presence_offline_timeout_s
}
pub fn supported_room_versions(&self) -> Vec<RoomVersionId> { pub fn supported_room_versions(&self) -> Vec<RoomVersionId> {
let mut room_versions: Vec<RoomVersionId> = vec![]; let mut room_versions: Vec<RoomVersionId> = vec![];
room_versions.extend(self.stable_room_versions.clone()); room_versions.extend(self.stable_room_versions.clone());

View file

@ -1,38 +1,33 @@
use std::collections::HashMap;
use crate::Result; use crate::Result;
use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId}; use ruma::{
events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId,
};
pub trait Data: Send + Sync { pub trait Data: Send + Sync {
/// Returns the latest presence event for the given user in the given room.
fn get_presence(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<PresenceEvent>>;
/// Pings the presence of the given user in the given room, setting the specified state.
fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()>;
/// Adds a presence event which will be saved until a new event replaces it. /// Adds a presence event which will be saved until a new event replaces it.
/// fn set_presence(
/// Note: This method takes a RoomId because presence updates are always bound to rooms to
/// make sure users outside these rooms can't see them.
fn update_presence(
&self, &self,
user_id: &UserId,
room_id: &RoomId, room_id: &RoomId,
presence: PresenceEvent, user_id: &UserId,
presence_state: PresenceState,
currently_active: Option<bool>,
last_active_ago: Option<UInt>,
status_msg: Option<String>,
) -> Result<()>; ) -> Result<()>;
/// Resets the presence timeout, so the user will stay in their current presence state. /// Removes the presence record for the given user from the database.
fn ping_presence(&self, user_id: &UserId) -> Result<()>; fn remove_presence(&self, user_id: &UserId) -> Result<()>;
/// Returns the timestamp of the last presence update of this user in millis since the unix epoch.
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>>;
/// Returns the presence event with correct last_active_ago.
fn get_presence_event(
&self,
room_id: &RoomId,
user_id: &UserId,
count: u64,
) -> Result<Option<PresenceEvent>>;
/// Returns the most recent presence updates that happened after the event with id `since`. /// Returns the most recent presence updates that happened after the event with id `since`.
fn presence_since( fn presence_since<'a>(
&self, &'a self,
room_id: &RoomId, room_id: &RoomId,
since: u64, since: u64,
) -> Result<HashMap<OwnedUserId, PresenceEvent>>; ) -> Box<dyn Iterator<Item = Result<(OwnedUserId, u64, PresenceEvent)>> + 'a>;
} }

View file

@ -1,122 +1,211 @@
mod data; mod data;
use std::collections::HashMap;
use std::time::Duration;
pub use data::Data; pub use data::Data;
use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId}; use futures_util::{stream::FuturesUnordered, StreamExt};
use ruma::{
events::presence::{PresenceEvent, PresenceEventContent},
presence::PresenceState,
OwnedUserId, RoomId, UInt, UserId,
};
use serde::{Deserialize, Serialize};
use tokio::{sync::mpsc, time::sleep};
use tracing::debug;
use crate::Result; use crate::{services, utils, Error, Result};
/// Represents data required to be kept in order to implement the presence specification.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Presence {
pub state: PresenceState,
pub currently_active: bool,
pub last_active_ts: u64,
pub last_count: u64,
pub status_msg: Option<String>,
}
impl Presence {
pub fn new(
state: PresenceState,
currently_active: bool,
last_active_ts: u64,
last_count: u64,
status_msg: Option<String>,
) -> Self {
Self {
state,
currently_active,
last_active_ts,
last_count,
status_msg,
}
}
pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
serde_json::from_slice(bytes)
.map_err(|_| Error::bad_database("Invalid presence data in database"))
}
pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
serde_json::to_vec(self)
.map_err(|_| Error::bad_database("Could not serialize Presence to JSON"))
}
/// Creates a PresenceEvent from available data.
pub fn to_presence_event(&self, user_id: &UserId) -> Result<PresenceEvent> {
let now = utils::millis_since_unix_epoch();
let last_active_ago = if self.currently_active {
None
} else {
Some(UInt::new_saturating(
now.saturating_sub(self.last_active_ts),
))
};
Ok(PresenceEvent {
sender: user_id.to_owned(),
content: PresenceEventContent {
presence: self.state.clone(),
status_msg: self.status_msg.clone(),
currently_active: Some(self.currently_active),
last_active_ago,
displayname: services().users.displayname(user_id)?,
avatar_url: services().users.avatar_url(user_id)?,
},
})
}
}
pub struct Service { pub struct Service {
pub db: &'static dyn Data, pub db: &'static dyn Data,
} }
impl Service { impl Service {
/// Adds a presence event which will be saved until a new event replaces it. /// Returns the latest presence event for the given user in the given room.
/// pub fn get_presence(
/// Note: This method takes a RoomId because presence updates are always bound to rooms to
/// make sure users outside these rooms can't see them.
pub fn update_presence(
&self, &self,
user_id: &UserId,
room_id: &RoomId, room_id: &RoomId,
presence: PresenceEvent,
) -> Result<()> {
self.db.update_presence(user_id, room_id, presence)
}
/// Resets the presence timeout, so the user will stay in their current presence state.
pub fn ping_presence(&self, user_id: &UserId) -> Result<()> {
self.db.ping_presence(user_id)
}
pub fn get_last_presence_event(
&self,
user_id: &UserId, user_id: &UserId,
room_id: &RoomId,
) -> Result<Option<PresenceEvent>> { ) -> Result<Option<PresenceEvent>> {
let last_update = match self.db.last_presence_update(user_id)? { self.db.get_presence(room_id, user_id)
Some(last) => last,
None => return Ok(None),
};
self.db.get_presence_event(room_id, user_id, last_update)
} }
/* TODO /// Pings the presence of the given user in the given room, setting the specified state.
/// Sets all users to offline who have been quiet for too long. pub fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()> {
fn _presence_maintain( self.db.ping_presence(user_id, new_state)
}
/// Adds a presence event which will be saved until a new event replaces it.
pub fn set_presence(
&self, &self,
rooms: &super::Rooms, room_id: &RoomId,
globals: &super::super::globals::Globals, user_id: &UserId,
presence_state: PresenceState,
currently_active: Option<bool>,
last_active_ago: Option<UInt>,
status_msg: Option<String>,
) -> Result<()> { ) -> Result<()> {
let current_timestamp = utils::millis_since_unix_epoch(); self.db.set_presence(
room_id,
user_id,
presence_state,
currently_active,
last_active_ago,
status_msg,
)
}
for (user_id_bytes, last_timestamp) in self /// Removes the presence record for the given user from the database.
.userid_lastpresenceupdate pub fn remove_presence(&self, user_id: &UserId) -> Result<()> {
.iter() self.db.remove_presence(user_id)
.filter_map(|(k, bytes)| { }
Some((
k,
utils::u64_from_bytes(&bytes)
.map_err(|_| {
Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.")
})
.ok()?,
))
})
.take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000)
// 5 Minutes
{
// Send new presence events to set the user offline
let count = globals.next_count()?.to_be_bytes();
let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes)
.map_err(|_| {
Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.")
})?
.try_into()
.map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?;
for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) {
let mut presence_id = room_id.as_bytes().to_vec();
presence_id.push(0xff);
presence_id.extend_from_slice(&count);
presence_id.push(0xff);
presence_id.extend_from_slice(&user_id_bytes);
self.presenceid_presence.insert(
&presence_id,
&serde_json::to_vec(&PresenceEvent {
content: PresenceEventContent {
avatar_url: None,
currently_active: None,
displayname: None,
last_active_ago: Some(
last_timestamp.try_into().expect("time is valid"),
),
presence: PresenceState::Offline,
status_msg: None,
},
sender: user_id.to_owned(),
})
.expect("PresenceEvent can be serialized"),
)?;
}
self.userid_lastpresenceupdate.insert(
user_id.as_bytes(),
&utils::millis_since_unix_epoch().to_be_bytes(),
)?;
}
Ok(())
}*/
/// Returns the most recent presence updates that happened after the event with id `since`. /// Returns the most recent presence updates that happened after the event with id `since`.
#[tracing::instrument(skip(self, since, room_id))] pub fn presence_since<'a>(
pub fn presence_since( &'a self,
&self,
room_id: &RoomId, room_id: &RoomId,
since: u64, since: u64,
) -> Result<HashMap<OwnedUserId, PresenceEvent>> { ) -> Box<dyn Iterator<Item = Result<(OwnedUserId, u64, PresenceEvent)>> + 'a> {
self.db.presence_since(room_id, since) self.db.presence_since(room_id, since)
} }
} }
pub async fn presence_handler(
mut presence_timer_receiver: mpsc::UnboundedReceiver<(OwnedUserId, Duration)>,
) -> Result<()> {
let mut presence_timers = FuturesUnordered::new();
loop {
debug!("Number of presence timers: {}", presence_timers.len());
tokio::select! {
Some((user_id, timeout)) = presence_timer_receiver.recv() => {
debug!("Adding timer for user '{user_id}': Timeout {timeout:?}");
presence_timers.push(presence_timer(user_id, timeout));
}
Some(user_id) = presence_timers.next() => {
process_presence_timer(user_id)?;
}
}
}
}
async fn presence_timer(user_id: OwnedUserId, timeout: Duration) -> OwnedUserId {
sleep(timeout).await;
user_id
}
fn process_presence_timer(user_id: OwnedUserId) -> Result<()> {
let idle_timeout = services().globals.config.presence_idle_timeout_s * 1_000;
let offline_timeout = services().globals.config.presence_offline_timeout_s * 1_000;
let mut presence_state = PresenceState::Offline;
let mut last_active_ago = None;
let mut status_msg = None;
for room_id in services().rooms.state_cache.rooms_joined(&user_id) {
let presence_event = services()
.rooms
.edus
.presence
.get_presence(&room_id?, &user_id)?;
if let Some(presence_event) = presence_event {
presence_state = presence_event.content.presence;
last_active_ago = presence_event.content.last_active_ago;
status_msg = presence_event.content.status_msg;
break;
}
}
let new_state = match (&presence_state, last_active_ago.map(|ago| u64::from(ago))) {
(PresenceState::Online, Some(ago)) if ago >= idle_timeout => {
Some(PresenceState::Unavailable)
}
(PresenceState::Unavailable, Some(ago)) if ago >= offline_timeout => {
Some(PresenceState::Offline)
}
_ => None,
};
debug!("Processed presence timer for user '{user_id}': Old state = {presence_state}, New state = {new_state:?}");
if let Some(new_state) = new_state {
for room_id in services().rooms.state_cache.rooms_joined(&user_id) {
services().rooms.edus.presence.set_presence(
&room_id?,
&user_id,
new_state.clone(),
Some(false),
last_active_ago,
status_msg.clone(),
)?;
}
}
Ok(())
}

View file

@ -26,7 +26,8 @@ use ruma::{
federation::{ federation::{
self, self,
transactions::edu::{ transactions::edu::{
DeviceListUpdateContent, Edu, ReceiptContent, ReceiptData, ReceiptMap, DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent,
ReceiptData, ReceiptMap,
}, },
}, },
OutgoingRequest, OutgoingRequest,
@ -285,6 +286,39 @@ impl Service {
.filter(|user_id| user_id.server_name() == services().globals.server_name()), .filter(|user_id| user_id.server_name() == services().globals.server_name()),
); );
// Look for presence updates in this room
let mut presence_updates = Vec::new();
for presence_data in services()
.rooms
.edus
.presence
.presence_since(&room_id, since)
{
let (user_id, count, presence_event) = presence_data?;
if count > max_edu_count {
max_edu_count = count;
}
if user_id.server_name() != services().globals.server_name() {
continue;
}
presence_updates.push(PresenceUpdate {
user_id,
presence: presence_event.content.presence,
currently_active: presence_event.content.currently_active.unwrap_or(false),
last_active_ago: presence_event.content.last_active_ago.unwrap_or(uint!(0)),
status_msg: presence_event.content.status_msg,
});
}
let presence_content = Edu::Presence(PresenceContent::new(presence_updates));
events.push(
serde_json::to_vec(&presence_content).expect("PresenceEvent can be serialized"),
);
// Look for read receipts in this room // Look for read receipts in this room
for r in services() for r in services()
.rooms .rooms

View file

@ -1,12 +1,15 @@
pub mod error; pub mod error;
use crate::{Error, Result};
use argon2::{Config, Variant}; use argon2::{Config, Variant};
use cmp::Ordering;
use rand::prelude::*; use rand::prelude::*;
use ring::digest; use ring::digest;
use ruma::{canonical_json::try_from_json_map, CanonicalJsonError, CanonicalJsonObject}; use ruma::{
canonical_json::try_from_json_map, CanonicalJsonError, CanonicalJsonObject, OwnedUserId,
};
use std::{ use std::{
cmp, fmt, cmp::Ordering,
fmt,
str::FromStr, str::FromStr,
time::{SystemTime, UNIX_EPOCH}, time::{SystemTime, UNIX_EPOCH},
}; };
@ -51,6 +54,15 @@ pub fn string_from_bytes(bytes: &[u8]) -> Result<String, std::string::FromUtf8Er
String::from_utf8(bytes.to_vec()) String::from_utf8(bytes.to_vec())
} }
/// Parses a OwnedUserId from bytes.
pub fn user_id_from_bytes(bytes: &[u8]) -> Result<OwnedUserId> {
OwnedUserId::try_from(
string_from_bytes(bytes)
.map_err(|_| Error::bad_database("Failed to parse string from bytes"))?,
)
.map_err(|_| Error::bad_database("Failed to parse user id from bytes"))
}
pub fn random_string(length: usize) -> String { pub fn random_string(length: usize) -> String {
thread_rng() thread_rng()
.sample_iter(&rand::distributions::Alphanumeric) .sample_iter(&rand::distributions::Alphanumeric)