From 2423de2d517cd382ec71e9550e74a5da07aa23a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 13 Jul 2021 15:44:25 +0200 Subject: [PATCH] improvement: locks --- src/client_server/account.rs | 35 ++++++++++++++- src/client_server/membership.rs | 78 ++++++++++++++++++++++++++++----- src/client_server/message.rs | 14 ++++++ src/client_server/profile.rs | 34 +++++++++++--- src/client_server/redact.rs | 15 +++++++ src/client_server/room.rs | 40 ++++++++++++++++- src/client_server/state.rs | 13 ++++++ src/client_server/sync.rs | 38 ++++++++++++++++ src/database/admin.rs | 71 ++++++++++++++++++------------ src/database/globals.rs | 2 + src/database/rooms.rs | 14 +++++- src/server_server.rs | 21 +++++++-- 12 files changed, 321 insertions(+), 54 deletions(-) diff --git a/src/client_server/account.rs b/src/client_server/account.rs index e5268ec0..31711f89 100644 --- a/src/client_server/account.rs +++ b/src/client_server/account.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, convert::TryInto}; +use std::{collections::BTreeMap, convert::TryInto, sync::Arc}; use super::{DEVICE_ID_LENGTH, SESSION_ID_LENGTH, TOKEN_LENGTH}; use crate::{database::DatabaseGuard, pdu::PduBuilder, utils, ConduitResult, Error, Ruma}; @@ -236,6 +236,16 @@ pub async fn register_route( let room_id = RoomId::new(db.globals.server_name()); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + let mut content = ruma::events::room::create::CreateEventContent::new(conduit_user.clone()); content.federate = true; content.predecessor = None; @@ -253,6 +263,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; // 2. Make conduit bot join @@ -274,6 +285,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; // 3. Power levels @@ -298,6 +310,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; // 4.1 Join Rules @@ -315,6 +328,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; // 4.2 History Visibility @@ -334,6 +348,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; // 4.3 Guest Access @@ -351,6 +366,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; // 6. Events implied by name and topic @@ -370,6 +386,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; db.rooms.build_and_append_pdu( @@ -386,6 +403,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; // Room alias @@ -408,6 +426,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; db.rooms.set_alias(&alias, Some(&room_id), &db.globals)?; @@ -431,6 +450,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; db.rooms.build_and_append_pdu( PduBuilder { @@ -450,6 +470,7 @@ pub async fn register_route( &user_id, &room_id, &db, + &mutex_lock, )?; // Send welcome message @@ -468,6 +489,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; } @@ -643,6 +665,16 @@ pub async fn deactivate_route( third_party_invite: None, }; + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + db.rooms.build_and_append_pdu( PduBuilder { event_type: EventType::RoomMember, @@ -654,6 +686,7 @@ pub async fn deactivate_route( &sender_user, &room_id, &db, + &mutex_lock, )?; } diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 9401b761..a74950b6 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -203,6 +203,16 @@ pub async fn kick_user_route( event.membership = ruma::events::room::member::MembershipState::Leave; // TODO: reason + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(body.room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + db.rooms.build_and_append_pdu( PduBuilder { event_type: EventType::RoomMember, @@ -214,8 +224,11 @@ pub async fn kick_user_route( &sender_user, &body.room_id, &db, + &mutex_lock, )?; + drop(mutex_lock); + db.flush().await?; Ok(kick_user::Response::new().into()) @@ -261,6 +274,16 @@ pub async fn ban_user_route( }, )?; + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(body.room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + db.rooms.build_and_append_pdu( PduBuilder { event_type: EventType::RoomMember, @@ -272,8 +295,11 @@ pub async fn ban_user_route( &sender_user, &body.room_id, &db, + &mutex_lock, )?; + drop(mutex_lock); + db.flush().await?; Ok(ban_user::Response::new().into()) @@ -310,6 +336,16 @@ pub async fn unban_user_route( event.membership = ruma::events::room::member::MembershipState::Leave; + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(body.room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + db.rooms.build_and_append_pdu( PduBuilder { event_type: EventType::RoomMember, @@ -321,8 +357,11 @@ pub async fn unban_user_route( &sender_user, &body.room_id, &db, + &mutex_lock, )?; + drop(mutex_lock); + db.flush().await?; Ok(unban_user::Response::new().into()) @@ -446,6 +485,16 @@ async fn join_room_by_id_helper( ) -> ConduitResult { let sender_user = sender_user.expect("user is authenticated"); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + // Ask a remote server if we don't have this room if !db.rooms.exists(&room_id)? && room_id.server_name() != db.globals.server_name() { let mut make_join_response_and_server = Err(Error::BadServerResponse( @@ -649,9 +698,12 @@ async fn join_room_by_id_helper( &sender_user, &room_id, &db, + &mutex_lock, )?; } + drop(mutex_lock); + db.flush().await?; Ok(join_room_by_id::Response::new(room_id.clone()).into()) @@ -721,13 +773,23 @@ async fn validate_and_add_event_id( Ok((event_id, value)) } -pub async fn invite_helper( +pub async fn invite_helper<'a>( sender_user: &UserId, user_id: &UserId, room_id: &RoomId, db: &Database, is_direct: bool, ) -> Result<()> { + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + if user_id.server_name() != db.globals.server_name() { let prev_events = db .rooms @@ -863,6 +925,8 @@ pub async fn invite_helper( ) .expect("event is valid, we just created it"); + drop(mutex_lock); + let invite_room_state = db.rooms.calculate_invite_state(&pdu)?; let response = db .sending @@ -902,16 +966,6 @@ pub async fn invite_helper( ) .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?; - let mutex = Arc::clone( - db.globals - .roomid_mutex - .write() - .unwrap() - .entry(room_id.clone()) - .or_default(), - ); - let mutex_lock = mutex.lock().await; - let pdu_id = server_server::handle_incoming_pdu( &origin, &event_id, @@ -932,7 +986,6 @@ pub async fn invite_helper( ErrorKind::InvalidParam, "Could not accept incoming PDU as timeline event.", ))?; - drop(mutex_lock); for server in db .rooms @@ -964,6 +1017,7 @@ pub async fn invite_helper( &sender_user, room_id, &db, + &mutex_lock, )?; Ok(()) diff --git a/src/client_server/message.rs b/src/client_server/message.rs index 7e898b11..3d8218c6 100644 --- a/src/client_server/message.rs +++ b/src/client_server/message.rs @@ -10,6 +10,7 @@ use ruma::{ use std::{ collections::BTreeMap, convert::{TryFrom, TryInto}, + sync::Arc, }; #[cfg(feature = "conduit_bin")] @@ -27,6 +28,16 @@ pub async fn send_message_event_route( let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_device = body.sender_device.as_deref(); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(body.room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + // Check if this is a new transaction id if let Some(response) = db.transaction_ids @@ -64,6 +75,7 @@ pub async fn send_message_event_route( &sender_user, &body.room_id, &db, + &mutex_lock, )?; db.transaction_ids.add_txnid( @@ -73,6 +85,8 @@ pub async fn send_message_event_route( event_id.as_bytes(), )?; + drop(mutex_lock); + db.flush().await?; Ok(send_message_event::Response::new(event_id).into()) diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs index 5281a4a2..d947bbe1 100644 --- a/src/client_server/profile.rs +++ b/src/client_server/profile.rs @@ -9,7 +9,7 @@ use ruma::{ events::EventType, serde::Raw, }; -use std::convert::TryInto; +use std::{convert::TryInto, sync::Arc}; #[cfg(feature = "conduit_bin")] use rocket::{get, put}; @@ -69,9 +69,19 @@ pub async fn set_displayname_route( }) .filter_map(|r| r.ok()) { - let _ = db - .rooms - .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + + let _ = + db.rooms + .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock); // Presence update db.rooms.edus.update_presence( @@ -171,9 +181,19 @@ pub async fn set_avatar_url_route( }) .filter_map(|r| r.ok()) { - let _ = db - .rooms - .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + + let _ = + db.rooms + .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock); // Presence update db.rooms.edus.update_presence( diff --git a/src/client_server/redact.rs b/src/client_server/redact.rs index 3db27716..2e4c6519 100644 --- a/src/client_server/redact.rs +++ b/src/client_server/redact.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{database::DatabaseGuard, pdu::PduBuilder, ConduitResult, Ruma}; use ruma::{ api::client::r0::redact::redact_event, @@ -18,6 +20,16 @@ pub async fn redact_event_route( ) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(body.room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + let event_id = db.rooms.build_and_append_pdu( PduBuilder { event_type: EventType::RoomRedaction, @@ -32,8 +44,11 @@ pub async fn redact_event_route( &sender_user, &body.room_id, &db, + &mutex_lock, )?; + drop(mutex_lock); + db.flush().await?; Ok(redact_event::Response { event_id }.into()) diff --git a/src/client_server/room.rs b/src/client_server/room.rs index 43625fe5..f48c5e93 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -15,7 +15,7 @@ use ruma::{ serde::Raw, RoomAliasId, RoomId, RoomVersionId, }; -use std::{cmp::max, collections::BTreeMap, convert::TryFrom}; +use std::{cmp::max, collections::BTreeMap, convert::TryFrom, sync::Arc}; #[cfg(feature = "conduit_bin")] use rocket::{get, post}; @@ -33,6 +33,16 @@ pub async fn create_room_route( let room_id = RoomId::new(db.globals.server_name()); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + let alias = body .room_alias_name .as_ref() @@ -69,6 +79,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; // 2. Let the room creator join @@ -90,6 +101,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; // 3. Power levels @@ -144,6 +156,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; // 4. Events set by preset @@ -170,6 +183,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; // 4.2 History Visibility @@ -187,6 +201,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; // 4.3 Guest Access @@ -212,6 +227,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; // 5. Events listed in initial_state @@ -227,7 +243,7 @@ pub async fn create_room_route( } db.rooms - .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db)?; + .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock)?; } // 6. Events implied by name and topic @@ -248,6 +264,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; } @@ -266,10 +283,12 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; } // 7. Events implied by invite (and TODO: invite_3pid) + drop(mutex_lock); for user_id in &body.invite { let _ = invite_helper(sender_user, user_id, &room_id, &db, body.is_direct).await; } @@ -340,6 +359,16 @@ pub async fn upgrade_room_route( // Create a replacement room let replacement_room = RoomId::new(db.globals.server_name()); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(body.room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + // Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further // Fail if the sender does not have the required permissions let tombstone_event_id = db.rooms.build_and_append_pdu( @@ -357,6 +386,7 @@ pub async fn upgrade_room_route( sender_user, &body.room_id, &db, + &mutex_lock, )?; // Get the old room federations status @@ -397,6 +427,7 @@ pub async fn upgrade_room_route( sender_user, &replacement_room, &db, + &mutex_lock, )?; // Join the new room @@ -418,6 +449,7 @@ pub async fn upgrade_room_route( sender_user, &replacement_room, &db, + &mutex_lock, )?; // Recommended transferable state events list from the specs @@ -451,6 +483,7 @@ pub async fn upgrade_room_route( sender_user, &replacement_room, &db, + &mutex_lock, )?; } @@ -494,8 +527,11 @@ pub async fn upgrade_room_route( sender_user, &body.room_id, &db, + &mutex_lock, )?; + drop(mutex_lock); + db.flush().await?; // Return the replacement room id diff --git a/src/client_server/state.rs b/src/client_server/state.rs index 68246d54..e0e5d29a 100644 --- a/src/client_server/state.rs +++ b/src/client_server/state.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{ database::DatabaseGuard, pdu::PduBuilder, ConduitResult, Database, Error, Result, Ruma, }; @@ -257,6 +259,16 @@ pub async fn send_state_event_for_key_helper( } } + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + let event_id = db.rooms.build_and_append_pdu( PduBuilder { event_type, @@ -268,6 +280,7 @@ pub async fn send_state_event_for_key_helper( &sender_user, &room_id, &db, + &mutex_lock, )?; Ok(event_id) diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 092c4a9b..fe113048 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -189,6 +189,18 @@ async fn sync_helper( for room_id in db.rooms.rooms_joined(&sender_user) { let room_id = room_id?; + // Get and drop the lock to wait for remaining operations to finish + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + drop(mutex_lock); + let mut non_timeline_pdus = db .rooms .pdus_until(&sender_user, &room_id, u64::MAX) @@ -641,6 +653,19 @@ async fn sync_helper( let mut left_rooms = BTreeMap::new(); for result in db.rooms.rooms_left(&sender_user) { let (room_id, left_state_events) = result?; + + // Get and drop the lock to wait for remaining operations to finish + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + drop(mutex_lock); + let left_count = db.rooms.get_left_count(&room_id, &sender_user)?; // Left before last sync @@ -667,6 +692,19 @@ async fn sync_helper( let mut invited_rooms = BTreeMap::new(); for result in db.rooms.rooms_invited(&sender_user) { let (room_id, invite_state_events) = result?; + + // Get and drop the lock to wait for remaining operations to finish + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + drop(mutex_lock); + let invite_count = db.rooms.get_invite_count(&room_id, &sender_user)?; // Invited before last sync diff --git a/src/database/admin.rs b/src/database/admin.rs index cd5fa847..d8b7ae5e 100644 --- a/src/database/admin.rs +++ b/src/database/admin.rs @@ -10,7 +10,7 @@ use ruma::{ events::{room::message, EventType}, UserId, }; -use tokio::sync::{RwLock, RwLockReadGuard}; +use tokio::sync::{MutexGuard, RwLock, RwLockReadGuard}; pub enum AdminCommand { RegisterAppservice(serde_yaml::Value), @@ -48,38 +48,51 @@ impl Admin { ) .unwrap(); - if conduit_room.is_none() { - warn!("Conduit instance does not have an #admins room. Logging to that room will not work. Restart Conduit after creating a user to fix this."); - } + let conduit_room = match conduit_room { + None => { + warn!("Conduit instance does not have an #admins room. Logging to that room will not work. Restart Conduit after creating a user to fix this."); + return; + } + Some(r) => r, + }; drop(guard); - let send_message = - |message: message::MessageEventContent, guard: RwLockReadGuard<'_, Database>| { - if let Some(conduit_room) = &conduit_room { - guard - .rooms - .build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMessage, - content: serde_json::to_value(message) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: None, - }, - &conduit_user, - &conduit_room, - &guard, - ) - .unwrap(); - } - }; + let send_message = |message: message::MessageEventContent, + guard: RwLockReadGuard<'_, Database>, + mutex_lock: &MutexGuard<'_, ()>| { + guard + .rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMessage, + content: serde_json::to_value(message) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }, + &conduit_user, + &conduit_room, + &guard, + mutex_lock, + ) + .unwrap(); + }; loop { tokio::select! { Some(event) = receiver.next() => { let guard = db.read().await; + let mutex = Arc::clone( + guard.globals + .roomid_mutex + .write() + .unwrap() + .entry(conduit_room.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; match event { AdminCommand::RegisterAppservice(yaml) => { @@ -93,15 +106,17 @@ impl Admin { count, appservices.into_iter().filter_map(|r| r.ok()).collect::>().join(", ") ); - send_message(message::MessageEventContent::text_plain(output), guard); + send_message(message::MessageEventContent::text_plain(output), guard, &mutex_lock); } else { - send_message(message::MessageEventContent::text_plain("Failed to get appservices."), guard); + send_message(message::MessageEventContent::text_plain("Failed to get appservices."), guard, &mutex_lock); } } AdminCommand::SendMessage(message) => { - send_message(message, guard) + send_message(message, guard, &mutex_lock); } } + + drop(mutex_lock); } } } diff --git a/src/database/globals.rs b/src/database/globals.rs index 5d75e86f..c7e96ff6 100644 --- a/src/database/globals.rs +++ b/src/database/globals.rs @@ -40,6 +40,7 @@ pub struct Globals { pub bad_signature_ratelimiter: Arc, RateLimitState>>>, pub servername_ratelimiter: Arc, Arc>>>, pub roomid_mutex: RwLock>>>, + pub roomid_mutex_federation: RwLock>>>, // this lock will be held longer pub sync_receivers: RwLock< BTreeMap< (UserId, Box), @@ -193,6 +194,7 @@ impl Globals { bad_signature_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())), servername_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())), roomid_mutex: RwLock::new(BTreeMap::new()), + roomid_mutex_federation: RwLock::new(BTreeMap::new()), sync_receivers: RwLock::new(BTreeMap::new()), rotate: RotationHandler::new(), }; diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 5f188eea..22b4b4a3 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -2,6 +2,7 @@ mod edus; pub use edus::RoomEdus; use member::MembershipState; +use tokio::sync::MutexGuard; use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result}; use log::{debug, error, warn}; @@ -1209,6 +1210,7 @@ impl Rooms { sender: &UserId, room_id: &RoomId, db: &Database, + _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room mutex ) -> Result { let PduBuilder { event_type, @@ -1218,7 +1220,6 @@ impl Rooms { redacts, } = pdu_builder; - // TODO: Make sure this isn't called twice in parallel let prev_events = self .get_pdu_leaves(&room_id)? .into_iter() @@ -1792,6 +1793,16 @@ impl Rooms { db, )?; } else { + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + let mut event = serde_json::from_value::>( self.room_state_get(room_id, &EventType::RoomMember, &user_id.to_string())? .ok_or(Error::BadRequest( @@ -1819,6 +1830,7 @@ impl Rooms { user_id, room_id, db, + &mutex_lock, )?; } diff --git a/src/server_server.rs b/src/server_server.rs index 9ef70b44..573299d5 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -640,7 +640,7 @@ pub async fn send_transaction_message_route( let mutex = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_federation .write() .unwrap() .entry(room_id.clone()) @@ -1309,11 +1309,13 @@ pub fn handle_incoming_pdu<'a>( pdu_id = Some( append_incoming_pdu( &db, + &room_id, &incoming_pdu, val, extremities, &state_at_incoming_event, ) + .await .map_err(|_| "Failed to add pdu to db.".to_owned())?, ); debug!("Appended incoming pdu."); @@ -1612,13 +1614,24 @@ pub(crate) async fn fetch_signing_keys( /// Append the incoming event setting the state snapshot to the state from the /// server that sent the event. #[tracing::instrument(skip(db))] -pub(crate) fn append_incoming_pdu( +async fn append_incoming_pdu( db: &Database, + room_id: &RoomId, pdu: &PduEvent, pdu_json: CanonicalJsonObject, new_room_leaves: HashSet, state: &StateMap>, ) -> Result> { + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. db.rooms @@ -1631,6 +1644,8 @@ pub(crate) fn append_incoming_pdu( &db, )?; + drop(mutex_lock); + for appservice in db.appservice.iter_all()?.filter_map(|r| r.ok()) { if let Some(namespaces) = appservice.1.get("namespaces") { let users = namespaces @@ -2146,7 +2161,7 @@ pub async fn create_join_event_route( let mutex = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_federation .write() .unwrap() .entry(body.room_id.clone())