abstract exponential backoff to math utils.

Signed-off-by: Jason Volk <jason@zemos.net>
Jason Volk 2024-07-07 03:39:35 +00:00
parent 5e72d36800
commit 52a561ff9e
5 changed files with 60 additions and 53 deletions
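
In effect, each call site's hand-rolled clamp becomes a single call into the new helper. A minimal sketch of the two forms, assuming the conduit workspace is available; the `backing_off_*` wrappers are illustrative only, and most call sites use the 5-minute/24-hour constants shown, while the sender takes its bounds from config:

use std::{
	cmp,
	time::{Duration, Instant},
};

use conduit::utils::math::continue_exponential_backoff_secs;

// Before: every caller computed and clamped its own retry window.
fn backing_off_before(time: Instant, tries: u32) -> bool {
	const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24);
	let min_elapsed_duration = cmp::min(MAX_DURATION, Duration::from_secs(5 * 60) * tries * tries);
	time.elapsed() < min_elapsed_duration
}

// After: the shared helper performs the equivalent check.
fn backing_off_after(time: Instant, tries: u32) -> bool {
	const MIN: u64 = 5 * 60;
	const MAX: u64 = 60 * 60 * 24;
	continue_exponential_backoff_secs(MIN, MAX, time.elapsed(), tries)
}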

@@ -1,9 +1,9 @@
 use std::{
-	cmp,
 	collections::{hash_map, BTreeMap, HashMap, HashSet},
-	time::{Duration, Instant},
+	time::Instant,
 };
 
+use conduit::{utils, utils::math::continue_exponential_backoff_secs, Error, Result};
 use futures_util::{stream::FuturesUnordered, StreamExt};
 use ruma::{
 	api::{
@@ -18,15 +18,11 @@ use ruma::{
 	DeviceKeyAlgorithm, OwnedDeviceId, OwnedUserId, UserId,
 };
 use serde_json::json;
+use service::user_is_local;
 use tracing::debug;
 
 use super::SESSION_ID_LENGTH;
-use crate::{
-	service::user_is_local,
-	services,
-	utils::{self},
-	Error, Result, Ruma,
-};
+use crate::{services, Ruma};
 
 /// # `POST /_matrix/client/r0/keys/upload`
 ///
@@ -357,11 +353,10 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool + Send>(
 			.get(server)
 		{
 			// Exponential backoff
-			const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24);
-			let min_elapsed_duration = cmp::min(MAX_DURATION, Duration::from_secs(5 * 60) * (*tries) * (*tries));
-
-			if time.elapsed() < min_elapsed_duration {
-				debug!("Backing off query from {:?}", server);
+			const MIN: u64 = 5 * 60;
+			const MAX: u64 = 60 * 60 * 24;
+			if continue_exponential_backoff_secs(MIN, MAX, time.elapsed(), *tries) {
+				debug!("Backing off query from {server:?}");
 				return (server, Err(Error::BadServerResponse("bad query, still backing off")));
 			}
 		}

@@ -1,13 +1,16 @@
 use std::{
-	cmp,
 	collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
 	net::IpAddr,
 	sync::Arc,
-	time::{Duration, Instant},
+	time::Instant,
 };
 
 use axum_client_ip::InsecureClientIp;
-use conduit::utils::mutex_map;
+use conduit::{
+	debug, error, info, trace, utils,
+	utils::{math::continue_exponential_backoff_secs, mutex_map},
+	warn, Error, PduEvent, Result,
+};
 use ruma::{
 	api::{
 		client::{
@@ -35,7 +38,6 @@ use ruma::{
 use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
 use service::sending::convert_to_outgoing_federation_event;
 use tokio::sync::RwLock;
-use tracing::{debug, error, info, trace, warn};
 
 use crate::{
 	client::{update_avatar_url, update_displayname},
@@ -43,7 +45,7 @@ use crate::{
 		pdu::{gen_event_id_canonical_json, PduBuilder},
 		server_is_ours, user_is_local,
 	},
-	services, utils, Error, PduEvent, Result, Ruma,
+	services, Ruma,
 };
 
 /// Checks if the room is banned in any way possible and the sender user is not
@@ -1363,11 +1365,10 @@ pub async fn validate_and_add_event_id(
 		.get(&event_id)
 	{
 		// Exponential backoff
-		const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24);
-		let min_elapsed_duration = cmp::min(MAX_DURATION, Duration::from_secs(5 * 60) * (*tries) * (*tries));
-
-		if time.elapsed() < min_elapsed_duration {
-			debug!("Backing off from {}", event_id);
+		const MIN: u64 = 60 * 5;
+		const MAX: u64 = 60 * 60 * 24;
+		if continue_exponential_backoff_secs(MIN, MAX, time.elapsed(), *tries) {
+			debug!("Backing off from {event_id}");
 			return Err(Error::BadServerResponse("bad event, still backing off"));
 		}
 	}

@@ -24,3 +24,21 @@ macro_rules! validated {
 macro_rules! validated {
 	($($input:tt)*) => { $crate::checked!($($input)*) }
 }
+
+/// Returns false if the exponential backoff has expired based on the inputs
+#[inline]
+#[must_use]
+pub fn continue_exponential_backoff_secs(min: u64, max: u64, elapsed: Duration, tries: u32) -> bool {
+	let min = Duration::from_secs(min);
+	let max = Duration::from_secs(max);
+	continue_exponential_backoff(min, max, elapsed, tries)
+}
+
+/// Returns false if the exponential backoff has expired based on the inputs
+#[inline]
+#[must_use]
+pub fn continue_exponential_backoff(min: Duration, max: Duration, elapsed: Duration, tries: u32) -> bool {
+	let min = min.saturating_mul(tries).saturating_mul(tries);
+	let min = cmp::min(min, max);
+	elapsed < min
+}
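
The policy encoded here matches the call sites it replaces: the allowed backoff window is min · tries² (clamped to max), and the function returns true while the elapsed time is still inside that window, i.e. while the caller should keep backing off. Unlike the old inline code, the multiplication saturates instead of overflowing for very large retry counts. A hedged sanity-check sketch using the 5-minute/24-hour bounds from the surrounding hunks (assuming the helper's path within this workspace):

use std::time::Duration;

use conduit::utils::math::continue_exponential_backoff;

fn main() {
	let (min, max) = (Duration::from_secs(5 * 60), Duration::from_secs(60 * 60 * 24));
	// One failure: the window is 300s, so 200s elapsed still backs off, 400s does not.
	assert!(continue_exponential_backoff(min, max, Duration::from_secs(200), 1));
	assert!(!continue_exponential_backoff(min, max, Duration::from_secs(400), 1));
	// Three failures: the window grows to 300 * 3^2 = 2700s.
	assert!(continue_exponential_backoff(min, max, Duration::from_secs(2000), 3));
	// From the 17th failure onward the window is clamped to 24h (300 * 17^2 > 86400).
	assert!(!continue_exponential_backoff(min, max, Duration::from_secs(86_401), 17));
}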

@@ -2,14 +2,16 @@ mod parse_incoming_pdu;
 mod signing_keys;
 
 use std::{
-	cmp,
 	collections::{hash_map, BTreeMap, HashMap, HashSet},
 	pin::Pin,
 	sync::Arc,
-	time::{Duration, Instant},
+	time::Instant,
 };
 
-use conduit::{debug_error, debug_info, Error, Result};
+use conduit::{
+	debug, debug_error, debug_info, error, info, trace, utils::math::continue_exponential_backoff_secs, warn, Error,
+	Result,
+};
 use futures_util::Future;
 pub use parse_incoming_pdu::parse_incoming_pdu;
 use ruma::{
@@ -29,7 +31,6 @@ use ruma::{
 	uint, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId, RoomVersionId, ServerName,
 };
 use tokio::sync::RwLock;
-use tracing::{debug, error, info, trace, warn};
 
 use super::state_compressor::CompressedStateEvent;
 use crate::{pdu, services, PduEvent};
@@ -252,14 +253,12 @@ impl Service {
 			.get(prev_id)
 		{
 			// Exponential backoff
-			const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24);
-			let min_duration = cmp::min(MAX_DURATION, Duration::from_secs(5 * 60) * (*tries) * (*tries));
-			let duration = time.elapsed();
-
-			if duration < min_duration {
+			const MIN_DURATION: u64 = 5 * 60;
+			const MAX_DURATION: u64 = 60 * 60 * 24;
+			if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
 				debug!(
-					duration = ?duration,
-					min_duration = ?min_duration,
+					?tries,
+					duration = ?time.elapsed(),
 					"Backing off from prev_event"
 				);
 				return Ok(());
@@ -1083,12 +1082,10 @@ impl Service {
 				.get(&*next_id)
 			{
 				// Exponential backoff
-				const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24);
-				let min_elapsed_duration =
-					cmp::min(MAX_DURATION, Duration::from_secs(5 * 60) * (*tries) * (*tries));
-
-				if time.elapsed() < min_elapsed_duration {
-					info!("Backing off from {}", next_id);
+				const MIN_DURATION: u64 = 5 * 60;
+				const MAX_DURATION: u64 = 60 * 60 * 24;
+				if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
+					info!("Backing off from {next_id}");
 					continue;
 				}
 			}
@@ -1191,12 +1188,10 @@ impl Service {
 				.get(&**next_id)
 			{
 				// Exponential backoff
-				const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24);
-				let min_elapsed_duration =
-					cmp::min(MAX_DURATION, Duration::from_secs(5 * 60) * (*tries) * (*tries));
-
-				if time.elapsed() < min_elapsed_duration {
-					debug!("Backing off from {}", next_id);
+				const MIN_DURATION: u64 = 5 * 60;
+				const MAX_DURATION: u64 = 60 * 60 * 24;
+				if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
+					debug!("Backing off from {next_id}");
 					continue;
 				}
 			}

@@ -3,10 +3,11 @@ use std::{
 	collections::{BTreeMap, HashMap, HashSet},
 	fmt::Debug,
 	sync::Arc,
-	time::{Duration, Instant},
+	time::Instant,
 };
 
 use base64::{engine::general_purpose, Engine as _};
+use conduit::{debug, error, utils::math::continue_exponential_backoff_secs, warn};
 use federation::transactions::send_transaction_message;
 use futures_util::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
 use ruma::{
@@ -22,7 +23,6 @@ use ruma::{
 	ServerName, UInt,
 };
 use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
-use tracing::{debug, error, warn};
 
 use super::{appservice, send, Destination, Msg, SendingEvent, Service};
 use crate::{presence::Presence, services, user_is_local, utils::calculate_hash, Error, Result};
@@ -216,11 +216,9 @@ impl Service {
 			.and_modify(|e| match e {
 				TransactionStatus::Failed(tries, time) => {
 					// Fail if a request has failed recently (exponential backoff)
-					let max_duration = Duration::from_secs(services().globals.config.sender_retry_backoff_limit);
-					let min_duration = Duration::from_secs(services().globals.config.sender_timeout);
-					let min_elapsed_duration = min_duration * (*tries) * (*tries);
-					let min_elapsed_duration = cmp::min(min_elapsed_duration, max_duration);
-					if time.elapsed() < min_elapsed_duration {
+					let min = services().globals.config.sender_timeout;
+					let max = services().globals.config.sender_retry_backoff_limit;
+					if continue_exponential_backoff_secs(min, max, time.elapsed(), *tries) {
 						allow = false;
 					} else {
 						retry = true;