Compare commits

...

4 commits

Author SHA1 Message Date
Jakub Kubík
aaf746ffdd
fix(sync): correctly update presence properties 2024-05-17 18:24:55 +02:00
Jakub Kubík
5314221e89
style(presence): use flat_map instead of matching Results in filter 2024-05-17 18:24:54 +02:00
Jakub Kubík
ae76052378
feat(presence): add granular allow configuration 2024-05-17 18:24:54 +02:00
Jakub Kubík
509afc6b46
feat(presence): implement presence functionality 2024-05-17 18:24:54 +02:00
12 changed files with 570 additions and 362 deletions

View file

@ -1,4 +1,4 @@
use crate::{services, utils, Error, Result, Ruma}; use crate::{services, Error, Result, Ruma};
use ruma::api::client::{ use ruma::api::client::{
error::ErrorKind, error::ErrorKind,
presence::{get_presence, set_presence}, presence::{get_presence, set_presence},
@ -11,29 +11,24 @@ use std::time::Duration;
pub async fn set_presence_route( pub async fn set_presence_route(
body: Ruma<set_presence::v3::Request>, body: Ruma<set_presence::v3::Request>,
) -> Result<set_presence::v3::Response> { ) -> Result<set_presence::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated"); if !services().globals.allow_local_presence() {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Presence is disabled on this server",
));
}
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
for room_id in services().rooms.state_cache.rooms_joined(sender_user) { for room_id in services().rooms.state_cache.rooms_joined(sender_user) {
let room_id = room_id?; let room_id = room_id?;
services().rooms.edus.presence.update_presence( services().rooms.edus.presence.set_presence(
sender_user,
&room_id, &room_id,
ruma::events::presence::PresenceEvent { sender_user,
content: ruma::events::presence::PresenceEventContent { body.presence.clone(),
avatar_url: services().users.avatar_url(sender_user)?, None,
currently_active: None, None,
displayname: services().users.displayname(sender_user)?, body.status_msg.clone(),
last_active_ago: Some(
utils::millis_since_unix_epoch()
.try_into()
.expect("time is valid"),
),
presence: body.presence.clone(),
status_msg: body.status_msg.clone(),
},
sender: sender_user.clone(),
},
)?; )?;
} }
@ -48,6 +43,13 @@ pub async fn set_presence_route(
pub async fn get_presence_route( pub async fn get_presence_route(
body: Ruma<get_presence::v3::Request>, body: Ruma<get_presence::v3::Request>,
) -> Result<get_presence::v3::Response> { ) -> Result<get_presence::v3::Response> {
if !services().globals.allow_local_presence() {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Presence is disabled on this server",
));
}
let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let mut presence_event = None; let mut presence_event = None;
@ -63,7 +65,7 @@ pub async fn get_presence_route(
.rooms .rooms
.edus .edus
.presence .presence
.get_last_presence_event(sender_user, &room_id)? .get_presence(&room_id, sender_user)?
{ {
presence_event = Some(presence); presence_event = Some(presence);
break; break;

View file

@ -1,4 +1,4 @@
use crate::{service::pdu::PduBuilder, services, utils, Error, Result, Ruma}; use crate::{service::pdu::PduBuilder, services, Error, Result, Ruma};
use ruma::{ use ruma::{
api::{ api::{
client::{ client::{
@ -10,6 +10,7 @@ use ruma::{
federation::{self, query::get_profile_information::v1::ProfileField}, federation::{self, query::get_profile_information::v1::ProfileField},
}, },
events::{room::member::RoomMemberEventContent, StateEventType, TimelineEventType}, events::{room::member::RoomMemberEventContent, StateEventType, TimelineEventType},
presence::PresenceState,
}; };
use serde_json::value::to_raw_value; use serde_json::value::to_raw_value;
use std::sync::Arc; use std::sync::Arc;
@ -89,27 +90,15 @@ pub async fn set_displayname_route(
.timeline .timeline
.build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock) .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock)
.await; .await;
}
if services().globals.allow_local_presence() {
// Presence update // Presence update
services().rooms.edus.presence.update_presence( services()
sender_user, .rooms
&room_id, .edus
ruma::events::presence::PresenceEvent { .presence
content: ruma::events::presence::PresenceEventContent { .ping_presence(sender_user, PresenceState::Online)?;
avatar_url: services().users.avatar_url(sender_user)?,
currently_active: None,
displayname: services().users.displayname(sender_user)?,
last_active_ago: Some(
utils::millis_since_unix_epoch()
.try_into()
.expect("time is valid"),
),
presence: ruma::presence::PresenceState::Online,
status_msg: None,
},
sender: sender_user.clone(),
},
)?;
} }
Ok(set_display_name::v3::Response {}) Ok(set_display_name::v3::Response {})
@ -224,27 +213,15 @@ pub async fn set_avatar_url_route(
.timeline .timeline
.build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock) .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock)
.await; .await;
}
if services().globals.allow_local_presence() {
// Presence update // Presence update
services().rooms.edus.presence.update_presence( services()
sender_user, .rooms
&room_id, .edus
ruma::events::presence::PresenceEvent { .presence
content: ruma::events::presence::PresenceEventContent { .ping_presence(sender_user, PresenceState::Online)?;
avatar_url: services().users.avatar_url(sender_user)?,
currently_active: None,
displayname: services().users.displayname(sender_user)?,
last_active_ago: Some(
utils::millis_since_unix_epoch()
.try_into()
.expect("time is valid"),
),
presence: ruma::presence::PresenceState::Online,
status_msg: None,
},
sender: sender_user.clone(),
},
)?;
} }
Ok(set_avatar_url::v3::Response {}) Ok(set_avatar_url::v3::Response {})

View file

@ -18,6 +18,7 @@ use ruma::{
uiaa::UiaaResponse, uiaa::UiaaResponse,
}, },
events::{ events::{
presence::PresenceEvent,
room::member::{MembershipState, RoomMemberEventContent}, room::member::{MembershipState, RoomMemberEventContent},
StateEventType, TimelineEventType, StateEventType, TimelineEventType,
}, },
@ -174,8 +175,14 @@ async fn sync_helper(
body: sync_events::v3::Request, body: sync_events::v3::Request,
// bool = caching allowed // bool = caching allowed
) -> Result<(sync_events::v3::Response, bool), Error> { ) -> Result<(sync_events::v3::Response, bool), Error> {
// TODO: match body.set_presence { // Presence update
services().rooms.edus.presence.ping_presence(&sender_user)?; if services().globals.allow_local_presence() {
services()
.rooms
.edus
.presence
.ping_presence(&sender_user, body.set_presence)?;
}
// Setup watchers, so if there's no response, we can wait for them // Setup watchers, so if there's no response, we can wait for them
let watcher = services().globals.watch(&sender_user, &sender_device); let watcher = services().globals.watch(&sender_user, &sender_device);
@ -251,39 +258,8 @@ async fn sync_helper(
joined_rooms.insert(room_id.clone(), joined_room); joined_rooms.insert(room_id.clone(), joined_room);
} }
// Take presence updates from this room if services().globals.allow_local_presence() {
for (user_id, presence) in services() process_room_presence_updates(&mut presence_updates, &room_id, since).await?;
.rooms
.edus
.presence
.presence_since(&room_id, since)?
{
match presence_updates.entry(user_id) {
Entry::Vacant(v) => {
v.insert(presence);
}
Entry::Occupied(mut o) => {
let p = o.get_mut();
// Update existing presence event with more info
p.content.presence = presence.content.presence;
if let Some(status_msg) = presence.content.status_msg {
p.content.status_msg = Some(status_msg);
}
if let Some(last_active_ago) = presence.content.last_active_ago {
p.content.last_active_ago = Some(last_active_ago);
}
if let Some(displayname) = presence.content.displayname {
p.content.displayname = Some(displayname);
}
if let Some(avatar_url) = presence.content.avatar_url {
p.content.avatar_url = Some(avatar_url);
}
if let Some(currently_active) = presence.content.currently_active {
p.content.currently_active = Some(currently_active);
}
}
}
} }
} }
} }
@ -595,6 +571,45 @@ async fn sync_helper(
} }
} }
async fn process_room_presence_updates(
presence_updates: &mut HashMap<OwnedUserId, PresenceEvent>,
room_id: &RoomId,
since: u64,
) -> Result<()> {
// Take presence updates from this room
for (user_id, _, presence_event) in services()
.rooms
.edus
.presence
.presence_since(room_id, since)
{
match presence_updates.entry(user_id) {
Entry::Vacant(slot) => {
slot.insert(presence_event);
}
Entry::Occupied(mut slot) => {
let curr_event = slot.get_mut();
let curr_content = &mut curr_event.content;
let new_content = presence_event.content;
// Update existing presence event with more info
curr_content.presence = new_content.presence;
curr_content.status_msg = new_content.status_msg.or(curr_content.status_msg.take());
curr_content.last_active_ago =
new_content.last_active_ago.or(curr_content.last_active_ago);
curr_content.displayname =
new_content.displayname.or(curr_content.displayname.take());
curr_content.avatar_url = new_content.avatar_url.or(curr_content.avatar_url.take());
curr_content.currently_active = new_content
.currently_active
.or(curr_content.currently_active);
}
}
}
Ok(())
}
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn load_joined_room( async fn load_joined_room(
sender_user: &UserId, sender_user: &UserId,

View file

@ -777,7 +777,24 @@ pub async fn send_transaction_message_route(
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok()) .filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
{ {
match edu { match edu {
Edu::Presence(_) => {} Edu::Presence(presence) => {
if !services().globals.allow_incoming_presence() {
continue;
}
for update in presence.push {
for room_id in services().rooms.state_cache.rooms_joined(&update.user_id) {
services().rooms.edus.presence.set_presence(
&room_id?,
&update.user_id,
update.presence.clone(),
Some(update.currently_active),
Some(update.last_active_ago),
update.status_msg.clone(),
)?;
}
}
}
Edu::Receipt(receipt) => { Edu::Receipt(receipt) => {
for (room_id, room_updates) in receipt.receipts { for (room_id, room_updates) in receipt.receipts {
for (user_id, user_updates) in room_updates.read { for (user_id, user_updates) in room_updates.read {

View file

@ -83,6 +83,17 @@ pub struct Config {
pub emergency_password: Option<String>, pub emergency_password: Option<String>,
#[serde(default = "false_fn")]
pub allow_local_presence: bool,
#[serde(default = "false_fn")]
pub allow_incoming_presence: bool,
#[serde(default = "false_fn")]
pub allow_outgoing_presence: bool,
#[serde(default = "default_presence_idle_timeout_s")]
pub presence_idle_timeout_s: u64,
#[serde(default = "default_presence_offline_timeout_s")]
pub presence_offline_timeout_s: u64,
#[serde(flatten)] #[serde(flatten)]
pub catchall: BTreeMap<String, IgnoredAny>, pub catchall: BTreeMap<String, IgnoredAny>,
} }
@ -302,6 +313,14 @@ fn default_turn_ttl() -> u64 {
60 * 60 * 24 60 * 60 * 24
} }
fn default_presence_idle_timeout_s() -> u64 {
5 * 60
}
fn default_presence_offline_timeout_s() -> u64 {
15 * 60
}
// I know, it's a great name // I know, it's a great name
pub fn default_default_room_version() -> RoomVersionId { pub fn default_default_room_version() -> RoomVersionId {
RoomVersionId::V10 RoomVersionId::V10

View file

@ -1,152 +1,171 @@
use std::collections::HashMap; use std::time::Duration;
use ruma::{ use ruma::{
events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId,
}; };
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; use crate::{
database::KeyValueDatabase,
service::{self, rooms::edus::presence::Presence},
services,
utils::{self, user_id_from_bytes},
Error, Result,
};
impl service::rooms::edus::presence::Data for KeyValueDatabase { impl service::rooms::edus::presence::Data for KeyValueDatabase {
fn update_presence( fn get_presence(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<PresenceEvent>> {
&self, let key = presence_key(room_id, user_id);
user_id: &UserId,
room_id: &RoomId,
presence: PresenceEvent,
) -> Result<()> {
// TODO: Remove old entry? Or maybe just wipe completely from time to time?
let count = services().globals.next_count()?.to_be_bytes(); self.roomuserid_presence
.get(&key)?
let mut presence_id = room_id.as_bytes().to_vec(); .map(|presence_bytes| -> Result<PresenceEvent> {
presence_id.push(0xff); Presence::from_json_bytes(&presence_bytes)?.to_presence_event(user_id)
presence_id.extend_from_slice(&count);
presence_id.push(0xff);
presence_id.extend_from_slice(presence.sender.as_bytes());
self.presenceid_presence.insert(
&presence_id,
&serde_json::to_vec(&presence).expect("PresenceEvent can be serialized"),
)?;
self.userid_lastpresenceupdate.insert(
user_id.as_bytes(),
&utils::millis_since_unix_epoch().to_be_bytes(),
)?;
Ok(())
}
fn ping_presence(&self, user_id: &UserId) -> Result<()> {
self.userid_lastpresenceupdate.insert(
user_id.as_bytes(),
&utils::millis_since_unix_epoch().to_be_bytes(),
)?;
Ok(())
}
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>> {
self.userid_lastpresenceupdate
.get(user_id.as_bytes())?
.map(|bytes| {
utils::u64_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.")
})
}) })
.transpose() .transpose()
} }
fn get_presence_event( fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()> {
let now = utils::millis_since_unix_epoch();
let mut state_changed = false;
for room_id in services().rooms.state_cache.rooms_joined(user_id) {
let key = presence_key(&room_id?, user_id);
let presence_bytes = self.roomuserid_presence.get(&key)?;
if let Some(presence_bytes) = presence_bytes {
let presence = Presence::from_json_bytes(&presence_bytes)?;
if presence.state != new_state {
state_changed = true;
break;
}
}
}
let count = if state_changed {
services().globals.next_count()?
} else {
services().globals.current_count()?
};
for room_id in services().rooms.state_cache.rooms_joined(user_id) {
let key = presence_key(&room_id?, user_id);
let presence_bytes = self.roomuserid_presence.get(&key)?;
let new_presence = match presence_bytes {
Some(presence_bytes) => {
let mut presence = Presence::from_json_bytes(&presence_bytes)?;
presence.state = new_state.clone();
presence.currently_active = presence.state == PresenceState::Online;
presence.last_active_ts = now;
presence.last_count = count;
presence
}
None => Presence::new(
new_state.clone(),
new_state == PresenceState::Online,
now,
count,
None,
),
};
self.roomuserid_presence
.insert(&key, &new_presence.to_json_bytes()?)?;
}
let timeout = match new_state {
PresenceState::Online => services().globals.config.presence_idle_timeout_s,
_ => services().globals.config.presence_offline_timeout_s,
};
self.presence_timer_sender
.send((user_id.to_owned(), Duration::from_secs(timeout)))
.map_err(|_| Error::bad_database("Failed to add presence timer"))
}
fn set_presence(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
user_id: &UserId, user_id: &UserId,
count: u64, presence_state: PresenceState,
) -> Result<Option<PresenceEvent>> { currently_active: Option<bool>,
let mut presence_id = room_id.as_bytes().to_vec(); last_active_ago: Option<UInt>,
presence_id.push(0xff); status_msg: Option<String>,
presence_id.extend_from_slice(&count.to_be_bytes()); ) -> Result<()> {
presence_id.push(0xff); let now = utils::millis_since_unix_epoch();
presence_id.extend_from_slice(user_id.as_bytes()); let last_active_ts = match last_active_ago {
Some(last_active_ago) => now.saturating_sub(last_active_ago.into()),
None => now,
};
self.presenceid_presence let key = presence_key(room_id, user_id);
.get(&presence_id)?
.map(|value| parse_presence_event(&value)) let presence = Presence::new(
.transpose() presence_state,
currently_active.unwrap_or(false),
last_active_ts,
services().globals.next_count()?,
status_msg,
);
let timeout = match presence.state {
PresenceState::Online => services().globals.config.presence_idle_timeout_s,
_ => services().globals.config.presence_offline_timeout_s,
};
self.presence_timer_sender
.send((user_id.to_owned(), Duration::from_secs(timeout)))
.map_err(|_| Error::bad_database("Failed to add presence timer"))?;
self.roomuserid_presence
.insert(&key, &presence.to_json_bytes()?)?;
Ok(())
} }
fn presence_since( fn remove_presence(&self, user_id: &UserId) -> Result<()> {
&self, for room_id in services().rooms.state_cache.rooms_joined(user_id) {
room_id: &RoomId, let key = presence_key(&room_id?, user_id);
since: u64,
) -> Result<HashMap<OwnedUserId, PresenceEvent>> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
let mut first_possible_edu = prefix.clone(); self.roomuserid_presence.remove(&key)?;
first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since
let mut hashmap = HashMap::new();
for (key, value) in self
.presenceid_presence
.iter_from(&first_possible_edu, false)
.take_while(|(key, _)| key.starts_with(&prefix))
{
let user_id = UserId::parse(
utils::string_from_bytes(
key.rsplit(|&b| b == 0xff)
.next()
.expect("rsplit always returns an element"),
)
.map_err(|_| Error::bad_database("Invalid UserId bytes in presenceid_presence."))?,
)
.map_err(|_| Error::bad_database("Invalid UserId in presenceid_presence."))?;
let presence = parse_presence_event(&value)?;
hashmap.insert(user_id, presence);
} }
Ok(hashmap) Ok(())
} }
/* fn presence_since<'a>(
fn presence_maintain(&self, db: Arc<TokioRwLock<Database>>) { &'a self,
// TODO @M0dEx: move this to a timed tasks module room_id: &RoomId,
tokio::spawn(async move { since: u64,
loop { ) -> Box<dyn Iterator<Item = (OwnedUserId, u64, PresenceEvent)> + 'a> {
select! { let prefix = [room_id.as_bytes(), &[0xff]].concat();
Some(user_id) = self.presence_timers.next() {
// TODO @M0dEx: would it be better to acquire the lock outside the loop?
let guard = db.read().await;
// TODO @M0dEx: add self.presence_timers Box::new(
// TODO @M0dEx: maintain presence self.roomuserid_presence
} .scan_prefix(prefix)
} .flat_map(
} |(key, presence_bytes)| -> Result<(OwnedUserId, u64, PresenceEvent)> {
}); let user_id = user_id_from_bytes(
key.rsplit(|byte| *byte == 0xff).next().ok_or_else(|| {
Error::bad_database("No UserID bytes in presence key")
})?,
)?;
let presence = Presence::from_json_bytes(&presence_bytes)?;
let presence_event = presence.to_presence_event(&user_id)?;
Ok((user_id, presence.last_count, presence_event))
},
)
.filter(move |(_, count, _)| *count > since),
)
} }
*/
} }
fn parse_presence_event(bytes: &[u8]) -> Result<PresenceEvent> { #[inline]
let mut presence: PresenceEvent = serde_json::from_slice(bytes) fn presence_key(room_id: &RoomId, user_id: &UserId) -> Vec<u8> {
.map_err(|_| Error::bad_database("Invalid presence event in db."))?; [room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat()
let current_timestamp: UInt = utils::millis_since_unix_epoch()
.try_into()
.expect("time is valid");
if presence.content.presence == PresenceState::Online {
// Don't set last_active_ago when the user is online
presence.content.last_active_ago = None;
} else {
// Convert from timestamp to duration
presence.content.last_active_ago = presence
.content
.last_active_ago
.map(|timestamp| current_timestamp - timestamp);
}
Ok(presence)
} }

View file

@ -2,8 +2,8 @@ pub mod abstraction;
pub mod key_value; pub mod key_value;
use crate::{ use crate::{
service::rooms::timeline::PduCount, services, utils, Config, Error, PduEvent, Result, Services, service::rooms::{edus::presence::presence_handler, timeline::PduCount},
SERVICES, services, utils, Config, Error, PduEvent, Result, Services, SERVICES,
}; };
use abstraction::{KeyValueDatabaseEngine, KvTree}; use abstraction::{KeyValueDatabaseEngine, KvTree};
use directories::ProjectDirs; use directories::ProjectDirs;
@ -29,7 +29,7 @@ use std::{
sync::{Arc, Mutex, RwLock}, sync::{Arc, Mutex, RwLock},
time::Duration, time::Duration,
}; };
use tokio::time::interval; use tokio::{sync::mpsc, time::interval};
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
@ -71,8 +71,7 @@ pub struct KeyValueDatabase {
pub(super) readreceiptid_readreceipt: Arc<dyn KvTree>, // ReadReceiptId = RoomId + Count + UserId pub(super) readreceiptid_readreceipt: Arc<dyn KvTree>, // ReadReceiptId = RoomId + Count + UserId
pub(super) roomuserid_privateread: Arc<dyn KvTree>, // RoomUserId = Room + User, PrivateRead = Count pub(super) roomuserid_privateread: Arc<dyn KvTree>, // RoomUserId = Room + User, PrivateRead = Count
pub(super) roomuserid_lastprivatereadupdate: Arc<dyn KvTree>, // LastPrivateReadUpdate = Count pub(super) roomuserid_lastprivatereadupdate: Arc<dyn KvTree>, // LastPrivateReadUpdate = Count
pub(super) presenceid_presence: Arc<dyn KvTree>, // PresenceId = RoomId + Count + UserId pub(super) roomuserid_presence: Arc<dyn KvTree>,
pub(super) userid_lastpresenceupdate: Arc<dyn KvTree>, // LastPresenceUpdate = Count
//pub rooms: rooms::Rooms, //pub rooms: rooms::Rooms,
pub(super) pduid_pdu: Arc<dyn KvTree>, // PduId = ShortRoomId + Count pub(super) pduid_pdu: Arc<dyn KvTree>, // PduId = ShortRoomId + Count
@ -170,6 +169,7 @@ pub struct KeyValueDatabase {
pub(super) our_real_users_cache: RwLock<HashMap<OwnedRoomId, Arc<HashSet<OwnedUserId>>>>, pub(super) our_real_users_cache: RwLock<HashMap<OwnedRoomId, Arc<HashSet<OwnedUserId>>>>,
pub(super) appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>, pub(super) appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>,
pub(super) lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>, pub(super) lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>,
pub(super) presence_timer_sender: Arc<mpsc::UnboundedSender<(OwnedUserId, Duration)>>,
} }
impl KeyValueDatabase { impl KeyValueDatabase {
@ -273,6 +273,8 @@ impl KeyValueDatabase {
error!(?config.max_request_size, "Max request size is less than 1KB. Please increase it."); error!(?config.max_request_size, "Max request size is less than 1KB. Please increase it.");
} }
let (presence_sender, presence_receiver) = mpsc::unbounded_channel();
let db_raw = Box::new(Self { let db_raw = Box::new(Self {
_db: builder.clone(), _db: builder.clone(),
userid_password: builder.open_tree("userid_password")?, userid_password: builder.open_tree("userid_password")?,
@ -299,8 +301,7 @@ impl KeyValueDatabase {
roomuserid_privateread: builder.open_tree("roomuserid_privateread")?, // "Private" read receipt roomuserid_privateread: builder.open_tree("roomuserid_privateread")?, // "Private" read receipt
roomuserid_lastprivatereadupdate: builder roomuserid_lastprivatereadupdate: builder
.open_tree("roomuserid_lastprivatereadupdate")?, .open_tree("roomuserid_lastprivatereadupdate")?,
presenceid_presence: builder.open_tree("presenceid_presence")?, roomuserid_presence: builder.open_tree("roomuserid_presence")?,
userid_lastpresenceupdate: builder.open_tree("userid_lastpresenceupdate")?,
pduid_pdu: builder.open_tree("pduid_pdu")?, pduid_pdu: builder.open_tree("pduid_pdu")?,
eventid_pduid: builder.open_tree("eventid_pduid")?, eventid_pduid: builder.open_tree("eventid_pduid")?,
roomid_pduleaves: builder.open_tree("roomid_pduleaves")?, roomid_pduleaves: builder.open_tree("roomid_pduleaves")?,
@ -392,6 +393,7 @@ impl KeyValueDatabase {
our_real_users_cache: RwLock::new(HashMap::new()), our_real_users_cache: RwLock::new(HashMap::new()),
appservice_in_room_cache: RwLock::new(HashMap::new()), appservice_in_room_cache: RwLock::new(HashMap::new()),
lasttimelinecount_cache: Mutex::new(HashMap::new()), lasttimelinecount_cache: Mutex::new(HashMap::new()),
presence_timer_sender: Arc::new(presence_sender),
}); });
let db = Box::leak(db_raw); let db = Box::leak(db_raw);
@ -962,9 +964,6 @@ impl KeyValueDatabase {
); );
} }
// This data is probably outdated
db.presenceid_presence.clear()?;
services().admin.start_handler(); services().admin.start_handler();
// Set emergency access for the conduit user // Set emergency access for the conduit user
@ -989,6 +988,9 @@ impl KeyValueDatabase {
if services().globals.allow_check_for_updates() { if services().globals.allow_check_for_updates() {
Self::start_check_for_updates_task(); Self::start_check_for_updates_task();
} }
if services().globals.allow_local_presence() {
Self::start_presence_handler(presence_receiver).await;
}
Ok(()) Ok(())
} }
@ -1062,8 +1064,7 @@ impl KeyValueDatabase {
pub async fn start_cleanup_task() { pub async fn start_cleanup_task() {
#[cfg(unix)] #[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind}; use tokio::signal::unix::{signal, SignalKind};
use tokio::time::Instant;
use std::time::{Duration, Instant};
let timer_interval = let timer_interval =
Duration::from_secs(services().globals.config.cleanup_second_interval as u64); Duration::from_secs(services().globals.config.cleanup_second_interval as u64);
@ -1098,6 +1099,17 @@ impl KeyValueDatabase {
} }
}); });
} }
pub async fn start_presence_handler(
presence_timer_receiver: mpsc::UnboundedReceiver<(OwnedUserId, Duration)>,
) {
tokio::spawn(async move {
match presence_handler(presence_timer_receiver).await {
Ok(()) => warn!("Presence maintenance task finished"),
Err(e) => error!("Presence maintenance task finished with error: {e}"),
}
});
}
} }
/// Sets the emergency password and push rules for the @conduit account in case emergency password is set /// Sets the emergency password and push rules for the @conduit account in case emergency password is set

View file

@ -353,6 +353,26 @@ impl Service {
&self.config.emergency_password &self.config.emergency_password
} }
pub fn allow_local_presence(&self) -> bool {
self.config.allow_local_presence
}
pub fn allow_incoming_presence(&self) -> bool {
self.config.allow_incoming_presence
}
pub fn allow_outcoming_presence(&self) -> bool {
self.config.allow_outgoing_presence
}
pub fn presence_idle_timeout_s(&self) -> u64 {
self.config.presence_idle_timeout_s
}
pub fn presence_offline_timeout_s(&self) -> u64 {
self.config.presence_offline_timeout_s
}
pub fn supported_room_versions(&self) -> Vec<RoomVersionId> { pub fn supported_room_versions(&self) -> Vec<RoomVersionId> {
let mut room_versions: Vec<RoomVersionId> = vec![]; let mut room_versions: Vec<RoomVersionId> = vec![];
room_versions.extend(self.stable_room_versions.clone()); room_versions.extend(self.stable_room_versions.clone());

View file

@ -1,38 +1,33 @@
use std::collections::HashMap;
use crate::Result; use crate::Result;
use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId}; use ruma::{
events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId,
};
pub trait Data: Send + Sync { pub trait Data: Send + Sync {
/// Returns the latest presence event for the given user in the given room.
fn get_presence(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<PresenceEvent>>;
/// Pings the presence of the given user in the given room, setting the specified state.
fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()>;
/// Adds a presence event which will be saved until a new event replaces it. /// Adds a presence event which will be saved until a new event replaces it.
/// fn set_presence(
/// Note: This method takes a RoomId because presence updates are always bound to rooms to
/// make sure users outside these rooms can't see them.
fn update_presence(
&self, &self,
user_id: &UserId,
room_id: &RoomId, room_id: &RoomId,
presence: PresenceEvent, user_id: &UserId,
presence_state: PresenceState,
currently_active: Option<bool>,
last_active_ago: Option<UInt>,
status_msg: Option<String>,
) -> Result<()>; ) -> Result<()>;
/// Resets the presence timeout, so the user will stay in their current presence state. /// Removes the presence record for the given user from the database.
fn ping_presence(&self, user_id: &UserId) -> Result<()>; fn remove_presence(&self, user_id: &UserId) -> Result<()>;
/// Returns the timestamp of the last presence update of this user in millis since the unix epoch.
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>>;
/// Returns the presence event with correct last_active_ago.
fn get_presence_event(
&self,
room_id: &RoomId,
user_id: &UserId,
count: u64,
) -> Result<Option<PresenceEvent>>;
/// Returns the most recent presence updates that happened after the event with id `since`. /// Returns the most recent presence updates that happened after the event with id `since`.
fn presence_since( fn presence_since<'a>(
&self, &'a self,
room_id: &RoomId, room_id: &RoomId,
since: u64, since: u64,
) -> Result<HashMap<OwnedUserId, PresenceEvent>>; ) -> Box<dyn Iterator<Item = (OwnedUserId, u64, PresenceEvent)> + 'a>;
} }

View file

@ -1,125 +1,211 @@
mod data; mod data;
use std::collections::HashMap;
use std::time::Duration;
pub use data::Data; pub use data::Data;
use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId}; use futures_util::{stream::FuturesUnordered, StreamExt};
use ruma::{
events::presence::{PresenceEvent, PresenceEventContent},
presence::PresenceState,
OwnedUserId, RoomId, UInt, UserId,
};
use serde::{Deserialize, Serialize};
use tokio::{sync::mpsc, time::sleep};
use tracing::debug;
use crate::Result; use crate::{services, utils, Error, Result};
/// Represents data required to be kept in order to implement the presence specification.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Presence {
pub state: PresenceState,
pub currently_active: bool,
pub last_active_ts: u64,
pub last_count: u64,
pub status_msg: Option<String>,
}
impl Presence {
pub fn new(
state: PresenceState,
currently_active: bool,
last_active_ts: u64,
last_count: u64,
status_msg: Option<String>,
) -> Self {
Self {
state,
currently_active,
last_active_ts,
last_count,
status_msg,
}
}
pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
serde_json::from_slice(bytes)
.map_err(|_| Error::bad_database("Invalid presence data in database"))
}
pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
serde_json::to_vec(self)
.map_err(|_| Error::bad_database("Could not serialize Presence to JSON"))
}
/// Creates a PresenceEvent from available data.
pub fn to_presence_event(&self, user_id: &UserId) -> Result<PresenceEvent> {
let now = utils::millis_since_unix_epoch();
let last_active_ago = if self.currently_active {
None
} else {
Some(UInt::new_saturating(
now.saturating_sub(self.last_active_ts),
))
};
Ok(PresenceEvent {
sender: user_id.to_owned(),
content: PresenceEventContent {
presence: self.state.clone(),
status_msg: self.status_msg.clone(),
currently_active: Some(self.currently_active),
last_active_ago,
displayname: services().users.displayname(user_id)?,
avatar_url: services().users.avatar_url(user_id)?,
},
})
}
}
pub struct Service { pub struct Service {
pub db: &'static dyn Data, pub db: &'static dyn Data,
} }
impl Service { impl Service {
/// Adds a presence event which will be saved until a new event replaces it. /// Returns the latest presence event for the given user in the given room.
/// pub fn get_presence(
/// Note: This method takes a RoomId because presence updates are always bound to rooms to
/// make sure users outside these rooms can't see them.
pub fn update_presence(
&self, &self,
_user_id: &UserId, room_id: &RoomId,
_room_id: &RoomId, user_id: &UserId,
_presence: PresenceEvent,
) -> Result<()> {
// self.db.update_presence(user_id, room_id, presence)
Ok(())
}
/// Resets the presence timeout, so the user will stay in their current presence state.
pub fn ping_presence(&self, _user_id: &UserId) -> Result<()> {
// self.db.ping_presence(user_id)
Ok(())
}
pub fn get_last_presence_event(
&self,
_user_id: &UserId,
_room_id: &RoomId,
) -> Result<Option<PresenceEvent>> { ) -> Result<Option<PresenceEvent>> {
// let last_update = match self.db.last_presence_update(user_id)? { self.db.get_presence(room_id, user_id)
// Some(last) => last,
// None => return Ok(None),
// };
// self.db.get_presence_event(room_id, user_id, last_update)
Ok(None)
} }
/* TODO /// Pings the presence of the given user in the given room, setting the specified state.
/// Sets all users to offline who have been quiet for too long. pub fn ping_presence(&self, user_id: &UserId, new_state: PresenceState) -> Result<()> {
fn _presence_maintain( self.db.ping_presence(user_id, new_state)
}
/// Adds a presence event which will be saved until a new event replaces it.
pub fn set_presence(
&self, &self,
rooms: &super::Rooms, room_id: &RoomId,
globals: &super::super::globals::Globals, user_id: &UserId,
presence_state: PresenceState,
currently_active: Option<bool>,
last_active_ago: Option<UInt>,
status_msg: Option<String>,
) -> Result<()> { ) -> Result<()> {
let current_timestamp = utils::millis_since_unix_epoch(); self.db.set_presence(
room_id,
user_id,
presence_state,
currently_active,
last_active_ago,
status_msg,
)
}
for (user_id_bytes, last_timestamp) in self /// Removes the presence record for the given user from the database.
.userid_lastpresenceupdate pub fn remove_presence(&self, user_id: &UserId) -> Result<()> {
.iter() self.db.remove_presence(user_id)
.filter_map(|(k, bytes)| { }
Some((
k,
utils::u64_from_bytes(&bytes)
.map_err(|_| {
Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.")
})
.ok()?,
))
})
.take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000)
// 5 Minutes
{
// Send new presence events to set the user offline
let count = globals.next_count()?.to_be_bytes();
let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes)
.map_err(|_| {
Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.")
})?
.try_into()
.map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?;
for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) {
let mut presence_id = room_id.as_bytes().to_vec();
presence_id.push(0xff);
presence_id.extend_from_slice(&count);
presence_id.push(0xff);
presence_id.extend_from_slice(&user_id_bytes);
self.presenceid_presence.insert(
&presence_id,
&serde_json::to_vec(&PresenceEvent {
content: PresenceEventContent {
avatar_url: None,
currently_active: None,
displayname: None,
last_active_ago: Some(
last_timestamp.try_into().expect("time is valid"),
),
presence: PresenceState::Offline,
status_msg: None,
},
sender: user_id.to_owned(),
})
.expect("PresenceEvent can be serialized"),
)?;
}
self.userid_lastpresenceupdate.insert(
user_id.as_bytes(),
&utils::millis_since_unix_epoch().to_be_bytes(),
)?;
}
Ok(())
}*/
/// Returns the most recent presence updates that happened after the event with id `since`. /// Returns the most recent presence updates that happened after the event with id `since`.
pub fn presence_since( pub fn presence_since(
&self, &self,
_room_id: &RoomId, room_id: &RoomId,
_since: u64, since: u64,
) -> Result<HashMap<OwnedUserId, PresenceEvent>> { ) -> Box<dyn Iterator<Item = (OwnedUserId, u64, PresenceEvent)>> {
// self.db.presence_since(room_id, since) self.db.presence_since(room_id, since)
Ok(HashMap::new())
} }
} }
pub async fn presence_handler(
mut presence_timer_receiver: mpsc::UnboundedReceiver<(OwnedUserId, Duration)>,
) -> Result<()> {
let mut presence_timers = FuturesUnordered::new();
loop {
debug!("Number of presence timers: {}", presence_timers.len());
tokio::select! {
Some((user_id, timeout)) = presence_timer_receiver.recv() => {
debug!("Adding timer for user '{user_id}': Timeout {timeout:?}");
presence_timers.push(presence_timer(user_id, timeout));
}
Some(user_id) = presence_timers.next() => {
process_presence_timer(user_id)?;
}
}
}
}
async fn presence_timer(user_id: OwnedUserId, timeout: Duration) -> OwnedUserId {
sleep(timeout).await;
user_id
}
fn process_presence_timer(user_id: OwnedUserId) -> Result<()> {
let idle_timeout = services().globals.config.presence_idle_timeout_s * 1_000;
let offline_timeout = services().globals.config.presence_offline_timeout_s * 1_000;
let mut presence_state = PresenceState::Offline;
let mut last_active_ago = None;
let mut status_msg = None;
for room_id in services().rooms.state_cache.rooms_joined(&user_id) {
let presence_event = services()
.rooms
.edus
.presence
.get_presence(&room_id?, &user_id)?;
if let Some(presence_event) = presence_event {
presence_state = presence_event.content.presence;
last_active_ago = presence_event.content.last_active_ago;
status_msg = presence_event.content.status_msg;
break;
}
}
let new_state = match (&presence_state, last_active_ago.map(u64::from)) {
(PresenceState::Online, Some(ago)) if ago >= idle_timeout => {
Some(PresenceState::Unavailable)
}
(PresenceState::Unavailable, Some(ago)) if ago >= offline_timeout => {
Some(PresenceState::Offline)
}
_ => None,
};
debug!("Processed presence timer for user '{user_id}': Old state = {presence_state}, New state = {new_state:?}");
if let Some(new_state) = new_state {
for room_id in services().rooms.state_cache.rooms_joined(&user_id) {
services().rooms.edus.presence.set_presence(
&room_id?,
&user_id,
new_state.clone(),
Some(false),
last_active_ago,
status_msg.clone(),
)?;
}
}
Ok(())
}

View file

@ -26,7 +26,8 @@ use ruma::{
federation::{ federation::{
self, self,
transactions::edu::{ transactions::edu::{
DeviceListUpdateContent, Edu, ReceiptContent, ReceiptData, ReceiptMap, DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent,
ReceiptData, ReceiptMap,
}, },
}, },
OutgoingRequest, OutgoingRequest,
@ -285,6 +286,39 @@ impl Service {
.filter(|user_id| user_id.server_name() == services().globals.server_name()), .filter(|user_id| user_id.server_name() == services().globals.server_name()),
); );
if services().globals.allow_outcoming_presence() {
// Look for presence updates in this room
let mut presence_updates = Vec::new();
for (user_id, count, presence_event) in services()
.rooms
.edus
.presence
.presence_since(&room_id, since)
{
if count > max_edu_count {
max_edu_count = count;
}
if user_id.server_name() != services().globals.server_name() {
continue;
}
presence_updates.push(PresenceUpdate {
user_id,
presence: presence_event.content.presence,
currently_active: presence_event.content.currently_active.unwrap_or(false),
last_active_ago: presence_event.content.last_active_ago.unwrap_or(uint!(0)),
status_msg: presence_event.content.status_msg,
});
}
let presence_content = Edu::Presence(PresenceContent::new(presence_updates));
events.push(
serde_json::to_vec(&presence_content).expect("PresenceEvent can be serialized"),
);
}
// Look for read receipts in this room // Look for read receipts in this room
for r in services() for r in services()
.rooms .rooms

View file

@ -1,12 +1,15 @@
pub mod error; pub mod error;
use crate::{Error, Result};
use argon2::{Config, Variant}; use argon2::{Config, Variant};
use cmp::Ordering;
use rand::prelude::*; use rand::prelude::*;
use ring::digest; use ring::digest;
use ruma::{canonical_json::try_from_json_map, CanonicalJsonError, CanonicalJsonObject}; use ruma::{
canonical_json::try_from_json_map, CanonicalJsonError, CanonicalJsonObject, OwnedUserId,
};
use std::{ use std::{
cmp, fmt, cmp::Ordering,
fmt,
str::FromStr, str::FromStr,
time::{SystemTime, UNIX_EPOCH}, time::{SystemTime, UNIX_EPOCH},
}; };
@ -51,6 +54,15 @@ pub fn string_from_bytes(bytes: &[u8]) -> Result<String, std::string::FromUtf8Er
String::from_utf8(bytes.to_vec()) String::from_utf8(bytes.to_vec())
} }
/// Parses a OwnedUserId from bytes.
pub fn user_id_from_bytes(bytes: &[u8]) -> Result<OwnedUserId> {
OwnedUserId::try_from(
string_from_bytes(bytes)
.map_err(|_| Error::bad_database("Failed to parse string from bytes"))?,
)
.map_err(|_| Error::bad_database("Failed to parse user id from bytes"))
}
pub fn random_string(length: usize) -> String { pub fn random_string(length: usize) -> String {
thread_rng() thread_rng()
.sample_iter(&rand::distributions::Alphanumeric) .sample_iter(&rand::distributions::Alphanumeric)