feat(presence): restructure database trees for presence

This commit is contained in:
Jakub Kubík 2022-11-17 22:48:57 +01:00
parent cd5a83d4e2
commit deeffde679
No known key found for this signature in database
GPG key ID: D3A0D5D60F3A173F
3 changed files with 52 additions and 41 deletions

View file

@ -6,7 +6,29 @@ use ruma::{
};
use tokio::{sync::mpsc, time::sleep};
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
use crate::{
database::KeyValueDatabase, service, services, utils, utils::u64_from_bytes, Error, Result,
};
use crate::utils::millis_since_unix_epoch;
pub struct PresenceUpdate {
count: u64,
timestamp: u64,
}
impl PresenceUpdate {
fn to_be_bytes(&self) -> &[u8] {
&*([self.count.to_be_bytes(), self.timestamp.to_be_bytes()].concat())
}
fn from_be_bytes(bytes: &[u8]) -> Result<Self> {
let (count_bytes, timestamp_bytes) = bytes.split_at(bytes.len() / 2);
Ok(Self {
count: u64_from_bytes(count_bytes)?,
timestamp: u64_from_bytes(timestamp_bytes)?,
})
}
}
impl service::rooms::edus::presence::Data for KeyValueDatabase {
fn update_presence(
@ -15,45 +37,41 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
room_id: &RoomId,
presence: PresenceEvent,
) -> Result<()> {
// TODO: Remove old entry? Or maybe just wipe completely from time to time?
let mut roomuser_id = [room_id.as_bytes(), 0xff, user_id.as_bytes()].concat();
let count = services().globals.next_count()?.to_be_bytes();
let mut presence_id = room_id.as_bytes().to_vec();
presence_id.push(0xff);
presence_id.extend_from_slice(&count);
presence_id.push(0xff);
presence_id.extend_from_slice(presence.sender.as_bytes());
self.presenceid_presence.insert(
&presence_id,
&serde_json::to_vec(&presence).expect("PresenceEvent can be serialized"),
self.roomuserid_presenceevent.insert(
&roomuser_id,
&serde_json::to_vec(&presence)?,
)?;
self.userid_lastpresenceupdate.insert(
self.userid_presenceupdate.insert(
user_id.as_bytes(),
&utils::millis_since_unix_epoch().to_be_bytes(),
PresenceUpdate {
count: services().globals.next_count()?,
timestamp: millis_since_unix_epoch(),
}.to_be_bytes(),
)?;
Ok(())
}
fn ping_presence(&self, user_id: &UserId) -> Result<()> {
self.userid_lastpresenceupdate.insert(
self.userid_presenceupdate.insert(
user_id.as_bytes(),
&utils::millis_since_unix_epoch().to_be_bytes(),
PresenceUpdate {
count: services().globals.current_count()?,
timestamp: millis_since_unix_epoch(),
}.to_be_bytes()
)?;
Ok(())
}
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>> {
self.userid_lastpresenceupdate
self.userid_presenceupdate
.get(user_id.as_bytes())?
.map(|bytes| {
utils::u64_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.")
})
PresenceUpdate::from_be_bytes(bytes)?.timestamp
})
.transpose()
}
@ -62,17 +80,12 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
&self,
room_id: &RoomId,
user_id: &UserId,
count: u64,
presence_timestamp: u64
) -> Result<Option<PresenceEvent>> {
let mut presence_id = room_id.as_bytes().to_vec();
presence_id.push(0xff);
presence_id.extend_from_slice(&count.to_be_bytes());
presence_id.push(0xff);
presence_id.extend_from_slice(user_id.as_bytes());
self.presenceid_presence
.get(&presence_id)?
.map(|value| parse_presence_event(&value))
let mut roomuser_id = [room_id.as_bytes(), 0xff, user_id.as_bytes()].concat();
self.roomuserid_presenceevent
.get(&roomuser_id)?
.map(|value| parse_presence_event(&value, presence_timestamp))
.transpose()
}
@ -144,13 +157,11 @@ async fn create_presence_timer(duration: Duration, user_id: Box<UserId>) -> Box<
user_id
}
fn parse_presence_event(bytes: &[u8]) -> Result<PresenceEvent> {
fn parse_presence_event(bytes: &[u8], presence_timestamp: u64) -> Result<PresenceEvent> {
let mut presence: PresenceEvent = serde_json::from_slice(bytes)
.map_err(|_| Error::bad_database("Invalid presence event in db."))?;
let current_timestamp: UInt = utils::millis_since_unix_epoch()
.try_into()
.expect("time is valid");
let current_timestamp: UInt = millis_since_unix_epoch().try_into()?;
if presence.content.presence == PresenceState::Online {
// Don't set last_active_ago when the user is online
@ -160,7 +171,7 @@ fn parse_presence_event(bytes: &[u8]) -> Result<PresenceEvent> {
presence.content.last_active_ago = presence
.content
.last_active_ago
.map(|timestamp| current_timestamp - timestamp);
.map(|timestamp| current_timestamp - presence_timestamp);
}
Ok(presence)

View file

@ -65,8 +65,8 @@ pub struct KeyValueDatabase {
pub(super) roomuserid_lastprivatereadupdate: Arc<dyn KvTree>, // LastPrivateReadUpdate = Count
pub(super) typingid_userid: Arc<dyn KvTree>, // TypingId = RoomId + TimeoutTime + Count
pub(super) roomid_lasttypingupdate: Arc<dyn KvTree>, // LastRoomTypingUpdate = Count
pub(super) presenceid_presence: Arc<dyn KvTree>, // PresenceId = RoomId + Count + UserId
pub(super) userid_lastpresenceupdate: Arc<dyn KvTree>, // LastPresenceUpdate = Count
pub(super) userid_presenceupdate: Arc<dyn KvTree>, // PresenceUpdate = Count + Timestamp
pub(super) roomuserid_presenceevent: Arc<dyn KvTree>, // PresenceEvent
//pub rooms: rooms::Rooms,
pub(super) pduid_pdu: Arc<dyn KvTree>, // PduId = ShortRoomId + Count
@ -288,8 +288,8 @@ impl KeyValueDatabase {
.open_tree("roomuserid_lastprivatereadupdate")?,
typingid_userid: builder.open_tree("typingid_userid")?,
roomid_lasttypingupdate: builder.open_tree("roomid_lasttypingupdate")?,
presenceid_presence: builder.open_tree("presenceid_presence")?,
userid_lastpresenceupdate: builder.open_tree("userid_lastpresenceupdate")?,
userid_presenceupdate: builder.open_tree("userid_presenceupdate")?,
roomuserid_presenceevent: builder.open_tree("roomuserid_presenceevent")?,
pduid_pdu: builder.open_tree("pduid_pdu")?,
eventid_pduid: builder.open_tree("eventid_pduid")?,
roomid_pduleaves: builder.open_tree("roomid_pduleaves")?,

View file

@ -27,7 +27,7 @@ pub trait Data: Send + Sync {
&self,
room_id: &RoomId,
user_id: &UserId,
count: u64,
presence_timestamp: u64,
) -> Result<Option<PresenceEvent>>;
/// Returns the most recent presence updates that happened after the event with id `since`.