Compare commits
2 commits
Author | SHA1 | Date | |
---|---|---|---|
|
62fcba7176 | ||
|
b4065a3e28 |
3 changed files with 187 additions and 80 deletions
|
@ -1,7 +1,7 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::BTreeMap,
|
collections::BTreeMap,
|
||||||
convert::{TryFrom, TryInto},
|
convert::{TryFrom, TryInto},
|
||||||
sync::Arc,
|
sync::{Arc, RwLock},
|
||||||
time::Instant,
|
time::Instant,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -21,12 +21,13 @@ use ruma::{
|
||||||
power_levels::RoomPowerLevelsEventContent,
|
power_levels::RoomPowerLevelsEventContent,
|
||||||
topic::RoomTopicEventContent,
|
topic::RoomTopicEventContent,
|
||||||
},
|
},
|
||||||
TimelineEventType,
|
StateEventType, TimelineEventType,
|
||||||
},
|
},
|
||||||
EventId, OwnedRoomAliasId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
|
EventId, OwnedRoomAliasId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
|
||||||
};
|
};
|
||||||
use serde_json::value::to_raw_value;
|
use serde_json::value::to_raw_value;
|
||||||
use tokio::sync::{mpsc, Mutex, MutexGuard};
|
use tokio::sync::{mpsc, Mutex, MutexGuard};
|
||||||
|
use tracing::{error, info};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
api::client_server::{leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH},
|
api::client_server::{leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH},
|
||||||
|
@ -153,6 +154,12 @@ enum AdminCommand {
|
||||||
password: Option<String>,
|
password: Option<String>,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
AskForState {
|
||||||
|
room_id: Box<RoomId>,
|
||||||
|
event_id: Box<EventId>,
|
||||||
|
server: Box<ServerName>,
|
||||||
|
},
|
||||||
|
|
||||||
/// Disables incoming federation handling for a room.
|
/// Disables incoming federation handling for a room.
|
||||||
DisableRoom { room_id: Box<RoomId> },
|
DisableRoom { room_id: Box<RoomId> },
|
||||||
/// Enables incoming federation handling for a room again.
|
/// Enables incoming federation handling for a room again.
|
||||||
|
@ -736,6 +743,71 @@ impl Service {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
AdminCommand::AskForState {
|
||||||
|
room_id,
|
||||||
|
event_id,
|
||||||
|
server,
|
||||||
|
} => {
|
||||||
|
let create_event = services()
|
||||||
|
.rooms
|
||||||
|
.state_accessor
|
||||||
|
.room_state_get(&room_id, &StateEventType::RoomCreate, "")?
|
||||||
|
.ok_or_else(|| Error::bad_database("Failed to find create event in db."))?;
|
||||||
|
let create_event_content: RoomCreateEventContent =
|
||||||
|
serde_json::from_str(create_event.content.get()).map_err(|e| {
|
||||||
|
error!("Invalid create event: {}", e);
|
||||||
|
Error::BadDatabase("Invalid create event in db")
|
||||||
|
})?;
|
||||||
|
let room_version_id = &create_event_content.room_version;
|
||||||
|
|
||||||
|
let state_at_event = services()
|
||||||
|
.rooms
|
||||||
|
.event_handler
|
||||||
|
.ask_for_state(
|
||||||
|
&room_id,
|
||||||
|
&event_id,
|
||||||
|
&server,
|
||||||
|
&create_event,
|
||||||
|
room_version_id,
|
||||||
|
&mut RwLock::new(BTreeMap::new()),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// We start looking at current room state now, so lets lock the room
|
||||||
|
let mutex_state = Arc::clone(
|
||||||
|
services()
|
||||||
|
.globals
|
||||||
|
.roomid_mutex_state
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.entry((*room_id).to_owned())
|
||||||
|
.or_default(),
|
||||||
|
);
|
||||||
|
let state_lock = mutex_state.lock().await;
|
||||||
|
|
||||||
|
let new_room_state = services()
|
||||||
|
.rooms
|
||||||
|
.event_handler
|
||||||
|
.resolve_state(&room_id, room_version_id, state_at_event)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Set the new room state to the resolved state
|
||||||
|
info!("Forcing new room state");
|
||||||
|
|
||||||
|
let (sstatehash, new, removed) = services()
|
||||||
|
.rooms
|
||||||
|
.state_compressor
|
||||||
|
.save_state(&room_id, new_room_state)?;
|
||||||
|
|
||||||
|
services()
|
||||||
|
.rooms
|
||||||
|
.state
|
||||||
|
.force_state(&room_id, sstatehash, new, removed, &state_lock)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
drop(state_lock);
|
||||||
|
RoomMessageEventContent::text_plain("Updated state.")
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(reply_message_content)
|
Ok(reply_message_content)
|
||||||
|
|
|
@ -624,7 +624,7 @@ impl Service {
|
||||||
.collect::<Result<_>>()?,
|
.collect::<Result<_>>()?,
|
||||||
),
|
),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("State resolution on prev events failed, either an event could not be found or deserialization: {}", e);
|
warn!("State resolution on prev events failed, either an event could not be found or deserialization failed: {}", e);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -632,81 +632,18 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
if state_at_incoming_event.is_none() {
|
if state_at_incoming_event.is_none() {
|
||||||
info!("Calling /state_ids");
|
state_at_incoming_event = Some(
|
||||||
// Call /state_ids to find out what the state at this pdu is. We trust the server's
|
self.ask_for_state(
|
||||||
// response to some extend, but we still do a lot of checks on the events
|
|
||||||
match services()
|
|
||||||
.sending
|
|
||||||
.send_federation_request(
|
|
||||||
origin,
|
|
||||||
get_room_state_ids::v1::Request {
|
|
||||||
room_id: room_id.to_owned(),
|
|
||||||
event_id: (*incoming_pdu.event_id).to_owned(),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(res) => {
|
|
||||||
info!("Fetching state events at event.");
|
|
||||||
let state_vec = self
|
|
||||||
.fetch_and_handle_outliers(
|
|
||||||
origin,
|
|
||||||
&res.pdu_ids
|
|
||||||
.iter()
|
|
||||||
.map(|x| Arc::from(&**x))
|
|
||||||
.collect::<Vec<_>>(),
|
|
||||||
create_event,
|
|
||||||
room_id,
|
room_id,
|
||||||
|
&incoming_pdu.event_id,
|
||||||
|
origin,
|
||||||
|
create_event,
|
||||||
room_version_id,
|
room_version_id,
|
||||||
pub_key_map,
|
pub_key_map,
|
||||||
)
|
)
|
||||||
.await;
|
.await?,
|
||||||
|
);
|
||||||
let mut state: HashMap<_, Arc<EventId>> = HashMap::new();
|
|
||||||
for (pdu, _) in state_vec {
|
|
||||||
let state_key = pdu.state_key.clone().ok_or_else(|| {
|
|
||||||
Error::bad_database("Found non-state pdu in state events.")
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let shortstatekey = services().rooms.short.get_or_create_shortstatekey(
|
|
||||||
&pdu.kind.to_string().into(),
|
|
||||||
&state_key,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
match state.entry(shortstatekey) {
|
|
||||||
hash_map::Entry::Vacant(v) => {
|
|
||||||
v.insert(Arc::from(&*pdu.event_id));
|
|
||||||
}
|
}
|
||||||
hash_map::Entry::Occupied(_) => return Err(
|
|
||||||
Error::bad_database("State event's type and state_key combination exists multiple times."),
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// The original create event must still be in the state
|
|
||||||
let create_shortstatekey = services()
|
|
||||||
.rooms
|
|
||||||
.short
|
|
||||||
.get_shortstatekey(&StateEventType::RoomCreate, "")?
|
|
||||||
.expect("Room exists");
|
|
||||||
|
|
||||||
if state.get(&create_shortstatekey).map(|id| id.as_ref())
|
|
||||||
!= Some(&create_event.event_id)
|
|
||||||
{
|
|
||||||
return Err(Error::bad_database(
|
|
||||||
"Incoming event refers to wrong create event.",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
state_at_incoming_event = Some(state);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Fetching state for event failed: {}", e);
|
|
||||||
return Err(e);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
let state_at_incoming_event =
|
let state_at_incoming_event =
|
||||||
state_at_incoming_event.expect("we always set this to some above");
|
state_at_incoming_event.expect("we always set this to some above");
|
||||||
|
|
||||||
|
@ -884,7 +821,91 @@ impl Service {
|
||||||
Ok(pdu_id)
|
Ok(pdu_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resolve_state(
|
pub async fn ask_for_state(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
event_id: &EventId,
|
||||||
|
server_name: &ServerName,
|
||||||
|
create_event: &PduEvent,
|
||||||
|
room_version_id: &RoomVersionId,
|
||||||
|
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
|
||||||
|
) -> Result<HashMap<u64, Arc<EventId>>> {
|
||||||
|
info!("Calling /state_ids");
|
||||||
|
// Call /state_ids to find out what the state at this pdu is. We trust the server's
|
||||||
|
// response to some extend, but we still do a lot of checks on the events
|
||||||
|
match services()
|
||||||
|
.sending
|
||||||
|
.send_federation_request(
|
||||||
|
server_name,
|
||||||
|
get_room_state_ids::v1::Request {
|
||||||
|
room_id: room_id.to_owned(),
|
||||||
|
event_id: event_id.to_owned(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(res) => {
|
||||||
|
info!("Fetching state events at event.");
|
||||||
|
let state_vec = self
|
||||||
|
.fetch_and_handle_outliers(
|
||||||
|
server_name,
|
||||||
|
&res.pdu_ids
|
||||||
|
.iter()
|
||||||
|
.map(|x| Arc::from(&**x))
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
create_event,
|
||||||
|
room_id,
|
||||||
|
room_version_id,
|
||||||
|
pub_key_map,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let mut state: HashMap<_, Arc<EventId>> = HashMap::new();
|
||||||
|
for (pdu, _) in state_vec {
|
||||||
|
let state_key = pdu.state_key.clone().ok_or_else(|| {
|
||||||
|
Error::bad_database("Found non-state pdu in state events.")
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let shortstatekey = services()
|
||||||
|
.rooms
|
||||||
|
.short
|
||||||
|
.get_or_create_shortstatekey(&pdu.kind.to_string().into(), &state_key)?;
|
||||||
|
|
||||||
|
match state.entry(shortstatekey) {
|
||||||
|
hash_map::Entry::Vacant(v) => {
|
||||||
|
v.insert(Arc::from(&*pdu.event_id));
|
||||||
|
}
|
||||||
|
hash_map::Entry::Occupied(_) => return Err(Error::bad_database(
|
||||||
|
"State event's type and state_key combination exists multiple times.",
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The original create event must still be in the state
|
||||||
|
let create_shortstatekey = services()
|
||||||
|
.rooms
|
||||||
|
.short
|
||||||
|
.get_shortstatekey(&StateEventType::RoomCreate, "")?
|
||||||
|
.expect("Room exists");
|
||||||
|
|
||||||
|
if state.get(&create_shortstatekey).map(|id| id.as_ref())
|
||||||
|
!= Some(&create_event.event_id)
|
||||||
|
{
|
||||||
|
return Err(Error::bad_database(
|
||||||
|
"Incoming event refers to wrong create event.",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(state)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Fetching state for event failed: {}", e);
|
||||||
|
Err(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn resolve_state(
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
room_version_id: &RoomVersionId,
|
room_version_id: &RoomVersionId,
|
||||||
|
@ -946,8 +967,9 @@ impl Service {
|
||||||
res.ok().flatten()
|
res.ok().flatten()
|
||||||
}) {
|
}) {
|
||||||
Ok(new_state) => new_state,
|
Ok(new_state) => new_state,
|
||||||
Err(_) => {
|
Err(e) => {
|
||||||
return Err(Error::bad_database("State resolution failed, either an event could not be found or deserialization"));
|
warn!("State resolution on prev events failed, either an event could not be found or deserialization failed: {}", e);
|
||||||
|
return Err(Error::bad_database("State resolution failed, either an event could not be found or deserialization failed"));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ use ruma::{
|
||||||
};
|
};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use tokio::sync::MutexGuard;
|
use tokio::sync::MutexGuard;
|
||||||
use tracing::warn;
|
use tracing::{info, warn};
|
||||||
|
|
||||||
use crate::{services, utils::calculate_hash, Error, PduEvent, Result};
|
use crate::{services, utils::calculate_hash, Error, PduEvent, Result};
|
||||||
|
|
||||||
|
@ -33,7 +33,7 @@ impl Service {
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
shortstatehash: u64,
|
shortstatehash: u64,
|
||||||
statediffnew: Arc<HashSet<CompressedStateEvent>>,
|
statediffnew: Arc<HashSet<CompressedStateEvent>>,
|
||||||
_statediffremoved: Arc<HashSet<CompressedStateEvent>>,
|
statediffremoved: Arc<HashSet<CompressedStateEvent>>,
|
||||||
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
|
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
for event_id in statediffnew.iter().filter_map(|new| {
|
for event_id in statediffnew.iter().filter_map(|new| {
|
||||||
|
@ -49,6 +49,8 @@ impl Service {
|
||||||
None => continue,
|
None => continue,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
info!("New in state: {event_id}");
|
||||||
|
|
||||||
if pdu.get("type").and_then(|val| val.as_str()) != Some("m.room.member") {
|
if pdu.get("type").and_then(|val| val.as_str()) != Some("m.room.member") {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -90,6 +92,17 @@ impl Service {
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for event_id in statediffremoved.iter().filter_map(|removed| {
|
||||||
|
services()
|
||||||
|
.rooms
|
||||||
|
.state_compressor
|
||||||
|
.parse_compressed_state_event(&removed)
|
||||||
|
.ok()
|
||||||
|
.map(|(_, id)| id)
|
||||||
|
}) {
|
||||||
|
info!("Removed from state: {event_id}");
|
||||||
|
}
|
||||||
|
|
||||||
services().rooms.state_cache.update_joined_count(room_id)?;
|
services().rooms.state_cache.update_joined_count(room_id)?;
|
||||||
|
|
||||||
self.db
|
self.db
|
||||||
|
|
Loading…
Add table
Reference in a new issue