diff --git a/src/database/key_value/rooms/edus/presence.rs b/src/database/key_value/rooms/edus/presence.rs
index 904b1c44..a72f1136 100644
--- a/src/database/key_value/rooms/edus/presence.rs
+++ b/src/database/key_value/rooms/edus/presence.rs
@@ -1,8 +1,10 @@
-use std::collections::HashMap;
+use futures_util::{stream::FuturesUnordered, StreamExt};
+use std::{collections::HashMap, time::Duration};
 
 use ruma::{
     events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId,
 };
+use tokio::{sync::mpsc, time::sleep};
 
 use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
 
@@ -109,24 +111,37 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
         Ok(hashmap)
     }
 
-    /*
-    fn presence_maintain(&self, db: Arc<TokioRwLock<Database>>) {
-        // TODO @M0dEx: move this to a timed tasks module
+    fn presence_maintain(
+        &self,
+        mut timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>,
+    ) -> Result<()> {
+        let mut timers = FuturesUnordered::new();
+
         tokio::spawn(async move {
             loop {
-                select! {
-                    Some(user_id) = self.presence_timers.next() {
-                        // TODO @M0dEx: would it be better to acquire the lock outside the loop?
-                        let guard = db.read().await;
+                tokio::select! {
+                    Some(_user_id) = timers.next() => {
+                        // TODO: Handle presence timeouts
+                    }
+                    Some(user_id) = timer_receiver.recv() => {
+                        // Idle timeout
+                        timers.push(create_presence_timer(Duration::from_secs(60), user_id.clone()));
 
-                        // TODO @M0dEx: add self.presence_timers
-                        // TODO @M0dEx: maintain presence
+                        // Offline timeout
+                        timers.push(create_presence_timer(Duration::from_secs(60 * 15), user_id));
                     }
                 }
             }
         });
+
+        Ok(())
     }
-    */
+}
+
+async fn create_presence_timer(duration: Duration, user_id: Box<UserId>) -> Box<UserId> {
+    sleep(duration).await;
+
+    user_id
 }
 
 fn parse_presence_event(bytes: &[u8]) -> Result<PresenceEvent> {
diff --git a/src/service/mod.rs b/src/service/mod.rs
index 385dcc69..6858ce1e 100644
--- a/src/service/mod.rs
+++ b/src/service/mod.rs
@@ -62,7 +62,7 @@ impl Services {
                 auth_chain: rooms::auth_chain::Service { db },
                 directory: rooms::directory::Service { db },
                 edus: rooms::edus::Service {
-                    presence: rooms::edus::presence::Service { db },
+                    presence: rooms::edus::presence::Service::build(db)?,
                     read_receipt: rooms::edus::read_receipt::Service { db },
                     typing: rooms::edus::typing::Service { db },
                 },
diff --git a/src/service/rooms/edus/presence/data.rs b/src/service/rooms/edus/presence/data.rs
index 53329e08..9c016705 100644
--- a/src/service/rooms/edus/presence/data.rs
+++ b/src/service/rooms/edus/presence/data.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;
 
 use crate::Result;
 use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId};
+use tokio::sync::mpsc;
 
 pub trait Data: Send + Sync {
     /// Adds a presence event which will be saved until a new event replaces it.
@@ -35,4 +36,7 @@ pub trait Data: Send + Sync {
         room_id: &RoomId,
         since: u64,
     ) -> Result<HashMap<OwnedUserId, PresenceEvent>>;
+
+    fn presence_maintain(&self, timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>)
+        -> Result<()>;
 }
diff --git a/src/service/rooms/edus/presence/mod.rs b/src/service/rooms/edus/presence/mod.rs
index 860aea18..23194dd1 100644
--- a/src/service/rooms/edus/presence/mod.rs
+++ b/src/service/rooms/edus/presence/mod.rs
@@ -3,14 +3,30 @@ use std::collections::HashMap;
 
 pub use data::Data;
 use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId};
+use tokio::sync::mpsc;
 
-use crate::Result;
+use crate::{Error, Result};
 
 pub struct Service {
     pub db: &'static dyn Data,
+
+    // Presence timers
+    timer_sender: mpsc::UnboundedSender<Box<UserId>>,
 }
 
 impl Service {
+    pub fn build(db: &'static dyn Data) -> Result<Self> {
+        let (sender, receiver) = mpsc::unbounded_channel();
+        let service = Self {
+            db,
+            timer_sender: sender,
+        };
+
+        service.presence_maintain(receiver)?;
+
+        Ok(service)
+    }
+
     /// Adds a presence event which will be saved until a new event replaces it.
     ///
     /// Note: This method takes a RoomId because presence updates are always bound to rooms to
@@ -21,11 +37,17 @@ impl Service {
         room_id: &RoomId,
         presence: PresenceEvent,
     ) -> Result<()> {
+        self.timer_sender
+            .send(user_id.into())
+            .map_err(|_| Error::bad_database("Sender errored out"))?;
         self.db.update_presence(user_id, room_id, presence)
     }
 
     /// Resets the presence timeout, so the user will stay in their current presence state.
     pub fn ping_presence(&self, user_id: &UserId) -> Result<()> {
+        self.timer_sender
+            .send(user_id.into())
+            .map_err(|_| Error::bad_database("Sender errored out"))?;
         self.db.ping_presence(user_id)
     }
 
@@ -42,6 +64,13 @@ impl Service {
         self.db.get_presence_event(room_id, user_id, last_update)
     }
 
+    pub fn presence_maintain(
+        &self,
+        timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>,
+    ) -> Result<()> {
+        self.db.presence_maintain(timer_receiver)
+    }
+
     /* TODO
     /// Sets all users to offline who have been quiet for too long.
     fn _presence_maintain(
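For readers unfamiliar with the `FuturesUnordered` + `tokio::select!` pattern that `presence_maintain` introduces above, here is a minimal, self-contained sketch of the same idea: user IDs arrive on an unbounded mpsc channel, each one arms sleep-based timers, and whichever timer completes first is handled in the select loop. Everything outside the pattern itself is illustrative and not part of the diff (the `presence_timer` helper name, plain `String` IDs instead of `Box<UserId>`, millisecond durations, and the `main` harness); it assumes `tokio` (with the `macros`, `rt-multi-thread`, `sync`, and `time` features) and `futures-util` as dependencies.

```rust
use std::time::Duration;

use futures_util::{stream::FuturesUnordered, StreamExt};
use tokio::{sync::mpsc, time::sleep};

/// Sleeps for `duration`, then hands the id back so the select loop
/// knows whose timer fired (same shape as `create_presence_timer` above).
async fn presence_timer(duration: Duration, user_id: String) -> String {
    sleep(duration).await;
    user_id
}

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = mpsc::unbounded_channel::<String>();

    let maintainer = tokio::spawn(async move {
        // All armed timers live in one FuturesUnordered; `next()` yields
        // whichever of them completes first.
        let mut timers = FuturesUnordered::new();

        loop {
            tokio::select! {
                Some(user_id) = timers.next() => {
                    // In Conduit this is where the presence timeout would be handled.
                    println!("timer fired for {user_id}");
                }
                received = receiver.recv() => {
                    match received {
                        Some(user_id) => {
                            // Each incoming id arms a short and a long timer, mirroring the
                            // idle/offline pair in the diff (shortened here for the demo).
                            timers.push(presence_timer(Duration::from_millis(100), user_id.clone()));
                            timers.push(presence_timer(Duration::from_millis(300), user_id));
                        }
                        None => {
                            // All senders dropped: drain the remaining timers, then stop.
                            while let Some(user_id) = timers.next().await {
                                println!("timer fired for {user_id}");
                            }
                            break;
                        }
                    }
                }
            }
        }
    });

    // Stand-in for update_presence()/ping_presence() pushing a user id at the loop.
    sender.send("@alice:example.org".to_owned()).unwrap();
    drop(sender);

    maintainer.await.unwrap();
}
```

Unlike this sketch, the loop in the diff never shuts down: `Service` keeps the sending half alive for the lifetime of the process. Using an unbounded channel also means `UnboundedSender::send` never awaits, which is presumably why `update_presence` and `ping_presence` can stay synchronous.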