improvement: better sqlite

This commit is contained in:
Timo Kösters 2021-08-15 13:17:42 +02:00
parent a4310f840e
commit 2c3bee34a0
No known key found for this signature in database
GPG key ID: 356E705610F626D5
3 changed files with 47 additions and 18 deletions

View file

@ -35,6 +35,7 @@ pub trait Tree: Send + Sync {
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>; ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
fn increment(&self, key: &[u8]) -> Result<Vec<u8>>; fn increment(&self, key: &[u8]) -> Result<Vec<u8>>;
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()>;
fn scan_prefix<'a>( fn scan_prefix<'a>(
&'a self, &'a self,

View file

@ -49,11 +49,11 @@ impl Engine {
fn prepare_conn(path: &Path, cache_size_kb: u32) -> Result<Connection> { fn prepare_conn(path: &Path, cache_size_kb: u32) -> Result<Connection> {
let conn = Connection::open(&path)?; let conn = Connection::open(&path)?;
conn.pragma_update(Some(Main), "page_size", &32768)?; conn.pragma_update(Some(Main), "page_size", &1024)?;
conn.pragma_update(Some(Main), "journal_mode", &"WAL")?; conn.pragma_update(Some(Main), "journal_mode", &"WAL")?;
conn.pragma_update(Some(Main), "synchronous", &"NORMAL")?; conn.pragma_update(Some(Main), "synchronous", &"NORMAL")?;
conn.pragma_update(Some(Main), "cache_size", &(-i64::from(cache_size_kb)))?; conn.pragma_update(Some(Main), "cache_size", &(-i64::from(cache_size_kb)))?;
conn.pragma_update(Some(Main), "wal_autocheckpoint", &0)?; conn.pragma_update(Some(Main), "wal_autocheckpoint", &8000)?;
Ok(conn) Ok(conn)
} }
@ -93,8 +93,9 @@ impl Engine {
} }
pub fn flush_wal(self: &Arc<Self>) -> Result<()> { pub fn flush_wal(self: &Arc<Self>) -> Result<()> {
self.write_lock() // We use autocheckpoints
.pragma_update(Some(Main), "wal_checkpoint", &"TRUNCATE")?; //self.write_lock()
//.pragma_update(Some(Main), "wal_checkpoint", &"TRUNCATE")?;
Ok(()) Ok(())
} }
} }
@ -248,6 +249,24 @@ impl Tree for SqliteTable {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, iter))]
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
let guard = self.engine.write_lock();
guard.execute("BEGIN", [])?;
for key in iter {
let old = self.get_with_guard(&guard, &key)?;
let new = crate::utils::increment(old.as_deref())
.expect("utils::increment always returns Some");
self.insert_with_guard(&guard, &key, &new)?;
}
guard.execute("COMMIT", [])?;
drop(guard);
Ok(())
}
#[tracing::instrument(skip(self, key))] #[tracing::instrument(skip(self, key))]
fn remove(&self, key: &[u8]) -> Result<()> { fn remove(&self, key: &[u8]) -> Result<()> {
let guard = self.engine.write_lock(); let guard = self.engine.write_lock();

View file

@ -92,13 +92,17 @@ pub struct Rooms {
pub(super) pdu_cache: Mutex<LruCache<EventId, Arc<PduEvent>>>, pub(super) pdu_cache: Mutex<LruCache<EventId, Arc<PduEvent>>>,
pub(super) auth_chain_cache: Mutex<LruCache<u64, HashSet<u64>>>, pub(super) auth_chain_cache: Mutex<LruCache<u64, HashSet<u64>>>,
pub(super) shorteventid_cache: Mutex<LruCache<u64, EventId>>, pub(super) shorteventid_cache: Mutex<LruCache<u64, EventId>>,
pub(super) stateinfo_cache: Mutex<LruCache<u64, pub(super) stateinfo_cache: Mutex<
LruCache<
u64,
Vec<( Vec<(
u64, // sstatehash u64, // sstatehash
HashSet<CompressedStateEvent>, // full state HashSet<CompressedStateEvent>, // full state
HashSet<CompressedStateEvent>, // added HashSet<CompressedStateEvent>, // added
HashSet<CompressedStateEvent>, // removed HashSet<CompressedStateEvent>, // removed
)>>>, )>,
>,
>,
} }
impl Rooms { impl Rooms {
@ -414,7 +418,8 @@ impl Rooms {
HashSet<CompressedStateEvent>, // removed HashSet<CompressedStateEvent>, // removed
)>, )>,
> { > {
if let Some(r) = self.stateinfo_cache if let Some(r) = self
.stateinfo_cache
.lock() .lock()
.unwrap() .unwrap()
.get_mut(&shortstatehash) .get_mut(&shortstatehash)
@ -458,10 +463,6 @@ impl Rooms {
response.push((shortstatehash, state, added, removed)); response.push((shortstatehash, state, added, removed));
self.stateinfo_cache
.lock()
.unwrap()
.insert(shortstatehash, response.clone());
Ok(response) Ok(response)
} else { } else {
let mut response = Vec::new(); let mut response = Vec::new();
@ -1173,6 +1174,9 @@ impl Rooms {
let sync_pdu = pdu.to_sync_room_event(); let sync_pdu = pdu.to_sync_room_event();
let mut notifies = Vec::new();
let mut highlights = Vec::new();
for user in db for user in db
.rooms .rooms
.room_members(&pdu.room_id) .room_members(&pdu.room_id)
@ -1218,11 +1222,11 @@ impl Rooms {
userroom_id.extend_from_slice(pdu.room_id.as_bytes()); userroom_id.extend_from_slice(pdu.room_id.as_bytes());
if notify { if notify {
self.userroomid_notificationcount.increment(&userroom_id)?; notifies.push(userroom_id.clone());
} }
if highlight { if highlight {
self.userroomid_highlightcount.increment(&userroom_id)?; highlights.push(userroom_id);
} }
for senderkey in db.pusher.get_pusher_senderkeys(&user) { for senderkey in db.pusher.get_pusher_senderkeys(&user) {
@ -1230,6 +1234,11 @@ impl Rooms {
} }
} }
self.userroomid_notificationcount
.increment_batch(&mut notifies.into_iter())?;
self.userroomid_highlightcount
.increment_batch(&mut highlights.into_iter())?;
match pdu.kind { match pdu.kind {
EventType::RoomRedaction => { EventType::RoomRedaction => {
if let Some(redact_id) = &pdu.redacts { if let Some(redact_id) = &pdu.redacts {