fix double-deserialization in federation transaction handler.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
97fc6c158f
commit
c4ebc2f1d1
1 changed files with 22 additions and 38 deletions
|
@ -213,7 +213,9 @@ pub fn parse_incoming_pdu(pdu: &RawJsonValue) -> Result<(OwnedEventId, Canonical
|
|||
.and_then(|id| RoomId::parse(id.as_str()?).ok())
|
||||
.ok_or(Error::BadRequest(ErrorKind::InvalidParam, "Invalid room id in pdu"))?;
|
||||
|
||||
let room_version_id = services().rooms.state.get_room_version(&room_id)?;
|
||||
let Ok(room_version_id) = services().rooms.state.get_room_version(&room_id) else {
|
||||
return Err(Error::Error(format!("Server is not in room {room_id}")));
|
||||
};
|
||||
|
||||
let Ok((event_id, value)) = gen_event_id_canonical_json(pdu, &room_version_id) else {
|
||||
// Event could not be converted to canonical json
|
||||
|
@ -222,6 +224,7 @@ pub fn parse_incoming_pdu(pdu: &RawJsonValue) -> Result<(OwnedEventId, Canonical
|
|||
"Could not convert event to canonical json.",
|
||||
));
|
||||
};
|
||||
|
||||
Ok((event_id, value, room_id))
|
||||
}
|
||||
|
||||
|
@ -236,10 +239,6 @@ pub async fn send_transaction_message_route(
|
|||
.as_ref()
|
||||
.expect("server is authenticated");
|
||||
|
||||
let mut resolved_map = BTreeMap::new();
|
||||
|
||||
let pub_key_map = RwLock::new(BTreeMap::new());
|
||||
|
||||
// This is all the auth_events that have been recursively fetched so they don't
|
||||
// have to be deserialized over and over again.
|
||||
// TODO: make this persist across requests but not in a DB Tree (in globals?)
|
||||
|
@ -248,47 +247,35 @@ pub async fn send_transaction_message_route(
|
|||
// maybe) all of the auth events that it references.
|
||||
// let mut auth_cache = EventMap::new();
|
||||
|
||||
let mut parsed_pdus = vec![];
|
||||
let mut parsed_pdus = Vec::with_capacity(body.pdus.len());
|
||||
for pdu in &body.pdus {
|
||||
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
|
||||
warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
|
||||
Error::BadServerResponse("Invalid PDU in server response")
|
||||
})?;
|
||||
let room_id: OwnedRoomId = value
|
||||
.get("room_id")
|
||||
.and_then(|id| RoomId::parse(id.as_str()?).ok())
|
||||
.ok_or(Error::BadRequest(ErrorKind::InvalidParam, "Invalid room id in pdu"))?;
|
||||
|
||||
if services().rooms.state.get_room_version(&room_id).is_err() {
|
||||
debug!("Server is not in room {room_id}");
|
||||
continue;
|
||||
}
|
||||
|
||||
let r = parse_incoming_pdu(pdu);
|
||||
let (event_id, value, room_id) = match r {
|
||||
parsed_pdus.push(match parse_incoming_pdu(pdu) {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
warn!("Could not parse PDU: {e}");
|
||||
info!("Full PDU: {:?}", &pdu);
|
||||
continue;
|
||||
},
|
||||
};
|
||||
parsed_pdus.push((event_id, value, room_id));
|
||||
});
|
||||
|
||||
// We do not add the event_id field to the pdu here because of signature
|
||||
// and hashes checks
|
||||
}
|
||||
|
||||
// We go through all the signatures we see on the PDUs and fetch the
|
||||
// corresponding signing keys
|
||||
services()
|
||||
.rooms
|
||||
.event_handler
|
||||
.fetch_required_signing_keys(parsed_pdus.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
warn!("Could not fetch all signatures for PDUs from {}: {:?}", sender_servername, e);
|
||||
});
|
||||
let pub_key_map = RwLock::new(BTreeMap::new());
|
||||
if !parsed_pdus.is_empty() {
|
||||
services()
|
||||
.rooms
|
||||
.event_handler
|
||||
.fetch_required_signing_keys(parsed_pdus.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
warn!("Could not fetch all signatures for PDUs from {}: {:?}", sender_servername, e);
|
||||
});
|
||||
}
|
||||
|
||||
let mut resolved_map = BTreeMap::new();
|
||||
for (event_id, value, room_id) in parsed_pdus {
|
||||
let mutex = Arc::clone(
|
||||
services()
|
||||
|
@ -312,12 +299,9 @@ pub async fn send_transaction_message_route(
|
|||
);
|
||||
drop(mutex_lock);
|
||||
|
||||
let elapsed = start_time.elapsed();
|
||||
debug!(
|
||||
"Handling transaction of event {} took {}m{}s",
|
||||
event_id,
|
||||
elapsed.as_secs() / 60,
|
||||
elapsed.as_secs() % 60
|
||||
elapsed = ?start_time.elapsed(),
|
||||
"Handled PDU {event_id}"
|
||||
);
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue