add flush suite to sending service; trigger on read receipts.

Signed-off-by: Jason Volk <jason@zemos.net>
Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
Jason Volk 2024-03-17 02:25:50 -04:00 committed by June
parent 95ea665649
commit abceae26de
3 changed files with 49 additions and 4 deletions

View file

@ -81,6 +81,8 @@ pub async fn set_read_marker_route(body: Ruma<set_read_marker::v3::Request>) ->
room_id: body.room_id.clone(),
},
)?;
services().sending.flush_room(&body.room_id)?;
}
Ok(set_read_marker::v3::Response {})
@ -136,6 +138,8 @@ pub async fn create_receipt_route(body: Ruma<create_receipt::v3::Request>) -> Re
room_id: body.room_id.clone(),
},
)?;
services().sending.flush_room(&body.room_id)?;
},
create_receipt::v3::ReceiptType::ReadPrivate => {
let count = services()

View file

@ -90,6 +90,10 @@ impl service::sending::Data for KeyValueDatabase {
fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()> {
for (e, key) in events {
if key.is_empty() {
continue;
}
let value = if let SendingEventType::Edu(value) = &e {
&**value
} else {

View file

@ -25,7 +25,7 @@ use ruma::{
},
device_id,
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, ServerName, UInt, UserId,
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId,
};
use tokio::{
select,
@ -80,6 +80,7 @@ impl OutgoingKind {
pub enum SendingEventType {
Pdu(Vec<u8>), // pduid
Edu(Vec<u8>), // pdu json
Flush, // none
}
pub struct Service {
@ -237,9 +238,11 @@ impl Service {
events.push(e);
}
} else {
self.db.mark_as_active(&new_events)?;
for (e, _) in new_events {
events.push(e);
if !new_events.is_empty() {
self.db.mark_as_active(&new_events)?;
for (e, _) in new_events {
events.push(e);
}
}
if let OutgoingKind::Normal(server_name) = outgoing_kind {
@ -421,6 +424,29 @@ impl Service {
Ok(())
}
#[tracing::instrument(skip(self, room_id))]
pub fn flush_room(&self, room_id: &RoomId) -> Result<()> {
let servers: HashSet<OwnedServerName> =
services().rooms.state_cache.room_servers(room_id).filter_map(std::result::Result::ok).collect();
self.flush_servers(servers.into_iter())
}
#[tracing::instrument(skip(self, servers))]
pub fn flush_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I) -> Result<()> {
let requests = servers
.into_iter()
.filter(|server| server != services().globals.server_name())
.map(OutgoingKind::Normal)
.collect::<Vec<_>>();
for outgoing_kind in requests.into_iter() {
self.sender.send((outgoing_kind, SendingEventType::Flush, Vec::<u8>::new())).unwrap();
}
Ok(())
}
/// Cleanup event data
/// Used for instance after we remove an appservice registration
#[tracing::instrument(skip(self))]
@ -461,6 +487,9 @@ impl Service {
SendingEventType::Edu(_) => {
// Appservices don't need EDUs (?)
},
SendingEventType::Flush => {
// flush only; no new content
},
}
}
@ -480,6 +509,7 @@ impl Service {
.iter()
.map(|e| match e {
SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b,
SendingEventType::Flush => &[],
})
.collect::<Vec<_>>(),
)))
@ -521,6 +551,9 @@ impl Service {
SendingEventType::Edu(_) => {
// Push gateways don't need EDUs (?)
},
SendingEventType::Flush => {
// flush only; no new content
},
}
}
@ -601,6 +634,9 @@ impl Service {
edu_jsons.push(raw);
}
},
SendingEventType::Flush => {
// flush only; no new content
},
}
}
@ -618,6 +654,7 @@ impl Service {
.iter()
.map(|e| match e {
SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b,
SendingEventType::Flush => &[],
})
.collect::<Vec<_>>(),
)))