optimize get w/ zero-copy ref handle
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
0522fe7d92
commit
ee64fb149c
11 changed files with 46 additions and 16 deletions
25
src/database/handle.rs
Normal file
25
src/database/handle.rs
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
use std::ops::Deref;
|
||||||
|
|
||||||
|
use rocksdb::DBPinnableSlice;
|
||||||
|
|
||||||
|
pub struct Handle<'a> {
|
||||||
|
val: DBPinnableSlice<'a>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> From<DBPinnableSlice<'a>> for Handle<'a> {
|
||||||
|
fn from(val: DBPinnableSlice<'a>) -> Self {
|
||||||
|
Self {
|
||||||
|
val,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Deref for Handle<'_> {
|
||||||
|
type Target = [u8];
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target { &self.val }
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsRef<[u8]> for Handle<'_> {
|
||||||
|
fn as_ref(&self) -> &[u8] { &self.val }
|
||||||
|
}
|
|
@ -5,7 +5,7 @@ use rocksdb::{
|
||||||
AsColumnFamilyRef, ColumnFamily, Direction, IteratorMode, ReadOptions, WriteBatchWithTransaction, WriteOptions,
|
AsColumnFamilyRef, ColumnFamily, Direction, IteratorMode, ReadOptions, WriteBatchWithTransaction, WriteOptions,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{or_else, result, watchers::Watchers, Engine, Iter};
|
use crate::{or_else, result, watchers::Watchers, Engine, Handle, Iter};
|
||||||
|
|
||||||
pub struct Map {
|
pub struct Map {
|
||||||
name: String,
|
name: String,
|
||||||
|
@ -32,11 +32,11 @@ impl Map {
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
pub fn get(&self, key: &[u8]) -> Result<Option<Handle<'_>>> {
|
||||||
let read_options = &self.read_options;
|
let read_options = &self.read_options;
|
||||||
let res = self.db.db.get_cf_opt(&self.cf(), key, read_options);
|
let res = self.db.db.get_pinned_cf_opt(&self.cf(), key, read_options);
|
||||||
|
|
||||||
result(res)
|
Ok(result(res)?.map(Handle::from))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn multi_get(&self, keys: &[&[u8]]) -> Result<Vec<Option<Vec<u8>>>> {
|
pub fn multi_get(&self, keys: &[&[u8]]) -> Result<Vec<Option<Vec<u8>>>> {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
mod cork;
|
mod cork;
|
||||||
mod database;
|
mod database;
|
||||||
mod engine;
|
mod engine;
|
||||||
|
mod handle;
|
||||||
mod iter;
|
mod iter;
|
||||||
mod map;
|
mod map;
|
||||||
pub mod maps;
|
pub mod maps;
|
||||||
|
@ -13,6 +14,7 @@ extern crate rust_rocksdb as rocksdb;
|
||||||
|
|
||||||
pub use database::Database;
|
pub use database::Database;
|
||||||
pub(crate) use engine::Engine;
|
pub(crate) use engine::Engine;
|
||||||
|
pub use handle::Handle;
|
||||||
pub use iter::Iter;
|
pub use iter::Iter;
|
||||||
pub use map::Map;
|
pub use map::Map;
|
||||||
pub(crate) use util::{or_else, result};
|
pub(crate) use util::{or_else, result};
|
||||||
|
|
|
@ -226,7 +226,7 @@ lasttimelinecount_cache: {lasttimelinecount_cache} / {max_lasttimelinecount_cach
|
||||||
self.global.insert(b"keypair", &keypair)?;
|
self.global.insert(b"keypair", &keypair)?;
|
||||||
Ok::<_, Error>(keypair)
|
Ok::<_, Error>(keypair)
|
||||||
},
|
},
|
||||||
Ok,
|
|val| Ok(val.to_vec()),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let mut parts = keypair_bytes.splitn(2, |&b| b == 0xFF);
|
let mut parts = keypair_bytes.splitn(2, |&b| b == 0xFF);
|
||||||
|
|
|
@ -479,7 +479,7 @@ async fn db_lt_8(db: &Arc<Database>, _config: &Config) -> Result<()> {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.expect("shortroomid should exist");
|
.expect("shortroomid should exist");
|
||||||
|
|
||||||
let mut new_key = short_room_id;
|
let mut new_key = short_room_id.to_vec();
|
||||||
new_key.extend_from_slice(count);
|
new_key.extend_from_slice(count);
|
||||||
|
|
||||||
Some((new_key, v))
|
Some((new_key, v))
|
||||||
|
@ -500,7 +500,7 @@ async fn db_lt_8(db: &Arc<Database>, _config: &Config) -> Result<()> {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.expect("shortroomid should exist");
|
.expect("shortroomid should exist");
|
||||||
|
|
||||||
let mut new_value = short_room_id;
|
let mut new_value = short_room_id.to_vec();
|
||||||
new_value.extend_from_slice(count);
|
new_value.extend_from_slice(count);
|
||||||
|
|
||||||
Some((k, new_value))
|
Some((k, new_value))
|
||||||
|
@ -534,7 +534,7 @@ async fn db_lt_9(db: &Arc<Database>, _config: &Config) -> Result<()> {
|
||||||
.get(room_id)
|
.get(room_id)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.expect("shortroomid should exist");
|
.expect("shortroomid should exist");
|
||||||
let mut new_key = short_room_id;
|
let mut new_key = short_room_id.to_vec();
|
||||||
new_key.extend_from_slice(word);
|
new_key.extend_from_slice(word);
|
||||||
new_key.push(0xFF);
|
new_key.push(0xFF);
|
||||||
new_key.extend_from_slice(pdu_id_count);
|
new_key.extend_from_slice(pdu_id_count);
|
||||||
|
|
|
@ -39,7 +39,7 @@ impl Data {
|
||||||
|
|
||||||
pub(super) fn remove_alias(&self, alias: &RoomAliasId) -> Result<()> {
|
pub(super) fn remove_alias(&self, alias: &RoomAliasId) -> Result<()> {
|
||||||
if let Some(room_id) = self.alias_roomid.get(alias.alias().as_bytes())? {
|
if let Some(room_id) = self.alias_roomid.get(alias.alias().as_bytes())? {
|
||||||
let mut prefix = room_id;
|
let mut prefix = room_id.to_vec();
|
||||||
prefix.push(0xFF);
|
prefix.push(0xFF);
|
||||||
|
|
||||||
for (key, _) in self.aliasid_alias.scan_prefix(prefix) {
|
for (key, _) in self.aliasid_alias.scan_prefix(prefix) {
|
||||||
|
|
|
@ -83,7 +83,7 @@ impl Service {
|
||||||
) -> Result<Option<Vec<u8>>> {
|
) -> Result<Option<Vec<u8>>> {
|
||||||
// 1. Skip the PDU if we already have it as a timeline event
|
// 1. Skip the PDU if we already have it as a timeline event
|
||||||
if let Some(pdu_id) = services().rooms.timeline.get_pdu_id(event_id)? {
|
if let Some(pdu_id) = services().rooms.timeline.get_pdu_id(event_id)? {
|
||||||
return Ok(Some(pdu_id));
|
return Ok(Some(pdu_id.to_vec()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1.1 Check the server is in the room
|
// 1.1 Check the server is in the room
|
||||||
|
@ -449,7 +449,7 @@ impl Service {
|
||||||
) -> Result<Option<Vec<u8>>> {
|
) -> Result<Option<Vec<u8>>> {
|
||||||
// Skip the PDU if we already have it as a timeline event
|
// Skip the PDU if we already have it as a timeline event
|
||||||
if let Ok(Some(pduid)) = services().rooms.timeline.get_pdu_id(&incoming_pdu.event_id) {
|
if let Ok(Some(pduid)) = services().rooms.timeline.get_pdu_id(&incoming_pdu.event_id) {
|
||||||
return Ok(Some(pduid));
|
return Ok(Some(pduid.to_vec()));
|
||||||
}
|
}
|
||||||
|
|
||||||
if services()
|
if services()
|
||||||
|
|
|
@ -93,7 +93,7 @@ impl Data {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the pdu's id.
|
/// Returns the pdu's id.
|
||||||
pub(super) fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<Vec<u8>>> {
|
pub(super) fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<database::Handle<'_>>> {
|
||||||
self.eventid_pduid.get(event_id.as_bytes())
|
self.eventid_pduid.get(event_id.as_bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -159,7 +159,9 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the pdu's id.
|
/// Returns the pdu's id.
|
||||||
pub fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<Vec<u8>>> { self.db.get_pdu_id(event_id) }
|
pub fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<database::Handle<'_>>> {
|
||||||
|
self.db.get_pdu_id(event_id)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the pdu.
|
/// Returns the pdu.
|
||||||
///
|
///
|
||||||
|
@ -1185,7 +1187,8 @@ impl Service {
|
||||||
|
|
||||||
// Skip the PDU if we already have it as a timeline event
|
// Skip the PDU if we already have it as a timeline event
|
||||||
if let Some(pdu_id) = self.get_pdu_id(&event_id)? {
|
if let Some(pdu_id) = self.get_pdu_id(&event_id)? {
|
||||||
info!("We already know {event_id} at {pdu_id:?}");
|
let pdu_id = pdu_id.to_vec();
|
||||||
|
debug!("We already know {event_id} at {pdu_id:?}");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,7 @@ impl Data {
|
||||||
|
|
||||||
pub(super) fn existing_txnid(
|
pub(super) fn existing_txnid(
|
||||||
&self, user_id: &UserId, device_id: Option<&DeviceId>, txn_id: &TransactionId,
|
&self, user_id: &UserId, device_id: Option<&DeviceId>, txn_id: &TransactionId,
|
||||||
) -> Result<Option<Vec<u8>>> {
|
) -> Result<Option<database::Handle<'_>>> {
|
||||||
let mut key = user_id.as_bytes().to_vec();
|
let mut key = user_id.as_bytes().to_vec();
|
||||||
key.push(0xFF);
|
key.push(0xFF);
|
||||||
key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default());
|
key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default());
|
||||||
|
|
|
@ -26,7 +26,7 @@ impl Service {
|
||||||
|
|
||||||
pub fn existing_txnid(
|
pub fn existing_txnid(
|
||||||
&self, user_id: &UserId, device_id: Option<&DeviceId>, txn_id: &TransactionId,
|
&self, user_id: &UserId, device_id: Option<&DeviceId>, txn_id: &TransactionId,
|
||||||
) -> Result<Option<Vec<u8>>> {
|
) -> Result<Option<database::Handle<'_>>> {
|
||||||
self.db.existing_txnid(user_id, device_id, txn_id)
|
self.db.existing_txnid(user_id, device_id, txn_id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue