cleanup admin worker loop
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
4b6938e0f6
commit
56f652c12d
1 changed files with 116 additions and 103 deletions
|
@ -23,7 +23,7 @@ use ruma::{
|
||||||
EventId, MxcUri, OwnedRoomAliasId, OwnedRoomId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
|
EventId, MxcUri, OwnedRoomAliasId, OwnedRoomId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
|
||||||
};
|
};
|
||||||
use serde_json::value::to_raw_value;
|
use serde_json::value::to_raw_value;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::{Mutex, MutexGuard};
|
||||||
use tracing::{error, warn};
|
use tracing::{error, warn};
|
||||||
|
|
||||||
use self::fsck::FsckCommand;
|
use self::fsck::FsckCommand;
|
||||||
|
@ -119,100 +119,113 @@ impl Service {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn process_message(&self, room_message: String, event_id: Arc<EventId>) {
|
||||||
|
self.send(AdminRoomEvent::ProcessMessage(room_message, event_id))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn send_message(&self, message_content: RoomMessageEventContent) {
|
||||||
|
self.send(AdminRoomEvent::SendMessage(message_content))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send(&self, message: AdminRoomEvent) {
|
||||||
|
debug_assert!(!self.sender.is_full(), "channel full");
|
||||||
|
debug_assert!(!self.sender.is_closed(), "channel closed");
|
||||||
|
self.sender.send(message).expect("message sent");
|
||||||
|
}
|
||||||
|
|
||||||
async fn handler(&self) -> Result<()> {
|
async fn handler(&self) -> Result<()> {
|
||||||
let receiver = self.receiver.lock().await;
|
let receiver = self.receiver.lock().await;
|
||||||
// TODO: Use futures when we have long admin commands
|
let Ok(Some(admin_room)) = Self::get_admin_room().await else {
|
||||||
//let mut futures = FuturesUnordered::new();
|
return Ok(());
|
||||||
|
};
|
||||||
|
let server_name = services().globals.server_name();
|
||||||
|
let server_user = UserId::parse(format!("@conduit:{server_name}")).expect("server's username is valid");
|
||||||
|
|
||||||
let conduit_user = UserId::parse(format!("@conduit:{}", services().globals.server_name()))
|
loop {
|
||||||
.expect("@conduit:server_name is valid");
|
debug_assert!(!receiver.is_closed(), "channel closed");
|
||||||
|
tokio::select! {
|
||||||
if let Ok(Some(conduit_room)) = Self::get_admin_room().await {
|
event = receiver.recv_async() => match event {
|
||||||
loop {
|
Ok(event) => self.handle_event(event, &admin_room, &server_user).await?,
|
||||||
tokio::select! {
|
Err(e) => error!("Failed to receive admin room event from channel: {e}"),
|
||||||
event = receiver.recv_async() => {
|
|
||||||
match event {
|
|
||||||
Ok(event) => {
|
|
||||||
let (mut message_content, reply) = match event {
|
|
||||||
AdminRoomEvent::SendMessage(content) => (content, None),
|
|
||||||
AdminRoomEvent::ProcessMessage(room_message, reply_id) => {
|
|
||||||
(self.process_admin_message(room_message).await, Some(reply_id))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mutex_state = Arc::clone(
|
|
||||||
services().globals
|
|
||||||
.roomid_mutex_state
|
|
||||||
.write()
|
|
||||||
.await
|
|
||||||
.entry(conduit_room.clone())
|
|
||||||
.or_default(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let state_lock = mutex_state.lock().await;
|
|
||||||
|
|
||||||
if let Some(reply) = reply {
|
|
||||||
message_content.relates_to = Some(Reply { in_reply_to: InReplyTo { event_id: reply.into() } });
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Err(e) = services().rooms.timeline.build_and_append_pdu(
|
|
||||||
PduBuilder {
|
|
||||||
event_type: TimelineEventType::RoomMessage,
|
|
||||||
content: to_raw_value(&message_content)
|
|
||||||
.expect("event is valid, we just created it"),
|
|
||||||
unsigned: None,
|
|
||||||
state_key: None,
|
|
||||||
redacts: None,
|
|
||||||
},
|
|
||||||
&conduit_user,
|
|
||||||
&conduit_room,
|
|
||||||
&state_lock)
|
|
||||||
.await {
|
|
||||||
error!("Failed to build and append admin room response PDU: \"{e}\"");
|
|
||||||
|
|
||||||
let error_room_message = RoomMessageEventContent::text_plain(format!("Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command may have finished successfully, but we could not return the output."));
|
|
||||||
|
|
||||||
services().rooms.timeline.build_and_append_pdu(
|
|
||||||
PduBuilder {
|
|
||||||
event_type: TimelineEventType::RoomMessage,
|
|
||||||
content: to_raw_value(&error_room_message)
|
|
||||||
.expect("event is valid, we just created it"),
|
|
||||||
unsigned: None,
|
|
||||||
state_key: None,
|
|
||||||
redacts: None,
|
|
||||||
},
|
|
||||||
&conduit_user,
|
|
||||||
&conduit_room,
|
|
||||||
&state_lock)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
drop(state_lock);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
// generally shouldn't happen
|
|
||||||
error!("Failed to receive admin room event from channel: {e}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_event(&self, event: AdminRoomEvent, admin_room: &OwnedRoomId, server_user: &UserId) -> Result<()> {
|
||||||
|
let (mut message_content, reply) = match event {
|
||||||
|
AdminRoomEvent::SendMessage(content) => (content, None),
|
||||||
|
AdminRoomEvent::ProcessMessage(room_message, reply_id) => {
|
||||||
|
(self.process_admin_message(room_message).await, Some(reply_id))
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let mutex_state = Arc::clone(
|
||||||
|
services()
|
||||||
|
.globals
|
||||||
|
.roomid_mutex_state
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.entry(admin_room.clone())
|
||||||
|
.or_default(),
|
||||||
|
);
|
||||||
|
let state_lock = mutex_state.lock().await;
|
||||||
|
|
||||||
|
if let Some(reply) = reply {
|
||||||
|
message_content.relates_to = Some(Reply {
|
||||||
|
in_reply_to: InReplyTo {
|
||||||
|
event_id: reply.into(),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let response_pdu = PduBuilder {
|
||||||
|
event_type: TimelineEventType::RoomMessage,
|
||||||
|
content: to_raw_value(&message_content).expect("event is valid, we just created it"),
|
||||||
|
unsigned: None,
|
||||||
|
state_key: None,
|
||||||
|
redacts: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = services()
|
||||||
|
.rooms
|
||||||
|
.timeline
|
||||||
|
.build_and_append_pdu(response_pdu, server_user, admin_room, &state_lock)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
self.handle_response_error(&e, admin_room, server_user, &state_lock)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn process_message(&self, room_message: String, event_id: Arc<EventId>) {
|
async fn handle_response_error(
|
||||||
self.sender
|
&self, e: &Error, admin_room: &OwnedRoomId, server_user: &UserId, state_lock: &MutexGuard<'_, ()>,
|
||||||
.send_async(AdminRoomEvent::ProcessMessage(room_message, event_id))
|
) -> Result<()> {
|
||||||
.await
|
error!("Failed to build and append admin room response PDU: \"{e}\"");
|
||||||
.unwrap();
|
let error_room_message = RoomMessageEventContent::text_plain(format!(
|
||||||
}
|
"Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command may have finished \
|
||||||
|
successfully, but we could not return the output."
|
||||||
|
));
|
||||||
|
|
||||||
pub(crate) async fn send_message(&self, message_content: RoomMessageEventContent) {
|
let response_pdu = PduBuilder {
|
||||||
self.sender
|
event_type: TimelineEventType::RoomMessage,
|
||||||
.send_async(AdminRoomEvent::SendMessage(message_content))
|
content: to_raw_value(&error_room_message).expect("event is valid, we just created it"),
|
||||||
.await
|
unsigned: None,
|
||||||
.unwrap();
|
state_key: None,
|
||||||
|
redacts: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
services()
|
||||||
|
.rooms
|
||||||
|
.timeline
|
||||||
|
.build_and_append_pdu(response_pdu, server_user, admin_room, state_lock)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse and process a message from the admin room
|
// Parse and process a message from the admin room
|
||||||
|
@ -389,10 +402,10 @@ impl Service {
|
||||||
let state_lock = mutex_state.lock().await;
|
let state_lock = mutex_state.lock().await;
|
||||||
|
|
||||||
// Create a user for the server
|
// Create a user for the server
|
||||||
let conduit_user = UserId::parse_with_server_name("conduit", services().globals.server_name())
|
let server_user = UserId::parse_with_server_name("conduit", services().globals.server_name())
|
||||||
.expect("@conduit:server_name is valid");
|
.expect("@conduit:server_name is valid");
|
||||||
|
|
||||||
services().users.create(&conduit_user, None)?;
|
services().users.create(&server_user, None)?;
|
||||||
|
|
||||||
let room_version = services().globals.default_room_version();
|
let room_version = services().globals.default_room_version();
|
||||||
let mut content = match room_version {
|
let mut content = match room_version {
|
||||||
|
@ -405,7 +418,7 @@ impl Service {
|
||||||
| RoomVersionId::V7
|
| RoomVersionId::V7
|
||||||
| RoomVersionId::V8
|
| RoomVersionId::V8
|
||||||
| RoomVersionId::V9
|
| RoomVersionId::V9
|
||||||
| RoomVersionId::V10 => RoomCreateEventContent::new_v1(conduit_user.clone()),
|
| RoomVersionId::V10 => RoomCreateEventContent::new_v1(server_user.clone()),
|
||||||
RoomVersionId::V11 => RoomCreateEventContent::new_v11(),
|
RoomVersionId::V11 => RoomCreateEventContent::new_v11(),
|
||||||
_ => {
|
_ => {
|
||||||
warn!("Unexpected or unsupported room version {}", room_version);
|
warn!("Unexpected or unsupported room version {}", room_version);
|
||||||
|
@ -432,7 +445,7 @@ impl Service {
|
||||||
state_key: Some(String::new()),
|
state_key: Some(String::new()),
|
||||||
redacts: None,
|
redacts: None,
|
||||||
},
|
},
|
||||||
&conduit_user,
|
&server_user,
|
||||||
&room_id,
|
&room_id,
|
||||||
&state_lock,
|
&state_lock,
|
||||||
)
|
)
|
||||||
|
@ -457,10 +470,10 @@ impl Service {
|
||||||
})
|
})
|
||||||
.expect("event is valid, we just created it"),
|
.expect("event is valid, we just created it"),
|
||||||
unsigned: None,
|
unsigned: None,
|
||||||
state_key: Some(conduit_user.to_string()),
|
state_key: Some(server_user.to_string()),
|
||||||
redacts: None,
|
redacts: None,
|
||||||
},
|
},
|
||||||
&conduit_user,
|
&server_user,
|
||||||
&room_id,
|
&room_id,
|
||||||
&state_lock,
|
&state_lock,
|
||||||
)
|
)
|
||||||
|
@ -468,7 +481,7 @@ impl Service {
|
||||||
|
|
||||||
// 3. Power levels
|
// 3. Power levels
|
||||||
let mut users = BTreeMap::new();
|
let mut users = BTreeMap::new();
|
||||||
users.insert(conduit_user.clone(), 100.into());
|
users.insert(server_user.clone(), 100.into());
|
||||||
|
|
||||||
services()
|
services()
|
||||||
.rooms
|
.rooms
|
||||||
|
@ -485,7 +498,7 @@ impl Service {
|
||||||
state_key: Some(String::new()),
|
state_key: Some(String::new()),
|
||||||
redacts: None,
|
redacts: None,
|
||||||
},
|
},
|
||||||
&conduit_user,
|
&server_user,
|
||||||
&room_id,
|
&room_id,
|
||||||
&state_lock,
|
&state_lock,
|
||||||
)
|
)
|
||||||
|
@ -504,7 +517,7 @@ impl Service {
|
||||||
state_key: Some(String::new()),
|
state_key: Some(String::new()),
|
||||||
redacts: None,
|
redacts: None,
|
||||||
},
|
},
|
||||||
&conduit_user,
|
&server_user,
|
||||||
&room_id,
|
&room_id,
|
||||||
&state_lock,
|
&state_lock,
|
||||||
)
|
)
|
||||||
|
@ -523,7 +536,7 @@ impl Service {
|
||||||
state_key: Some(String::new()),
|
state_key: Some(String::new()),
|
||||||
redacts: None,
|
redacts: None,
|
||||||
},
|
},
|
||||||
&conduit_user,
|
&server_user,
|
||||||
&room_id,
|
&room_id,
|
||||||
&state_lock,
|
&state_lock,
|
||||||
)
|
)
|
||||||
|
@ -542,7 +555,7 @@ impl Service {
|
||||||
state_key: Some(String::new()),
|
state_key: Some(String::new()),
|
||||||
redacts: None,
|
redacts: None,
|
||||||
},
|
},
|
||||||
&conduit_user,
|
&server_user,
|
||||||
&room_id,
|
&room_id,
|
||||||
&state_lock,
|
&state_lock,
|
||||||
)
|
)
|
||||||
|
@ -562,7 +575,7 @@ impl Service {
|
||||||
state_key: Some(String::new()),
|
state_key: Some(String::new()),
|
||||||
redacts: None,
|
redacts: None,
|
||||||
},
|
},
|
||||||
&conduit_user,
|
&server_user,
|
||||||
&room_id,
|
&room_id,
|
||||||
&state_lock,
|
&state_lock,
|
||||||
)
|
)
|
||||||
|
@ -582,7 +595,7 @@ impl Service {
|
||||||
state_key: Some(String::new()),
|
state_key: Some(String::new()),
|
||||||
redacts: None,
|
redacts: None,
|
||||||
},
|
},
|
||||||
&conduit_user,
|
&server_user,
|
||||||
&room_id,
|
&room_id,
|
||||||
&state_lock,
|
&state_lock,
|
||||||
)
|
)
|
||||||
|
@ -608,7 +621,7 @@ impl Service {
|
||||||
state_key: Some(String::new()),
|
state_key: Some(String::new()),
|
||||||
redacts: None,
|
redacts: None,
|
||||||
},
|
},
|
||||||
&conduit_user,
|
&server_user,
|
||||||
&room_id,
|
&room_id,
|
||||||
&state_lock,
|
&state_lock,
|
||||||
)
|
)
|
||||||
|
@ -651,7 +664,7 @@ impl Service {
|
||||||
let state_lock = mutex_state.lock().await;
|
let state_lock = mutex_state.lock().await;
|
||||||
|
|
||||||
// Use the server user to grant the new admin's power level
|
// Use the server user to grant the new admin's power level
|
||||||
let conduit_user = UserId::parse_with_server_name("conduit", services().globals.server_name())
|
let server_user = UserId::parse_with_server_name("conduit", services().globals.server_name())
|
||||||
.expect("@conduit:server_name is valid");
|
.expect("@conduit:server_name is valid");
|
||||||
|
|
||||||
// Invite and join the real user
|
// Invite and join the real user
|
||||||
|
@ -676,7 +689,7 @@ impl Service {
|
||||||
state_key: Some(user_id.to_string()),
|
state_key: Some(user_id.to_string()),
|
||||||
redacts: None,
|
redacts: None,
|
||||||
},
|
},
|
||||||
&conduit_user,
|
&server_user,
|
||||||
&room_id,
|
&room_id,
|
||||||
&state_lock,
|
&state_lock,
|
||||||
)
|
)
|
||||||
|
@ -710,7 +723,7 @@ impl Service {
|
||||||
|
|
||||||
// Set power level
|
// Set power level
|
||||||
let mut users = BTreeMap::new();
|
let mut users = BTreeMap::new();
|
||||||
users.insert(conduit_user.clone(), 100.into());
|
users.insert(server_user.clone(), 100.into());
|
||||||
users.insert(user_id.to_owned(), 100.into());
|
users.insert(user_id.to_owned(), 100.into());
|
||||||
|
|
||||||
services()
|
services()
|
||||||
|
@ -728,7 +741,7 @@ impl Service {
|
||||||
state_key: Some(String::new()),
|
state_key: Some(String::new()),
|
||||||
redacts: None,
|
redacts: None,
|
||||||
},
|
},
|
||||||
&conduit_user,
|
&server_user,
|
||||||
&room_id,
|
&room_id,
|
||||||
&state_lock,
|
&state_lock,
|
||||||
)
|
)
|
||||||
|
@ -747,7 +760,7 @@ impl Service {
|
||||||
state_key: None,
|
state_key: None,
|
||||||
redacts: None,
|
redacts: None,
|
||||||
},
|
},
|
||||||
&conduit_user,
|
&server_user,
|
||||||
&room_id,
|
&room_id,
|
||||||
&state_lock,
|
&state_lock,
|
||||||
).await?;
|
).await?;
|
||||||
|
|
Loading…
Add table
Reference in a new issue