move PduEvent from services to core

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-07-03 21:05:24 +00:00
parent 229f2fde7a
commit 0cea64309a
15 changed files with 118 additions and 100 deletions

View file

@ -33,6 +33,7 @@ use ruma::{
OwnedUserId, RoomId, RoomVersionId, ServerName, UserId, OwnedUserId, RoomId, RoomVersionId, ServerName, UserId,
}; };
use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use service::sending::convert_to_outgoing_federation_event;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
@ -779,7 +780,7 @@ async fn join_room_by_id_helper_remote(
federation::membership::create_join_event::v2::Request { federation::membership::create_join_event::v2::Request {
room_id: room_id.to_owned(), room_id: room_id.to_owned(),
event_id: event_id.to_owned(), event_id: event_id.to_owned(),
pdu: PduEvent::convert_to_outgoing_federation_event(join_event.clone()), pdu: convert_to_outgoing_federation_event(join_event.clone()),
omit_members: false, omit_members: false,
}, },
) )
@ -1207,7 +1208,7 @@ async fn join_room_by_id_helper_local(
federation::membership::create_join_event::v2::Request { federation::membership::create_join_event::v2::Request {
room_id: room_id.to_owned(), room_id: room_id.to_owned(),
event_id: event_id.to_owned(), event_id: event_id.to_owned(),
pdu: PduEvent::convert_to_outgoing_federation_event(join_event.clone()), pdu: convert_to_outgoing_federation_event(join_event.clone()),
omit_members: false, omit_members: false,
}, },
) )
@ -1438,7 +1439,7 @@ pub(crate) async fn invite_helper(
room_id: room_id.to_owned(), room_id: room_id.to_owned(),
event_id: (*pdu.event_id).to_owned(), event_id: (*pdu.event_id).to_owned(),
room_version: room_version_id.clone(), room_version: room_version_id.clone(),
event: PduEvent::convert_to_outgoing_federation_event(pdu_json.clone()), event: convert_to_outgoing_federation_event(pdu_json.clone()),
invite_room_state, invite_room_state,
via: services().rooms.state_cache.servers_route_via(room_id).ok(), via: services().rooms.state_cache.servers_route_via(room_id).ok(),
}, },
@ -1775,7 +1776,7 @@ async fn remote_leave_room(user_id: &UserId, room_id: &RoomId) -> Result<()> {
federation::membership::create_leave_event::v2::Request { federation::membership::create_leave_event::v2::Request {
room_id: room_id.to_owned(), room_id: room_id.to_owned(),
event_id, event_id,
pdu: PduEvent::convert_to_outgoing_federation_event(leave_event.clone()), pdu: convert_to_outgoing_federation_event(leave_event.clone()),
}, },
) )
.await?; .await?;

View file

