From 08bf074cbbb7f2a50fcd4675b5e1e9b267e347a6 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 14 Jun 2024 22:08:44 +0000 Subject: [PATCH] reduce roomid_mutex_state Signed-off-by: Jason Volk --- src/admin/debug/debug_commands.rs | 12 +-- src/api/client/membership.rs | 97 ++++++------------------- src/api/client/message.rs | 20 ++--- src/api/client/profile.rs | 14 +--- src/api/client/redact.rs | 17 ++--- src/api/client/room.rs | 49 ++++--------- src/api/client/state.rs | 13 +--- src/api/server/make_join.rs | 18 ++--- src/api/server/make_leave.rs | 19 ++--- src/service/admin/create.rs | 15 +--- src/service/admin/grant.rs | 13 +--- src/service/admin/mod.rs | 21 +----- src/service/globals/mod.rs | 4 +- src/service/rooms/event_handler/mod.rs | 12 +-- src/service/rooms/state/data.rs | 10 +-- src/service/rooms/state/mod.rs | 10 +-- src/service/rooms/state_accessor/mod.rs | 4 +- src/service/rooms/timeline/mod.rs | 14 ++-- 18 files changed, 93 insertions(+), 269 deletions(-) diff --git a/src/admin/debug/debug_commands.rs b/src/admin/debug/debug_commands.rs index 0316abc2..0b9ff62c 100644 --- a/src/admin/debug/debug_commands.rs +++ b/src/admin/debug/debug_commands.rs @@ -595,17 +595,7 @@ pub(crate) async fn force_set_room_state_from_server( .state_compressor .save_state(room_id.clone().as_ref(), new_room_state)?; - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(room_id.clone().into()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; - + let state_lock = services().globals.roomid_mutex_state.lock(&room_id).await; services() .rooms .state diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs index f781808c..111d03c4 100644 --- a/src/api/client/membership.rs +++ b/src/api/client/membership.rs @@ -7,6 +7,7 @@ use std::{ }; use axum_client_ip::InsecureClientIp; +use conduit::utils::mutex_map; use ruma::{ api::{ client::{ @@ -32,7 +33,7 @@ use ruma::{ OwnedUserId, RoomId, RoomVersionId, ServerName, UserId, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; -use tokio::sync::{MutexGuard, RwLock}; +use tokio::sync::RwLock; use tracing::{debug, error, info, trace, warn}; use super::get_alias_helper; @@ -373,16 +374,11 @@ pub(crate) async fn kick_user_route(body: Ruma) -> Resul event.membership = MembershipState::Leave; event.reason.clone_from(&body.reason); - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(body.room_id.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; + let state_lock = services() + .globals + .roomid_mutex_state + .lock(&body.room_id) + .await; services() .rooms @@ -442,16 +438,11 @@ pub(crate) async fn ban_user_route(body: Ruma) -> Result< }, )?; - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(body.room_id.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; + let state_lock = services() + .globals + .roomid_mutex_state + .lock(&body.room_id) + .await; services() .rooms @@ -496,16 +487,11 @@ pub(crate) async fn unban_user_route(body: Ruma) -> Res event.reason.clone_from(&body.reason); event.join_authorized_via_users_server = None; - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(body.room_id.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; + let state_lock = services() + .globals + .roomid_mutex_state + .lock(&body.room_id) + .await; services() .rooms @@ -670,16 +656,7 @@ pub async fn join_room_by_id_helper( }); } - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; + let state_lock = services().globals.roomid_mutex_state.lock(room_id).await; // Ask a remote server if we are not participating in this room if !services() @@ -695,7 +672,7 @@ pub async fn join_room_by_id_helper( async fn join_room_by_id_helper_remote( sender_user: &UserId, room_id: &RoomId, reason: Option, servers: &[OwnedServerName], - _third_party_signed: Option<&ThirdPartySigned>, state_lock: MutexGuard<'_, ()>, + _third_party_signed: Option<&ThirdPartySigned>, state_lock: mutex_map::Guard<()>, ) -> Result { info!("Joining {room_id} over federation."); @@ -1030,7 +1007,7 @@ async fn join_room_by_id_helper_remote( async fn join_room_by_id_helper_local( sender_user: &UserId, room_id: &RoomId, reason: Option, servers: &[OwnedServerName], - _third_party_signed: Option<&ThirdPartySigned>, state_lock: MutexGuard<'_, ()>, + _third_party_signed: Option<&ThirdPartySigned>, state_lock: mutex_map::Guard<()>, ) -> Result { info!("We can join locally"); @@ -1413,17 +1390,7 @@ pub(crate) async fn invite_helper( if !user_is_local(user_id) { let (pdu, pdu_json, invite_room_state) = { - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; - + let state_lock = services().globals.roomid_mutex_state.lock(room_id).await; let content = to_raw_value(&RoomMemberEventContent { avatar_url: services().users.avatar_url(user_id)?, displayname: None, @@ -1535,16 +1502,7 @@ pub(crate) async fn invite_helper( )); } - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; + let state_lock = services().globals.roomid_mutex_state.lock(room_id).await; services() .rooms @@ -1638,16 +1596,7 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId, reason: Option, user_id: OwnedUserId) { for (pdu_builder, room_id) in all_joined_rooms { - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; - + let state_lock = services().globals.roomid_mutex_state.lock(room_id).await; if let Err(e) = services() .rooms .timeline diff --git a/src/api/client/redact.rs b/src/api/client/redact.rs index d92ef002..4cb24c33 100644 --- a/src/api/client/redact.rs +++ b/src/api/client/redact.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use ruma::{ api::client::redact::redact_event, events::{room::redaction::RoomRedactionEventContent, TimelineEventType}, @@ -17,16 +15,11 @@ pub(crate) async fn redact_event_route(body: Ruma) -> let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let body = body.body; - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(body.room_id.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; + let state_lock = services() + .globals + .roomid_mutex_state + .lock(&body.room_id) + .await; let event_id = services() .rooms diff --git a/src/api/client/room.rs b/src/api/client/room.rs index 14071d39..7090fdc8 100644 --- a/src/api/client/room.rs +++ b/src/api/client/room.rs @@ -1,4 +1,4 @@ -use std::{cmp::max, collections::BTreeMap, sync::Arc}; +use std::{cmp::max, collections::BTreeMap}; use conduit::{debug_info, debug_warn}; use ruma::{ @@ -89,18 +89,8 @@ pub(crate) async fn create_room_route(body: Ruma) -> R )); } - services().rooms.short.get_or_create_shortroomid(&room_id)?; - - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; + let _short_id = services().rooms.short.get_or_create_shortroomid(&room_id)?; + let state_lock = services().globals.roomid_mutex_state.lock(&room_id).await; let alias: Option = if let Some(alias) = &body.room_alias_name { Some(room_alias_check(alias, &body.appservice_info).await?) @@ -577,21 +567,17 @@ pub(crate) async fn upgrade_room_route(body: Ruma) -> // Create a replacement room let replacement_room = RoomId::new(services().globals.server_name()); - services() + + let _short_id = services() .rooms .short .get_or_create_shortroomid(&replacement_room)?; - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(body.room_id.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; + let state_lock = services() + .globals + .roomid_mutex_state + .lock(&body.room_id) + .await; // Send a m.room.tombstone event to the old room to indicate that it is not // intended to be used any further Fail if the sender does not have the required @@ -619,16 +605,11 @@ pub(crate) async fn upgrade_room_route(body: Ruma) -> // Change lock to replacement room drop(state_lock); - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(replacement_room.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; + let state_lock = services() + .globals + .roomid_mutex_state + .lock(&replacement_room) + .await; // Get the old room creation event let mut create_event_content = serde_json::from_str::( diff --git a/src/api/client/state.rs b/src/api/client/state.rs index 32e0fb68..17ae7be4 100644 --- a/src/api/client/state.rs +++ b/src/api/client/state.rs @@ -172,18 +172,7 @@ async fn send_state_event_for_key_helper( sender: &UserId, room_id: &RoomId, event_type: &StateEventType, json: &Raw, state_key: String, ) -> Result> { allowed_to_send_state_event(room_id, event_type, json).await?; - - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; - + let state_lock = services().globals.roomid_mutex_state.lock(room_id).await; let event_id = services() .rooms .timeline diff --git a/src/api/server/make_join.rs b/src/api/server/make_join.rs index 0a6b1992..d3d934f7 100644 --- a/src/api/server/make_join.rs +++ b/src/api/server/make_join.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use ruma::{ api::{client::error::ErrorKind, federation::membership::prepare_join_event}, events::{ @@ -74,17 +72,11 @@ pub(crate) async fn create_join_event_template_route( } } - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(body.room_id.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; - + let state_lock = services() + .globals + .roomid_mutex_state + .lock(&body.room_id) + .await; let join_rules_event = services() .rooms diff --git a/src/api/server/make_leave.rs b/src/api/server/make_leave.rs index 72c931c5..62c09717 100644 --- a/src/api/server/make_leave.rs +++ b/src/api/server/make_leave.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use ruma::{ api::{client::error::ErrorKind, federation::membership::prepare_leave_event}, events::{ @@ -37,18 +35,11 @@ pub(crate) async fn create_leave_event_template_route( .acl_check(origin, &body.room_id)?; let room_version_id = services().rooms.state.get_room_version(&body.room_id)?; - - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(body.room_id.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; - + let state_lock = services() + .globals + .roomid_mutex_state + .lock(&body.room_id) + .await; let content = to_raw_value(&RoomMemberEventContent { avatar_url: None, blurhash: None, diff --git a/src/service/admin/create.rs b/src/service/admin/create.rs index c65bb6fa..ad70fe0c 100644 --- a/src/service/admin/create.rs +++ b/src/service/admin/create.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::collections::BTreeMap; use conduit::{Error, Result}; use ruma::{ @@ -32,18 +32,9 @@ use crate::{pdu::PduBuilder, services}; pub async fn create_admin_room() -> Result<()> { let room_id = RoomId::new(services().globals.server_name()); - services().rooms.short.get_or_create_shortroomid(&room_id)?; + let _short_id = services().rooms.short.get_or_create_shortroomid(&room_id)?; - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; + let state_lock = services().globals.roomid_mutex_state.lock(&room_id).await; // Create a user for the server let server_user = &services().globals.server_user; diff --git a/src/service/admin/grant.rs b/src/service/admin/grant.rs index 601bc486..ca48ce0d 100644 --- a/src/service/admin/grant.rs +++ b/src/service/admin/grant.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::collections::BTreeMap; use conduit::Result; use ruma::{ @@ -22,16 +22,7 @@ use crate::{pdu::PduBuilder, services}; /// In conduit, this is equivalent to granting admin privileges. pub async fn make_user_admin(user_id: &UserId, displayname: String) -> Result<()> { if let Some(room_id) = Service::get_admin_room()? { - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; + let state_lock = services().globals.roomid_mutex_state.lock(&room_id).await; // Use the server user to grant the new admin's power level let server_user = &services().globals.server_user; diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index aa4c2c21..f5f818d3 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -4,7 +4,7 @@ mod grant; use std::{future::Future, pin::Pin, sync::Arc}; -use conduit::{Error, Result}; +use conduit::{utils::mutex_map, Error, Result}; pub use create::create_admin_room; pub use grant::make_user_admin; use ruma::{ @@ -15,10 +15,7 @@ use ruma::{ EventId, OwnedRoomId, RoomId, UserId, }; use serde_json::value::to_raw_value; -use tokio::{ - sync::{Mutex, MutexGuard}, - task::JoinHandle, -}; +use tokio::{sync::Mutex, task::JoinHandle}; use tracing::error; use crate::{pdu::PduBuilder, services, PduEvent}; @@ -218,17 +215,7 @@ async fn respond_to_room(content: &RoomMessageEventContent, room_id: &RoomId, us "sender is not admin" ); - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; - + let state_lock = services().globals.roomid_mutex_state.lock(room_id).await; let response_pdu = PduBuilder { event_type: TimelineEventType::RoomMessage, content: to_raw_value(content).expect("event is valid, we just created it"), @@ -250,7 +237,7 @@ async fn respond_to_room(content: &RoomMessageEventContent, room_id: &RoomId, us } async fn handle_response_error( - e: &Error, room_id: &RoomId, user_id: &UserId, state_lock: &MutexGuard<'_, ()>, + e: &Error, room_id: &RoomId, user_id: &UserId, state_lock: &mutex_map::Guard<()>, ) -> Result<()> { error!("Failed to build and append admin room response PDU: \"{e}\""); let error_room_message = RoomMessageEventContent::text_plain(format!( diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 716735df..626b2779 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -55,7 +55,7 @@ pub struct Service { pub bad_signature_ratelimiter: Arc, RateLimitState>>>, pub bad_query_ratelimiter: Arc>>, pub roomid_mutex_insert: MutexMap, - pub roomid_mutex_state: RwLock>>>, + pub roomid_mutex_state: MutexMap, pub roomid_mutex_federation: MutexMap, pub roomid_federationhandletime: RwLock>, pub updates_handle: Mutex>>, @@ -116,7 +116,7 @@ impl Service { bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())), bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())), bad_query_ratelimiter: Arc::new(RwLock::new(HashMap::new())), - roomid_mutex_state: RwLock::new(HashMap::new()), + roomid_mutex_state: MutexMap::::new(), roomid_mutex_insert: MutexMap::::new(), roomid_mutex_federation: MutexMap::::new(), roomid_federationhandletime: RwLock::new(HashMap::new()), diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index d1413cb5..e2cd5066 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -530,18 +530,8 @@ impl Service { // 13. Use state resolution to find new room state // We start looking at current room state now, so lets lock the room - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - trace!("Locking the room"); - let state_lock = mutex_state.lock().await; + let state_lock = services().globals.roomid_mutex_state.lock(room_id).await; // Now we calculate the set of extremities this room has after the incoming // event has been applied. We start with the previous extremities (aka leaves) diff --git a/src/service/rooms/state/data.rs b/src/service/rooms/state/data.rs index f0fef086..f8c7f6a6 100644 --- a/src/service/rooms/state/data.rs +++ b/src/service/rooms/state/data.rs @@ -1,7 +1,7 @@ use std::{collections::HashSet, sync::Arc}; +use conduit::utils::mutex_map; use ruma::{EventId, OwnedEventId, RoomId}; -use tokio::sync::MutexGuard; use crate::{utils, Error, KeyValueDatabase, Result}; @@ -14,7 +14,7 @@ pub trait Data: Send + Sync { &self, room_id: &RoomId, new_shortstatehash: u64, - _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + _mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex ) -> Result<()>; /// Associates a state with an event. @@ -28,7 +28,7 @@ pub trait Data: Send + Sync { &self, room_id: &RoomId, event_ids: Vec, - _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + _mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex ) -> Result<()>; } @@ -47,7 +47,7 @@ impl Data for KeyValueDatabase { &self, room_id: &RoomId, new_shortstatehash: u64, - _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + _mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex ) -> Result<()> { self.roomid_shortstatehash .insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?; @@ -80,7 +80,7 @@ impl Data for KeyValueDatabase { &self, room_id: &RoomId, event_ids: Vec, - _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + _mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex ) -> Result<()> { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xFF); diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 8a32d9ca..199ee87f 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -4,6 +4,7 @@ use std::{ sync::Arc, }; +use conduit::utils::mutex_map; use data::Data; use ruma::{ api::client::error::ErrorKind, @@ -15,7 +16,6 @@ use ruma::{ state_res::{self, StateMap}, EventId, OwnedEventId, RoomId, RoomVersionId, UserId, }; -use tokio::sync::MutexGuard; use tracing::warn; use super::state_compressor::CompressedStateEvent; @@ -33,7 +33,7 @@ impl Service { shortstatehash: u64, statediffnew: Arc>, _statediffremoved: Arc>, - state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex ) -> Result<()> { for event_id in statediffnew.iter().filter_map(|new| { services() @@ -299,12 +299,12 @@ impl Service { } /// Set the state hash to a new version, but does not update state_cache. - #[tracing::instrument(skip(self))] + #[tracing::instrument(skip(self, mutex_lock))] pub fn set_room_state( &self, room_id: &RoomId, shortstatehash: u64, - mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex ) -> Result<()> { self.db.set_room_state(room_id, shortstatehash, mutex_lock) } @@ -343,7 +343,7 @@ impl Service { &self, room_id: &RoomId, event_ids: Vec, - state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex ) -> Result<()> { self.db .set_forward_extremities(room_id, event_ids, state_lock) diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index d3dc92ef..ab290cbd 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -4,6 +4,7 @@ use std::{ sync::{Arc, Mutex}, }; +use conduit::utils::mutex_map; use data::Data; use lru_cache::LruCache; use ruma::{ @@ -22,7 +23,6 @@ use ruma::{ EventId, OwnedRoomAliasId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, }; use serde_json::value::to_raw_value; -use tokio::sync::MutexGuard; use tracing::{error, warn}; use crate::{service::pdu::PduBuilder, services, Error, PduEvent, Result}; @@ -285,7 +285,7 @@ impl Service { } pub async fn user_can_invite( - &self, room_id: &RoomId, sender: &UserId, target_user: &UserId, state_lock: &MutexGuard<'_, ()>, + &self, room_id: &RoomId, sender: &UserId, target_user: &UserId, state_lock: &mutex_map::Guard<()>, ) -> Result { let content = to_raw_value(&RoomMemberEventContent::new(MembershipState::Invite)) .expect("Event content always serializes"); diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 35c1ffbb..e8774cdc 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -30,7 +30,7 @@ use ruma::{ }; use serde::Deserialize; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; -use tokio::sync::{Mutex, MutexGuard, RwLock}; +use tokio::sync::{Mutex, RwLock}; use tracing::{debug, error, info, warn}; use super::state_compressor::CompressedStateEvent; @@ -44,7 +44,7 @@ use crate::{ rooms::event_handler::parse_incoming_pdu, }, services, - utils::{self}, + utils::{self, mutex_map}, Error, PduCount, PduEvent, @@ -200,13 +200,13 @@ impl Service { /// happens in `append_pdu`. /// /// Returns pdu id - #[tracing::instrument(skip(self, pdu, pdu_json, leaves))] + #[tracing::instrument(skip_all)] pub async fn append_pdu( &self, pdu: &PduEvent, mut pdu_json: CanonicalJsonObject, leaves: Vec, - state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex ) -> Result> { // Coalesce database writes for the remainder of this scope. let _cork = services().globals.db.cork_and_flush(); @@ -581,7 +581,7 @@ impl Service { pdu_builder: PduBuilder, sender: &UserId, room_id: &RoomId, - _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + _mutex_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex ) -> Result<(PduEvent, CanonicalJsonObject)> { let PduBuilder { event_type, @@ -768,7 +768,7 @@ impl Service { pdu_builder: PduBuilder, sender: &UserId, room_id: &RoomId, - state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex ) -> Result> { let (pdu, pdu_json) = self.create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock)?; if let Some(admin_room) = admin::Service::get_admin_room()? { @@ -909,7 +909,7 @@ impl Service { new_room_leaves: Vec, state_ids_compressed: Arc>, soft_fail: bool, - state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex + state_lock: &mutex_map::Guard<()>, // Take mutex guard to make sure users get the room state mutex ) -> Result>> { // We append to state before appending the pdu, so we don't have a moment in // time with the pdu without it's state. This is okay because append_pdu can't