admin cmd to force download and use a server's room state

Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
strawberry 2024-06-07 18:35:48 -04:00
parent 1287a86c05
commit 03ba9bde29
2 changed files with 176 additions and 4 deletions

View file

@ -1,9 +1,15 @@
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::Instant,
};
use api::client::validate_and_add_event_id;
use conduit::{utils::HtmlEscape, Error, Result};
use ruma::{
api::client::error::ErrorKind, events::room::message::RoomMessageEventContent, CanonicalJsonObject, EventId,
RoomId, RoomVersionId, ServerName,
api::{client::error::ErrorKind, federation::event::get_room_state},
events::room::message::RoomMessageEventContent,
CanonicalJsonObject, EventId, RoomId, RoomVersionId, ServerName,
};
use service::{rooms::event_handler::parse_incoming_pdu, sending::resolve::resolve_actual_dest, services, PduEvent};
use tokio::sync::RwLock;
@ -472,6 +478,147 @@ pub(crate) async fn latest_pdu_in_room(_body: Vec<&str>, room_id: Box<RoomId>) -
Ok(RoomMessageEventContent::text_plain(format!("{latest_pdu:?}")))
}
#[tracing::instrument(skip(_body))]
pub(crate) async fn force_set_room_state_from_server(
_body: Vec<&str>, server_name: Box<ServerName>, room_id: Box<RoomId>,
) -> Result<RoomMessageEventContent> {
if !services()
.rooms
.state_cache
.server_in_room(&services().globals.config.server_name, &room_id)?
{
return Ok(RoomMessageEventContent::text_plain(
"We are not participating in the room / we don't know about the room ID.",
));
}
let first_pdu = services()
.rooms
.timeline
.latest_pdu_in_room(&room_id)?
.ok_or_else(|| Error::bad_database("Failed to find the latest PDU in database"))?;
let room_version = services().rooms.state.get_room_version(&room_id)?;
let mut state: HashMap<u64, Arc<EventId>> = HashMap::new();
let pub_key_map = RwLock::new(BTreeMap::new());
let remote_state_response = services()
.sending
.send_federation_request(
&server_name,
get_room_state::v1::Request {
room_id: room_id.clone().into(),
event_id: first_pdu.event_id.clone().into(),
},
)
.await?;
let mut events = Vec::with_capacity(remote_state_response.pdus.len());
for pdu in remote_state_response.pdus.clone() {
events.push(match parse_incoming_pdu(&pdu) {
Ok(t) => t,
Err(e) => {
warn!("Could not parse PDU, ignoring: {e}");
continue;
},
});
}
info!("Fetching required signing keys for all the state events we got");
services()
.rooms
.event_handler
.fetch_required_signing_keys(events.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map)
.await?;
info!("Going through room_state response PDUs");
for result in remote_state_response
.pdus
.iter()
.map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map))
{
let Ok((event_id, value)) = result.await else {
continue;
};
let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| {
warn!("Invalid PDU in fetching remote room state PDUs response: {} {:?}", e, value);
Error::BadServerResponse("Invalid PDU in send_join response.")
})?;
services()
.rooms
.outlier
.add_pdu_outlier(&event_id, &value)?;
if let Some(state_key) = &pdu.state_key {
let shortstatekey = services()
.rooms
.short
.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)?;
state.insert(shortstatekey, pdu.event_id.clone());
}
}
info!("Going through auth_chain response");
for result in remote_state_response
.auth_chain
.iter()
.map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map))
{
let Ok((event_id, value)) = result.await else {
continue;
};
services()
.rooms
.outlier
.add_pdu_outlier(&event_id, &value)?;
}
let new_room_state = services()
.rooms
.event_handler
.resolve_state(room_id.clone().as_ref(), &room_version, state)
.await?;
info!("Forcing new room state");
let (short_state_hash, new, removed) = services()
.rooms
.state_compressor
.save_state(room_id.clone().as_ref(), new_room_state)?;
let mutex_state = Arc::clone(
services()
.globals
.roomid_mutex_state
.write()
.await
.entry(room_id.clone().into())
.or_default(),
);
let state_lock = mutex_state.lock().await;
services()
.rooms
.state
.force_state(room_id.clone().as_ref(), short_state_hash, new, removed, &state_lock)
.await?;
info!(
"Updating joined counts for room just in case (e.g. we may have found a difference in the room's \
m.room.member state"
);
services().rooms.state_cache.update_joined_count(&room_id)?;
drop(state_lock);
Ok(RoomMessageEventContent::text_plain(
"Successfully forced the room state from the requested remote server.",
))
}
pub(crate) async fn resolve_true_destination(
_body: Vec<&str>, server_name: Box<ServerName>, no_cache: bool,
) -> Result<RoomMessageEventContent> {

View file

@ -1,5 +1,5 @@
use clap::Subcommand;
use debug_commands::{first_pdu_in_room, latest_pdu_in_room};
use debug_commands::{first_pdu_in_room, force_set_room_state_from_server, latest_pdu_in_room};
use ruma::{events::room::message::RoomMessageEventContent, EventId, RoomId, ServerName};
use self::debug_commands::{
@ -123,6 +123,27 @@ pub(crate) enum DebugCommand {
room_id: Box<RoomId>,
},
/// - Forcefully replaces the room state of our local copy of the specified
/// room, with the copy (auth chain and room state events) the specified
/// remote server says.
///
/// A common desire for room deletion is to simply "reset" our copy of the
/// room. While this admin command is not a replacement for that, if you
/// know you have split/broken room state and you know another server in the
/// room that has the best/working room state, this command can let you use
/// their room state. Such example is your server saying users are in a
/// room, but other servers are saying they're not in the room in question.
///
/// This command will get the latest PDU in the room we know about, and
/// request the room state at that point in time via
/// `/_matrix/federation/v1/state/{roomId}`.
ForceSetRoomStateFromServer {
/// The impacted room ID
room_id: Box<RoomId>,
/// The server we will use to query the room state for
server_name: Box<ServerName>,
},
/// - Runs a server name through conduwuit's true destination resolution
/// process
///
@ -174,6 +195,10 @@ pub(crate) async fn process(command: DebugCommand, body: Vec<&str>) -> Result<Ro
server,
force,
} => get_remote_pdu_list(body, server, force).await?,
DebugCommand::ForceSetRoomStateFromServer {
room_id,
server_name,
} => force_set_room_state_from_server(body, server_name, room_id).await?,
DebugCommand::ResolveTrueDestination {
server_name,
no_cache,