split s2s into units

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-06-05 04:32:58 +00:00 committed by June 🍓🦴
parent 38238c309f
commit 3af153f5ae
23 changed files with 2260 additions and 2092 deletions

View file

@ -1,7 +1,7 @@
pub mod client_server;
pub mod router;
mod ruma_wrapper;
pub mod server_server;
pub mod server;
extern crate conduit_core as conduit;
extern crate conduit_service as service;

View file

@ -7,7 +7,7 @@ use conduit::{Error, Server};
use http::Uri;
use ruma::api::client::error::ErrorKind;
use crate::{client_server, ruma_wrapper::RouterExt, server_server};
use crate::{client_server, ruma_wrapper::RouterExt, server};
pub fn build(router: Router, server: &Server) -> Router {
let config = &server.config;
@ -188,35 +188,32 @@ pub fn build(router: Router, server: &Server) -> Router {
if config.allow_federation {
router
.ruma_route(server_server::get_server_version_route)
.route("/_matrix/key/v2/server", get(server_server::get_server_keys_route))
.route(
"/_matrix/key/v2/server/:key_id",
get(server_server::get_server_keys_deprecated_route),
)
.ruma_route(server_server::get_public_rooms_route)
.ruma_route(server_server::get_public_rooms_filtered_route)
.ruma_route(server_server::send_transaction_message_route)
.ruma_route(server_server::get_event_route)
.ruma_route(server_server::get_backfill_route)
.ruma_route(server_server::get_missing_events_route)
.ruma_route(server_server::get_event_authorization_route)
.ruma_route(server_server::get_room_state_route)
.ruma_route(server_server::get_room_state_ids_route)
.ruma_route(server_server::create_leave_event_template_route)
.ruma_route(server_server::create_leave_event_v1_route)
.ruma_route(server_server::create_leave_event_v2_route)
.ruma_route(server_server::create_join_event_template_route)
.ruma_route(server_server::create_join_event_v1_route)
.ruma_route(server_server::create_join_event_v2_route)
.ruma_route(server_server::create_invite_route)
.ruma_route(server_server::get_devices_route)
.ruma_route(server_server::get_room_information_route)
.ruma_route(server_server::get_profile_information_route)
.ruma_route(server_server::get_keys_route)
.ruma_route(server_server::claim_keys_route)
.ruma_route(server_server::get_hierarchy_route)
.ruma_route(server_server::well_known_server)
.ruma_route(server::get_server_version_route)
.route("/_matrix/key/v2/server", get(server::get_server_keys_route))
.route("/_matrix/key/v2/server/:key_id", get(server::get_server_keys_deprecated_route))
.ruma_route(server::get_public_rooms_route)
.ruma_route(server::get_public_rooms_filtered_route)
.ruma_route(server::send_transaction_message_route)
.ruma_route(server::get_event_route)
.ruma_route(server::get_backfill_route)
.ruma_route(server::get_missing_events_route)
.ruma_route(server::get_event_authorization_route)
.ruma_route(server::get_room_state_route)
.ruma_route(server::get_room_state_ids_route)
.ruma_route(server::create_leave_event_template_route)
.ruma_route(server::create_leave_event_v1_route)
.ruma_route(server::create_leave_event_v2_route)
.ruma_route(server::create_join_event_template_route)
.ruma_route(server::create_join_event_v1_route)
.ruma_route(server::create_join_event_v2_route)
.ruma_route(server::create_invite_route)
.ruma_route(server::get_devices_route)
.ruma_route(server::get_room_information_route)
.ruma_route(server::get_profile_information_route)
.ruma_route(server::get_keys_route)
.ruma_route(server::claim_keys_route)
.ruma_route(server::get_hierarchy_route)
.ruma_route(server::well_known_server)
.route("/_conduwuit/local_user_count", get(client_server::conduwuit_local_user_count))
} else {
router

View file

@ -0,0 +1,69 @@
use ruma::{
api::{client::error::ErrorKind, federation::backfill::get_backfill},
uint, user_id, MilliSecondsSinceUnixEpoch,
};
use crate::{services, Error, PduEvent, Result, Ruma};
/// # `GET /_matrix/federation/v1/backfill/<room_id>`
///
/// Retrieves events from before the sender joined the room, if the room's
/// history visibility allows.
pub(crate) async fn get_backfill_route(body: Ruma<get_backfill::v1::Request>) -> Result<get_backfill::v1::Response> {
let origin = body.origin.as_ref().expect("server is authenticated");
if !services()
.rooms
.state_cache
.server_in_room(origin, &body.room_id)?
{
return Err(Error::BadRequest(ErrorKind::forbidden(), "Server is not in room."));
}
services()
.rooms
.event_handler
.acl_check(origin, &body.room_id)?;
let until = body
.v
.iter()
.map(|event_id| services().rooms.timeline.get_pdu_count(event_id))
.filter_map(|r| r.ok().flatten())
.max()
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event not found."))?;
let limit = body
.limit
.min(uint!(100))
.try_into()
.expect("UInt could not be converted to usize");
let all_events = services()
.rooms
.timeline
.pdus_until(user_id!("@doesntmatter:conduit.rs"), &body.room_id, until)?
.take(limit);
let events = all_events
.filter_map(Result::ok)
.filter(|(_, e)| {
matches!(
services()
.rooms
.state_accessor
.server_can_see_event(origin, &e.room_id, &e.event_id,),
Ok(true),
)
})
.map(|(_, pdu)| services().rooms.timeline.get_pdu_json(&pdu.event_id))
.filter_map(|r| r.ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event)
.collect();
Ok(get_backfill::v1::Response {
origin: services().globals.server_name().to_owned(),
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
pdus: events,
})
}

52
src/api/server/event.rs Normal file
View file

@ -0,0 +1,52 @@
use ruma::{
api::{client::error::ErrorKind, federation::event::get_event},
MilliSecondsSinceUnixEpoch, RoomId,
};
use crate::{services, Error, PduEvent, Result, Ruma};
/// # `GET /_matrix/federation/v1/event/{eventId}`
///
/// Retrieves a single event from the server.
///
/// - Only works if a user of this server is currently invited or joined the
/// room
pub(crate) async fn get_event_route(body: Ruma<get_event::v1::Request>) -> Result<get_event::v1::Response> {
let origin = body.origin.as_ref().expect("server is authenticated");
let event = services()
.rooms
.timeline
.get_pdu_json(&body.event_id)?
.ok_or_else(|| Error::BadRequest(ErrorKind::NotFound, "Event not found."))?;
let room_id_str = event
.get("room_id")
.and_then(|val| val.as_str())
.ok_or_else(|| Error::bad_database("Invalid event in database."))?;
let room_id =
<&RoomId>::try_from(room_id_str).map_err(|_| Error::bad_database("Invalid room_id in event in database."))?;
if !services()
.rooms
.state_cache
.server_in_room(origin, room_id)?
{
return Err(Error::BadRequest(ErrorKind::forbidden(), "Server is not in room."));
}
if !services()
.rooms
.state_accessor
.server_can_see_event(origin, room_id, &body.event_id)?
{
return Err(Error::BadRequest(ErrorKind::forbidden(), "Server is not allowed to see event."));
}
Ok(get_event::v1::Response {
origin: services().globals.server_name().to_owned(),
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
pdu: PduEvent::convert_to_outgoing_federation_event(event),
})
}

View file

@ -0,0 +1,59 @@
use std::sync::Arc;
use ruma::{
api::{client::error::ErrorKind, federation::authorization::get_event_authorization},
RoomId,
};
use crate::{services, Error, PduEvent, Result, Ruma};
/// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}`
///
/// Retrieves the auth chain for a given event.
///
/// - This does not include the event itself
pub(crate) async fn get_event_authorization_route(
body: Ruma<get_event_authorization::v1::Request>,
) -> Result<get_event_authorization::v1::Response> {
let origin = body.origin.as_ref().expect("server is authenticated");
if !services()
.rooms
.state_cache
.server_in_room(origin, &body.room_id)?
{
return Err(Error::BadRequest(ErrorKind::forbidden(), "Server is not in room."));
}
services()
.rooms
.event_handler
.acl_check(origin, &body.room_id)?;
let event = services()
.rooms
.timeline
.get_pdu_json(&body.event_id)?
.ok_or_else(|| Error::BadRequest(ErrorKind::NotFound, "Event not found."))?;
let room_id_str = event
.get("room_id")
.and_then(|val| val.as_str())
.ok_or_else(|| Error::bad_database("Invalid event in database."))?;
let room_id =
<&RoomId>::try_from(room_id_str).map_err(|_| Error::bad_database("Invalid room_id in event in database."))?;
let auth_chain_ids = services()
.rooms
.auth_chain
.event_ids_iter(room_id, vec![Arc::from(&*body.event_id)])
.await?;
Ok(get_event_authorization::v1::Response {
auth_chain: auth_chain_ids
.filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok()?)
.map(PduEvent::convert_to_outgoing_federation_event)
.collect(),
})
}

View file

@ -0,0 +1,86 @@
use ruma::{
api::{client::error::ErrorKind, federation::event::get_missing_events},
OwnedEventId, RoomId,
};
use crate::{services, Error, PduEvent, Result, Ruma};
/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
///
/// Retrieves events that the sender is missing.
pub(crate) async fn get_missing_events_route(
body: Ruma<get_missing_events::v1::Request>,
) -> Result<get_missing_events::v1::Response> {
let origin = body.origin.as_ref().expect("server is authenticated");
if !services()
.rooms
.state_cache
.server_in_room(origin, &body.room_id)?
{
return Err(Error::BadRequest(ErrorKind::forbidden(), "Server is not in room"));
}
services()
.rooms
.event_handler
.acl_check(origin, &body.room_id)?;
let limit = body
.limit
.try_into()
.expect("UInt could not be converted to usize");
let mut queued_events = body.latest_events.clone();
// the vec will never have more entries the limit
let mut events = Vec::with_capacity(limit);
let mut i: usize = 0;
while i < queued_events.len() && events.len() < limit {
if let Some(pdu) = services().rooms.timeline.get_pdu_json(&queued_events[i])? {
let room_id_str = pdu
.get("room_id")
.and_then(|val| val.as_str())
.ok_or_else(|| Error::bad_database("Invalid event in database."))?;
let event_room_id = <&RoomId>::try_from(room_id_str)
.map_err(|_| Error::bad_database("Invalid room_id in event in database."))?;
if event_room_id != body.room_id {
return Err(Error::BadRequest(ErrorKind::InvalidParam, "Event from wrong room."));
}
if body.earliest_events.contains(&queued_events[i]) {
i = i.saturating_add(1);
continue;
}
if !services()
.rooms
.state_accessor
.server_can_see_event(origin, &body.room_id, &queued_events[i])?
{
i = i.saturating_add(1);
continue;
}
queued_events.extend_from_slice(
&serde_json::from_value::<Vec<OwnedEventId>>(
serde_json::to_value(
pdu.get("prev_events")
.cloned()
.ok_or_else(|| Error::bad_database("Event in db has no prev_events property."))?,
)
.expect("canonical json is valid json value"),
)
.map_err(|_| Error::bad_database("Invalid prev_events in event in database."))?,
);
events.push(PduEvent::convert_to_outgoing_federation_event(pdu));
}
i = i.saturating_add(1);
}
Ok(get_missing_events::v1::Response {
events,
})
}

View file

@ -0,0 +1,21 @@
use ruma::api::{client::error::ErrorKind, federation::space::get_hierarchy};
use crate::{services, Error, Result, Ruma};
/// # `GET /_matrix/federation/v1/hierarchy/{roomId}`
///
/// Gets the space tree in a depth-first manner to locate child rooms of a given
/// space.
pub(crate) async fn get_hierarchy_route(body: Ruma<get_hierarchy::v1::Request>) -> Result<get_hierarchy::v1::Response> {
let origin = body.origin.as_ref().expect("server is authenticated");
if services().rooms.metadata.exists(&body.room_id)? {
services()
.rooms
.spaces
.get_federation_hierarchy(&body.room_id, origin, body.suggested_only)
.await
} else {
Err(Error::BadRequest(ErrorKind::NotFound, "Room does not exist."))
}
}

177
src/api/server/invite.rs Normal file
View file

@ -0,0 +1,177 @@
use ruma::{
api::{client::error::ErrorKind, federation::membership::create_invite},
events::room::member::{MembershipState, RoomMemberEventContent},
serde::JsonObject,
CanonicalJsonValue, EventId, OwnedUserId,
};
use tracing::warn;
use crate::{
service::server_is_ours,
services,
utils::{self},
Error, PduEvent, Result, Ruma,
};
/// # `PUT /_matrix/federation/v2/invite/{roomId}/{eventId}`
///
/// Invites a remote user to a room.
pub(crate) async fn create_invite_route(body: Ruma<create_invite::v2::Request>) -> Result<create_invite::v2::Response> {
let origin = body.origin.as_ref().expect("server is authenticated");
// ACL check origin
services()
.rooms
.event_handler
.acl_check(origin, &body.room_id)?;
if !services()
.globals
.supported_room_versions()
.contains(&body.room_version)
{
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion {
room_version: body.room_version.clone(),
},
"Server does not support this room version.",
));
}
if let Some(server) = body.room_id.server_name() {
if services()
.globals
.config
.forbidden_remote_server_names
.contains(&server.to_owned())
{
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Server is banned on this homeserver.",
));
}
}
if services()
.globals
.config
.forbidden_remote_server_names
.contains(origin)
{
warn!(
"Received federated/remote invite from banned server {origin} for room ID {}. Rejecting.",
body.room_id
);
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Server is banned on this homeserver.",
));
}
if let Some(via) = &body.via {
if via.is_empty() {
return Err(Error::BadRequest(ErrorKind::InvalidParam, "via field must not be empty."));
}
}
let mut signed_event = utils::to_canonical_object(&body.event)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invite event is invalid."))?;
let invited_user: OwnedUserId = serde_json::from_value(
signed_event
.get("state_key")
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event has no state_key property."))?
.clone()
.into(),
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "state_key is not a user ID."))?;
if !server_is_ours(invited_user.server_name()) {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"User does not belong to this homeserver.",
));
}
// Make sure we're not ACL'ed from their room.
services()
.rooms
.event_handler
.acl_check(invited_user.server_name(), &body.room_id)?;
ruma::signatures::hash_and_sign_event(
services().globals.server_name().as_str(),
services().globals.keypair(),
&mut signed_event,
&body.room_version,
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Failed to sign event."))?;
// Generate event id
let event_id = EventId::parse(format!(
"${}",
ruma::signatures::reference_hash(&signed_event, &body.room_version)
.expect("ruma can calculate reference hashes")
))
.expect("ruma's reference hashes are valid event ids");
// Add event_id back
signed_event.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.to_string()));
let sender: OwnedUserId = serde_json::from_value(
signed_event
.get("sender")
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event had no sender property."))?
.clone()
.into(),
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "sender is not a user ID."))?;
if services().rooms.metadata.is_banned(&body.room_id)? && !services().users.is_admin(&invited_user)? {
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"This room is banned on this homeserver.",
));
}
if services().globals.block_non_admin_invites() && !services().users.is_admin(&invited_user)? {
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"This server does not allow room invites.",
));
}
let mut invite_state = body.invite_room_state.clone();
let mut event: JsonObject = serde_json::from_str(body.event.get())
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid invite event bytes."))?;
event.insert("event_id".to_owned(), "$placeholder".into());
let pdu: PduEvent = serde_json::from_value(event.into())
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid invite event."))?;
invite_state.push(pdu.to_stripped_state_event());
// If we are active in the room, the remote server will notify us about the join
// via /send
if !services()
.rooms
.state_cache
.server_in_room(services().globals.server_name(), &body.room_id)?
{
services().rooms.state_cache.update_membership(
&body.room_id,
&invited_user,
RoomMemberEventContent::new(MembershipState::Invite),
&sender,
Some(invite_state),
body.via.clone(),
true,
)?;
}
Ok(create_invite::v2::Response {
event: PduEvent::convert_to_outgoing_federation_event(signed_event),
})
}

76
src/api/server/key.rs Normal file
View file

@ -0,0 +1,76 @@
use std::{
collections::BTreeMap,
time::{Duration, SystemTime},
};
use axum::{response::IntoResponse, Json};
use ruma::{
api::{
federation::discovery::{get_server_keys, ServerSigningKeys, VerifyKey},
OutgoingResponse,
},
serde::{Base64, Raw},
MilliSecondsSinceUnixEpoch, OwnedServerSigningKeyId,
};
use crate::{services, Result};
/// # `GET /_matrix/key/v2/server`
///
/// Gets the public signing keys of this server.
///
/// - Matrix does not support invalidating public keys, so the key returned by
/// this will be valid
/// forever.
// Response type for this endpoint is Json because we need to calculate a
// signature for the response
pub(crate) async fn get_server_keys_route() -> Result<impl IntoResponse> {
let verify_keys: BTreeMap<OwnedServerSigningKeyId, VerifyKey> = BTreeMap::from([(
format!("ed25519:{}", services().globals.keypair().version())
.try_into()
.expect("found invalid server signing keys in DB"),
VerifyKey {
key: Base64::new(services().globals.keypair().public_key().to_vec()),
},
)]);
let mut response = serde_json::from_slice(
get_server_keys::v2::Response {
server_key: Raw::new(&ServerSigningKeys {
server_name: services().globals.server_name().to_owned(),
verify_keys,
old_verify_keys: BTreeMap::new(),
signatures: BTreeMap::new(),
valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time(
SystemTime::now()
.checked_add(Duration::from_secs(86400 * 7))
.expect("valid_until_ts should not get this high"),
)
.expect("time is valid"),
})
.expect("static conversion, no errors"),
}
.try_into_http_response::<Vec<u8>>()
.unwrap()
.body(),
)
.unwrap();
ruma::signatures::sign_json(
services().globals.server_name().as_str(),
services().globals.keypair(),
&mut response,
)
.unwrap();
Ok(Json(response))
}
/// # `GET /_matrix/key/v2/server/{keyId}`
///
/// Gets the public signing keys of this server.
///
/// - Matrix does not support invalidating public keys, so the key returned by
/// this will be valid
/// forever.
pub(crate) async fn get_server_keys_deprecated_route() -> impl IntoResponse { get_server_keys_route().await }

