From d51f8a6c550bd0102b1d6d6de1de90f67f1a46fd Mon Sep 17 00:00:00 2001 From: Andrei Vasiliu Date: Sat, 15 Jan 2022 19:13:17 +0200 Subject: [PATCH] feat: Add federation backfill and event visibility Co-authored-by: Nyaaori <+@nyaaori.cat> --- src/api/server_server.rs | 106 ++++++++++++++++-- .../key_value/rooms/state_accessor.rs | 37 +++++- src/main.rs | 1 + src/service/rooms/state_accessor/data.rs | 5 +- src/service/rooms/state_accessor/mod.rs | 12 +- 5 files changed, 150 insertions(+), 11 deletions(-) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index b7f88078..63a3a57e 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -12,6 +12,7 @@ use ruma::{ client::error::{Error as RumaError, ErrorKind}, federation::{ authorization::get_event_authorization, + backfill::get_backfill, device::get_devices::{self, v1::UserDevice}, directory::{get_public_rooms, get_public_rooms_filtered}, discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey}, @@ -43,11 +44,11 @@ use ruma::{ serde::{Base64, JsonObject, Raw}, to_device::DeviceIdOrAllDevices, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, - OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, + OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, UInt, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashSet}, fmt::Debug, mem, net::{IpAddr, SocketAddr}, @@ -952,6 +953,53 @@ pub async fn get_event_route( }) } +/// # `GET /_matrix/federation/v1/backfill/` +/// +/// Retrieves events from before the sender joined the room, if the room's +/// history visibility allows. +pub async fn get_backfill_route( + body: Ruma, +) -> Result { + if !services().globals.allow_federation() { + return Err(Error::bad_config("Federation is disabled.")); + } + + let sender_servername = body + .sender_servername + .as_ref() + .expect("server is authenticated"); + + info!("Got backfill request from: {}", sender_servername); + + if !services() + .rooms + .state_cache + .server_in_room(sender_servername, &body.room_id)? + { + return Err(Error::BadRequest( + ErrorKind::Forbidden, + "Server is not in room.", + )); + } + + let origin = services().globals.server_name().to_owned(); + let earliest_events = &[]; + + let events = get_missing_events( + sender_servername, + &body.room_id, + earliest_events, + &body.v, + body.limit, + )?; + + Ok(get_backfill::v1::Response { + origin, + origin_server_ts: MilliSecondsSinceUnixEpoch::now(), + pdus: events, + }) +} + /// # `POST /_matrix/federation/v1/get_missing_events/{roomId}` /// /// Retrieves events that the sender is missing. @@ -983,11 +1031,43 @@ pub async fn get_missing_events_route( .event_handler .acl_check(sender_servername, &body.room_id)?; - let mut queued_events = body.latest_events.clone(); + let events = get_missing_events( + sender_servername, + &body.room_id, + &body.earliest_events, + &body.latest_events, + body.limit, + )?; + + Ok(get_missing_events::v1::Response { events }) +} + +// Recursively fetch events starting from `latest_events`, going backwards +// through each event's `prev_events` until reaching the `earliest_events`. +// +// Used by the federation /backfill and /get_missing_events routes. +fn get_missing_events( + sender_servername: &ServerName, + room_id: &RoomId, + earliest_events: &[OwnedEventId], + latest_events: &Vec, + limit: UInt, +) -> Result>> { + let limit = u64::from(limit) as usize; + + let mut queued_events = latest_events.clone(); let mut events = Vec::new(); + let mut stop_at_events = HashSet::with_capacity(limit); + stop_at_events.extend(earliest_events.iter().cloned()); + let mut i = 0; - while i < queued_events.len() && events.len() < u64::from(body.limit) as usize { + while i < queued_events.len() && events.len() < limit { + if stop_at_events.contains(&queued_events[i]) { + i += 1; + continue; + } + if let Some(pdu) = services().rooms.timeline.get_pdu_json(&queued_events[i])? { let room_id_str = pdu .get("room_id") @@ -997,10 +1077,10 @@ pub async fn get_missing_events_route( let event_room_id = <&RoomId>::try_from(room_id_str) .map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; - if event_room_id != body.room_id { + if event_room_id != room_id { warn!( "Evil event detected: Event {} found while searching in room {}", - queued_events[i], body.room_id + queued_events[i], room_id ); return Err(Error::BadRequest( ErrorKind::InvalidParam, @@ -1008,10 +1088,20 @@ pub async fn get_missing_events_route( )); } - if body.earliest_events.contains(&queued_events[i]) { + let event_is_visible = services() + .rooms + .state_accessor + .server_can_see_event(sender_servername, &queued_events[i])?; + + if !event_is_visible { i += 1; continue; } + + // Don't send this event again if it comes through some other + // event's prev_events. + stop_at_events.insert(queued_events[i].clone()); + queued_events.extend_from_slice( &serde_json::from_value::>( serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| { @@ -1026,7 +1116,7 @@ pub async fn get_missing_events_route( i += 1; } - Ok(get_missing_events::v1::Response { events }) + Ok(events) } /// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}` diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs index 70e59acb..8bc94982 100644 --- a/src/database/key_value/rooms/state_accessor.rs +++ b/src/database/key_value/rooms/state_accessor.rs @@ -5,7 +5,13 @@ use std::{ use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; use async_trait::async_trait; -use ruma::{events::StateEventType, EventId, RoomId}; +use ruma::{ + events::{ + room::history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent}, + StateEventType, + }, + EventId, RoomId, ServerName, +}; #[async_trait] impl service::rooms::state_accessor::Data for KeyValueDatabase { @@ -141,6 +147,35 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { }) } + /// Whether a server is allowed to see an event through federation, based on + /// the room's history_visibility at that event's state. + /// + /// Note: Joined/Invited history visibility not yet implemented. + #[tracing::instrument(skip(self))] + fn server_can_see_event(&self, _server_name: &ServerName, event_id: &EventId) -> Result { + let shortstatehash = match self.pdu_shortstatehash(event_id) { + Ok(Some(shortstatehash)) => shortstatehash, + _ => return Ok(false), + }; + + let history_visibility = self + .state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "")? + .map(|event| serde_json::from_str(event.content.get())) + .transpose() + .map_err(|_| Error::bad_database("Invalid room history visibility event in database."))? + .map(|content: RoomHistoryVisibilityEventContent| content.history_visibility); + + Ok(match history_visibility { + Some(HistoryVisibility::WorldReadable) => true, + Some(HistoryVisibility::Shared) => true, + // TODO: Check if any of the server's users were invited + // at this point in time. + Some(HistoryVisibility::Joined) => false, + Some(HistoryVisibility::Invited) => false, + _ => false, + }) + } + /// Returns the full room state. async fn room_state_full( &self, diff --git a/src/main.rs b/src/main.rs index d2183a39..9b79a51b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -368,6 +368,7 @@ fn routes() -> Router { .ruma_route(server_server::send_transaction_message_route) .ruma_route(server_server::get_event_route) .ruma_route(server_server::get_missing_events_route) + .ruma_route(server_server::get_backfill_route) .ruma_route(server_server::get_event_authorization_route) .ruma_route(server_server::get_room_state_route) .ruma_route(server_server::get_room_state_ids_route) diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index 340b19c3..169c8be8 100644 --- a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -4,7 +4,7 @@ use std::{ }; use async_trait::async_trait; -use ruma::{events::StateEventType, EventId, RoomId}; +use ruma::{events::StateEventType, EventId, RoomId, ServerName}; use crate::{PduEvent, Result}; @@ -38,6 +38,9 @@ pub trait Data: Send + Sync { /// Returns the state hash for this pdu. fn pdu_shortstatehash(&self, event_id: &EventId) -> Result>; + /// Returns true if a server has permission to see an event + fn server_can_see_event(&self, sever_name: &ServerName, event_id: &EventId) -> Result; + /// Returns the full room state. async fn room_state_full( &self, diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 1a9c4a9e..89135da6 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -5,7 +5,7 @@ use std::{ }; pub use data::Data; -use ruma::{events::StateEventType, EventId, RoomId}; +use ruma::{events::StateEventType, EventId, RoomId, ServerName}; use crate::{PduEvent, Result}; @@ -54,6 +54,16 @@ impl Service { self.db.pdu_shortstatehash(event_id) } + /// Returns true if a server has permission to see an event + #[tracing::instrument(skip(self))] + pub fn server_can_see_event<'a>( + &'a self, + sever_name: &ServerName, + event_id: &EventId, + ) -> Result { + self.db.server_can_see_event(sever_name, event_id) + } + /// Returns the full room state. #[tracing::instrument(skip(self))] pub async fn room_state_full(