From 125ff21c887e5a16775fd98ab502d00c5176307c Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 27 Mar 2024 14:47:21 -0700 Subject: [PATCH] add conf item to toggle startup netburst (for developers). Signed-off-by: Jason Volk --- src/config/mod.rs | 6 ++++++ src/service/sending/mod.rs | 34 ++++++++++++++++++---------------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 9436fdb5..cc5f088a 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -226,6 +226,9 @@ pub struct Config { #[serde(with = "serde_regex")] pub forbidden_usernames: RegexSet, + #[serde(default = "default_startup_netburst")] + pub startup_netburst: bool, + #[serde(default)] pub block_non_admin_invites: bool, @@ -518,6 +521,7 @@ impl fmt::Display for Config { "Allow check for updates / announcements check", &self.allow_check_for_updates.to_string(), ), + ("Enable netburst on startup", &self.startup_netburst.to_string()), ]; let mut msg: String = "Active config values:\n\n".to_owned(); @@ -675,3 +679,5 @@ fn default_url_preview_max_spider_size() -> usize { } fn default_new_user_displayname_suffix() -> String { "🏳️‍⚧️".to_owned() } + +fn default_startup_netburst() -> bool { true } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index c876af49..579f6814 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -90,6 +90,7 @@ pub struct Service { pub(super) maximum_requests: Arc, pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec)>, receiver: Mutex)>>, + startup_netburst: bool, } enum TransactionStatus { @@ -106,6 +107,7 @@ impl Service { sender, receiver: Mutex::new(receiver), maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)), + startup_netburst: config.startup_netburst, }) } @@ -123,29 +125,29 @@ impl Service { let mut receiver = self.receiver.lock().await; let mut futures = FuturesUnordered::new(); - let mut current_transaction_status = HashMap::::new(); // Retry requests we could not finish yet - let mut initial_transactions = HashMap::>::new(); + if self.startup_netburst { + let mut initial_transactions = HashMap::>::new(); + for (key, outgoing_kind, event) in self.db.active_requests().filter_map(Result::ok) { + let entry = initial_transactions + .entry(outgoing_kind.clone()) + .or_default(); - for (key, outgoing_kind, event) in self.db.active_requests().filter_map(Result::ok) { - let entry = initial_transactions - .entry(outgoing_kind.clone()) - .or_default(); + if entry.len() > 30 { + warn!("Dropping some current events: {:?} {:?} {:?}", key, outgoing_kind, event); + self.db.delete_active_request(key)?; + continue; + } - if entry.len() > 30 { - warn!("Dropping some current events: {:?} {:?} {:?}", key, outgoing_kind, event); - self.db.delete_active_request(key)?; - continue; + entry.push(event); } - entry.push(event); - } - - for (outgoing_kind, events) in initial_transactions { - current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); - futures.push(Self::handle_events(outgoing_kind.clone(), events)); + for (outgoing_kind, events) in initial_transactions { + current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); + futures.push(Self::handle_events(outgoing_kind.clone(), events)); + } } loop {