241
src/api/server/make_join.rs Normal file
View file

@ -0,0 +1,241 @@
use std::sync::Arc;
use ruma::{
api::{client::error::ErrorKind, federation::membership::prepare_join_event},
events::{
room::{
join_rules::{AllowRule, JoinRule, RoomJoinRulesEventContent},
member::{MembershipState, RoomMemberEventContent},
},
StateEventType, TimelineEventType,
},
RoomVersionId,
};
use serde_json::value::to_raw_value;
use tracing::warn;
use crate::{
service::{pdu::PduBuilder, user_is_local},
services, Error, Result, Ruma,
};
/// # `GET /_matrix/federation/v1/make_join/{roomId}/{userId}`
///
/// Creates a join template.
pub(crate) async fn create_join_event_template_route(
body: Ruma<prepare_join_event::v1::Request>,
) -> Result<prepare_join_event::v1::Response> {
if !services().rooms.metadata.exists(&body.room_id)? {
return Err(Error::BadRequest(ErrorKind::NotFound, "Room is unknown to this server."));
}
let origin = body.origin.as_ref().expect("server is authenticated");
if body.user_id.server_name() != origin {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Not allowed to join on behalf of another server/user",
));
}
// ACL check origin server
services()
.rooms
.event_handler
.acl_check(origin, &body.room_id)?;
if services()
.globals
.config
.forbidden_remote_server_names
.contains(origin)
{
warn!(
"Server {origin} for remote user {} tried joining room ID {} which has a server name that is globally \
forbidden. Rejecting.",
&body.user_id, &body.room_id,
);
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Server is banned on this homeserver.",
));
}
if let Some(server) = body.room_id.server_name() {
if services()
.globals
.config
.forbidden_remote_server_names
.contains(&server.to_owned())
{
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Server is banned on this homeserver.",
));
}
}
let mutex_state = Arc::clone(
services()
.globals
.roomid_mutex_state
.write()
.await
.entry(body.room_id.clone())
.or_default(),
);
let state_lock = mutex_state.lock().await;
let join_rules_event =
services()
.rooms
.state_accessor
.room_state_get(&body.room_id, &StateEventType::RoomJoinRules, "")?;
let join_rules_event_content: Option<RoomJoinRulesEventContent> = join_rules_event
.as_ref()
.map(|join_rules_event| {
serde_json::from_str(join_rules_event.content.get())
.map_err(|_| Error::bad_database("Invalid join rules event in db."))
})
.transpose()?;
let join_authorized_via_users_server = if let Some(join_rules_event_content) = join_rules_event_content {
if let JoinRule::Restricted(r) | JoinRule::KnockRestricted(r) = join_rules_event_content.join_rule {
if r.allow
.iter()
.filter_map(|rule| {
if let AllowRule::RoomMembership(membership) = rule {
Some(membership)
} else {
None
}
})
.any(|m| {
services()
.rooms
.state_cache
.is_joined(&body.user_id, &m.room_id)
.unwrap_or(false)
}) {
if services()
.rooms
.state_cache
.is_left(&body.user_id, &body.room_id)
.unwrap_or(true)
{
let members: Vec<_> = services()
.rooms
.state_cache
.room_members(&body.room_id)
.filter_map(Result::ok)
.filter(|user| user_is_local(user))
.collect();
let mut auth_user = None;
for user in members {
if services()
.rooms
.state_accessor
.user_can_invite(&body.room_id, &user, &body.user_id, &state_lock)
.await
.unwrap_or(false)
{
auth_user = Some(user);
break;
}
}
if auth_user.is_some() {
auth_user
} else {
return Err(Error::BadRequest(
ErrorKind::UnableToGrantJoin,
"No user on this server is able to assist in joining.",
));
}
} else {
// If the user has any state other than leave, either:
// - the auth_check will deny them (ban, knock - (until/unless MSC4123 is
// merged))
// - they are able to join via other methods (invite)
// - they are already in the room (join)
None
}
} else {
return Err(Error::BadRequest(
ErrorKind::UnableToAuthorizeJoin,
"User is not known to be in any required room.",
));
}
} else {
None
}
} else {
None
};
let room_version_id = services().rooms.state.get_room_version(&body.room_id)?;
if !body.ver.contains(&room_version_id) {
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion {
room_version: room_version_id,
},
"Room version not supported.",
));
}
let content = to_raw_value(&RoomMemberEventContent {
avatar_url: None,
blurhash: None,
displayname: None,
is_direct: None,
membership: MembershipState::Join,
third_party_invite: None,
reason: None,
join_authorized_via_users_server,
})
.expect("member event is valid value");
let (_pdu, mut pdu_json) = services().rooms.timeline.create_hash_and_sign_event(
PduBuilder {
event_type: TimelineEventType::RoomMember,
content,
unsigned: None,
state_key: Some(body.user_id.to_string()),
redacts: None,
},
&body.user_id,
&body.room_id,
&state_lock,
)?;
drop(state_lock);
// room v3 and above removed the "event_id" field from remote PDU format
match room_version_id {
RoomVersionId::V1 | RoomVersionId::V2 => {},
RoomVersionId::V3
| RoomVersionId::V4
| RoomVersionId::V5
| RoomVersionId::V6
| RoomVersionId::V7
| RoomVersionId::V8
| RoomVersionId::V9
| RoomVersionId::V10
| RoomVersionId::V11 => {
pdu_json.remove("event_id");
},
_ => {
warn!("Unexpected or unsupported room version {room_version_id}");
return Err(Error::BadRequest(
ErrorKind::BadJson,
"Unexpected or unsupported room version found",
));
},
};
Ok(prepare_join_event::v1::Response {
room_version: Some(room_version_id),
event: to_raw_value(&pdu_json).expect("CanonicalJson can be serialized to JSON"),
})
}

View file

@ -0,0 +1,105 @@
use std::sync::Arc;
use ruma::{
api::{client::error::ErrorKind, federation::membership::prepare_leave_event},
events::{
room::member::{MembershipState, RoomMemberEventContent},
TimelineEventType,
},
RoomVersionId,
};
use serde_json::value::to_raw_value;
use crate::{service::pdu::PduBuilder, services, Error, Result, Ruma};
/// # `PUT /_matrix/federation/v1/make_leave/{roomId}/{eventId}`
///
/// Creates a leave template.
pub(crate) async fn create_leave_event_template_route(
body: Ruma<prepare_leave_event::v1::Request>,
) -> Result<prepare_leave_event::v1::Response> {
if !services().rooms.metadata.exists(&body.room_id)? {
return Err(Error::BadRequest(ErrorKind::NotFound, "Room is unknown to this server."));
}
let origin = body.origin.as_ref().expect("server is authenticated");
if body.user_id.server_name() != origin {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Not allowed to leave on behalf of another server/user",
));
}
// ACL check origin
services()
.rooms
.event_handler
.acl_check(origin, &body.room_id)?;
let room_version_id = services().rooms.state.get_room_version(&body.room_id)?;
let mutex_state = Arc::clone(
services()
.globals
.roomid_mutex_state
.write()
.await
.entry(body.room_id.clone())
.or_default(),
);
let state_lock = mutex_state.lock().await;
let content = to_raw_value(&RoomMemberEventContent {
avatar_url: None,
blurhash: None,
displayname: None,
is_direct: None,
membership: MembershipState::Leave,
third_party_invite: None,
reason: None,
join_authorized_via_users_server: None,
})
.expect("member event is valid value");
let (_pdu, mut pdu_json) = services().rooms.timeline.create_hash_and_sign_event(
PduBuilder {
event_type: TimelineEventType::RoomMember,
content,
unsigned: None,
state_key: Some(body.user_id.to_string()),
redacts: None,
},
&body.user_id,
&body.room_id,
&state_lock,
)?;
drop(state_lock);
// room v3 and above removed the "event_id" field from remote PDU format
match room_version_id {
RoomVersionId::V1 | RoomVersionId::V2 => {},
RoomVersionId::V3
| RoomVersionId::V4
| RoomVersionId::V5
| RoomVersionId::V6
| RoomVersionId::V7
| RoomVersionId::V8
| RoomVersionId::V9
| RoomVersionId::V10
| RoomVersionId::V11 => {
pdu_json.remove("event_id");
},
_ => {
return Err(Error::BadRequest(
ErrorKind::BadJson,
"Unexpected or unsupported room version found",
));
},
};
Ok(prepare_leave_event::v1::Response {
room_version: Some(room_version_id),
event: to_raw_value(&pdu_json).expect("CanonicalJson can be serialized to JSON"),
})
}

39
src/api/server/mod.rs Normal file
View file

@ -0,0 +1,39 @@
pub(super) mod backfill;
pub(super) mod event;
pub(super) mod event_auth;
pub(super) mod get_missing_events;
pub(super) mod hierarchy;
pub(super) mod invite;
pub(super) mod key;
pub(super) mod make_join;
pub(super) mod make_leave;
pub(super) mod publicrooms;
pub(super) mod query;
pub(super) mod send;
pub(super) mod send_join;
pub(super) mod send_leave;
pub(super) mod state;
pub(super) mod state_ids;
pub(super) mod user;
pub(super) mod version;
pub(super) mod well_known;
pub(super) use backfill::*;
pub(super) use event::*;
pub(super) use event_auth::*;
pub(super) use get_missing_events::*;
pub(super) use hierarchy::*;
pub(super) use invite::*;
pub(super) use key::*;
pub(super) use make_join::*;
pub(super) use make_leave::*;
pub(super) use publicrooms::*;
pub(super) use query::*;
pub(super) use send::*;
pub(super) use send_join::*;
pub(super) use send_leave::*;
pub(super) use state::*;
pub(super) use state_ids::*;
pub(super) use user::*;
pub(super) use version::*;
pub(super) use well_known::*;

View file

@ -0,0 +1,74 @@
use ruma::{
api::{
client::error::ErrorKind,
federation::directory::{get_public_rooms, get_public_rooms_filtered},
},
directory::Filter,
};
use crate::{
client_server::{self},
services, Error, Result, Ruma,
};
/// # `POST /_matrix/federation/v1/publicRooms`
///
/// Lists the public rooms on this server.
pub(crate) async fn get_public_rooms_filtered_route(
body: Ruma<get_public_rooms_filtered::v1::Request>,
) -> Result<get_public_rooms_filtered::v1::Response> {
if !services()
.globals
.allow_public_room_directory_over_federation()
{
return Err(Error::BadRequest(ErrorKind::forbidden(), "Room directory is not public"));
}
let response = client_server::get_public_rooms_filtered_helper(
None,
body.limit,
body.since.as_deref(),
&body.filter,
&body.room_network,
)
.await
.map_err(|_| Error::BadRequest(ErrorKind::Unknown, "Failed to return this server's public room list."))?;
Ok(get_public_rooms_filtered::v1::Response {
chunk: response.chunk,
prev_batch: response.prev_batch,
next_batch: response.next_batch,
total_room_count_estimate: response.total_room_count_estimate,
})
}
/// # `GET /_matrix/federation/v1/publicRooms`
///
/// Lists the public rooms on this server.
pub(crate) async fn get_public_rooms_route(
body: Ruma<get_public_rooms::v1::Request>,
) -> Result<get_public_rooms::v1::Response> {
if !services()
.globals
.allow_public_room_directory_over_federation()
{
return Err(Error::BadRequest(ErrorKind::forbidden(), "Room directory is not public"));
}
let response = client_server::get_public_rooms_filtered_helper(
None,
body.limit,
body.since.as_deref(),
&Filter::default(),
&body.room_network,
)
.await
.map_err(|_| Error::BadRequest(ErrorKind::Unknown, "Failed to return this server's public room list."))?;
Ok(get_public_rooms::v1::Response {
chunk: response.chunk,
prev_batch: response.prev_batch,
next_batch: response.next_batch,
total_room_count_estimate: response.total_room_count_estimate,
})
}

102
src/api/server/query.rs Normal file
View file

@ -0,0 +1,102 @@
use get_profile_information::v1::ProfileField;
use rand::seq::SliceRandom;
use ruma::{
api::{
client::error::ErrorKind,
federation::query::{get_profile_information, get_room_information},
},
OwnedServerName,
};
use crate::{service::server_is_ours, services, Error, Result, Ruma};
/// # `GET /_matrix/federation/v1/query/directory`
///
/// Resolve a room alias to a room id.
pub(crate) async fn get_room_information_route(
body: Ruma<get_room_information::v1::Request>,
) -> Result<get_room_information::v1::Response> {
let room_id = services()
.rooms
.alias
.resolve_local_alias(&body.room_alias)?
.ok_or_else(|| Error::BadRequest(ErrorKind::NotFound, "Room alias not found."))?;
let mut servers: Vec<OwnedServerName> = services()
.rooms
.state_cache
.room_servers(&room_id)
.filter_map(Result::ok)
.collect();
servers.sort_unstable();
servers.dedup();
servers.shuffle(&mut rand::thread_rng());
// insert our server as the very first choice if in list
if let Some(server_index) = servers
.iter()
.position(|server| server == services().globals.server_name())
{
servers.swap_remove(server_index);
servers.insert(0, services().globals.server_name().to_owned());
}
Ok(get_room_information::v1::Response {
room_id,
servers,
})
}
/// # `GET /_matrix/federation/v1/query/profile`
///
///
/// Gets information on a profile.
pub(crate) async fn get_profile_information_route(
body: Ruma<get_profile_information::v1::Request>,
) -> Result<get_profile_information::v1::Response> {
if !services()
.globals
.allow_profile_lookup_federation_requests()
{
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Profile lookup over federation is not allowed on this homeserver.",
));
}
if !server_is_ours(body.user_id.server_name()) {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"User does not belong to this server.",
));
}
let mut displayname = None;
let mut avatar_url = None;
let mut blurhash = None;
match &body.field {
Some(ProfileField::DisplayName) => {
displayname = services().users.displayname(&body.user_id)?;
},
Some(ProfileField::AvatarUrl) => {
avatar_url = services().users.avatar_url(&body.user_id)?;
blurhash = services().users.blurhash(&body.user_id)?;
},
// TODO: what to do with custom
Some(_) => {},
None => {
displayname = services().users.displayname(&body.user_id)?;
avatar_url = services().users.avatar_url(&body.user_id)?;
blurhash = services().users.blurhash(&body.user_id)?;
},
}
Ok(get_profile_information::v1::Response {
displayname,
avatar_url,
blurhash,
})
}

379
src/api/server/send.rs Normal file
View file

@ -0,0 +1,379 @@
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use conduit::debug_warn;
use ruma::{
api::{
client::error::ErrorKind,
federation::transactions::{
edu::{DeviceListUpdateContent, DirectDeviceContent, Edu, SigningKeyUpdateContent},
send_transaction_message,
},
},
events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
to_device::DeviceIdOrAllDevices,
};
use tokio::sync::RwLock;
use tracing::{debug, error, trace, warn};
use crate::{
service::rooms::event_handler::parse_incoming_pdu,
services,
utils::{self},
Error, Result, Ruma,
};
/// # `PUT /_matrix/federation/v1/send/{txnId}`
///
/// Push EDUs and PDUs to this server.
pub(crate) async fn send_transaction_message_route(
body: Ruma<send_transaction_message::v1::Request>,
) -> Result<send_transaction_message::v1::Response> {
let origin = body.origin.as_ref().expect("server is authenticated");
if *origin != body.body.origin {
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Not allowed to send transactions on behalf of other servers",
));
}
if body.pdus.len() > 50_usize {
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Not allowed to send more than 50 PDUs in one transaction",
));
}
if body.edus.len() > 100_usize {
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Not allowed to send more than 100 EDUs in one transaction",
));
}
// This is all the auth_events that have been recursively fetched so they don't
// have to be deserialized over and over again.
// TODO: make this persist across requests but not in a DB Tree (in globals?)
// TODO: This could potentially also be some sort of trie (suffix tree) like
// structure so that once an auth event is known it would know (using indexes
// maybe) all of the auth events that it references.
// let mut auth_cache = EventMap::new();
let txn_start_time = Instant::now();
let mut parsed_pdus = Vec::with_capacity(body.pdus.len());
for pdu in &body.pdus {
parsed_pdus.push(match parse_incoming_pdu(pdu) {
Ok(t) => t,
Err(e) => {
debug_warn!("Could not parse PDU: {e}");
continue;
},
});
// We do not add the event_id field to the pdu here because of signature
// and hashes checks
}
trace!(
pdus = ?parsed_pdus.len(),
edus = ?body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = ?body.transaction_id,
origin =?body.origin,
"Starting txn",
);
// We go through all the signatures we see on the PDUs and fetch the
// corresponding signing keys
let pub_key_map = RwLock::new(BTreeMap::new());
if !parsed_pdus.is_empty() {
services()
.rooms
.event_handler
.fetch_required_signing_keys(parsed_pdus.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map)
.await
.unwrap_or_else(|e| {
warn!("Could not fetch all signatures for PDUs from {origin}: {:?}", e);
});
debug!(
elapsed = ?txn_start_time.elapsed(),
"Fetched signing keys"
);
}
let mut resolved_map = BTreeMap::new();
for (event_id, value, room_id) in parsed_pdus {
let pdu_start_time = Instant::now();
let mutex = Arc::clone(
services()
.globals
.roomid_mutex_federation
.write()
.await
.entry(room_id.clone())
.or_default(),
);
let mutex_lock = mutex.lock().await;
resolved_map.insert(
event_id.clone(),
services()
.rooms
.event_handler
.handle_incoming_pdu(origin, &room_id, &event_id, value, true, &pub_key_map)
.await
.map(|_| ()),
);
drop(mutex_lock);
debug!(
pdu_elapsed = ?pdu_start_time.elapsed(),
txn_elapsed = ?txn_start_time.elapsed(),
"Finished PDU {event_id}",
);
}
for pdu in &resolved_map {
if let Err(e) = pdu.1 {
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
warn!("Incoming PDU failed {:?}", pdu);
}
}
}
for edu in body
.edus
.iter()
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
{
match edu {
Edu::Presence(presence) => {
if !services().globals.allow_incoming_presence() {
continue;
}
for update in presence.push {
if update.user_id.server_name() != origin {
debug_warn!(%update.user_id, %origin, "received presence EDU for user not belonging to origin");
continue;
}
services().presence.set_presence(
&update.user_id,
&update.presence,
Some(update.currently_active),
Some(update.last_active_ago),
update.status_msg.clone(),
)?;
}
},
Edu::Receipt(receipt) => {
if !services().globals.allow_incoming_read_receipts() {
continue;
}
for (room_id, room_updates) in receipt.receipts {
if services()
.rooms
.event_handler
.acl_check(origin, &room_id)
.is_err()
{
debug_warn!(%origin, %room_id, "received read receipt EDU from ACL'd server");
continue;
}
for (user_id, user_updates) in room_updates.read {
if user_id.server_name() != origin {
debug_warn!(%user_id, %origin, "received read receipt EDU for user not belonging to origin");
continue;
}
if services()
.rooms
.state_cache
.room_members(&room_id)
.filter_map(Result::ok)
.any(|member| member.server_name() == user_id.server_name())
{
for event_id in &user_updates.event_ids {
let user_receipts = BTreeMap::from([(user_id.clone(), user_updates.data.clone())]);
let receipts = BTreeMap::from([(ReceiptType::Read, user_receipts)]);
let receipt_content = BTreeMap::from([(event_id.to_owned(), receipts)]);
let event = ReceiptEvent {
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
};
services()
.rooms
.read_receipt
.readreceipt_update(&user_id, &room_id, event)?;
}
} else {
debug_warn!(%user_id, %room_id, %origin, "received read receipt EDU from server who does not have a single member from their server in the room");
continue;
}
}
}
},
Edu::Typing(typing) => {
if !services().globals.config.allow_incoming_typing {
continue;
}
if typing.user_id.server_name() != origin {
debug_warn!(%typing.user_id, %origin, "received typing EDU for user not belonging to origin");
continue;
}
if services()
.rooms
.event_handler
.acl_check(typing.user_id.server_name(), &typing.room_id)
.is_err()
{
debug_warn!(%typing.user_id, %typing.room_id, %origin, "received typing EDU for ACL'd user's server");
continue;
}
if services()
.rooms
.state_cache
.is_joined(&typing.user_id, &typing.room_id)?
{
if typing.typing {
let timeout = utils::millis_since_unix_epoch().saturating_add(
services()
.globals
.config
.typing_federation_timeout_s
.saturating_mul(1000),
);
services()
.rooms
.typing
.typing_add(&typing.user_id, &typing.room_id, timeout)
.await?;
} else {
services()
.rooms
.typing
.typing_remove(&typing.user_id, &typing.room_id)
.await?;
}
} else {
debug_warn!(%typing.user_id, %typing.room_id, %origin, "received typing EDU for user not in room");
continue;
}
},
Edu::DeviceListUpdate(DeviceListUpdateContent {
user_id,
..
}) => {
if user_id.server_name() != origin {
debug_warn!(%user_id, %origin, "received device list update EDU for user not belonging to origin");
continue;
}
services().users.mark_device_key_update(&user_id)?;
},
Edu::DirectToDevice(DirectDeviceContent {
sender,
ev_type,
message_id,
messages,
}) => {
if sender.server_name() != origin {
debug_warn!(%sender, %origin, "received direct to device EDU for user not belonging to origin");
continue;
}
// Check if this is a new transaction id
if services()
.transaction_ids
.existing_txnid(&sender, None, &message_id)?
.is_some()
{
continue;
}
for (target_user_id, map) in &messages {
for (target_device_id_maybe, event) in map {
match target_device_id_maybe {
DeviceIdOrAllDevices::DeviceId(target_device_id) => {
services().users.add_to_device_event(
&sender,
target_user_id,
target_device_id,
&ev_type.to_string(),
event.deserialize_as().map_err(|e| {
error!("To-Device event is invalid: {event:?} {e}");
Error::BadRequest(ErrorKind::InvalidParam, "Event is invalid")
})?,
)?;
},
DeviceIdOrAllDevices::AllDevices => {
for target_device_id in services().users.all_device_ids(target_user_id) {
services().users.add_to_device_event(
&sender,
target_user_id,
&target_device_id?,
&ev_type.to_string(),
event.deserialize_as().map_err(|_| {
Error::BadRequest(ErrorKind::InvalidParam, "Event is invalid")
})?,
)?;
}
},
}
}
}
// Save transaction id with empty data
services()
.transaction_ids
.add_txnid(&sender, None, &message_id, &[])?;
},
Edu::SigningKeyUpdate(SigningKeyUpdateContent {
user_id,
master_key,
self_signing_key,
}) => {
if user_id.server_name() != origin {
debug_warn!(%user_id, %origin, "received signing key update EDU from server that does not belong to user's server");
continue;
}
if let Some(master_key) = master_key {
services()
.users
.add_cross_signing_keys(&user_id, &master_key, &self_signing_key, &None, true)?;
}
},
Edu::_Custom(custom) => {
debug_warn!(?custom, "received custom/unknown EDU");
},
}
}
debug!(
pdus = ?body.pdus.len(),
edus = ?body.edus.len(),
elapsed = ?txn_start_time.elapsed(),
id = ?body.transaction_id,
origin =?body.origin,
"Finished txn",
);
Ok(send_transaction_message::v1::Response {
pdus: resolved_map
.into_iter()
.map(|(e, r)| (e, r.map_err(|e| e.sanitized_error())))
.collect(),
})
}

300
src/api/server/send_join.rs Normal file
View file

@ -0,0 +1,300 @@
#![allow(deprecated)]
use std::{collections::BTreeMap, sync::Arc};
use ruma::{
api::{client::error::ErrorKind, federation::membership::create_join_event},
events::{room::member::MembershipState, StateEventType},
CanonicalJsonValue, OwnedServerName, OwnedUserId, RoomId, ServerName,
};
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use tokio::sync::RwLock;
use tracing::warn;
use crate::{service::pdu::gen_event_id_canonical_json, services, Error, PduEvent, Result, Ruma};
/// helper method for /send_join v1 and v2
async fn create_join_event(
origin: &ServerName, room_id: &RoomId, pdu: &RawJsonValue,
) -> Result<create_join_event::v1::RoomState> {
if !services().rooms.metadata.exists(room_id)? {
return Err(Error::BadRequest(ErrorKind::NotFound, "Room is unknown to this server."));
}
// ACL check origin server
services().rooms.event_handler.acl_check(origin, room_id)?;
// We need to return the state prior to joining, let's keep a reference to that
// here
let shortstatehash = services()
.rooms
.state
.get_room_shortstatehash(room_id)?
.ok_or_else(|| Error::BadRequest(ErrorKind::NotFound, "Event state not found."))?;
let pub_key_map = RwLock::new(BTreeMap::new());
// let mut auth_cache = EventMap::new();
// We do not add the event_id field to the pdu here because of signature and
// hashes checks
let room_version_id = services().rooms.state.get_room_version(room_id)?;
let Ok((event_id, mut value)) = gen_event_id_canonical_json(pdu, &room_version_id) else {
// Event could not be converted to canonical json
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not convert event to canonical json.",
));
};
let event_type: StateEventType = serde_json::from_value(
value
.get("type")
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event missing type property."))?
.clone()
.into(),
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Event has invalid event type."))?;
if event_type != StateEventType::RoomMember {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Not allowed to send non-membership state event to join endpoint.",
));
}
let content = value
.get("content")
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event missing content property."))?
.as_object()
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event content is empty or invalid."))?;
let membership: MembershipState = serde_json::from_value(
content
.get("membership")
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event content missing membership property."))?
.clone()
.into(),
)
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Event has an invalid membership state."))?;
if membership != MembershipState::Join {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Not allowed to send a non-join membership event to join endpoint.",
));
}
// ACL check sender server name
let sender: OwnedUserId = serde_json::from_value(
value
.get("sender")
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event missing sender property."))?
.clone()
.into(),
)
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "sender is not a valid user ID."))?;
services()
.rooms
.event_handler
.acl_check(sender.server_name(), room_id)?;
// check if origin server is trying to send for another server
if sender.server_name() != origin {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Not allowed to join on behalf of another server.",
));
}
let state_key: OwnedUserId = serde_json::from_value(
value
.get("state_key")
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event missing state_key property."))?
.clone()
.into(),
)
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "state_key is invalid or not a user ID."))?;
if state_key != sender {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"State key does not match sender user",
));
}
ruma::signatures::hash_and_sign_event(
services().globals.server_name().as_str(),
services().globals.keypair(),
&mut value,
&room_version_id,
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Failed to sign event."))?;
let origin: OwnedServerName = serde_json::from_value(
serde_json::to_value(
value
.get("origin")
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event missing origin property."))?,
)
.expect("CanonicalJson is valid json value"),
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "origin is not a server name."))?;
services()
.rooms
.event_handler
.fetch_required_signing_keys([&value], &pub_key_map)
.await?;
let mutex = Arc::clone(
services()
.globals
.roomid_mutex_federation
.write()
.await
.entry(room_id.to_owned())
.or_default(),
);
let mutex_lock = mutex.lock().await;
let pdu_id: Vec<u8> = services()
.rooms
.event_handler
.handle_incoming_pdu(&origin, room_id, &event_id, value.clone(), true, &pub_key_map)
.await?
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Could not accept as timeline event."))?;
drop(mutex_lock);
let state_ids = services()
.rooms
.state_accessor
.state_full_ids(shortstatehash)
.await?;
let auth_chain_ids = services()
.rooms
.auth_chain
.event_ids_iter(room_id, state_ids.values().cloned().collect())
.await?;
services().sending.send_pdu_room(room_id, &pdu_id)?;
Ok(create_join_event::v1::RoomState {
auth_chain: auth_chain_ids
.filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event)
.collect(),
state: state_ids
.iter()
.filter_map(|(_, id)| services().rooms.timeline.get_pdu_json(id).ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event)
.collect(),
// Event field is required if the room version supports restricted join rules.
event: Some(
to_raw_value(&CanonicalJsonValue::Object(value.clone()))
.expect("To raw json should not fail since only change was adding signature"),
),
})
}
/// # `PUT /_matrix/federation/v1/send_join/{roomId}/{eventId}`
///
/// Submits a signed join event.
pub(crate) async fn create_join_event_v1_route(
body: Ruma<create_join_event::v1::Request>,
) -> Result<create_join_event::v1::Response> {
let origin = body.origin.as_ref().expect("server is authenticated");
if services()
.globals
.config
.forbidden_remote_server_names
.contains(origin)
{
warn!(
"Server {origin} tried joining room ID {} who has a server name that is globally forbidden. Rejecting.",
&body.room_id,
);
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Server is banned on this homeserver.",
));
}
if let Some(server) = body.room_id.server_name() {
if services()
.globals
.config
.forbidden_remote_server_names
.contains(&server.to_owned())
{
warn!(
"Server {origin} tried joining room ID {} which has a server name that is globally forbidden. \
Rejecting.",
&body.room_id,
);
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Server is banned on this homeserver.",
));
}
}
let room_state = create_join_event(origin, &body.room_id, &body.pdu).await?;
Ok(create_join_event::v1::Response {
room_state,
})
}
/// # `PUT /_matrix/federation/v2/send_join/{roomId}/{eventId}`
///
/// Submits a signed join event.
pub(crate) async fn create_join_event_v2_route(
body: Ruma<create_join_event::v2::Request>,
) -> Result<create_join_event::v2::Response> {
let origin = body.origin.as_ref().expect("server is authenticated");
if services()
.globals
.config
.forbidden_remote_server_names
.contains(origin)
{
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Server is banned on this homeserver.",
));
}
if let Some(server) = body.room_id.server_name() {
if services()
.globals
.config
.forbidden_remote_server_names
.contains(&server.to_owned())
{
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"Server is banned on this homeserver.",
));
}
}
let create_join_event::v1::RoomState {
auth_chain,
state,
event,
} = create_join_event(origin, &body.room_id, &body.pdu).await?;
let room_state = create_join_event::v2::RoomState {
members_omitted: false,
auth_chain,
state,
event,
servers_in_room: None,
};
Ok(create_join_event::v2::Response {
room_state,
})
}

