diff --git a/Cargo.lock b/Cargo.lock index 505c71c9..dd581661 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -378,7 +378,7 @@ checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b" [[package]] name = "conduit" -version = "0.6.0-alpha" +version = "0.6.0-alpha.experinyantal" dependencies = [ "async-trait", "axum", diff --git a/Cargo.toml b/Cargo.toml index 019f9ce1..df6d94ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ authors = ["timokoesters <timo@koesters.xyz>"] homepage = "https://conduit.rs" repository = "https://gitlab.com/famedly/conduit" readme = "README.md" -version = "0.6.0-alpha" +version = "0.6.0-alpha.experinyantal" rust-version = "1.64" edition = "2021" diff --git a/conduit-example.toml b/conduit-example.toml index 0549030e..fee31020 100644 --- a/conduit-example.toml +++ b/conduit-example.toml @@ -38,8 +38,12 @@ max_request_size = 20_000_000 # in bytes # Enables registration. If set to false, no users can register on this server. allow_registration = true +# Enables federation. If set to false, this server will not federate with others (rooms from other servers will not be available). allow_federation = true +# Enables presence. If set to false, the presence of users (whether they are online, idle or offline) will not be shown or processed. +allow_presence = true + # Enable the display name lightning bolt on registration. enable_lightning_bolt = true diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index 61c67cbc..10449b39 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -207,18 +207,22 @@ pub async fn kick_user_route( ); let state_lock = mutex_state.lock().await; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(body.user_id.to_string()), - redacts: None, - }, - sender_user, - &body.room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&event).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(body.user_id.to_string()), + redacts: None, + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; drop(state_lock); @@ -271,18 +275,22 @@ pub async fn ban_user_route(body: Ruma) -> Result return Ok(join_room_by_id::v3::Response::new(room_id.to_owned())), Err(e) => e, }; @@ -1269,28 +1290,32 @@ pub(crate) async fn invite_helper<'a>( ); let state_lock = mutex_state.lock().await; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Invite, - displayname: services().users.displayname(user_id)?, - avatar_url: services().users.avatar_url(user_id)?, - is_direct: Some(is_direct), - third_party_invite: None, - blurhash: services().users.blurhash(user_id)?, - reason, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(user_id.to_string()), - redacts: None, - }, - sender_user, - room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Invite, + displayname: services().users.displayname(user_id)?, 
avatar_url: services().users.avatar_url(user_id)?, + is_direct: Some(is_direct), + third_party_invite: None, + blurhash: services().users.blurhash(user_id)?, + reason, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(user_id.to_string()), + redacts: None, + }, + sender_user, + room_id, + &state_lock, + ) + .await?; drop(state_lock); @@ -1344,14 +1369,18 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId, reason: Option { error!("Trying to leave a room you are not a member of."); - services().rooms.state_cache.update_membership( - room_id, - user_id, - MembershipState::Leave, - user_id, - None, - true, - )?; + services() + .rooms + .state_cache + .update_membership( + room_id, + user_id, + RoomMemberEventContent::new(MembershipState::Leave), + user_id, + None, + true, + ) + .await?; return Ok(()); } Some(e) => e, @@ -1394,18 +1427,22 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId, reason: Option, ) -> Result { - if body.user_id.server_name() != services().globals.server_name() { + if (services().users.exists(&body.user_id)?) + && (body.user_id.server_name() != services().globals.server_name()) + { let response = services() .sending .send_federation_request( @@ -135,6 +137,18 @@ pub async fn get_displayname_route( ) .await?; + // Create and update our local copy of the user + let _ = services().users.create(&body.user_id, None); + let _ = services() + .users + .set_displayname(&body.user_id, response.displayname.clone()); + let _ = services() + .users + .set_avatar_url(&body.user_id, response.avatar_url); + let _ = services() + .users + .set_blurhash(&body.user_id, response.blurhash); + return Ok(get_display_name::v3::Response { displayname: response.displayname, }); @@ -218,12 +232,11 @@ pub async fn set_avatar_url_route( ); let state_lock = mutex_state.lock().await; - let _ = services().rooms.timeline.build_and_append_pdu( - pdu_builder, - sender_user, - &room_id, - &state_lock, - ); + let _ = services() + .rooms + .timeline + .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock) + .await; // Presence update services().rooms.edus.presence.update_presence( @@ -244,21 +257,24 @@ pub async fn set_avatar_url_route( }, sender: sender_user.clone(), }, + true, )?; } Ok(set_avatar_url::v3::Response {}) } -/// # `GET /_matrix/client/r0/profile/{userId}/avatar_url` +/// # `GET /_matrix/client/v3/profile/{userId}/avatar_url` /// /// Returns the avatar_url and blurhash of the user. /// -/// - If user is on another server: Fetches avatar_url and blurhash over federation +/// - If user is on another server and we do not have a copy, fetch over federation pub async fn get_avatar_url_route( body: Ruma, ) -> Result { - if body.user_id.server_name() != services().globals.server_name() { + if (services().users.exists(&body.user_id)?) 
+ && (body.user_id.server_name() != services().globals.server_name()) + { let response = services() .sending .send_federation_request( @@ -270,6 +286,18 @@ pub async fn get_avatar_url_route( ) .await?; + // Create and update our local copy of the user + let _ = services().users.create(&body.user_id, None); + let _ = services() + .users + .set_displayname(&body.user_id, response.displayname); + let _ = services() + .users + .set_avatar_url(&body.user_id, response.avatar_url.clone()); + let _ = services() + .users + .set_blurhash(&body.user_id, response.blurhash.clone()); + return Ok(get_avatar_url::v3::Response { avatar_url: response.avatar_url, blurhash: response.blurhash, @@ -286,11 +314,13 @@ pub async fn get_avatar_url_route( /// /// Returns the displayname, avatar_url and blurhash of the user. /// -/// - If user is on another server: Fetches profile over federation +/// - If user is on another server and we do not have a copy, fetch over federation pub async fn get_profile_route( body: Ruma, ) -> Result { - if body.user_id.server_name() != services().globals.server_name() { + if (services().users.exists(&body.user_id)?) + && (body.user_id.server_name() != services().globals.server_name()) + { let response = services() .sending .send_federation_request( @@ -302,6 +332,18 @@ pub async fn get_profile_route( ) .await?; + // Create and update our local copy of the user + let _ = services().users.create(&body.user_id, None); + let _ = services() + .users + .set_displayname(&body.user_id, response.displayname.clone()); + let _ = services() + .users + .set_avatar_url(&body.user_id, response.avatar_url.clone()); + let _ = services() + .users + .set_blurhash(&body.user_id, response.blurhash.clone()); + return Ok(get_profile::v3::Response { displayname: response.displayname, avatar_url: response.avatar_url, diff --git a/src/api/client_server/read_marker.rs b/src/api/client_server/read_marker.rs index b12468a7..2744f6ac 100644 --- a/src/api/client_server/read_marker.rs +++ b/src/api/client_server/read_marker.rs @@ -34,51 +34,62 @@ pub async fn set_read_marker_route( )?; } - if body.private_read_receipt.is_some() || body.read_receipt.is_some() { - services() - .rooms - .user - .reset_notification_counts(sender_user, &body.room_id)?; - } - if let Some(event) = &body.private_read_receipt { + let _pdu = services() + .rooms + .timeline + .get_pdu(event)? + .ok_or(Error::BadRequest( + ErrorKind::InvalidParam, + "Event does not exist.", + ))?; + services().rooms.edus.read_receipt.private_read_set( &body.room_id, sender_user, - services() - .rooms - .timeline - .get_pdu_count(event)? - .ok_or(Error::BadRequest( - ErrorKind::InvalidParam, - "Event does not exist.", - ))?, + services().rooms.short.get_or_create_shorteventid(event)?, )?; } if let Some(event) = &body.read_receipt { - let mut user_receipts = BTreeMap::new(); - user_receipts.insert( - sender_user.clone(), - ruma::events::receipt::Receipt { - ts: Some(MilliSecondsSinceUnixEpoch::now()), - thread: ReceiptThread::Unthreaded, - }, - ); + let _pdu = services() + .rooms + .timeline + .get_pdu(event)? 
+ .ok_or(Error::BadRequest( + ErrorKind::InvalidParam, + "Event does not exist.", + ))?; - let mut receipts = BTreeMap::new(); - receipts.insert(ReceiptType::Read, user_receipts); + if services().globals.allow_public_read_receipts() { + let mut user_receipts = BTreeMap::new(); + user_receipts.insert( + sender_user.clone(), + ruma::events::receipt::Receipt { + ts: Some(MilliSecondsSinceUnixEpoch::now()), + thread: ReceiptThread::Unthreaded, + }, + ); - let mut receipt_content = BTreeMap::new(); - receipt_content.insert(event.to_owned(), receipts); + let mut receipts = BTreeMap::new(); + receipts.insert(ReceiptType::Read, user_receipts); - services().rooms.edus.read_receipt.readreceipt_update( - sender_user, + let mut receipt_content = BTreeMap::new(); + receipt_content.insert(event.to_owned(), receipts); + + services().rooms.edus.read_receipt.readreceipt_update( + sender_user, + &body.room_id, + ruma::events::receipt::ReceiptEvent { + content: ruma::events::receipt::ReceiptEventContent(receipt_content), + room_id: body.room_id.clone(), + }, + )?; + }; + services().rooms.edus.read_receipt.private_read_set( &body.room_id, - ruma::events::receipt::ReceiptEvent { - content: ruma::events::receipt::ReceiptEventContent(receipt_content), - room_id: body.room_id.clone(), - }, + sender_user, + services().rooms.short.get_or_create_shorteventid(event)?, )?; } @@ -93,16 +104,6 @@ pub async fn create_receipt_route( ) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - if matches!( - &body.receipt_type, - create_receipt::v3::ReceiptType::Read | create_receipt::v3::ReceiptType::ReadPrivate - ) { - services() - .rooms - .user - .reset_notification_counts(sender_user, &body.room_id)?; - } - match body.receipt_type { create_receipt::v3::ReceiptType::FullyRead => { let fully_read_event = ruma::events::fully_read::FullyReadEvent { @@ -118,41 +119,67 @@ pub async fn create_receipt_route( )?; } create_receipt::v3::ReceiptType::Read => { - let mut user_receipts = BTreeMap::new(); - user_receipts.insert( - sender_user.clone(), - ruma::events::receipt::Receipt { - ts: Some(MilliSecondsSinceUnixEpoch::now()), - thread: ReceiptThread::Unthreaded, - }, - ); - let mut receipts = BTreeMap::new(); - receipts.insert(ReceiptType::Read, user_receipts); + let _pdu = + services() + .rooms + .timeline + .get_pdu(&body.event_id)? 
+ .ok_or(Error::BadRequest( + ErrorKind::InvalidParam, + "Event does not exist.", + ))?; - let mut receipt_content = BTreeMap::new(); - receipt_content.insert(body.event_id.to_owned(), receipts); + if services().globals.allow_public_read_receipts() { + let mut user_receipts = BTreeMap::new(); + user_receipts.insert( + sender_user.clone(), + ruma::events::receipt::Receipt { + ts: Some(MilliSecondsSinceUnixEpoch::now()), + thread: ReceiptThread::Unthreaded, + }, + ); + let mut receipts = BTreeMap::new(); + receipts.insert(ReceiptType::Read, user_receipts); - services().rooms.edus.read_receipt.readreceipt_update( - sender_user, - &body.room_id, - ruma::events::receipt::ReceiptEvent { - content: ruma::events::receipt::ReceiptEventContent(receipt_content), - room_id: body.room_id.clone(), - }, - )?; - } - create_receipt::v3::ReceiptType::ReadPrivate => { + let mut receipt_content = BTreeMap::new(); + receipt_content.insert(body.event_id.to_owned(), receipts); + + services().rooms.edus.read_receipt.readreceipt_update( + sender_user, + &body.room_id, + ruma::events::receipt::ReceiptEvent { + content: ruma::events::receipt::ReceiptEventContent(receipt_content), + room_id: body.room_id.clone(), + }, + )?; + }; services().rooms.edus.read_receipt.private_read_set( &body.room_id, sender_user, + services() + .rooms + .short + .get_or_create_shorteventid(&body.event_id)?, + )?; + } + create_receipt::v3::ReceiptType::ReadPrivate => { + let _pdu = services() .rooms .timeline - .get_pdu_count(&body.event_id)? + .get_pdu(&body.event_id)? .ok_or(Error::BadRequest( ErrorKind::InvalidParam, "Event does not exist.", - ))?, + ))?; + + services().rooms.edus.read_receipt.private_read_set( + &body.room_id, + sender_user, + services() + .rooms + .short + .get_or_create_shorteventid(&body.event_id)?, )?; } _ => return Err(Error::bad_database("Unsupported receipt type")), diff --git a/src/api/client_server/redact.rs b/src/api/client_server/redact.rs index a29a5610..88059d6b 100644 --- a/src/api/client_server/redact.rs +++ b/src/api/client_server/redact.rs @@ -30,21 +30,25 @@ pub async fn redact_event_route( ); let state_lock = mutex_state.lock().await; - let event_id = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomRedaction, - content: to_raw_value(&RoomRedactionEventContent { - reason: body.reason.clone(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: Some(body.event_id.into()), - }, - sender_user, - &body.room_id, - &state_lock, - )?; + let event_id = services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomRedaction, + content: to_raw_value(&RoomRedactionEventContent { + reason: body.reason.clone(), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: Some(body.event_id.into()), + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; drop(state_lock); diff --git a/src/api/client_server/room.rs b/src/api/client_server/room.rs index 830e0858..2da8d8d6 100644 --- a/src/api/client_server/room.rs +++ b/src/api/client_server/room.rs @@ -173,42 +173,50 @@ pub async fn create_room_route( } // 1. 
The room create event - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomCreate, - content: to_raw_value(&content).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomCreate, + content: to_raw_value(&content).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 2. Let the room creator join - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Join, - displayname: services().users.displayname(sender_user)?, - avatar_url: services().users.avatar_url(sender_user)?, - is_direct: Some(body.is_direct), - third_party_invite: None, - blurhash: services().users.blurhash(sender_user)?, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_user.to_string()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Join, + displayname: services().users.displayname(sender_user)?, + avatar_url: services().users.avatar_url(sender_user)?, + is_direct: Some(body.is_direct), + third_party_invite: None, + blurhash: services().users.blurhash(sender_user)?, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(sender_user.to_string()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 3. Power levels @@ -245,30 +253,14 @@ pub async fn create_room_route( } } - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomPowerLevels, - content: to_raw_value(&power_levels_content) - .expect("to_raw_value always works on serde_json::Value"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; - - // 4. Canonical room alias - if let Some(room_alias_id) = &alias { - services().rooms.timeline.build_and_append_pdu( + services() + .rooms + .timeline + .build_and_append_pdu( PduBuilder { - event_type: RoomEventType::RoomCanonicalAlias, - content: to_raw_value(&RoomCanonicalAliasEventContent { - alias: Some(room_alias_id.to_owned()), - alt_aliases: vec![], - }) - .expect("We checked that alias earlier, it must be fine"), + event_type: RoomEventType::RoomPowerLevels, + content: to_raw_value(&power_levels_content) + .expect("to_raw_value always works on serde_json::Value"), unsigned: None, state_key: Some("".to_owned()), redacts: None, @@ -276,64 +268,100 @@ pub async fn create_room_route( sender_user, &room_id, &state_lock, - )?; + ) + .await?; + + // 4. 
Canonical room alias + if let Some(room_alias_id) = &alias { + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomCanonicalAlias, + content: to_raw_value(&RoomCanonicalAliasEventContent { + alias: Some(room_alias_id.to_owned()), + alt_aliases: vec![], + }) + .expect("We checked that alias earlier, it must be fine"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; } // 5. Events set by preset // 5.1 Join Rules - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomJoinRules, - content: to_raw_value(&RoomJoinRulesEventContent::new(match preset { - RoomPreset::PublicChat => JoinRule::Public, - // according to spec "invite" is the default - _ => JoinRule::Invite, - })) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomJoinRules, + content: to_raw_value(&RoomJoinRulesEventContent::new(match preset { + RoomPreset::PublicChat => JoinRule::Public, + // according to spec "invite" is the default + _ => JoinRule::Invite, + })) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 5.2 History Visibility - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomHistoryVisibility, - content: to_raw_value(&RoomHistoryVisibilityEventContent::new( - HistoryVisibility::Shared, - )) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomHistoryVisibility, + content: to_raw_value(&RoomHistoryVisibilityEventContent::new( + HistoryVisibility::Shared, + )) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 5.3 Guest Access - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomGuestAccess, - content: to_raw_value(&RoomGuestAccessEventContent::new(match preset { - RoomPreset::PublicChat => GuestAccess::Forbidden, - _ => GuestAccess::CanJoin, - })) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomGuestAccess, + content: to_raw_value(&RoomGuestAccessEventContent::new(match preset { + RoomPreset::PublicChat => GuestAccess::Forbidden, + _ => GuestAccess::CanJoin, + })) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 6. 
Events listed in initial_state for event in &body.initial_state { @@ -352,47 +380,54 @@ pub async fn create_room_route( continue; } - services().rooms.timeline.build_and_append_pdu( - pdu_builder, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock) + .await?; } // 7. Events implied by name and topic if let Some(name) = &body.name { - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomName, - content: to_raw_value(&RoomNameEventContent::new(Some(name.clone()))) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomName, + content: to_raw_value(&RoomNameEventContent::new(Some(name.clone()))) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; } if let Some(topic) = &body.topic { - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomTopic, - content: to_raw_value(&RoomTopicEventContent { - topic: topic.clone(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomTopic, + content: to_raw_value(&RoomTopicEventContent { + topic: topic.clone(), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; } // 8. 
Events implied by invite (and TODO: invite_3pid) @@ -523,22 +558,26 @@ pub async fn upgrade_room_route( // Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further // Fail if the sender does not have the required permissions - let tombstone_event_id = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomTombstone, - content: to_raw_value(&RoomTombstoneEventContent { - body: "This room has been replaced".to_owned(), - replacement_room: replacement_room.clone(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &body.room_id, - &state_lock, - )?; + let tombstone_event_id = services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomTombstone, + content: to_raw_value(&RoomTombstoneEventContent { + body: "This room has been replaced".to_owned(), + replacement_room: replacement_room.clone(), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; // Change lock to replacement room drop(state_lock); @@ -605,43 +644,51 @@ pub async fn upgrade_room_route( )); } - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomCreate, - content: to_raw_value(&create_event_content) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &replacement_room, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomCreate, + content: to_raw_value(&create_event_content) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &replacement_room, + &state_lock, + ) + .await?; // Join the new room - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Join, - displayname: services().users.displayname(sender_user)?, - avatar_url: services().users.avatar_url(sender_user)?, - is_direct: None, - third_party_invite: None, - blurhash: services().users.blurhash(sender_user)?, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_user.to_string()), - redacts: None, - }, - sender_user, - &replacement_room, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Join, + displayname: services().users.displayname(sender_user)?, + avatar_url: services().users.avatar_url(sender_user)?, + is_direct: None, + third_party_invite: None, + blurhash: services().users.blurhash(sender_user)?, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(sender_user.to_string()), + redacts: None, + }, + sender_user, + &replacement_room, + &state_lock, + ) + .await?; // Recommended transferable state events list from the specs let transferable_state_events = vec![ @@ -668,18 +715,22 @@ pub async fn upgrade_room_route( None => continue, // 
Skipping missing events. }; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: event_type.to_string().into(), - content: event_content, - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &replacement_room, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: event_type.to_string().into(), + content: event_content, + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &replacement_room, + &state_lock, + ) + .await?; } // Moves any local aliases to the new room @@ -713,19 +764,23 @@ pub async fn upgrade_room_route( power_levels_event_content.invite = new_level; // Modify the power levels in the old room to prevent sending of events and inviting new users - let _ = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomPowerLevels, - content: to_raw_value(&power_levels_event_content) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &body.room_id, - &state_lock, - )?; + let _ = services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomPowerLevels, + content: to_raw_value(&power_levels_event_content) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; drop(state_lock); diff --git a/src/api/client_server/space.rs b/src/api/client_server/space.rs new file mode 100644 index 00000000..d21dda6a --- /dev/null +++ b/src/api/client_server/space.rs @@ -0,0 +1,290 @@ +use std::{collections::HashSet, sync::Arc}; + +use crate::{services, Error, PduEvent, Result, Ruma}; + +use ruma::{ + api::client::{ + error::ErrorKind, + space::{get_hierarchy, SpaceHierarchyRoomsChunk, SpaceRoomJoinRule}, + }, + events::{ + room::{ + avatar::RoomAvatarEventContent, + canonical_alias::RoomCanonicalAliasEventContent, + create::RoomCreateEventContent, + guest_access::{GuestAccess, RoomGuestAccessEventContent}, + history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent}, + join_rules::{JoinRule, RoomJoinRulesEventContent}, + name::RoomNameEventContent, + topic::RoomTopicEventContent, + }, + space::child::SpaceChildEventContent, + StateEventType, + }, + serde::Raw, + MilliSecondsSinceUnixEpoch, OwnedRoomId, RoomId, +}; +use serde_json::{self, json}; +use tracing::warn; + +use ruma::events::space::child::HierarchySpaceChildEvent; + +/// # `GET /_matrix/client/v1/rooms/{room_id}/hierarchy` +/// +/// Paginates over the space tree in a depth-first manner to locate child rooms of a given space. +/// +/// - TODO: Use federation for unknown rooms. +/// +pub async fn get_hierarchy_route( + body: Ruma<get_hierarchy::v1::Request>, +) -> Result<get_hierarchy::v1::Response> { + let sender_user = body.sender_user.as_ref().expect("user is authenticated"); + + // Check if room is world readable + let is_world_readable = services() + .rooms + .state_accessor + .room_state_get(&body.room_id, &StateEventType::RoomHistoryVisibility, "")?
+ .map_or(Ok(false), |s| { + serde_json::from_str(s.content.get()) + .map(|c: RoomHistoryVisibilityEventContent| { + c.history_visibility == HistoryVisibility::WorldReadable + }) + .map_err(|_| { + Error::bad_database("Invalid room history visibility event in database.") + }) + }) + .unwrap_or(false); + + // Reject if user not in room and not world readable + if !services() + .rooms + .state_cache + .is_joined(sender_user, &body.room_id)? + && !is_world_readable + { + return Err(Error::BadRequest( + ErrorKind::Forbidden, + "You don't have permission to view this room.", + )); + } + + // from format is '{suggested_only}|{max_depth}|{skip}' + let (suggested_only, max_depth, start) = body + .from + .as_ref() + .map_or( + Some(( + body.suggested_only, + body.max_depth + .map_or(services().globals.hierarchy_max_depth(), |v| v.into()) + .min(services().globals.hierarchy_max_depth()), + 0, + )), + |from| { + let mut p = from.split('|'); + Some(( + p.next()?.trim().parse().ok()?, + p.next()? + .trim() + .parse::<u64>() + .ok()? + .min(services().globals.hierarchy_max_depth()), + p.next()?.trim().parse().ok()?, + )) + }, + ) + .ok_or(Error::BadRequest(ErrorKind::InvalidParam, "Invalid from"))?; + + let limit = body.limit.map_or(20u64, |v| v.into()) as usize; + let mut skip = start; + + // Set used to avoid re-visiting rooms in the loop below. + let mut room_set = HashSet::new(); + let mut rooms_chunk: Vec<SpaceHierarchyRoomsChunk> = vec![]; + let mut stack = vec![(0, body.room_id.clone())]; + + while let (Some((depth, room_id)), true) = (stack.pop(), rooms_chunk.len() < limit) { + let (children, pdus): (Vec<_>, Vec<_>) = services() + .rooms + .state_accessor + .room_state_full(&room_id) + .await? + .into_iter() + .filter_map(|((e_type, key), pdu)| { + (e_type == StateEventType::SpaceChild && !room_set.contains(&room_id)) + .then_some((key, pdu)) + }) + .unzip(); + + if skip == 0 { + if rooms_chunk.len() < limit { + room_set.insert(room_id.clone()); + if let Ok(chunk) = get_room_chunk(room_id, suggested_only, pdus).await { + rooms_chunk.push(chunk) + }; + } + } else { + skip -= 1; + } + + if depth < max_depth { + children.into_iter().rev().for_each(|key| { + stack.push((depth + 1, RoomId::parse(key).unwrap())); + }); + } + } + + Ok(get_hierarchy::v1::Response { + next_batch: (!stack.is_empty()).then_some(format!( + "{}|{}|{}", + suggested_only, + max_depth, + start + limit + )), + rooms: rooms_chunk, + }) +} + +async fn get_room_chunk( + room_id: OwnedRoomId, + suggested_only: bool, + pdus: Vec<Arc<PduEvent>>, +) -> Result<SpaceHierarchyRoomsChunk> { + Ok(SpaceHierarchyRoomsChunk { + canonical_alias: services() + .rooms + .state_accessor + .room_state_get(&room_id, &StateEventType::RoomCanonicalAlias, "") + .ok() + .and_then(|s| { + serde_json::from_str(s?.content.get()) + .map(|c: RoomCanonicalAliasEventContent| c.alias) + .ok()? + }), + name: services() + .rooms + .state_accessor + .room_state_get(&room_id, &StateEventType::RoomName, "") + .ok() + .flatten() + .and_then(|s| { + serde_json::from_str(s.content.get()) + .map(|c: RoomNameEventContent| c.name) + .ok()? + }), + num_joined_members: services() + .rooms + .state_cache + .room_joined_count(&room_id)?
+ .unwrap_or_else(|| { + warn!("Room {} has no member count", &room_id); + 0 + }) + .try_into() + .expect("user count should not be that big"), + topic: services() + .rooms + .state_accessor + .room_state_get(&room_id, &StateEventType::RoomTopic, "") + .ok() + .and_then(|s| { + serde_json::from_str(s?.content.get()) + .ok() + .map(|c: RoomTopicEventContent| c.topic) + }), + world_readable: services() + .rooms + .state_accessor + .room_state_get(&room_id, &StateEventType::RoomHistoryVisibility, "")? + .map_or(Ok(false), |s| { + serde_json::from_str(s.content.get()) + .map(|c: RoomHistoryVisibilityEventContent| { + c.history_visibility == HistoryVisibility::WorldReadable + }) + .map_err(|_| { + Error::bad_database("Invalid room history visibility event in database.") + }) + })?, + guest_can_join: services() + .rooms + .state_accessor + .room_state_get(&room_id, &StateEventType::RoomGuestAccess, "")? + .map_or(Ok(false), |s| { + serde_json::from_str(s.content.get()) + .map(|c: RoomGuestAccessEventContent| c.guest_access == GuestAccess::CanJoin) + .map_err(|_| { + Error::bad_database("Invalid room guest access event in database.") + }) + })?, + avatar_url: services() + .rooms + .state_accessor + .room_state_get(&room_id, &StateEventType::RoomAvatar, "") + .ok() + .and_then(|s| { + serde_json::from_str(s?.content.get()) + .map(|c: RoomAvatarEventContent| c.url) + .ok()? + }), + join_rule: services() + .rooms + .state_accessor + .room_state_get(&room_id, &StateEventType::RoomJoinRules, "")? + .map(|s| { + serde_json::from_str(s.content.get()) + .map(|c: RoomJoinRulesEventContent| match c.join_rule { + JoinRule::Invite => SpaceRoomJoinRule::Invite, + JoinRule::Knock => SpaceRoomJoinRule::Knock, + JoinRule::KnockRestricted(_) => SpaceRoomJoinRule::KnockRestricted, + JoinRule::Private => SpaceRoomJoinRule::Private, + JoinRule::Public => SpaceRoomJoinRule::Public, + JoinRule::Restricted(_) => SpaceRoomJoinRule::Restricted, + JoinRule::_Custom(_) => SpaceRoomJoinRule::from(c.join_rule.as_str()), + }) + .map_err(|_| Error::bad_database("Invalid room join rules event in database.")) + }) + .ok_or_else(|| Error::bad_database("Invalid room join rules event in database."))??, + room_type: services() + .rooms + .state_accessor + .room_state_get(&room_id, &StateEventType::RoomCreate, "") + .map(|s| { + serde_json::from_str(s?.content.get()) + .map(|c: RoomCreateEventContent| c.room_type) + .ok()? 
+ }) + .ok() + .flatten(), + children_state: pdus + .into_iter() + .flat_map(|pdu| { + Some(HierarchySpaceChildEvent { + // Ignore unsuggested rooms if suggested_only is set + content: serde_json::from_str(pdu.content.get()).ok().filter( + |pdu: &SpaceChildEventContent| { + !suggested_only || pdu.suggested.unwrap_or(false) + }, + )?, + sender: pdu.sender.clone(), + state_key: pdu.state_key.clone()?, + origin_server_ts: MilliSecondsSinceUnixEpoch(pdu.origin_server_ts), + }) + }) + .filter_map(|hsce| { + Raw::::from_json_string( + json!( + { + "content": &hsce.content, + "sender": &hsce.sender, + "state_key": &hsce.state_key, + "origin_server_ts": &hsce.origin_server_ts + } + ) + .to_string(), + ) + .ok() + }) + .collect::>(), + room_id, + }) +} diff --git a/src/api/client_server/state.rs b/src/api/client_server/state.rs index d9c14648..12af5199 100644 --- a/src/api/client_server/state.rs +++ b/src/api/client_server/state.rs @@ -287,18 +287,22 @@ async fn send_state_event_for_key_helper( ); let state_lock = mutex_state.lock().await; - let event_id = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: event_type.to_string().into(), - content: serde_json::from_str(json.json().get()).expect("content is valid json"), - unsigned: None, - state_key: Some(state_key), - redacts: None, - }, - sender_user, - room_id, - &state_lock, - )?; + let event_id = services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: event_type.to_string().into(), + content: serde_json::from_str(json.json().get()).expect("content is valid json"), + unsigned: None, + state_key: Some(state_key), + redacts: None, + }, + sender_user, + room_id, + &state_lock, + ) + .await?; Ok(event_id) } diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 568a23ce..599f4f1b 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -6,6 +6,7 @@ use ruma::{ uiaa::UiaaResponse, }, events::{ + receipt::{ReceiptThread, ReceiptType}, room::member::{MembershipState, RoomMemberEventContent}, RoomEventType, StateEventType, }, @@ -166,7 +167,11 @@ async fn sync_helper( }; // TODO: match body.set_presence { - services().rooms.edus.presence.ping_presence(&sender_user)?; + services() + .rooms + .edus + .presence + .ping_presence(&sender_user, false, true, true)?; // Setup watchers, so if there's no response, we can wait for them let watcher = services().globals.watch(&sender_user, &sender_device); @@ -231,7 +236,7 @@ async fn sync_helper( .entry(room_id.clone()) .or_default(), ); - let insert_lock = mutex_insert.lock().unwrap(); + let insert_lock = mutex_insert.lock().await; drop(insert_lock); } @@ -731,6 +736,50 @@ async fn sync_helper( .map(|(_, _, v)| v) .collect(); + if services() + .rooms + .edus + .read_receipt + .last_privateread_update(&sender_user, &room_id) + .unwrap_or(0) + > since + { + if let Ok(event_id) = services().rooms.short.get_eventid_from_short( + services() + .rooms + .edus + .read_receipt + .private_read_get(&room_id, &sender_user) + .expect("User did not have a valid private read receipt?") + .expect("User had a last read private receipt update but no receipt?"), + ) { + let mut user_receipts = BTreeMap::new(); + user_receipts.insert( + sender_user.clone(), + ruma::events::receipt::Receipt { + ts: None, + thread: ReceiptThread::Unthreaded, + }, + ); + + let mut receipts = BTreeMap::new(); + receipts.insert(ReceiptType::ReadPrivate, user_receipts); + + let mut receipt_content = BTreeMap::new(); + 
receipt_content.insert((*event_id).to_owned(), receipts); + + edus.push( + serde_json::from_str( + &serde_json::to_string(&ruma::events::SyncEphemeralRoomEvent { + content: ruma::events::receipt::ReceiptEventContent(receipt_content), + }) + .expect("Did not get valid JSON?"), + ) + .expect("JSON was somehow invalid despite just being created"), + ); + } + }; + if services().rooms.edus.typing.last_typing_update(&room_id)? > since { edus.push( serde_json::from_str( @@ -847,7 +896,7 @@ async fn sync_helper( .entry(room_id.clone()) .or_default(), ); - let insert_lock = mutex_insert.lock().unwrap(); + let insert_lock = mutex_insert.lock().await; drop(insert_lock); } @@ -979,7 +1028,7 @@ async fn sync_helper( .entry(room_id.clone()) .or_default(), ); - let insert_lock = mutex_insert.lock().unwrap(); + let insert_lock = mutex_insert.lock().await; drop(insert_lock); } diff --git a/src/api/client_server/unversioned.rs b/src/api/client_server/unversioned.rs index 526598b9..885eb677 100644 --- a/src/api/client_server/unversioned.rs +++ b/src/api/client_server/unversioned.rs @@ -24,7 +24,11 @@ pub async fn get_supported_versions_route( "v1.1".to_owned(), "v1.2".to_owned(), ], - unstable_features: BTreeMap::from_iter([("org.matrix.e2e_cross_signing".to_owned(), true)]), + unstable_features: BTreeMap::from_iter([ + ("org.matrix.e2e_cross_signing".to_owned(), true), + ("org.matrix.msc3827.stable".to_owned(), true), + ("org.matrix.msc2285.stable".to_owned(), true), + ]), }; Ok(resp) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index fc3e2c0f..76a66c81 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -12,6 +12,7 @@ use ruma::{ client::error::{Error as RumaError, ErrorKind}, federation::{ authorization::get_event_authorization, + backfill::get_backfill, device::get_devices::{self, v1::UserDevice}, directory::{get_public_rooms, get_public_rooms_filtered}, discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey}, @@ -33,6 +34,7 @@ use ruma::{ }, directory::{Filter, RoomNetwork}, events::{ + presence::{PresenceEvent, PresenceEventContent}, receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType}, room::{ join_rules::{JoinRule, RoomJoinRulesEventContent}, @@ -43,11 +45,11 @@ use ruma::{ serde::{Base64, JsonObject, Raw}, to_device::DeviceIdOrAllDevices, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, - OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, + OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, UInt, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashSet, VecDeque}, fmt::Debug, mem, net::{IpAddr, SocketAddr}, @@ -744,45 +746,74 @@ pub async fn send_transaction_message_route( .filter_map(|edu| serde_json::from_str::(edu.json().get()).ok()) { match edu { - Edu::Presence(_) => {} + Edu::Presence(presence) => { + for presence_update in presence.push { + let user_id = presence_update.user_id; + for room_id in services() + .rooms + .state_cache + .rooms_joined(&user_id) + .filter_map(|room_id| room_id.ok()) + { + services().rooms.edus.presence.update_presence( + &user_id, + &room_id, + PresenceEvent { + content: PresenceEventContent { + avatar_url: services().users.avatar_url(&user_id)?, + currently_active: Some(presence_update.currently_active), + displayname: services().users.displayname(&user_id)?, + last_active_ago: 
Some(presence_update.last_active_ago), + presence: presence_update.presence.clone(), + status_msg: presence_update.status_msg.clone(), + }, + sender: user_id.clone(), + }, + true, + )?; + } + } + } Edu::Receipt(receipt) => { - for (room_id, room_updates) in receipt.receipts { - for (user_id, user_updates) in room_updates.read { - if let Some((event_id, _)) = user_updates - .event_ids - .iter() - .filter_map(|id| { + if services().globals.allow_receiving_read_receipts() { + for (room_id, room_updates) in receipt.receipts { + for (user_id, user_updates) in room_updates.read { + if let Some((event_id, _)) = user_updates + .event_ids + .iter() + .filter_map(|id| { + services() + .rooms + .timeline + .get_pdu_count(id) + .ok() + .flatten() + .map(|r| (id, r)) + }) + .max_by_key(|(_, count)| *count) + { + let mut user_receipts = BTreeMap::new(); + user_receipts.insert(user_id.clone(), user_updates.data); + + let mut receipts = BTreeMap::new(); + receipts.insert(ReceiptType::Read, user_receipts); + + let mut receipt_content = BTreeMap::new(); + receipt_content.insert(event_id.to_owned(), receipts); + + let event = ReceiptEvent { + content: ReceiptEventContent(receipt_content), + room_id: room_id.clone(), + }; services() .rooms - .timeline - .get_pdu_count(id) - .ok() - .flatten() - .map(|r| (id, r)) - }) - .max_by_key(|(_, count)| *count) - { - let mut user_receipts = BTreeMap::new(); - user_receipts.insert(user_id.clone(), user_updates.data); - - let mut receipts = BTreeMap::new(); - receipts.insert(ReceiptType::Read, user_receipts); - - let mut receipt_content = BTreeMap::new(); - receipt_content.insert(event_id.to_owned(), receipts); - - let event = ReceiptEvent { - content: ReceiptEventContent(receipt_content), - room_id: room_id.clone(), - }; - services() - .rooms - .edus - .read_receipt - .readreceipt_update(&user_id, &room_id, event)?; - } else { - // TODO fetch missing events - info!("No known event ids in read receipt: {:?}", user_updates); + .edus + .read_receipt + .readreceipt_update(&user_id, &room_id, event)?; + } else { + // TODO fetch missing events + info!("No known event ids in read receipt: {:?}", user_updates); + } } } } @@ -950,6 +981,58 @@ pub async fn get_event_route( }) } +/// # `GET /_matrix/federation/v1/backfill/` +/// +/// Retrieves events from before the sender joined the room, if the room's +/// history visibility allows. +pub async fn get_backfill_route( + body: Ruma, +) -> Result { + if !services().globals.allow_federation() { + return Err(Error::bad_config("Federation is disabled.")); + } + + let sender_servername = body + .sender_servername + .as_ref() + .expect("server is authenticated"); + + info!("Got backfill request from: {}", sender_servername); + + if !services() + .rooms + .state_cache + .server_in_room(sender_servername, &body.room_id)? + { + return Err(Error::BadRequest( + ErrorKind::Forbidden, + "Server is not in room.", + )); + } + + services() + .rooms + .event_handler + .acl_check(sender_servername, &body.room_id)?; + + let origin = services().globals.server_name().to_owned(); + let earliest_events = &[]; + + let events = get_missing_events( + sender_servername, + &body.room_id, + earliest_events, + &body.v, + body.limit, + )?; + + Ok(get_backfill::v1::Response { + origin, + origin_server_ts: MilliSecondsSinceUnixEpoch::now(), + pdus: events, + }) +} + /// # `POST /_matrix/federation/v1/get_missing_events/{roomId}` /// /// Retrieves events that the sender is missing. 
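For context on the new `/backfill` route above, the requesting server's side of this endpoint could look roughly as follows. This is a hedged sketch, not part of the diff: `backfill_server` and `oldest_known` are hypothetical placeholders, and the `Request::new(room_id, v, limit)` constructor is assumed from the ruma version in use; `send_federation_request` is the helper used elsewhere in this codebase.

```rust
// Hypothetical requesting-server side of `GET /_matrix/federation/v1/backfill/{roomId}`.
let response = services()
    .sending
    .send_federation_request(
        backfill_server, // assumption: a server already known to be in the room
        get_backfill::v1::Request::new(
            room_id.to_owned(),         // room to backfill
            vec![oldest_known.clone()], // event IDs to paginate backwards from (`v`)
            UInt::from(50u32),          // maximum number of PDUs to return
        ),
    )
    .await?;
// `response.pdus` would then be fed into the normal event-handling pipeline.
```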
@@ -981,52 +1064,197 @@ pub async fn get_missing_events_route( .event_handler .acl_check(sender_servername, &body.room_id)?; - let mut queued_events = body.latest_events.clone(); - let mut events = Vec::new(); - - let mut i = 0; - while i < queued_events.len() && events.len() < u64::from(body.limit) as usize { - if let Some(pdu) = services().rooms.timeline.get_pdu_json(&queued_events[i])? { - let room_id_str = pdu - .get("room_id") - .and_then(|val| val.as_str()) - .ok_or_else(|| Error::bad_database("Invalid event in database"))?; - - let event_room_id = <&RoomId>::try_from(room_id_str) - .map_err(|_| Error::bad_database("Invalid room id field in event in database"))?; - - if event_room_id != body.room_id { - warn!( - "Evil event detected: Event {} found while searching in room {}", - queued_events[i], body.room_id - ); - return Err(Error::BadRequest( - ErrorKind::InvalidParam, - "Evil event detected", - )); - } - - if body.earliest_events.contains(&queued_events[i]) { - i += 1; - continue; - } - queued_events.extend_from_slice( - &serde_json::from_value::>( - serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| { - Error::bad_database("Event in db has no prev_events field.") - })?) - .expect("canonical json is valid json value"), - ) - .map_err(|_| Error::bad_database("Invalid prev_events content in pdu in db."))?, - ); - events.push(PduEvent::convert_to_outgoing_federation_event(pdu)); - } - i += 1; - } + let events = get_missing_events( + sender_servername, + &body.room_id, + &body.earliest_events, + &body.latest_events, + body.limit, + )?; Ok(get_missing_events::v1::Response { events }) } +/// Fetch events starting from `latest_events`, going backwards +/// through each event's `prev_events` until reaching the `earliest_events`. +/// +/// Used by the federation /backfill and /get_missing_events routes. 
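To make the `earliest_events`/`latest_events` semantics of the helper defined next concrete, here is a minimal, self-contained illustration (assumed simplification: plain string IDs instead of ruma `EventId`s, and an in-memory map instead of the timeline store). Note that, matching the real implementation, events in `earliest` are only omitted from the output; their ancestors are still walked:

```rust
use std::collections::{HashMap, HashSet, VecDeque};

// Breadth-first walk from `latest` back through `prev` edges, skipping
// (but still traversing through) the `earliest` events.
fn walk_back(
    prev: &HashMap<&str, Vec<&str>>,
    latest: &[&str],
    earliest: &HashSet<&str>,
    limit: usize,
) -> Vec<String> {
    let mut out = Vec::new();
    let mut queue: VecDeque<&str> = latest.iter().copied().collect();
    let mut seen: HashSet<&str> = queue.iter().copied().collect();
    while let Some(id) = queue.pop_front() {
        if out.len() >= limit {
            break;
        }
        if !earliest.contains(id) {
            out.push(id.to_string());
        }
        for p in prev.get(id).into_iter().flatten().copied() {
            if seen.insert(p) {
                queue.push_back(p);
            }
        }
    }
    out
}

fn main() {
    // Chain: $e1 <- $e2 <- $e3 <- $e4
    let prev = HashMap::from([
        ("$e4", vec!["$e3"]),
        ("$e3", vec!["$e2"]),
        ("$e2", vec!["$e1"]),
        ("$e1", vec![]),
    ]);
    let got = walk_back(&prev, &["$e4"], &HashSet::from(["$e2"]), 10);
    assert_eq!(got, ["$e4", "$e3", "$e1"]); // $e2 omitted, but $e1 still reached
}
```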
+fn get_missing_events( + sender_servername: &ServerName, + room_id: &RoomId, + earliest_events: &[OwnedEventId], + latest_events: &[OwnedEventId], + limit: UInt, +) -> Result<Vec<Box<RawJsonValue>>> { + let (room_members, room_errors): (Vec<_>, Vec<_>) = services() + .rooms + .state_cache + .room_members(room_id) + .partition(Result::is_ok); + + // Just log errors and continue with the valid users + if !room_errors.is_empty() { + warn!(?room_id, "Some errors occurred when fetching room members"); + } + + let current_server_members: Vec<OwnedUserId> = room_members + .into_iter() + .map(Result::unwrap) + .filter(|member| member.server_name() == sender_servername) + .collect(); + + let event_filter = |event_id: &EventId| { + services() + .rooms + .state_accessor + .server_can_see_event( + sender_servername, + current_server_members.as_slice(), + event_id, + ) + .unwrap_or_default() + }; + + let pdu_filter = |pdu: &CanonicalJsonObject| { + let event_room_id = pdu + .get("room_id") + .and_then(|val| val.as_str()) + .and_then(|room_id_str| <&RoomId>::try_from(room_id_str).ok()); + + match event_room_id { + Some(event_room_id) => { + let valid_event = event_room_id == room_id; + if !valid_event { + error!(?room_id, ?event_room_id, "An evil event detected"); + } + valid_event + } + None => { + error!(?pdu, "Can't extract valid `room_id` from pdu"); + false + } + } + }; + + #[inline] + fn get_pdu(event: &EventId) -> Option<CanonicalJsonObject> { + services() + .rooms + .timeline + .get_pdu_json(event) + .unwrap_or_default() + } + + let events = linearize_previous_events( + latest_events.iter().cloned(), + earliest_events.iter().cloned(), + limit, + get_pdu, + event_filter, + pdu_filter, + ); + + Ok(events) +} + +/// Unwinds previous events by doing a breadth-first walk from the given roots +/// +/// # Arguments +/// +/// * `roots`: Starting points from which to unwind the event history +/// * `excluded`: Events to skip in the output +/// * `limit`: How many events to extract +/// * `pdu_extractor`: Closure to extract the PDU for a given event id, for example, from the DB. +/// * `event_filter`: Closure to filter events by their visibility. It may or may not hit the DB. +/// * `pdu_filter`: Closure performing basic validation against malformed PDUs. +/// +/// # Returns +/// +/// The previous events for the given roots, without any `excluded` events, up to the provided `limit`. +/// +/// # Note +/// +/// The Matrix specification («Server-Server API», section 8) makes no mention of the previous events of excluded events. +/// Therefore, the algorithm below excludes **only** the events themselves, but still walks their history.
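The note above is subtle enough to warrant a pinned-down case. The following is a sketch of an additional unit test that could sit in the test module at the bottom of this file, reusing its `create_graph`, `mock_event_id`, and `extract_events_payload` helpers; excluding an event drops it from the output without cutting off its ancestors:

```rust
#[test]
fn backfill_excluded_event_history_still_walked() {
    // Chain: (3) -> (2) -> (1); excluding (2) still yields its ancestor (1).
    let events = create_graph(vec![(1, vec![]), (2, vec![1]), (3, vec![2])]);
    let result = linearize_previous_events(
        vec![mock_event_id(&3)], // roots
        vec![mock_event_id(&2)], // excluded
        10u64,
        |e| events.get(e).cloned(),
        |_| true,
        |_| true,
    );
    assert_eq!(extract_events_payload(result), vec![3, 1]);
}
```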
+fn linearize_previous_events<E, F, L, P, V>( + roots: E, + excluded: E, + limit: L, + pdu_extractor: P, + event_filter: F, + pdu_filter: V, +) -> Vec<Box<RawJsonValue>> +where + E: IntoIterator<Item = OwnedEventId>, + F: Fn(&EventId) -> bool, + L: Into<u64>, + V: Fn(&CanonicalJsonObject) -> bool, + P: Fn(&EventId) -> Option<CanonicalJsonObject>, +{ + let limit = limit.into() as usize; + assert!(limit > 0, "Limit should be > 0"); + + #[inline] + fn get_previous_events(pdu: &CanonicalJsonObject) -> Option<Vec<OwnedEventId>> { + match pdu.get("prev_events") { + None => { + error!(?pdu, "A stored event has no 'prev_events' field"); + None + } + Some(prev_events) => { + let val = prev_events.clone().into(); + let events = serde_json::from_value::<Vec<OwnedEventId>>(val); + if let Err(error) = events { + error!(?prev_events, ?error, "Broken 'prev_events' field"); + return None; + } + Some(events.unwrap_or_default()) + } + } + } + + let mut visited: HashSet<OwnedEventId> = Default::default(); + let mut history: Vec<Box<RawJsonValue>> = Default::default(); + let mut queue: VecDeque<OwnedEventId> = Default::default(); + let excluded: HashSet<_> = excluded.into_iter().collect(); + + // Add all roots to the processing queue + for root in roots { + queue.push_back(root); + } + + while let Some(current_event) = queue.pop_front() { + // Return all collected events once the limit is reached + if history.len() >= limit { + return history; + } + + // Skip an entire branch containing incorrect events + if !event_filter(&current_event) { + continue; + } + + // Process the current event's PDU if it exists and is valid + if let Some(pdu) = pdu_extractor(&current_event).filter(&pdu_filter) { + if !excluded.contains(&current_event) { + history.push(PduEvent::convert_to_outgoing_federation_event(pdu.clone())); + } + + // Fetch previous events, if they exist + if let Some(previous_events) = get_previous_events(&pdu) { + for previous_event in previous_events { + if !visited.contains(&previous_event) { + visited.insert(previous_event.clone()); + queue.push_back(previous_event); + } + } + } + } + } + // All done, return collected events + history +} + /// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}` /// /// Retrieves the auth chain for a given event. @@ -1615,14 +1843,18 @@ pub async fn create_invite_route( .state_cache .server_in_room(services().globals.server_name(), &body.room_id)?
{ - services().rooms.state_cache.update_membership( - &body.room_id, - &invited_user, - MembershipState::Invite, - &sender, - Some(invite_state), - true, - )?; + services() + .rooms + .state_cache + .update_membership( + &body.room_id, + &invited_user, + RoomMemberEventContent::new(MembershipState::Invite), + &sender, + Some(invite_state), + true, + ) + .await?; } Ok(create_invite::v2::Response { @@ -1712,6 +1944,13 @@ pub async fn get_profile_information_route( return Err(Error::bad_config("Federation is disabled.")); } + if body.user_id.server_name() != services().globals.server_name() { + return Err(Error::BadRequest( + ErrorKind::NotFound, + "User does not belong to this server", + )); + } + let mut displayname = None; let mut avatar_url = None; let mut blurhash = None; @@ -1779,7 +2018,11 @@ pub async fn claim_keys_route( #[cfg(test)] mod tests { - use super::{add_port_to_hostname, get_ip_with_port, FedDest}; + use super::{add_port_to_hostname, get_ip_with_port, linearize_previous_events, FedDest}; + use ruma::{CanonicalJsonObject, CanonicalJsonValue, OwnedEventId}; + use serde::{Deserialize, Serialize}; + use serde_json::{value::RawValue, Value}; + use std::collections::HashMap; #[test] fn ips_get_default_ports() { @@ -1820,4 +2063,227 @@ mod tests { FedDest::Named(String::from("example.com"), String::from(":1337")) ) } + + type PduStorage = HashMap; + + #[derive(Debug, Serialize, Deserialize)] + struct MockPDU { + content: i32, + prev_events: Vec, + } + + fn mock_event_id(id: &i32) -> OwnedEventId { + const DOMAIN: &str = "canterlot.eq"; + ::try_from(format!("${id}:{DOMAIN}")).unwrap() + } + + fn create_graph(data: Vec<(i32, Vec)>) -> PduStorage { + data.iter() + .map(|(head, tail)| { + let key = mock_event_id(head); + let pdu = MockPDU { + content: *head, + prev_events: tail.iter().map(mock_event_id).collect(), + }; + let value = serde_json::to_value(pdu).unwrap(); + let value: CanonicalJsonValue = value.try_into().unwrap(); + (key, value.as_object().unwrap().to_owned()) + }) + .collect() + } + + fn mock_full_graph() -> PduStorage { + /* + (1) + __________|___________ + / / \ \ + (2) (3) (10) (11) + / \ / \ | | + (4) (5) (6) (7) (12) (13) + | | | + (8) (9) (14) + \ / + (15) + | + (16) + */ + create_graph(vec![ + (1, vec![2, 3, 10, 11]), + (2, vec![4, 5]), + (3, vec![6, 7]), + (4, vec![]), + (5, vec![8]), + (6, vec![9]), + (7, vec![]), + (8, vec![15]), + (9, vec![15]), + (10, vec![12]), + (11, vec![13]), + (12, vec![]), + (13, vec![14]), + (14, vec![]), + (15, vec![16]), + (16, vec![16]), + ]) + } + + fn extract_events_payload(events: Vec>) -> Vec { + events + .iter() + .map(|e| serde_json::from_str(e.get()).unwrap()) + .map(|p: MockPDU| p.content) + .collect() + } + + #[test] + fn backfill_empty() { + let events = linearize_previous_events( + vec![], + vec![], + 16u64, + |_| unreachable!(), + |_| true, + |_| true, + ); + assert!(events.is_empty()); + } + #[test] + fn backfill_limit() { + /* + (5) → (4) → (3) → (2) → (1) → × + */ + let events = create_graph(vec![ + (1, vec![]), + (2, vec![1]), + (3, vec![2]), + (4, vec![3]), + (5, vec![4]), + ]); + let roots = vec![mock_event_id(&5)]; + let result = linearize_previous_events( + roots, + vec![], + 3u64, + |e| events.get(e).cloned(), + |_| true, + |_| true, + ); + + assert_eq!(extract_events_payload(result), vec![5, 4, 3]) + } + + #[test] + fn backfill_bfs() { + let events = mock_full_graph(); + let roots = vec![mock_event_id(&1)]; + let result = linearize_previous_events( + roots, + vec![], + 100u64, + |e| events.get(e).cloned(), + 
|_| true, + |_| true, + ); + assert_eq!( + extract_events_payload(result), + vec![1, 2, 3, 10, 11, 4, 5, 6, 7, 12, 13, 8, 9, 14, 15, 16] + ) + } + + #[test] + fn backfill_subgraph() { + let events = mock_full_graph(); + let roots = vec![mock_event_id(&3)]; + let result = linearize_previous_events( + roots, + vec![], + 100u64, + |e| events.get(e).cloned(), + |_| true, + |_| true, + ); + assert_eq!(extract_events_payload(result), vec![3, 6, 7, 9, 15, 16]) + } + + #[test] + fn backfill_two_roots() { + let events = mock_full_graph(); + let roots = vec![mock_event_id(&3), mock_event_id(&11)]; + let result = linearize_previous_events( + roots, + vec![], + 100u64, + |e| events.get(e).cloned(), + |_| true, + |_| true, + ); + assert_eq!( + extract_events_payload(result), + vec![3, 11, 6, 7, 13, 9, 14, 15, 16] + ) + } + + #[test] + fn backfill_exclude_events() { + let events = mock_full_graph(); + let roots = vec![mock_event_id(&1)]; + let excluded_events = vec![ + mock_event_id(&14), + mock_event_id(&15), + mock_event_id(&16), + mock_event_id(&3), + ]; + let result = linearize_previous_events( + roots, + excluded_events, + 100u64, + |e| events.get(e).cloned(), + |_| true, + |_| true, + ); + assert_eq!( + extract_events_payload(result), + vec![1, 2, 10, 11, 4, 5, 6, 7, 12, 13, 8, 9] + ) + } + + #[test] + fn backfill_exclude_branch_with_evil_event() { + let events = mock_full_graph(); + let roots = vec![mock_event_id(&1)]; + let result = linearize_previous_events( + roots, + vec![], + 100u64, + |e| events.get(e).cloned(), + |_| true, + |e| { + let value: Value = CanonicalJsonValue::Object(e.clone()).into(); + let pdu: MockPDU = serde_json::from_value(value).unwrap(); + pdu.content != 3 + }, + ); + assert_eq!( + extract_events_payload(result), + vec![1, 2, 10, 11, 4, 5, 12, 13, 8, 14, 15, 16] + ) + } + + #[test] + fn backfill_exclude_branch_with_inaccessible_event() { + let events = mock_full_graph(); + let roots = vec![mock_event_id(&1)]; + let result = linearize_previous_events( + roots, + vec![], + 100u64, + |e| events.get(e).cloned(), + |e| e != mock_event_id(&3), + |_| true, + ); + assert_eq!( + extract_events_payload(result), + vec![1, 2, 10, 11, 4, 5, 12, 13, 8, 14, 15, 16] + ) + } } diff --git a/src/config/mod.rs b/src/config/mod.rs index 31a586f2..1c8017c7 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -49,11 +49,17 @@ pub struct Config { #[serde(default = "false_fn")] pub allow_federation: bool, #[serde(default = "true_fn")] + pub allow_public_read_receipts: bool, + #[serde(default = "true_fn")] + pub allow_receiving_read_receipts: bool, + #[serde(default = "true_fn")] pub allow_room_creation: bool, #[serde(default = "true_fn")] pub allow_unstable_room_versions: bool, #[serde(default = "default_default_room_version")] pub default_room_version: RoomVersionId, + #[serde(default = "default_hierarchy_max_depth")] + pub hierarchy_max_depth: u64, #[serde(default = "false_fn")] pub allow_jaeger: bool, #[serde(default = "false_fn")] @@ -78,6 +84,19 @@ pub struct Config { pub emergency_password: Option, + #[serde(default = "true_fn")] + pub allow_presence: bool, + + #[serde(default = "default_presence_idle_timeout")] + pub presence_idle_timeout: u64, + #[serde(default = "default_presence_offline_timeout")] + pub presence_offline_timeout: u64, + + #[serde(default = "default_presence_cleanup_period")] + pub presence_cleanup_period: u64, + #[serde(default = "default_presence_cleanup_limit")] + pub presence_cleanup_limit: u64, + #[serde(flatten)] pub catchall: BTreeMap, } @@ -263,7 +282,27 @@ 
fn default_turn_ttl() -> u64 { 60 * 60 * 24 } +fn default_presence_idle_timeout() -> u64 { + 60 +} + +fn default_presence_offline_timeout() -> u64 { + 30 * 60 +} + +fn default_presence_cleanup_period() -> u64 { + 24 * 60 * 60 +} + +fn default_presence_cleanup_limit() -> u64 { + 24 * 60 * 60 +} + // I know, it's a great name pub fn default_default_room_version() -> RoomVersionId { RoomVersionId::V9 } + +fn default_hierarchy_max_depth() -> u64 { + 6 +} diff --git a/src/database/key_value/rooms/edus/presence.rs b/src/database/key_value/rooms/edus/presence.rs index 904b1c44..7732983a 100644 --- a/src/database/key_value/rooms/edus/presence.rs +++ b/src/database/key_value/rooms/edus/presence.rs @@ -1,10 +1,53 @@ -use std::collections::HashMap; +use futures_util::{stream::FuturesUnordered, StreamExt}; +use std::{ + collections::{hash_map::Entry, HashMap}, + mem, + time::Duration, +}; +use tracing::{error, info}; use ruma::{ events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId, }; +use tokio::{sync::mpsc, time::sleep}; -use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; +use crate::{ + database::KeyValueDatabase, + service::{self, rooms::edus::presence::PresenceIter}, + services, utils, + utils::{millis_since_unix_epoch, u64_from_bytes}, + Error, Result, +}; + +pub struct PresenceUpdate { + count: u64, + prev_timestamp: u64, + curr_timestamp: u64, +} + +impl PresenceUpdate { + fn to_be_bytes(&self) -> Vec { + [ + self.count.to_be_bytes(), + self.prev_timestamp.to_be_bytes(), + self.curr_timestamp.to_be_bytes(), + ] + .concat() + } + + fn from_be_bytes(bytes: &[u8]) -> Result { + let (count_bytes, timestamps_bytes) = bytes.split_at(mem::size_of::()); + let (prev_timestamp_bytes, curr_timestamp_bytes) = + timestamps_bytes.split_at(mem::size_of::()); + Ok(Self { + count: u64_from_bytes(count_bytes).expect("count bytes from DB are valid"), + prev_timestamp: u64_from_bytes(prev_timestamp_bytes) + .expect("timestamp bytes from DB are valid"), + curr_timestamp: u64_from_bytes(curr_timestamp_bytes) + .expect("timestamp bytes from DB are valid"), + }) + } +} impl service::rooms::edus::presence::Data for KeyValueDatabase { fn update_presence( @@ -13,45 +56,82 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { room_id: &RoomId, presence: PresenceEvent, ) -> Result<()> { - // TODO: Remove old entry? Or maybe just wipe completely from time to time? 
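+        // Presence events are now keyed by (room_id, user_id), so an update
+        // overwrites the previous event in place and no count-keyed entries go stale.
+        //
+        // Review sketch (not part of the patch): PresenceUpdate round-trips through
+        // a fixed 24-byte big-endian layout,
+        // count (8) | prev_timestamp (8) | curr_timestamp (8):
+        //
+        //     let update = PresenceUpdate { count: 7, prev_timestamp: 1_000, curr_timestamp: 2_000 };
+        //     let bytes = update.to_be_bytes();
+        //     assert_eq!(bytes.len(), 24);
+        //     let parsed = PresenceUpdate::from_be_bytes(&bytes)?;
+        //     assert_eq!(parsed.count, 7);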
+ let roomuser_id = [room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat(); - let count = services().globals.next_count()?.to_be_bytes(); - - let mut presence_id = room_id.as_bytes().to_vec(); - presence_id.push(0xff); - presence_id.extend_from_slice(&count); - presence_id.push(0xff); - presence_id.extend_from_slice(presence.sender.as_bytes()); - - self.presenceid_presence.insert( - &presence_id, - &serde_json::to_vec(&presence).expect("PresenceEvent can be serialized"), + self.roomuserid_presenceevent.insert( + &roomuser_id, + &serde_json::to_vec(&presence).expect("presence event from DB is valid"), )?; - self.userid_lastpresenceupdate.insert( + let timestamp = match presence.content.last_active_ago { + Some(active_ago) => millis_since_unix_epoch().saturating_sub(active_ago.into()), + None => millis_since_unix_epoch(), + }; + + self.userid_presenceupdate.insert( user_id.as_bytes(), - &utils::millis_since_unix_epoch().to_be_bytes(), + &PresenceUpdate { + count: services().globals.next_count()?, + prev_timestamp: timestamp, + curr_timestamp: timestamp, + } + .to_be_bytes(), )?; Ok(()) } - fn ping_presence(&self, user_id: &UserId) -> Result<()> { - self.userid_lastpresenceupdate.insert( - user_id.as_bytes(), - &utils::millis_since_unix_epoch().to_be_bytes(), - )?; + fn ping_presence( + &self, + user_id: &UserId, + update_count: bool, + update_timestamp: bool, + ) -> Result<()> { + let now = millis_since_unix_epoch(); + + let presence = self + .userid_presenceupdate + .get(user_id.as_bytes())? + .map(|presence_bytes| PresenceUpdate::from_be_bytes(&presence_bytes)) + .transpose()?; + + let new_presence = match presence { + Some(presence) => PresenceUpdate { + count: if update_count { + services().globals.next_count()? + } else { + presence.count + }, + prev_timestamp: if update_timestamp { + presence.curr_timestamp + } else { + presence.prev_timestamp + }, + curr_timestamp: if update_timestamp { + now + } else { + presence.curr_timestamp + }, + }, + None => PresenceUpdate { + count: services().globals.current_count()?, + prev_timestamp: now, + curr_timestamp: now, + }, + }; + + self.userid_presenceupdate + .insert(user_id.as_bytes(), &new_presence.to_be_bytes())?; Ok(()) } - fn last_presence_update(&self, user_id: &UserId) -> Result> { - self.userid_lastpresenceupdate + fn last_presence_update(&self, user_id: &UserId) -> Result> { + self.userid_presenceupdate .get(user_id.as_bytes())? .map(|bytes| { - utils::u64_from_bytes(&bytes).map_err(|_| { - Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") - }) + PresenceUpdate::from_be_bytes(&bytes) + .map(|update| (update.prev_timestamp, update.curr_timestamp)) }) .transpose() } @@ -60,93 +140,268 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase { &self, room_id: &RoomId, user_id: &UserId, - count: u64, + presence_timestamp: u64, ) -> Result> { - let mut presence_id = room_id.as_bytes().to_vec(); - presence_id.push(0xff); - presence_id.extend_from_slice(&count.to_be_bytes()); - presence_id.push(0xff); - presence_id.extend_from_slice(user_id.as_bytes()); - - self.presenceid_presence - .get(&presence_id)? - .map(|value| parse_presence_event(&value)) + let roomuser_id = [room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat(); + self.roomuserid_presenceevent + .get(&roomuser_id)? 
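+            // parse_presence_event (below) recomputes presence and last_active_ago
+            // from the stored timestamp rather than trusting the serialized values.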
+ .map(|value| parse_presence_event(&value, presence_timestamp)) .transpose() } - fn presence_since( - &self, - room_id: &RoomId, - since: u64, - ) -> Result> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - let mut first_possible_edu = prefix.clone(); - first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since - let mut hashmap = HashMap::new(); - - for (key, value) in self - .presenceid_presence - .iter_from(&first_possible_edu, false) - .take_while(|(key, _)| key.starts_with(&prefix)) - { - let user_id = UserId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), + fn presence_since<'a>(&'a self, room_id: &RoomId, since: u64) -> Result> { + let user_timestamp: HashMap = self + .userid_presenceupdate + .iter() + .map(|(user_id_bytes, update_bytes)| { + ( + UserId::parse( + utils::string_from_bytes(&user_id_bytes) + .expect("UserID bytes are a valid string"), + ) + .expect("UserID bytes from database are a valid UserID"), + PresenceUpdate::from_be_bytes(&update_bytes) + .expect("PresenceUpdate bytes from database are a valid PresenceUpdate"), ) - .map_err(|_| Error::bad_database("Invalid UserId bytes in presenceid_presence."))?, - ) - .map_err(|_| Error::bad_database("Invalid UserId in presenceid_presence."))?; + }) + .filter_map(|(user_id, presence_update)| { + if presence_update.count <= since + || !services() + .rooms + .state_cache + .is_joined(&user_id, room_id) + .ok()? + { + return None; + } - let presence = parse_presence_event(&value)?; + Some((user_id, presence_update.curr_timestamp)) + }) + .collect(); - hashmap.insert(user_id, presence); - } + Ok(Box::new( + self.roomuserid_presenceevent + .scan_prefix(room_id.as_bytes().to_vec()) + .filter_map(move |(roomuserid_bytes, presence_bytes)| { + let user_id_bytes = roomuserid_bytes.split(|byte| *byte == 0xff).last()?; + let user_id: OwnedUserId = UserId::parse( + utils::string_from_bytes(user_id_bytes) + .expect("UserID bytes are a valid string"), + ) + .expect("UserID bytes from database are a valid UserID"); - Ok(hashmap) + let timestamp = user_timestamp.get(&user_id)?; + let presence_event = parse_presence_event(&presence_bytes, *timestamp) + .expect("PresenceEvent bytes from database are a valid PresenceEvent"); + + Some((user_id, presence_event)) + }), + )) } - /* - fn presence_maintain(&self, db: Arc>) { - // TODO @M0dEx: move this to a timed tasks module - tokio::spawn(async move { - loop { - select! { - Some(user_id) = self.presence_timers.next() { - // TODO @M0dEx: would it be better to acquire the lock outside the loop? 
- let guard = db.read().await; + fn presence_maintain( + &self, + mut timer_receiver: mpsc::UnboundedReceiver, + ) -> Result<()> { + let mut timers = FuturesUnordered::new(); + let mut timers_timestamp: HashMap = HashMap::new(); - // TODO @M0dEx: add self.presence_timers - // TODO @M0dEx: maintain presence + tokio::spawn(async move { + // Wait for services to be created + sleep(Duration::from_secs(15)).await; + + if !services().globals.allow_presence() { + return; + } + + let idle_timeout = Duration::from_secs(services().globals.presence_idle_timeout()); + let offline_timeout = + Duration::from_secs(services().globals.presence_offline_timeout()); + + // TODO: Get rid of this hack (hinting correct types to rustc) + timers.push(create_presence_timer( + idle_timeout, + UserId::parse_with_server_name("conduit", services().globals.server_name()) + .expect("Conduit user always exists"), + )); + + loop { + tokio::select! { + Some(user_id) = timers.next() => { + info!("Processing timer for user '{}' ({})", user_id.clone(), timers.len()); + let (prev_timestamp, curr_timestamp) = match services().rooms.edus.presence.last_presence_update(&user_id) { + Ok(timestamp_tuple) => match timestamp_tuple { + Some(timestamp_tuple) => timestamp_tuple, + None => continue, + }, + Err(e) => { + error!("{e}"); + continue; + } + }; + + let prev_presence_state = determine_presence_state(prev_timestamp); + let curr_presence_state = determine_presence_state(curr_timestamp); + + // Continue if there is no change in state + if prev_presence_state == curr_presence_state { + continue; + } + + match services().rooms.edus.presence.ping_presence(&user_id, true, false, false) { + Ok(_) => (), + Err(e) => error!("{e}") + } + + // TODO: Notify federation sender + } + Some(user_id) = timer_receiver.recv() => { + let now = millis_since_unix_epoch(); + // Do not create timers if we added timers recently + let should_send = match timers_timestamp.entry(user_id.to_owned()) { + Entry::Occupied(mut entry) => { + if now - entry.get() > 15 * 1000 { + entry.insert(now); + true + } else { + false + } + }, + Entry::Vacant(entry) => { + entry.insert(now); + true + } + }; + + if !should_send { + continue; + } + + // Idle timeout + timers.push(create_presence_timer(idle_timeout, user_id.clone())); + + // Offline timeout + timers.push(create_presence_timer(offline_timeout, user_id.clone())); + + info!("Added timers for user '{}' ({})", user_id, timers.len()); } } } }); + + Ok(()) + } + + fn presence_cleanup(&self) -> Result<()> { + let userid_presenceupdate = self.userid_presenceupdate.clone(); + let roomuserid_presenceevent = self.roomuserid_presenceevent.clone(); + + tokio::spawn(async move { + // Wait for services to be created + sleep(Duration::from_secs(15)).await; + + if !services().globals.allow_presence() { + return; + } + + let period = Duration::from_secs(services().globals.presence_cleanup_period()); + let age_limit = Duration::from_secs(services().globals.presence_cleanup_limit()); + + loop { + let mut removed_events: u64 = 0; + let age_limit_curr = + millis_since_unix_epoch().saturating_sub(age_limit.as_millis() as u64); + + for user_id in userid_presenceupdate + .iter() + .map(|(user_id_bytes, update_bytes)| { + ( + UserId::parse( + utils::string_from_bytes(&user_id_bytes) + .expect("UserID bytes are a valid string"), + ) + .expect("UserID bytes from database are a valid UserID"), + PresenceUpdate::from_be_bytes(&update_bytes).expect( + "PresenceUpdate bytes from database are a valid PresenceUpdate", + ), + ) + }) + 
.filter_map(|(user_id, presence_update)| {
+                        if presence_update.curr_timestamp >= age_limit_curr {
+                            return None;
+                        }
+
+                        Some(user_id)
+                    })
+                {
+                    match userid_presenceupdate.remove(user_id.as_bytes()) {
+                        Ok(_) => (),
+                        Err(e) => {
+                            error!("An error occurred while removing a stale presence update: {e}")
+                        }
+                    }
+
+                    for room_id in services()
+                        .rooms
+                        .state_cache
+                        .rooms_joined(&user_id)
+                        .filter_map(|room_id| room_id.ok())
+                    {
+                        match roomuserid_presenceevent
+                            .remove(&[room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat())
+                        {
+                            Ok(_) => removed_events += 1,
+                            Err(e) => error!(
+                                "An error occurred while removing a stale presence event: {e}"
+                            ),
+                        }
+                    }
+                }
+
+                info!("Cleaned up {removed_events} stale presence events!");
+                sleep(period).await;
+            }
+        });
+
+        Ok(())
+    }
-    */
 }
 
-fn parse_presence_event(bytes: &[u8]) -> Result<PresenceEvent> {
+async fn create_presence_timer(duration: Duration, user_id: OwnedUserId) -> OwnedUserId {
+    sleep(duration).await;
+
+    user_id
+}
+
+fn parse_presence_event(bytes: &[u8], presence_timestamp: u64) -> Result<PresenceEvent> {
     let mut presence: PresenceEvent = serde_json::from_slice(bytes)
         .map_err(|_| Error::bad_database("Invalid presence event in db."))?;
 
-    let current_timestamp: UInt = utils::millis_since_unix_epoch()
-        .try_into()
-        .expect("time is valid");
-
-    if presence.content.presence == PresenceState::Online {
-        // Don't set last_active_ago when the user is online
-        presence.content.last_active_ago = None;
-    } else {
-        // Convert from timestamp to duration
-        presence.content.last_active_ago = presence
-            .content
-            .last_active_ago
-            .map(|timestamp| current_timestamp - timestamp);
-    }
+    translate_active_ago(&mut presence, presence_timestamp);
 
     Ok(presence)
 }
+
+fn determine_presence_state(last_active_ago: u64) -> PresenceState {
+    let globals = &services().globals;
+
+    if last_active_ago < globals.presence_idle_timeout() * 1000 {
+        PresenceState::Online
+    } else if last_active_ago < globals.presence_offline_timeout() * 1000 {
+        PresenceState::Unavailable
+    } else {
+        PresenceState::Offline
+    }
+}
+
+/// Translates the timestamp representing last_active_ago to a diff from now.
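+// Review note: with the default timeouts (idle = 60 s, offline = 1800 s),
+// determine_presence_state maps last_active_ago (in milliseconds) as:
+//   [0, 60_000)          -> Online
+//   [60_000, 1_800_000)  -> Unavailable
+//   [1_800_000, ..)      -> Offline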
+fn translate_active_ago(presence_event: &mut PresenceEvent, last_active_ts: u64) { + let last_active_ago = millis_since_unix_epoch().saturating_sub(last_active_ts); + + presence_event.content.presence = determine_presence_state(last_active_ago); + + presence_event.content.last_active_ago = match presence_event.content.presence { + PresenceState::Online => None, + _ => Some(UInt::new_saturating(last_active_ago)), + } +} diff --git a/src/database/key_value/rooms/edus/read_receipt.rs b/src/database/key_value/rooms/edus/read_receipt.rs index fa97ea34..4722cdc0 100644 --- a/src/database/key_value/rooms/edus/read_receipt.rs +++ b/src/database/key_value/rooms/edus/read_receipt.rs @@ -105,16 +105,25 @@ impl service::rooms::edus::read_receipt::Data for KeyValueDatabase { ) } - fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> { + fn private_read_set( + &self, + room_id: &RoomId, + user_id: &UserId, + shorteventid: u64, + ) -> Result<()> { let mut key = room_id.as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(user_id.as_bytes()); - self.roomuserid_privateread - .insert(&key, &count.to_be_bytes())?; + if self.private_read_get(room_id, user_id)?.unwrap_or(0) < shorteventid { + self.roomuserid_privateread + .insert(&key, &shorteventid.to_be_bytes())?; - self.roomuserid_lastprivatereadupdate - .insert(&key, &services().globals.next_count()?.to_be_bytes()) + self.roomuserid_lastprivatereadupdate + .insert(&key, &services().globals.next_count()?.to_be_bytes()) + } else { + Ok(()) + } } fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result> { diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs index 0f0c0dc7..fe139315 100644 --- a/src/database/key_value/rooms/state_accessor.rs +++ b/src/database/key_value/rooms/state_accessor.rs @@ -2,7 +2,11 @@ use std::{collections::HashMap, sync::Arc}; use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result}; use async_trait::async_trait; -use ruma::{events::StateEventType, EventId, RoomId}; +use ruma::{ + events::{room::member::MembershipState, StateEventType}, + EventId, RoomId, UserId, +}; +use serde_json::Value; #[async_trait] impl service::rooms::state_accessor::Data for KeyValueDatabase { @@ -120,6 +124,21 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { }) } + fn state_get_content( + &self, + shortstatehash: u64, + event_type: &StateEventType, + state_key: &str, + ) -> Result> { + let content = self + .state_get(shortstatehash, event_type, state_key)? + .map(|event| serde_json::from_str(event.content.get())) + .transpose() + .map_err(|_| Error::bad_database("Invalid event in database"))?; + + Ok(content) + } + /// Returns the state hash for this pdu. fn pdu_shortstatehash(&self, event_id: &EventId) -> Result> { self.eventid_shorteventid @@ -138,6 +157,23 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { }) } + /// Get membership for given user in state + fn user_membership(&self, shortstatehash: u64, user_id: &UserId) -> Result { + self.state_get_content( + shortstatehash, + &StateEventType::RoomMember, + user_id.as_str(), + )? + .map(|content| match content.get("membership") { + Some(Value::String(membership)) => Ok(MembershipState::from(membership.as_str())), + None => Ok(MembershipState::Leave), + _ => Err(Error::bad_database( + "Malformed membership, expected Value::String", + )), + }) + .unwrap_or(Ok(MembershipState::Leave)) + } + /// Returns the full room state. 
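+    // Review note on user_membership above: a missing member event or a missing
+    // `membership` field resolves to MembershipState::Leave; only a non-string
+    // `membership` value is reported as a database error.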
async fn room_state_full( &self, diff --git a/src/database/key_value/rooms/user.rs b/src/database/key_value/rooms/user.rs index 4c435720..63a13d36 100644 --- a/src/database/key_value/rooms/user.rs +++ b/src/database/key_value/rooms/user.rs @@ -3,7 +3,13 @@ use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId}; use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::user::Data for KeyValueDatabase { - fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { + fn update_notification_counts( + &self, + user_id: &UserId, + room_id: &RoomId, + notification_count: u64, + highlight_count: u64, + ) -> Result<()> { let mut userroom_id = user_id.as_bytes().to_vec(); userroom_id.push(0xff); userroom_id.extend_from_slice(room_id.as_bytes()); @@ -12,9 +18,9 @@ impl service::rooms::user::Data for KeyValueDatabase { roomuser_id.extend_from_slice(user_id.as_bytes()); self.userroomid_notificationcount - .insert(&userroom_id, &0_u64.to_be_bytes())?; + .insert(&userroom_id, ¬ification_count.to_be_bytes())?; self.userroomid_highlightcount - .insert(&userroom_id, &0_u64.to_be_bytes())?; + .insert(&userroom_id, &highlight_count.to_be_bytes())?; self.roomuserid_lastnotificationread.insert( &roomuser_id, diff --git a/src/database/mod.rs b/src/database/mod.rs index 78bb358b..edae29bc 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -66,8 +66,8 @@ pub struct KeyValueDatabase { pub(super) roomuserid_lastprivatereadupdate: Arc, // LastPrivateReadUpdate = Count pub(super) typingid_userid: Arc, // TypingId = RoomId + TimeoutTime + Count pub(super) roomid_lasttypingupdate: Arc, // LastRoomTypingUpdate = Count - pub(super) presenceid_presence: Arc, // PresenceId = RoomId + Count + UserId - pub(super) userid_lastpresenceupdate: Arc, // LastPresenceUpdate = Count + pub(super) userid_presenceupdate: Arc, // PresenceUpdate = Count + Timestamp + pub(super) roomuserid_presenceevent: Arc, // PresenceEvent //pub rooms: rooms::Rooms, pub(super) pduid_pdu: Arc, // PduId = ShortRoomId + Count @@ -289,8 +289,8 @@ impl KeyValueDatabase { .open_tree("roomuserid_lastprivatereadupdate")?, typingid_userid: builder.open_tree("typingid_userid")?, roomid_lasttypingupdate: builder.open_tree("roomid_lasttypingupdate")?, - presenceid_presence: builder.open_tree("presenceid_presence")?, - userid_lastpresenceupdate: builder.open_tree("userid_lastpresenceupdate")?, + userid_presenceupdate: builder.open_tree("userid_presenceupdate")?, + roomuserid_presenceevent: builder.open_tree("roomuserid_presenceevent")?, pduid_pdu: builder.open_tree("pduid_pdu")?, eventid_pduid: builder.open_tree("eventid_pduid")?, roomid_pduleaves: builder.open_tree("roomid_pduleaves")?, @@ -895,9 +895,6 @@ impl KeyValueDatabase { ); } - // This data is probably outdated - db.presenceid_presence.clear()?; - services().admin.start_handler(); // Set emergency access for the conduit user diff --git a/src/main.rs b/src/main.rs index da80507c..7cd8c127 100644 --- a/src/main.rs +++ b/src/main.rs @@ -328,6 +328,7 @@ fn routes() -> Router { .ruma_route(client_server::send_state_event_for_key_route) .ruma_route(client_server::get_state_events_route) .ruma_route(client_server::get_state_events_for_key_route) + .ruma_route(client_server::get_hierarchy_route) // Ruma doesn't have support for multiple paths for a single endpoint yet, and these routes // share one Ruma request / response type pair with {get,send}_state_event_for_key_route .route( @@ -391,6 +392,7 @@ fn routes() -> Router { 
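+        // Review note: get_backfill_route below is the new federation endpoint
+        // (GET /_matrix/federation/v1/backfill/{roomId}); it presumably walks the
+        // room DAG with linearize_previous_events, as exercised by the tests
+        // earlier in this patch.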
.ruma_route(server_server::send_transaction_message_route) .ruma_route(server_server::get_event_route) .ruma_route(server_server::get_missing_events_route) + .ruma_route(server_server::get_backfill_route) .ruma_route(server_server::get_event_authorization_route) .ruma_route(server_server::get_room_state_route) .ruma_route(server_server::get_room_state_ids_route) diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 77f351a9..2df3180f 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -26,7 +26,7 @@ use ruma::{ EventId, OwnedRoomAliasId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, }; use serde_json::value::to_raw_value; -use tokio::sync::{mpsc, Mutex, MutexGuard}; +use tokio::sync::{mpsc, Mutex}; use crate::{ api::client_server::{leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH}, @@ -206,26 +206,6 @@ impl Service { .expect("Database data for admin room alias must be valid") .expect("Admin room must exist"); - let send_message = |message: RoomMessageEventContent, mutex_lock: &MutexGuard<'_, ()>| { - services() - .rooms - .timeline - .build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMessage, - content: to_raw_value(&message) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: None, - }, - &conduit_user, - &conduit_room, - mutex_lock, - ) - .unwrap(); - }; - loop { tokio::select! { Some(event) = receiver.recv() => { @@ -245,7 +225,20 @@ impl Service { let state_lock = mutex_state.lock().await; - send_message(message_content, &state_lock); + services().rooms.timeline.build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMessage, + content: to_raw_value(&message_content) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }, + &conduit_user, + &conduit_room, + &state_lock) + .await + .unwrap(); drop(state_lock); } @@ -853,164 +846,202 @@ impl Service { content.room_version = services().globals.default_room_version(); // 1. The room create event - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomCreate, - content: to_raw_value(&content).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomCreate, + content: to_raw_value(&content).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 2. 
Make conduit bot join - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Join, - displayname: None, - avatar_url: None, - is_direct: None, - third_party_invite: None, - blurhash: None, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(conduit_user.to_string()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Join, + displayname: None, + avatar_url: None, + is_direct: None, + third_party_invite: None, + blurhash: None, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(conduit_user.to_string()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 3. Power levels let mut users = BTreeMap::new(); users.insert(conduit_user.clone(), 100.into()); - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomPowerLevels, - content: to_raw_value(&RoomPowerLevelsEventContent { - users, - ..Default::default() - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomPowerLevels, + content: to_raw_value(&RoomPowerLevelsEventContent { + users, + ..Default::default() + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 4.1 Join Rules - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomJoinRules, - content: to_raw_value(&RoomJoinRulesEventContent::new(JoinRule::Invite)) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomJoinRules, + content: to_raw_value(&RoomJoinRulesEventContent::new(JoinRule::Invite)) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 4.2 History Visibility - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomHistoryVisibility, - content: to_raw_value(&RoomHistoryVisibilityEventContent::new( - HistoryVisibility::Shared, - )) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomHistoryVisibility, + content: to_raw_value(&RoomHistoryVisibilityEventContent::new( + HistoryVisibility::Shared, + )) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + 
&state_lock, + ) + .await?; // 4.3 Guest Access - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomGuestAccess, - content: to_raw_value(&RoomGuestAccessEventContent::new(GuestAccess::Forbidden)) + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomGuestAccess, + content: to_raw_value(&RoomGuestAccessEventContent::new( + GuestAccess::Forbidden, + )) .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 5. Events implied by name and topic let room_name = format!("{} Admin Room", services().globals.server_name()); - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomName, - content: to_raw_value(&RoomNameEventContent::new(Some(room_name))) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomName, + content: to_raw_value(&RoomNameEventContent::new(Some(room_name))) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomTopic, - content: to_raw_value(&RoomTopicEventContent { - topic: format!("Manage {}", services().globals.server_name()), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomTopic, + content: to_raw_value(&RoomTopicEventContent { + topic: format!("Manage {}", services().globals.server_name()), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 6. 
Room alias let alias: OwnedRoomAliasId = format!("#admins:{}", services().globals.server_name()) .try_into() .expect("#admins:server_name is a valid alias name"); - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomCanonicalAlias, - content: to_raw_value(&RoomCanonicalAliasEventContent { - alias: Some(alias.clone()), - alt_aliases: Vec::new(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomCanonicalAlias, + content: to_raw_value(&RoomCanonicalAliasEventContent { + alias: Some(alias.clone()), + alt_aliases: Vec::new(), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; services().rooms.alias.set_alias(&alias, &room_id)?; @@ -1052,72 +1083,84 @@ impl Service { .expect("@conduit:server_name is valid"); // Invite and join the real user - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Invite, - displayname: None, - avatar_url: None, - is_direct: None, - third_party_invite: None, - blurhash: None, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(user_id.to_string()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Join, - displayname: Some(displayname), - avatar_url: None, - is_direct: None, - third_party_invite: None, - blurhash: None, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(user_id.to_string()), - redacts: None, - }, - user_id, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Invite, + displayname: None, + avatar_url: None, + is_direct: None, + third_party_invite: None, + blurhash: None, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(user_id.to_string()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Join, + displayname: Some(displayname), + avatar_url: None, + is_direct: None, + third_party_invite: None, + blurhash: None, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(user_id.to_string()), + redacts: None, + }, + user_id, + &room_id, + &state_lock, + ) + .await?; // Set power level let mut users = BTreeMap::new(); users.insert(conduit_user.to_owned(), 100.into()); users.insert(user_id.to_owned(), 100.into()); - 
services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomPowerLevels, - content: to_raw_value(&RoomPowerLevelsEventContent { - users, - ..Default::default() - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomPowerLevels, + content: to_raw_value(&RoomPowerLevelsEventContent { + users, + ..Default::default() + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // Send welcome message services().rooms.timeline.build_and_append_pdu( @@ -1135,7 +1178,7 @@ impl Service { &conduit_user, &room_id, &state_lock, - )?; + ).await?; Ok(()) } diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index bb823e2c..01af1557 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -52,7 +52,7 @@ pub struct Service { pub bad_signature_ratelimiter: Arc, RateLimitState>>>, pub servername_ratelimiter: Arc>>>, pub sync_receivers: RwLock>, - pub roomid_mutex_insert: RwLock>>>, + pub roomid_mutex_insert: RwLock>>>, pub roomid_mutex_state: RwLock>>>, pub roomid_mutex_federation: RwLock>>>, // this lock will be held longer pub roomid_federationhandletime: RwLock>, @@ -238,6 +238,14 @@ impl Service { self.config.allow_federation } + pub fn allow_public_read_receipts(&self) -> bool { + self.config.allow_public_read_receipts + } + + pub fn allow_receiving_read_receipts(&self) -> bool { + self.config.allow_receiving_read_receipts + } + pub fn allow_room_creation(&self) -> bool { self.config.allow_room_creation } @@ -254,6 +262,10 @@ impl Service { self.config.enable_lightning_bolt } + pub fn hierarchy_max_depth(&self) -> u64 { + self.config.hierarchy_max_depth + } + pub fn trusted_servers(&self) -> &[OwnedServerName] { &self.config.trusted_servers } @@ -290,6 +302,26 @@ impl Service { &self.config.emergency_password } + pub fn allow_presence(&self) -> bool { + self.config.allow_presence + } + + pub fn presence_idle_timeout(&self) -> u64 { + self.config.presence_idle_timeout + } + + pub fn presence_offline_timeout(&self) -> u64 { + self.config.presence_offline_timeout + } + + pub fn presence_cleanup_period(&self) -> u64 { + self.config.presence_cleanup_period + } + + pub fn presence_cleanup_limit(&self) -> u64 { + self.config.presence_cleanup_limit + } + pub fn supported_room_versions(&self) -> Vec { let mut room_versions: Vec = vec![]; room_versions.extend(self.stable_room_versions.clone()); diff --git a/src/service/mod.rs b/src/service/mod.rs index 385dcc69..97ae53f7 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -62,7 +62,7 @@ impl Services { auth_chain: rooms::auth_chain::Service { db }, directory: rooms::directory::Service { db }, edus: rooms::edus::Service { - presence: rooms::edus::presence::Service { db }, + presence: rooms::edus::presence::Service::build(db)?, read_receipt: rooms::edus::read_receipt::Service { db }, typing: rooms::edus::typing::Service { db }, }, @@ -77,7 +77,12 @@ impl Services { search: rooms::search::Service { db }, short: rooms::short::Service { db }, state: rooms::state::Service { db }, - state_accessor: rooms::state_accessor::Service { db }, + state_accessor: rooms::state_accessor::Service { + db, + server_visibility_cache: 
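+                // Visibility results are cached per (server_name, shortstatehash);
+                // capacity scales with conduit_cache_capacity_modifier like the
+                // other LRU caches.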
Mutex::new(LruCache::new(
+                    (100.0 * config.conduit_cache_capacity_modifier) as usize,
+                )),
+            },
             state_cache: rooms::state_cache::Service { db },
             state_compressor: rooms::state_compressor::Service {
                 db,
diff --git a/src/service/pdu.rs b/src/service/pdu.rs
index 554f3be7..a7983b24 100644
--- a/src/service/pdu.rs
+++ b/src/service/pdu.rs
@@ -281,6 +281,10 @@ impl state_res::Event for PduEvent {
         &self.sender
     }
 
+    fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch {
+        MilliSecondsSinceUnixEpoch(self.origin_server_ts)
+    }
+
     fn event_type(&self) -> &RoomEventType {
         &self.kind
     }
@@ -289,10 +293,6 @@ impl state_res::Event for PduEvent {
         &self.content
     }
 
-    fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch {
-        MilliSecondsSinceUnixEpoch(self.origin_server_ts)
-    }
-
     fn state_key(&self) -> Option<&str> {
         self.state_key.as_deref()
     }
diff --git a/src/service/rooms/edus/presence/data.rs b/src/service/rooms/edus/presence/data.rs
index 53329e08..2dd78b6f 100644
--- a/src/service/rooms/edus/presence/data.rs
+++ b/src/service/rooms/edus/presence/data.rs
@@ -1,7 +1,8 @@
-use std::collections::HashMap;
-
 use crate::Result;
 use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId};
+use tokio::sync::mpsc;
+
+use super::PresenceIter;
 
 pub trait Data: Send + Sync {
     /// Adds a presence event which will be saved until a new event replaces it.
@@ -16,23 +17,29 @@
     ) -> Result<()>;
 
     /// Resets the presence timeout, so the user will stay in their current presence state.
-    fn ping_presence(&self, user_id: &UserId) -> Result<()>;
+    fn ping_presence(
+        &self,
+        user_id: &UserId,
+        update_count: bool,
+        update_timestamp: bool,
+    ) -> Result<()>;
 
     /// Returns the timestamp of the last presence update of this user in millis since the unix epoch.
-    fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>>;
+    fn last_presence_update(&self, user_id: &UserId) -> Result<Option<(u64, u64)>>;
 
     /// Returns the presence event with correct last_active_ago.
     fn get_presence_event(
         &self,
         room_id: &RoomId,
         user_id: &UserId,
-        count: u64,
+        presence_timestamp: u64,
     ) -> Result<Option<PresenceEvent>>;
 
     /// Returns the most recent presence updates that happened after the event with id `since`.
-    fn presence_since(
-        &self,
-        room_id: &RoomId,
-        since: u64,
-    ) -> Result<HashMap<OwnedUserId, PresenceEvent>>;
+    fn presence_since<'a>(&'a self, room_id: &RoomId, since: u64) -> Result<PresenceIter<'a>>;
+
+    fn presence_maintain(&self, timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>)
+        -> Result<()>;
+
+    fn presence_cleanup(&self) -> Result<()>;
 }
diff --git a/src/service/rooms/edus/presence/mod.rs b/src/service/rooms/edus/presence/mod.rs
index 860aea18..0f3421c9 100644
--- a/src/service/rooms/edus/presence/mod.rs
+++ b/src/service/rooms/edus/presence/mod.rs
@@ -1,16 +1,55 @@
 mod data;
-use std::collections::HashMap;
 
 pub use data::Data;
 use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId};
+use tokio::sync::mpsc;
 
-use crate::Result;
+use crate::{services, Error, Result};
+
+pub(crate) type PresenceIter<'a> = Box<dyn Iterator<Item = (OwnedUserId, PresenceEvent)> + 'a>;
 
 pub struct Service {
     pub db: &'static dyn Data,
+
+    // Presence timers
+    timer_sender: mpsc::UnboundedSender<OwnedUserId>,
 }
 
 impl Service {
+    /// Builds the service and initializes the presence_maintain task
+    pub fn build(db: &'static dyn Data) -> Result<Self> {
+        let (sender, receiver) = mpsc::unbounded_channel();
+        let service = Self {
+            db,
+            timer_sender: sender,
+        };
+
+        service.presence_maintain(receiver)?;
+        service.presence_cleanup()?;
+
+        Ok(service)
+    }
+
+    /// Resets the presence timeout, so the user will stay in their current presence state.
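+    // Review note: the public presence entry points below each short-circuit
+    // (to Ok(()), None or an empty iterator) when globals.allow_presence() is
+    // disabled, so callers never need to check the config themselves.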
+ pub fn ping_presence( + &self, + user_id: &UserId, + update_count: bool, + update_timestamp: bool, + spawn_timer: bool, + ) -> Result<()> { + if !services().globals.allow_presence() { + return Ok(()); + } + + if spawn_timer { + self.spawn_timer(user_id)?; + } + + self.db + .ping_presence(user_id, update_count, update_timestamp) + } + /// Adds a presence event which will be saved until a new event replaces it. /// /// Note: This method takes a RoomId because presence updates are always bound to rooms to @@ -20,103 +59,78 @@ impl Service { user_id: &UserId, room_id: &RoomId, presence: PresenceEvent, + spawn_timer: bool, ) -> Result<()> { + if !services().globals.allow_presence() { + return Ok(()); + } + + if spawn_timer { + self.spawn_timer(user_id)?; + } + self.db.update_presence(user_id, room_id, presence) } - /// Resets the presence timeout, so the user will stay in their current presence state. - pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { - self.db.ping_presence(user_id) + /// Returns the timestamp of when the presence was last updated for the specified user. + pub fn last_presence_update(&self, user_id: &UserId) -> Result> { + if !services().globals.allow_presence() { + return Ok(None); + } + + self.db.last_presence_update(user_id) } - pub fn get_last_presence_event( + /// Returns the saved presence event for this user with actual last_active_ago. + pub fn get_presence_event( &self, user_id: &UserId, room_id: &RoomId, ) -> Result> { + if !services().globals.allow_presence() { + return Ok(None); + } + let last_update = match self.db.last_presence_update(user_id)? { - Some(last) => last, + Some(last) => last.1, None => return Ok(None), }; self.db.get_presence_event(room_id, user_id, last_update) } - /* TODO - /// Sets all users to offline who have been quiet for too long. - fn _presence_maintain( - &self, - rooms: &super::Rooms, - globals: &super::super::globals::Globals, - ) -> Result<()> { - let current_timestamp = utils::millis_since_unix_epoch(); - - for (user_id_bytes, last_timestamp) in self - .userid_lastpresenceupdate - .iter() - .filter_map(|(k, bytes)| { - Some(( - k, - utils::u64_from_bytes(&bytes) - .map_err(|_| { - Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") - }) - .ok()?, - )) - }) - .take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000) - // 5 Minutes - { - // Send new presence events to set the user offline - let count = globals.next_count()?.to_be_bytes(); - let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes) - .map_err(|_| { - Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.") - })? 
- .try_into() - .map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?; - for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) { - let mut presence_id = room_id.as_bytes().to_vec(); - presence_id.push(0xff); - presence_id.extend_from_slice(&count); - presence_id.push(0xff); - presence_id.extend_from_slice(&user_id_bytes); - - self.presenceid_presence.insert( - &presence_id, - &serde_json::to_vec(&PresenceEvent { - content: PresenceEventContent { - avatar_url: None, - currently_active: None, - displayname: None, - last_active_ago: Some( - last_timestamp.try_into().expect("time is valid"), - ), - presence: PresenceState::Offline, - status_msg: None, - }, - sender: user_id.to_owned(), - }) - .expect("PresenceEvent can be serialized"), - )?; - } - - self.userid_lastpresenceupdate.insert( - user_id.as_bytes(), - &utils::millis_since_unix_epoch().to_be_bytes(), - )?; - } - - Ok(()) - }*/ - /// Returns the most recent presence updates that happened after the event with id `since`. #[tracing::instrument(skip(self, since, room_id))] - pub fn presence_since( - &self, - room_id: &RoomId, - since: u64, - ) -> Result> { + pub fn presence_since<'a>(&'a self, room_id: &RoomId, since: u64) -> Result> { + if !services().globals.allow_presence() { + return Ok(Box::new(std::iter::empty())); + } + self.db.presence_since(room_id, since) } + + /// Spawns a task maintaining presence data + fn presence_maintain( + &self, + timer_receiver: mpsc::UnboundedReceiver, + ) -> Result<()> { + self.db.presence_maintain(timer_receiver) + } + + fn presence_cleanup(&self) -> Result<()> { + self.db.presence_cleanup() + } + + /// Spawns a timer for the user used by the maintenance task + fn spawn_timer(&self, user_id: &UserId) -> Result<()> { + if !services().globals.allow_presence() { + return Ok(()); + } + + self.timer_sender + .send(user_id.into()) + .map_err(|_| Error::bad_database("Sender errored out"))?; + + Ok(()) + } } diff --git a/src/service/rooms/edus/read_receipt/data.rs b/src/service/rooms/edus/read_receipt/data.rs index a183d196..7ebd3589 100644 --- a/src/service/rooms/edus/read_receipt/data.rs +++ b/src/service/rooms/edus/read_receipt/data.rs @@ -25,8 +25,9 @@ pub trait Data: Send + Sync { > + 'a, >; - /// Sets a private read marker at `count`. - fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()>; + /// Sets a private read marker at `shorteventid`. + fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, shorteventid: u64) + -> Result<()>; /// Returns the private read marker. fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result>; diff --git a/src/service/rooms/edus/read_receipt/mod.rs b/src/service/rooms/edus/read_receipt/mod.rs index c6035280..a18a0dae 100644 --- a/src/service/rooms/edus/read_receipt/mod.rs +++ b/src/service/rooms/edus/read_receipt/mod.rs @@ -2,7 +2,7 @@ mod data; pub use data::Data; -use crate::Result; +use crate::{services, Result}; use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId}; pub struct Service { @@ -36,10 +36,19 @@ impl Service { self.db.readreceipts_since(room_id, since) } - /// Sets a private read marker at `count`. + /// Sets a private read marker at `shorteventid`. 
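+    // Review note: the private marker now stores a shorteventid instead of a
+    // global count, and advancing it recomputes the user's notification counts
+    // via update_notification_counts (below).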
#[tracing::instrument(skip(self))] - pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> { - self.db.private_read_set(room_id, user_id, count) + pub fn private_read_set( + &self, + room_id: &RoomId, + user_id: &UserId, + shorteventid: u64, + ) -> Result<()> { + self.db.private_read_set(room_id, user_id, shorteventid)?; + services() + .rooms + .user + .update_notification_counts(user_id, room_id) } /// Returns the private read marker. diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index bc67f7a2..4a03da5c 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -801,14 +801,18 @@ impl Service { .map_err(|_e| Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed."))?; if soft_fail { - services().rooms.timeline.append_incoming_pdu( - &incoming_pdu, - val, - extremities.iter().map(|e| (**e).to_owned()).collect(), - state_ids_compressed, - soft_fail, - &state_lock, - )?; + services() + .rooms + .timeline + .append_incoming_pdu( + &incoming_pdu, + val, + extremities.iter().map(|e| (**e).to_owned()).collect(), + state_ids_compressed, + soft_fail, + &state_lock, + ) + .await?; // Soft fail, we keep the event as an outlier but don't add it to the timeline warn!("Event was soft failed: {:?}", incoming_pdu); @@ -1004,14 +1008,18 @@ impl Service { // We use the `state_at_event` instead of `state_after` so we accurately // represent the state for this event. - let pdu_id = services().rooms.timeline.append_incoming_pdu( - &incoming_pdu, - val, - extremities.iter().map(|e| (**e).to_owned()).collect(), - state_ids_compressed, - soft_fail, - &state_lock, - )?; + let pdu_id = services() + .rooms + .timeline + .append_incoming_pdu( + &incoming_pdu, + val, + extremities.iter().map(|e| (**e).to_owned()).collect(), + state_ids_compressed, + soft_fail, + &state_lock, + ) + .await?; info!("Appended incoming pdu"); diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 3072b80f..b7a2cb79 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -7,14 +7,13 @@ use std::{ pub use data::Data; use ruma::{ events::{ - room::{create::RoomCreateEventContent, member::MembershipState}, + room::{create::RoomCreateEventContent, member::RoomMemberEventContent}, AnyStrippedStateEvent, RoomEventType, StateEventType, }, serde::Raw, state_res::{self, StateMap}, EventId, OwnedEventId, RoomId, RoomVersionId, UserId, }; -use serde::Deserialize; use tokio::sync::MutexGuard; use tracing::warn; @@ -60,15 +59,11 @@ impl Service { Err(_) => continue, }; - #[derive(Deserialize)] - struct ExtractMembership { - membership: MembershipState, - } - - let membership = match serde_json::from_str::(pdu.content.get()) { - Ok(e) => e.membership, - Err(_) => continue, - }; + let membership_event = + match serde_json::from_str::(pdu.content.get()) { + Ok(e) => e, + Err(_) => continue, + }; let state_key = match pdu.state_key { Some(k) => k, @@ -80,14 +75,18 @@ impl Service { Err(_) => continue, }; - services().rooms.state_cache.update_membership( - room_id, - &user_id, - membership, - &pdu.sender, - None, - false, - )?; + services() + .rooms + .state_cache + .update_membership( + room_id, + &user_id, + membership_event, + &pdu.sender, + None, + false, + ) + .await?; } services().rooms.state_cache.update_joined_count(room_id)?; diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index f3ae3c21..70261b09 100644 --- 
a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -1,7 +1,10 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; -use ruma::{events::StateEventType, EventId, RoomId}; +use ruma::{ + events::{room::member::MembershipState, StateEventType}, + EventId, RoomId, UserId, +}; use crate::{PduEvent, Result}; @@ -32,9 +35,19 @@ pub trait Data: Send + Sync { state_key: &str, ) -> Result>>; + fn state_get_content( + &self, + shortstatehash: u64, + event_type: &StateEventType, + state_key: &str, + ) -> Result>; + /// Returns the state hash for this pdu. fn pdu_shortstatehash(&self, event_id: &EventId) -> Result>; + /// Get membership for given user in state + fn user_membership(&self, shortstatehash: u64, user_id: &UserId) -> Result; + /// Returns the full room state. async fn room_state_full( &self, diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 87d99368..7a8e65db 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -1,13 +1,25 @@ mod data; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; pub use data::Data; -use ruma::{events::StateEventType, EventId, RoomId}; +use lru_cache::LruCache; +use ruma::{ + events::{ + room::{history_visibility::HistoryVisibility, member::MembershipState}, + StateEventType, + }, + EventId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, +}; +use tracing::warn; use crate::{PduEvent, Result}; pub struct Service { pub db: &'static dyn Data, + pub server_visibility_cache: Mutex>, } impl Service { @@ -46,11 +58,95 @@ impl Service { self.db.state_get(shortstatehash, event_type, state_key) } + pub fn state_get_content( + &self, + shortstatehash: u64, + event_type: &StateEventType, + state_key: &str, + ) -> Result> { + self.db + .state_get_content(shortstatehash, event_type, state_key) + } + /// Returns the state hash for this pdu. pub fn pdu_shortstatehash(&self, event_id: &EventId) -> Result> { self.db.pdu_shortstatehash(event_id) } + /// Whether a server is allowed to see an event through federation, based on + /// the room's history_visibility at that event's state. + #[tracing::instrument(skip(self))] + pub fn server_can_see_event( + &self, + server_name: &ServerName, + current_server_members: &[OwnedUserId], + event_id: &EventId, + ) -> Result { + let shortstatehash = match self.pdu_shortstatehash(event_id) { + Ok(Some(shortstatehash)) => shortstatehash, + _ => return Ok(false), + }; + + if let Some(visibility) = self + .server_visibility_cache + .lock() + .unwrap() + .get_mut(&(server_name.to_owned(), shortstatehash)) + { + return Ok(*visibility); + } + + let history_visibility = self + .state_get_content(shortstatehash, &StateEventType::RoomHistoryVisibility, "")? 
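+            // A missing `history_visibility` field defaults to Shared; if the
+            // event itself is absent, the catch-all arm below falls back to the
+            // joined-members check.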
+ .map(|content| match content.get("history_visibility") { + Some(visibility) => HistoryVisibility::from(visibility.as_str().unwrap_or("")), + None => HistoryVisibility::Shared, + }); + + let visibility = match history_visibility { + Some(HistoryVisibility::WorldReadable) => { + // Allow if event was sent while world readable + true + } + Some(HistoryVisibility::Invited) => { + // Allow if any member on requesting server was AT LEAST invited, else deny + current_server_members + .iter() + .any(|member| self.user_was_invited(shortstatehash, member)) + } + _ => { + // Allow if any member on requested server was joined, else deny + current_server_members + .iter() + .any(|member| self.user_was_joined(shortstatehash, member)) + } + }; + + self.server_visibility_cache + .lock() + .unwrap() + .insert((server_name.to_owned(), shortstatehash), visibility); + + Ok(visibility) + } + + /// The user was a joined member at this state (potentially in the past) + fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> bool { + self.db + .user_membership(shortstatehash, user_id) + .map(|s| s == MembershipState::Join) + .unwrap_or_default() // Return sensible default, i.e. false + } + + /// The user was an invited or joined room member at this state (potentially + /// in the past) + fn user_was_invited(&self, shortstatehash: u64, user_id: &UserId) -> bool { + self.db + .user_membership(shortstatehash, user_id) + .map(|s| s == MembershipState::Join || s == MembershipState::Invite) + .unwrap_or_default() // Return sensible default, i.e. false + } + /// Returns the full room state. #[tracing::instrument(skip(self))] pub async fn room_state_full( diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index 32afdd4e..dfbe5db7 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -4,10 +4,14 @@ use std::{collections::HashSet, sync::Arc}; pub use data::Data; use ruma::{ + api::federation::{self, query::get_profile_information::v1::ProfileField}, events::{ direct::DirectEvent, ignored_user_list::IgnoredUserListEvent, - room::{create::RoomCreateEventContent, member::MembershipState}, + room::{ + create::RoomCreateEventContent, + member::{MembershipState, RoomMemberEventContent}, + }, AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, }, @@ -24,19 +28,43 @@ pub struct Service { impl Service { /// Update current membership data. #[tracing::instrument(skip(self, last_state))] - pub fn update_membership( + pub async fn update_membership( &self, room_id: &RoomId, user_id: &UserId, - membership: MembershipState, + membership_event: RoomMemberEventContent, sender: &UserId, last_state: Option>>, update_joined_count: bool, ) -> Result<()> { + let membership = membership_event.membership; // Keep track what remote users exist by adding them as "deactivated" users if user_id.server_name() != services().globals.server_name() { services().users.create(user_id, None)?; - // TODO: displayname, avatar url + // Try to update our local copy of the user if ours does not match + if ((services().users.displayname(user_id)? != membership_event.displayname) + || (services().users.avatar_url(user_id)? != membership_event.avatar_url) + || (services().users.blurhash(user_id)? 
!= membership_event.blurhash))
+                && (membership != MembershipState::Leave)
+            {
+                let response = services()
+                    .sending
+                    .send_federation_request(
+                        user_id.server_name(),
+                        federation::query::get_profile_information::v1::Request {
+                            user_id: user_id.into(),
+                            field: None,
+                        },
+                    )
+                    .await?;
+                let _ = services()
+                    .users
+                    .set_displayname(user_id, response.displayname.clone());
+                let _ = services()
+                    .users
+                    .set_avatar_url(user_id, response.avatar_url);
+                let _ = services().users.set_blurhash(user_id, response.blurhash);
+            };
         }
 
         match &membership {
diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs
index 34399d46..3cc64a4b 100644
--- a/src/service/rooms/timeline/mod.rs
+++ b/src/service/rooms/timeline/mod.rs
@@ -15,7 +15,8 @@ use ruma::{
     events::{
         push_rules::PushRulesEvent,
         room::{
-            create::RoomCreateEventContent, member::MembershipState,
+            create::RoomCreateEventContent,
+            member::{MembershipState, RoomMemberEventContent},
             power_levels::RoomPowerLevelsEventContent,
         },
         GlobalAccountDataEventType, RoomEventType, StateEventType,
@@ -145,7 +146,7 @@ impl Service {
     ///
     /// Returns pdu id
     #[tracing::instrument(skip(self, pdu, pdu_json, leaves))]
-    pub fn append_pdu<'a>(
+    pub async fn append_pdu<'a>(
         &self,
         pdu: &PduEvent,
         mut pdu_json: CanonicalJsonObject,
@@ -211,20 +212,19 @@ impl Service {
                 .entry(pdu.room_id.clone())
                 .or_default(),
         );
-        let insert_lock = mutex_insert.lock().unwrap();
+        let insert_lock = mutex_insert.lock().await;
 
-        let count1 = services().globals.next_count()?;
+        let _count1 = services().globals.next_count()?;
         // Mark as read first so the sending client doesn't get a notification even if appending
         // fails
-        services()
-            .rooms
-            .edus
-            .read_receipt
-            .private_read_set(&pdu.room_id, &pdu.sender, count1)?;
-        services()
-            .rooms
-            .user
-            .reset_notification_counts(&pdu.sender, &pdu.room_id)?;
+        services().rooms.edus.read_receipt.private_read_set(
+            &pdu.room_id,
+            &pdu.sender,
+            services()
+                .rooms
+                .short
+                .get_or_create_shorteventid(&pdu.event_id)?,
+        )?;
 
         let count2 = services().globals.next_count()?;
         let mut pdu_id = shortroomid.to_be_bytes().to_vec();
@@ -323,16 +323,11 @@ impl Service {
             }
             RoomEventType::RoomMember => {
                 if let Some(state_key) = &pdu.state_key {
-                    #[derive(Deserialize)]
-                    struct ExtractMembership {
-                        membership: MembershipState,
-                    }
-
                     // if the state_key fails
                     let target_user_id = UserId::parse(state_key.clone())
                         .expect("This state_key was previously validated");
 
-                    let content = serde_json::from_str::<ExtractMembership>(pdu.content.get())
+                    let content = serde_json::from_str::<RoomMemberEventContent>(pdu.content.get())
                         .map_err(|_| Error::bad_database("Invalid content in pdu."))?;
 
                     let invite_state = match content.membership {
@@ -345,14 +340,18 @@ impl Service {
 
                     // Update our membership info; we do this here in case a user is
                     // invited and immediately leaves, so the DB still records the
                    // invite event for auth
-                    services().rooms.state_cache.update_membership(
-                        &pdu.room_id,
-                        &target_user_id,
-                        content.membership,
-                        &pdu.sender,
-                        invite_state,
-                        true,
-                    )?;
+                    services()
+                        .rooms
+                        .state_cache
+                        .update_membership(
+                            &pdu.room_id,
+                            &target_user_id,
+                            content,
+                            &pdu.sender,
+                            invite_state,
+                            true,
+                        )
+                        .await?;
                 }
             }
             RoomEventType::RoomMessage => {
@@ -673,7 +672,7 @@ impl Service {
 
     /// Creates a new persisted data unit and adds it to a room. This function takes a
     /// roomid_mutex_state, meaning that only this function is able to mutate the room state.
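+    // Review note: append_pdu and build_and_append_pdu are now async; the
+    // per-room insert mutex became a tokio::sync::Mutex so the lock can be held
+    // across await points.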
     #[tracing::instrument(skip(self, state_lock))]
-    pub fn build_and_append_pdu(
+    pub async fn build_and_append_pdu(
         &self,
         pdu_builder: PduBuilder,
         sender: &UserId,
@@ -687,14 +686,16 @@ impl Service {
         // pdu without it's state. This is okay because append_pdu can't fail.
         let statehashid = services().rooms.state.append_to_state(&pdu)?;
 
-        let pdu_id = self.append_pdu(
-            &pdu,
-            pdu_json,
-            // Since this PDU references all pdu_leaves we can update the leaves
-            // of the room
-            vec![(*pdu.event_id).to_owned()],
-            state_lock,
-        )?;
+        let pdu_id = self
+            .append_pdu(
+                &pdu,
+                pdu_json,
+                // Since this PDU references all pdu_leaves we can update the leaves
+                // of the room
+                vec![(*pdu.event_id).to_owned()],
+                state_lock,
+            )
+            .await?;
 
         // We set the room state after inserting the pdu, so that we never have a moment in time
         // where events in the current room state do not exist
@@ -732,7 +733,7 @@ impl Service {
     /// Append the incoming event setting the state snapshot to the state from the
     /// server that sent the event.
     #[tracing::instrument(skip_all)]
-    pub fn append_incoming_pdu<'a>(
+    pub async fn append_incoming_pdu<'a>(
         &self,
         pdu: &PduEvent,
         pdu_json: CanonicalJsonObject,
@@ -762,11 +763,11 @@ impl Service {
             return Ok(None);
         }
 
-        let pdu_id =
-            services()
-                .rooms
-                .timeline
-                .append_pdu(pdu, pdu_json, new_room_leaves, state_lock)?;
+        let pdu_id = services()
+            .rooms
+            .timeline
+            .append_pdu(pdu, pdu_json, new_room_leaves, state_lock)
+            .await?;
 
         Ok(Some(pdu_id))
     }
diff --git a/src/service/rooms/user/data.rs b/src/service/rooms/user/data.rs
index 4b8a4eca..90fc18bf 100644
--- a/src/service/rooms/user/data.rs
+++ b/src/service/rooms/user/data.rs
@@ -2,7 +2,13 @@ use crate::Result;
 use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
 
 pub trait Data: Send + Sync {
-    fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>;
+    fn update_notification_counts(
+        &self,
+        user_id: &UserId,
+        room_id: &RoomId,
+        notification_count: u64,
+        highlight_count: u64,
+    ) -> Result<()>;
 
     fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;
diff --git a/src/service/rooms/user/mod.rs b/src/service/rooms/user/mod.rs
index 672e502d..3b3e6374 100644
--- a/src/service/rooms/user/mod.rs
+++ b/src/service/rooms/user/mod.rs
@@ -1,17 +1,117 @@
 mod data;
 
 pub use data::Data;
-use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
+use ruma::{
+    events::{
+        push_rules::PushRulesEvent, room::power_levels::RoomPowerLevelsEventContent,
+        GlobalAccountDataEventType, StateEventType,
+    },
+    push::{Action, Ruleset, Tweak},
+    OwnedRoomId, OwnedUserId, RoomId, UserId,
+};
 
-use crate::Result;
+use crate::{services, Error, Result};
 
 pub struct Service {
     pub db: &'static dyn Data,
 }
 
 impl Service {
-    pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
-        self.db.reset_notification_counts(user_id, room_id)
+    pub fn update_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
+        let power_levels: RoomPowerLevelsEventContent = services()
+            .rooms
+            .state_accessor
+            .room_state_get(room_id, &StateEventType::RoomPowerLevels, "")?
+            .map(|ev| {
+                serde_json::from_str(ev.content.get())
+                    .map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
+            })
+            .transpose()?
+            .unwrap_or_default();
+
+        let read_event = services()
+            .rooms
+            .edus
+            .read_receipt
+            .private_read_get(room_id, user_id)
+            .unwrap_or(None)
+            .unwrap_or(0u64);
+        let mut notification_count = 0u64;
+        let mut highlight_count = 0u64;
+
+        services()
+            .rooms
+            .timeline
+            .pdus_since(user_id, room_id, read_event)?
+            .filter_map(|pdu| pdu.ok())
+            .map(|(_, pdu)| pdu)
+            .filter(|pdu| {
+                // Don't count the user's own messages or the event marked as read itself
+                user_id != pdu.sender
+                    && services()
+                        .rooms
+                        .short
+                        .get_or_create_shorteventid(&pdu.event_id)
+                        .unwrap_or(0)
+                        != read_event
+            })
+            .filter_map(|pdu| {
+                let rules_for_user = services()
+                    .account_data
+                    .get(
+                        None,
+                        user_id,
+                        GlobalAccountDataEventType::PushRules.to_string().into(),
+                    )
+                    .ok()?
+                    .map(|event| {
+                        serde_json::from_str::<PushRulesEvent>(event.get())
+                            .map_err(|_| Error::bad_database("Invalid push rules event in db."))
+                    })
+                    .transpose()
+                    .ok()?
+                    .map(|ev: PushRulesEvent| ev.content.global)
+                    .unwrap_or_else(|| Ruleset::server_default(user_id));
+
+                let mut highlight = false;
+                let mut notify = false;
+
+                for action in services()
+                    .pusher
+                    .get_actions(
+                        user_id,
+                        &rules_for_user,
+                        &power_levels,
+                        &pdu.to_sync_room_event(),
+                        &pdu.room_id,
+                    )
+                    .ok()?
+                {
+                    match action {
+                        Action::DontNotify => notify = false,
+                        // TODO: Implement proper support for coalesce
+                        Action::Notify | Action::Coalesce => notify = true,
+                        Action::SetTweak(Tweak::Highlight(true)) => {
+                            highlight = true;
+                        }
+                        _ => {}
+                    }
+                }
+
+                if notify {
+                    notification_count += 1;
+                }
+
+                if highlight {
+                    highlight_count += 1;
+                }
+
+                Some(())
+            })
+            .for_each(|_| {});
+
+        self.db
+            .update_notification_counts(user_id, room_id, notification_count, highlight_count)
     }
 
     pub fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs
index 1861feb4..e33f9af7 100644
--- a/src/service/sending/mod.rs
+++ b/src/service/sending/mod.rs
@@ -24,7 +24,8 @@ use ruma::{
         federation::{
             self,
             transactions::edu::{
-                DeviceListUpdateContent, Edu, ReceiptContent, ReceiptData, ReceiptMap,
+                DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent,
+                ReceiptData, ReceiptMap,
             },
         },
         OutgoingRequest,
@@ -283,6 +284,37 @@ impl Service {
                 .filter(|user_id| user_id.server_name() == services().globals.server_name()),
         );
 
+        // Look for presence updates in this room
+        let presence_updates: Vec<PresenceUpdate> = services()
+            .rooms
+            .edus
+            .presence
+            .presence_since(&room_id, since)?
+            .filter(|(user_id, _)| user_id.server_name() == services().globals.server_name())
+            .map(|(user_id, presence_event)| PresenceUpdate {
+                user_id,
+                presence: presence_event.content.presence,
+                status_msg: presence_event.content.status_msg,
+                last_active_ago: presence_event
+                    .content
+                    .last_active_ago
+                    .unwrap_or_else(|| uint!(0)),
+                currently_active: presence_event.content.currently_active.unwrap_or(false),
+            })
+            .collect();
+
+        // Don't queue an empty presence EDU if there is nothing to push
+        if !presence_updates.is_empty() {
+            let presence_content = PresenceContent {
+                push: presence_updates,
+            };
+
+            events.push(
+                serde_json::to_vec(&Edu::Presence(presence_content))
+                    .expect("presence json can be serialized"),
+            );
+        }
+
         // Look for read receipts in this room
         for r in services()
             .rooms
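The notification-count rewrite in src/service/rooms/user/mod.rs above boils down to replaying every unread PDU through the push-rule engine and tallying which ones notify or highlight. A minimal, self-contained sketch of that tallying step, assuming only ruma's push types; tally_actions is a hypothetical helper for illustration, not part of this patch:

use ruma::push::{Action, Tweak};

/// Hypothetical helper: given the actions the push rules produced for one
/// PDU, decide whether it counts as a notification and/or a highlight.
fn tally_actions(actions: &[Action]) -> (bool, bool) {
    let (mut notify, mut highlight) = (false, false);
    for action in actions {
        match action {
            Action::DontNotify => notify = false,
            // Coalesce is treated like Notify, mirroring the patch above
            Action::Notify | Action::Coalesce => notify = true,
            Action::SetTweak(Tweak::Highlight(true)) => highlight = true,
            _ => {}
        }
    }
    (notify, highlight)
}

fn main() {
    let actions = [Action::Notify, Action::SetTweak(Tweak::Highlight(true))];
    assert_eq!(tally_actions(&actions), (true, true));
}

Each unread PDU whose tally comes back (true, _) bumps notification_count, and (_, true) bumps highlight_count, before both totals are persisted through Data::update_notification_counts.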