Implement federation/v1/send_join

This commit is contained in:
Kurt Roeckx 2021-07-25 19:28:54 +02:00
parent 33481ec062
commit 48494c9464
2 changed files with 71 additions and 51 deletions

View file

@ -160,7 +160,8 @@ fn setup_rocket(config: Figment, data: Arc<RwLock<Database>>) -> rocket::Rocket<
server_server::get_room_state_route, server_server::get_room_state_route,
server_server::get_room_state_ids_route, server_server::get_room_state_ids_route,
server_server::create_join_event_template_route, server_server::create_join_event_template_route,
server_server::create_join_event_route, server_server::create_join_event_v1_route,
server_server::create_join_event_v2_route,
server_server::create_invite_route, server_server::create_invite_route,
server_server::get_devices_route, server_server::get_devices_route,
server_server::get_room_information_route, server_server::get_room_information_route,

View file

@ -2342,33 +2342,29 @@ pub fn create_join_event_template_route(
.into()) .into())
} }
#[cfg_attr( async fn create_join_event(
feature = "conduit_bin", db: &DatabaseGuard,
put("/_matrix/federation/v2/send_join/<_>/<_>", data = "<body>") room_id: &RoomId,
)] pdu: &Raw<ruma::events::pdu::Pdu>,
#[tracing::instrument(skip(db, body))] ) -> Result<RoomState> {
pub async fn create_join_event_route(
db: DatabaseGuard,
body: Ruma<create_join_event::v2::Request<'_>>,
) -> ConduitResult<create_join_event::v2::Response> {
if !db.globals.allow_federation() { if !db.globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled.")); return Err(Error::bad_config("Federation is disabled."));
} }
// We need to return the state prior to joining, let's keep a reference to that here // We need to return the state prior to joining, let's keep a reference to that here
let shortstatehash = let shortstatehash = db
db.rooms .rooms
.current_shortstatehash(&body.room_id)? .current_shortstatehash(&room_id)?
.ok_or(Error::BadRequest( .ok_or(Error::BadRequest(
ErrorKind::NotFound, ErrorKind::NotFound,
"Pdu state not found.", "Pdu state not found.",
))?; ))?;
let pub_key_map = RwLock::new(BTreeMap::new()); let pub_key_map = RwLock::new(BTreeMap::new());
// let mut auth_cache = EventMap::new(); // let mut auth_cache = EventMap::new();
// We do not add the event_id field to the pdu here because of signature and hashes checks // We do not add the event_id field to the pdu here because of signature and hashes checks
let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(&body.pdu) { let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(&pdu) {
Ok(t) => t, Ok(t) => t,
Err(_) => { Err(_) => {
// Event could not be converted to canonical json // Event could not be converted to canonical json
@ -2393,31 +2389,23 @@ pub async fn create_join_event_route(
.roomid_mutex_federation .roomid_mutex_federation
.write() .write()
.unwrap() .unwrap()
.entry(body.room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let mutex_lock = mutex.lock().await; let mutex_lock = mutex.lock().await;
let pdu_id = handle_incoming_pdu( let pdu_id = handle_incoming_pdu(&origin, &event_id, &room_id, value, true, &db, &pub_key_map)
&origin, .await
&event_id, .map_err(|e| {
&body.room_id, warn!("Error while handling incoming send join PDU: {}", e);
value, Error::BadRequest(
true, ErrorKind::InvalidParam,
&db, "Error while handling incoming PDU.",
&pub_key_map, )
) })?
.await .ok_or(Error::BadRequest(
.map_err(|e| {
warn!("Error while handling incoming send join PDU: {}", e);
Error::BadRequest(
ErrorKind::InvalidParam, ErrorKind::InvalidParam,
"Error while handling incoming PDU.", "Could not accept incoming PDU as timeline event.",
) ))?;
})?
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not accept incoming PDU as timeline event.",
))?;
drop(mutex_lock); drop(mutex_lock);
let state_ids = db.rooms.state_full_ids(shortstatehash)?; let state_ids = db.rooms.state_full_ids(shortstatehash)?;
@ -2425,7 +2413,7 @@ pub async fn create_join_event_route(
for server in db for server in db
.rooms .rooms
.room_servers(&body.room_id) .room_servers(&room_id)
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.filter(|server| &**server != db.globals.server_name()) .filter(|server| &**server != db.globals.server_name())
{ {
@ -2434,18 +2422,49 @@ pub async fn create_join_event_route(
db.flush()?; db.flush()?;
Ok(RoomState {
auth_chain: auth_chain_ids
.filter_map(|id| db.rooms.get_pdu_json(&id).ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event)
.collect(),
state: state_ids
.iter()
.filter_map(|id| db.rooms.get_pdu_json(&id).ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event)
.collect(),
})
}
#[cfg_attr(
feature = "conduit_bin",
put("/_matrix/federation/v1/send_join/<_>/<_>", data = "<body>")
)]
#[tracing::instrument(skip(db, body))]
pub async fn create_join_event_v1_route(
db: DatabaseGuard,
body: Ruma<create_join_event::v1::Request<'_>>,
) -> ConduitResult<create_join_event::v1::Response> {
let room_state = create_join_event(&db, &body.room_id, &body.pdu).await?;
Ok(create_join_event::v1::Response {
room_state: room_state,
}
.into())
}
#[cfg_attr(
feature = "conduit_bin",
put("/_matrix/federation/v2/send_join/<_>/<_>", data = "<body>")
)]
#[tracing::instrument(skip(db, body))]
pub async fn create_join_event_v2_route(
db: DatabaseGuard,
body: Ruma<create_join_event::v2::Request<'_>>,
) -> ConduitResult<create_join_event::v2::Response> {
let room_state = create_join_event(&db, &body.room_id, &body.pdu).await?;
Ok(create_join_event::v2::Response { Ok(create_join_event::v2::Response {
room_state: RoomState { room_state: room_state,
auth_chain: auth_chain_ids
.filter_map(|id| db.rooms.get_pdu_json(&id).ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event)
.collect(),
state: state_ids
.iter()
.filter_map(|id| db.rooms.get_pdu_json(&id).ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event)
.collect(),
},
} }
.into()) .into())
} }