feat: send invites over federation

This commit is contained in:
Timo Kösters 2021-04-25 14:10:07 +02:00
parent abe176e3d0
commit 58463bba93
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
5 changed files with 373 additions and 157 deletions

View file

@ -5,6 +5,7 @@ use crate::{
server_server, utils, ConduitResult, Database, Error, Result, Ruma,
};
use log::{error, warn};
use member::{MemberEventContent, MembershipState};
use rocket::futures;
use ruma::{
api::{
@ -16,16 +17,21 @@ use ruma::{
unban_user, IncomingThirdPartySigned,
},
},
federation,
federation::{self, membership::create_invite},
},
events::{
pdu::Pdu,
room::{create::CreateEventContent, member},
EventType,
},
events::{pdu::Pdu, room::member, EventType},
serde::{to_canonical_value, CanonicalJsonObject, Raw},
EventId, RoomId, RoomVersionId, ServerName, UserId,
uint, EventId, RoomId, RoomVersionId, ServerName, UserId,
};
use state_res::EventMap;
use std::{
collections::{BTreeMap, HashSet},
convert::TryFrom,
sync::RwLock,
convert::{TryFrom, TryInto},
sync::{Arc, RwLock},
};
#[cfg(feature = "conduit_bin")]
@ -152,35 +158,8 @@ pub async fn invite_user_route(
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if let invite_user::IncomingInvitationRecipient::UserId { user_id } = &body.recipient {
if body.room_id.server_name() != db.globals.server_name() {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Inviting users from other homeservers is not implemented yet.",
));
}
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent {
membership: member::MembershipState::Invite,
displayname: db.users.displayname(&user_id)?,
avatar_url: db.users.avatar_url(&user_id)?,
is_direct: None,
third_party_invite: None,
})
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(user_id.to_string()),
redacts: None,
},
&sender_user,
&body.room_id,
&db,
)?;
invite_helper(sender_user, user_id, &body.room_id, &db, false).await?;
db.flush().await?;
Ok(invite_user::Response.into())
} else {
Err(Error::BadRequest(ErrorKind::NotFound, "User not found."))
@ -744,3 +723,241 @@ async fn validate_and_add_event_id(
Ok((event_id, value))
}
pub async fn invite_helper(
sender_user: &UserId,
user_id: &UserId,
room_id: &RoomId,
db: &Database,
is_direct: bool,
) -> Result<()> {
if user_id.server_name() != db.globals.server_name() {
let prev_events = db
.rooms
.get_pdu_leaves(room_id)?
.into_iter()
.take(20)
.collect::<Vec<_>>();
let create_event = db
.rooms
.room_state_get(room_id, &EventType::RoomCreate, "")?;
let create_event_content = create_event
.as_ref()
.map(|create_event| {
Ok::<_, Error>(
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone())
.expect("Raw::from_value always works.")
.deserialize()
.map_err(|_| Error::bad_database("Invalid PowerLevels event in db."))?,
)
})
.transpose()?;
let create_prev_event = if prev_events.len() == 1
&& Some(&prev_events[0]) == create_event.as_ref().map(|c| &c.event_id)
{
create_event.map(Arc::new)
} else {
None
};
// If there was no create event yet, assume we are creating a version 6 room right now
let room_version = create_event_content.map_or(RoomVersionId::Version6, |create_event| {
create_event.room_version
});
let content = serde_json::to_value(MemberEventContent {
avatar_url: None,
displayname: None,
is_direct: Some(is_direct),
membership: MembershipState::Invite,
third_party_invite: None,
})
.expect("member event is valid value");
let state_key = user_id.to_string();
let kind = EventType::RoomMember;
let auth_events =
db.rooms
.get_auth_events(room_id, &kind, &sender_user, Some(&state_key), &content)?;
// Our depth is the maximum depth of prev_events + 1
let depth = prev_events
.iter()
.filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth))
.max()
.unwrap_or_else(|| uint!(0))
+ uint!(1);
let mut unsigned = BTreeMap::new();
if let Some(prev_pdu) = db.rooms.room_state_get(room_id, &kind, &state_key)? {
unsigned.insert("prev_content".to_owned(), prev_pdu.content);
unsigned.insert(
"prev_sender".to_owned(),
serde_json::to_value(prev_pdu.sender).expect("UserId::to_value always works"),
);
}
let pdu = PduEvent {
event_id: ruma::event_id!("$thiswillbefilledinlater"),
room_id: room_id.clone(),
sender: sender_user.clone(),
origin_server_ts: utils::millis_since_unix_epoch()
.try_into()
.expect("time is valid"),
kind,
content,
state_key: Some(state_key),
prev_events,
depth,
auth_events: auth_events
.iter()
.map(|(_, pdu)| pdu.event_id.clone())
.collect(),
redacts: None,
unsigned,
hashes: ruma::events::pdu::EventHash {
sha256: "aaa".to_owned(),
},
signatures: BTreeMap::new(),
};
let auth_check = state_res::auth_check(
&room_version,
&Arc::new(pdu.clone()),
create_prev_event,
&auth_events,
None, // TODO: third_party_invite
)
.map_err(|e| {
error!("{:?}", e);
Error::bad_database("Auth check failed.")
})?;
if !auth_check {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Event is not authorized.",
));
}
// Hash and sign
let mut pdu_json =
utils::to_canonical_object(&pdu).expect("event is valid, we just created it");
pdu_json.remove("event_id");
// Add origin because synapse likes that (and it's required in the spec)
pdu_json.insert(
"origin".to_owned(),
to_canonical_value(db.globals.server_name())
.expect("server name is a valid CanonicalJsonValue"),
);
ruma::signatures::hash_and_sign_event(
db.globals.server_name().as_str(),
db.globals.keypair(),
&mut pdu_json,
&room_version,
)
.expect("event is valid, we just created it");
let invite_room_state = db.rooms.calculate_invite_state(&pdu)?;
let response = db
.sending
.send_federation_request(
&db.globals,
user_id.server_name(),
create_invite::v2::Request {
room_id: room_id.clone(),
event_id: ruma::event_id!("$receivingservershouldsetthis"),
room_version: RoomVersionId::Version6,
event: PduEvent::convert_to_outgoing_federation_event(pdu_json),
invite_room_state,
},
)
.await?;
let pub_key_map = RwLock::new(BTreeMap::new());
let mut auth_cache = EventMap::new();
// We do not add the event_id field to the pdu here because of signature and hashes checks
let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(&response.event) {
Ok(t) => t,
Err(_) => {
// Event could not be converted to canonical json
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not convert event to canonical json.",
));
}
};
let origin = serde_json::from_value::<Box<ServerName>>(
serde_json::to_value(value.get("origin").ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Event needs an origin field.",
))?)
.expect("CanonicalJson is valid json value"),
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?;
let pdu_id = server_server::handle_incoming_pdu(
&origin,
&event_id,
value,
true,
&db,
&pub_key_map,
&mut auth_cache,
)
.await
.map_err(|_| {
Error::BadRequest(
ErrorKind::InvalidParam,
"Error while handling incoming PDU.",
)
})?
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not accept incoming PDU as timeline event.",
))?;
for server in db
.rooms
.room_servers(room_id)
.filter_map(|r| r.ok())
.filter(|server| &**server != db.globals.server_name())
{
db.sending.send_pdu(&server, &pdu_id)?;
}
return Ok(());
}
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent {
membership: member::MembershipState::Invite,
displayname: db.users.displayname(&user_id)?,
avatar_url: db.users.avatar_url(&user_id)?,
is_direct: None,
third_party_invite: None,
})
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(user_id.to_string()),
redacts: None,
},
&sender_user,
room_id,
&db,
)?;
Ok(())
}

View file

@ -30,41 +30,48 @@ pub async fn set_displayname_route(
.set_displayname(&sender_user, body.displayname.clone())?;
// Send a new membership event and presence update into all joined rooms
for room_id in db.rooms.rooms_joined(&sender_user) {
let room_id = room_id?;
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomMember,
content: serde_json::to_value(ruma::events::room::member::MemberEventContent {
displayname: body.displayname.clone(),
..serde_json::from_value::<Raw<_>>(
db.rooms
.room_state_get(
&room_id,
&EventType::RoomMember,
&sender_user.to_string(),
)?
.ok_or_else(|| {
Error::bad_database(
"Tried to send displayname update for user not in the room.",
)
})?
.content
.clone(),
)
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| Error::bad_database("Database contains invalid PDU."))?
})
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(sender_user.to_string()),
redacts: None,
},
&sender_user,
&room_id,
&db,
)?;
for (pdu_builder, room_id) in db
.rooms
.rooms_joined(&sender_user)
.filter_map(|r| r.ok())
.map(|room_id| {
Ok::<_, Error>((
PduBuilder {
event_type: EventType::RoomMember,
content: serde_json::to_value(ruma::events::room::member::MemberEventContent {
displayname: body.displayname.clone(),
..serde_json::from_value::<Raw<_>>(
db.rooms
.room_state_get(
&room_id,
&EventType::RoomMember,
&sender_user.to_string(),
)?
.ok_or_else(|| {
Error::bad_database(
"Tried to send displayname update for user not in the room.",
)
})?
.content
.clone(),
)
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| Error::bad_database("Database contains invalid PDU."))?
})
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(sender_user.to_string()),
redacts: None,
},
room_id,
))
})
.filter_map(|r| r.ok())
{
let _ = db
.rooms
.build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db);
// Presence update
db.rooms.edus.update_presence(
@ -124,41 +131,48 @@ pub async fn set_avatar_url_route(
.set_avatar_url(&sender_user, body.avatar_url.clone())?;
// Send a new membership event and presence update into all joined rooms
for room_id in db.rooms.rooms_joined(&sender_user) {
let room_id = room_id?;
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomMember,
content: serde_json::to_value(ruma::events::room::member::MemberEventContent {
avatar_url: body.avatar_url.clone(),
..serde_json::from_value::<Raw<_>>(
db.rooms
.room_state_get(
&room_id,
&EventType::RoomMember,
&sender_user.to_string(),
)?
.ok_or_else(|| {
Error::bad_database(
"Tried to send avatar url update for user not in the room.",
)
})?
.content
.clone(),
)
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| Error::bad_database("Database contains invalid PDU."))?
})
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(sender_user.to_string()),
redacts: None,
},
&sender_user,
&room_id,
&db,
)?;
for (pdu_builder, room_id) in db
.rooms
.rooms_joined(&sender_user)
.filter_map(|r| r.ok())
.map(|room_id| {
Ok::<_, Error>((
PduBuilder {
event_type: EventType::RoomMember,
content: serde_json::to_value(ruma::events::room::member::MemberEventContent {
avatar_url: body.avatar_url.clone(),
..serde_json::from_value::<Raw<_>>(
db.rooms
.room_state_get(
&room_id,
&EventType::RoomMember,
&sender_user.to_string(),
)?
.ok_or_else(|| {
Error::bad_database(
"Tried to send displayname update for user not in the room.",
)
})?
.content
.clone(),
)
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| Error::bad_database("Database contains invalid PDU."))?
})
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(sender_user.to_string()),
redacts: None,
},
room_id,
))
})
.filter_map(|r| r.ok())
{
let _ = db
.rooms
.build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db);
// Presence update
db.rooms.edus.update_presence(

View file

@ -1,4 +1,5 @@
use super::State;
use crate::client_server::invite_helper;
use crate::{pdu::PduBuilder, ConduitResult, Database, Error, Ruma};
use log::info;
use ruma::{
@ -269,26 +270,8 @@ pub async fn create_room_route(
}
// 7. Events implied by invite (and TODO: invite_3pid)
for user in &body.invite {
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomMember,
content: serde_json::to_value(member::MemberEventContent {
membership: member::MembershipState::Invite,
displayname: db.users.displayname(&user)?,
avatar_url: db.users.avatar_url(&user)?,
is_direct: Some(body.is_direct),
third_party_invite: None,
})
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(user.to_string()),
redacts: None,
},
&sender_user,
&room_id,
&db,
)?;
for user_id in &body.invite {
let _ = invite_helper(sender_user, user_id, &room_id, &db, body.is_direct).await;
}
// Homeserver specific stuff

View file

@ -809,39 +809,7 @@ impl Rooms {
let invite_state = match membership {
member::MembershipState::Invite => {
let mut state = Vec::new();
// Add recommended events
if let Some(e) =
self.room_state_get(&pdu.room_id, &EventType::RoomJoinRules, "")?
{
state.push(e.to_stripped_state_event());
}
if let Some(e) = self.room_state_get(
&pdu.room_id,
&EventType::RoomCanonicalAlias,
"",
)? {
state.push(e.to_stripped_state_event());
}
if let Some(e) =
self.room_state_get(&pdu.room_id, &EventType::RoomAvatar, "")?
{
state.push(e.to_stripped_state_event());
}
if let Some(e) =
self.room_state_get(&pdu.room_id, &EventType::RoomName, "")?
{
state.push(e.to_stripped_state_event());
}
if let Some(e) = self.room_state_get(
&pdu.room_id,
&EventType::RoomMember,
pdu.sender.as_str(),
)? {
state.push(e.to_stripped_state_event());
}
state.push(pdu.to_stripped_state_event());
let state = self.calculate_invite_state(pdu)?;
Some(state)
}
@ -1184,6 +1152,40 @@ impl Rooms {
}
}
pub fn calculate_invite_state(
&self,
invite_event: &PduEvent,
) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
let mut state = Vec::new();
// Add recommended events
if let Some(e) =
self.room_state_get(&invite_event.room_id, &EventType::RoomJoinRules, "")?
{
state.push(e.to_stripped_state_event());
}
if let Some(e) =
self.room_state_get(&invite_event.room_id, &EventType::RoomCanonicalAlias, "")?
{
state.push(e.to_stripped_state_event());
}
if let Some(e) = self.room_state_get(&invite_event.room_id, &EventType::RoomAvatar, "")? {
state.push(e.to_stripped_state_event());
}
if let Some(e) = self.room_state_get(&invite_event.room_id, &EventType::RoomName, "")? {
state.push(e.to_stripped_state_event());
}
if let Some(e) = self.room_state_get(
&invite_event.room_id,
&EventType::RoomMember,
invite_event.sender.as_str(),
)? {
state.push(e.to_stripped_state_event());
}
state.push(invite_event.to_stripped_state_event());
Ok(state)
}
pub fn set_room_state(&self, room_id: &RoomId, shortstatehash: u64) -> Result<()> {
self.roomid_shortstatehash
.insert(room_id.as_bytes(), &shortstatehash.to_be_bytes())?;

View file

@ -689,7 +689,7 @@ type AsyncRecursiveResult<'a, T, E> = Pin<Box<dyn Future<Output = StdResult<T, E
/// it
/// 14. Use state resolution to find new room state
// We use some AsyncRecursiveResult hacks here so we can call this async funtion recursively
fn handle_incoming_pdu<'a>(
pub fn handle_incoming_pdu<'a>(
origin: &'a ServerName,
event_id: &'a EventId,
value: BTreeMap<String, CanonicalJsonValue>,