remove max_concurrent_requests sender hazard
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
00ce43d739
commit
0b33eec1c2
3 changed files with 7 additions and 51 deletions
|
@ -83,21 +83,6 @@ port = 6167
|
||||||
# likely need this to be 0.0.0.0.
|
# likely need this to be 0.0.0.0.
|
||||||
address = "127.0.0.1"
|
address = "127.0.0.1"
|
||||||
|
|
||||||
# How many requests conduwuit sends to other servers at the same time concurrently. Default is 500
|
|
||||||
# Note that because conduwuit is very fast unlike other homeserver implementations, setting this too
|
|
||||||
# high could inadvertently result in ratelimits kicking in, or overloading lower-end homeservers out there.
|
|
||||||
#
|
|
||||||
# A valid use-case for enabling this is if you have a significant amount of overall federation activity
|
|
||||||
# such as many rooms joined/tracked, and many servers in the true destination cache caused by that. Upon
|
|
||||||
# rebooting conduwuit, depending on how fast your resources are, client and incoming federation requests
|
|
||||||
# may timeout or be "stalled" for a period of time due to hitting the max concurrent requests limit from
|
|
||||||
# refreshing federation/destination caches and such.
|
|
||||||
#
|
|
||||||
# If you have a lot of active users on your homeserver, you will definitely need to raise this.
|
|
||||||
#
|
|
||||||
# No this will not speed up room joins.
|
|
||||||
#max_concurrent_requests = 500
|
|
||||||
|
|
||||||
# Max request size for file uploads
|
# Max request size for file uploads
|
||||||
max_request_size = 20_000_000 # in bytes
|
max_request_size = 20_000_000 # in bytes
|
||||||
|
|
||||||
|
|
|
@ -106,8 +106,6 @@ pub(crate) struct Config {
|
||||||
|
|
||||||
#[serde(default = "default_max_request_size")]
|
#[serde(default = "default_max_request_size")]
|
||||||
pub(crate) max_request_size: u32,
|
pub(crate) max_request_size: u32,
|
||||||
#[serde(default = "default_max_concurrent_requests")]
|
|
||||||
pub(crate) max_concurrent_requests: u16,
|
|
||||||
#[serde(default = "default_max_fetch_prev_events")]
|
#[serde(default = "default_max_fetch_prev_events")]
|
||||||
pub(crate) max_fetch_prev_events: u16,
|
pub(crate) max_fetch_prev_events: u16,
|
||||||
|
|
||||||
|
@ -511,7 +509,6 @@ impl fmt::Display for Config {
|
||||||
("DNS fallback to TCP", &self.dns_tcp_fallback.to_string()),
|
("DNS fallback to TCP", &self.dns_tcp_fallback.to_string()),
|
||||||
("Query all nameservers", &self.query_all_nameservers.to_string()),
|
("Query all nameservers", &self.query_all_nameservers.to_string()),
|
||||||
("Maximum request size (bytes)", &self.max_request_size.to_string()),
|
("Maximum request size (bytes)", &self.max_request_size.to_string()),
|
||||||
("Maximum concurrent requests", &self.max_concurrent_requests.to_string()),
|
|
||||||
("Sender retry backoff limit", &self.sender_retry_backoff_limit.to_string()),
|
("Sender retry backoff limit", &self.sender_retry_backoff_limit.to_string()),
|
||||||
("Request connect timeout", &self.request_conn_timeout.to_string()),
|
("Request connect timeout", &self.request_conn_timeout.to_string()),
|
||||||
("Request timeout", &self.request_timeout.to_string()),
|
("Request timeout", &self.request_timeout.to_string()),
|
||||||
|
@ -877,8 +874,6 @@ fn default_max_request_size() -> u32 {
|
||||||
20 * 1024 * 1024 // Default to 20 MB
|
20 * 1024 * 1024 // Default to 20 MB
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_max_concurrent_requests() -> u16 { 500 }
|
|
||||||
|
|
||||||
fn default_request_conn_timeout() -> u64 { 10 }
|
fn default_request_conn_timeout() -> u64 { 10 }
|
||||||
|
|
||||||
fn default_request_timeout() -> u64 { 35 }
|
fn default_request_timeout() -> u64 { 35 }
|
||||||
|
|
|
@ -25,7 +25,7 @@ use ruma::{
|
||||||
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
|
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
|
||||||
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId,
|
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId,
|
||||||
};
|
};
|
||||||
use tokio::sync::{Mutex, Semaphore};
|
use tokio::sync::Mutex;
|
||||||
use tracing::{debug, error, warn};
|
use tracing::{debug, error, warn};
|
||||||
|
|
||||||
use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result};
|
use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result};
|
||||||
|
@ -39,7 +39,6 @@ pub(crate) struct Service {
|
||||||
pub(crate) db: &'static dyn Data,
|
pub(crate) db: &'static dyn Data,
|
||||||
|
|
||||||
/// The state for a given state hash.
|
/// The state for a given state hash.
|
||||||
pub(crate) maximum_requests: Arc<Semaphore>,
|
|
||||||
sender: loole::Sender<Msg>,
|
sender: loole::Sender<Msg>,
|
||||||
receiver: Mutex<loole::Receiver<Msg>>,
|
receiver: Mutex<loole::Receiver<Msg>>,
|
||||||
startup_netburst: bool,
|
startup_netburst: bool,
|
||||||
|
@ -91,7 +90,6 @@ impl Service {
|
||||||
db,
|
db,
|
||||||
sender,
|
sender,
|
||||||
receiver: Mutex::new(receiver),
|
receiver: Mutex::new(receiver),
|
||||||
maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)),
|
|
||||||
startup_netburst: config.startup_netburst,
|
startup_netburst: config.startup_netburst,
|
||||||
startup_netburst_keep: config.startup_netburst_keep,
|
startup_netburst_keep: config.startup_netburst_keep,
|
||||||
})
|
})
|
||||||
|
@ -245,10 +243,7 @@ impl Service {
|
||||||
T: OutgoingRequest + Debug,
|
T: OutgoingRequest + Debug,
|
||||||
{
|
{
|
||||||
let client = &services().globals.client.federation;
|
let client = &services().globals.client.federation;
|
||||||
let permit = self.maximum_requests.acquire().await;
|
send::send(client, dest, request).await
|
||||||
let response = send::send(client, dest, request).await;
|
|
||||||
drop(permit);
|
|
||||||
response
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends a request to an appservice
|
/// Sends a request to an appservice
|
||||||
|
@ -261,11 +256,7 @@ impl Service {
|
||||||
where
|
where
|
||||||
T: OutgoingRequest + Debug,
|
T: OutgoingRequest + Debug,
|
||||||
{
|
{
|
||||||
let permit = self.maximum_requests.acquire().await;
|
appservice::send_request(registration, request).await
|
||||||
let response = appservice::send_request(registration, request).await;
|
|
||||||
drop(permit);
|
|
||||||
|
|
||||||
response
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Cleanup event data
|
/// Cleanup event data
|
||||||
|
@ -670,10 +661,8 @@ async fn send_events_dest_appservice(dest: &Destination, id: &String, events: Ve
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let permit = services().sending.maximum_requests.acquire().await;
|
|
||||||
|
|
||||||
debug_assert!(!pdu_jsons.is_empty(), "sending empty transaction");
|
debug_assert!(!pdu_jsons.is_empty(), "sending empty transaction");
|
||||||
let response = match appservice::send_request(
|
match appservice::send_request(
|
||||||
services()
|
services()
|
||||||
.appservice
|
.appservice
|
||||||
.get_registration(id)
|
.get_registration(id)
|
||||||
|
@ -702,11 +691,7 @@ async fn send_events_dest_appservice(dest: &Destination, id: &String, events: Ve
|
||||||
{
|
{
|
||||||
Ok(_) => Ok(dest.clone()),
|
Ok(_) => Ok(dest.clone()),
|
||||||
Err(e) => Err((dest.clone(), e)),
|
Err(e) => Err((dest.clone(), e)),
|
||||||
};
|
}
|
||||||
|
|
||||||
drop(permit);
|
|
||||||
|
|
||||||
response
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(dest, events))]
|
#[tracing::instrument(skip(dest, events))]
|
||||||
|
@ -772,16 +757,12 @@ async fn send_events_dest_push(
|
||||||
.try_into()
|
.try_into()
|
||||||
.expect("notification count can't go that high");
|
.expect("notification count can't go that high");
|
||||||
|
|
||||||
let permit = services().sending.maximum_requests.acquire().await;
|
|
||||||
|
|
||||||
let _response = services()
|
let _response = services()
|
||||||
.pusher
|
.pusher
|
||||||
.send_push_notice(userid, unread, &pusher, rules_for_user, &pdu)
|
.send_push_notice(userid, unread, &pusher, rules_for_user, &pdu)
|
||||||
.await
|
.await
|
||||||
.map(|_response| dest.clone())
|
.map(|_response| dest.clone())
|
||||||
.map_err(|e| (dest.clone(), e));
|
.map_err(|e| (dest.clone(), e));
|
||||||
|
|
||||||
drop(permit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(dest.clone())
|
Ok(dest.clone())
|
||||||
|
@ -830,10 +811,9 @@ async fn send_events_dest_normal(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let permit = services().sending.maximum_requests.acquire().await;
|
|
||||||
let client = &services().globals.client.sender;
|
let client = &services().globals.client.sender;
|
||||||
debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty transaction");
|
debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty transaction");
|
||||||
let response = send::send(
|
send::send(
|
||||||
client,
|
client,
|
||||||
server_name,
|
server_name,
|
||||||
send_transaction_message::v1::Request {
|
send_transaction_message::v1::Request {
|
||||||
|
@ -862,11 +842,7 @@ async fn send_events_dest_normal(
|
||||||
}
|
}
|
||||||
dest.clone()
|
dest.clone()
|
||||||
})
|
})
|
||||||
.map_err(|e| (dest.clone(), e));
|
.map_err(|e| (dest.clone(), e))
|
||||||
|
|
||||||
drop(permit);
|
|
||||||
|
|
||||||
response
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Destination {
|
impl Destination {
|
||||||
|
|
Loading…
Add table
Reference in a new issue