add corks to coalesce writes for several heavy calltrees.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-03-21 04:13:08 -07:00 committed by June
parent 3f60365cc6
commit 61b1d6d869
3 changed files with 13 additions and 0 deletions

View file

@ -197,6 +197,10 @@ async fn sync_helper(
.extend(services().users.keys_changed(sender_user.as_ref(), since, None).filter_map(std::result::Result::ok)); .extend(services().users.keys_changed(sender_user.as_ref(), since, None).filter_map(std::result::Result::ok));
let all_joined_rooms = services().rooms.state_cache.rooms_joined(&sender_user).collect::<Vec<_>>(); let all_joined_rooms = services().rooms.state_cache.rooms_joined(&sender_user).collect::<Vec<_>>();
// Coalesce database writes for the remainder of this scope.
let _cork = services().globals.db.cork_and_flush()?;
for room_id in all_joined_rooms { for room_id in all_joined_rooms {
let room_id = room_id?; let room_id = room_id?;
if let Ok(joined_room) = load_joined_room( if let Ok(joined_room) = load_joined_room(

View file

@ -221,6 +221,9 @@ impl Service {
leaves: Vec<OwnedEventId>, leaves: Vec<OwnedEventId>,
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<Vec<u8>> { ) -> Result<Vec<u8>> {
// Coalesce database writes for the remainder of this scope.
let _cork = services().globals.db.cork_and_flush()?;
let shortroomid = services().rooms.short.get_shortroomid(&pdu.room_id)?.expect("room exists"); let shortroomid = services().rooms.short.get_shortroomid(&pdu.room_id)?.expect("room exists");
// Make unsigned fields correct. This is not properly documented in the spec, // Make unsigned fields correct. This is not properly documented in the spec,

View file

@ -148,6 +148,7 @@ impl Service {
Some(response) = futures.next() => { Some(response) = futures.next() => {
match response { match response {
Ok(outgoing_kind) => { Ok(outgoing_kind) => {
let _cork = services().globals.db.cork();
self.db.delete_all_active_requests_for(&outgoing_kind)?; self.db.delete_all_active_requests_for(&outgoing_kind)?;
// Find events that have been added since starting the last request // Find events that have been added since starting the last request
@ -202,6 +203,7 @@ impl Service {
let mut retry = false; let mut retry = false;
let mut allow = true; let mut allow = true;
let _cork = services().globals.db.cork();
let entry = current_transaction_status.entry(outgoing_kind.clone()); let entry = current_transaction_status.entry(outgoing_kind.clone());
entry entry
@ -384,6 +386,7 @@ impl Service {
pub fn send_push_pdu(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> { pub fn send_push_pdu(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey); let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey);
let event = SendingEventType::Pdu(pdu_id.to_owned()); let event = SendingEventType::Pdu(pdu_id.to_owned());
let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap(); self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap();
@ -396,6 +399,7 @@ impl Service {
.into_iter() .into_iter()
.map(|server| (OutgoingKind::Normal(server), SendingEventType::Pdu(pdu_id.to_owned()))) .map(|server| (OutgoingKind::Normal(server), SendingEventType::Pdu(pdu_id.to_owned())))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(&requests.iter().map(|(o, e)| (o, e.clone())).collect::<Vec<_>>())?; let keys = self.db.queue_requests(&requests.iter().map(|(o, e)| (o, e.clone())).collect::<Vec<_>>())?;
for ((outgoing_kind, event), key) in requests.into_iter().zip(keys) { for ((outgoing_kind, event), key) in requests.into_iter().zip(keys) {
self.sender.send((outgoing_kind.clone(), event, key)).unwrap(); self.sender.send((outgoing_kind.clone(), event, key)).unwrap();
@ -408,6 +412,7 @@ impl Service {
pub fn send_reliable_edu(&self, server: &ServerName, serialized: Vec<u8>, id: u64) -> Result<()> { pub fn send_reliable_edu(&self, server: &ServerName, serialized: Vec<u8>, id: u64) -> Result<()> {
let outgoing_kind = OutgoingKind::Normal(server.to_owned()); let outgoing_kind = OutgoingKind::Normal(server.to_owned());
let event = SendingEventType::Edu(serialized); let event = SendingEventType::Edu(serialized);
let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap(); self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap();
@ -418,6 +423,7 @@ impl Service {
pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> { pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> {
let outgoing_kind = OutgoingKind::Appservice(appservice_id); let outgoing_kind = OutgoingKind::Appservice(appservice_id);
let event = SendingEventType::Pdu(pdu_id); let event = SendingEventType::Pdu(pdu_id);
let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap(); self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap();