diff --git a/src/admin/debug/debug_commands.rs b/src/admin/debug/debug_commands.rs
index 4522b678..56563748 100644
--- a/src/admin/debug/debug_commands.rs
+++ b/src/admin/debug/debug_commands.rs
@@ -1,9 +1,15 @@
-use std::{collections::BTreeMap, sync::Arc, time::Instant};
+use std::{
+	collections::{BTreeMap, HashMap},
+	sync::Arc,
+	time::Instant,
+};
 
+use api::client::validate_and_add_event_id;
 use conduit::{utils::HtmlEscape, Error, Result};
 use ruma::{
-	api::client::error::ErrorKind, events::room::message::RoomMessageEventContent, CanonicalJsonObject, EventId,
-	RoomId, RoomVersionId, ServerName,
+	api::{client::error::ErrorKind, federation::event::get_room_state},
+	events::room::message::RoomMessageEventContent,
+	CanonicalJsonObject, EventId, RoomId, RoomVersionId, ServerName,
 };
 use service::{rooms::event_handler::parse_incoming_pdu, sending::resolve::resolve_actual_dest, services, PduEvent};
 use tokio::sync::RwLock;
@@ -472,6 +478,169 @@ pub(crate) async fn latest_pdu_in_room(_body: Vec<&str>, room_id: Box<RoomId>) -
 	Ok(RoomMessageEventContent::text_plain(format!("{latest_pdu:?}")))
 }
 
+/// Replaces our local state for `room_id` with the state that `server_name`
+/// returns for the federation `get_room_state` request.
+///
+/// The state is requested at the latest PDU we have stored for the room. PDUs
+/// from the response's `pdus` and `auth_chain` that pass
+/// `validate_and_add_event_id` are stored as outliers; PDUs that fail that
+/// check are skipped. The state events collected from `pdus` are run
+/// through `resolve_state`, saved, and then forced as the room state while the
+/// room's state mutex is held. Finally the room's joined count is updated.
+///
+/// Returns a plain-text message describing the outcome. If this server is not
+/// in the room, returns early with an explanatory message and changes nothing.
+///
+/// # Errors
+///
+/// Propagates errors from the service calls and the federation request, and
+/// returns `Error::BadServerResponse` if a validated PDU cannot be converted
+/// into a `PduEvent`.
+#[tracing::instrument(skip(_body))]
+pub(crate) async fn force_set_room_state_from_server(
+	_body: Vec<&str>, server_name: Box<ServerName>, room_id: Box<RoomId>,
+) -> Result<RoomMessageEventContent> {
+	if !services()
+		.rooms
+		.state_cache
+		.server_in_room(&services().globals.config.server_name, &room_id)?
+	{
+		return Ok(RoomMessageEventContent::text_plain(
+			"We are not participating in the room / we don't know about the room ID.",
+		));
+	}
+
+	// The remote state is requested as of the newest PDU we have stored.
+	let latest_pdu = services()
+		.rooms
+		.timeline
+		.latest_pdu_in_room(&room_id)?
+		.ok_or_else(|| Error::bad_database("Failed to find the latest PDU in database"))?;
+
+	let room_version = services().rooms.state.get_room_version(&room_id)?;
+
+	let mut state: HashMap<u64, Arc<EventId>> = HashMap::new();
+	let pub_key_map = RwLock::new(BTreeMap::new());
+
+	let remote_state_response = services()
+		.sending
+		.send_federation_request(
+			&server_name,
+			get_room_state::v1::Request {
+				room_id: room_id.clone().into(),
+				event_id: latest_pdu.event_id.clone().into(),
+			},
+		)
+		.await?;
+
+	let mut events = Vec::with_capacity(remote_state_response.pdus.len());
+
+	// Parse up front only to learn which signing keys need to be fetched.
+	for pdu in &remote_state_response.pdus {
+		events.push(match parse_incoming_pdu(pdu) {
+			Ok(t) => t,
+			Err(e) => {
+				warn!("Could not parse PDU, ignoring: {e}");
+				continue;
+			},
+		});
+	}
+
+	info!("Fetching required signing keys for all the state events we got");
+	services()
+		.rooms
+		.event_handler
+		.fetch_required_signing_keys(events.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map)
+		.await?;
+
+	info!("Going through room_state response PDUs");
+	for result in remote_state_response
+		.pdus
+		.iter()
+		.map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map))
+	{
+		let Ok((event_id, value)) = result.await else {
+			continue;
+		};
+
+		let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| {
+			warn!("Invalid PDU in fetching remote room state PDUs response: {} {:?}", e, value);
+			Error::BadServerResponse("Invalid PDU in remote room state response.")
+		})?;
+
+		services()
+			.rooms
+			.outlier
+			.add_pdu_outlier(&event_id, &value)?;
+		// Only events with a state key are part of the room state map.
+		if let Some(state_key) = &pdu.state_key {
+			let shortstatekey = services()
+				.rooms
+				.short
+				.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)?;
+			state.insert(shortstatekey, pdu.event_id.clone());
+		}
+	}
+
+	info!("Going through auth_chain response");
+	for result in remote_state_response
+		.auth_chain
+		.iter()
+		.map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map))
+	{
+		let Ok((event_id, value)) = result.await else {
+			continue;
+		};
+
+		services()
+			.rooms
+			.outlier
+			.add_pdu_outlier(&event_id, &value)?;
+	}
+
+	let new_room_state = services()
+		.rooms
+		.event_handler
+		.resolve_state(room_id.as_ref(), &room_version, state)
+		.await?;
+
+	info!("Forcing new room state");
+	let (short_state_hash, new, removed) = services()
+		.rooms
+		.state_compressor
+		.save_state(room_id.as_ref(), new_room_state)?;
+
+	// Hold the room's state mutex while the new state is forced.
+	let mutex_state = Arc::clone(
+		services()
+			.globals
+			.roomid_mutex_state
+			.write()
+			.await
+			.entry(room_id.clone().into())
+			.or_default(),
+	);
+	let state_lock = mutex_state.lock().await;
+
+	services()
+		.rooms
+		.state
+		.force_state(room_id.as_ref(), short_state_hash, new, removed, &state_lock)
+		.await?;
+
+	info!(
+		"Updating joined counts for room just in case (e.g. we may have found a difference in the room's \
+		 m.room.member state)"
+	);
+	services().rooms.state_cache.update_joined_count(&room_id)?;
+
+	drop(state_lock);
+
+	Ok(RoomMessageEventContent::text_plain(
+		"Successfully forced the room state from the requested remote server.",
+	))
+}
+
 pub(crate) async fn resolve_true_destination(
 	_body: Vec<&str>, server_name: Box<ServerName>, no_cache: bool,
 ) -> Result<RoomMessageEventContent> {
diff --git a/src/admin/debug/mod.rs b/src/admin/debug/mod.rs
index 614a4ae5..afcc49b0 100644
--- a/src/admin/debug/mod.rs
+++ b/src/admin/debug/mod.rs
@@ -1,5 +1,5 @@
 use clap::Subcommand;
-use debug_commands::{first_pdu_in_room, latest_pdu_in_room};
+use debug_commands::{first_pdu_in_room, force_set_room_state_from_server, latest_pdu_in_room};
 use ruma::{events::room::message::RoomMessageEventContent, EventId, RoomId, ServerName};
 
 use self::debug_commands::{
@@ -123,6 +123,27 @@ pub(crate) enum DebugCommand {
 		room_id: Box<RoomId>,
 	},
 
+	/// - Forcefully replaces the room state of our local copy of the specified
+	/// room, with the copy (auth chain and room state events) the specified
+	/// remote server says.
+	///
+	/// A common desire for room deletion is to simply "reset" our copy of the
+	/// room. While this admin command is not a replacement for that, if you
+	/// know you have split/broken room state and you know another server in the
+	/// room that has the best/working room state, this command can let you use
+	/// their room state. One example is your server saying users are in a
+	/// room, but other servers are saying they're not in the room in question.
+	///
+	/// This command will get the latest PDU in the room we know about, and
+	/// request the room state at that point in time via
+	/// `/_matrix/federation/v1/state/{roomId}`.
+	ForceSetRoomStateFromServer {
+		/// The impacted room ID
+		room_id: Box<RoomId>,
+		/// The server we will use to query the room state for
+		server_name: Box<ServerName>,
+	},
+
 	/// - Runs a server name through conduwuit's true destination resolution
 	/// process
 	///
@@ -174,6 +195,10 @@ pub(crate) async fn process(command: DebugCommand, body: Vec<&str>) -> Result<Ro
 			server,
 			force,
 		} => get_remote_pdu_list(body, server, force).await?,
+		DebugCommand::ForceSetRoomStateFromServer {
+			room_id,
+			server_name,
+		} => force_set_room_state_from_server(body, server_name, room_id).await?,
 		DebugCommand::ResolveTrueDestination {
 			server_name,
 			no_cache,