fix: don't use recursion for prev events

This commit is contained in:
Timo Kösters 2021-08-14 21:30:14 +02:00
parent 1e3a8ca35d
commit 1d46569929
No known key found for this signature in database
GPG key ID: 356E705610F626D5

View file

@ -840,7 +840,7 @@ type AsyncRecursiveType<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
/// 14. Use state resolution to find new room state /// 14. Use state resolution to find new room state
// We use some AsyncRecursiveType hacks here so we can call this async funtion recursively // We use some AsyncRecursiveType hacks here so we can call this async funtion recursively
#[tracing::instrument(skip(value, is_timeline_event, db, pub_key_map))] #[tracing::instrument(skip(value, is_timeline_event, db, pub_key_map))]
pub fn handle_incoming_pdu<'a>( pub async fn handle_incoming_pdu<'a>(
origin: &'a ServerName, origin: &'a ServerName,
event_id: &'a EventId, event_id: &'a EventId,
room_id: &'a RoomId, room_id: &'a RoomId,
@ -848,11 +848,7 @@ pub fn handle_incoming_pdu<'a>(
is_timeline_event: bool, is_timeline_event: bool,
db: &'a Database, db: &'a Database,
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
) -> AsyncRecursiveType<'a, StdResult<Option<Vec<u8>>, String>> { ) -> StdResult<Option<Vec<u8>>, String> {
Box::pin(async move {
let start_time = Instant::now();
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
match db.rooms.exists(&room_id) { match db.rooms.exists(&room_id) {
Ok(true) => {} Ok(true) => {}
_ => { _ => {
@ -865,6 +861,64 @@ pub fn handle_incoming_pdu<'a>(
return Ok(Some(pdu_id.to_vec())); return Ok(Some(pdu_id.to_vec()));
} }
let create_event = db
.rooms
.room_state_get(&room_id, &EventType::RoomCreate, "")
.map_err(|_| "Failed to ask database for event.".to_owned())?
.ok_or_else(|| "Failed to find create event in db.".to_owned())?;
let (incoming_pdu, val) = handle_outlier_pdu(origin, &create_event, event_id, room_id, value, db, pub_key_map).await?;
// 8. if not timeline event: stop
if !is_timeline_event
|| incoming_pdu.origin_server_ts
< db.rooms
.first_pdu_in_room(&room_id)
.map_err(|_| "Error loading first room event.".to_owned())?
.expect("Room exists")
.origin_server_ts
{
return Ok(None);
}
// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
let mut todo_outlier_stack = incoming_pdu.prev_events.clone();
let mut todo_timeline_stack = Vec::new();
while let Some(prev_event_id) = todo_outlier_stack.pop() {
if let Some((pdu, Some(json))) = fetch_and_handle_outliers(
db,
origin,
&[prev_event_id],
&create_event,
&room_id,
pub_key_map,
)
.await.pop() {
todo_timeline_stack.push((pdu, json));
}
}
while let Some(prev) = todo_timeline_stack.pop() {
upgrade_outlier_to_timeline_pdu(prev.0, prev.1, &create_event, origin, db, room_id, pub_key_map).await?;
}
upgrade_outlier_to_timeline_pdu(incoming_pdu, val, &create_event, origin, db, room_id, pub_key_map).await
}
fn handle_outlier_pdu<'a>(
origin: &'a ServerName,
create_event: &'a PduEvent,
event_id: &'a EventId,
room_id: &'a RoomId,
value: BTreeMap<String, CanonicalJsonValue>,
db: &'a Database,
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
) -> AsyncRecursiveType<'a, StdResult<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>), String>> {
Box::pin(async move {
let start_time = Instant::now();
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
// We go through all the signatures we see on the value and fetch the corresponding signing // We go through all the signatures we see on the value and fetch the corresponding signing
// keys // keys
fetch_required_signing_keys(&value, &pub_key_map, db) fetch_required_signing_keys(&value, &pub_key_map, db)
@ -873,11 +927,6 @@ pub fn handle_incoming_pdu<'a>(
// 2. Check signatures, otherwise drop // 2. Check signatures, otherwise drop
// 3. check content hash, redact if doesn't match // 3. check content hash, redact if doesn't match
let create_event = db
.rooms
.room_state_get(&room_id, &EventType::RoomCreate, "")
.map_err(|_| "Failed to ask database for event.".to_owned())?
.ok_or_else(|| "Failed to find create event in db.".to_owned())?;
let create_event_content = let create_event_content =
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone())
@ -924,13 +973,13 @@ pub fn handle_incoming_pdu<'a>(
// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" // 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
// EDIT: Step 5 is not applied anymore because it failed too often // EDIT: Step 5 is not applied anymore because it failed too often
debug!("Fetching auth events for {}", incoming_pdu.event_id); debug!("Fetching auth events for {}", incoming_pdu.event_id);
fetch_and_handle_events( fetch_and_handle_outliers(
db, db,
origin, origin,
&incoming_pdu.auth_events, &incoming_pdu.auth_events,
&create_event,
&room_id, &room_id,
pub_key_map, pub_key_map,
false,
) )
.await; .await;
@ -1013,37 +1062,20 @@ pub fn handle_incoming_pdu<'a>(
.map_err(|_| "Failed to add pdu as outlier.".to_owned())?; .map_err(|_| "Failed to add pdu as outlier.".to_owned())?;
debug!("Added pdu as outlier."); debug!("Added pdu as outlier.");
// 8. if not timeline event: stop Ok((incoming_pdu,val))
if !is_timeline_event })
|| incoming_pdu.origin_server_ts
< db.rooms
.first_pdu_in_room(&room_id)
.map_err(|_| "Error loading first room event.".to_owned())?
.expect("Room exists")
.origin_server_ts
{
let elapsed = start_time.elapsed();
warn!(
"Handling outlier event {} took {}m{}s",
event_id,
elapsed.as_secs() / 60,
elapsed.as_secs() % 60
);
return Ok(None);
}
// TODO: make not recursive }
// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
fetch_and_handle_events(
db,
origin,
&incoming_pdu.prev_events,
&room_id,
pub_key_map,
true,
)
.await;
async fn upgrade_outlier_to_timeline_pdu(
incoming_pdu: Arc<PduEvent>,
val: BTreeMap<String, CanonicalJsonValue>,
create_event: &PduEvent,
origin: &ServerName,
db: &Database,
room_id: &RoomId,
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,
) -> StdResult<Option<Vec<u8>>, String> {
// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
// doing all the checks in this list starting at 1. These are not timeline events. // doing all the checks in this list starting at 1. These are not timeline events.
@ -1065,17 +1097,17 @@ pub fn handle_incoming_pdu<'a>(
if let Some(Ok(state)) = state { if let Some(Ok(state)) = state {
warn!("Using cached state"); warn!("Using cached state");
let mut state = fetch_and_handle_events( let mut state = fetch_and_handle_outliers(
db, db,
origin, origin,
&state.into_iter().collect::<Vec<_>>(), &state.into_iter().collect::<Vec<_>>(),
&create_event,
&room_id, &room_id,
pub_key_map, pub_key_map,
false,
) )
.await .await
.into_iter() .into_iter()
.map(|pdu| { .map(|(pdu,_)| {
( (
( (
pdu.kind.clone(), pdu.kind.clone(),
@ -1119,18 +1151,18 @@ pub fn handle_incoming_pdu<'a>(
{ {
Ok(res) => { Ok(res) => {
debug!("Fetching state events at event."); debug!("Fetching state events at event.");
let state_vec = fetch_and_handle_events( let state_vec = fetch_and_handle_outliers(
&db, &db,
origin, origin,
&res.pdu_ids, &res.pdu_ids,
&create_event,
&room_id, &room_id,
pub_key_map, pub_key_map,
false,
) )
.await; .await;
let mut state = HashMap::new(); let mut state = HashMap::new();
for pdu in state_vec { for (pdu, _) in state_vec {
match state.entry((pdu.kind.clone(), pdu.state_key.clone().ok_or_else(|| "Found non-state pdu in state events.".to_owned())?)) { match state.entry((pdu.kind.clone(), pdu.state_key.clone().ok_or_else(|| "Found non-state pdu in state events.".to_owned())?)) {
Entry::Vacant(v) => { Entry::Vacant(v) => {
v.insert(pdu); v.insert(pdu);
@ -1153,13 +1185,13 @@ pub fn handle_incoming_pdu<'a>(
} }
debug!("Fetching auth chain events at event."); debug!("Fetching auth chain events at event.");
fetch_and_handle_events( fetch_and_handle_outliers(
&db, &db,
origin, origin,
&res.auth_chain_ids, &res.auth_chain_ids,
&create_event,
&room_id, &room_id,
pub_key_map, pub_key_map,
false,
) )
.await; .await;
@ -1175,6 +1207,28 @@ pub fn handle_incoming_pdu<'a>(
state_at_incoming_event.expect("we always set this to some above"); state_at_incoming_event.expect("we always set this to some above");
// 11. Check the auth of the event passes based on the state of the event // 11. Check the auth of the event passes based on the state of the event
let create_event_content =
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone())
.expect("Raw::from_value always works.")
.deserialize()
.map_err(|_| "Invalid PowerLevels event in db.".to_owned())?;
let room_version_id = &create_event_content.room_version;
let room_version = RoomVersion::new(room_version_id).expect("room version is supported");
// If the previous event was the create event special rules apply
let previous_create = if incoming_pdu.auth_events.len() == 1
&& incoming_pdu.prev_events == incoming_pdu.auth_events
{
db.rooms
.get_pdu(&incoming_pdu.auth_events[0])
.map_err(|e| e.to_string())?
.filter(|maybe_create| **maybe_create == *create_event)
} else {
None
};
if !state_res::event_auth::auth_check( if !state_res::event_auth::auth_check(
&room_version, &room_version,
&incoming_pdu, &incoming_pdu,
@ -1396,34 +1450,27 @@ pub fn handle_incoming_pdu<'a>(
// Event has passed all auth/stateres checks // Event has passed all auth/stateres checks
drop(state_lock); drop(state_lock);
let elapsed = start_time.elapsed();
warn!(
"Handling timeline event {} took {}m{}s",
event_id,
elapsed.as_secs() / 60,
elapsed.as_secs() % 60
);
Ok(pdu_id) Ok(pdu_id)
})
} }
/// Find the event and auth it. Once the event is validated (steps 1 - 8) /// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// it is appended to the outliers Tree. /// it is appended to the outliers Tree.
/// ///
/// Returns pdu and if we fetched it over federation the raw json.
///
/// a. Look in the main timeline (pduid_pdu tree) /// a. Look in the main timeline (pduid_pdu tree)
/// b. Look at outlier pdu tree /// b. Look at outlier pdu tree
/// c. Ask origin server over federation /// c. Ask origin server over federation
/// d. TODO: Ask other servers over federation? /// d. TODO: Ask other servers over federation?
//#[tracing::instrument(skip(db, key_map, auth_cache))] //#[tracing::instrument(skip(db, key_map, auth_cache))]
pub(crate) fn fetch_and_handle_events<'a>( pub(crate) fn fetch_and_handle_outliers<'a>(
db: &'a Database, db: &'a Database,
origin: &'a ServerName, origin: &'a ServerName,
events: &'a [EventId], events: &'a [EventId],
create_event: &'a PduEvent,
room_id: &'a RoomId, room_id: &'a RoomId,
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
are_timeline_events: bool, ) -> AsyncRecursiveType<'a, Vec<(Arc<PduEvent>, Option<BTreeMap<String, CanonicalJsonValue>>)>> {
) -> AsyncRecursiveType<'a, Vec<Arc<PduEvent>>> {
Box::pin(async move { Box::pin(async move {
let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) { let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) {
Entry::Vacant(e) => { Entry::Vacant(e) => {
@ -1449,16 +1496,12 @@ pub(crate) fn fetch_and_handle_events<'a>(
// a. Look in the main timeline (pduid_pdu tree) // a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree // b. Look at outlier pdu tree
// (get_pdu checks both) // (get_pdu_json checks both)
let local_pdu = if are_timeline_events { let local_pdu = db.rooms.get_pdu(&id);
db.rooms.get_non_outlier_pdu(&id).map(|o| o.map(Arc::new))
} else {
db.rooms.get_pdu(&id)
};
let pdu = match local_pdu { let pdu = match local_pdu {
Ok(Some(pdu)) => { Ok(Some(pdu)) => {
trace!("Found {} in db", id); trace!("Found {} in db", id);
pdu (pdu, None)
} }
Ok(None) => { Ok(None) => {
// c. Ask origin server over federation // c. Ask origin server over federation
@ -1474,39 +1517,26 @@ pub(crate) fn fetch_and_handle_events<'a>(
{ {
Ok(res) => { Ok(res) => {
debug!("Got {} over federation", id); debug!("Got {} over federation", id);
let (event_id, mut value) = let (event_id, value) =
match crate::pdu::gen_event_id_canonical_json(&res.pdu) { match crate::pdu::gen_event_id_canonical_json(&res.pdu) {
Ok(t) => t, Ok(t) => t,
Err(_) => continue, Err(_) => continue,
}; };
// This will also fetch the auth chain // This will also fetch the auth chain
match handle_incoming_pdu( match handle_outlier_pdu(
origin, origin,
create_event,
&event_id, &event_id,
&room_id, &room_id,
value.clone(), value.clone(),
are_timeline_events,
db, db,
pub_key_map, pub_key_map,
) )
.await .await
{ {
Ok(_) => { Ok((pdu, json)) => {
value.insert( (pdu, Some(json))
"event_id".to_owned(),
CanonicalJsonValue::String(event_id.into()),
);
Arc::new(
serde_json::from_value(
serde_json::to_value(value)
.expect("canonicaljsonobject is valid value"),
)
.expect(
"This is possible because handle_incoming_pdu worked",
),
)
} }
Err(e) => { Err(e) => {
warn!("Authentication of event {} failed: {:?}", id, e); warn!("Authentication of event {} failed: {:?}", id, e);