@ -1,9 +1,11 @@
use conduit::{Error, Result};
use ruma::{ use ruma::{
api::{client::error::ErrorKind, federation::backfill::get_backfill}, api::{client::error::ErrorKind, federation::backfill::get_backfill},
uint, user_id, MilliSecondsSinceUnixEpoch, uint, user_id, MilliSecondsSinceUnixEpoch,
}; };
use service::{sending::convert_to_outgoing_federation_event, services};
use crate::{services, Error, PduEvent, Result, Ruma}; use crate::Ruma;
/// # `GET /_matrix/federation/v1/backfill/<room_id>` /// # `GET /_matrix/federation/v1/backfill/<room_id>`
/// ///
@ -62,7 +64,7 @@ pub(crate) async fn get_backfill_route(body: Ruma<get_backfill::v1::Request>) ->
}) })
.map(|(_, pdu)| services().rooms.timeline.get_pdu_json(&pdu.event_id)) .map(|(_, pdu)| services().rooms.timeline.get_pdu_json(&pdu.event_id))
.filter_map(|r| r.ok().flatten()) .filter_map(|r| r.ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event) .map(convert_to_outgoing_federation_event)
.collect(); .collect();
Ok(get_backfill::v1::Response { Ok(get_backfill::v1::Response {

View file

@ -1,9 +1,11 @@
use conduit::{Error, Result};
use ruma::{ use ruma::{
api::{client::error::ErrorKind, federation::event::get_event}, api::{client::error::ErrorKind, federation::event::get_event},
MilliSecondsSinceUnixEpoch, RoomId, MilliSecondsSinceUnixEpoch, RoomId,
}; };
use service::{sending::convert_to_outgoing_federation_event, services};
use crate::{services, Error, PduEvent, Result, Ruma}; use crate::Ruma;
/// # `GET /_matrix/federation/v1/event/{eventId}` /// # `GET /_matrix/federation/v1/event/{eventId}`
/// ///
@ -48,6 +50,6 @@ pub(crate) async fn get_event_route(body: Ruma<get_event::v1::Request>) -> Resul
Ok(get_event::v1::Response { Ok(get_event::v1::Response {
origin: services().globals.server_name().to_owned(), origin: services().globals.server_name().to_owned(),
origin_server_ts: MilliSecondsSinceUnixEpoch::now(), origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
pdu: PduEvent::convert_to_outgoing_federation_event(event), pdu: convert_to_outgoing_federation_event(event),
}) })
} }

View file

@ -1,11 +1,13 @@
use std::sync::Arc; use std::sync::Arc;
use conduit::{Error, Result};
use ruma::{ use ruma::{
api::{client::error::ErrorKind, federation::authorization::get_event_authorization}, api::{client::error::ErrorKind, federation::authorization::get_event_authorization},
RoomId, RoomId,
}; };
use service::{sending::convert_to_outgoing_federation_event, services};
use crate::{services, Error, PduEvent, Result, Ruma}; use crate::Ruma;
/// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}` /// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}`
/// ///
@ -57,7 +59,7 @@ pub(crate) async fn get_event_authorization_route(
Ok(get_event_authorization::v1::Response { Ok(get_event_authorization::v1::Response {
auth_chain: auth_chain_ids auth_chain: auth_chain_ids
.filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok()?) .filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok()?)
.map(PduEvent::convert_to_outgoing_federation_event) .map(convert_to_outgoing_federation_event)
.collect(), .collect(),
}) })
} }

View file

