feat(presence): start work on cleanup task

This commit is contained in:
Jakub Kubík 2022-11-21 21:24:37 +01:00
parent 5e4e4d0089
commit f9d10e8f41
No known key found for this signature in database
GPG key ID: D3A0D5D60F3A173F
5 changed files with 52 additions and 7 deletions

View file

@ -81,6 +81,11 @@ pub struct Config {
#[serde(default = "default_presence_offline_timeout")] #[serde(default = "default_presence_offline_timeout")]
pub presence_offline_timeout: u64, pub presence_offline_timeout: u64,
#[serde(default = "default_presence_cleanup_period")]
pub presence_cleanup_period: u64,
#[serde(default = "default_presence_cleanup_limit")]
pub presence_cleanup_limit: u64,
#[serde(flatten)] #[serde(flatten)]
pub catchall: BTreeMap<String, IgnoredAny>, pub catchall: BTreeMap<String, IgnoredAny>,
} }
@ -263,11 +268,19 @@ fn default_turn_ttl() -> u64 {
} }
fn default_presence_idle_timeout() -> u64 { fn default_presence_idle_timeout() -> u64 {
1 * 60 * 1000 1 * 60
} }
fn default_presence_offline_timeout() -> u64 { fn default_presence_offline_timeout() -> u64 {
15 * 60 * 1000 30 * 60
}
fn default_presence_cleanup_period() -> u64 {
24 * 60 * 60
}
fn default_presence_cleanup_limit() -> u64 {
24 * 60 * 60
} }
// I know, it's a great name // I know, it's a great name

View file

@ -220,11 +220,13 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
) -> Result<()> { ) -> Result<()> {
let mut timers = FuturesUnordered::new(); let mut timers = FuturesUnordered::new();
let mut timers_timestamp: HashMap<OwnedUserId, u64> = HashMap::new(); let mut timers_timestamp: HashMap<OwnedUserId, u64> = HashMap::new();
let idle_timeout = Duration::from_secs(services().globals.presence_idle_timeout());
let offline_timeout = Duration::from_secs(services().globals.presence_offline_timeout());
// TODO: Get rid of this hack // TODO: Get rid of this hack (hinting correct types to rustc)
timers.push(create_presence_timer( timers.push(create_presence_timer(
Duration::from_secs(60), Duration::from_secs(1),
user_id!("@test:test.com").to_owned(), UserId::parse_with_server_name("conduit", services().globals.server_name()).expect("Conduit user always exists")
)); ));
tokio::spawn(async move { tokio::spawn(async move {
@ -260,6 +262,7 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
} }
Some(user_id) = timer_receiver.recv() => { Some(user_id) = timer_receiver.recv() => {
let now = millis_since_unix_epoch(); let now = millis_since_unix_epoch();
// Do not create timers if we added timers recently
let should_send = match timers_timestamp.entry(user_id.to_owned()) { let should_send = match timers_timestamp.entry(user_id.to_owned()) {
Entry::Occupied(mut entry) => { Entry::Occupied(mut entry) => {
if now - entry.get() > 15 * 1000 { if now - entry.get() > 15 * 1000 {
@ -280,10 +283,10 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
} }
// Idle timeout // Idle timeout
timers.push(create_presence_timer(Duration::from_secs(60), user_id.clone())); timers.push(create_presence_timer(idle_timeout, user_id.clone()));
// Offline timeout // Offline timeout
timers.push(create_presence_timer(Duration::from_secs(60*15) , user_id.clone())); timers.push(create_presence_timer(offline_timeout, user_id.clone()));
info!("Added timers for user '{}' ({})", user_id, timers.len()); info!("Added timers for user '{}' ({})", user_id, timers.len());
} }
@ -293,6 +296,21 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
Ok(()) Ok(())
} }
fn presence_cleanup(&self) -> Result<()> {
let period = Duration::from_secs(services().globals.presence_cleanup_period());
let age_limit = Duration::from_secs(services().globals.presence_cleanup_limit());
tokio::spawn(async move {
loop {
// TODO: Cleanup
sleep(period).await;
}
});
Ok(())
}
} }
async fn create_presence_timer(duration: Duration, user_id: OwnedUserId) -> OwnedUserId { async fn create_presence_timer(duration: Duration, user_id: OwnedUserId) -> OwnedUserId {

View file

@ -294,6 +294,14 @@ impl Service {
self.config.presence_offline_timeout self.config.presence_offline_timeout
} }
pub fn presence_cleanup_period(&self) -> u64 {
self.config.presence_cleanup_period
}
pub fn presence_cleanup_limit(&self) -> u64 {
self.config.presence_cleanup_limit
}
pub fn supported_room_versions(&self) -> Vec<RoomVersionId> { pub fn supported_room_versions(&self) -> Vec<RoomVersionId> {
let mut room_versions: Vec<RoomVersionId> = vec![]; let mut room_versions: Vec<RoomVersionId> = vec![];
room_versions.extend(self.stable_room_versions.clone()); room_versions.extend(self.stable_room_versions.clone());

View file

@ -42,4 +42,6 @@ pub trait Data: Send + Sync {
fn presence_maintain(&self, timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>) fn presence_maintain(&self, timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>)
-> Result<()>; -> Result<()>;
fn presence_cleanup(&self) -> Result<()>;
} }

View file

@ -98,6 +98,10 @@ impl Service {
self.db.presence_maintain(timer_receiver) self.db.presence_maintain(timer_receiver)
} }
fn presence_cleanup(&self) -> Result<()> {
self.db.presence_cleanup()
}
/// Spawns a timer for the user used by the maintenance task /// Spawns a timer for the user used by the maintenance task
fn spawn_timer(&self, user_id: &UserId) -> Result<()> { fn spawn_timer(&self, user_id: &UserId) -> Result<()> {
self.timer_sender self.timer_sender