refactor: Rewrite backfill algorithm according to specification in more readable form
This commit is contained in:
parent
7f8f6b52d9
commit
0d14451bfb
1 changed files with 146 additions and 62 deletions
|
@ -43,12 +43,12 @@ use ruma::{
|
||||||
},
|
},
|
||||||
serde::{Base64, JsonObject, Raw},
|
serde::{Base64, JsonObject, Raw},
|
||||||
to_device::DeviceIdOrAllDevices,
|
to_device::DeviceIdOrAllDevices,
|
||||||
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
|
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
|
||||||
OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, UInt,
|
OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, UInt,
|
||||||
};
|
};
|
||||||
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
||||||
use std::{
|
use std::{
|
||||||
collections::{BTreeMap, HashSet},
|
collections::{BTreeMap, HashSet, VecDeque},
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
mem,
|
mem,
|
||||||
net::{IpAddr, SocketAddr},
|
net::{IpAddr, SocketAddr},
|
||||||
|
@ -1047,10 +1047,10 @@ pub async fn get_missing_events_route(
|
||||||
Ok(get_missing_events::v1::Response { events })
|
Ok(get_missing_events::v1::Response { events })
|
||||||
}
|
}
|
||||||
|
|
||||||
// Recursively fetch events starting from `latest_events`, going backwards
|
/// Fetch events starting from `latest_events`, going backwards
|
||||||
// through each event's `prev_events` until reaching the `earliest_events`.
|
/// through each event's `prev_events` until reaching the `earliest_events`.
|
||||||
//
|
///
|
||||||
// Used by the federation /backfill and /get_missing_events routes.
|
/// Used by the federation /backfill and /get_missing_events routes.
|
||||||
fn get_missing_events(
|
fn get_missing_events(
|
||||||
sender_servername: &ServerName,
|
sender_servername: &ServerName,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
|
@ -1075,72 +1075,156 @@ fn get_missing_events(
|
||||||
.filter(|member| member.server_name() == sender_servername)
|
.filter(|member| member.server_name() == sender_servername)
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let limit = u64::from(limit) as usize;
|
let event_filter = |event_id: &EventId| {
|
||||||
|
services()
|
||||||
|
.rooms
|
||||||
|
.state_accessor
|
||||||
|
.server_can_see_event(
|
||||||
|
sender_servername,
|
||||||
|
current_server_members.as_slice(),
|
||||||
|
event_id,
|
||||||
|
)
|
||||||
|
.unwrap_or_default()
|
||||||
|
};
|
||||||
|
|
||||||
let mut queued_events = latest_events.to_owned();
|
let pdu_filter = |pdu: &CanonicalJsonObject| {
|
||||||
let mut events = Vec::new();
|
let event_room_id = pdu
|
||||||
|
.get("room_id")
|
||||||
|
.and_then(|val| val.as_str())
|
||||||
|
.and_then(|room_id_str| <&RoomId>::try_from(room_id_str).ok());
|
||||||
|
|
||||||
let mut stop_at_events = HashSet::with_capacity(limit);
|
match event_room_id {
|
||||||
stop_at_events.extend(earliest_events.iter().cloned());
|
Some(event_room_id) => {
|
||||||
|
let valid_event = event_room_id != room_id;
|
||||||
|
if !valid_event {
|
||||||
|
error!(?room_id, ?event_room_id, "An evil event detected");
|
||||||
|
}
|
||||||
|
valid_event
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
error!(?pdu, "Can't extract valid `room_id` from pdu");
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let mut i = 0;
|
#[inline]
|
||||||
while i < queued_events.len() && events.len() < limit {
|
fn get_pdu(event: &EventId) -> Option<CanonicalJsonObject> {
|
||||||
if stop_at_events.contains(&queued_events[i]) {
|
services()
|
||||||
i += 1;
|
.rooms
|
||||||
|
.timeline
|
||||||
|
.get_pdu_json(event)
|
||||||
|
.unwrap_or_default()
|
||||||
|
}
|
||||||
|
|
||||||
|
let events = linearize_previous_events(
|
||||||
|
latest_events.into_iter().cloned(),
|
||||||
|
earliest_events.into_iter().cloned(),
|
||||||
|
limit,
|
||||||
|
get_pdu,
|
||||||
|
event_filter,
|
||||||
|
pdu_filter,
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(events)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Unwinds previous events by doing a breadth-first walk from given roots
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `roots`: Starting point to unwind event history
|
||||||
|
/// * `excluded`: Skipped events
|
||||||
|
/// * `limit`: How many events to extract
|
||||||
|
/// * `pdu_extractor`: Closure to extract PDU for given event_id, for example, from DB.
|
||||||
|
/// * `event_filter`: Closure to filter event by it's visiblity. It may or may not hit DB.
|
||||||
|
/// * `pdu_filter`: Closure to get basic validation against malformed PDUs.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
///
|
||||||
|
/// The previous events for given roots, without any `excluded` events, up to the provided `limit`.
|
||||||
|
///
|
||||||
|
/// # Note
|
||||||
|
///
|
||||||
|
/// In matrix specification, «Server-Server API», paragraph 8 there is no mention of previous events for excluded events.
|
||||||
|
/// Therefore, algorithm below excludes **only** events itself, but allows to process their history.
|
||||||
|
fn linearize_previous_events<E, L, F, V, P>(
|
||||||
|
roots: E,
|
||||||
|
excluded: E,
|
||||||
|
limit: L,
|
||||||
|
pdu_extractor: P,
|
||||||
|
event_filter: F,
|
||||||
|
pdu_filter: V,
|
||||||
|
) -> Vec<Box<RawJsonValue>>
|
||||||
|
where
|
||||||
|
E: IntoIterator<Item = OwnedEventId>,
|
||||||
|
F: Fn(&EventId) -> bool,
|
||||||
|
L: Into<u64>,
|
||||||
|
V: Fn(&CanonicalJsonObject) -> bool,
|
||||||
|
P: Fn(&EventId) -> Option<CanonicalJsonObject>,
|
||||||
|
{
|
||||||
|
let limit = limit.into() as usize;
|
||||||
|
assert!(limit > 0, "Limit should be > 0");
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn get_previous_events(pdu: &CanonicalJsonObject) -> Option<Vec<OwnedEventId>> {
|
||||||
|
match pdu.get("prev_events") {
|
||||||
|
None => {
|
||||||
|
error!(?pdu, "A stored event has no 'prev_events' field");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(prev_events) => {
|
||||||
|
let val = prev_events.clone().into();
|
||||||
|
let events = serde_json::from_value::<Vec<OwnedEventId>>(val);
|
||||||
|
if let Err(error) = events {
|
||||||
|
error!(?prev_events, ?error, "Broken 'prev_events' field");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(events.unwrap_or_default())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut visited: HashSet<OwnedEventId> = Default::default();
|
||||||
|
let mut history: Vec<Box<RawJsonValue>> = Default::default();
|
||||||
|
let mut queue: VecDeque<OwnedEventId> = Default::default();
|
||||||
|
let excluded: HashSet<_> = excluded.into_iter().collect();
|
||||||
|
|
||||||
|
// Add all roots into processing queue
|
||||||
|
for root in roots {
|
||||||
|
queue.push_back(root);
|
||||||
|
}
|
||||||
|
|
||||||
|
while let Some(current_event) = queue.pop_front() {
|
||||||
|
// Return all collected events if reached limit
|
||||||
|
if history.len() >= limit {
|
||||||
|
return history;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip an entire branch containing incorrect events
|
||||||
|
if !event_filter(¤t_event) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(pdu) = services().rooms.timeline.get_pdu_json(&queued_events[i])? {
|
// Process PDU from a current event if it exists and valid
|
||||||
let room_id_str = pdu
|
if let Some(pdu) = pdu_extractor(¤t_event).filter(&pdu_filter) {
|
||||||
.get("room_id")
|
if !&excluded.contains(¤t_event) {
|
||||||
.and_then(|val| val.as_str())
|
history.push(PduEvent::convert_to_outgoing_federation_event(pdu.clone()));
|
||||||
.ok_or_else(|| Error::bad_database("Invalid event in database"))?;
|
|
||||||
|
|
||||||
let event_room_id = <&RoomId>::try_from(room_id_str)
|
|
||||||
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
|
|
||||||
|
|
||||||
if event_room_id != room_id {
|
|
||||||
warn!(
|
|
||||||
?room_id,
|
|
||||||
evil_event = ?queued_events[i],
|
|
||||||
"Evil event detected while searching in room"
|
|
||||||
);
|
|
||||||
return Err(Error::BadRequest(
|
|
||||||
ErrorKind::InvalidParam,
|
|
||||||
"Evil event detected",
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let event_is_visible = services().rooms.state_accessor.server_can_see_event(
|
// Fetch previous events, if they exists
|
||||||
sender_servername,
|
if let Some(previous_events) = get_previous_events(&pdu) {
|
||||||
current_server_members.as_slice(),
|
for previous_event in previous_events {
|
||||||
&queued_events[i],
|
if !visited.contains(&previous_event) {
|
||||||
)?;
|
visited.insert(previous_event.clone());
|
||||||
|
queue.push_back(previous_event);
|
||||||
if !event_is_visible {
|
}
|
||||||
i += 1;
|
}
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't send this event again if it comes through some other
|
|
||||||
// event's prev_events.
|
|
||||||
stop_at_events.insert(queued_events[i].clone());
|
|
||||||
|
|
||||||
let prev_events = pdu
|
|
||||||
.get("prev_events")
|
|
||||||
.ok_or_else(|| Error::bad_database("Event in db has no prev_events field."))?;
|
|
||||||
|
|
||||||
queued_events.extend_from_slice(
|
|
||||||
&serde_json::from_value::<Vec<OwnedEventId>>(prev_events.clone().into()).map_err(
|
|
||||||
|_| Error::bad_database("Invalid prev_events content in pdu in db."),
|
|
||||||
)?,
|
|
||||||
);
|
|
||||||
events.push(PduEvent::convert_to_outgoing_federation_event(pdu));
|
|
||||||
}
|
}
|
||||||
i += 1;
|
|
||||||
}
|
}
|
||||||
|
// All done, return collected events
|
||||||
Ok(events)
|
history
|
||||||
}
|
}
|
||||||
|
|
||||||
/// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}`
|
/// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}`
|
||||||
|
|
Loading…
Add table
Reference in a new issue