From 8708cd3b633d88d260982563f2e2826bc8b12038 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Wed, 5 Oct 2022 09:34:25 +0200 Subject: [PATCH] 431 errors left --- src/api/client_server/media.rs | 2 +- src/api/client_server/membership.rs | 61 ++- src/api/client_server/message.rs | 19 +- src/api/client_server/presence.rs | 7 +- src/api/client_server/profile.rs | 11 +- src/api/client_server/read_marker.rs | 17 +- src/api/client_server/redact.rs | 2 +- src/api/client_server/report.rs | 2 +- src/api/client_server/room.rs | 53 +-- src/api/client_server/search.rs | 5 +- src/api/client_server/state.rs | 18 +- src/api/client_server/sync.rs | 79 ++-- src/api/client_server/typing.rs | 6 +- src/api/client_server/user_directory.rs | 6 +- src/api/client_server/voip.rs | 2 +- src/api/server_server.rs | 379 +++---------------- src/database/key_value/appservice.rs | 6 +- src/database/key_value/rooms/directory.rs | 4 +- src/database/mod.rs | 2 +- src/lib.rs | 4 +- src/service/account_data/mod.rs | 110 +----- src/service/admin/mod.rs | 52 +-- src/service/appservice/data.rs | 2 +- src/service/globals/mod.rs | 3 +- src/service/key_backups/mod.rs | 280 +------------- src/service/media/mod.rs | 8 +- src/service/pusher/mod.rs | 2 + src/service/rooms/auth_chain/mod.rs | 2 +- src/service/rooms/edus/mod.rs | 6 +- src/service/rooms/event_handler/mod.rs | 441 +++++++++++++++++----- src/service/rooms/state/mod.rs | 10 +- src/service/rooms/timeline/mod.rs | 12 +- 32 files changed, 640 insertions(+), 973 deletions(-) diff --git a/src/api/client_server/media.rs b/src/api/client_server/media.rs index d6e8213c..316e284b 100644 --- a/src/api/client_server/media.rs +++ b/src/api/client_server/media.rs @@ -196,7 +196,7 @@ pub async fn get_content_thumbnail_route( .upload_thumbnail( mxc, &None, - &get_thumbnail_response.content_type, + &get_thumbnail_response.content_type.as_deref(), body.width.try_into().expect("all UInts are valid u32s"), body.height.try_into().expect("all UInts are valid u32s"), &get_thumbnail_response.file, diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index d6f820a7..98931f25 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -481,7 +481,7 @@ async fn join_room_by_id_helper( let (make_join_response, remote_server) = make_join_response_and_server?; let room_version = match make_join_response.room_version { - Some(room_version) if services().rooms.is_supported_version(&room_version) => room_version, + Some(room_version) if services().rooms.metadata.is_supported_version(&room_version) => room_version, _ => return Err(Error::BadServerResponse("Room version is not supported")), }; @@ -591,7 +591,7 @@ async fn join_room_by_id_helper( Error::BadServerResponse("Invalid PDU in send_join response.") })?; - services().rooms.add_pdu_outlier(&event_id, &value)?; + services().rooms.outlier.add_pdu_outlier(&event_id, &value)?; if let Some(state_key) = &pdu.state_key { let shortstatekey = services().rooms.short.get_or_create_shortstatekey( &pdu.kind.to_string().into(), @@ -621,14 +621,6 @@ async fn join_room_by_id_helper( return Err(Error::BadServerResponse("State contained no create event.")); } - services().rooms.state.force_state( - room_id, - state - .into_iter() - .map(|(k, id)| services().rooms.compress_state_event(k, &id)) - .collect::>()?, - )?; - for result in send_join_response .room_state .auth_chain @@ -640,14 +632,21 @@ async fn join_room_by_id_helper( Err(_) => continue, }; - services().rooms.add_pdu_outlier(&event_id, &value)?; + services().rooms.outlier.add_pdu_outlier(&event_id, &value)?; } + let shortstatehash = services().rooms.state.set_event_state( + event_id, + room_id, + state + .into_iter() + .map(|(k, id)| services().rooms.state_compressor.compress_state_event(k, &id)) + .collect::>()?, + )?; + // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. - let statehashid = services().rooms.append_to_state(&parsed_pdu)?; - - services().rooms.append_pdu( + services().rooms.timeline.append_pdu( &parsed_pdu, join_event, iter::once(&*parsed_pdu.event_id), @@ -655,7 +654,9 @@ async fn join_room_by_id_helper( // We set the room state after inserting the pdu, so that we never have a moment in time // where events in the current room state do not exist - services().rooms.set_room_state(room_id, statehashid)?; + services().rooms.state.set_room_state(room_id, shortstatehash)?; + + let statehashid = services().rooms.state.append_to_state(&parsed_pdu)?; } else { let event = RoomMemberEventContent { membership: MembershipState::Join, @@ -668,7 +669,7 @@ async fn join_room_by_id_helper( join_authorized_via_users_server: None, }; - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomMember, content: to_raw_value(&event).expect("event is valid, we just created it"), @@ -678,7 +679,6 @@ async fn join_room_by_id_helper( }, sender_user, room_id, - services(), &state_lock, )?; } @@ -786,7 +786,7 @@ pub(crate) async fn invite_helper<'a>( unsigned: None, state_key: Some(user_id.to_string()), redacts: None, - }, sender_user, room_id, &state_lock); + }, sender_user, room_id, &state_lock)?; let invite_room_state = services().rooms.state.calculate_invite_state(&pdu)?; @@ -811,7 +811,7 @@ pub(crate) async fn invite_helper<'a>( create_invite::v2::Request { room_id, event_id: expected_event_id, - room_version: &services().state.get_room_version(&room_id)?, + room_version: &services().rooms.state.get_room_version(&room_id)?, event: &PduEvent::convert_to_outgoing_federation_event(pdu_json.clone()), invite_room_state: &invite_room_state, }, @@ -846,7 +846,7 @@ pub(crate) async fn invite_helper<'a>( ) .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?; - let pdu_id = services().rooms.event_handler.handle_incoming_pdu( + let pdu_id: Vec = services().rooms.event_handler.handle_incoming_pdu( &origin, &event_id, room_id, @@ -854,13 +854,7 @@ pub(crate) async fn invite_helper<'a>( true, &pub_key_map, ) - .await - .map_err(|_| { - Error::BadRequest( - ErrorKind::InvalidParam, - "Error while handling incoming PDU.", - ) - })? + .await? .ok_or(Error::BadRequest( ErrorKind::InvalidParam, "Could not accept incoming PDU as timeline event.", @@ -868,6 +862,7 @@ pub(crate) async fn invite_helper<'a>( let servers = services() .rooms + .state_cache .room_servers(room_id) .filter_map(|r| r.ok()) .filter(|server| &**server != services().globals.server_name()); @@ -877,7 +872,7 @@ pub(crate) async fn invite_helper<'a>( return Ok(()); } - if !services().rooms.is_joined(sender_user, &room_id)? { + if !services().rooms.state_cache.is_joined(sender_user, &room_id)? { return Err(Error::BadRequest( ErrorKind::Forbidden, "You don't have permission to view this room.", @@ -894,7 +889,7 @@ pub(crate) async fn invite_helper<'a>( ); let state_lock = mutex_state.lock().await; - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomMember, content: to_raw_value(&RoomMemberEventContent { @@ -926,8 +921,9 @@ pub(crate) async fn invite_helper<'a>( pub async fn leave_all_rooms(user_id: &UserId) -> Result<()> { let all_rooms = services() .rooms + .state_cache .rooms_joined(user_id) - .chain(services().rooms.rooms_invited(user_id).map(|t| t.map(|(r, _)| r))) + .chain(services().rooms.state_cache.rooms_invited(user_id).map(|t| t.map(|(r, _)| r))) .collect::>(); for room_id in all_rooms { @@ -955,7 +951,7 @@ pub async fn leave_room( let last_state = services().rooms.state_cache .invite_state(user_id, room_id)? - .map_or_else(|| services().rooms.left_state(user_id, room_id), |s| Ok(Some(s)))?; + .map_or_else(|| services().rooms.state_cache.left_state(user_id, room_id), |s| Ok(Some(s)))?; // We always drop the invite, we can't rely on other servers services().rooms.state_cache.update_membership( @@ -978,7 +974,7 @@ pub async fn leave_room( let state_lock = mutex_state.lock().await; let mut event: RoomMemberEventContent = serde_json::from_str( - services().rooms.state.room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())? + services().rooms.state_accessor.room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())? .ok_or(Error::BadRequest( ErrorKind::BadState, "Cannot leave a room you are not a member of.", @@ -1017,6 +1013,7 @@ async fn remote_leave_room( let invite_state = services() .rooms + .state_cache .invite_state(user_id, room_id)? .ok_or(Error::BadRequest( ErrorKind::BadState, diff --git a/src/api/client_server/message.rs b/src/api/client_server/message.rs index 861f9c13..bfdc2fdb 100644 --- a/src/api/client_server/message.rs +++ b/src/api/client_server/message.rs @@ -68,7 +68,7 @@ pub async fn send_message_event_route( let mut unsigned = BTreeMap::new(); unsigned.insert("transaction_id".to_owned(), body.txn_id.to_string().into()); - let event_id = services().rooms.build_and_append_pdu( + let event_id = services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: body.event_type.to_string().into(), content: serde_json::from_str(body.body.body.json().get()) @@ -108,7 +108,7 @@ pub async fn get_message_events_route( let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_device = body.sender_device.as_ref().expect("user is authenticated"); - if !services().rooms.is_joined(sender_user, &body.room_id)? { + if !services().rooms.state_cache.is_joined(sender_user, &body.room_id)? { return Err(Error::BadRequest( ErrorKind::Forbidden, "You don't have permission to view this room.", @@ -129,7 +129,7 @@ pub async fn get_message_events_route( let to = body.to.as_ref().map(|t| t.parse()); services().rooms - .lazy_load_confirm_delivery(sender_user, sender_device, &body.room_id, from)?; + .lazy_loading.lazy_load_confirm_delivery(sender_user, sender_device, &body.room_id, from)?; // Use limit or else 10 let limit = body.limit.try_into().map_or(10_usize, |l: u32| l as usize); @@ -144,12 +144,13 @@ pub async fn get_message_events_route( get_message_events::v3::Direction::Forward => { let events_after: Vec<_> = services() .rooms + .timeline .pdus_after(sender_user, &body.room_id, from)? .take(limit) .filter_map(|r| r.ok()) // Filter out buggy events .filter_map(|(pdu_id, pdu)| { services().rooms - .pdu_count(&pdu_id) + .timeline.pdu_count(&pdu_id) .map(|pdu_count| (pdu_count, pdu)) .ok() }) @@ -157,7 +158,7 @@ pub async fn get_message_events_route( .collect(); for (_, event) in &events_after { - if !services().rooms.lazy_load_was_sent_before( + if !services().rooms.lazy_loading.lazy_load_was_sent_before( sender_user, sender_device, &body.room_id, @@ -181,11 +182,13 @@ pub async fn get_message_events_route( get_message_events::v3::Direction::Backward => { let events_before: Vec<_> = services() .rooms + .timeline .pdus_until(sender_user, &body.room_id, from)? .take(limit) .filter_map(|r| r.ok()) // Filter out buggy events .filter_map(|(pdu_id, pdu)| { services().rooms + .timeline .pdu_count(&pdu_id) .map(|pdu_count| (pdu_count, pdu)) .ok() @@ -194,7 +197,7 @@ pub async fn get_message_events_route( .collect(); for (_, event) in &events_before { - if !services().rooms.lazy_load_was_sent_before( + if !services().rooms.lazy_loading.lazy_load_was_sent_before( sender_user, sender_device, &body.room_id, @@ -220,7 +223,7 @@ pub async fn get_message_events_route( resp.state = Vec::new(); for ll_id in &lazy_loaded { if let Some(member_event) = - services().rooms + services().rooms.state_accessor .room_state_get(&body.room_id, &StateEventType::RoomMember, ll_id.as_str())? { resp.state.push(member_event.to_state_event()); @@ -228,7 +231,7 @@ pub async fn get_message_events_route( } if let Some(next_token) = next_token { - services().rooms.lazy_load_mark_sent( + services().rooms.lazy_loading.lazy_load_mark_sent( sender_user, sender_device, &body.room_id, diff --git a/src/api/client_server/presence.rs b/src/api/client_server/presence.rs index bc220b80..6a915e44 100644 --- a/src/api/client_server/presence.rs +++ b/src/api/client_server/presence.rs @@ -10,10 +10,10 @@ pub async fn set_presence_route( ) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - for room_id in services().rooms.rooms_joined(sender_user) { + for room_id in services().rooms.state_cache.rooms_joined(sender_user) { let room_id = room_id?; - services().rooms.edus.update_presence( + services().rooms.edus.presence.update_presence( sender_user, &room_id, ruma::events::presence::PresenceEvent { @@ -51,13 +51,14 @@ pub async fn get_presence_route( for room_id in services() .rooms - .get_shared_rooms(vec![sender_user.clone(), body.user_id.clone()])? + .user.get_shared_rooms(vec![sender_user.clone(), body.user_id.clone()])? { let room_id = room_id?; if let Some(presence) = services() .rooms .edus + .presence .get_last_presence_event(sender_user, &room_id)? { presence_event = Some(presence); diff --git a/src/api/client_server/profile.rs b/src/api/client_server/profile.rs index 7a87bcd1..3e1d736f 100644 --- a/src/api/client_server/profile.rs +++ b/src/api/client_server/profile.rs @@ -30,6 +30,7 @@ pub async fn set_displayname_route( // Send a new membership event and presence update into all joined rooms let all_rooms_joined: Vec<_> = services() .rooms + .state_cache .rooms_joined(sender_user) .filter_map(|r| r.ok()) .map(|room_id| { @@ -40,6 +41,7 @@ pub async fn set_displayname_route( displayname: body.displayname.clone(), ..serde_json::from_str( services().rooms + .state_accessor .room_state_get( &room_id, &StateEventType::RoomMember, @@ -80,10 +82,11 @@ pub async fn set_displayname_route( let _ = services() .rooms + .timeline .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock); // Presence update - services().rooms.edus.update_presence( + services().rooms.edus.presence.update_presence( sender_user, &room_id, ruma::events::presence::PresenceEvent { @@ -155,6 +158,7 @@ pub async fn set_avatar_url_route( // Send a new membership event and presence update into all joined rooms let all_joined_rooms: Vec<_> = services() .rooms + .state_cache .rooms_joined(sender_user) .filter_map(|r| r.ok()) .map(|room_id| { @@ -165,6 +169,7 @@ pub async fn set_avatar_url_route( avatar_url: body.avatar_url.clone(), ..serde_json::from_str( services().rooms + .state_accessor .room_state_get( &room_id, &StateEventType::RoomMember, @@ -205,10 +210,11 @@ pub async fn set_avatar_url_route( let _ = services() .rooms + .timeline .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock); // Presence update - services().rooms.edus.update_presence( + services().rooms.edus.presence.update_presence( sender_user, &room_id, ruma::events::presence::PresenceEvent { @@ -226,7 +232,6 @@ pub async fn set_avatar_url_route( }, sender: sender_user.clone(), }, - &services().globals, )?; } diff --git a/src/api/client_server/read_marker.rs b/src/api/client_server/read_marker.rs index 284ae65e..eda57d57 100644 --- a/src/api/client_server/read_marker.rs +++ b/src/api/client_server/read_marker.rs @@ -31,15 +31,15 @@ pub async fn set_read_marker_route( )?; if let Some(event) = &body.read_receipt { - services().rooms.edus.private_read_set( + services().rooms.edus.read_receipt.private_read_set( &body.room_id, sender_user, - services().rooms.get_pdu_count(event)?.ok_or(Error::BadRequest( + services().rooms.timeline.get_pdu_count(event)?.ok_or(Error::BadRequest( ErrorKind::InvalidParam, "Event does not exist.", ))?, )?; - services().rooms + services().rooms.user .reset_notification_counts(sender_user, &body.room_id)?; let mut user_receipts = BTreeMap::new(); @@ -56,7 +56,7 @@ pub async fn set_read_marker_route( let mut receipt_content = BTreeMap::new(); receipt_content.insert(event.to_owned(), receipts); - services().rooms.edus.readreceipt_update( + services().rooms.edus.read_receipt.readreceipt_update( sender_user, &body.room_id, ruma::events::receipt::ReceiptEvent { @@ -77,17 +77,18 @@ pub async fn create_receipt_route( ) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - services().rooms.edus.private_read_set( + services().rooms.edus.read_receipt.private_read_set( &body.room_id, sender_user, services().rooms + .timeline .get_pdu_count(&body.event_id)? .ok_or(Error::BadRequest( ErrorKind::InvalidParam, "Event does not exist.", ))?, )?; - services().rooms + services().rooms.user .reset_notification_counts(sender_user, &body.room_id)?; let mut user_receipts = BTreeMap::new(); @@ -103,7 +104,7 @@ pub async fn create_receipt_route( let mut receipt_content = BTreeMap::new(); receipt_content.insert(body.event_id.to_owned(), receipts); - services().rooms.edus.readreceipt_update( + services().rooms.edus.read_receipt.readreceipt_update( sender_user, &body.room_id, ruma::events::receipt::ReceiptEvent { @@ -112,7 +113,5 @@ pub async fn create_receipt_route( }, )?; - services().flush()?; - Ok(create_receipt::v3::Response {}) } diff --git a/src/api/client_server/redact.rs b/src/api/client_server/redact.rs index d6699bcf..57e442ab 100644 --- a/src/api/client_server/redact.rs +++ b/src/api/client_server/redact.rs @@ -29,7 +29,7 @@ pub async fn redact_event_route( ); let state_lock = mutex_state.lock().await; - let event_id = services().rooms.build_and_append_pdu( + let event_id = services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomRedaction, content: to_raw_value(&RoomRedactionEventContent { diff --git a/src/api/client_server/report.rs b/src/api/client_server/report.rs index 2c2a5493..efcc4348 100644 --- a/src/api/client_server/report.rs +++ b/src/api/client_server/report.rs @@ -14,7 +14,7 @@ pub async fn report_event_route( ) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let pdu = match services().rooms.get_pdu(&body.event_id)? { + let pdu = match services().rooms.timeline.get_pdu(&body.event_id)? { Some(pdu) => pdu, _ => { return Err(Error::BadRequest( diff --git a/src/api/client_server/room.rs b/src/api/client_server/room.rs index f8d06023..a7fa9520 100644 --- a/src/api/client_server/room.rs +++ b/src/api/client_server/room.rs @@ -54,7 +54,7 @@ pub async fn create_room_route( let room_id = RoomId::new(services().globals.server_name()); - services().rooms.get_or_create_shortroomid(&room_id)?; + services().rooms.short.get_or_create_shortroomid(&room_id)?; let mutex_state = Arc::clone( services().globals @@ -162,7 +162,7 @@ pub async fn create_room_route( } // 1. The room create event - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomCreate, content: to_raw_value(&content).expect("event is valid, we just created it"), @@ -176,7 +176,7 @@ pub async fn create_room_route( )?; // 2. Let the room creator join - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomMember, content: to_raw_value(&RoomMemberEventContent { @@ -237,7 +237,7 @@ pub async fn create_room_route( } } - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomPowerLevels, content: to_raw_value(&power_levels_content) @@ -253,7 +253,7 @@ pub async fn create_room_route( // 4. Canonical room alias if let Some(room_alias_id) = &alias { - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomCanonicalAlias, content: to_raw_value(&RoomCanonicalAliasEventContent { @@ -274,7 +274,7 @@ pub async fn create_room_route( // 5. Events set by preset // 5.1 Join Rules - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomJoinRules, content: to_raw_value(&RoomJoinRulesEventContent::new(match preset { @@ -293,7 +293,7 @@ pub async fn create_room_route( )?; // 5.2 History Visibility - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomHistoryVisibility, content: to_raw_value(&RoomHistoryVisibilityEventContent::new( @@ -310,7 +310,7 @@ pub async fn create_room_route( )?; // 5.3 Guest Access - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomGuestAccess, content: to_raw_value(&RoomGuestAccessEventContent::new(match preset { @@ -344,12 +344,12 @@ pub async fn create_room_route( } services().rooms - .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock)?; + .timeline.build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock)?; } // 7. Events implied by name and topic if let Some(name) = &body.name { - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomName, content: to_raw_value(&RoomNameEventContent::new(Some(name.clone()))) @@ -365,7 +365,7 @@ pub async fn create_room_route( } if let Some(topic) = &body.topic { - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomTopic, content: to_raw_value(&RoomTopicEventContent { @@ -390,11 +390,11 @@ pub async fn create_room_route( // Homeserver specific stuff if let Some(alias) = alias { - services().rooms.set_alias(&alias, Some(&room_id))?; + services().rooms.alias.set_alias(&alias, &room_id)?; } if body.visibility == room::Visibility::Public { - services().rooms.set_public(&room_id, true)?; + services().rooms.directory.set_public(&room_id)?; } info!("{} created a room", sender_user); @@ -412,7 +412,7 @@ pub async fn get_room_event_route( ) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - if !services().rooms.is_joined(sender_user, &body.room_id)? { + if !services().rooms.state_cache.is_joined(sender_user, &body.room_id)? { return Err(Error::BadRequest( ErrorKind::Forbidden, "You don't have permission to view this room.", @@ -422,6 +422,7 @@ pub async fn get_room_event_route( Ok(get_room_event::v3::Response { event: services() .rooms + .timeline .get_pdu(&body.event_id)? .ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))? .to_room_event(), @@ -438,7 +439,7 @@ pub async fn get_room_aliases_route( ) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - if !services().rooms.is_joined(sender_user, &body.room_id)? { + if !services().rooms.state_cache.is_joined(sender_user, &body.room_id)? { return Err(Error::BadRequest( ErrorKind::Forbidden, "You don't have permission to view this room.", @@ -448,7 +449,7 @@ pub async fn get_room_aliases_route( Ok(aliases::v3::Response { aliases: services() .rooms - .room_aliases(&body.room_id) + .alias.local_aliases_for_room(&body.room_id) .filter_map(|a| a.ok()) .collect(), }) @@ -479,7 +480,7 @@ pub async fn upgrade_room_route( // Create a replacement room let replacement_room = RoomId::new(services().globals.server_name()); services().rooms - .get_or_create_shortroomid(&replacement_room)?; + .short.get_or_create_shortroomid(&replacement_room)?; let mutex_state = Arc::clone( services().globals @@ -493,7 +494,7 @@ pub async fn upgrade_room_route( // Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further // Fail if the sender does not have the required permissions - let tombstone_event_id = services().rooms.build_and_append_pdu( + let tombstone_event_id = services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomTombstone, content: to_raw_value(&RoomTombstoneEventContent { @@ -525,6 +526,7 @@ pub async fn upgrade_room_route( // Get the old room creation event let mut create_event_content = serde_json::from_str::( services().rooms + .state_accessor .room_state_get(&body.room_id, &StateEventType::RoomCreate, "")? .ok_or_else(|| Error::bad_database("Found room without m.room.create event."))? .content @@ -572,7 +574,7 @@ pub async fn upgrade_room_route( )); } - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomCreate, content: to_raw_value(&create_event_content) @@ -587,7 +589,7 @@ pub async fn upgrade_room_route( )?; // Join the new room - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomMember, content: to_raw_value(&RoomMemberEventContent { @@ -625,12 +627,12 @@ pub async fn upgrade_room_route( // Replicate transferable state events to the new room for event_type in transferable_state_events { - let event_content = match services().rooms.room_state_get(&body.room_id, &event_type, "")? { + let event_content = match services().rooms.state_accessor.room_state_get(&body.room_id, &event_type, "")? { Some(v) => v.content.clone(), None => continue, // Skipping missing events. }; - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: event_type.to_string().into(), content: event_content, @@ -645,14 +647,15 @@ pub async fn upgrade_room_route( } // Moves any local aliases to the new room - for alias in services().rooms.room_aliases(&body.room_id).filter_map(|r| r.ok()) { + for alias in services().rooms.alias.local_aliases_for_room(&body.room_id).filter_map(|r| r.ok()) { services().rooms - .set_alias(&alias, Some(&replacement_room))?; + .alias.set_alias(&alias, &replacement_room)?; } // Get the old room power levels let mut power_levels_event_content: RoomPowerLevelsEventContent = serde_json::from_str( services().rooms + .state_accessor .room_state_get(&body.room_id, &StateEventType::RoomPowerLevels, "")? .ok_or_else(|| Error::bad_database("Found room without m.room.create event."))? .content @@ -666,7 +669,7 @@ pub async fn upgrade_room_route( power_levels_event_content.invite = new_level; // Modify the power levels in the old room to prevent sending of events and inviting new users - let _ = services().rooms.build_and_append_pdu( + let _ = services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomPowerLevels, content: to_raw_value(&power_levels_event_content) diff --git a/src/api/client_server/search.rs b/src/api/client_server/search.rs index b7eecd5a..f648649b 100644 --- a/src/api/client_server/search.rs +++ b/src/api/client_server/search.rs @@ -24,6 +24,7 @@ pub async fn search_events_route( let room_ids = filter.rooms.clone().unwrap_or_else(|| { services().rooms + .state_cache .rooms_joined(sender_user) .filter_map(|r| r.ok()) .collect() @@ -34,7 +35,7 @@ pub async fn search_events_route( let mut searches = Vec::new(); for room_id in room_ids { - if !services().rooms.is_joined(sender_user, &room_id)? { + if !services().rooms.state_cache.is_joined(sender_user, &room_id)? { return Err(Error::BadRequest( ErrorKind::Forbidden, "You don't have permission to view this room.", @@ -43,6 +44,7 @@ pub async fn search_events_route( if let Some(search) = services() .rooms + .search .search_pdus(&room_id, &search_criteria.search_term)? { searches.push(search.0.peekable()); @@ -86,6 +88,7 @@ pub async fn search_events_route( rank: None, result: services() .rooms + .timeline .get_pdu_from_id(result)? .map(|pdu| pdu.to_room_event()), }) diff --git a/src/api/client_server/state.rs b/src/api/client_server/state.rs index b2dfe2a7..ece74536 100644 --- a/src/api/client_server/state.rs +++ b/src/api/client_server/state.rs @@ -90,9 +90,10 @@ pub async fn get_state_events_route( #[allow(clippy::blocks_in_if_conditions)] // Users not in the room should not be able to access the state unless history_visibility is // WorldReadable - if !services().rooms.is_joined(sender_user, &body.room_id)? + if !services().rooms.state_cache.is_joined(sender_user, &body.room_id)? && !matches!( services().rooms + .state_accessor .room_state_get(&body.room_id, &StateEventType::RoomHistoryVisibility, "")? .map(|event| { serde_json::from_str(event.content.get()) @@ -115,6 +116,7 @@ pub async fn get_state_events_route( Ok(get_state_events::v3::Response { room_state: services() .rooms + .state_accessor .room_state_full(&body.room_id) .await? .values() @@ -136,10 +138,10 @@ pub async fn get_state_events_for_key_route( #[allow(clippy::blocks_in_if_conditions)] // Users not in the room should not be able to access the state unless history_visibility is // WorldReadable - if !services().rooms.is_joined(sender_user, &body.room_id)? + if !services().rooms.state_cache.is_joined(sender_user, &body.room_id)? && !matches!( services().rooms - .room_state_get(&body.room_id, &StateEventType::RoomHistoryVisibility, "")? + .state_accessor.room_state_get(&body.room_id, &StateEventType::RoomHistoryVisibility, "")? .map(|event| { serde_json::from_str(event.content.get()) .map(|e: RoomHistoryVisibilityEventContent| e.history_visibility) @@ -160,7 +162,7 @@ pub async fn get_state_events_for_key_route( let event = services() .rooms - .room_state_get(&body.room_id, &body.event_type, &body.state_key)? + .state_accessor.room_state_get(&body.room_id, &body.event_type, &body.state_key)? .ok_or(Error::BadRequest( ErrorKind::NotFound, "State event not found.", @@ -185,10 +187,10 @@ pub async fn get_state_events_for_empty_key_route( #[allow(clippy::blocks_in_if_conditions)] // Users not in the room should not be able to access the state unless history_visibility is // WorldReadable - if !services().rooms.is_joined(sender_user, &body.room_id)? + if !services().rooms.state_cache.is_joined(sender_user, &body.room_id)? && !matches!( services().rooms - .room_state_get(&body.room_id, &StateEventType::RoomHistoryVisibility, "")? + .state_accessor.room_state_get(&body.room_id, &StateEventType::RoomHistoryVisibility, "")? .map(|event| { serde_json::from_str(event.content.get()) .map(|e: RoomHistoryVisibilityEventContent| e.history_visibility) @@ -209,7 +211,7 @@ pub async fn get_state_events_for_empty_key_route( let event = services() .rooms - .room_state_get(&body.room_id, &body.event_type, "")? + .state_accessor.room_state_get(&body.room_id, &body.event_type, "")? .ok_or(Error::BadRequest( ErrorKind::NotFound, "State event not found.", @@ -269,7 +271,7 @@ async fn send_state_event_for_key_helper( ); let state_lock = mutex_state.lock().await; - let event_id = services().rooms.build_and_append_pdu( + let event_id = services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: event_type.to_string().into(), content: serde_json::from_str(json.json().get()).expect("content is valid json"), diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index cc4ebf6e..e38ea600 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -172,7 +172,7 @@ async fn sync_helper( }; // TODO: match body.set_presence { - services().rooms.edus.ping_presence(&sender_user)?; + services().rooms.edus.presence.ping_presence(&sender_user)?; // Setup watchers, so if there's no response, we can wait for them let watcher = services().watch(&sender_user, &sender_device); @@ -216,7 +216,7 @@ async fn sync_helper( .filter_map(|r| r.ok()), ); - let all_joined_rooms = services().rooms.rooms_joined(&sender_user).collect::>(); + let all_joined_rooms = services().rooms.state_cache.rooms_joined(&sender_user).collect::>(); for room_id in all_joined_rooms { let room_id = room_id?; @@ -237,9 +237,10 @@ async fn sync_helper( let timeline_pdus; let limited; - if services().rooms.last_timeline_count(&sender_user, &room_id)? > since { + if services().rooms.timeline.last_timeline_count(&sender_user, &room_id)? > since { let mut non_timeline_pdus = services() .rooms + .timeline .pdus_until(&sender_user, &room_id, u64::MAX)? .filter_map(|r| { // Filter out buggy events @@ -250,6 +251,7 @@ async fn sync_helper( }) .take_while(|(pduid, _)| { services().rooms + .timeline .pdu_count(pduid) .map_or(false, |count| count > since) }); @@ -275,6 +277,7 @@ async fn sync_helper( || services() .rooms .edus + .read_receipt .last_privateread_update(&sender_user, &room_id)? > since; @@ -283,24 +286,24 @@ async fn sync_helper( timeline_users.insert(event.sender.as_str().to_owned()); } - services().rooms + services().rooms.lazy_loading .lazy_load_confirm_delivery(&sender_user, &sender_device, &room_id, since)?; // Database queries: - let current_shortstatehash = if let Some(s) = services().rooms.current_shortstatehash(&room_id)? { + let current_shortstatehash = if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? { s } else { error!("Room {} has no state", room_id); continue; }; - let since_shortstatehash = services().rooms.get_token_shortstatehash(&room_id, since)?; + let since_shortstatehash = services().rooms.user.get_token_shortstatehash(&room_id, since)?; // Calculates joined_member_count, invited_member_count and heroes let calculate_counts = || { - let joined_member_count = services().rooms.room_joined_count(&room_id)?.unwrap_or(0); - let invited_member_count = services().rooms.room_invited_count(&room_id)?.unwrap_or(0); + let joined_member_count = services().rooms.state_cache.room_joined_count(&room_id)?.unwrap_or(0); + let invited_member_count = services().rooms.state_cache.room_invited_count(&room_id)?.unwrap_or(0); // Recalculate heroes (first 5 members) let mut heroes = Vec::new(); @@ -311,7 +314,7 @@ async fn sync_helper( for hero in services() .rooms - .all_pdus(&sender_user, &room_id)? + .timeline.all_pdus(&sender_user, &room_id)? .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus .filter(|(_, pdu)| pdu.kind == RoomEventType::RoomMember) .map(|(_, pdu)| { @@ -329,8 +332,8 @@ async fn sync_helper( if matches!( content.membership, MembershipState::Join | MembershipState::Invite - ) && (services().rooms.is_joined(&user_id, &room_id)? - || services().rooms.is_invited(&user_id, &room_id)?) + ) && (services().rooms.state_cache.is_joined(&user_id, &room_id)? + || services().rooms.state_cache.is_invited(&user_id, &room_id)?) { Ok::<_, Error>(Some(state_key.clone())) } else { @@ -371,17 +374,17 @@ async fn sync_helper( let (joined_member_count, invited_member_count, heroes) = calculate_counts()?; - let current_state_ids = services().rooms.state_full_ids(current_shortstatehash).await?; + let current_state_ids = services().rooms.state_accessor.state_full_ids(current_shortstatehash).await?; let mut state_events = Vec::new(); let mut lazy_loaded = HashSet::new(); let mut i = 0; for (shortstatekey, id) in current_state_ids { - let (event_type, state_key) = services().rooms.get_statekey_from_short(shortstatekey)?; + let (event_type, state_key) = services().rooms.short.get_statekey_from_short(shortstatekey)?; if event_type != StateEventType::RoomMember { - let pdu = match services().rooms.get_pdu(&id)? { + let pdu = match services().rooms.timeline.get_pdu(&id)? { Some(pdu) => pdu, None => { error!("Pdu in state not found: {}", id); @@ -398,7 +401,7 @@ async fn sync_helper( || body.full_state || timeline_users.contains(&state_key) { - let pdu = match services().rooms.get_pdu(&id)? { + let pdu = match services().rooms.timeline.get_pdu(&id)? { Some(pdu) => pdu, None => { error!("Pdu in state not found: {}", id); @@ -420,12 +423,12 @@ async fn sync_helper( } // Reset lazy loading because this is an initial sync - services().rooms + services().rooms.lazy_loading .lazy_load_reset(&sender_user, &sender_device, &room_id)?; // The state_events above should contain all timeline_users, let's mark them as lazy // loaded. - services().rooms.lazy_load_mark_sent( + services().rooms.lazy_loading.lazy_load_mark_sent( &sender_user, &sender_device, &room_id, @@ -449,6 +452,7 @@ async fn sync_helper( let since_sender_member: Option = services() .rooms + .state_accessor .state_get( since_shortstatehash, &StateEventType::RoomMember, @@ -467,12 +471,12 @@ async fn sync_helper( let mut lazy_loaded = HashSet::new(); if since_shortstatehash != current_shortstatehash { - let current_state_ids = services().rooms.state_full_ids(current_shortstatehash).await?; - let since_state_ids = services().rooms.state_full_ids(since_shortstatehash).await?; + let current_state_ids = services().rooms.state_accessor.state_full_ids(current_shortstatehash).await?; + let since_state_ids = services().rooms.state_accessor.state_full_ids(since_shortstatehash).await?; for (key, id) in current_state_ids { if body.full_state || since_state_ids.get(&key) != Some(&id) { - let pdu = match services().rooms.get_pdu(&id)? { + let pdu = match services().rooms.timeline.get_pdu(&id)? { Some(pdu) => pdu, None => { error!("Pdu in state not found: {}", id); @@ -505,14 +509,14 @@ async fn sync_helper( continue; } - if !services().rooms.lazy_load_was_sent_before( + if !services().rooms.lazy_loading.lazy_load_was_sent_before( &sender_user, &sender_device, &room_id, &event.sender, )? || lazy_load_send_redundant { - if let Some(member_event) = services().rooms.room_state_get( + if let Some(member_event) = services().rooms.state_accessor.room_state_get( &room_id, &StateEventType::RoomMember, event.sender.as_str(), @@ -523,7 +527,7 @@ async fn sync_helper( } } - services().rooms.lazy_load_mark_sent( + services().rooms.lazy_loading.lazy_load_mark_sent( &sender_user, &sender_device, &room_id, @@ -533,11 +537,12 @@ async fn sync_helper( let encrypted_room = services() .rooms - .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")? + .state_accessor.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")? .is_some(); let since_encryption = services().rooms + .state_accessor .state_get(since_shortstatehash, &StateEventType::RoomEncryption, "")?; // Calculations: @@ -588,6 +593,7 @@ async fn sync_helper( // If the user is in a new encrypted room, give them all joined users device_list_updates.extend( services().rooms + .state_cache .room_members(&room_id) .flatten() .filter(|user_id| { @@ -627,6 +633,7 @@ async fn sync_helper( let notification_count = if send_notification_counts { Some( services().rooms + .user .notification_count(&sender_user, &room_id)? .try_into() .expect("notification count can't go that high"), @@ -638,6 +645,7 @@ async fn sync_helper( let highlight_count = if send_notification_counts { Some( services().rooms + .user .highlight_count(&sender_user, &room_id)? .try_into() .expect("highlight count can't go that high"), @@ -649,7 +657,7 @@ async fn sync_helper( let prev_batch = timeline_pdus .first() .map_or(Ok::<_, Error>(None), |(pdu_id, _)| { - Ok(Some(services().rooms.pdu_count(pdu_id)?.to_string())) + Ok(Some(services().rooms.timeline.pdu_count(pdu_id)?.to_string())) })?; let room_events: Vec<_> = timeline_pdus @@ -660,15 +668,16 @@ async fn sync_helper( let mut edus: Vec<_> = services() .rooms .edus + .read_receipt .readreceipts_since(&room_id, since) .filter_map(|r| r.ok()) // Filter out buggy events .map(|(_, _, v)| v) .collect(); - if services().rooms.edus.last_typing_update(&room_id, &services().globals)? > since { + if services().rooms.edus.typing.last_typing_update(&room_id)? > since { edus.push( serde_json::from_str( - &serde_json::to_string(&services().rooms.edus.typings_all(&room_id)?) + &serde_json::to_string(&services().rooms.edus.typing.typings_all(&room_id)?) .expect("event is valid, we just created it"), ) .expect("event is valid, we just created it"), @@ -676,7 +685,7 @@ async fn sync_helper( } // Save the state after this sync so we can send the correct state diff next sync - services().rooms + services().rooms.user .associate_token_shortstatehash(&room_id, next_batch, current_shortstatehash)?; let joined_room = JoinedRoom { @@ -723,6 +732,7 @@ async fn sync_helper( for (user_id, presence) in services().rooms .edus + .presence .presence_since(&room_id, since)? { match presence_updates.entry(user_id) { @@ -755,7 +765,7 @@ async fn sync_helper( } let mut left_rooms = BTreeMap::new(); - let all_left_rooms: Vec<_> = services().rooms.rooms_left(&sender_user).collect(); + let all_left_rooms: Vec<_> = services().rooms.state_cache.rooms_left(&sender_user).collect(); for result in all_left_rooms { let (room_id, left_state_events) = result?; @@ -773,7 +783,7 @@ async fn sync_helper( drop(insert_lock); } - let left_count = services().rooms.get_left_count(&room_id, &sender_user)?; + let left_count = services().rooms.state_cache.get_left_count(&room_id, &sender_user)?; // Left before last sync if Some(since) >= left_count { @@ -797,7 +807,7 @@ async fn sync_helper( } let mut invited_rooms = BTreeMap::new(); - let all_invited_rooms: Vec<_> = services().rooms.rooms_invited(&sender_user).collect(); + let all_invited_rooms: Vec<_> = services().rooms.state_cache.rooms_invited(&sender_user).collect(); for result in all_invited_rooms { let (room_id, invite_state_events) = result?; @@ -815,7 +825,7 @@ async fn sync_helper( drop(insert_lock); } - let invite_count = services().rooms.get_invite_count(&room_id, &sender_user)?; + let invite_count = services().rooms.state_cache.get_invite_count(&room_id, &sender_user)?; // Invited before last sync if Some(since) >= invite_count { @@ -835,12 +845,13 @@ async fn sync_helper( for user_id in left_encrypted_users { let still_share_encrypted_room = services() .rooms + .user .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])? .filter_map(|r| r.ok()) .filter_map(|other_room_id| { Some( services().rooms - .room_state_get(&other_room_id, &StateEventType::RoomEncryption, "") + .state_accessor.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "") .ok()? .is_some(), ) @@ -925,12 +936,14 @@ fn share_encrypted_room( ) -> Result { Ok(services() .rooms + .user .get_shared_rooms(vec![sender_user.to_owned(), user_id.to_owned()])? .filter_map(|r| r.ok()) .filter(|room_id| room_id != ignore_room) .filter_map(|other_room_id| { Some( services().rooms + .state_accessor .room_state_get(&other_room_id, &StateEventType::RoomEncryption, "") .ok()? .is_some(), diff --git a/src/api/client_server/typing.rs b/src/api/client_server/typing.rs index afd5d6b3..abb669b1 100644 --- a/src/api/client_server/typing.rs +++ b/src/api/client_server/typing.rs @@ -11,7 +11,7 @@ pub async fn create_typing_event_route( let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - if !services().rooms.is_joined(sender_user, &body.room_id)? { + if !services().rooms.state_cache.is_joined(sender_user, &body.room_id)? { return Err(Error::BadRequest( ErrorKind::Forbidden, "You are not in this room.", @@ -19,14 +19,14 @@ pub async fn create_typing_event_route( } if let Typing::Yes(duration) = body.state { - services().rooms.edus.typing_add( + services().rooms.edus.typing.typing_add( sender_user, &body.room_id, duration.as_millis() as u64 + utils::millis_since_unix_epoch(), )?; } else { services().rooms - .edus + .edus.typing .typing_remove(sender_user, &body.room_id)?; } diff --git a/src/api/client_server/user_directory.rs b/src/api/client_server/user_directory.rs index 60b4e2fa..c94a283e 100644 --- a/src/api/client_server/user_directory.rs +++ b/src/api/client_server/user_directory.rs @@ -50,11 +50,11 @@ pub async fn search_users_route( let user_is_in_public_rooms = services().rooms - .rooms_joined(&user_id) + .state_cache.rooms_joined(&user_id) .filter_map(|r| r.ok()) .any(|room| { services().rooms - .room_state_get(&room, &StateEventType::RoomJoinRules, "") + .state_accessor.room_state_get(&room, &StateEventType::RoomJoinRules, "") .map_or(false, |event| { event.map_or(false, |event| { serde_json::from_str(event.content.get()) @@ -71,7 +71,7 @@ pub async fn search_users_route( let user_is_in_shared_rooms = services() .rooms - .get_shared_rooms(vec![sender_user.clone(), user_id.clone()]) + .user.get_shared_rooms(vec![sender_user.clone(), user_id.clone()]) .ok()? .next() .is_some(); diff --git a/src/api/client_server/voip.rs b/src/api/client_server/voip.rs index 2a804f97..9917979c 100644 --- a/src/api/client_server/voip.rs +++ b/src/api/client_server/voip.rs @@ -14,7 +14,7 @@ pub async fn turn_server_route( ) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let turn_secret = services().globals.turn_secret(); + let turn_secret = services().globals.turn_secret().clone(); let (username, password) = if !turn_secret.is_empty() { let expiry = SecondsSinceUnixEpoch::from_system_time( diff --git a/src/api/server_server.rs b/src/api/server_server.rs index bacc1ac7..9aa2beb9 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -669,7 +669,7 @@ pub async fn send_transaction_message_route( } }; - acl_check(&sender_servername, &room_id)?; + services().rooms.event_handler.acl_check(&sender_servername, &room_id)?; let mutex = Arc::clone( services().globals @@ -727,7 +727,7 @@ pub async fn send_transaction_message_route( .event_ids .iter() .filter_map(|id| { - services().rooms.get_pdu_count(id).ok().flatten().map(|r| (id, r)) + services().rooms.timeline.get_pdu_count(id).ok().flatten().map(|r| (id, r)) }) .max_by_key(|(_, count)| *count) { @@ -744,7 +744,7 @@ pub async fn send_transaction_message_route( content: ReceiptEventContent(receipt_content), room_id: room_id.clone(), }; - services().rooms.edus.readreceipt_update( + services().rooms.edus.read_receipt.readreceipt_update( &user_id, &room_id, event, @@ -757,15 +757,15 @@ pub async fn send_transaction_message_route( } } Edu::Typing(typing) => { - if services().rooms.is_joined(&typing.user_id, &typing.room_id)? { + if services().rooms.state_cache.is_joined(&typing.user_id, &typing.room_id)? { if typing.typing { - services().rooms.edus.typing_add( + services().rooms.edus.typing.typing_add( &typing.user_id, &typing.room_id, 3000 + utils::millis_since_unix_epoch(), )?; } else { - services().rooms.edus.typing_remove( + services().rooms.edus.typing.typing_remove( &typing.user_id, &typing.room_id, )?; @@ -1031,7 +1031,7 @@ pub(crate) async fn get_auth_chain<'a>( let mut i = 0; for id in starting_events { - let short = services().rooms.get_or_create_shorteventid(&id)?; + let short = services().rooms.short.get_or_create_shorteventid(&id)?; let bucket_id = (short % NUM_BUCKETS as u64) as usize; buckets[bucket_id].insert((short, id.clone())); i += 1; @@ -1050,7 +1050,7 @@ pub(crate) async fn get_auth_chain<'a>( } let chunk_key: Vec = chunk.iter().map(|(short, _)| short).copied().collect(); - if let Some(cached) = services().rooms.get_auth_chain_from_cache(&chunk_key)? { + if let Some(cached) = services().rooms.auth_chain.get_auth_chain_from_cache(&chunk_key)? { hits += 1; full_auth_chain.extend(cached.iter().copied()); continue; @@ -1062,13 +1062,14 @@ pub(crate) async fn get_auth_chain<'a>( let mut misses2 = 0; let mut i = 0; for (sevent_id, event_id) in chunk { - if let Some(cached) = services().rooms.get_auth_chain_from_cache(&[sevent_id])? { + if let Some(cached) = services().rooms.auth_chain.get_auth_chain_from_cache(&[sevent_id])? { hits2 += 1; chunk_cache.extend(cached.iter().copied()); } else { misses2 += 1; let auth_chain = Arc::new(get_auth_chain_inner(room_id, &event_id)?); services().rooms + .auth_chain .cache_auth_chain(vec![sevent_id], Arc::clone(&auth_chain))?; println!( "cache missed event {} with auth chain len {}", @@ -1091,7 +1092,7 @@ pub(crate) async fn get_auth_chain<'a>( ); let chunk_cache = Arc::new(chunk_cache); services().rooms - .cache_auth_chain(chunk_key, Arc::clone(&chunk_cache))?; + .auth_chain.cache_auth_chain(chunk_key, Arc::clone(&chunk_cache))?; full_auth_chain.extend(chunk_cache.iter()); } @@ -1104,7 +1105,7 @@ pub(crate) async fn get_auth_chain<'a>( Ok(full_auth_chain .into_iter() - .filter_map(move |sid| services().rooms.get_eventid_from_short(sid).ok())) + .filter_map(move |sid| services().rooms.short.get_eventid_from_short(sid).ok())) } #[tracing::instrument(skip(event_id))] @@ -1116,14 +1117,14 @@ fn get_auth_chain_inner( let mut found = HashSet::new(); while let Some(event_id) = todo.pop() { - match services().rooms.get_pdu(&event_id) { + match services().rooms.timeline.get_pdu(&event_id) { Ok(Some(pdu)) => { if pdu.room_id != room_id { return Err(Error::BadRequest(ErrorKind::Forbidden, "Evil event in db")); } for auth_event in &pdu.auth_events { let sauthevent = services() - .rooms + .rooms.short .get_or_create_shorteventid(auth_event)?; if !found.contains(&sauthevent) { @@ -1162,7 +1163,7 @@ pub async fn get_event_route( .expect("server is authenticated"); let event = services() - .rooms + .rooms.timeline .get_pdu_json(&body.event_id)? .ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?; @@ -1174,7 +1175,7 @@ pub async fn get_event_route( let room_id = <&RoomId>::try_from(room_id_str) .map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; - if !services().rooms.server_in_room(sender_servername, room_id)? { + if !services().rooms.state_cache.server_in_room(sender_servername, room_id)? { return Err(Error::BadRequest( ErrorKind::Forbidden, "Server is not in room", @@ -1203,21 +1204,21 @@ pub async fn get_missing_events_route( .as_ref() .expect("server is authenticated"); - if !services().rooms.server_in_room(sender_servername, &body.room_id)? { + if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? { return Err(Error::BadRequest( ErrorKind::Forbidden, "Server is not in room", )); } - acl_check(sender_servername, &body.room_id)?; + services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?; let mut queued_events = body.latest_events.clone(); let mut events = Vec::new(); let mut i = 0; while i < queued_events.len() && events.len() < u64::from(body.limit) as usize { - if let Some(pdu) = services().rooms.get_pdu_json(&queued_events[i])? { + if let Some(pdu) = services().rooms.timeline.get_pdu_json(&queued_events[i])? { let room_id_str = pdu .get("room_id") .and_then(|val| val.as_str()) @@ -1275,17 +1276,17 @@ pub async fn get_event_authorization_route( .as_ref() .expect("server is authenticated"); - if !services().rooms.server_in_room(sender_servername, &body.room_id)? { + if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? { return Err(Error::BadRequest( ErrorKind::Forbidden, "Server is not in room.", )); } - acl_check(sender_servername, &body.room_id)?; + services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?; let event = services() - .rooms + .rooms.timeline .get_pdu_json(&body.event_id)? .ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?; @@ -1301,7 +1302,7 @@ pub async fn get_event_authorization_route( Ok(get_event_authorization::v1::Response { auth_chain: auth_chain_ids - .filter_map(|id| services().rooms.get_pdu_json(&id).ok()?) + .filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok()?) .map(PduEvent::convert_to_outgoing_federation_event) .collect(), }) @@ -1322,17 +1323,17 @@ pub async fn get_room_state_route( .as_ref() .expect("server is authenticated"); - if !services().rooms.server_in_room(sender_servername, &body.room_id)? { + if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? { return Err(Error::BadRequest( ErrorKind::Forbidden, "Server is not in room.", )); } - acl_check(sender_servername, &body.room_id)?; + services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?; let shortstatehash = services() - .rooms + .rooms.state_accessor .pdu_shortstatehash(&body.event_id)? .ok_or(Error::BadRequest( ErrorKind::NotFound, @@ -1340,13 +1341,13 @@ pub async fn get_room_state_route( ))?; let pdus = services() - .rooms + .rooms.state_accessor .state_full_ids(shortstatehash) .await? .into_iter() .map(|(_, id)| { PduEvent::convert_to_outgoing_federation_event( - services().rooms.get_pdu_json(&id).unwrap().unwrap(), + services().rooms.timeline.get_pdu_json(&id).unwrap().unwrap(), ) }) .collect(); @@ -1357,7 +1358,7 @@ pub async fn get_room_state_route( Ok(get_room_state::v1::Response { auth_chain: auth_chain_ids .map(|id| { - services().rooms.get_pdu_json(&id).map(|maybe_json| { + services().rooms.timeline.get_pdu_json(&id).map(|maybe_json| { PduEvent::convert_to_outgoing_federation_event(maybe_json.unwrap()) }) }) @@ -1382,17 +1383,17 @@ pub async fn get_room_state_ids_route( .as_ref() .expect("server is authenticated"); - if !services().rooms.server_in_room(sender_servername, &body.room_id)? { + if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? { return Err(Error::BadRequest( ErrorKind::Forbidden, "Server is not in room.", )); } - acl_check(sender_servername, &body.room_id)?; + services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?; let shortstatehash = services() - .rooms + .rooms.state_accessor .pdu_shortstatehash(&body.event_id)? .ok_or(Error::BadRequest( ErrorKind::NotFound, @@ -1400,7 +1401,7 @@ pub async fn get_room_state_ids_route( ))?; let pdu_ids = services() - .rooms + .rooms.state_accessor .state_full_ids(shortstatehash) .await? .into_iter() @@ -1426,7 +1427,7 @@ pub async fn create_join_event_template_route( return Err(Error::bad_config("Federation is disabled.")); } - if !services().rooms.exists(&body.room_id)? { + if !services().rooms.metadata.exists(&body.room_id)? { return Err(Error::BadRequest( ErrorKind::NotFound, "Room is unknown to this server.", @@ -1438,7 +1439,7 @@ pub async fn create_join_event_template_route( .as_ref() .expect("server is authenticated"); - acl_check(sender_servername, &body.room_id)?; + services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?; let mutex_state = Arc::clone( services().globals @@ -1452,7 +1453,7 @@ pub async fn create_join_event_template_route( // TODO: Conduit does not implement restricted join rules yet, we always reject let join_rules_event = - services().rooms + services().rooms.state_accessor .room_state_get(&body.room_id, &StateEventType::RoomJoinRules, "")?; let join_rules_event_content: Option = join_rules_event @@ -1477,8 +1478,8 @@ pub async fn create_join_event_template_route( } } - let room_version_id = services().rooms.state.get_room_version(&body.room_id); - if !body.ver.contains(room_version_id) { + let room_version_id = services().rooms.state.get_room_version(&body.room_id)?; + if !body.ver.contains(&room_version_id) { return Err(Error::BadRequest( ErrorKind::IncompatibleRoomVersion { room_version: room_version_id, @@ -1505,7 +1506,7 @@ pub async fn create_join_event_template_route( unsigned: None, state_key: Some(body.user_id.to_string()), redacts: None, - }, &body.user_id, &body.room_id, &state_lock); + }, &body.user_id, &body.room_id, &state_lock)?; drop(state_lock); @@ -1524,18 +1525,18 @@ async fn create_join_event( return Err(Error::bad_config("Federation is disabled.")); } - if !services().rooms.exists(room_id)? { + if !services().rooms.metadata.exists(room_id)? { return Err(Error::BadRequest( ErrorKind::NotFound, "Room is unknown to this server.", )); } - acl_check(sender_servername, room_id)?; + services().rooms.event_handler.acl_check(&sender_servername, room_id)?; // TODO: Conduit does not implement restricted join rules yet, we always reject let join_rules_event = services() - .rooms + .rooms.state_accessor .room_state_get(room_id, &StateEventType::RoomJoinRules, "")?; let join_rules_event_content: Option = join_rules_event @@ -1562,8 +1563,8 @@ async fn create_join_event( // We need to return the state prior to joining, let's keep a reference to that here let shortstatehash = services() - .rooms - .current_shortstatehash(room_id)? + .rooms.state + .get_room_shortstatehash(room_id)? .ok_or(Error::BadRequest( ErrorKind::NotFound, "Pdu state not found.", @@ -1602,22 +1603,15 @@ async fn create_join_event( .or_default(), ); let mutex_lock = mutex.lock().await; - let pdu_id = services().rooms.event_handler.handle_incoming_pdu(&origin, &event_id, room_id, value, true, &pub_key_map) - .await - .map_err(|e| { - warn!("Error while handling incoming send join PDU: {}", e); - Error::BadRequest( - ErrorKind::InvalidParam, - "Error while handling incoming PDU.", - ) - })? + let pdu_id: Vec = services().rooms.event_handler.handle_incoming_pdu(&origin, &event_id, room_id, value, true, &pub_key_map) + .await? .ok_or(Error::BadRequest( ErrorKind::InvalidParam, "Could not accept incoming PDU as timeline event.", ))?; drop(mutex_lock); - let state_ids = services().rooms.state_full_ids(shortstatehash).await?; + let state_ids = services().rooms.state_accessor.state_full_ids(shortstatehash).await?; let auth_chain_ids = get_auth_chain( room_id, state_ids.iter().map(|(_, id)| id.clone()).collect(), @@ -1626,6 +1620,7 @@ async fn create_join_event( let servers = services() .rooms + .state_cache .room_servers(room_id) .filter_map(|r| r.ok()) .filter(|server| &**server != services().globals.server_name()); @@ -1634,12 +1629,12 @@ async fn create_join_event( Ok(RoomState { auth_chain: auth_chain_ids - .filter_map(|id| services().rooms.get_pdu_json(&id).ok().flatten()) + .filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok().flatten()) .map(PduEvent::convert_to_outgoing_federation_event) .collect(), state: state_ids .iter() - .filter_map(|(_, id)| services().rooms.get_pdu_json(id).ok().flatten()) + .filter_map(|(_, id)| services().rooms.timeline.get_pdu_json(id).ok().flatten()) .map(PduEvent::convert_to_outgoing_federation_event) .collect(), }) @@ -1692,7 +1687,7 @@ pub async fn create_invite_route( .as_ref() .expect("server is authenticated"); - acl_check(sender_servername, &body.room_id)?; + services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?; if !services().rooms.is_supported_version(&body.room_version) { return Err(Error::BadRequest( @@ -1767,8 +1762,8 @@ pub async fn create_invite_route( invite_state.push(pdu.to_stripped_state_event()); // If the room already exists, the remote server will notify us about the join via /send - if !services().rooms.exists(&pdu.room_id)? { - services().rooms.update_membership( + if !services().rooms.metadata.exists(&pdu.room_id)? { + services().rooms.state_cache.update_membership( &body.room_id, &invited_user, MembershipState::Invite, @@ -1931,274 +1926,6 @@ pub async fn claim_keys_route( }) } -#[tracing::instrument(skip_all)] -pub(crate) async fn fetch_required_signing_keys( - event: &BTreeMap, - pub_key_map: &RwLock>>, -) -> Result<()> { - let signatures = event - .get("signatures") - .ok_or(Error::BadServerResponse( - "No signatures in server response pdu.", - ))? - .as_object() - .ok_or(Error::BadServerResponse( - "Invalid signatures object in server response pdu.", - ))?; - - // We go through all the signatures we see on the value and fetch the corresponding signing - // keys - for (signature_server, signature) in signatures { - let signature_object = signature.as_object().ok_or(Error::BadServerResponse( - "Invalid signatures content object in server response pdu.", - ))?; - - let signature_ids = signature_object.keys().cloned().collect::>(); - - let fetch_res = fetch_signing_keys( - signature_server.as_str().try_into().map_err(|_| { - Error::BadServerResponse("Invalid servername in signatures of server response pdu.") - })?, - signature_ids, - ) - .await; - - let keys = match fetch_res { - Ok(keys) => keys, - Err(_) => { - warn!("Signature verification failed: Could not fetch signing key.",); - continue; - } - }; - - pub_key_map - .write() - .map_err(|_| Error::bad_database("RwLock is poisoned."))? - .insert(signature_server.clone(), keys); - } - - Ok(()) -} - -// Gets a list of servers for which we don't have the signing key yet. We go over -// the PDUs and either cache the key or add it to the list that needs to be retrieved. -fn get_server_keys_from_cache( - pdu: &RawJsonValue, - servers: &mut BTreeMap, BTreeMap, QueryCriteria>>, - room_version: &RoomVersionId, - pub_key_map: &mut RwLockWriteGuard<'_, BTreeMap>>, -) -> Result<()> { - let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| { - error!("Invalid PDU in server response: {:?}: {:?}", pdu, e); - Error::BadServerResponse("Invalid PDU in server response") - })?; - - let event_id = format!( - "${}", - ruma::signatures::reference_hash(&value, room_version) - .expect("ruma can calculate reference hashes") - ); - let event_id = <&EventId>::try_from(event_id.as_str()) - .expect("ruma's reference hashes are valid event ids"); - - if let Some((time, tries)) = services() - .globals - .bad_event_ratelimiter - .read() - .unwrap() - .get(event_id) - { - // Exponential backoff - let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); - if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { - min_elapsed_duration = Duration::from_secs(60 * 60 * 24); - } - - if time.elapsed() < min_elapsed_duration { - debug!("Backing off from {}", event_id); - return Err(Error::BadServerResponse("bad event, still backing off")); - } - } - - let signatures = value - .get("signatures") - .ok_or(Error::BadServerResponse( - "No signatures in server response pdu.", - ))? - .as_object() - .ok_or(Error::BadServerResponse( - "Invalid signatures object in server response pdu.", - ))?; - - for (signature_server, signature) in signatures { - let signature_object = signature.as_object().ok_or(Error::BadServerResponse( - "Invalid signatures content object in server response pdu.", - ))?; - - let signature_ids = signature_object.keys().cloned().collect::>(); - - let contains_all_ids = - |keys: &BTreeMap| signature_ids.iter().all(|id| keys.contains_key(id)); - - let origin = <&ServerName>::try_from(signature_server.as_str()).map_err(|_| { - Error::BadServerResponse("Invalid servername in signatures of server response pdu.") - })?; - - if servers.contains_key(origin) || pub_key_map.contains_key(origin.as_str()) { - continue; - } - - trace!("Loading signing keys for {}", origin); - - let result: BTreeMap<_, _> = services() - .globals - .signing_keys_for(origin)? - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)) - .collect(); - - if !contains_all_ids(&result) { - trace!("Signing key not loaded for {}", origin); - servers.insert(origin.to_owned(), BTreeMap::new()); - } - - pub_key_map.insert(origin.to_string(), result); - } - - Ok(()) -} - -pub(crate) async fn fetch_join_signing_keys( - event: &create_join_event::v2::Response, - room_version: &RoomVersionId, - pub_key_map: &RwLock>>, -) -> Result<()> { - let mut servers: BTreeMap, BTreeMap, QueryCriteria>> = - BTreeMap::new(); - - { - let mut pkm = pub_key_map - .write() - .map_err(|_| Error::bad_database("RwLock is poisoned."))?; - - // Try to fetch keys, failure is okay - // Servers we couldn't find in the cache will be added to `servers` - for pdu in &event.room_state.state { - let _ = get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm); - } - for pdu in &event.room_state.auth_chain { - let _ = get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm); - } - - drop(pkm); - } - - if servers.is_empty() { - // We had all keys locally - return Ok(()); - } - - for server in services().globals.trusted_servers() { - trace!("Asking batch signing keys from trusted server {}", server); - if let Ok(keys) = services() - .sending - .send_federation_request( - server, - get_remote_server_keys_batch::v2::Request { - server_keys: servers.clone(), - }, - ) - .await - { - trace!("Got signing keys: {:?}", keys); - let mut pkm = pub_key_map - .write() - .map_err(|_| Error::bad_database("RwLock is poisoned."))?; - for k in keys.server_keys { - let k = k.deserialize().unwrap(); - - // TODO: Check signature from trusted server? - servers.remove(&k.server_name); - - let result = services() - .globals - .add_signing_key(&k.server_name, k.clone())? - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)) - .collect::>(); - - pkm.insert(k.server_name.to_string(), result); - } - } - - if servers.is_empty() { - return Ok(()); - } - } - - let mut futures: FuturesUnordered<_> = servers - .into_iter() - .map(|(server, _)| async move { - ( - services().sending - .send_federation_request( - &server, - get_server_keys::v2::Request::new(), - ) - .await, - server, - ) - }) - .collect(); - - while let Some(result) = futures.next().await { - if let (Ok(get_keys_response), origin) = result { - let result: BTreeMap<_, _> = services() - .globals - .add_signing_key(&origin, get_keys_response.server_key.deserialize().unwrap())? - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)) - .collect(); - - pub_key_map - .write() - .map_err(|_| Error::bad_database("RwLock is poisoned."))? - .insert(origin.to_string(), result); - } - } - - Ok(()) -} - -/// Returns Ok if the acl allows the server -fn acl_check(server_name: &ServerName, room_id: &RoomId) -> Result<()> { - let acl_event = match services() - .rooms - .room_state_get(room_id, &StateEventType::RoomServerAcl, "")? - { - Some(acl) => acl, - None => return Ok(()), - }; - - let acl_event_content: RoomServerAclEventContent = - match serde_json::from_str(acl_event.content.get()) { - Ok(content) => content, - Err(_) => { - warn!("Invalid ACL event"); - return Ok(()); - } - }; - - if acl_event_content.is_allowed(server_name) { - Ok(()) - } else { - Err(Error::BadRequest( - ErrorKind::Forbidden, - "Server was denied by ACL", - )) - } -} - #[cfg(test)] mod tests { use super::{add_port_to_hostname, get_ip_with_port, FedDest}; diff --git a/src/database/key_value/appservice.rs b/src/database/key_value/appservice.rs index edb027e9..f427ba71 100644 --- a/src/database/key_value/appservice.rs +++ b/src/database/key_value/appservice.rs @@ -54,11 +54,11 @@ impl service::appservice::Data for KeyValueDatabase { ) } - fn iter_ids(&self) -> Result>>> { - Ok(self.id_appserviceregistrations.iter().map(|(id, _)| { + fn iter_ids<'a>(&'a self) -> Result> + 'a>> { + Ok(Box::new(self.id_appserviceregistrations.iter().map(|(id, _)| { utils::string_from_bytes(&id) .map_err(|_| Error::bad_database("Invalid id bytes in id_appserviceregistrations.")) - })) + }))) } fn all(&self) -> Result> { diff --git a/src/database/key_value/rooms/directory.rs b/src/database/key_value/rooms/directory.rs index c48afa9a..727004e7 100644 --- a/src/database/key_value/rooms/directory.rs +++ b/src/database/key_value/rooms/directory.rs @@ -16,13 +16,13 @@ impl service::rooms::directory::Data for KeyValueDatabase { } fn public_rooms(&self) -> Box>>> { - self.publicroomids.iter().map(|(bytes, _)| { + Box::new(self.publicroomids.iter().map(|(bytes, _)| { RoomId::parse( utils::string_from_bytes(&bytes).map_err(|_| { Error::bad_database("Room ID in publicroomids is invalid unicode.") })?, ) .map_err(|_| Error::bad_database("Room ID in publicroomids is invalid.")) - }) + })) } } diff --git a/src/database/mod.rs b/src/database/mod.rs index 4ea619a8..22bfef06 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -513,7 +513,7 @@ impl KeyValueDatabase { let states_parents = last_roomsstatehash.map_or_else( || Ok(Vec::new()), |&last_roomsstatehash| { - db.rooms.load_shortstatehash_info(dbg!(last_roomsstatehash)) + db.rooms.state_accessor.load_shortstatehash_info(dbg!(last_roomsstatehash)) }, )?; diff --git a/src/lib.rs b/src/lib.rs index c6e65697..72399003 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,7 @@ enum ServicesEnum { Rocksdb(Services) } -pub fn services() -> Services { - SERVICES.read().unwrap() +pub fn services<'a>() -> &'a Services { + &SERVICES.read().unwrap() } diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs index 7a399223..c56c69d2 100644 --- a/src/service/account_data/mod.rs +++ b/src/service/account_data/mod.rs @@ -31,80 +31,18 @@ impl Service { event_type: RoomAccountDataEventType, data: &T, ) -> Result<()> { - let mut prefix = room_id - .map(|r| r.to_string()) - .unwrap_or_default() - .as_bytes() - .to_vec(); - prefix.push(0xff); - prefix.extend_from_slice(user_id.as_bytes()); - prefix.push(0xff); - - let mut roomuserdataid = prefix.clone(); - roomuserdataid.extend_from_slice(&services().globals.next_count()?.to_be_bytes()); - roomuserdataid.push(0xff); - roomuserdataid.extend_from_slice(event_type.to_string().as_bytes()); - - let mut key = prefix; - key.extend_from_slice(event_type.to_string().as_bytes()); - - let json = serde_json::to_value(data).expect("all types here can be serialized"); // TODO: maybe add error handling - if json.get("type").is_none() || json.get("content").is_none() { - return Err(Error::BadRequest( - ErrorKind::InvalidParam, - "Account data doesn't have all required fields.", - )); - } - - self.roomuserdataid_accountdata.insert( - &roomuserdataid, - &serde_json::to_vec(&json).expect("to_vec always works on json values"), - )?; - - let prev = self.roomusertype_roomuserdataid.get(&key)?; - - self.roomusertype_roomuserdataid - .insert(&key, &roomuserdataid)?; - - // Remove old entry - if let Some(prev) = prev { - self.roomuserdataid_accountdata.remove(&prev)?; - } - - Ok(()) + self.db.update(room_id, user_id, event_type, data) } /// Searches the account data for a specific kind. - #[tracing::instrument(skip(self, room_id, user_id, kind))] + #[tracing::instrument(skip(self, room_id, user_id, event_type))] pub fn get( &self, room_id: Option<&RoomId>, user_id: &UserId, - kind: RoomAccountDataEventType, + event_type: RoomAccountDataEventType, ) -> Result> { - let mut key = room_id - .map(|r| r.to_string()) - .unwrap_or_default() - .as_bytes() - .to_vec(); - key.push(0xff); - key.extend_from_slice(user_id.as_bytes()); - key.push(0xff); - key.extend_from_slice(kind.to_string().as_bytes()); - - self.roomusertype_roomuserdataid - .get(&key)? - .and_then(|roomuserdataid| { - self.roomuserdataid_accountdata - .get(&roomuserdataid) - .transpose() - }) - .transpose()? - .map(|data| { - serde_json::from_slice(&data) - .map_err(|_| Error::bad_database("could not deserialize")) - }) - .transpose() + self.db.get(room_id, user_id, event_type) } /// Returns all changes to the account data that happened after `since`. @@ -115,44 +53,6 @@ impl Service { user_id: &UserId, since: u64, ) -> Result>> { - let mut userdata = HashMap::new(); - - let mut prefix = room_id - .map(|r| r.to_string()) - .unwrap_or_default() - .as_bytes() - .to_vec(); - prefix.push(0xff); - prefix.extend_from_slice(user_id.as_bytes()); - prefix.push(0xff); - - // Skip the data that's exactly at since, because we sent that last time - let mut first_possible = prefix.clone(); - first_possible.extend_from_slice(&(since + 1).to_be_bytes()); - - for r in self - .roomuserdataid_accountdata - .iter_from(&first_possible, false) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(|(k, v)| { - Ok::<_, Error>(( - RoomAccountDataEventType::try_from( - utils::string_from_bytes(k.rsplit(|&b| b == 0xff).next().ok_or_else( - || Error::bad_database("RoomUserData ID in db is invalid."), - )?) - .map_err(|_| Error::bad_database("RoomUserData ID in db is invalid."))?, - ) - .map_err(|_| Error::bad_database("RoomUserData ID in db is invalid."))?, - serde_json::from_slice::>(&v).map_err(|_| { - Error::bad_database("Database contains invalid account data.") - })?, - )) - }) - { - let (kind, data) = r?; - userdata.insert(kind, data); - } - - Ok(userdata) + self.db.changes_since(room_id, user_id, since) } } diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index dad4ceba..48f828fc 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -192,7 +192,7 @@ impl Service { mutex_lock: &MutexGuard<'_, ()>| { services() .rooms - .build_and_append_pdu( + .timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomMessage, content: to_raw_value(&message) @@ -213,7 +213,7 @@ impl Service { Some(event) = receiver.recv() => { let message_content = match event { AdminRoomEvent::SendMessage(content) => content, - AdminRoomEvent::ProcessMessage(room_message) => process_admin_message(room_message).await + AdminRoomEvent::ProcessMessage(room_message) => self.process_admin_message(room_message).await }; let mutex_state = Arc::clone( @@ -254,20 +254,20 @@ impl Service { let command_line = lines.next().expect("each string has at least one line"); let body: Vec<_> = lines.collect(); - let admin_command = match parse_admin_command(&command_line) { + let admin_command = match self.parse_admin_command(&command_line) { Ok(command) => command, Err(error) => { let server_name = services().globals.server_name(); let message = error .to_string() .replace("server.name", server_name.as_str()); - let html_message = usage_to_html(&message, server_name); + let html_message = self.usage_to_html(&message, server_name); return RoomMessageEventContent::text_html(message, html_message); } }; - match process_admin_command(admin_command, body).await { + match self.process_admin_command(admin_command, body).await { Ok(reply_message) => reply_message, Err(error) => { let markdown_message = format!( @@ -367,6 +367,8 @@ impl Service { } } AdminCommand::ListRooms => { + todo!(); + /* let room_ids = services().rooms.iter_ids(); let output = format!( "Rooms:\n{}", @@ -385,6 +387,7 @@ impl Service { .join("\n") ); RoomMessageEventContent::text_plain(output) + */ } AdminCommand::ListLocalUsers => match services().users.list_local_users() { Ok(users) => { @@ -412,7 +415,7 @@ impl Service { } AdminCommand::GetAuthChain { event_id } => { let event_id = Arc::::from(event_id); - if let Some(event) = services().rooms.get_pdu_json(&event_id)? { + if let Some(event) = services().rooms.timeline.get_pdu_json(&event_id)? { let room_id_str = event .get("room_id") .and_then(|val| val.as_str()) @@ -473,10 +476,10 @@ impl Service { } AdminCommand::GetPdu { event_id } => { let mut outlier = false; - let mut pdu_json = services().rooms.get_non_outlier_pdu_json(&event_id)?; + let mut pdu_json = services().rooms.timeline.get_non_outlier_pdu_json(&event_id)?; if pdu_json.is_none() { outlier = true; - pdu_json = services().rooms.get_pdu_json(&event_id)?; + pdu_json = services().rooms.timeline.get_pdu_json(&event_id)?; } match pdu_json { Some(json) => { @@ -506,7 +509,7 @@ impl Service { None => RoomMessageEventContent::text_plain("PDU not found."), } } - AdminCommand::DatabaseMemoryUsage => match services()._db.memory_usage() { + AdminCommand::DatabaseMemoryUsage => match services().globals.db.memory_usage() { Ok(response) => RoomMessageEventContent::text_plain(response), Err(e) => RoomMessageEventContent::text_plain(format!( "Failed to get database memory usage: {}", @@ -825,7 +828,7 @@ impl Service { content.room_version = RoomVersionId::V6; // 1. The room create event - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomCreate, content: to_raw_value(&content).expect("event is valid, we just created it"), @@ -839,7 +842,7 @@ impl Service { )?; // 2. Make conduit bot join - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomMember, content: to_raw_value(&RoomMemberEventContent { @@ -866,7 +869,7 @@ impl Service { let mut users = BTreeMap::new(); users.insert(conduit_user.clone(), 100.into()); - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomPowerLevels, content: to_raw_value(&RoomPowerLevelsEventContent { @@ -884,7 +887,7 @@ impl Service { )?; // 4.1 Join Rules - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomJoinRules, content: to_raw_value(&RoomJoinRulesEventContent::new(JoinRule::Invite)) @@ -899,7 +902,7 @@ impl Service { )?; // 4.2 History Visibility - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomHistoryVisibility, content: to_raw_value(&RoomHistoryVisibilityEventContent::new( @@ -916,7 +919,7 @@ impl Service { )?; // 4.3 Guest Access - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomGuestAccess, content: to_raw_value(&RoomGuestAccessEventContent::new(GuestAccess::Forbidden)) @@ -933,7 +936,7 @@ impl Service { // 5. Events implied by name and topic let room_name = RoomName::parse(format!("{} Admin Room", services().globals.server_name())) .expect("Room name is valid"); - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomName, content: to_raw_value(&RoomNameEventContent::new(Some(room_name))) @@ -947,7 +950,7 @@ impl Service { &state_lock, )?; - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomTopic, content: to_raw_value(&RoomTopicEventContent { @@ -968,7 +971,7 @@ impl Service { .try_into() .expect("#admins:server_name is a valid alias name"); - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomCanonicalAlias, content: to_raw_value(&RoomCanonicalAliasEventContent { @@ -985,7 +988,7 @@ impl Service { &state_lock, )?; - services().rooms.set_alias(&alias, Some(&room_id))?; + services().rooms.alias.set_alias(&alias, &room_id)?; Ok(()) } @@ -1003,7 +1006,8 @@ impl Service { .expect("#admins:server_name is a valid alias name"); let room_id = services() .rooms - .id_from_alias(&admin_room_alias)? + .alias + .resolve_local_alias(&admin_room_alias)? .expect("Admin room must exist"); let mutex_state = Arc::clone( @@ -1021,7 +1025,7 @@ impl Service { .expect("@conduit:server_name is valid"); // Invite and join the real user - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomMember, content: to_raw_value(&RoomMemberEventContent { @@ -1043,7 +1047,7 @@ impl Service { &room_id, &state_lock, )?; - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomMember, content: to_raw_value(&RoomMemberEventContent { @@ -1071,7 +1075,7 @@ impl Service { users.insert(conduit_user.to_owned(), 100.into()); users.insert(user_id.to_owned(), 100.into()); - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomPowerLevels, content: to_raw_value(&RoomPowerLevelsEventContent { @@ -1089,7 +1093,7 @@ impl Service { )?; // Send welcome message - services().rooms.build_and_append_pdu( + services().rooms.timeline.build_and_append_pdu( PduBuilder { event_type: RoomEventType::RoomMessage, content: to_raw_value(&RoomMessageEventContent::text_html( diff --git a/src/service/appservice/data.rs b/src/service/appservice/data.rs index cd48e85d..a70bf9c1 100644 --- a/src/service/appservice/data.rs +++ b/src/service/appservice/data.rs @@ -13,7 +13,7 @@ pub trait Data { fn get_registration(&self, id: &str) -> Result>; - fn iter_ids(&self) -> Result>>>; + fn iter_ids<'a>(&'a self) -> Result> + 'a>>; fn all(&self) -> Result>; } diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 556ca71c..6cfeab81 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -1,6 +1,7 @@ mod data; pub use data::Data; +use crate::api::server_server::FedDest; use crate::service::*; use crate::{Config, utils, Error, Result}; @@ -36,7 +37,7 @@ type SyncHandle = ( ); pub struct Service { - db: D, + pub db: D, pub actual_destination_cache: Arc>, // actual_destination, host pub tls_name_override: Arc>, diff --git a/src/service/key_backups/mod.rs b/src/service/key_backups/mod.rs index 8e842d4e..ce867fb5 100644 --- a/src/service/key_backups/mod.rs +++ b/src/service/key_backups/mod.rs @@ -22,36 +22,11 @@ impl Service { user_id: &UserId, backup_metadata: &Raw, ) -> Result { - let version = services().globals.next_count()?.to_string(); - - let mut key = user_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(version.as_bytes()); - - self.backupid_algorithm.insert( - &key, - &serde_json::to_vec(backup_metadata).expect("BackupAlgorithm::to_vec always works"), - )?; - self.backupid_etag - .insert(&key, &services().globals.next_count()?.to_be_bytes())?; - Ok(version) + self.db.create_backup(user_id, backup_metadata) } pub fn delete_backup(&self, user_id: &UserId, version: &str) -> Result<()> { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(version.as_bytes()); - - self.backupid_algorithm.remove(&key)?; - self.backupid_etag.remove(&key)?; - - key.push(0xff); - - for (outdated_key, _) in self.backupkeyid_backup.scan_prefix(key) { - self.backupkeyid_backup.remove(&outdated_key)?; - } - - Ok(()) + self.db.delete_backup(user_id, version) } pub fn update_backup( @@ -60,74 +35,18 @@ impl Service { version: &str, backup_metadata: &Raw, ) -> Result { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(version.as_bytes()); - - if self.backupid_algorithm.get(&key)?.is_none() { - return Err(Error::BadRequest( - ErrorKind::NotFound, - "Tried to update nonexistent backup.", - )); - } - - self.backupid_algorithm - .insert(&key, backup_metadata.json().get().as_bytes())?; - self.backupid_etag - .insert(&key, &services().globals.next_count()?.to_be_bytes())?; - Ok(version.to_owned()) + self.db.update_backup(user_id, version, backup_metadata) } pub fn get_latest_backup_version(&self, user_id: &UserId) -> Result> { - let mut prefix = user_id.as_bytes().to_vec(); - prefix.push(0xff); - let mut last_possible_key = prefix.clone(); - last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); - - self.backupid_algorithm - .iter_from(&last_possible_key, true) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .next() - .map(|(key, _)| { - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| Error::bad_database("backupid_algorithm key is invalid.")) - }) - .transpose() + self.db.get_latest_backup_version(user_id) } pub fn get_latest_backup( &self, user_id: &UserId, ) -> Result)>> { - let mut prefix = user_id.as_bytes().to_vec(); - prefix.push(0xff); - let mut last_possible_key = prefix.clone(); - last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); - - self.backupid_algorithm - .iter_from(&last_possible_key, true) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .next() - .map(|(key, value)| { - let version = utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| Error::bad_database("backupid_algorithm key is invalid."))?; - - Ok(( - version, - serde_json::from_slice(&value).map_err(|_| { - Error::bad_database("Algorithm in backupid_algorithm is invalid.") - })?, - )) - }) - .transpose() + self.db.get_latest_backup(user_id) } pub fn get_backup( @@ -135,16 +54,7 @@ impl Service { user_id: &UserId, version: &str, ) -> Result>> { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(version.as_bytes()); - - self.backupid_algorithm - .get(&key)? - .map_or(Ok(None), |bytes| { - serde_json::from_slice(&bytes) - .map_err(|_| Error::bad_database("Algorithm in backupid_algorithm is invalid.")) - }) + self.db.get_backup(user_id, version) } pub fn add_key( @@ -155,52 +65,15 @@ impl Service { session_id: &str, key_data: &Raw, ) -> Result<()> { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(version.as_bytes()); - - if self.backupid_algorithm.get(&key)?.is_none() { - return Err(Error::BadRequest( - ErrorKind::NotFound, - "Tried to update nonexistent backup.", - )); - } - - self.backupid_etag - .insert(&key, &services().globals.next_count()?.to_be_bytes())?; - - key.push(0xff); - key.extend_from_slice(room_id.as_bytes()); - key.push(0xff); - key.extend_from_slice(session_id.as_bytes()); - - self.backupkeyid_backup - .insert(&key, key_data.json().get().as_bytes())?; - - Ok(()) + self.db.add_key(user_id, version, room_id, session_id, key_data) } pub fn count_keys(&self, user_id: &UserId, version: &str) -> Result { - let mut prefix = user_id.as_bytes().to_vec(); - prefix.push(0xff); - prefix.extend_from_slice(version.as_bytes()); - - Ok(self.backupkeyid_backup.scan_prefix(prefix).count()) + self.db.count_keys(user_id, version) } pub fn get_etag(&self, user_id: &UserId, version: &str) -> Result { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(version.as_bytes()); - - Ok(utils::u64_from_bytes( - &self - .backupid_etag - .get(&key)? - .ok_or_else(|| Error::bad_database("Backup has no etag."))?, - ) - .map_err(|_| Error::bad_database("etag in backupid_etag invalid."))? - .to_string()) + self.db.get_etag(user_id, version) } pub fn get_all( @@ -208,55 +81,7 @@ impl Service { user_id: &UserId, version: &str, ) -> Result, RoomKeyBackup>> { - let mut prefix = user_id.as_bytes().to_vec(); - prefix.push(0xff); - prefix.extend_from_slice(version.as_bytes()); - prefix.push(0xff); - - let mut rooms = BTreeMap::, RoomKeyBackup>::new(); - - for result in self - .backupkeyid_backup - .scan_prefix(prefix) - .map(|(key, value)| { - let mut parts = key.rsplit(|&b| b == 0xff); - - let session_id = - utils::string_from_bytes(parts.next().ok_or_else(|| { - Error::bad_database("backupkeyid_backup key is invalid.") - })?) - .map_err(|_| { - Error::bad_database("backupkeyid_backup session_id is invalid.") - })?; - - let room_id = RoomId::parse( - utils::string_from_bytes(parts.next().ok_or_else(|| { - Error::bad_database("backupkeyid_backup key is invalid.") - })?) - .map_err(|_| Error::bad_database("backupkeyid_backup room_id is invalid."))?, - ) - .map_err(|_| { - Error::bad_database("backupkeyid_backup room_id is invalid room id.") - })?; - - let key_data = serde_json::from_slice(&value).map_err(|_| { - Error::bad_database("KeyBackupData in backupkeyid_backup is invalid.") - })?; - - Ok::<_, Error>((room_id, session_id, key_data)) - }) - { - let (room_id, session_id, key_data) = result?; - rooms - .entry(room_id) - .or_insert_with(|| RoomKeyBackup { - sessions: BTreeMap::new(), - }) - .sessions - .insert(session_id, key_data); - } - - Ok(rooms) + self.db.get_all(user_id, version) } pub fn get_room( @@ -265,35 +90,7 @@ impl Service { version: &str, room_id: &RoomId, ) -> Result>> { - let mut prefix = user_id.as_bytes().to_vec(); - prefix.push(0xff); - prefix.extend_from_slice(version.as_bytes()); - prefix.push(0xff); - prefix.extend_from_slice(room_id.as_bytes()); - prefix.push(0xff); - - Ok(self - .backupkeyid_backup - .scan_prefix(prefix) - .map(|(key, value)| { - let mut parts = key.rsplit(|&b| b == 0xff); - - let session_id = - utils::string_from_bytes(parts.next().ok_or_else(|| { - Error::bad_database("backupkeyid_backup key is invalid.") - })?) - .map_err(|_| { - Error::bad_database("backupkeyid_backup session_id is invalid.") - })?; - - let key_data = serde_json::from_slice(&value).map_err(|_| { - Error::bad_database("KeyBackupData in backupkeyid_backup is invalid.") - })?; - - Ok::<_, Error>((session_id, key_data)) - }) - .filter_map(|r| r.ok()) - .collect()) + self.db.get_room(user_id, version, room_id) } pub fn get_session( @@ -303,35 +100,11 @@ impl Service { room_id: &RoomId, session_id: &str, ) -> Result>> { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(version.as_bytes()); - key.push(0xff); - key.extend_from_slice(room_id.as_bytes()); - key.push(0xff); - key.extend_from_slice(session_id.as_bytes()); - - self.backupkeyid_backup - .get(&key)? - .map(|value| { - serde_json::from_slice(&value).map_err(|_| { - Error::bad_database("KeyBackupData in backupkeyid_backup is invalid.") - }) - }) - .transpose() + self.db.get_session(user_id, version, room_id, session_id) } pub fn delete_all_keys(&self, user_id: &UserId, version: &str) -> Result<()> { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(version.as_bytes()); - key.push(0xff); - - for (outdated_key, _) in self.backupkeyid_backup.scan_prefix(key) { - self.backupkeyid_backup.remove(&outdated_key)?; - } - - Ok(()) + self.db.delete_all_keys(user_id, version) } pub fn delete_room_keys( @@ -340,18 +113,7 @@ impl Service { version: &str, room_id: &RoomId, ) -> Result<()> { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(version.as_bytes()); - key.push(0xff); - key.extend_from_slice(room_id.as_bytes()); - key.push(0xff); - - for (outdated_key, _) in self.backupkeyid_backup.scan_prefix(key) { - self.backupkeyid_backup.remove(&outdated_key)?; - } - - Ok(()) + self.db.delete_room_keys(user_id, version, room_id) } pub fn delete_room_key( @@ -361,18 +123,6 @@ impl Service { room_id: &RoomId, session_id: &str, ) -> Result<()> { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(version.as_bytes()); - key.push(0xff); - key.extend_from_slice(room_id.as_bytes()); - key.push(0xff); - key.extend_from_slice(session_id.as_bytes()); - - for (outdated_key, _) in self.backupkeyid_backup.scan_prefix(key) { - self.backupkeyid_backup.remove(&outdated_key)?; - } - - Ok(()) + self.db.delete_room_key(user_id, version, room_id, session_id) } } diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index a5aca036..5037809c 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -29,7 +29,7 @@ impl Service { file: &[u8], ) -> Result<()> { // Width, Height = 0 if it's not a thumbnail - let key = self.db.create_file_metadata(mxc, 0, 0, content_disposition, content_type); + let key = self.db.create_file_metadata(mxc, 0, 0, content_disposition, content_type)?; let path = services().globals.get_media_file(&key); let mut f = File::create(path).await?; @@ -42,13 +42,13 @@ impl Service { pub async fn upload_thumbnail( &self, mxc: String, - content_disposition: &Option, - content_type: &Option, + content_disposition: &Option<&str>, + content_type: &Option<&str>, width: u32, height: u32, file: &[u8], ) -> Result<()> { - let key = self.db.create_file_metadata(mxc, width, height, content_disposition, content_type); + let key = self.db.create_file_metadata(mxc, width, height, content_disposition, content_type)?; let path = services().globals.get_media_file(&key); let mut f = File::create(path).await?; diff --git a/src/service/pusher/mod.rs b/src/service/pusher/mod.rs index 66a8ae36..64c7f1fa 100644 --- a/src/service/pusher/mod.rs +++ b/src/service/pusher/mod.rs @@ -138,6 +138,7 @@ impl Service { let power_levels: RoomPowerLevelsEventContent = services() .rooms + .state_accessor .room_state_get(&pdu.room_id, &StateEventType::RoomPowerLevels, "")? .map(|ev| { serde_json::from_str(ev.content.get()) @@ -274,6 +275,7 @@ impl Service { let room_name = if let Some(room_name_pdu) = services().rooms + .state_accessor .room_state_get(&event.room_id, &StateEventType::RoomName, "")? { serde_json::from_str::(room_name_pdu.content.get()) diff --git a/src/service/rooms/auth_chain/mod.rs b/src/service/rooms/auth_chain/mod.rs index 113d2e81..9ea4763e 100644 --- a/src/service/rooms/auth_chain/mod.rs +++ b/src/service/rooms/auth_chain/mod.rs @@ -21,7 +21,7 @@ impl Service { } // We only save auth chains for single events in the db - if key.len == 1 { + if key.len() == 1 { // Check DB cache if let Some(chain) = self.db.get_cached_eventid_authchain(key[0]) { diff --git a/src/service/rooms/edus/mod.rs b/src/service/rooms/edus/mod.rs index a5ce37f1..dbe1b6e8 100644 --- a/src/service/rooms/edus/mod.rs +++ b/src/service/rooms/edus/mod.rs @@ -5,7 +5,7 @@ pub mod typing; pub trait Data: presence::Data + read_receipt::Data + typing::Data {} pub struct Service { - presence: presence::Service, - read_receipt: read_receipt::Service, - typing: typing::Service, + pub presence: presence::Service, + pub read_receipt: read_receipt::Service, + pub typing: typing::Service, } diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index c9b041c2..8a8725b8 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -8,22 +8,23 @@ use std::{ time::{Duration, Instant}, }; -use futures_util::Future; +use futures_util::{Future, stream::FuturesUnordered}; use ruma::{ api::{ client::error::ErrorKind, - federation::event::{get_event, get_room_state_ids}, + federation::{event::{get_event, get_room_state_ids}, membership::create_join_event, discovery::get_remote_server_keys_batch::{v2::QueryCriteria, self}}, }, - events::{room::create::RoomCreateEventContent, StateEventType}, + events::{room::{create::RoomCreateEventContent, server_acl::RoomServerAclEventContent}, StateEventType}, int, serde::Base64, signatures::CanonicalJsonValue, state_res::{self, RoomVersion, StateMap}, - uint, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName, + uint, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName, ServerSigningKeyId, }; +use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use tracing::{error, info, trace, warn}; -use crate::{service::*, services, Error, PduEvent}; +use crate::{service::*, services, Result, Error, PduEvent}; pub struct Service; @@ -62,10 +63,11 @@ impl Service { is_timeline_event: bool, pub_key_map: &'a RwLock>>, ) -> Result>> { - services().rooms.exists(room_id)?.ok_or(Error::BadRequest( - ErrorKind::NotFound, - "Room is unknown to this server", - ))?; + if !services().rooms.metadata.exists(room_id)? { + return Error::BadRequest( + ErrorKind::NotFound, + "Room is unknown to this server", + )}; services() .rooms @@ -76,17 +78,18 @@ impl Service { ))?; // 1. Skip the PDU if we already have it as a timeline event - if let Some(pdu_id) = services().rooms.get_pdu_id(event_id)? { + if let Some(pdu_id) = services().rooms.timeline.get_pdu_id(event_id)? { return Ok(Some(pdu_id.to_vec())); } let create_event = services() .rooms + .state_accessor .room_state_get(room_id, &StateEventType::RoomCreate, "")? .ok_or_else(|| Error::bad_database("Failed to find create event in db."))?; let first_pdu_in_room = services() - .rooms + .rooms.timeline .first_pdu_in_room(room_id)? .ok_or_else(|| Error::bad_database("Failed to find first pdu in db."))?; @@ -111,7 +114,7 @@ impl Service { room_id, pub_key_map, incoming_pdu.prev_events.clone(), - ); + ).await; let mut errors = 0; for prev_id in dbg!(sorted_prev_events) { @@ -243,7 +246,7 @@ impl Service { room_id: &'a RoomId, value: BTreeMap, pub_key_map: &'a RwLock>>, - ) -> AsyncRecursiveType<'a, Result<(Arc, BTreeMap), String>> + ) -> AsyncRecursiveType<'a, Result<(Arc, BTreeMap)>> { Box::pin(async move { // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json @@ -367,11 +370,7 @@ impl Service { &incoming_pdu, None::, // TODO: third party invite |k, s| auth_events.get(&(k.to_string().into(), s.to_owned())), - ) - .map_err(|e| { - error!(e); - Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed") - })? { + )? { return Err(Error::BadRequest( ErrorKind::InvalidParam, "Auth check failed", @@ -400,16 +399,15 @@ impl Service { origin: &ServerName, room_id: &RoomId, pub_key_map: &RwLock>>, - ) -> Result>, String> { + ) -> Result>> { // Skip the PDU if we already have it as a timeline event - if let Ok(Some(pduid)) = services().rooms.get_pdu_id(&incoming_pdu.event_id) { + if let Ok(Some(pduid)) = services().rooms.timeline.get_pdu_id(&incoming_pdu.event_id) { return Ok(Some(pduid)); } if services() .rooms - .is_event_soft_failed(&incoming_pdu.event_id) - .map_err(|_| "Failed to ask db for soft fail".to_owned())? + .pdu_metadata.is_event_soft_failed(&incoming_pdu.event_id)? { return Err("Event has been soft failed".into()); } @@ -438,11 +436,11 @@ impl Service { let prev_event = &*incoming_pdu.prev_events[0]; let prev_event_sstatehash = services() .rooms - .pdu_shortstatehash(prev_event) - .map_err(|_| "Failed talking to db".to_owned())?; + .state_accessor + .pdu_shortstatehash(prev_event)?; let state = if let Some(shortstatehash) = prev_event_sstatehash { - Some(services().rooms.state_full_ids(shortstatehash).await) + Some(services().rooms.state_accessor.state_full_ids(shortstatehash).await) } else { None }; @@ -451,18 +449,19 @@ impl Service { info!("Using cached state"); let prev_pdu = services() .rooms + .timeline .get_pdu(prev_event) .ok() .flatten() .ok_or_else(|| { - "Could not find prev event, but we know the state.".to_owned() + Error::bad_database("Could not find prev event, but we know the state.") })?; if let Some(state_key) = &prev_pdu.state_key { let shortstatekey = services() .rooms - .get_or_create_shortstatekey(&prev_pdu.kind.to_string().into(), state_key) - .map_err(|_| "Failed to create shortstatekey.".to_owned())?; + .short + .get_or_create_shortstatekey(&prev_pdu.kind.to_string().into(), state_key)?; state.insert(shortstatekey, Arc::from(prev_event)); // Now it's the state after the pdu @@ -501,18 +500,18 @@ impl Service { for (sstatehash, prev_event) in extremity_sstatehashes { let mut leaf_state: BTreeMap<_, _> = services() .rooms + .state_accessor .state_full_ids(sstatehash) - .await - .map_err(|_| "Failed to ask db for room state.".to_owned())?; + .await?; if let Some(state_key) = &prev_event.state_key { let shortstatekey = services() .rooms + .short .get_or_create_shortstatekey( &prev_event.kind.to_string().into(), state_key, - ) - .map_err(|_| "Failed to create shortstatekey.".to_owned())?; + )?; leaf_state.insert(shortstatekey, Arc::from(&*prev_event.event_id)); // Now it's the state after the pdu } @@ -536,8 +535,7 @@ impl Service { .rooms .auth_chain .get_auth_chain(room_id, starting_events, services()) - .await - .map_err(|_| "Failed to load auth chain.".to_owned())? + .await? .collect(), ); @@ -563,16 +561,14 @@ impl Service { .map(|((event_type, state_key), event_id)| { let shortstatekey = services() .rooms + .short .get_or_create_shortstatekey( &event_type.to_string().into(), &state_key, - ) - .map_err(|_| { - "Failed to get_or_create_shortstatekey".to_owned() - })?; + )?; Ok((shortstatekey, event_id)) }) - .collect::>()?, + .collect::>()?, ), Err(e) => { warn!("State resolution on prev events failed, either an event could not be found or deserialization: {}", e); @@ -617,20 +613,19 @@ impl Service { let state_key = pdu .state_key .clone() - .ok_or_else(|| "Found non-state pdu in state events.".to_owned())?; + .ok_or_else(|| Error::bad_database("Found non-state pdu in state events."))?; let shortstatekey = services() .rooms - .get_or_create_shortstatekey(&pdu.kind.to_string().into(), &state_key) - .map_err(|_| "Failed to create shortstatekey.".to_owned())?; + .short + .get_or_create_shortstatekey(&pdu.kind.to_string().into(), &state_key)?; match state.entry(shortstatekey) { btree_map::Entry::Vacant(v) => { v.insert(Arc::from(&*pdu.event_id)); } btree_map::Entry::Occupied(_) => return Err( - "State event's type and state_key combination exists multiple times." - .to_owned(), + Error::bad_database("State event's type and state_key combination exists multiple times."), ), } } @@ -638,21 +633,21 @@ impl Service { // The original create event must still be in the state let create_shortstatekey = services() .rooms - .get_shortstatekey(&StateEventType::RoomCreate, "") - .map_err(|_| "Failed to talk to db.")? + .short + .get_shortstatekey(&StateEventType::RoomCreate, "")? .expect("Room exists"); if state.get(&create_shortstatekey).map(|id| id.as_ref()) != Some(&create_event.event_id) { - return Err("Incoming event refers to wrong create event.".to_owned()); + return Err(Error::bad_database("Incoming event refers to wrong create event.")); } state_at_incoming_event = Some(state); } Err(e) => { warn!("Fetching state for event failed: {}", e); - return Err("Fetching state for event failed".into()); + return Err(e); } }; } @@ -669,17 +664,18 @@ impl Service { |k, s| { services() .rooms + .short .get_shortstatekey(&k.to_string().into(), s) .ok() .flatten() .and_then(|shortstatekey| state_at_incoming_event.get(&shortstatekey)) - .and_then(|event_id| services().rooms.get_pdu(event_id).ok().flatten()) + .and_then(|event_id| services().rooms.timeline.get_pdu(event_id).ok().flatten()) }, ) .map_err(|_e| "Auth check failed.".to_owned())?; if !check_result { - return Err("Event has failed auth check with state at the event.".into()); + return Err(Error::bad_database("Event has failed auth check with state at the event.")); } info!("Auth check succeeded"); @@ -701,8 +697,8 @@ impl Service { info!("Calculating extremities"); let mut extremities = services() .rooms - .get_pdu_leaves(room_id) - .map_err(|_| "Failed to load room leaves".to_owned())?; + .state + .get_forward_extremities(room_id)?; // Remove any forward extremities that are referenced by this incoming event's prev_events for prev_event in &incoming_pdu.prev_events { @@ -721,10 +717,9 @@ impl Service { .map(|(shortstatekey, id)| { services() .rooms - .compress_state_event(*shortstatekey, id) - .map_err(|_| "Failed to compress_state_event".to_owned()) + .compress_state_event(*shortstatekey, id)? }) - .collect::>()?; + .collect::>()?; // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it info!("Starting soft fail auth check"); @@ -737,16 +732,14 @@ impl Service { &incoming_pdu.sender, incoming_pdu.state_key.as_deref(), &incoming_pdu.content, - ) - .map_err(|_| "Failed to get_auth_events.".to_owned())?; + )? let soft_fail = !state_res::event_auth::auth_check( &room_version, &incoming_pdu, None::, |k, s| auth_events.get(&(k.clone(), s.to_owned())), - ) - .map_err(|_e| "Auth check failed.".to_owned())?; + )?; if soft_fail { self.append_incoming_pdu( @@ -756,18 +749,13 @@ impl Service { state_ids_compressed, soft_fail, &state_lock, - ) - .map_err(|e| { - warn!("Failed to add pdu to db: {}", e); - "Failed to add pdu to db.".to_owned() - })?; + )?; // Soft fail, we keep the event as an outlier but don't add it to the timeline warn!("Event was soft failed: {:?}", incoming_pdu); services() .rooms - .mark_event_soft_failed(&incoming_pdu.event_id) - .map_err(|_| "Failed to set soft failed flag".to_owned())?; + .mark_event_soft_failed(&incoming_pdu.event_id)?; return Err("Event has been soft failed".into()); } @@ -775,15 +763,15 @@ impl Service { info!("Loading current room state ids"); let current_sstatehash = services() .rooms - .current_shortstatehash(room_id) - .map_err(|_| "Failed to load current state hash.".to_owned())? + .state + .get_room_shortstatehash(room_id)? .expect("every room has state"); let current_state_ids = services() .rooms + .state_accessor .state_full_ids(current_sstatehash) - .await - .map_err(|_| "Failed to load room state.")?; + .await?; info!("Preparing for stateres to derive new room state"); let mut extremity_sstatehashes = HashMap::new(); @@ -792,14 +780,14 @@ impl Service { for id in dbg!(&extremities) { match services() .rooms - .get_pdu(id) - .map_err(|_| "Failed to ask db for pdu.".to_owned())? + .timeline + .get_pdu(id)? { Some(leaf_pdu) => { extremity_sstatehashes.insert( services() - .pdu_shortstatehash(&leaf_pdu.event_id) - .map_err(|_| "Failed to ask db for pdu state hash.".to_owned())? + .rooms.state_accessor + .pdu_shortstatehash(&leaf_pdu.event_id)? .ok_or_else(|| { error!( "Found extremity pdu with no statehash in db: {:?}", @@ -832,8 +820,8 @@ impl Service { if let Some(state_key) = &incoming_pdu.state_key { let shortstatekey = services() .rooms - .get_or_create_shortstatekey(&incoming_pdu.kind.to_string().into(), state_key) - .map_err(|_| "Failed to create shortstatekey.".to_owned())?; + .short + .get_or_create_shortstatekey(&incoming_pdu.kind.to_string().into(), state_key)? state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id)); } @@ -852,10 +840,9 @@ impl Service { .map(|(k, id)| { services() .rooms - .compress_state_event(*k, id) - .map_err(|_| "Failed to compress_state_event.".to_owned()) + .compress_state_event(*k, id)? }) - .collect::>()? + .collect::>()? } else { info!("Loading auth chains"); // We do need to force an update to this room's state @@ -871,8 +858,7 @@ impl Service { room_id, state.iter().map(|(_, id)| id.clone()).collect(), ) - .await - .map_err(|_| "Failed to load auth chain.".to_owned())? + .await? .collect(), ); } @@ -886,11 +872,10 @@ impl Service { .filter_map(|(k, id)| { services() .rooms - .get_statekey_from_short(k) + .get_statekey_from_short(k)? // FIXME: Undo .to_string().into() when StateMap // is updated to use StateEventType .map(|(ty, st_key)| ((ty.to_string().into(), st_key), id)) - .map_err(|e| warn!("Failed to get_statekey_from_short: {}", e)) .ok() }) .collect::>() @@ -927,14 +912,13 @@ impl Service { .map(|((event_type, state_key), event_id)| { let shortstatekey = services() .rooms - .get_or_create_shortstatekey(&event_type.to_string().into(), &state_key) - .map_err(|_| "Failed to get_or_create_shortstatekey".to_owned())?; + .short + .get_or_create_shortstatekey(&event_type.to_string().into(), &state_key)?; services() .rooms .compress_state_event(shortstatekey, &event_id) - .map_err(|_| "Failed to compress state event".to_owned()) }) - .collect::>()? + .collect::>()? }; // Set the new room state to the resolved state @@ -942,8 +926,7 @@ impl Service { info!("Forcing new room state"); services() .rooms - .force_state(room_id, new_room_state) - .map_err(|_| "Failed to set new room state.".to_owned())?; + .force_state(room_id, new_room_state)?; } } @@ -962,11 +945,7 @@ impl Service { state_ids_compressed, soft_fail, &state_lock, - ) - .map_err(|e| { - warn!("Failed to add pdu to db: {}", e); - "Failed to add pdu to db.".to_owned() - })?; + )?; info!("Appended incoming pdu"); @@ -1227,9 +1206,279 @@ impl Service { .map_or_else(|| uint!(0), |info| info.0.origin_server_ts), ), )) - }) - .map_err(|_| "Error sorting prev events".to_owned())?; + })?; (sorted, eventid_info) } + + #[tracing::instrument(skip_all)] + pub(crate) async fn fetch_required_signing_keys( + &self, + event: &BTreeMap, + pub_key_map: &RwLock>>, + ) -> Result<()> { + let signatures = event + .get("signatures") + .ok_or(Error::BadServerResponse( + "No signatures in server response pdu.", + ))? + .as_object() + .ok_or(Error::BadServerResponse( + "Invalid signatures object in server response pdu.", + ))?; + + // We go through all the signatures we see on the value and fetch the corresponding signing + // keys + for (signature_server, signature) in signatures { + let signature_object = signature.as_object().ok_or(Error::BadServerResponse( + "Invalid signatures content object in server response pdu.", + ))?; + + let signature_ids = signature_object.keys().cloned().collect::>(); + + let fetch_res = fetch_signing_keys( + signature_server.as_str().try_into().map_err(|_| { + Error::BadServerResponse("Invalid servername in signatures of server response pdu.") + })?, + signature_ids, + ) + .await; + + let keys = match fetch_res { + Ok(keys) => keys, + Err(_) => { + warn!("Signature verification failed: Could not fetch signing key.",); + continue; + } + }; + + pub_key_map + .write() + .map_err(|_| Error::bad_database("RwLock is poisoned."))? + .insert(signature_server.clone(), keys); + } + + Ok(()) + } + + // Gets a list of servers for which we don't have the signing key yet. We go over + // the PDUs and either cache the key or add it to the list that needs to be retrieved. + fn get_server_keys_from_cache( + &self, + pdu: &RawJsonValue, + servers: &mut BTreeMap, BTreeMap, QueryCriteria>>, + room_version: &RoomVersionId, + pub_key_map: &mut RwLockWriteGuard<'_, BTreeMap>>, + ) -> Result<()> { + let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| { + error!("Invalid PDU in server response: {:?}: {:?}", pdu, e); + Error::BadServerResponse("Invalid PDU in server response") + })?; + + let event_id = format!( + "${}", + ruma::signatures::reference_hash(&value, room_version) + .expect("ruma can calculate reference hashes") + ); + let event_id = <&EventId>::try_from(event_id.as_str()) + .expect("ruma's reference hashes are valid event ids"); + + if let Some((time, tries)) = services() + .globals + .bad_event_ratelimiter + .read() + .unwrap() + .get(event_id) + { + // Exponential backoff + let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); + if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { + min_elapsed_duration = Duration::from_secs(60 * 60 * 24); + } + + if time.elapsed() < min_elapsed_duration { + debug!("Backing off from {}", event_id); + return Err(Error::BadServerResponse("bad event, still backing off")); + } + } + + let signatures = value + .get("signatures") + .ok_or(Error::BadServerResponse( + "No signatures in server response pdu.", + ))? + .as_object() + .ok_or(Error::BadServerResponse( + "Invalid signatures object in server response pdu.", + ))?; + + for (signature_server, signature) in signatures { + let signature_object = signature.as_object().ok_or(Error::BadServerResponse( + "Invalid signatures content object in server response pdu.", + ))?; + + let signature_ids = signature_object.keys().cloned().collect::>(); + + let contains_all_ids = + |keys: &BTreeMap| signature_ids.iter().all(|id| keys.contains_key(id)); + + let origin = <&ServerName>::try_from(signature_server.as_str()).map_err(|_| { + Error::BadServerResponse("Invalid servername in signatures of server response pdu.") + })?; + + if servers.contains_key(origin) || pub_key_map.contains_key(origin.as_str()) { + continue; + } + + trace!("Loading signing keys for {}", origin); + + let result: BTreeMap<_, _> = services() + .globals + .signing_keys_for(origin)? + .into_iter() + .map(|(k, v)| (k.to_string(), v.key)) + .collect(); + + if !contains_all_ids(&result) { + trace!("Signing key not loaded for {}", origin); + servers.insert(origin.to_owned(), BTreeMap::new()); + } + + pub_key_map.insert(origin.to_string(), result); + } + + Ok(()) + } + + pub(crate) async fn fetch_join_signing_keys( + &self, + event: &create_join_event::v2::Response, + room_version: &RoomVersionId, + pub_key_map: &RwLock>>, + ) -> Result<()> { + let mut servers: BTreeMap, BTreeMap, QueryCriteria>> = + BTreeMap::new(); + + { + let mut pkm = pub_key_map + .write() + .map_err(|_| Error::bad_database("RwLock is poisoned."))?; + + // Try to fetch keys, failure is okay + // Servers we couldn't find in the cache will be added to `servers` + for pdu in &event.room_state.state { + let _ = self.get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm); + } + for pdu in &event.room_state.auth_chain { + let _ = self.get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm); + } + + drop(pkm); + } + + if servers.is_empty() { + // We had all keys locally + return Ok(()); + } + + for server in services().globals.trusted_servers() { + trace!("Asking batch signing keys from trusted server {}", server); + if let Ok(keys) = services() + .sending + .send_federation_request( + server, + get_remote_server_keys_batch::v2::Request { + server_keys: servers.clone(), + }, + ) + .await + { + trace!("Got signing keys: {:?}", keys); + let mut pkm = pub_key_map + .write() + .map_err(|_| Error::bad_database("RwLock is poisoned."))?; + for k in keys.server_keys { + let k = k.deserialize().unwrap(); + + // TODO: Check signature from trusted server? + servers.remove(&k.server_name); + + let result = services() + .globals + .add_signing_key(&k.server_name, k.clone())? + .into_iter() + .map(|(k, v)| (k.to_string(), v.key)) + .collect::>(); + + pkm.insert(k.server_name.to_string(), result); + } + } + + if servers.is_empty() { + return Ok(()); + } + } + + let mut futures: FuturesUnordered<_> = servers + .into_iter() + .map(|(server, _)| async move { + ( + services().sending + .send_federation_request( + &server, + get_server_keys::v2::Request::new(), + ) + .await, + server, + ) + }) + .collect(); + + while let Some(result) = futures.next().await { + if let (Ok(get_keys_response), origin) = result { + let result: BTreeMap<_, _> = services() + .globals + .add_signing_key(&origin, get_keys_response.server_key.deserialize().unwrap())? + .into_iter() + .map(|(k, v)| (k.to_string(), v.key)) + .collect(); + + pub_key_map + .write() + .map_err(|_| Error::bad_database("RwLock is poisoned."))? + .insert(origin.to_string(), result); + } + } + + Ok(()) + } + + /// Returns Ok if the acl allows the server + pub fn acl_check(&self, server_name: &ServerName, room_id: &RoomId) -> Result<()> { + let acl_event = match services() + .rooms.state_accessor + .room_state_get(room_id, &StateEventType::RoomServerAcl, "")? + { + Some(acl) => acl, + None => return Ok(()), + }; + + let acl_event_content: RoomServerAclEventContent = + match serde_json::from_str(acl_event.content.get()) { + Ok(content) => content, + Err(_) => { + warn!("Invalid ACL event"); + return Ok(()); + } + }; + + if acl_event_content.is_allowed(server_name) { + Ok(()) + } else { + Err(Error::BadRequest( + ErrorKind::Forbidden, + "Server was denied by ACL", + )) + } + } } diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index e6b5ce20..a26ed46b 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -1,5 +1,5 @@ mod data; -use std::collections::HashSet; +use std::{collections::HashSet, sync::Arc}; pub use data::Data; use ruma::{RoomId, events::{room::{member::MembershipState, create::RoomCreateEventContent}, AnyStrippedStateEvent, StateEventType}, UserId, EventId, serde::Raw, RoomVersionId}; @@ -85,7 +85,7 @@ impl Service { event_id: &EventId, room_id: &RoomId, state_ids_compressed: HashSet, - ) -> Result<()> { + ) -> Result { let shorteventid = services().short.get_or_create_shorteventid(event_id)?; let previous_shortstatehash = self.db.get_room_shortstatehash(room_id)?; @@ -132,7 +132,7 @@ impl Service { self.db.set_event_state(&shorteventid.to_be_bytes(), &shortstatehash.to_be_bytes())?; - Ok(()) + Ok(shortstatehash) } /// Generates a new StateHash and associates it with the incoming event. @@ -279,4 +279,8 @@ impl Service { pub fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result> { self.db.get_room_shortstatehash(room_id) } + + pub fn get_forward_extremities(&self, room_id: &RoomId) -> Result>> { + self.db.get_forward_extremities(room_id) + } } diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 09f66ddf..7669b0b3 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -1,7 +1,7 @@ mod data; use std::borrow::Cow; use std::sync::Arc; -use std::{sync::MutexGuard, iter, collections::HashSet}; +use std::{iter, collections::HashSet}; use std::fmt::Debug; pub use data::Data; @@ -13,6 +13,7 @@ use ruma::state_res::RoomVersion; use ruma::{EventId, signatures::CanonicalJsonObject, push::{Action, Tweak}, events::{push_rules::PushRulesEvent, GlobalAccountDataEventType, RoomEventType, room::{member::MembershipState, create::RoomCreateEventContent}, StateEventType}, UserId, RoomAliasId, RoomId, uint, state_res, api::client::error::ErrorKind, serde::to_canonical_value, ServerName}; use serde::Deserialize; use serde_json::value::to_raw_value; +use tokio::sync::MutexGuard; use tracing::{warn, error}; use crate::{services, Result, service::pdu::{PduBuilder, EventHash}, Error, PduEvent, utils}; @@ -460,7 +461,7 @@ impl Service { sender: &UserId, room_id: &RoomId, _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex - ) -> (PduEvent, CanonicalJsonObject) { + ) -> Result<(PduEvent, CanonicalJsonObject)> { let PduBuilder { event_type, content, @@ -471,7 +472,8 @@ impl Service { let prev_events: Vec<_> = services() .rooms - .get_pdu_leaves(room_id)? + .state + .get_forward_extremities(room_id)? .into_iter() .take(20) .collect(); @@ -622,6 +624,8 @@ impl Service { // Generate short event id let _shorteventid = self.get_or_create_shorteventid(&pdu.event_id)?; + + Ok((pdu, pdu_json)) } /// Creates a new persisted data unit and adds it to a room. This function takes a @@ -634,7 +638,7 @@ impl Service { room_id: &RoomId, state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex ) -> Result> { - let (pdu, pdu_json) = self.create_hash_and_sign_event(pdu_builder, sender, room_id, &state_lock); + let (pdu, pdu_json) = self.create_hash_and_sign_event(pdu_builder, sender, room_id, &state_lock)?; // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail.