diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index a6c3411f..26f43fd3 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -5,6 +5,7 @@ mod sender; use std::{fmt::Debug, sync::Arc}; +use async_trait::async_trait; use conduit::{err, Result, Server}; use ruma::{ api::{appservice::Registration, OutgoingRequest}, @@ -47,6 +48,32 @@ pub enum SendingEvent { Flush, // none } +#[async_trait] +impl crate::Service for Service { + fn build(args: crate::Args<'_>) -> Result> { + let (sender, receiver) = loole::unbounded(); + Ok(Arc::new(Self { + db: data::Data::new(args.db.clone()), + server: args.server.clone(), + sender, + receiver: Mutex::new(receiver), + })) + } + + async fn worker(self: Arc) -> Result<()> { + // trait impl can't be split between files so this just glues to mod sender + self.sender().await + } + + fn interrupt(&self) { + if !self.sender.is_closed() { + self.sender.close(); + } + } + + fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } +} + impl Service { #[tracing::instrument(skip(self, pdu_id, user, pushkey), level = "debug")] pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> { diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index a924ce55..df41db28 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -2,11 +2,9 @@ use std::{ cmp, collections::{BTreeMap, HashMap, HashSet}, fmt::Debug, - sync::Arc, time::{Duration, Instant}, }; -use async_trait::async_trait; use base64::{engine::general_purpose, Engine as _}; use conduit::{debug, debug_warn, error, trace, utils::math::continue_exponential_backoff_secs, warn}; use federation::transactions::send_transaction_message; @@ -24,9 +22,9 @@ use ruma::{ ServerName, UInt, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; -use tokio::{sync::Mutex, time::sleep_until}; +use tokio::time::sleep_until; -use super::{appservice, data::Data, send, Destination, Msg, SendingEvent, Service}; +use super::{appservice, send, Destination, Msg, SendingEvent, Service}; use crate::{presence::Presence, services, user_is_local, utils::calculate_hash, Error, Result}; #[derive(Debug)] @@ -46,20 +44,9 @@ const DEQUEUE_LIMIT: usize = 48; const SELECT_EDU_LIMIT: usize = 16; const CLEANUP_TIMEOUT_MS: u64 = 3500; -#[async_trait] -impl crate::Service for Service { - fn build(args: crate::Args<'_>) -> Result> { - let (sender, receiver) = loole::unbounded(); - Ok(Arc::new(Self { - db: Data::new(args.db.clone()), - server: args.server.clone(), - sender, - receiver: Mutex::new(receiver), - })) - } - +impl Service { #[tracing::instrument(skip_all, name = "sender")] - async fn worker(self: Arc) -> Result<()> { + pub(super) async fn sender(&self) -> Result<()> { let receiver = self.receiver.lock().await; let mut futures: SendingFutures<'_> = FuturesUnordered::new(); let mut statuses: CurTransactionStatus = CurTransactionStatus::new(); @@ -82,16 +69,6 @@ impl crate::Service for Service { Ok(()) } - fn interrupt(&self) { - if !self.sender.is_closed() { - self.sender.close(); - } - } - - fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } -} - -impl Service { fn handle_response( &self, response: SendingResult, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus, ) {