remove sync response cache

This cache can serve invalid responses, and has an extremely low hit
rate.

It serves invalid responses because because it's only keyed off
the `since` parameter, but many of the other request parameters also
affect the response or it's side effects. This will become worse once we
implement filtering, because there will be a wider space of parameters
with different responses. This problem is fixable, but not worth it
because of the low hit rate.

The low hit rate is because normal clients will always issue the next
sync request with `since` set to the `prev_batch` value of the previous
response. The only time we expect to see multiple requests with the same
`since` is when the response is empty, but we don't cache empty
responses.

This was confirmed experimentally by logging cache hits and misses over
15 minutes with a wide variety of clients. This test was run on
matrix.computer.surgery, which has only a few active users, but a
large volume of sync traffic from many rooms. Over the test period, we
had 3 hits and 5309 misses. All hits occurred in the first minute, so I
suspect that they had something to do with client recovery from an
offline state. The clients that were connected during the test are:

 - element web
 - schildichat web
 - iamb
 - gomuks
 - nheko
 - fractal
 - fluffychat web
 - fluffychat android
 - cinny web
 - element android
 - element X android

Fixes: #336
This commit is contained in:
Benjamin Lee 2024-05-16 21:02:05 -07:00 committed by June
parent 6ef4781050
commit 8bffcfe82b
2 changed files with 8 additions and 109 deletions

View file

@ -25,10 +25,9 @@ use ruma::{
StateEventType, TimelineEventType,
},
serde::Raw,
uint, DeviceId, EventId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
uint, DeviceId, EventId, OwnedUserId, RoomId, UInt, UserId,
};
use tokio::sync::watch::Sender;
use tracing::{debug, error, Instrument as _, Span};
use tracing::{error, Instrument as _, Span};
use crate::{
service::{pdu::EventHash, rooms::timeline::PduCount},
@ -73,10 +72,6 @@ use crate::{
/// For left rooms:
/// - If the user left after `since`: `prev_batch` token, empty state (TODO:
/// subset of the state at the point of the leave)
///
/// - Sync is handled in an async task, multiple requests from the same device
/// with the same
/// `since` will be cached
pub(crate) async fn sync_events_route(
body: Ruma<sync_events::v3::Request>,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
@ -84,95 +79,6 @@ pub(crate) async fn sync_events_route(
let sender_device = body.sender_device.expect("user is authenticated");
let body = body.body;
let mut rx = match services()
.globals
.sync_receivers
.write()
.await
.entry((sender_user.clone(), sender_device.clone()))
{
Entry::Vacant(v) => {
let (tx, rx) = tokio::sync::watch::channel(None);
v.insert((body.since.clone(), rx.clone()));
tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx));
rx
},
Entry::Occupied(mut o) => {
if o.get().0 != body.since {
let (tx, rx) = tokio::sync::watch::channel(None);
o.insert((body.since.clone(), rx.clone()));
debug!("Sync started for {sender_user}");
tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx));
rx
} else {
o.get().1.clone()
}
},
};
let we_have_to_wait = rx.borrow().is_none();
if we_have_to_wait {
if let Err(e) = rx.changed().await {
error!("Error waiting for sync: {}", e);
}
}
let result = match rx
.borrow()
.as_ref()
.expect("When sync channel changes it's always set to some")
{
Ok(response) => Ok(response.clone()),
Err(error) => Err(error.to_response()),
};
result
}
async fn sync_helper_wrapper(
sender_user: OwnedUserId, sender_device: OwnedDeviceId, body: sync_events::v3::Request,
tx: Sender<Option<Result<sync_events::v3::Response>>>,
) {
let since = body.since.clone();
let r = sync_helper(sender_user.clone(), sender_device.clone(), body).await;
if let Ok((_, caching_allowed)) = r {
if !caching_allowed {
match services()
.globals
.sync_receivers
.write()
.await
.entry((sender_user, sender_device))
{
Entry::Occupied(o) => {
// Only remove if the device didn't start a different /sync already
if o.get().0 == since {
o.remove();
}
},
Entry::Vacant(_) => {},
}
}
}
_ = tx.send(Some(r.map(|(r, _)| r)));
}
async fn sync_helper(
sender_user: OwnedUserId,
sender_device: OwnedDeviceId,
body: sync_events::v3::Request,
// bool = caching allowed
) -> Result<(sync_events::v3::Response, bool), Error> {
// Presence update
if services().globals.allow_local_presence() {
services()
@ -414,10 +320,9 @@ async fn sync_helper(
duration = Duration::from_secs(30);
}
_ = tokio::time::timeout(duration, watcher).await;
Ok((response, false))
} else {
Ok((response, since != next_batch)) // Only cache if we made progress
}
Ok(response)
}
#[tracing::instrument(skip_all, fields(user_id = %sender_user, room_id = %room_id))]

View file

@ -18,14 +18,14 @@ use ipaddress::IPAddress;
use regex::RegexSet;
use ruma::{
api::{
client::{discovery::discover_support::ContactRole, sync::sync_events},
client::discovery::discover_support::ContactRole,
federation::discovery::{ServerSigningKeys, VerifyKey},
},
serde::Base64,
DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId,
RoomVersionId, ServerName, UserId,
DeviceId, OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomVersionId,
ServerName, UserId,
};
use tokio::sync::{broadcast, watch::Receiver, Mutex, RwLock};
use tokio::sync::{broadcast, Mutex, RwLock};
use tracing::{error, info, trace};
use url::Url;
@ -36,10 +36,6 @@ mod data;
mod resolver;
type RateLimitState = (Instant, u32); // Time if last failed try, number of failed tries
type SyncHandle = (
Option<String>, // since
Receiver<Option<Result<sync_events::v3::Response>>>, // rx
);
pub(crate) struct Service<'a> {
pub(crate) db: &'static dyn Data,
@ -56,7 +52,6 @@ pub(crate) struct Service<'a> {
pub(crate) bad_event_ratelimiter: Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
pub(crate) bad_signature_ratelimiter: Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
pub(crate) bad_query_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
pub(crate) sync_receivers: RwLock<HashMap<(OwnedUserId, OwnedDeviceId), SyncHandle>>,
pub(crate) roomid_mutex_insert: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>,
pub(crate) roomid_mutex_state: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>,
pub(crate) roomid_mutex_federation: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>, // this lock will be held longer
@ -163,7 +158,6 @@ impl Service<'_> {
roomid_mutex_federation: RwLock::new(HashMap::new()),
roomid_federationhandletime: RwLock::new(HashMap::new()),
stateres_mutex: Arc::new(Mutex::new(())),
sync_receivers: RwLock::new(HashMap::new()),
rotate: RotationHandler::new(),
started: SystemTime::now(),
shutdown: AtomicBool::new(false),