@ -1,9 +1,11 @@
use conduit::{Error, Result};
use ruma::{ use ruma::{
api::{client::error::ErrorKind, federation::event::get_missing_events}, api::{client::error::ErrorKind, federation::event::get_missing_events},
OwnedEventId, RoomId, OwnedEventId, RoomId,
}; };
use service::{sending::convert_to_outgoing_federation_event, services};
use crate::{services, Error, PduEvent, Result, Ruma}; use crate::Ruma;
/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}` /// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
/// ///
@ -79,7 +81,7 @@ pub(crate) async fn get_missing_events_route(
) )
.map_err(|_| Error::bad_database("Invalid prev_events in event in database."))?, .map_err(|_| Error::bad_database("Invalid prev_events in event in database."))?,
); );
events.push(PduEvent::convert_to_outgoing_federation_event(pdu)); events.push(convert_to_outgoing_federation_event(pdu));
} }
i = i.saturating_add(1); i = i.saturating_add(1);
} }

View file

@ -1,18 +1,14 @@
use axum_client_ip::InsecureClientIp; use axum_client_ip::InsecureClientIp;
use conduit::{utils, warn, Error, PduEvent, Result};
use ruma::{ use ruma::{
api::{client::error::ErrorKind, federation::membership::create_invite}, api::{client::error::ErrorKind, federation::membership::create_invite},
events::room::member::{MembershipState, RoomMemberEventContent}, events::room::member::{MembershipState, RoomMemberEventContent},
serde::JsonObject, serde::JsonObject,
CanonicalJsonValue, EventId, OwnedUserId, CanonicalJsonValue, EventId, OwnedUserId,
}; };
use tracing::warn; use service::{sending::convert_to_outgoing_federation_event, server_is_ours, services};
use crate::{ use crate::Ruma;
service::server_is_ours,
services,
utils::{self},
Error, PduEvent, Result, Ruma,
};
/// # `PUT /_matrix/federation/v2/invite/{roomId}/{eventId}` /// # `PUT /_matrix/federation/v2/invite/{roomId}/{eventId}`
/// ///
@ -176,6 +172,6 @@ pub(crate) async fn create_invite_route(
} }
Ok(create_invite::v2::Response { Ok(create_invite::v2::Response {
event: PduEvent::convert_to_outgoing_federation_event(signed_event), event: convert_to_outgoing_federation_event(signed_event),
}) })
} }

View file

@ -2,6 +2,7 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use conduit::{Error, Result};
use ruma::{ use ruma::{
api::{client::error::ErrorKind, federation::membership::create_join_event}, api::{client::error::ErrorKind, federation::membership::create_join_event},
events::{ events::{
@ -11,11 +12,13 @@ use ruma::{
CanonicalJsonValue, OwnedServerName, OwnedUserId, RoomId, ServerName, CanonicalJsonValue, OwnedServerName, OwnedUserId, RoomId, ServerName,
}; };
use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use service::user_is_local; use service::{
pdu::gen_event_id_canonical_json, sending::convert_to_outgoing_federation_event, services, user_is_local,
};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tracing::warn; use tracing::warn;
use crate::{service::pdu::gen_event_id_canonical_json, services, Error, PduEvent, Result, Ruma}; use crate::Ruma;
/// helper method for /send_join v1 and v2 /// helper method for /send_join v1 and v2
async fn create_join_event( async fn create_join_event(
@ -181,12 +184,12 @@ async fn create_join_event(
Ok(create_join_event::v1::RoomState { Ok(create_join_event::v1::RoomState {
auth_chain: auth_chain_ids auth_chain: auth_chain_ids
.filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok().flatten()) .filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event) .map(convert_to_outgoing_federation_event)
.collect(), .collect(),
state: state_ids state: state_ids
.iter() .iter()
.filter_map(|(_, id)| services().rooms.timeline.get_pdu_json(id).ok().flatten()) .filter_map(|(_, id)| services().rooms.timeline.get_pdu_json(id).ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event) .map(convert_to_outgoing_federation_event)
.collect(), .collect(),
// Event field is required if the room version supports restricted join rules. // Event field is required if the room version supports restricted join rules.
event: Some( event: Some(

View file

@ -1,8 +1,10 @@
use std::sync::Arc; use std::sync::Arc;
use conduit::{Error, Result};
use ruma::api::{client::error::ErrorKind, federation::event::get_room_state}; use ruma::api::{client::error::ErrorKind, federation::event::get_room_state};
use service::{sending::convert_to_outgoing_federation_event, services};
use crate::{services, Error, PduEvent, Result, Ruma}; use crate::Ruma;
/// # `GET /_matrix/federation/v1/state/{roomId}` /// # `GET /_matrix/federation/v1/state/{roomId}`
/// ///
@ -42,7 +44,7 @@ pub(crate) async fn get_room_state_route(
.await? .await?
.into_values() .into_values()
.map(|id| { .map(|id| {
PduEvent::convert_to_outgoing_federation_event( convert_to_outgoing_federation_event(
services() services()
.rooms .rooms
.timeline .timeline
@ -67,7 +69,7 @@ pub(crate) async fn get_room_state_route(
.timeline .timeline
.get_pdu_json(&id) .get_pdu_json(&id)
.ok()? .ok()?
.map(PduEvent::convert_to_outgoing_federation_event) .map(convert_to_outgoing_federation_event)
}) })
.collect(), .collect(),
pdus, pdus,

View file

@ -4,14 +4,14 @@ pub mod debug;
pub mod error; pub mod error;
pub mod log; pub mod log;
pub mod mods; pub mod mods;
pub mod pducount; pub mod pdu;
pub mod server; pub mod server;
pub mod utils; pub mod utils;
pub mod version; pub mod version;
pub use config::Config; pub use config::Config;
pub use error::{Error, RumaResponse}; pub use error::{Error, RumaResponse};
pub use pducount::PduCount; pub use pdu::{PduBuilder, PduCount, PduEvent};
pub use server::Server; pub use server::Server;
pub use version::version; pub use version::version;

16
src/core/pdu/builder.rs Normal file
View file

@ -0,0 +1,16 @@
use std::{collections::BTreeMap, sync::Arc};
use ruma::{events::TimelineEventType, EventId};
use serde::Deserialize;
use serde_json::value::RawValue as RawJsonValue;
/// Build the start of a PDU in order to add it to the Database.
#[derive(Debug, Deserialize)]
pub struct PduBuilder {
#[serde(rename = "type")]
pub event_type: TimelineEventType,
pub content: Box<RawJsonValue>,
pub unsigned: Option<BTreeMap<String, serde_json::Value>>,
pub state_key: Option<String>,
pub redacts: Option<Arc<EventId>>,
}

View file

@ -1,6 +1,10 @@
mod builder;
mod count;
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc}; use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};
use conduit::{warn, Error}; pub use builder::PduBuilder;
pub use count::PduCount;
use ruma::{ use ruma::{
canonical_json::redact_content_in_place, canonical_json::redact_content_in_place,
events::{ events::{
@ -19,7 +23,7 @@ use serde_json::{
value::{to_raw_value, RawValue as RawJsonValue}, value::{to_raw_value, RawValue as RawJsonValue},
}; };
use crate::services; use crate::{warn, Error};
#[derive(Deserialize)] #[derive(Deserialize)]
struct ExtractRedactedBecause { struct ExtractRedactedBecause {
@ -336,42 +340,6 @@ impl PduEvent {
serde_json::from_value(json).expect("Raw::from_value always works") serde_json::from_value(json).expect("Raw::from_value always works")
} }
/// This does not return a full `Pdu` it is only to satisfy ruma's types.
#[tracing::instrument]
pub fn convert_to_outgoing_federation_event(mut pdu_json: CanonicalJsonObject) -> Box<RawJsonValue> {
if let Some(unsigned) = pdu_json
.get_mut("unsigned")
.and_then(|val| val.as_object_mut())
{
unsigned.remove("transaction_id");
}
// room v3 and above removed the "event_id" field from remote PDU format
if let Some(room_id) = pdu_json
.get("room_id")
.and_then(|val| RoomId::parse(val.as_str()?).ok())
{
match services().rooms.state.get_room_version(&room_id) {
Ok(room_version_id) => match room_version_id {
RoomVersionId::V1 | RoomVersionId::V2 => {},
_ => _ = pdu_json.remove("event_id"),
},
Err(_) => _ = pdu_json.remove("event_id"),
}
} else {
pdu_json.remove("event_id");
}
// TODO: another option would be to convert it to a canonical string to validate
// size and return a Result<Raw<...>>
// serde_json::from_str::<Raw<_>>(
// ruma::serde::to_canonical_json_string(pdu_json).expect("CanonicalJson is
// valid serde_json::Value"), )
// .expect("Raw::from_value always works")
to_raw_value(&pdu_json).expect("CanonicalJson is valid serde_json::Value")
}
pub fn from_id_val(event_id: &EventId, mut json: CanonicalJsonObject) -> Result<Self, serde_json::Error> { pub fn from_id_val(event_id: &EventId, mut json: CanonicalJsonObject) -> Result<Self, serde_json::Error> {
json.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned())); json.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned()));
@ -438,14 +406,3 @@ pub fn gen_event_id_canonical_json(
Ok((event_id, value)) Ok((event_id, value))
} }
/// Build the start of a PDU in order to add it to the Database.
#[derive(Debug, Deserialize)]
pub struct PduBuilder {
#[serde(rename = "type")]
pub event_type: TimelineEventType,
pub content: Box<RawJsonValue>,
pub unsigned: Option<BTreeMap<String, serde_json::Value>>,
pub state_key: Option<String>,
pub redacts: Option<Arc<EventId>>,
}

View file

@ -1,4 +1,3 @@
pub mod pdu;
pub mod services; pub mod services;
pub mod account_data; pub mod account_data;
@ -20,12 +19,12 @@ extern crate conduit_database as database;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
pub(crate) use conduit::{config, debug_error, debug_info, debug_warn, utils, Config, Error, PduCount, Result, Server}; pub(crate) use conduit::{config, debug_error, debug_info, debug_warn, utils, Config, Error, Result, Server};
pub use conduit::{pdu, PduBuilder, PduCount, PduEvent};
use database::Database; use database::Database;
pub use crate::{ pub use crate::{
globals::{server_is_ours, user_is_local}, globals::{server_is_ours, user_is_local},
pdu::PduEvent,
services::Services, services::Services,
}; };

View file

@ -14,6 +14,7 @@ use ruma::{
api::{appservice::Registration, OutgoingRequest}, api::{appservice::Registration, OutgoingRequest},
OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId,
}; };
pub use sender::convert_to_outgoing_federation_event;
use tokio::{sync::Mutex, task::JoinHandle}; use tokio::{sync::Mutex, task::JoinHandle};
use tracing::{error, warn}; use tracing::{error, warn};

View file

@ -18,12 +18,14 @@ use ruma::{
}, },
device_id, device_id,
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, RoomVersionId,
ServerName, UInt,
}; };
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
use super::{appservice, send, Destination, Msg, SendingEvent, Service}; use super::{appservice, send, Destination, Msg, SendingEvent, Service};
use crate::{presence::Presence, services, user_is_local, utils::calculate_hash, Error, PduEvent, Result}; use crate::{presence::Presence, services, user_is_local, utils::calculate_hash, Error, Result};
#[derive(Debug)] #[derive(Debug)]
enum TransactionStatus { enum TransactionStatus {
@ -548,24 +550,21 @@ async fn send_events_dest_normal(
for event in &events { for event in &events {
match event { match event {
SendingEvent::Pdu(pdu_id) => { SendingEvent::Pdu(pdu_id) => pdu_jsons.push(convert_to_outgoing_federation_event(
// TODO: check room version and remove event_id if needed // TODO: check room version and remove event_id if needed
let raw = PduEvent::convert_to_outgoing_federation_event( services()
services() .rooms
.rooms .timeline
.timeline .get_pdu_json_from_id(pdu_id)
.get_pdu_json_from_id(pdu_id) .map_err(|e| (dest.clone(), e))?
.map_err(|e| (dest.clone(), e))? .ok_or_else(|| {
.ok_or_else(|| { error!(?dest, ?server, ?pdu_id, "event not found");
error!(?dest, ?server, ?pdu_id, "event not found"); (
( dest.clone(),
dest.clone(), Error::bad_database("[Normal] Event in servernameevent_data not found in db."),
Error::bad_database("[Normal] Event in servernameevent_data not found in db."), )
) })?,
})?, )),
);
pdu_jsons.push(raw);
},
SendingEvent::Edu(edu) => { SendingEvent::Edu(edu) => {
if let Ok(raw) = serde_json::from_slice(edu) { if let Ok(raw) = serde_json::from_slice(edu) {
edu_jsons.push(raw); edu_jsons.push(raw);
@ -611,3 +610,39 @@ async fn send_events_dest_normal(
}) })
.map_err(|e| (dest.clone(), e)) .map_err(|e| (dest.clone(), e))
} }
/// This does not return a full `Pdu` it is only to satisfy ruma's types.
#[tracing::instrument]
pub fn convert_to_outgoing_federation_event(mut pdu_json: CanonicalJsonObject) -> Box<RawJsonValue> {
if let Some(unsigned) = pdu_json
.get_mut("unsigned")
.and_then(|val| val.as_object_mut())
{
unsigned.remove("transaction_id");
}
// room v3 and above removed the "event_id" field from remote PDU format
if let Some(room_id) = pdu_json
.get("room_id")
.and_then(|val| RoomId::parse(val.as_str()?).ok())
{
match services().rooms.state.get_room_version(&room_id) {
Ok(room_version_id) => match room_version_id {
RoomVersionId::V1 | RoomVersionId::V2 => {},
_ => _ = pdu_json.remove("event_id"),
},
Err(_) => _ = pdu_json.remove("event_id"),
}
} else {
pdu_json.remove("event_id");
}
// TODO: another option would be to convert it to a canonical string to validate
// size and return a Result<Raw<...>>
// serde_json::from_str::<Raw<_>>(
// ruma::serde::to_canonical_json_string(pdu_json).expect("CanonicalJson is
// valid serde_json::Value"), )
// .expect("Raw::from_value always works")
to_raw_value(&pdu_json).expect("CanonicalJson is valid serde_json::Value")
}