View file

@ -0,0 +1,186 @@
#![allow(deprecated)]
use std::{collections::BTreeMap, sync::Arc};
use ruma::{
api::{client::error::ErrorKind, federation::membership::create_leave_event},
events::{room::member::MembershipState, StateEventType},
OwnedServerName, OwnedUserId, RoomId, ServerName,
};
use serde_json::value::RawValue as RawJsonValue;
use tokio::sync::RwLock;
use crate::{
service::{pdu::gen_event_id_canonical_json, server_is_ours},
services, Error, Result, Ruma,
};
/// # `PUT /_matrix/federation/v1/send_leave/{roomId}/{eventId}`
///
/// Submits a signed leave event.
pub(crate) async fn create_leave_event_v1_route(
body: Ruma<create_leave_event::v1::Request>,
) -> Result<create_leave_event::v1::Response> {
let origin = body.origin.as_ref().expect("server is authenticated");
create_leave_event(origin, &body.room_id, &body.pdu).await?;
Ok(create_leave_event::v1::Response::new())
}
/// # `PUT /_matrix/federation/v2/send_leave/{roomId}/{eventId}`
///
/// Submits a signed leave event.
pub(crate) async fn create_leave_event_v2_route(
body: Ruma<create_leave_event::v2::Request>,
) -> Result<create_leave_event::v2::Response> {
let origin = body.origin.as_ref().expect("server is authenticated");
create_leave_event(origin, &body.room_id, &body.pdu).await?;
Ok(create_leave_event::v2::Response::new())
}
async fn create_leave_event(origin: &ServerName, room_id: &RoomId, pdu: &RawJsonValue) -> Result<()> {
if !services().rooms.metadata.exists(room_id)? {
return Err(Error::BadRequest(ErrorKind::NotFound, "Room is unknown to this server."));
}
// ACL check origin
services().rooms.event_handler.acl_check(origin, room_id)?;
let pub_key_map = RwLock::new(BTreeMap::new());
// We do not add the event_id field to the pdu here because of signature and
// hashes checks
let room_version_id = services().rooms.state.get_room_version(room_id)?;
let Ok((event_id, value)) = gen_event_id_canonical_json(pdu, &room_version_id) else {
// Event could not be converted to canonical json
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not convert event to canonical json.",
));
};
let content = value
.get("content")
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event missing content property."))?
.as_object()
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event content not an object."))?;
let membership: MembershipState = serde_json::from_value(
content
.get("membership")
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event membership is missing."))?
.clone()
.into(),
)
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Event membership state is not valid."))?;
if membership != MembershipState::Leave {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Not allowed to send a non-leave membership event to leave endpoint.",
));
}
let event_type: StateEventType = serde_json::from_value(
value
.get("type")
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event missing type property."))?
.clone()
.into(),
)
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Event does not have a valid state event type."))?;
if event_type != StateEventType::RoomMember {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Not allowed to send non-membership state event to leave endpoint.",
));
}
// ACL check sender server name
let sender: OwnedUserId = serde_json::from_value(
value
.get("sender")
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event missing sender property."))?
.clone()
.into(),
)
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "User ID in sender is invalid."))?;
services()
.rooms
.event_handler
.acl_check(sender.server_name(), room_id)?;
if sender.server_name() != origin {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Not allowed to leave on behalf of another server.",
));
}
let state_key: OwnedUserId = serde_json::from_value(
value
.get("state_key")
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event missing state_key property."))?
.clone()
.into(),
)
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "state_key is invalid or not a user ID"))?;
if state_key != sender {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"state_key does not match sender user.",
));
}
let origin: OwnedServerName = serde_json::from_value(
serde_json::to_value(
value
.get("origin")
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Event missing origin property."))?,
)
.expect("CanonicalJson is valid json value"),
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "origin is not a server name."))?;
services()
.rooms
.event_handler
.fetch_required_signing_keys([&value], &pub_key_map)
.await?;
let mutex = Arc::clone(
services()
.globals
.roomid_mutex_federation
.write()
.await
.entry(room_id.to_owned())
.or_default(),
);
let mutex_lock = mutex.lock().await;
let pdu_id: Vec<u8> = services()
.rooms
.event_handler
.handle_incoming_pdu(&origin, room_id, &event_id, value, true, &pub_key_map)
.await?
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "Could not accept as timeline event."))?;
drop(mutex_lock);
let servers = services()
.rooms
.state_cache
.room_servers(room_id)
.filter_map(Result::ok)
.filter(|server| !server_is_ours(server));
services().sending.send_pdu_servers(servers, &pdu_id)?;
Ok(())
}

