diff --git a/src/database/key_value/sending.rs b/src/database/key_value/sending.rs index 5a508644..ae662f3b 100644 --- a/src/database/key_value/sending.rs +++ b/src/database/key_value/sending.rs @@ -4,15 +4,13 @@ use crate::{ database::KeyValueDatabase, service::{ self, - sending::{Destination, SendingEventType}, + sending::{Destination, SendingEvent}, }, services, utils, Error, Result, }; impl service::sending::Data for KeyValueDatabase { - fn active_requests<'a>( - &'a self, - ) -> Box, Destination, SendingEventType)>> + 'a> { + fn active_requests<'a>(&'a self) -> Box, Destination, SendingEvent)>> + 'a> { Box::new( self.servercurrentevent_data .iter() @@ -22,7 +20,7 @@ impl service::sending::Data for KeyValueDatabase { fn active_requests_for<'a>( &'a self, destination: &Destination, - ) -> Box, SendingEventType)>> + 'a> { + ) -> Box, SendingEvent)>> + 'a> { let prefix = destination.get_prefix(); Box::new( self.servercurrentevent_data @@ -55,17 +53,17 @@ impl service::sending::Data for KeyValueDatabase { Ok(()) } - fn queue_requests(&self, requests: &[(&Destination, SendingEventType)]) -> Result>> { + fn queue_requests(&self, requests: &[(&Destination, SendingEvent)]) -> Result>> { let mut batch = Vec::new(); let mut keys = Vec::new(); for (destination, event) in requests { let mut key = destination.get_prefix(); - if let SendingEventType::Pdu(value) = &event { + if let SendingEvent::Pdu(value) = &event { key.extend_from_slice(value); } else { key.extend_from_slice(&services().globals.next_count()?.to_be_bytes()); } - let value = if let SendingEventType::Edu(value) = &event { + let value = if let SendingEvent::Edu(value) = &event { &**value } else { &[] @@ -80,7 +78,7 @@ impl service::sending::Data for KeyValueDatabase { fn queued_requests<'a>( &'a self, destination: &Destination, - ) -> Box)>> + 'a> { + ) -> Box)>> + 'a> { let prefix = destination.get_prefix(); return Box::new( self.servernameevent_data @@ -89,13 +87,13 @@ impl service::sending::Data for KeyValueDatabase { ); } - fn mark_as_active(&self, events: &[(SendingEventType, Vec)]) -> Result<()> { + fn mark_as_active(&self, events: &[(SendingEvent, Vec)]) -> Result<()> { for (e, key) in events { if key.is_empty() { continue; } - let value = if let SendingEventType::Edu(value) = &e { + let value = if let SendingEvent::Edu(value) = &e { &**value } else { &[] @@ -122,7 +120,7 @@ impl service::sending::Data for KeyValueDatabase { } #[tracing::instrument(skip(key))] -fn parse_servercurrentevent(key: &[u8], value: Vec) -> Result<(Destination, SendingEventType)> { +fn parse_servercurrentevent(key: &[u8], value: Vec) -> Result<(Destination, SendingEvent)> { // Appservices start with a plus Ok::<_, Error>(if key.starts_with(b"+") { let mut parts = key[1..].splitn(2, |&b| b == 0xFF); @@ -138,9 +136,9 @@ fn parse_servercurrentevent(key: &[u8], value: Vec) -> Result<(Destination, ( Destination::Appservice(server), if value.is_empty() { - SendingEventType::Pdu(event.to_vec()) + SendingEvent::Pdu(event.to_vec()) } else { - SendingEventType::Edu(value) + SendingEvent::Edu(value) }, ) } else if key.starts_with(b"$") { @@ -165,10 +163,10 @@ fn parse_servercurrentevent(key: &[u8], value: Vec) -> Result<(Destination, ( Destination::Push(user_id, pushkey_string), if value.is_empty() { - SendingEventType::Pdu(event.to_vec()) + SendingEvent::Pdu(event.to_vec()) } else { // I'm pretty sure this should never be called - SendingEventType::Edu(value) + SendingEvent::Edu(value) }, ) } else { @@ -188,9 +186,9 @@ fn parse_servercurrentevent(key: &[u8], value: Vec) -> Result<(Destination, .map_err(|_| Error::bad_database("Invalid server string in server_currenttransaction"))?, ), if value.is_empty() { - SendingEventType::Pdu(event.to_vec()) + SendingEvent::Pdu(event.to_vec()) } else { - SendingEventType::Edu(value) + SendingEvent::Edu(value) }, ) }) diff --git a/src/service/sending/data.rs b/src/service/sending/data.rs index 28ee6cb7..04dfd5da 100644 --- a/src/service/sending/data.rs +++ b/src/service/sending/data.rs @@ -1,25 +1,25 @@ use ruma::ServerName; -use super::{Destination, SendingEventType}; +use super::{Destination, SendingEvent}; use crate::Result; -type OutgoingSendingIter<'a> = Box, Destination, SendingEventType)>> + 'a>; -type SendingEventTypeIter<'a> = Box, SendingEventType)>> + 'a>; +type OutgoingSendingIter<'a> = Box, Destination, SendingEvent)>> + 'a>; +type SendingEventIter<'a> = Box, SendingEvent)>> + 'a>; pub(crate) trait Data: Send + Sync { fn active_requests(&self) -> OutgoingSendingIter<'_>; - fn active_requests_for(&self, destination: &Destination) -> SendingEventTypeIter<'_>; + fn active_requests_for(&self, destination: &Destination) -> SendingEventIter<'_>; fn delete_active_request(&self, key: Vec) -> Result<()>; fn delete_all_active_requests_for(&self, destination: &Destination) -> Result<()>; /// TODO: use this? #[allow(dead_code)] fn delete_all_requests_for(&self, destination: &Destination) -> Result<()>; - fn queue_requests(&self, requests: &[(&Destination, SendingEventType)]) -> Result>>; + fn queue_requests(&self, requests: &[(&Destination, SendingEvent)]) -> Result>>; fn queued_requests<'a>( &'a self, destination: &Destination, - ) -> Box)>> + 'a>; - fn mark_as_active(&self, events: &[(SendingEventType, Vec)]) -> Result<()>; + ) -> Box)>> + 'a>; + fn mark_as_active(&self, events: &[(SendingEvent, Vec)]) -> Result<()>; fn set_latest_educount(&self, server_name: &ServerName, educount: u64) -> Result<()>; fn get_latest_educount(&self, server_name: &ServerName) -> Result; } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 6003c7ed..51028723 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -9,7 +9,7 @@ use std::{ use base64::{engine::general_purpose, Engine as _}; pub(crate) use data::Data; use federation::transactions::send_transaction_message; -use futures_util::{stream::FuturesUnordered, StreamExt}; +use futures_util::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use ruma::{ api::{ appservice::Registration, @@ -26,7 +26,7 @@ use ruma::{ push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId, }; use tokio::sync::{Mutex, Semaphore}; -use tracing::{error, warn}; +use tracing::{debug, error, warn}; use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result}; @@ -35,20 +35,25 @@ mod data; mod send; pub(crate) use send::FedDest; -const SELECT_EDU_LIMIT: usize = 16; - pub(crate) struct Service { pub(crate) db: &'static dyn Data, /// The state for a given state hash. pub(crate) maximum_requests: Arc, - pub(crate) sender: loole::Sender<(Destination, SendingEventType, Vec)>, - receiver: Mutex)>>, + sender: loole::Sender, + receiver: Mutex>, startup_netburst: bool, startup_netburst_keep: i64, timeout: u64, } +#[derive(Clone, Debug, PartialEq, Eq)] +struct Msg { + dest: Destination, + event: SendingEvent, + queue_id: Vec, +} + #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub(crate) enum Destination { Appservice(String), @@ -56,20 +61,30 @@ pub(crate) enum Destination { Normal(OwnedServerName), } -#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[allow(clippy::module_name_repetitions)] -pub(crate) enum SendingEventType { +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub(crate) enum SendingEvent { Pdu(Vec), // pduid Edu(Vec), // pdu json Flush, // none } +#[derive(Debug)] enum TransactionStatus { Running, Failed(u32, Instant), // number of times failed, time of last failure Retrying(u32), // number of times failed } +type SendingError = (Destination, Error); +type SendingResult = Result; +type SendingFuture<'a> = BoxFuture<'a, SendingResult>; +type SendingFutures<'a> = FuturesUnordered>; +type CurTransactionStatus = HashMap; + +const DEQUEUE_LIMIT: usize = 48; +const SELECT_EDU_LIMIT: usize = 16; + impl Service { pub(crate) fn build(db: &'static dyn Data, config: &Config) -> Arc { let (sender, receiver) = loole::unbounded(); @@ -87,27 +102,27 @@ impl Service { #[tracing::instrument(skip(self, pdu_id, user, pushkey))] pub(crate) fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> { let dest = Destination::Push(user.to_owned(), pushkey); - let event = SendingEventType::Pdu(pdu_id.to_owned()); + let event = SendingEvent::Pdu(pdu_id.to_owned()); let _cork = services().globals.db.cork()?; let keys = self.db.queue_requests(&[(&dest, event.clone())])?; - self.sender - .send((dest, event, keys.into_iter().next().unwrap())) - .unwrap(); - - Ok(()) + self.dispatch(Msg { + dest, + event, + queue_id: keys.into_iter().next().expect("request queue key"), + }) } #[tracing::instrument(skip(self))] pub(crate) fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec) -> Result<()> { let dest = Destination::Appservice(appservice_id); - let event = SendingEventType::Pdu(pdu_id); + let event = SendingEvent::Pdu(pdu_id); let _cork = services().globals.db.cork()?; let keys = self.db.queue_requests(&[(&dest, event.clone())])?; - self.sender - .send((dest, event, keys.into_iter().next().unwrap())) - .unwrap(); - - Ok(()) + self.dispatch(Msg { + dest, + event, + queue_id: keys.into_iter().next().expect("request queue key"), + }) } #[tracing::instrument(skip(self, room_id, pdu_id))] @@ -128,7 +143,7 @@ impl Service { ) -> Result<()> { let requests = servers .into_iter() - .map(|server| (Destination::Normal(server), SendingEventType::Pdu(pdu_id.to_owned()))) + .map(|server| (Destination::Normal(server), SendingEvent::Pdu(pdu_id.to_owned()))) .collect::>(); let _cork = services().globals.db.cork()?; let keys = self.db.queue_requests( @@ -137,8 +152,12 @@ impl Service { .map(|(o, e)| (o, e.clone())) .collect::>(), )?; - for ((dest, event), key) in requests.into_iter().zip(keys) { - self.sender.send((dest.clone(), event, key)).unwrap(); + for ((dest, event), queue_id) in requests.into_iter().zip(keys) { + self.dispatch(Msg { + dest, + event, + queue_id, + })?; } Ok(()) @@ -147,14 +166,14 @@ impl Service { #[tracing::instrument(skip(self, server, serialized))] pub(crate) fn send_edu_server(&self, server: &ServerName, serialized: Vec) -> Result<()> { let dest = Destination::Normal(server.to_owned()); - let event = SendingEventType::Edu(serialized); + let event = SendingEvent::Edu(serialized); let _cork = services().globals.db.cork()?; let keys = self.db.queue_requests(&[(&dest, event.clone())])?; - self.sender - .send((dest, event, keys.into_iter().next().unwrap())) - .unwrap(); - - Ok(()) + self.dispatch(Msg { + dest, + event, + queue_id: keys.into_iter().next().expect("request queue key"), + }) } #[tracing::instrument(skip(self, room_id, serialized))] @@ -175,7 +194,7 @@ impl Service { ) -> Result<()> { let requests = servers .into_iter() - .map(|server| (Destination::Normal(server), SendingEventType::Edu(serialized.clone()))) + .map(|server| (Destination::Normal(server), SendingEvent::Edu(serialized.clone()))) .collect::>(); let _cork = services().globals.db.cork()?; let keys = self.db.queue_requests( @@ -184,8 +203,13 @@ impl Service { .map(|(o, e)| (o, e.clone())) .collect::>(), )?; - for ((dest, event), key) in requests.into_iter().zip(keys) { - self.sender.send((dest.clone(), event, key)).unwrap(); + + for ((dest, event), queue_id) in requests.into_iter().zip(keys) { + self.dispatch(Msg { + dest, + event, + queue_id, + })?; } Ok(()) @@ -206,26 +230,17 @@ impl Service { #[tracing::instrument(skip(self, servers))] pub(crate) fn flush_servers>(&self, servers: I) -> Result<()> { let requests = servers.into_iter().map(Destination::Normal); - for dest in requests { - self.sender - .send((dest, SendingEventType::Flush, Vec::::new())) - .unwrap(); + self.dispatch(Msg { + dest, + event: SendingEvent::Flush, + queue_id: Vec::::new(), + })?; } Ok(()) } - /// Cleanup event data - /// Used for instance after we remove an appservice registration - #[tracing::instrument(skip(self))] - pub(crate) fn cleanup_events(&self, appservice_id: String) -> Result<()> { - self.db - .delete_all_requests_for(&Destination::Appservice(appservice_id))?; - - Ok(()) - } - #[tracing::instrument(skip(self, request), name = "request")] pub(crate) async fn send_federation_request(&self, dest: &ServerName, request: T) -> Result where @@ -262,112 +277,141 @@ impl Service { response } + /// Cleanup event data + /// Used for instance after we remove an appservice registration + #[tracing::instrument(skip(self))] + pub(crate) fn cleanup_events(&self, appservice_id: String) -> Result<()> { + self.db + .delete_all_requests_for(&Destination::Appservice(appservice_id))?; + + Ok(()) + } + pub(crate) fn start_handler(self: &Arc) { let self2 = Arc::clone(self); tokio::spawn(async move { - self2 - .handler() - .await - .expect("Failed to initialize request sending handler"); + self2.handler().await; }); } + fn dispatch(&self, msg: Msg) -> Result<()> { + debug_assert!(!self.sender.is_full(), "channel full"); + debug_assert!(!self.sender.is_closed(), "channel closed"); + self.sender.send(msg).map_err(|e| Error::Err(e.to_string())) + } + #[tracing::instrument(skip_all, name = "sender")] - async fn handler(&self) -> Result<()> { + async fn handler(&self) { let receiver = self.receiver.lock().await; + debug_assert!(!receiver.is_closed(), "channel error"); - let mut futures = FuturesUnordered::new(); - let mut current_transaction_status = HashMap::::new(); - - // Retry requests we could not finish yet - if self.startup_netburst { - let mut initial_transactions = HashMap::>::new(); - for (key, dest, event) in self.db.active_requests().filter_map(Result::ok) { - let entry = initial_transactions.entry(dest.clone()).or_default(); - - if self.startup_netburst_keep >= 0 - && entry.len() >= usize::try_from(self.startup_netburst_keep).unwrap() - { - warn!("Dropping unsent event {:?} {:?}", dest, String::from_utf8_lossy(&key),); - self.db.delete_active_request(key)?; - continue; - } - - entry.push(event); - } - - for (dest, events) in initial_transactions { - current_transaction_status.insert(dest.clone(), TransactionStatus::Running); - futures.push(send_events(dest.clone(), events)); - } - } - + let mut futures: SendingFutures<'_> = FuturesUnordered::new(); + let mut statuses: CurTransactionStatus = CurTransactionStatus::new(); + self.initial_transactions(&mut futures, &mut statuses); loop { tokio::select! { - Some(response) = futures.next() => { - match response { - Ok(dest) => { - let _cork = services().globals.db.cork(); - self.db.delete_all_active_requests_for(&dest)?; - - // Find events that have been added since starting the last request - let new_events = self - .db - .queued_requests(&dest) - .filter_map(Result::ok) - .take(30).collect::>(); - - if !new_events.is_empty() { - // Insert pdus we found - self.db.mark_as_active(&new_events)?; - futures.push(send_events( - dest.clone(), - new_events.into_iter().map(|(event, _)| event).collect(), - )); - } else { - current_transaction_status.remove(&dest); - } - } - Err((dest, _)) => { - current_transaction_status.entry(dest).and_modify(|e| *e = match e { - TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), - TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), - TransactionStatus::Failed(_, _) => { - error!("Request that was not even running failed?!"); - return - }, - }); - } - }; + Ok(request) = receiver.recv_async() => { + self.handle_request(request, &mut futures, &mut statuses); + }, + Some(response) = futures.next() => { + self.handle_response(response, &mut futures, &mut statuses); }, - - event = receiver.recv_async() => { - if let Ok((dest, event, key)) = event { - if let Ok(Some(events)) = self.select_events( - &dest, - vec![(event, key)], - &mut current_transaction_status, - ) { - if !events.is_empty() { - futures.push(send_events(dest, events)); - } else { - current_transaction_status.remove(&dest); - } - } - } - } } } } - #[tracing::instrument(skip(self, dest, new_events, current_transaction_status))] + fn handle_response( + &self, response: SendingResult, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus, + ) { + match response { + Ok(dest) => self.handle_response_ok(&dest, futures, statuses), + Err((dest, e)) => self.handle_response_err(dest, futures, statuses, &e), + }; + } + + fn handle_response_err( + &self, dest: Destination, _futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus, e: &Error, + ) { + debug!(dest = ?dest, "{e:?}"); + statuses.entry(dest).and_modify(|e| { + *e = match e { + TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), + TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n + 1, Instant::now()), + TransactionStatus::Failed(..) => panic!("Request that was not even running failed?!"), + } + }); + } + + fn handle_response_ok( + &self, dest: &Destination, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus, + ) { + let _cork = services().globals.db.cork(); + self.db + .delete_all_active_requests_for(dest) + .expect("all active requests deleted"); + + // Find events that have been added since starting the last request + let new_events = self + .db + .queued_requests(dest) + .filter_map(Result::ok) + .take(DEQUEUE_LIMIT) + .collect::>(); + + // Insert any pdus we found + if !new_events.is_empty() { + self.db + .mark_as_active(&new_events) + .expect("marked as active"); + let new_events_vec = new_events.into_iter().map(|(event, _)| event).collect(); + futures.push(Box::pin(send_events(dest.clone(), new_events_vec))); + } else { + statuses.remove(dest); + } + } + + fn handle_request(&self, msg: Msg, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus) { + let iv = vec![(msg.event, msg.queue_id)]; + if let Ok(Some(events)) = self.select_events(&msg.dest, iv, statuses) { + if !events.is_empty() { + futures.push(Box::pin(send_events(msg.dest, events))); + } else { + statuses.remove(&msg.dest); + } + } + } + + fn initial_transactions(&self, futures: &mut SendingFutures<'_>, statuses: &mut CurTransactionStatus) { + let keep = usize::try_from(self.startup_netburst_keep).unwrap_or(usize::MAX); + let mut txns = HashMap::>::new(); + for (key, dest, event) in self.db.active_requests().filter_map(Result::ok) { + let entry = txns.entry(dest.clone()).or_default(); + if self.startup_netburst_keep >= 0 && entry.len() >= keep { + warn!("Dropping unsent event {:?} {:?}", dest, String::from_utf8_lossy(&key)); + self.db + .delete_active_request(key) + .expect("active request deleted"); + } else { + entry.push(event); + } + } + + for (dest, events) in txns { + if self.startup_netburst && !events.is_empty() { + statuses.insert(dest.clone(), TransactionStatus::Running); + futures.push(Box::pin(send_events(dest.clone(), events))); + } + } + } + + #[tracing::instrument(skip(self, dest, new_events, statuses))] fn select_events( &self, dest: &Destination, - new_events: Vec<(SendingEventType, Vec)>, // Events we want to send: event and full key - current_transaction_status: &mut HashMap, - ) -> Result>> { - let (allow, retry) = self.select_events_current(dest.clone(), current_transaction_status)?; + new_events: Vec<(SendingEvent, Vec)>, // Events we want to send: event and full key + statuses: &mut CurTransactionStatus, + ) -> Result>> { + let (allow, retry) = self.select_events_current(dest.clone(), statuses)?; // Nothing can be done for this remote, bail out. if !allow { @@ -399,7 +443,7 @@ impl Service { // Add EDU's into the transaction if let Destination::Normal(server_name) = dest { if let Ok((select_edus, last_count)) = self.select_edus(server_name) { - events.extend(select_edus.into_iter().map(SendingEventType::Edu)); + events.extend(select_edus.into_iter().map(SendingEvent::Edu)); self.db.set_latest_educount(server_name, last_count)?; } } @@ -407,18 +451,16 @@ impl Service { Ok(Some(events)) } - #[tracing::instrument(skip(self, dest, current_transaction_status))] - fn select_events_current( - &self, dest: Destination, current_transaction_status: &mut HashMap, - ) -> Result<(bool, bool)> { + #[tracing::instrument(skip(self, dest, statuses))] + fn select_events_current(&self, dest: Destination, statuses: &mut CurTransactionStatus) -> Result<(bool, bool)> { let (mut allow, mut retry) = (true, false); - current_transaction_status + statuses .entry(dest) .and_modify(|e| match e { TransactionStatus::Failed(tries, time) => { // Fail if a request has failed recently (exponential backoff) - let min_duration = Duration::from_secs(services().globals.config.sender_timeout); let max_duration = Duration::from_secs(services().globals.config.sender_retry_backoff_limit); + let min_duration = Duration::from_secs(services().globals.config.sender_timeout); let min_elapsed_duration = min_duration * (*tries) * (*tries); let min_elapsed_duration = cmp::min(min_elapsed_duration, max_duration); if time.elapsed() < min_elapsed_duration { @@ -438,7 +480,7 @@ impl Service { } #[tracing::instrument(skip(self, server_name))] - pub(crate) fn select_edus(&self, server_name: &ServerName) -> Result<(Vec>, u64)> { + fn select_edus(&self, server_name: &ServerName) -> Result<(Vec>, u64)> { // u64: count of last edu let since = self.db.get_latest_educount(server_name)?; let mut events = Vec::new(); @@ -489,7 +531,7 @@ impl Service { /// Look for presence #[tracing::instrument(skip(server_name, since, max_edu_count, events))] -pub(crate) fn select_edus_presence( +fn select_edus_presence( server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec>, ) -> Result { // Look for presence updates for this server @@ -534,7 +576,7 @@ pub(crate) fn select_edus_presence( /// Look for read receipts in this room #[tracing::instrument(skip(room_id, since, max_edu_count, events))] -pub(crate) fn select_edus_receipts( +fn select_edus_receipts( room_id: &RoomId, since: u64, max_edu_count: &mut u64, events: &mut Vec>, ) -> Result { for r in services() @@ -599,7 +641,7 @@ pub(crate) fn select_edus_receipts( Ok(true) } -async fn send_events(dest: Destination, events: Vec) -> Result { +async fn send_events(dest: Destination, events: Vec) -> SendingResult { debug_assert!(!events.is_empty(), "sending empty transaction"); match dest { Destination::Normal(ref server) => send_events_dest_normal(&dest, server, events).await, @@ -609,14 +651,12 @@ async fn send_events(dest: Destination, events: Vec) -> Result } #[tracing::instrument(skip(dest, events))] -async fn send_events_dest_appservice( - dest: &Destination, id: &String, events: Vec, -) -> Result { +async fn send_events_dest_appservice(dest: &Destination, id: &String, events: Vec) -> SendingResult { let mut pdu_jsons = Vec::new(); for event in &events { match event { - SendingEventType::Pdu(pdu_id) => { + SendingEvent::Pdu(pdu_id) => { pdu_jsons.push( services() .rooms @@ -632,7 +672,7 @@ async fn send_events_dest_appservice( .to_room_event(), ); }, - SendingEventType::Edu(_) | SendingEventType::Flush => { + SendingEvent::Edu(_) | SendingEvent::Flush => { // Appservices don't need EDUs (?) and flush only; // no new content }, @@ -659,8 +699,8 @@ async fn send_events_dest_appservice( &events .iter() .map(|e| match e { - SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b, - SendingEventType::Flush => &[], + SendingEvent::Edu(b) | SendingEvent::Pdu(b) => &**b, + SendingEvent::Flush => &[], }) .collect::>(), ))) @@ -680,13 +720,13 @@ async fn send_events_dest_appservice( #[tracing::instrument(skip(dest, events))] async fn send_events_dest_push( - dest: &Destination, userid: &OwnedUserId, pushkey: &String, events: Vec, -) -> Result { + dest: &Destination, userid: &OwnedUserId, pushkey: &String, events: Vec, +) -> SendingResult { let mut pdus = Vec::new(); for event in &events { match event { - SendingEventType::Pdu(pdu_id) => { + SendingEvent::Pdu(pdu_id) => { pdus.push( services() .rooms @@ -701,7 +741,7 @@ async fn send_events_dest_push( })?, ); }, - SendingEventType::Edu(_) | SendingEventType::Flush => { + SendingEvent::Edu(_) | SendingEvent::Flush => { // Push gateways don't need EDUs (?) and flush only; // no new content }, @@ -758,14 +798,14 @@ async fn send_events_dest_push( #[tracing::instrument(skip(dest, events), name = "")] async fn send_events_dest_normal( - dest: &Destination, server_name: &OwnedServerName, events: Vec, -) -> Result { + dest: &Destination, server_name: &OwnedServerName, events: Vec, +) -> SendingResult { let mut edu_jsons = Vec::new(); let mut pdu_jsons = Vec::new(); for event in &events { match event { - SendingEventType::Pdu(pdu_id) => { + SendingEvent::Pdu(pdu_id) => { // TODO: check room version and remove event_id if needed let raw = PduEvent::convert_to_outgoing_federation_event( services() @@ -788,12 +828,12 @@ async fn send_events_dest_normal( ); pdu_jsons.push(raw); }, - SendingEventType::Edu(edu) => { + SendingEvent::Edu(edu) => { if let Ok(raw) = serde_json::from_slice(edu) { edu_jsons.push(raw); } }, - SendingEventType::Flush => { + SendingEvent::Flush => { // flush only; no new content }, } @@ -814,8 +854,8 @@ async fn send_events_dest_normal( &events .iter() .map(|e| match e { - SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b, - SendingEventType::Flush => &[], + SendingEvent::Edu(b) | SendingEvent::Pdu(b) => &**b, + SendingEvent::Flush => &[], }) .collect::>(), )))