split presence data object into file; improve service encapsulations
Signed-off-by: Jason Volk <jason@zemos.net>
parent ea95627dce
commit ddc8c3b46c
5 changed files with 99 additions and 84 deletions
@@ -851,6 +851,7 @@ style = { level = "warn", priority = -1 }
 ## some sadness
 # trivial assertions are quite alright
 assertions_on_constants = { level = "allow", priority = 1 }
+module_inception = { level = "allow", priority = 1 }
 
 ###################
 suspicious = { level = "warn", priority = -1 }

@@ -4,7 +4,8 @@ use conduit::{debug_warn, utils, Error, Result};
 use database::Map;
 use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, UInt, UserId};
 
-use crate::{globals, presence::Presence, users, Dep};
+use super::Presence;
+use crate::{globals, users, Dep};
 
 pub struct Data {
 	presenceid_presence: Arc<Map>,

@@ -1,81 +1,25 @@
 mod data;
+mod presence;
 
 use std::{sync::Arc, time::Duration};
 
 use async_trait::async_trait;
-use conduit::{checked, debug, error, utils, Error, Result, Server};
+use conduit::{checked, debug, error, Error, Result, Server};
 use futures_util::{stream::FuturesUnordered, StreamExt};
-use ruma::{
-	events::presence::{PresenceEvent, PresenceEventContent},
-	presence::PresenceState,
-	OwnedUserId, UInt, UserId,
-};
-use serde::{Deserialize, Serialize};
+use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, UInt, UserId};
 use tokio::{sync::Mutex, time::sleep};
 
-use self::data::Data;
+use self::{data::Data, presence::Presence};
 use crate::{globals, users, Dep};
 
-/// Represents data required to be kept in order to implement the presence
-/// specification.
-#[derive(Serialize, Deserialize, Debug, Clone)]
-pub struct Presence {
-	state: PresenceState,
-	currently_active: bool,
-	last_active_ts: u64,
-	status_msg: Option<String>,
-}
-
-impl Presence {
-	#[must_use]
-	pub fn new(state: PresenceState, currently_active: bool, last_active_ts: u64, status_msg: Option<String>) -> Self {
-		Self {
-			state,
-			currently_active,
-			last_active_ts,
-			status_msg,
-		}
-	}
-
-	pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
-		serde_json::from_slice(bytes).map_err(|_| Error::bad_database("Invalid presence data in database"))
-	}
-
-	pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
-		serde_json::to_vec(self).map_err(|_| Error::bad_database("Could not serialize Presence to JSON"))
-	}
-
-	/// Creates a PresenceEvent from available data.
-	pub fn to_presence_event(&self, user_id: &UserId, users: &users::Service) -> Result<PresenceEvent> {
-		let now = utils::millis_since_unix_epoch();
-		let last_active_ago = if self.currently_active {
-			None
-		} else {
-			Some(UInt::new_saturating(now.saturating_sub(self.last_active_ts)))
-		};
-
-		Ok(PresenceEvent {
-			sender: user_id.to_owned(),
-			content: PresenceEventContent {
-				presence: self.state.clone(),
-				status_msg: self.status_msg.clone(),
-				currently_active: Some(self.currently_active),
-				last_active_ago,
-				displayname: users.displayname(user_id)?,
-				avatar_url: users.avatar_url(user_id)?,
-			},
-		})
-	}
-}
-
 pub struct Service {
-	services: Services,
-	pub db: Data,
-	pub timer_sender: loole::Sender<(OwnedUserId, Duration)>,
-	timer_receiver: Mutex<loole::Receiver<(OwnedUserId, Duration)>>,
+	timer_sender: loole::Sender<TimerType>,
+	timer_receiver: Mutex<loole::Receiver<TimerType>>,
 	timeout_remote_users: bool,
 	idle_timeout: u64,
 	offline_timeout: u64,
+	pub db: Data,
+	services: Services,
 }
 
 struct Services {
@@ -84,6 +28,8 @@ struct Services {
 	users: Dep<users::Service>,
 }
 
+type TimerType = (OwnedUserId, Duration);
+
 #[async_trait]
 impl crate::Service for Service {
 	fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
@@ -92,17 +38,17 @@ impl crate::Service for Service {
 		let offline_timeout_s = config.presence_offline_timeout_s;
 		let (timer_sender, timer_receiver) = loole::unbounded();
 		Ok(Arc::new(Self {
-			services: Services {
-				server: args.server.clone(),
-				globals: args.depend::<globals::Service>("globals"),
-				users: args.depend::<users::Service>("users"),
-			},
-			db: Data::new(&args),
 			timer_sender,
 			timer_receiver: Mutex::new(timer_receiver),
 			timeout_remote_users: config.presence_timeout_remote_users,
 			idle_timeout: checked!(idle_timeout_s * 1_000)?,
 			offline_timeout: checked!(offline_timeout_s * 1_000)?,
+			db: Data::new(&args),
+			services: Services {
+				server: args.server.clone(),
+				globals: args.depend::<globals::Service>("globals"),
+				users: args.depend::<users::Service>("users"),
+			},
 		}))
 	}
 

src/service/presence/presence.rs (new file, 65 lines)
@@ -0,0 +1,65 @@
+use std::sync::Arc;
+
+use conduit::{utils, Error, Result};
+use ruma::{
+	events::presence::{PresenceEvent, PresenceEventContent},
+	presence::PresenceState,
+	UInt, UserId,
+};
+use serde::{Deserialize, Serialize};
+
+use crate::users;
+
+/// Represents data required to be kept in order to implement the presence
+/// specification.
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub(super) struct Presence {
+	state: PresenceState,
+	currently_active: bool,
+	last_active_ts: u64,
+	status_msg: Option<String>,
+}
+
+impl Presence {
+	#[must_use]
+	pub(super) fn new(
+		state: PresenceState, currently_active: bool, last_active_ts: u64, status_msg: Option<String>,
+	) -> Self {
+		Self {
+			state,
+			currently_active,
+			last_active_ts,
+			status_msg,
+		}
+	}
+
+	pub(super) fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
+		serde_json::from_slice(bytes).map_err(|_| Error::bad_database("Invalid presence data in database"))
+	}
+
+	pub(super) fn to_json_bytes(&self) -> Result<Vec<u8>> {
+		serde_json::to_vec(self).map_err(|_| Error::bad_database("Could not serialize Presence to JSON"))
+	}
+
+	/// Creates a PresenceEvent from available data.
+	pub(super) fn to_presence_event(&self, user_id: &UserId, users: &Arc<users::Service>) -> Result<PresenceEvent> {
+		let now = utils::millis_since_unix_epoch();
+		let last_active_ago = if self.currently_active {
+			None
+		} else {
+			Some(UInt::new_saturating(now.saturating_sub(self.last_active_ts)))
+		};
+
+		Ok(PresenceEvent {
+			sender: user_id.to_owned(),
+			content: PresenceEventContent {
+				presence: self.state.clone(),
+				status_msg: self.status_msg.clone(),
+				currently_active: Some(self.currently_active),
+				last_active_ago,
+				displayname: users.displayname(user_id)?,
+				avatar_url: users.avatar_url(user_id)?,
+			},
+		})
+	}
+}
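For orientation, the split above keeps Presence serialization behind pub(super) helpers, so callers outside the presence service never touch the raw bytes stored in the presenceid_presence map. Below is a minimal, self-contained sketch of that JSON round-trip; it assumes the serde (with derive) and serde_json crates, and the plain String state field, the serde_json error type, and the sample values are stand-ins for illustration, not conduwuit's actual types.

// Simplified sketch of the serialize/deserialize round-trip, not conduwuit code.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct Presence {
	state: String, // stand-in for ruma's PresenceState
	currently_active: bool,
	last_active_ts: u64,
	status_msg: Option<String>,
}

impl Presence {
	// Deserialize a value previously stored as JSON bytes.
	fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
		serde_json::from_slice(bytes)
	}

	// Serialize for storage; only this module ever sees the raw bytes.
	fn to_json_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
		serde_json::to_vec(self)
	}
}

fn main() -> Result<(), serde_json::Error> {
	let presence = Presence {
		state: "online".to_owned(),
		currently_active: true,
		last_active_ts: 1_700_000_000_000,
		status_msg: Some("hacking on presence".to_owned()),
	};
	// Write, then read back, and confirm the value survives unchanged.
	let bytes = presence.to_json_bytes()?;
	let decoded = Presence::from_json_bytes(&bytes)?;
	assert_eq!(presence, decoded);
	Ok(())
}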
@@ -26,19 +26,10 @@ use ruma::{
 use self::data::Data;
 use crate::{admin, rooms, Dep};
 
-pub struct SlidingSyncCache {
-	lists: BTreeMap<String, SyncRequestList>,
-	subscriptions: BTreeMap<OwnedRoomId, sync_events::v4::RoomSubscription>,
-	known_rooms: BTreeMap<String, BTreeMap<OwnedRoomId, u64>>, // For every room, the roomsince number
-	extensions: ExtensionsConfig,
-}
-
-type DbConnections = Mutex<BTreeMap<(OwnedUserId, OwnedDeviceId, String), Arc<Mutex<SlidingSyncCache>>>>;
-
 pub struct Service {
-	services: Services,
+	connections: DbConnections,
 	pub db: Data,
-	pub connections: DbConnections,
+	services: Services,
 }
 
 struct Services {
@@ -49,18 +40,29 @@ struct Services {
 impl crate::Service for Service {
 	fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
 		Ok(Arc::new(Self {
+			connections: StdMutex::new(BTreeMap::new()),
+			db: Data::new(&args),
 			services: Services {
 				admin: args.depend::<admin::Service>("admin"),
 				state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
 			},
-			db: Data::new(&args),
-			connections: StdMutex::new(BTreeMap::new()),
 		}))
 	}
 
 	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
 }
 
+type DbConnections = Mutex<BTreeMap<DbConnectionsKey, DbConnectionsVal>>;
+type DbConnectionsKey = (OwnedUserId, OwnedDeviceId, String);
+type DbConnectionsVal = Arc<Mutex<SlidingSyncCache>>;
+
+struct SlidingSyncCache {
+	lists: BTreeMap<String, SyncRequestList>,
+	subscriptions: BTreeMap<OwnedRoomId, sync_events::v4::RoomSubscription>,
+	known_rooms: BTreeMap<String, BTreeMap<OwnedRoomId, u64>>, // For every room, the roomsince number
+	extensions: ExtensionsConfig,
+}
+
 impl Service {
 	/// Check if a user has an account on this homeserver.
 	#[inline]
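The DbConnections/DbConnectionsKey/DbConnectionsVal aliases introduced above name the parts of a deeply nested map type instead of repeating the full generic inside the struct. A hypothetical, simplified illustration of the same pattern follows; every name here, including the CacheEntry payload and the sample key, is invented for the example and is not the sync service's real API.

// Sketch of naming a nested map type with aliases, under the assumptions above.
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

// Stand-in for the per-connection cache payload.
struct CacheEntry {
	since: u64,
}

type ConnectionsKey = (String, String, String); // e.g. (user, device, connection id)
type ConnectionsVal = Arc<Mutex<CacheEntry>>;
type Connections = Mutex<BTreeMap<ConnectionsKey, ConnectionsVal>>;

// The owning struct stays readable because the nesting lives in the aliases.
struct Service {
	connections: Connections,
}

fn main() {
	let svc = Service {
		connections: Mutex::new(BTreeMap::new()),
	};
	let key: ConnectionsKey = ("@alice:example.org".into(), "DEVICE".into(), "conn1".into());
	svc.connections
		.lock()
		.unwrap()
		.insert(key, Arc::new(Mutex::new(CacheEntry { since: 0 })));
	println!("cached connections: {}", svc.connections.lock().unwrap().len());
}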