diff --git a/src/api/client_server/read_marker.rs b/src/api/client_server/read_marker.rs index 182748d6..a6097c17 100644 --- a/src/api/client_server/read_marker.rs +++ b/src/api/client_server/read_marker.rs @@ -81,6 +81,8 @@ pub async fn set_read_marker_route(body: Ruma) -> room_id: body.room_id.clone(), }, )?; + + services().sending.flush_room(&body.room_id)?; } Ok(set_read_marker::v3::Response {}) @@ -136,6 +138,8 @@ pub async fn create_receipt_route(body: Ruma) -> Re room_id: body.room_id.clone(), }, )?; + + services().sending.flush_room(&body.room_id)?; }, create_receipt::v3::ReceiptType::ReadPrivate => { let count = services() diff --git a/src/database/key_value/sending.rs b/src/database/key_value/sending.rs index a3ede405..5087cbe7 100644 --- a/src/database/key_value/sending.rs +++ b/src/database/key_value/sending.rs @@ -90,6 +90,10 @@ impl service::sending::Data for KeyValueDatabase { fn mark_as_active(&self, events: &[(SendingEventType, Vec)]) -> Result<()> { for (e, key) in events { + if key.is_empty() { + continue; + } + let value = if let SendingEventType::Edu(value) = &e { &**value } else { diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index fae05404..8c8d0243 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -25,7 +25,7 @@ use ruma::{ }, device_id, events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, - push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, ServerName, UInt, UserId, + push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId, }; use tokio::{ select, @@ -80,6 +80,7 @@ impl OutgoingKind { pub enum SendingEventType { Pdu(Vec), // pduid Edu(Vec), // pdu json + Flush, // none } pub struct Service { @@ -237,9 +238,11 @@ impl Service { events.push(e); } } else { - self.db.mark_as_active(&new_events)?; - for (e, _) in new_events { - events.push(e); + if !new_events.is_empty() { + self.db.mark_as_active(&new_events)?; + for (e, _) in new_events { + events.push(e); + } } if let OutgoingKind::Normal(server_name) = outgoing_kind { @@ -421,6 +424,29 @@ impl Service { Ok(()) } + #[tracing::instrument(skip(self, room_id))] + pub fn flush_room(&self, room_id: &RoomId) -> Result<()> { + let servers: HashSet = + services().rooms.state_cache.room_servers(room_id).filter_map(std::result::Result::ok).collect(); + + self.flush_servers(servers.into_iter()) + } + + #[tracing::instrument(skip(self, servers))] + pub fn flush_servers>(&self, servers: I) -> Result<()> { + let requests = servers + .into_iter() + .filter(|server| server != services().globals.server_name()) + .map(OutgoingKind::Normal) + .collect::>(); + + for outgoing_kind in requests.into_iter() { + self.sender.send((outgoing_kind, SendingEventType::Flush, Vec::::new())).unwrap(); + } + + Ok(()) + } + /// Cleanup event data /// Used for instance after we remove an appservice registration #[tracing::instrument(skip(self))] @@ -461,6 +487,9 @@ impl Service { SendingEventType::Edu(_) => { // Appservices don't need EDUs (?) }, + SendingEventType::Flush => { + // flush only; no new content + }, } } @@ -480,6 +509,7 @@ impl Service { .iter() .map(|e| match e { SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b, + SendingEventType::Flush => &[], }) .collect::>(), ))) @@ -521,6 +551,9 @@ impl Service { SendingEventType::Edu(_) => { // Push gateways don't need EDUs (?) }, + SendingEventType::Flush => { + // flush only; no new content + }, } } @@ -601,6 +634,9 @@ impl Service { edu_jsons.push(raw); } }, + SendingEventType::Flush => { + // flush only; no new content + }, } } @@ -618,6 +654,7 @@ impl Service { .iter() .map(|e| match e { SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b, + SendingEventType::Flush => &[], }) .collect::>(), )))