don't return extra member count or e2ee device updates from sync

Previously, we were returning redundant member count updates or encrypted
device updates from the /sync endpoint in some cases. The extra member
count updates are spec-compliant, but unnecessary, while the extra
encrypted device updates violate the spec.

The refactor necessary to fix this bug is also necessary to support
filtering on state events in sync.

Details:

Joined room incremental sync needs to examine state events for four
purposes:

 1. determining whether we need to return an update to room member counts
 2. determining the set of left/joined devices for encrypted rooms
    (returned in `device_lists`)
 3. returning state events to the client (in `rooms.joined.*.state`)
 4. tracking which member events we have sent to the client, so they can
    be omitted on future requests when lazy-loading is enabled.

The state events that we need to examine for the first two cases is member
events in the delta between `since` and the end of `timeline`. For the
second two cases, we need the delta between `since` and the start of
`timeline`, plus contextual member events for any senders that occur in
`timeline`. The second list is subject to filtering, while the first is
not.

Before this change, we were using the same set of state events that we are
returning to the client (cases 3/4) to do the analysis for cases 1/2.
In a compliant implementation, this would result in us missing some
relevant member events in 1/2 in addition to seeing redundant member
events. In current conduwuit this is not the case because the set of
events that we return to the client is always a superset of the set that
is needed for cases 1/2. This is because we don't support filtering, and
we have an existing bug[1] where we are returning the delta between
`since` and the end of `timeline` rather than the start.

[1]: https://github.com/girlbossceo/conduwuit/issues/361

Fixing this is necessary to implement filtering because otherwise
we would start missing some member events for member count or encrypted
device updates if the relevant member events are rejected by the filter.
This would be much worse than our current behavior.
This commit is contained in:
Benjamin Lee 2024-05-10 17:17:50 -07:00 committed by June 🍓🦴
parent 8bffcfe82b
commit 9eb0784f6f

View file

@ -750,8 +750,7 @@ async fn load_joined_room(
// Incremental /sync // Incremental /sync
let since_shortstatehash = since_shortstatehash.unwrap(); let since_shortstatehash = since_shortstatehash.unwrap();
let mut state_events = Vec::new(); let mut delta_state_events = Vec::new();
let mut lazy_loaded = HashSet::new();
if since_shortstatehash != current_shortstatehash { if since_shortstatehash != current_shortstatehash {
let current_state_ids = services() let current_state_ids = services()
@ -772,55 +771,12 @@ async fn load_joined_room(
continue; continue;
}; };
if pdu.kind == TimelineEventType::RoomMember { delta_state_events.push(pdu);
match UserId::parse(
pdu.state_key
.as_ref()
.expect("State event has state key")
.clone(),
) {
Ok(state_key_userid) => {
lazy_loaded.insert(state_key_userid);
},
Err(e) => error!("Invalid state key for member event: {}", e),
}
}
state_events.push(pdu);
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
} }
} }
for (_, event) in &timeline_pdus {
if lazy_loaded.contains(&event.sender) {
continue;
}
if !services().rooms.lazy_loading.lazy_load_was_sent_before(
sender_user,
sender_device,
room_id,
&event.sender,
)? || lazy_load_send_redundant
{
if let Some(member_event) = services().rooms.state_accessor.room_state_get(
room_id,
&StateEventType::RoomMember,
event.sender.as_str(),
)? {
lazy_loaded.insert(event.sender.clone());
state_events.push(member_event);
}
}
}
services()
.rooms
.lazy_loading
.lazy_load_mark_sent(sender_user, sender_device, room_id, lazy_loaded, next_batchcount)
.await;
let encrypted_room = services() let encrypted_room = services()
.rooms .rooms
.state_accessor .state_accessor
@ -836,12 +792,12 @@ async fn load_joined_room(
// Calculations: // Calculations:
let new_encrypted_room = encrypted_room && since_encryption.is_none(); let new_encrypted_room = encrypted_room && since_encryption.is_none();
let send_member_count = state_events let send_member_count = delta_state_events
.iter() .iter()
.any(|event| event.kind == TimelineEventType::RoomMember); .any(|event| event.kind == TimelineEventType::RoomMember);
if encrypted_room { if encrypted_room {
for state_event in &state_events { for state_event in &delta_state_events {
if state_event.kind != TimelineEventType::RoomMember { if state_event.kind != TimelineEventType::RoomMember {
continue; continue;
} }
@ -902,6 +858,57 @@ async fn load_joined_room(
(None, None, Vec::new()) (None, None, Vec::new())
}; };
let mut state_events = delta_state_events;
let mut lazy_loaded = HashSet::new();
// Mark all member events we're returning as lazy-loaded
for pdu in &state_events {
if pdu.kind == TimelineEventType::RoomMember {
match UserId::parse(
pdu.state_key
.as_ref()
.expect("State event has state key")
.clone(),
) {
Ok(state_key_userid) => {
lazy_loaded.insert(state_key_userid);
},
Err(e) => error!("Invalid state key for member event: {}", e),
}
}
}
// Fetch contextual member state events for events from the timeline, and
// mark them as lazy-loaded as well.
for (_, event) in &timeline_pdus {
if lazy_loaded.contains(&event.sender) {
continue;
}
if !services().rooms.lazy_loading.lazy_load_was_sent_before(
sender_user,
sender_device,
room_id,
&event.sender,
)? || lazy_load_send_redundant
{
if let Some(member_event) = services().rooms.state_accessor.room_state_get(
room_id,
&StateEventType::RoomMember,
event.sender.as_str(),
)? {
lazy_loaded.insert(event.sender.clone());
state_events.push(member_event);
}
}
}
services()
.rooms
.lazy_loading
.lazy_load_mark_sent(sender_user, sender_device, room_id, lazy_loaded, next_batchcount)
.await;
( (
heroes, heroes,
joined_member_count, joined_member_count,