some optimizations to get_auth_chain()
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
678d87ced1
commit
dba0575e75
4 changed files with 71 additions and 46 deletions
|
@ -734,7 +734,7 @@ pub async fn get_event_authorization_route(
|
||||||
let auth_chain_ids = services()
|
let auth_chain_ids = services()
|
||||||
.rooms
|
.rooms
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.get_auth_chain(room_id, vec![Arc::from(&*body.event_id)])
|
.event_ids_iter(room_id, vec![Arc::from(&*body.event_id)])
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(get_event_authorization::v1::Response {
|
Ok(get_event_authorization::v1::Response {
|
||||||
|
@ -794,7 +794,7 @@ pub async fn get_room_state_route(body: Ruma<get_room_state::v1::Request>) -> Re
|
||||||
let auth_chain_ids = services()
|
let auth_chain_ids = services()
|
||||||
.rooms
|
.rooms
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)])
|
.event_ids_iter(&body.room_id, vec![Arc::from(&*body.event_id)])
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(get_room_state::v1::Response {
|
Ok(get_room_state::v1::Response {
|
||||||
|
@ -854,7 +854,7 @@ pub async fn get_room_state_ids_route(
|
||||||
let auth_chain_ids = services()
|
let auth_chain_ids = services()
|
||||||
.rooms
|
.rooms
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)])
|
.event_ids_iter(&body.room_id, vec![Arc::from(&*body.event_id)])
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(get_room_state_ids::v1::Response {
|
Ok(get_room_state_ids::v1::Response {
|
||||||
|
@ -1142,7 +1142,7 @@ async fn create_join_event(
|
||||||
let auth_chain_ids = services()
|
let auth_chain_ids = services()
|
||||||
.rooms
|
.rooms
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.get_auth_chain(room_id, state_ids.values().cloned().collect())
|
.event_ids_iter(room_id, state_ids.values().cloned().collect())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
services().sending.send_pdu_room(room_id, &pdu_id)?;
|
services().sending.send_pdu_room(room_id, &pdu_id)?;
|
||||||
|
|
|
@ -104,7 +104,7 @@ pub(crate) async fn process(command: DebugCommand, body: Vec<&str>) -> Result<Ro
|
||||||
let count = services()
|
let count = services()
|
||||||
.rooms
|
.rooms
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.get_auth_chain(room_id, vec![event_id])
|
.event_ids_iter(room_id, vec![event_id])
|
||||||
.await?
|
.await?
|
||||||
.count();
|
.count();
|
||||||
let elapsed = start.elapsed();
|
let elapsed = start.elapsed();
|
||||||
|
|
|
@ -15,28 +15,47 @@ pub struct Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
pub async fn get_auth_chain<'a>(
|
pub async fn event_ids_iter<'a>(
|
||||||
&self, room_id: &RoomId, starting_events: Vec<Arc<EventId>>,
|
&self, room_id: &RoomId, starting_events_: Vec<Arc<EventId>>,
|
||||||
) -> Result<impl Iterator<Item = Arc<EventId>> + 'a> {
|
) -> Result<impl Iterator<Item = Arc<EventId>> + 'a> {
|
||||||
const NUM_BUCKETS: usize = 50;
|
let mut starting_events: Vec<&EventId> = Vec::with_capacity(starting_events_.len());
|
||||||
|
for starting_event in &starting_events_ {
|
||||||
let mut buckets = vec![BTreeSet::new(); NUM_BUCKETS];
|
starting_events.push(starting_event);
|
||||||
|
|
||||||
let mut i = 0;
|
|
||||||
for id in starting_events {
|
|
||||||
let short = services().rooms.short.get_or_create_shorteventid(&id)?;
|
|
||||||
let bucket_id = (short % NUM_BUCKETS as u64) as usize;
|
|
||||||
buckets[bucket_id].insert((short, id.clone()));
|
|
||||||
i += 1;
|
|
||||||
if i % 100 == 0 {
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut full_auth_chain = HashSet::new();
|
Ok(self
|
||||||
|
.get_auth_chain(room_id, &starting_events)
|
||||||
|
.await?
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(move |sid| services().rooms.short.get_eventid_from_short(sid).ok()))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_auth_chain(&self, room_id: &RoomId, starting_events: &[&EventId]) -> Result<Vec<u64>> {
|
||||||
|
const NUM_BUCKETS: usize = 50; //TODO: change possible w/o disrupting db?
|
||||||
|
const BUCKET: BTreeSet<(u64, &EventId)> = BTreeSet::new();
|
||||||
|
|
||||||
|
let started = std::time::Instant::now();
|
||||||
|
let mut buckets = [BUCKET; NUM_BUCKETS];
|
||||||
|
for (i, short) in services()
|
||||||
|
.rooms
|
||||||
|
.short
|
||||||
|
.multi_get_or_create_shorteventid(starting_events)?
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
{
|
||||||
|
let bucket = short % NUM_BUCKETS as u64;
|
||||||
|
buckets[bucket as usize].insert((*short, starting_events[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
starting_events = ?starting_events.len(),
|
||||||
|
elapsed = ?started.elapsed(),
|
||||||
|
"start",
|
||||||
|
);
|
||||||
|
|
||||||
let mut hits = 0;
|
let mut hits = 0;
|
||||||
let mut misses = 0;
|
let mut misses = 0;
|
||||||
|
let mut full_auth_chain = Vec::new();
|
||||||
for chunk in buckets {
|
for chunk in buckets {
|
||||||
if chunk.is_empty() {
|
if chunk.is_empty() {
|
||||||
continue;
|
continue;
|
||||||
|
@ -48,68 +67,68 @@ impl Service {
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.get_cached_eventid_authchain(&chunk_key)?
|
.get_cached_eventid_authchain(&chunk_key)?
|
||||||
{
|
{
|
||||||
hits += 1;
|
|
||||||
full_auth_chain.extend(cached.iter().copied());
|
full_auth_chain.extend(cached.iter().copied());
|
||||||
|
hits += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
misses += 1;
|
|
||||||
|
|
||||||
let mut chunk_cache = HashSet::new();
|
|
||||||
let mut hits2 = 0;
|
let mut hits2 = 0;
|
||||||
let mut misses2 = 0;
|
let mut misses2 = 0;
|
||||||
let mut i = 0;
|
let mut chunk_cache = Vec::new();
|
||||||
for (sevent_id, event_id) in chunk {
|
for (sevent_id, event_id) in chunk {
|
||||||
if let Some(cached) = services()
|
if let Some(cached) = services()
|
||||||
.rooms
|
.rooms
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.get_cached_eventid_authchain(&[sevent_id])?
|
.get_cached_eventid_authchain(&[sevent_id])?
|
||||||
{
|
{
|
||||||
hits2 += 1;
|
|
||||||
chunk_cache.extend(cached.iter().copied());
|
chunk_cache.extend(cached.iter().copied());
|
||||||
|
hits2 += 1;
|
||||||
} else {
|
} else {
|
||||||
misses2 += 1;
|
let auth_chain = self.get_auth_chain_inner(room_id, event_id)?;
|
||||||
let auth_chain = Arc::new(self.get_auth_chain_inner(room_id, &event_id)?);
|
|
||||||
services()
|
services()
|
||||||
.rooms
|
.rooms
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.cache_auth_chain(vec![sevent_id], &auth_chain)?;
|
.cache_auth_chain(vec![sevent_id], &auth_chain)?;
|
||||||
|
chunk_cache.extend(auth_chain.iter());
|
||||||
|
misses2 += 1;
|
||||||
debug!(
|
debug!(
|
||||||
event_id = ?event_id,
|
event_id = ?event_id,
|
||||||
chain_length = ?auth_chain.len(),
|
chain_length = ?auth_chain.len(),
|
||||||
|
chunk_cache_length = ?chunk_cache.len(),
|
||||||
|
elapsed = ?started.elapsed(),
|
||||||
"Cache missed event"
|
"Cache missed event"
|
||||||
);
|
);
|
||||||
chunk_cache.extend(auth_chain.iter());
|
|
||||||
|
|
||||||
i += 1;
|
|
||||||
if i % 100 == 0 {
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
chunk_cache.sort_unstable();
|
||||||
|
chunk_cache.dedup();
|
||||||
|
services()
|
||||||
|
.rooms
|
||||||
|
.auth_chain
|
||||||
|
.cache_auth_chain_vec(chunk_key, &chunk_cache)?;
|
||||||
|
full_auth_chain.extend(chunk_cache.iter());
|
||||||
|
misses += 1;
|
||||||
debug!(
|
debug!(
|
||||||
chunk_cache_length = ?chunk_cache.len(),
|
chunk_cache_length = ?chunk_cache.len(),
|
||||||
hits = ?hits2,
|
hits = ?hits2,
|
||||||
misses = ?misses2,
|
misses = ?misses2,
|
||||||
|
elapsed = ?started.elapsed(),
|
||||||
"Chunk missed",
|
"Chunk missed",
|
||||||
);
|
);
|
||||||
let chunk_cache = Arc::new(chunk_cache);
|
|
||||||
services()
|
|
||||||
.rooms
|
|
||||||
.auth_chain
|
|
||||||
.cache_auth_chain(chunk_key, &chunk_cache)?;
|
|
||||||
full_auth_chain.extend(chunk_cache.iter());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
full_auth_chain.sort();
|
||||||
|
full_auth_chain.dedup();
|
||||||
debug!(
|
debug!(
|
||||||
chain_length = ?full_auth_chain.len(),
|
chain_length = ?full_auth_chain.len(),
|
||||||
hits = ?hits,
|
hits = ?hits,
|
||||||
misses = ?misses,
|
misses = ?misses,
|
||||||
"Auth chain stats",
|
elapsed = ?started.elapsed(),
|
||||||
|
"done",
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(full_auth_chain
|
Ok(full_auth_chain)
|
||||||
.into_iter()
|
|
||||||
.filter_map(move |sid| services().rooms.short.get_eventid_from_short(sid).ok()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, event_id))]
|
#[tracing::instrument(skip(self, event_id))]
|
||||||
|
@ -155,4 +174,10 @@ impl Service {
|
||||||
self.db
|
self.db
|
||||||
.cache_auth_chain(key, auth_chain.iter().copied().collect::<Arc<[u64]>>())
|
.cache_auth_chain(key, auth_chain.iter().copied().collect::<Arc<[u64]>>())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(self))]
|
||||||
|
pub fn cache_auth_chain_vec(&self, key: Vec<u64>, auth_chain: &Vec<u64>) -> Result<()> {
|
||||||
|
self.db
|
||||||
|
.cache_auth_chain(key, auth_chain.iter().copied().collect::<Arc<[u64]>>())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -578,7 +578,7 @@ impl Service {
|
||||||
services()
|
services()
|
||||||
.rooms
|
.rooms
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.get_auth_chain(room_id, starting_events)
|
.event_ids_iter(room_id, starting_events)
|
||||||
.await?
|
.await?
|
||||||
.collect(),
|
.collect(),
|
||||||
);
|
);
|
||||||
|
@ -909,7 +909,7 @@ impl Service {
|
||||||
services()
|
services()
|
||||||
.rooms
|
.rooms
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.get_auth_chain(room_id, state.iter().map(|(_, id)| id.clone()).collect())
|
.event_ids_iter(room_id, state.iter().map(|(_, id)| id.clone()).collect())
|
||||||
.await?
|
.await?
|
||||||
.collect(),
|
.collect(),
|
||||||
);
|
);
|
||||||
|
|
Loading…
Add table
Reference in a new issue