71
src/api/server/state.rs Normal file
View file

@ -0,0 +1,71 @@
use std::sync::Arc;
use ruma::api::{client::error::ErrorKind, federation::event::get_room_state};
use crate::{services, Error, PduEvent, Result, Ruma};
/// # `GET /_matrix/federation/v1/state/{roomId}`
///
/// Retrieves the current state of the room.
pub(crate) async fn get_room_state_route(
body: Ruma<get_room_state::v1::Request>,
) -> Result<get_room_state::v1::Response> {
let origin = body.origin.as_ref().expect("server is authenticated");
if !services()
.rooms
.state_cache
.server_in_room(origin, &body.room_id)?
{
return Err(Error::BadRequest(ErrorKind::forbidden(), "Server is not in room."));
}
services()
.rooms
.event_handler
.acl_check(origin, &body.room_id)?;
let shortstatehash = services()
.rooms
.state_accessor
.pdu_shortstatehash(&body.event_id)?
.ok_or_else(|| Error::BadRequest(ErrorKind::NotFound, "Pdu state not found."))?;
let pdus = services()
.rooms
.state_accessor
.state_full_ids(shortstatehash)
.await?
.into_values()
.map(|id| {
PduEvent::convert_to_outgoing_federation_event(
services()
.rooms
.timeline
.get_pdu_json(&id)
.unwrap()
.unwrap(),
)
})
.collect();
let auth_chain_ids = services()
.rooms
.auth_chain
.event_ids_iter(&body.room_id, vec![Arc::from(&*body.event_id)])
.await?;
Ok(get_room_state::v1::Response {
auth_chain: auth_chain_ids
.filter_map(|id| {
services()
.rooms
.timeline
.get_pdu_json(&id)
.ok()?
.map(PduEvent::convert_to_outgoing_federation_event)
})
.collect(),
pdus,
})
}

View file

@ -0,0 +1,53 @@
use std::sync::Arc;
use ruma::api::{client::error::ErrorKind, federation::event::get_room_state_ids};
use crate::{services, Error, Result, Ruma};
/// # `GET /_matrix/federation/v1/state_ids/{roomId}`
///
/// Retrieves the current state of the room.
pub(crate) async fn get_room_state_ids_route(
body: Ruma<get_room_state_ids::v1::Request>,
) -> Result<get_room_state_ids::v1::Response> {
let origin = body.origin.as_ref().expect("server is authenticated");
if !services()
.rooms
.state_cache
.server_in_room(origin, &body.room_id)?
{
return Err(Error::BadRequest(ErrorKind::forbidden(), "Server is not in room."));
}
services()
.rooms
.event_handler
.acl_check(origin, &body.room_id)?;
let shortstatehash = services()
.rooms
.state_accessor
.pdu_shortstatehash(&body.event_id)?
.ok_or_else(|| Error::BadRequest(ErrorKind::NotFound, "Pdu state not found."))?;
let pdu_ids = services()
.rooms
.state_accessor
.state_full_ids(shortstatehash)
.await?
.into_values()
.map(|id| (*id).to_owned())
.collect();
let auth_chain_ids = services()
.rooms
.auth_chain
.event_ids_iter(&body.room_id, vec![Arc::from(&*body.event_id)])
.await?;
Ok(get_room_state_ids::v1::Response {
auth_chain_ids: auth_chain_ids.map(|id| (*id).to_owned()).collect(),
pdu_ids,
})
}

108
src/api/server/user.rs Normal file
View file

@ -0,0 +1,108 @@
use ruma::api::{
client::error::ErrorKind,
federation::{
device::get_devices::{self, v1::UserDevice},
keys::{claim_keys, get_keys},
},
};
use crate::{
client_server::{claim_keys_helper, get_keys_helper},
service::user_is_local,
services, Error, Result, Ruma,
};
/// # `GET /_matrix/federation/v1/user/devices/{userId}`
///
/// Gets information on all devices of the user.
pub(crate) async fn get_devices_route(body: Ruma<get_devices::v1::Request>) -> Result<get_devices::v1::Response> {
if !user_is_local(&body.user_id) {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Tried to access user from other server.",
));
}
let origin = body.origin.as_ref().expect("server is authenticated");
Ok(get_devices::v1::Response {
user_id: body.user_id.clone(),
stream_id: services()
.users
.get_devicelist_version(&body.user_id)?
.unwrap_or(0)
.try_into()
.expect("version will not grow that large"),
devices: services()
.users
.all_devices_metadata(&body.user_id)
.filter_map(Result::ok)
.filter_map(|metadata| {
let device_id_string = metadata.device_id.as_str().to_owned();
let device_display_name = if services().globals.allow_device_name_federation() {
metadata.display_name
} else {
Some(device_id_string)
};
Some(UserDevice {
keys: services()
.users
.get_device_keys(&body.user_id, &metadata.device_id)
.ok()??,
device_id: metadata.device_id,
device_display_name,
})
})
.collect(),
master_key: services()
.users
.get_master_key(None, &body.user_id, &|u| u.server_name() == origin)?,
self_signing_key: services()
.users
.get_self_signing_key(None, &body.user_id, &|u| u.server_name() == origin)?,
})
}
/// # `POST /_matrix/federation/v1/user/keys/query`
///
/// Gets devices and identity keys for the given users.
pub(crate) async fn get_keys_route(body: Ruma<get_keys::v1::Request>) -> Result<get_keys::v1::Response> {
if body.device_keys.iter().any(|(u, _)| !user_is_local(u)) {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"User does not belong to this server.",
));
}
let result = get_keys_helper(
None,
&body.device_keys,
|u| Some(u.server_name()) == body.origin.as_deref(),
services().globals.allow_device_name_federation(),
)
.await?;
Ok(get_keys::v1::Response {
device_keys: result.device_keys,
master_keys: result.master_keys,
self_signing_keys: result.self_signing_keys,
})
}
/// # `POST /_matrix/federation/v1/user/keys/claim`
///
/// Claims one-time keys.
pub(crate) async fn claim_keys_route(body: Ruma<claim_keys::v1::Request>) -> Result<claim_keys::v1::Response> {
if body.one_time_keys.iter().any(|(u, _)| !user_is_local(u)) {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Tried to access user from other server.",
));
}
let result = claim_keys_helper(&body.one_time_keys).await?;
Ok(claim_keys::v1::Response {
one_time_keys: result.one_time_keys,
})
}

17
src/api/server/version.rs Normal file
View file

@ -0,0 +1,17 @@
use ruma::api::federation::discovery::get_server_version;
use crate::{Result, Ruma};
/// # `GET /_matrix/federation/v1/version`
///
/// Get version information on this server.
pub(crate) async fn get_server_version_route(
_body: Ruma<get_server_version::v1::Request>,
) -> Result<get_server_version::v1::Response> {
Ok(get_server_version::v1::Response {
server: Some(get_server_version::v1::Server {
name: Some("Conduwuit".to_owned()),
version: Some(conduit::version::conduwuit()),
}),
})
}

View file

@ -0,0 +1,17 @@
use ruma::api::{client::error::ErrorKind, federation::discovery::discover_homeserver};
use crate::{services, Error, Result, Ruma};
/// # `GET /.well-known/matrix/server`
///
/// Returns the .well-known URL if it is configured, otherwise returns 404.
pub(crate) async fn well_known_server(
_body: Ruma<discover_homeserver::Request>,
) -> Result<discover_homeserver::Response> {
Ok(discover_homeserver::Response {
server: match services().globals.well_known_server() {
Some(server_name) => server_name.to_owned(),
None => return Err(Error::BadRequest(ErrorKind::NotFound, "Not found.")),
},
})
}

File diff suppressed because it is too large Load diff