From bd2570944614eedafbade9d1dbaee77a6aa64d5c Mon Sep 17 00:00:00 2001 From: strawberry Date: Wed, 17 Apr 2024 15:16:01 -0400 Subject: [PATCH] Revert "dont use loole for sending channel code" This reverts commit d0a9666a2976b983daf72eb0c0f8d41c04443f84. --- src/service/sending/mod.rs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 2323aa57..8c813970 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -25,7 +25,7 @@ use ruma::{ events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId, }; -use tokio::sync::{mpsc, Mutex, Semaphore}; +use tokio::sync::{Mutex, Semaphore}; use tracing::{error, warn}; use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result}; @@ -42,8 +42,8 @@ pub struct Service { /// The state for a given state hash. pub(super) maximum_requests: Arc, - pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec)>, - receiver: Mutex)>>, + pub sender: loole::Sender<(OutgoingKind, SendingEventType, Vec)>, + receiver: Mutex)>>, startup_netburst: bool, startup_netburst_keep: i64, timeout: u64, @@ -72,7 +72,7 @@ enum TransactionStatus { impl Service { pub fn build(db: &'static dyn Data, config: &Config) -> Arc { - let (sender, receiver) = mpsc::unbounded_channel(); + let (sender, receiver) = loole::unbounded(); Arc::new(Self { db, sender, @@ -274,7 +274,7 @@ impl Service { #[tracing::instrument(skip(self), name = "sender")] async fn handler(&self) -> Result<()> { - let mut receiver = self.receiver.lock().await; + let receiver = self.receiver.lock().await; let mut futures = FuturesUnordered::new(); let mut current_transaction_status = HashMap::::new(); @@ -342,13 +342,16 @@ impl Service { } }; }, - Some((outgoing_kind, event, key)) = receiver.recv() => { - if let Ok(Some(events)) = self.select_events( - &outgoing_kind, - vec![(event, key)], - &mut current_transaction_status, - ) { - futures.push(handle_events(outgoing_kind, events)); + + event = receiver.recv_async() => { + if let Ok((outgoing_kind, event, key)) = event { + if let Ok(Some(events)) = self.select_events( + &outgoing_kind, + vec![(event, key)], + &mut current_transaction_status, + ) { + futures.push(handle_events(outgoing_kind, events)); + } } } }