diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs index 8529a9df..bb19ee29 100644 --- a/src/api/client/membership.rs +++ b/src/api/client/membership.rs @@ -33,6 +33,7 @@ use ruma::{ OwnedUserId, RoomId, RoomVersionId, ServerName, UserId, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; +use service::sending::convert_to_outgoing_federation_event; use tokio::sync::RwLock; use tracing::{debug, error, info, trace, warn}; @@ -779,7 +780,7 @@ async fn join_room_by_id_helper_remote( federation::membership::create_join_event::v2::Request { room_id: room_id.to_owned(), event_id: event_id.to_owned(), - pdu: PduEvent::convert_to_outgoing_federation_event(join_event.clone()), + pdu: convert_to_outgoing_federation_event(join_event.clone()), omit_members: false, }, ) @@ -1207,7 +1208,7 @@ async fn join_room_by_id_helper_local( federation::membership::create_join_event::v2::Request { room_id: room_id.to_owned(), event_id: event_id.to_owned(), - pdu: PduEvent::convert_to_outgoing_federation_event(join_event.clone()), + pdu: convert_to_outgoing_federation_event(join_event.clone()), omit_members: false, }, ) @@ -1438,7 +1439,7 @@ pub(crate) async fn invite_helper( room_id: room_id.to_owned(), event_id: (*pdu.event_id).to_owned(), room_version: room_version_id.clone(), - event: PduEvent::convert_to_outgoing_federation_event(pdu_json.clone()), + event: convert_to_outgoing_federation_event(pdu_json.clone()), invite_room_state, via: services().rooms.state_cache.servers_route_via(room_id).ok(), }, @@ -1775,7 +1776,7 @@ async fn remote_leave_room(user_id: &UserId, room_id: &RoomId) -> Result<()> { federation::membership::create_leave_event::v2::Request { room_id: room_id.to_owned(), event_id, - pdu: PduEvent::convert_to_outgoing_federation_event(leave_event.clone()), + pdu: convert_to_outgoing_federation_event(leave_event.clone()), }, ) .await?; diff --git a/src/api/server/backfill.rs b/src/api/server/backfill.rs index e3ff82e4..b432ae20 100644 --- a/src/api/server/backfill.rs +++ b/src/api/server/backfill.rs @@ -1,9 +1,11 @@ +use conduit::{Error, Result}; use ruma::{ api::{client::error::ErrorKind, federation::backfill::get_backfill}, uint, user_id, MilliSecondsSinceUnixEpoch, }; +use service::{sending::convert_to_outgoing_federation_event, services}; -use crate::{services, Error, PduEvent, Result, Ruma}; +use crate::Ruma; /// # `GET /_matrix/federation/v1/backfill/` /// @@ -62,7 +64,7 @@ pub(crate) async fn get_backfill_route(body: Ruma) -> }) .map(|(_, pdu)| services().rooms.timeline.get_pdu_json(&pdu.event_id)) .filter_map(|r| r.ok().flatten()) - .map(PduEvent::convert_to_outgoing_federation_event) + .map(convert_to_outgoing_federation_event) .collect(); Ok(get_backfill::v1::Response { diff --git a/src/api/server/event.rs b/src/api/server/event.rs index 29f538b4..f4c9d145 100644 --- a/src/api/server/event.rs +++ b/src/api/server/event.rs @@ -1,9 +1,11 @@ +use conduit::{Error, Result}; use ruma::{ api::{client::error::ErrorKind, federation::event::get_event}, MilliSecondsSinceUnixEpoch, RoomId, }; +use service::{sending::convert_to_outgoing_federation_event, services}; -use crate::{services, Error, PduEvent, Result, Ruma}; +use crate::Ruma; /// # `GET /_matrix/federation/v1/event/{eventId}` /// @@ -48,6 +50,6 @@ pub(crate) async fn get_event_route(body: Ruma) -> Resul Ok(get_event::v1::Response { origin: services().globals.server_name().to_owned(), origin_server_ts: MilliSecondsSinceUnixEpoch::now(), - pdu: PduEvent::convert_to_outgoing_federation_event(event), + pdu: convert_to_outgoing_federation_event(event), }) } diff --git a/src/api/server/event_auth.rs b/src/api/server/event_auth.rs index 1ddf2ce3..bef5116b 100644 --- a/src/api/server/event_auth.rs +++ b/src/api/server/event_auth.rs @@ -1,11 +1,13 @@ use std::sync::Arc; +use conduit::{Error, Result}; use ruma::{ api::{client::error::ErrorKind, federation::authorization::get_event_authorization}, RoomId, }; +use service::{sending::convert_to_outgoing_federation_event, services}; -use crate::{services, Error, PduEvent, Result, Ruma}; +use crate::Ruma; /// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}` /// @@ -57,7 +59,7 @@ pub(crate) async fn get_event_authorization_route( Ok(get_event_authorization::v1::Response { auth_chain: auth_chain_ids .filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok()?) - .map(PduEvent::convert_to_outgoing_federation_event) + .map(convert_to_outgoing_federation_event) .collect(), }) } diff --git a/src/api/server/get_missing_events.rs b/src/api/server/get_missing_events.rs index 1c9a6a38..5ab9abf8 100644 --- a/src/api/server/get_missing_events.rs +++ b/src/api/server/get_missing_events.rs @@ -1,9 +1,11 @@ +use conduit::{Error, Result}; use ruma::{ api::{client::error::ErrorKind, federation::event::get_missing_events}, OwnedEventId, RoomId, }; +use service::{sending::convert_to_outgoing_federation_event, services}; -use crate::{services, Error, PduEvent, Result, Ruma}; +use crate::Ruma; /// # `POST /_matrix/federation/v1/get_missing_events/{roomId}` /// @@ -79,7 +81,7 @@ pub(crate) async fn get_missing_events_route( ) .map_err(|_| Error::bad_database("Invalid prev_events in event in database."))?, ); - events.push(PduEvent::convert_to_outgoing_federation_event(pdu)); + events.push(convert_to_outgoing_federation_event(pdu)); } i = i.saturating_add(1); } diff --git a/src/api/server/invite.rs b/src/api/server/invite.rs index 333ebee6..89b90058 100644 --- a/src/api/server/invite.rs +++ b/src/api/server/invite.rs @@ -1,18 +1,14 @@ use axum_client_ip::InsecureClientIp; +use conduit::{utils, warn, Error, PduEvent, Result}; use ruma::{ api::{client::error::ErrorKind, federation::membership::create_invite}, events::room::member::{MembershipState, RoomMemberEventContent}, serde::JsonObject, CanonicalJsonValue, EventId, OwnedUserId, }; -use tracing::warn; +use service::{sending::convert_to_outgoing_federation_event, server_is_ours, services}; -use crate::{ - service::server_is_ours, - services, - utils::{self}, - Error, PduEvent, Result, Ruma, -}; +use crate::Ruma; /// # `PUT /_matrix/federation/v2/invite/{roomId}/{eventId}` /// @@ -176,6 +172,6 @@ pub(crate) async fn create_invite_route( } Ok(create_invite::v2::Response { - event: PduEvent::convert_to_outgoing_federation_event(signed_event), + event: convert_to_outgoing_federation_event(signed_event), }) } diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index 82cfc45e..ff362f64 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -2,6 +2,7 @@ use std::collections::BTreeMap; +use conduit::{Error, Result}; use ruma::{ api::{client::error::ErrorKind, federation::membership::create_join_event}, events::{ @@ -11,11 +12,13 @@ use ruma::{ CanonicalJsonValue, OwnedServerName, OwnedUserId, RoomId, ServerName, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; -use service::user_is_local; +use service::{ + pdu::gen_event_id_canonical_json, sending::convert_to_outgoing_federation_event, services, user_is_local, +}; use tokio::sync::RwLock; use tracing::warn; -use crate::{service::pdu::gen_event_id_canonical_json, services, Error, PduEvent, Result, Ruma}; +use crate::Ruma; /// helper method for /send_join v1 and v2 async fn create_join_event( @@ -181,12 +184,12 @@ async fn create_join_event( Ok(create_join_event::v1::RoomState { auth_chain: auth_chain_ids .filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok().flatten()) - .map(PduEvent::convert_to_outgoing_federation_event) + .map(convert_to_outgoing_federation_event) .collect(), state: state_ids .iter() .filter_map(|(_, id)| services().rooms.timeline.get_pdu_json(id).ok().flatten()) - .map(PduEvent::convert_to_outgoing_federation_event) + .map(convert_to_outgoing_federation_event) .collect(), // Event field is required if the room version supports restricted join rules. event: Some( diff --git a/src/api/server/state.rs b/src/api/server/state.rs index c858f6fd..22044840 100644 --- a/src/api/server/state.rs +++ b/src/api/server/state.rs @@ -1,8 +1,10 @@ use std::sync::Arc; +use conduit::{Error, Result}; use ruma::api::{client::error::ErrorKind, federation::event::get_room_state}; +use service::{sending::convert_to_outgoing_federation_event, services}; -use crate::{services, Error, PduEvent, Result, Ruma}; +use crate::Ruma; /// # `GET /_matrix/federation/v1/state/{roomId}` /// @@ -42,7 +44,7 @@ pub(crate) async fn get_room_state_route( .await? .into_values() .map(|id| { - PduEvent::convert_to_outgoing_federation_event( + convert_to_outgoing_federation_event( services() .rooms .timeline @@ -67,7 +69,7 @@ pub(crate) async fn get_room_state_route( .timeline .get_pdu_json(&id) .ok()? - .map(PduEvent::convert_to_outgoing_federation_event) + .map(convert_to_outgoing_federation_event) }) .collect(), pdus, diff --git a/src/core/mod.rs b/src/core/mod.rs index 5ffe4cb9..ec536ee2 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -4,14 +4,14 @@ pub mod debug; pub mod error; pub mod log; pub mod mods; -pub mod pducount; +pub mod pdu; pub mod server; pub mod utils; pub mod version; pub use config::Config; pub use error::{Error, RumaResponse}; -pub use pducount::PduCount; +pub use pdu::{PduBuilder, PduCount, PduEvent}; pub use server::Server; pub use version::version; diff --git a/src/core/pdu/builder.rs b/src/core/pdu/builder.rs new file mode 100644 index 00000000..a8bad677 --- /dev/null +++ b/src/core/pdu/builder.rs @@ -0,0 +1,16 @@ +use std::{collections::BTreeMap, sync::Arc}; + +use ruma::{events::TimelineEventType, EventId}; +use serde::Deserialize; +use serde_json::value::RawValue as RawJsonValue; + +/// Build the start of a PDU in order to add it to the Database. +#[derive(Debug, Deserialize)] +pub struct PduBuilder { + #[serde(rename = "type")] + pub event_type: TimelineEventType, + pub content: Box, + pub unsigned: Option>, + pub state_key: Option, + pub redacts: Option>, +} diff --git a/src/core/pducount.rs b/src/core/pdu/count.rs similarity index 100% rename from src/core/pducount.rs rename to src/core/pdu/count.rs diff --git a/src/service/pdu.rs b/src/core/pdu/mod.rs similarity index 88% rename from src/service/pdu.rs rename to src/core/pdu/mod.rs index b5650c0a..c7d93608 100644 --- a/src/service/pdu.rs +++ b/src/core/pdu/mod.rs @@ -1,6 +1,10 @@ +mod builder; +mod count; + use std::{cmp::Ordering, collections::BTreeMap, sync::Arc}; -use conduit::{warn, Error}; +pub use builder::PduBuilder; +pub use count::PduCount; use ruma::{ canonical_json::redact_content_in_place, events::{ @@ -19,7 +23,7 @@ use serde_json::{ value::{to_raw_value, RawValue as RawJsonValue}, }; -use crate::services; +use crate::{warn, Error}; #[derive(Deserialize)] struct ExtractRedactedBecause { @@ -336,42 +340,6 @@ impl PduEvent { serde_json::from_value(json).expect("Raw::from_value always works") } - /// This does not return a full `Pdu` it is only to satisfy ruma's types. - #[tracing::instrument] - pub fn convert_to_outgoing_federation_event(mut pdu_json: CanonicalJsonObject) -> Box { - if let Some(unsigned) = pdu_json - .get_mut("unsigned") - .and_then(|val| val.as_object_mut()) - { - unsigned.remove("transaction_id"); - } - - // room v3 and above removed the "event_id" field from remote PDU format - if let Some(room_id) = pdu_json - .get("room_id") - .and_then(|val| RoomId::parse(val.as_str()?).ok()) - { - match services().rooms.state.get_room_version(&room_id) { - Ok(room_version_id) => match room_version_id { - RoomVersionId::V1 | RoomVersionId::V2 => {}, - _ => _ = pdu_json.remove("event_id"), - }, - Err(_) => _ = pdu_json.remove("event_id"), - } - } else { - pdu_json.remove("event_id"); - } - - // TODO: another option would be to convert it to a canonical string to validate - // size and return a Result> - // serde_json::from_str::>( - // ruma::serde::to_canonical_json_string(pdu_json).expect("CanonicalJson is - // valid serde_json::Value"), ) - // .expect("Raw::from_value always works") - - to_raw_value(&pdu_json).expect("CanonicalJson is valid serde_json::Value") - } - pub fn from_id_val(event_id: &EventId, mut json: CanonicalJsonObject) -> Result { json.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned())); @@ -438,14 +406,3 @@ pub fn gen_event_id_canonical_json( Ok((event_id, value)) } - -/// Build the start of a PDU in order to add it to the Database. -#[derive(Debug, Deserialize)] -pub struct PduBuilder { - #[serde(rename = "type")] - pub event_type: TimelineEventType, - pub content: Box, - pub unsigned: Option>, - pub state_key: Option, - pub redacts: Option>, -} diff --git a/src/service/mod.rs b/src/service/mod.rs index c80c9862..641dd36a 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,4 +1,3 @@ -pub mod pdu; pub mod services; pub mod account_data; @@ -20,12 +19,12 @@ extern crate conduit_database as database; use std::sync::{Arc, RwLock}; -pub(crate) use conduit::{config, debug_error, debug_info, debug_warn, utils, Config, Error, PduCount, Result, Server}; +pub(crate) use conduit::{config, debug_error, debug_info, debug_warn, utils, Config, Error, Result, Server}; +pub use conduit::{pdu, PduBuilder, PduCount, PduEvent}; use database::Database; pub use crate::{ globals::{server_is_ours, user_is_local}, - pdu::PduEvent, services::Services, }; diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 9bda6cad..c57ca14d 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -14,6 +14,7 @@ use ruma::{ api::{appservice::Registration, OutgoingRequest}, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, }; +pub use sender::convert_to_outgoing_federation_event; use tokio::{sync::Mutex, task::JoinHandle}; use tracing::{error, warn}; diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index aa2865ec..61cb9f6d 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -18,12 +18,14 @@ use ruma::{ }, device_id, events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, - push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, + push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, RoomVersionId, + ServerName, UInt, }; +use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use tracing::{debug, error, warn}; use super::{appservice, send, Destination, Msg, SendingEvent, Service}; -use crate::{presence::Presence, services, user_is_local, utils::calculate_hash, Error, PduEvent, Result}; +use crate::{presence::Presence, services, user_is_local, utils::calculate_hash, Error, Result}; #[derive(Debug)] enum TransactionStatus { @@ -548,24 +550,21 @@ async fn send_events_dest_normal( for event in &events { match event { - SendingEvent::Pdu(pdu_id) => { + SendingEvent::Pdu(pdu_id) => pdu_jsons.push(convert_to_outgoing_federation_event( // TODO: check room version and remove event_id if needed - let raw = PduEvent::convert_to_outgoing_federation_event( - services() - .rooms - .timeline - .get_pdu_json_from_id(pdu_id) - .map_err(|e| (dest.clone(), e))? - .ok_or_else(|| { - error!(?dest, ?server, ?pdu_id, "event not found"); - ( - dest.clone(), - Error::bad_database("[Normal] Event in servernameevent_data not found in db."), - ) - })?, - ); - pdu_jsons.push(raw); - }, + services() + .rooms + .timeline + .get_pdu_json_from_id(pdu_id) + .map_err(|e| (dest.clone(), e))? + .ok_or_else(|| { + error!(?dest, ?server, ?pdu_id, "event not found"); + ( + dest.clone(), + Error::bad_database("[Normal] Event in servernameevent_data not found in db."), + ) + })?, + )), SendingEvent::Edu(edu) => { if let Ok(raw) = serde_json::from_slice(edu) { edu_jsons.push(raw); @@ -611,3 +610,39 @@ async fn send_events_dest_normal( }) .map_err(|e| (dest.clone(), e)) } + +/// This does not return a full `Pdu` it is only to satisfy ruma's types. +#[tracing::instrument] +pub fn convert_to_outgoing_federation_event(mut pdu_json: CanonicalJsonObject) -> Box { + if let Some(unsigned) = pdu_json + .get_mut("unsigned") + .and_then(|val| val.as_object_mut()) + { + unsigned.remove("transaction_id"); + } + + // room v3 and above removed the "event_id" field from remote PDU format + if let Some(room_id) = pdu_json + .get("room_id") + .and_then(|val| RoomId::parse(val.as_str()?).ok()) + { + match services().rooms.state.get_room_version(&room_id) { + Ok(room_version_id) => match room_version_id { + RoomVersionId::V1 | RoomVersionId::V2 => {}, + _ => _ = pdu_json.remove("event_id"), + }, + Err(_) => _ = pdu_json.remove("event_id"), + } + } else { + pdu_json.remove("event_id"); + } + + // TODO: another option would be to convert it to a canonical string to validate + // size and return a Result> + // serde_json::from_str::>( + // ruma::serde::to_canonical_json_string(pdu_json).expect("CanonicalJson is + // valid serde_json::Value"), ) + // .expect("Raw::from_value always works") + + to_raw_value(&pdu_json).expect("CanonicalJson is valid serde_json::Value") +}