From 56f652c12d4eac4ab4375e370aeb3f8261ba7747 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sat, 27 Apr 2024 05:54:04 -0700 Subject: [PATCH] cleanup admin worker loop Signed-off-by: Jason Volk --- src/service/admin/mod.rs | 219 +++++++++++++++++++++------------------ 1 file changed, 116 insertions(+), 103 deletions(-) diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 3797af65..64ca48c4 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -23,7 +23,7 @@ use ruma::{ EventId, MxcUri, OwnedRoomAliasId, OwnedRoomId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, }; use serde_json::value::to_raw_value; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, MutexGuard}; use tracing::{error, warn}; use self::fsck::FsckCommand; @@ -119,100 +119,113 @@ impl Service { }); } + pub(crate) async fn process_message(&self, room_message: String, event_id: Arc) { + self.send(AdminRoomEvent::ProcessMessage(room_message, event_id)) + .await; + } + + pub(crate) async fn send_message(&self, message_content: RoomMessageEventContent) { + self.send(AdminRoomEvent::SendMessage(message_content)) + .await; + } + + async fn send(&self, message: AdminRoomEvent) { + debug_assert!(!self.sender.is_full(), "channel full"); + debug_assert!(!self.sender.is_closed(), "channel closed"); + self.sender.send(message).expect("message sent"); + } + async fn handler(&self) -> Result<()> { let receiver = self.receiver.lock().await; - // TODO: Use futures when we have long admin commands - //let mut futures = FuturesUnordered::new(); + let Ok(Some(admin_room)) = Self::get_admin_room().await else { + return Ok(()); + }; + let server_name = services().globals.server_name(); + let server_user = UserId::parse(format!("@conduit:{server_name}")).expect("server's username is valid"); - let conduit_user = UserId::parse(format!("@conduit:{}", services().globals.server_name())) - .expect("@conduit:server_name is valid"); - - if let Ok(Some(conduit_room)) = Self::get_admin_room().await { - loop { - tokio::select! { - event = receiver.recv_async() => { - match event { - Ok(event) => { - let (mut message_content, reply) = match event { - AdminRoomEvent::SendMessage(content) => (content, None), - AdminRoomEvent::ProcessMessage(room_message, reply_id) => { - (self.process_admin_message(room_message).await, Some(reply_id)) - } - }; - - let mutex_state = Arc::clone( - services().globals - .roomid_mutex_state - .write() - .await - .entry(conduit_room.clone()) - .or_default(), - ); - - let state_lock = mutex_state.lock().await; - - if let Some(reply) = reply { - message_content.relates_to = Some(Reply { in_reply_to: InReplyTo { event_id: reply.into() } }); - } - - if let Err(e) = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMessage, - content: to_raw_value(&message_content) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: None, - }, - &conduit_user, - &conduit_room, - &state_lock) - .await { - error!("Failed to build and append admin room response PDU: \"{e}\""); - - let error_room_message = RoomMessageEventContent::text_plain(format!("Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command may have finished successfully, but we could not return the output.")); - - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMessage, - content: to_raw_value(&error_room_message) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: None, - }, - &conduit_user, - &conduit_room, - &state_lock) - .await?; - } - drop(state_lock); - } - Err(e) => { - // generally shouldn't happen - error!("Failed to receive admin room event from channel: {e}"); - } - } - } + loop { + debug_assert!(!receiver.is_closed(), "channel closed"); + tokio::select! { + event = receiver.recv_async() => match event { + Ok(event) => self.handle_event(event, &admin_room, &server_user).await?, + Err(e) => error!("Failed to receive admin room event from channel: {e}"), } } } + } + + async fn handle_event(&self, event: AdminRoomEvent, admin_room: &OwnedRoomId, server_user: &UserId) -> Result<()> { + let (mut message_content, reply) = match event { + AdminRoomEvent::SendMessage(content) => (content, None), + AdminRoomEvent::ProcessMessage(room_message, reply_id) => { + (self.process_admin_message(room_message).await, Some(reply_id)) + }, + }; + + let mutex_state = Arc::clone( + services() + .globals + .roomid_mutex_state + .write() + .await + .entry(admin_room.clone()) + .or_default(), + ); + let state_lock = mutex_state.lock().await; + + if let Some(reply) = reply { + message_content.relates_to = Some(Reply { + in_reply_to: InReplyTo { + event_id: reply.into(), + }, + }); + } + + let response_pdu = PduBuilder { + event_type: TimelineEventType::RoomMessage, + content: to_raw_value(&message_content).expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }; + + if let Err(e) = services() + .rooms + .timeline + .build_and_append_pdu(response_pdu, server_user, admin_room, &state_lock) + .await + { + self.handle_response_error(&e, admin_room, server_user, &state_lock) + .await?; + } Ok(()) } - pub(crate) async fn process_message(&self, room_message: String, event_id: Arc) { - self.sender - .send_async(AdminRoomEvent::ProcessMessage(room_message, event_id)) - .await - .unwrap(); - } + async fn handle_response_error( + &self, e: &Error, admin_room: &OwnedRoomId, server_user: &UserId, state_lock: &MutexGuard<'_, ()>, + ) -> Result<()> { + error!("Failed to build and append admin room response PDU: \"{e}\""); + let error_room_message = RoomMessageEventContent::text_plain(format!( + "Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command may have finished \ + successfully, but we could not return the output." + )); - pub(crate) async fn send_message(&self, message_content: RoomMessageEventContent) { - self.sender - .send_async(AdminRoomEvent::SendMessage(message_content)) - .await - .unwrap(); + let response_pdu = PduBuilder { + event_type: TimelineEventType::RoomMessage, + content: to_raw_value(&error_room_message).expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }; + + services() + .rooms + .timeline + .build_and_append_pdu(response_pdu, server_user, admin_room, state_lock) + .await?; + + Ok(()) } // Parse and process a message from the admin room @@ -389,10 +402,10 @@ impl Service { let state_lock = mutex_state.lock().await; // Create a user for the server - let conduit_user = UserId::parse_with_server_name("conduit", services().globals.server_name()) + let server_user = UserId::parse_with_server_name("conduit", services().globals.server_name()) .expect("@conduit:server_name is valid"); - services().users.create(&conduit_user, None)?; + services().users.create(&server_user, None)?; let room_version = services().globals.default_room_version(); let mut content = match room_version { @@ -405,7 +418,7 @@ impl Service { | RoomVersionId::V7 | RoomVersionId::V8 | RoomVersionId::V9 - | RoomVersionId::V10 => RoomCreateEventContent::new_v1(conduit_user.clone()), + | RoomVersionId::V10 => RoomCreateEventContent::new_v1(server_user.clone()), RoomVersionId::V11 => RoomCreateEventContent::new_v11(), _ => { warn!("Unexpected or unsupported room version {}", room_version); @@ -432,7 +445,7 @@ impl Service { state_key: Some(String::new()), redacts: None, }, - &conduit_user, + &server_user, &room_id, &state_lock, ) @@ -457,10 +470,10 @@ impl Service { }) .expect("event is valid, we just created it"), unsigned: None, - state_key: Some(conduit_user.to_string()), + state_key: Some(server_user.to_string()), redacts: None, }, - &conduit_user, + &server_user, &room_id, &state_lock, ) @@ -468,7 +481,7 @@ impl Service { // 3. Power levels let mut users = BTreeMap::new(); - users.insert(conduit_user.clone(), 100.into()); + users.insert(server_user.clone(), 100.into()); services() .rooms @@ -485,7 +498,7 @@ impl Service { state_key: Some(String::new()), redacts: None, }, - &conduit_user, + &server_user, &room_id, &state_lock, ) @@ -504,7 +517,7 @@ impl Service { state_key: Some(String::new()), redacts: None, }, - &conduit_user, + &server_user, &room_id, &state_lock, ) @@ -523,7 +536,7 @@ impl Service { state_key: Some(String::new()), redacts: None, }, - &conduit_user, + &server_user, &room_id, &state_lock, ) @@ -542,7 +555,7 @@ impl Service { state_key: Some(String::new()), redacts: None, }, - &conduit_user, + &server_user, &room_id, &state_lock, ) @@ -562,7 +575,7 @@ impl Service { state_key: Some(String::new()), redacts: None, }, - &conduit_user, + &server_user, &room_id, &state_lock, ) @@ -582,7 +595,7 @@ impl Service { state_key: Some(String::new()), redacts: None, }, - &conduit_user, + &server_user, &room_id, &state_lock, ) @@ -608,7 +621,7 @@ impl Service { state_key: Some(String::new()), redacts: None, }, - &conduit_user, + &server_user, &room_id, &state_lock, ) @@ -651,7 +664,7 @@ impl Service { let state_lock = mutex_state.lock().await; // Use the server user to grant the new admin's power level - let conduit_user = UserId::parse_with_server_name("conduit", services().globals.server_name()) + let server_user = UserId::parse_with_server_name("conduit", services().globals.server_name()) .expect("@conduit:server_name is valid"); // Invite and join the real user @@ -676,7 +689,7 @@ impl Service { state_key: Some(user_id.to_string()), redacts: None, }, - &conduit_user, + &server_user, &room_id, &state_lock, ) @@ -710,7 +723,7 @@ impl Service { // Set power level let mut users = BTreeMap::new(); - users.insert(conduit_user.clone(), 100.into()); + users.insert(server_user.clone(), 100.into()); users.insert(user_id.to_owned(), 100.into()); services() @@ -728,7 +741,7 @@ impl Service { state_key: Some(String::new()), redacts: None, }, - &conduit_user, + &server_user, &room_id, &state_lock, ) @@ -747,7 +760,7 @@ impl Service { state_key: None, redacts: None, }, - &conduit_user, + &server_user, &room_id, &state_lock, ).await?;