Compare commits

...

3 commits

Author SHA1 Message Date
Nyaaori
9d4f2884e1
feat: Add config option for receiving read receipts
Adds an option for ignoring incoming read receipts over federation
2022-12-21 10:49:12 +01:00
Nyaaori
ccc5030896
feat: Add config option for disabling sending public read receipts
Treats requests like private receipts
2022-12-21 10:49:12 +01:00
Nyaaori
e8d435c541
feat: Implement private read receipts, partial notification clearing 2022-12-21 10:47:16 +01:00
13 changed files with 351 additions and 132 deletions

View file

@ -34,51 +34,62 @@ pub async fn set_read_marker_route(
)?; )?;
} }
if body.private_read_receipt.is_some() || body.read_receipt.is_some() {
services()
.rooms
.user
.reset_notification_counts(sender_user, &body.room_id)?;
}
if let Some(event) = &body.private_read_receipt { if let Some(event) = &body.private_read_receipt {
let _pdu = services()
.rooms
.timeline
.get_pdu(event)?
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Event does not exist.",
))?;
services().rooms.edus.read_receipt.private_read_set( services().rooms.edus.read_receipt.private_read_set(
&body.room_id, &body.room_id,
sender_user, sender_user,
services() services().rooms.short.get_or_create_shorteventid(event)?,
.rooms
.timeline
.get_pdu_count(event)?
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Event does not exist.",
))?,
)?; )?;
} }
if let Some(event) = &body.read_receipt { if let Some(event) = &body.read_receipt {
let mut user_receipts = BTreeMap::new(); let _pdu = services()
user_receipts.insert( .rooms
sender_user.clone(), .timeline
ruma::events::receipt::Receipt { .get_pdu(event)?
ts: Some(MilliSecondsSinceUnixEpoch::now()), .ok_or(Error::BadRequest(
thread: ReceiptThread::Unthreaded, ErrorKind::InvalidParam,
}, "Event does not exist.",
); ))?;
let mut receipts = BTreeMap::new(); if services().globals.allow_public_read_receipts() {
receipts.insert(ReceiptType::Read, user_receipts); let mut user_receipts = BTreeMap::new();
user_receipts.insert(
sender_user.clone(),
ruma::events::receipt::Receipt {
ts: Some(MilliSecondsSinceUnixEpoch::now()),
thread: ReceiptThread::Unthreaded,
},
);
let mut receipt_content = BTreeMap::new(); let mut receipts = BTreeMap::new();
receipt_content.insert(event.to_owned(), receipts); receipts.insert(ReceiptType::Read, user_receipts);
services().rooms.edus.read_receipt.readreceipt_update( let mut receipt_content = BTreeMap::new();
sender_user, receipt_content.insert(event.to_owned(), receipts);
services().rooms.edus.read_receipt.readreceipt_update(
sender_user,
&body.room_id,
ruma::events::receipt::ReceiptEvent {
content: ruma::events::receipt::ReceiptEventContent(receipt_content),
room_id: body.room_id.clone(),
},
)?;
};
services().rooms.edus.read_receipt.private_read_set(
&body.room_id, &body.room_id,
ruma::events::receipt::ReceiptEvent { sender_user,
content: ruma::events::receipt::ReceiptEventContent(receipt_content), services().rooms.short.get_or_create_shorteventid(event)?,
room_id: body.room_id.clone(),
},
)?; )?;
} }
@ -93,16 +104,6 @@ pub async fn create_receipt_route(
) -> Result<create_receipt::v3::Response> { ) -> Result<create_receipt::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if matches!(
&body.receipt_type,
create_receipt::v3::ReceiptType::Read | create_receipt::v3::ReceiptType::ReadPrivate
) {
services()
.rooms
.user
.reset_notification_counts(sender_user, &body.room_id)?;
}
match body.receipt_type { match body.receipt_type {
create_receipt::v3::ReceiptType::FullyRead => { create_receipt::v3::ReceiptType::FullyRead => {
let fully_read_event = ruma::events::fully_read::FullyReadEvent { let fully_read_event = ruma::events::fully_read::FullyReadEvent {
@ -118,41 +119,67 @@ pub async fn create_receipt_route(
)?; )?;
} }
create_receipt::v3::ReceiptType::Read => { create_receipt::v3::ReceiptType::Read => {
let mut user_receipts = BTreeMap::new(); let _pdu =
user_receipts.insert( services()
sender_user.clone(), .rooms
ruma::events::receipt::Receipt { .timeline
ts: Some(MilliSecondsSinceUnixEpoch::now()), .get_pdu(&body.event_id)?
thread: ReceiptThread::Unthreaded, .ok_or(Error::BadRequest(
}, ErrorKind::InvalidParam,
); "Event does not exist.",
let mut receipts = BTreeMap::new(); ))?;
receipts.insert(ReceiptType::Read, user_receipts);
let mut receipt_content = BTreeMap::new(); if services().globals.allow_public_read_receipts() {
receipt_content.insert(body.event_id.to_owned(), receipts); let mut user_receipts = BTreeMap::new();
user_receipts.insert(
sender_user.clone(),
ruma::events::receipt::Receipt {
ts: Some(MilliSecondsSinceUnixEpoch::now()),
thread: ReceiptThread::Unthreaded,
},
);
let mut receipts = BTreeMap::new();
receipts.insert(ReceiptType::Read, user_receipts);
services().rooms.edus.read_receipt.readreceipt_update( let mut receipt_content = BTreeMap::new();
sender_user, receipt_content.insert(body.event_id.to_owned(), receipts);
&body.room_id,
ruma::events::receipt::ReceiptEvent { services().rooms.edus.read_receipt.readreceipt_update(
content: ruma::events::receipt::ReceiptEventContent(receipt_content), sender_user,
room_id: body.room_id.clone(), &body.room_id,
}, ruma::events::receipt::ReceiptEvent {
)?; content: ruma::events::receipt::ReceiptEventContent(receipt_content),
} room_id: body.room_id.clone(),
create_receipt::v3::ReceiptType::ReadPrivate => { },
)?;
};
services().rooms.edus.read_receipt.private_read_set( services().rooms.edus.read_receipt.private_read_set(
&body.room_id, &body.room_id,
sender_user, sender_user,
services()
.rooms
.short
.get_or_create_shorteventid(&body.event_id)?,
)?;
}
create_receipt::v3::ReceiptType::ReadPrivate => {
let _pdu =
services() services()
.rooms .rooms
.timeline .timeline
.get_pdu_count(&body.event_id)? .get_pdu(&body.event_id)?
.ok_or(Error::BadRequest( .ok_or(Error::BadRequest(
ErrorKind::InvalidParam, ErrorKind::InvalidParam,
"Event does not exist.", "Event does not exist.",
))?, ))?;
services().rooms.edus.read_receipt.private_read_set(
&body.room_id,
sender_user,
services()
.rooms
.short
.get_or_create_shorteventid(&body.event_id)?,
)?; )?;
} }
_ => return Err(Error::bad_database("Unsupported receipt type")), _ => return Err(Error::bad_database("Unsupported receipt type")),

View file

@ -6,6 +6,7 @@ use ruma::{
uiaa::UiaaResponse, uiaa::UiaaResponse,
}, },
events::{ events::{
receipt::{ReceiptThread, ReceiptType},
room::member::{MembershipState, RoomMemberEventContent}, room::member::{MembershipState, RoomMemberEventContent},
RoomEventType, StateEventType, RoomEventType, StateEventType,
}, },
@ -731,6 +732,50 @@ async fn sync_helper(
.map(|(_, _, v)| v) .map(|(_, _, v)| v)
.collect(); .collect();
if services()
.rooms
.edus
.read_receipt
.last_privateread_update(&sender_user, &room_id)
.unwrap_or(0)
> since
{
if let Ok(event_id) = services().rooms.short.get_eventid_from_short(
services()
.rooms
.edus
.read_receipt
.private_read_get(&room_id, &sender_user)
.expect("User did not have a valid private read receipt?")
.expect("User had a last read private receipt update but no receipt?"),
) {
let mut user_receipts = BTreeMap::new();
user_receipts.insert(
sender_user.clone(),
ruma::events::receipt::Receipt {
ts: None,
thread: ReceiptThread::Unthreaded,
},
);
let mut receipts = BTreeMap::new();
receipts.insert(ReceiptType::ReadPrivate, user_receipts);
let mut receipt_content = BTreeMap::new();
receipt_content.insert((*event_id).to_owned(), receipts);
edus.push(
serde_json::from_str(
&serde_json::to_string(&ruma::events::SyncEphemeralRoomEvent {
content: ruma::events::receipt::ReceiptEventContent(receipt_content),
})
.expect("Did not get valid JSON?"),
)
.expect("JSON was somehow invalid despite just being created"),
);
}
};
if services().rooms.edus.typing.last_typing_update(&room_id)? > since { if services().rooms.edus.typing.last_typing_update(&room_id)? > since {
edus.push( edus.push(
serde_json::from_str( serde_json::from_str(

View file

@ -24,7 +24,10 @@ pub async fn get_supported_versions_route(
"v1.1".to_owned(), "v1.1".to_owned(),
"v1.2".to_owned(), "v1.2".to_owned(),
], ],
unstable_features: BTreeMap::from_iter([("org.matrix.e2e_cross_signing".to_owned(), true)]), unstable_features: BTreeMap::from_iter([
("org.matrix.e2e_cross_signing".to_owned(), true),
("org.matrix.msc2285.stable".to_owned(), true),
]),
}; };
Ok(resp) Ok(resp)

View file

@ -746,43 +746,45 @@ pub async fn send_transaction_message_route(
match edu { match edu {
Edu::Presence(_) => {} Edu::Presence(_) => {}
Edu::Receipt(receipt) => { Edu::Receipt(receipt) => {
for (room_id, room_updates) in receipt.receipts { if services().globals.allow_receiving_read_receipts() {
for (user_id, user_updates) in room_updates.read { for (room_id, room_updates) in receipt.receipts {
if let Some((event_id, _)) = user_updates for (user_id, user_updates) in room_updates.read {
.event_ids if let Some((event_id, _)) = user_updates
.iter() .event_ids
.filter_map(|id| { .iter()
.filter_map(|id| {
services()
.rooms
.timeline
.get_pdu_count(id)
.ok()
.flatten()
.map(|r| (id, r))
})
.max_by_key(|(_, count)| *count)
{
let mut user_receipts = BTreeMap::new();
user_receipts.insert(user_id.clone(), user_updates.data);
let mut receipts = BTreeMap::new();
receipts.insert(ReceiptType::Read, user_receipts);
let mut receipt_content = BTreeMap::new();
receipt_content.insert(event_id.to_owned(), receipts);
let event = ReceiptEvent {
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
};
services() services()
.rooms .rooms
.timeline .edus
.get_pdu_count(id) .read_receipt
.ok() .readreceipt_update(&user_id, &room_id, event)?;
.flatten() } else {
.map(|r| (id, r)) // TODO fetch missing events
}) info!("No known event ids in read receipt: {:?}", user_updates);
.max_by_key(|(_, count)| *count) }
{
let mut user_receipts = BTreeMap::new();
user_receipts.insert(user_id.clone(), user_updates.data);
let mut receipts = BTreeMap::new();
receipts.insert(ReceiptType::Read, user_receipts);
let mut receipt_content = BTreeMap::new();
receipt_content.insert(event_id.to_owned(), receipts);
let event = ReceiptEvent {
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
};
services()
.rooms
.edus
.read_receipt
.readreceipt_update(&user_id, &room_id, event)?;
} else {
// TODO fetch missing events
info!("No known event ids in read receipt: {:?}", user_updates);
} }
} }
} }

View file

@ -47,6 +47,10 @@ pub struct Config {
#[serde(default = "false_fn")] #[serde(default = "false_fn")]
pub allow_federation: bool, pub allow_federation: bool,
#[serde(default = "true_fn")] #[serde(default = "true_fn")]
pub allow_public_read_receipts: bool,
#[serde(default = "true_fn")]
pub allow_receiving_read_receipts: bool,
#[serde(default = "true_fn")]
pub allow_room_creation: bool, pub allow_room_creation: bool,
#[serde(default = "true_fn")] #[serde(default = "true_fn")]
pub allow_unstable_room_versions: bool, pub allow_unstable_room_versions: bool,

View file

@ -105,16 +105,25 @@ impl service::rooms::edus::read_receipt::Data for KeyValueDatabase {
) )
} }
fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> { fn private_read_set(
&self,
room_id: &RoomId,
user_id: &UserId,
shorteventid: u64,
) -> Result<()> {
let mut key = room_id.as_bytes().to_vec(); let mut key = room_id.as_bytes().to_vec();
key.push(0xff); key.push(0xff);
key.extend_from_slice(user_id.as_bytes()); key.extend_from_slice(user_id.as_bytes());
self.roomuserid_privateread if self.private_read_get(room_id, user_id)?.unwrap_or(0) < shorteventid {
.insert(&key, &count.to_be_bytes())?; self.roomuserid_privateread
.insert(&key, &shorteventid.to_be_bytes())?;
self.roomuserid_lastprivatereadupdate self.roomuserid_lastprivatereadupdate
.insert(&key, &services().globals.next_count()?.to_be_bytes()) .insert(&key, &services().globals.next_count()?.to_be_bytes())
} else {
Ok(())
}
} }
fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> { fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {

View file

@ -3,7 +3,13 @@ use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
impl service::rooms::user::Data for KeyValueDatabase { impl service::rooms::user::Data for KeyValueDatabase {
fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { fn update_notification_counts(
&self,
user_id: &UserId,
room_id: &RoomId,
notification_count: u64,
highlight_count: u64,
) -> Result<()> {
let mut userroom_id = user_id.as_bytes().to_vec(); let mut userroom_id = user_id.as_bytes().to_vec();
userroom_id.push(0xff); userroom_id.push(0xff);
userroom_id.extend_from_slice(room_id.as_bytes()); userroom_id.extend_from_slice(room_id.as_bytes());
@ -12,9 +18,9 @@ impl service::rooms::user::Data for KeyValueDatabase {
roomuser_id.extend_from_slice(user_id.as_bytes()); roomuser_id.extend_from_slice(user_id.as_bytes());
self.userroomid_notificationcount self.userroomid_notificationcount
.insert(&userroom_id, &0_u64.to_be_bytes())?; .insert(&userroom_id, &notification_count.to_be_bytes())?;
self.userroomid_highlightcount self.userroomid_highlightcount
.insert(&userroom_id, &0_u64.to_be_bytes())?; .insert(&userroom_id, &highlight_count.to_be_bytes())?;
self.roomuserid_lastnotificationread.insert( self.roomuserid_lastnotificationread.insert(
&roomuser_id, &roomuser_id,

View file

@ -234,6 +234,14 @@ impl Service {
self.config.allow_federation self.config.allow_federation
} }
pub fn allow_public_read_receipts(&self) -> bool {
self.config.allow_public_read_receipts
}
pub fn allow_receiving_read_receipts(&self) -> bool {
self.config.allow_receiving_read_receipts
}
pub fn allow_room_creation(&self) -> bool { pub fn allow_room_creation(&self) -> bool {
self.config.allow_room_creation self.config.allow_room_creation
} }

View file

@ -25,8 +25,9 @@ pub trait Data: Send + Sync {
> + 'a, > + 'a,
>; >;
/// Sets a private read marker at `count`. /// Sets a private read marker at `shorteventid`.
fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()>; fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, shorteventid: u64)
-> Result<()>;
/// Returns the private read marker. /// Returns the private read marker.
fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>>; fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>>;

View file

@ -2,7 +2,7 @@ mod data;
pub use data::Data; pub use data::Data;
use crate::Result; use crate::{services, Result};
use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId}; use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId};
pub struct Service { pub struct Service {
@ -36,10 +36,19 @@ impl Service {
self.db.readreceipts_since(room_id, since) self.db.readreceipts_since(room_id, since)
} }
/// Sets a private read marker at `count`. /// Sets a private read marker at `shorteventid`.
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> { pub fn private_read_set(
self.db.private_read_set(room_id, user_id, count) &self,
room_id: &RoomId,
user_id: &UserId,
shorteventid: u64,
) -> Result<()> {
self.db.private_read_set(room_id, user_id, shorteventid)?;
services()
.rooms
.user
.update_notification_counts(user_id, room_id)
} }
/// Returns the private read marker. /// Returns the private read marker.

View file

@ -213,18 +213,17 @@ impl Service {
); );
let insert_lock = mutex_insert.lock().unwrap(); let insert_lock = mutex_insert.lock().unwrap();
let count1 = services().globals.next_count()?; let _count1 = services().globals.next_count()?;
// Mark as read first so the sending client doesn't get a notification even if appending // Mark as read first so the sending client doesn't get a notification even if appending
// fails // fails
services() services().rooms.edus.read_receipt.private_read_set(
.rooms &pdu.room_id,
.edus &pdu.sender,
.read_receipt services()
.private_read_set(&pdu.room_id, &pdu.sender, count1)?; .rooms
services() .short
.rooms .get_or_create_shorteventid(&pdu.event_id)?,
.user )?;
.reset_notification_counts(&pdu.sender, &pdu.room_id)?;
let count2 = services().globals.next_count()?; let count2 = services().globals.next_count()?;
let mut pdu_id = shortroomid.to_be_bytes().to_vec(); let mut pdu_id = shortroomid.to_be_bytes().to_vec();

View file

@ -2,7 +2,13 @@ use crate::Result;
use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId}; use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
pub trait Data: Send + Sync { pub trait Data: Send + Sync {
fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>; fn update_notification_counts(
&self,
user_id: &UserId,
room_id: &RoomId,
notification_count: u64,
highlight_count: u64,
) -> Result<()>;
fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>; fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;

View file

@ -1,17 +1,117 @@
mod data; mod data;
pub use data::Data; pub use data::Data;
use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId}; use ruma::{
events::{
push_rules::PushRulesEvent, room::power_levels::RoomPowerLevelsEventContent,
GlobalAccountDataEventType, StateEventType,
},
push::{Action, Ruleset, Tweak},
OwnedRoomId, OwnedUserId, RoomId, UserId,
};
use crate::Result; use crate::{services, Error, Result};
pub struct Service { pub struct Service {
pub db: &'static dyn Data, pub db: &'static dyn Data,
} }
impl Service { impl Service {
pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { pub fn update_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
self.db.reset_notification_counts(user_id, room_id) let power_levels: RoomPowerLevelsEventContent = services()
.rooms
.state_accessor
.room_state_get(room_id, &StateEventType::RoomPowerLevels, "")?
.map(|ev| {
serde_json::from_str(ev.content.get())
.map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
})
.transpose()?
.unwrap_or_default();
let read_event = services()
.rooms
.edus
.read_receipt
.private_read_get(room_id, user_id)
.unwrap_or(None)
.unwrap_or(0u64);
let mut notification_count = 0u64;
let mut highlight_count = 0u64;
services()
.rooms
.timeline
.pdus_since(user_id, room_id, read_event)?
.filter_map(|pdu| pdu.ok())
.map(|(_, pdu)| pdu)
.filter(|pdu| {
// Don't include user's own messages in notification counts
user_id != &pdu.sender
&& services()
.rooms
.short
.get_or_create_shorteventid(&pdu.event_id)
.unwrap_or(0)
!= read_event
})
.filter_map(|pdu| {
let rules_for_user = services()
.account_data
.get(
None,
user_id,
GlobalAccountDataEventType::PushRules.to_string().into(),
)
.ok()?
.map(|event| {
serde_json::from_str::<PushRulesEvent>(event.get())
.map_err(|_| Error::bad_database("Invalid push rules event in db."))
})
.transpose()
.ok()?
.map(|ev: PushRulesEvent| ev.content.global)
.unwrap_or_else(|| Ruleset::server_default(user_id));
let mut highlight = false;
let mut notify = false;
for action in services()
.pusher
.get_actions(
user_id,
&rules_for_user,
&power_levels,
&pdu.to_sync_room_event(),
&pdu.room_id,
)
.ok()?
{
match action {
Action::DontNotify => notify = false,
// TODO: Implement proper support for coalesce
Action::Notify | Action::Coalesce => notify = true,
Action::SetTweak(Tweak::Highlight(true)) => {
highlight = true;
}
_ => {}
};
}
if notify {
notification_count += 1;
};
if highlight {
highlight_count += 1;
};
Some(())
})
.for_each(|_| {});
self.db
.update_notification_counts(user_id, room_id, notification_count, highlight_count)
} }
pub fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> { pub fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {