Compare commits
5 commits
Author | SHA1 | Date | |
---|---|---|---|
|
3e79b15bbf | ||
|
b0a2877f20 | ||
|
e46f2199c9 | ||
|
03d78b25f2 | ||
|
67749364aa |
4 changed files with 194 additions and 221 deletions
|
@ -270,8 +270,9 @@ impl Database {
|
|||
|
||||
eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?,
|
||||
referencedevents: builder.open_tree("referencedevents")?,
|
||||
pdu_cache: Mutex::new(LruCache::new(1_000_000)),
|
||||
auth_chain_cache: Mutex::new(LruCache::new(1_000_000)),
|
||||
pdu_cache: Mutex::new(LruCache::new(100_000)),
|
||||
auth_chain_cache: Mutex::new(LruCache::new(100_000)),
|
||||
shorteventid_cache: Mutex::new(LruCache::new(1_000_000)),
|
||||
},
|
||||
account_data: account_data::AccountData {
|
||||
roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
|
||||
|
|
|
@ -12,9 +12,7 @@ use std::{
|
|||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::sync::oneshot::Sender;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
pub const MILLI: Duration = Duration::from_millis(1);
|
||||
use tracing::debug;
|
||||
|
||||
thread_local! {
|
||||
static READ_CONNECTION: RefCell<Option<&'static Connection>> = RefCell::new(None);
|
||||
|
@ -164,16 +162,7 @@ impl Tree for SqliteTable {
|
|||
#[tracing::instrument(skip(self, key, value))]
|
||||
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
|
||||
let guard = self.engine.write_lock();
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
self.insert_with_guard(&guard, key, value)?;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
if elapsed > MILLI {
|
||||
warn!("insert took {:?} : {}", elapsed, &self.name);
|
||||
}
|
||||
|
||||
drop(guard);
|
||||
|
||||
let watchers = self.watchers.read();
|
||||
|
@ -220,20 +209,11 @@ impl Tree for SqliteTable {
|
|||
fn remove(&self, key: &[u8]) -> Result<()> {
|
||||
let guard = self.engine.write_lock();
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
guard.execute(
|
||||
format!("DELETE FROM {} WHERE key = ?", self.name).as_str(),
|
||||
[key],
|
||||
)?;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
if elapsed > MILLI {
|
||||
debug!("remove: took {:012?} : {}", elapsed, &self.name);
|
||||
}
|
||||
// debug!("remove key: {:?}", &key);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -326,8 +306,6 @@ impl Tree for SqliteTable {
|
|||
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
|
||||
let guard = self.engine.write_lock();
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
let old = self.get_with_guard(&guard, key)?;
|
||||
|
||||
let new =
|
||||
|
@ -335,26 +313,11 @@ impl Tree for SqliteTable {
|
|||
|
||||
self.insert_with_guard(&guard, key, &new)?;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
if elapsed > MILLI {
|
||||
debug!("increment: took {:012?} : {}", elapsed, &self.name);
|
||||
}
|
||||
// debug!("increment key: {:?}", &key);
|
||||
|
||||
Ok(new)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self, prefix))]
|
||||
fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
|
||||
// let name = self.name.clone();
|
||||
// self.iter_from_thread(
|
||||
// format!(
|
||||
// "SELECT key, value FROM {} WHERE key BETWEEN ?1 AND ?1 || X'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF' ORDER BY key ASC",
|
||||
// name
|
||||
// )
|
||||
// [prefix]
|
||||
// )
|
||||
Box::new(
|
||||
self.iter_from(&prefix, false)
|
||||
.take_while(move |(key, _)| key.starts_with(&prefix)),
|
||||
|
|
|
@ -88,7 +88,8 @@ pub struct Rooms {
|
|||
pub(super) referencedevents: Arc<dyn Tree>,
|
||||
|
||||
pub(super) pdu_cache: Mutex<LruCache<EventId, Arc<PduEvent>>>,
|
||||
pub(super) auth_chain_cache: Mutex<LruCache<Vec<EventId>, HashSet<EventId>>>,
|
||||
pub(super) auth_chain_cache: Mutex<LruCache<u64, HashSet<u64>>>,
|
||||
pub(super) shorteventid_cache: Mutex<LruCache<u64, EventId>>,
|
||||
}
|
||||
|
||||
impl Rooms {
|
||||
|
@ -98,15 +99,11 @@ impl Rooms {
|
|||
Ok(self
|
||||
.stateid_shorteventid
|
||||
.scan_prefix(shortstatehash.to_be_bytes().to_vec())
|
||||
.map(|(_, bytes)| self.shorteventid_eventid.get(&bytes).ok().flatten())
|
||||
.flatten()
|
||||
.map(|bytes| {
|
||||
EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
|
||||
Error::bad_database("EventID in stateid_shorteventid is invalid unicode.")
|
||||
})?)
|
||||
.map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid."))
|
||||
.map(|(_, bytes)| {
|
||||
self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap())
|
||||
.ok()
|
||||
})
|
||||
.filter_map(|r| r.ok())
|
||||
.flatten()
|
||||
.collect())
|
||||
}
|
||||
|
||||
|
@ -117,15 +114,11 @@ impl Rooms {
|
|||
let state = self
|
||||
.stateid_shorteventid
|
||||
.scan_prefix(shortstatehash.to_be_bytes().to_vec())
|
||||
.map(|(_, bytes)| self.shorteventid_eventid.get(&bytes).ok().flatten())
|
||||
.flatten()
|
||||
.map(|bytes| {
|
||||
EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
|
||||
Error::bad_database("EventID in stateid_shorteventid is invalid unicode.")
|
||||
})?)
|
||||
.map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid."))
|
||||
.map(|(_, bytes)| {
|
||||
self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap())
|
||||
.ok()
|
||||
})
|
||||
.filter_map(|r| r.ok())
|
||||
.flatten()
|
||||
.map(|eventid| self.get_pdu(&eventid))
|
||||
.filter_map(|r| r.ok().flatten())
|
||||
.map(|pdu| {
|
||||
|
@ -167,15 +160,10 @@ impl Rooms {
|
|||
Ok(self
|
||||
.stateid_shorteventid
|
||||
.get(&stateid)?
|
||||
.map(|bytes| self.shorteventid_eventid.get(&bytes).ok().flatten())
|
||||
.flatten()
|
||||
.map(|bytes| {
|
||||
EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
|
||||
Error::bad_database("EventID in stateid_shorteventid is invalid unicode.")
|
||||
})?)
|
||||
.map_err(|_| Error::bad_database("EventId in stateid_shorteventid is invalid."))
|
||||
self.get_eventid_from_short(utils::u64_from_bytes(&bytes).unwrap())
|
||||
.ok()
|
||||
})
|
||||
.map(|r| r.ok())
|
||||
.flatten())
|
||||
} else {
|
||||
Ok(None)
|
||||
|
@ -315,19 +303,7 @@ impl Rooms {
|
|||
);
|
||||
|
||||
let (shortstatehash, already_existed) =
|
||||
match self.statehash_shortstatehash.get(&state_hash)? {
|
||||
Some(shortstatehash) => (
|
||||
utils::u64_from_bytes(&shortstatehash)
|
||||
.map_err(|_| Error::bad_database("Invalid shortstatehash in db."))?,
|
||||
true,
|
||||
),
|
||||
None => {
|
||||
let shortstatehash = db.globals.next_count()?;
|
||||
self.statehash_shortstatehash
|
||||
.insert(&state_hash, &shortstatehash.to_be_bytes())?;
|
||||
(shortstatehash, false)
|
||||
}
|
||||
};
|
||||
self.get_or_create_shortstatehash(&state_hash, &db.globals)?;
|
||||
|
||||
let new_state = if !already_existed {
|
||||
let mut new_state = HashSet::new();
|
||||
|
@ -352,25 +328,14 @@ impl Rooms {
|
|||
}
|
||||
};
|
||||
|
||||
let shorteventid =
|
||||
match self.eventid_shorteventid.get(eventid.as_bytes()).ok()? {
|
||||
Some(shorteventid) => shorteventid.to_vec(),
|
||||
None => {
|
||||
let shorteventid = db.globals.next_count().ok()?;
|
||||
self.eventid_shorteventid
|
||||
.insert(eventid.as_bytes(), &shorteventid.to_be_bytes())
|
||||
.ok()?;
|
||||
self.shorteventid_eventid
|
||||
.insert(&shorteventid.to_be_bytes(), eventid.as_bytes())
|
||||
.ok()?;
|
||||
shorteventid.to_be_bytes().to_vec()
|
||||
}
|
||||
};
|
||||
let shorteventid = self
|
||||
.get_or_create_shorteventid(&eventid, &db.globals)
|
||||
.ok()?;
|
||||
|
||||
let mut state_id = shortstatehash.to_be_bytes().to_vec();
|
||||
state_id.extend_from_slice(&shortstatekey);
|
||||
|
||||
Some((state_id, shorteventid))
|
||||
Some((state_id, shorteventid.to_be_bytes().to_vec()))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
|
@ -428,6 +393,76 @@ impl Rooms {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns (shortstatehash, already_existed)
|
||||
fn get_or_create_shortstatehash(
|
||||
&self,
|
||||
state_hash: &StateHashId,
|
||||
globals: &super::globals::Globals,
|
||||
) -> Result<(u64, bool)> {
|
||||
Ok(match self.statehash_shortstatehash.get(&state_hash)? {
|
||||
Some(shortstatehash) => (
|
||||
utils::u64_from_bytes(&shortstatehash)
|
||||
.map_err(|_| Error::bad_database("Invalid shortstatehash in db."))?,
|
||||
true,
|
||||
),
|
||||
None => {
|
||||
let shortstatehash = globals.next_count()?;
|
||||
self.statehash_shortstatehash
|
||||
.insert(&state_hash, &shortstatehash.to_be_bytes())?;
|
||||
(shortstatehash, false)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns (shortstatehash, already_existed)
|
||||
pub fn get_or_create_shorteventid(
|
||||
&self,
|
||||
event_id: &EventId,
|
||||
globals: &super::globals::Globals,
|
||||
) -> Result<u64> {
|
||||
Ok(match self.eventid_shorteventid.get(event_id.as_bytes())? {
|
||||
Some(shorteventid) => utils::u64_from_bytes(&shorteventid)
|
||||
.map_err(|_| Error::bad_database("Invalid shorteventid in db."))?,
|
||||
None => {
|
||||
let shorteventid = globals.next_count()?;
|
||||
self.eventid_shorteventid
|
||||
.insert(event_id.as_bytes(), &shorteventid.to_be_bytes())?;
|
||||
self.shorteventid_eventid
|
||||
.insert(&shorteventid.to_be_bytes(), event_id.as_bytes())?;
|
||||
shorteventid
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_eventid_from_short(&self, shorteventid: u64) -> Result<EventId> {
|
||||
if let Some(id) = self
|
||||
.shorteventid_cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get_mut(&shorteventid)
|
||||
{
|
||||
return Ok(id.clone());
|
||||
}
|
||||
|
||||
let bytes = self
|
||||
.shorteventid_eventid
|
||||
.get(&shorteventid.to_be_bytes())?
|
||||
.ok_or_else(|| Error::bad_database("Shorteventid does not exist"))?;
|
||||
|
||||
let event_id =
|
||||
EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
|
||||
Error::bad_database("EventID in roomid_pduleaves is invalid unicode.")
|
||||
})?)
|
||||
.map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid."))?;
|
||||
|
||||
self.shorteventid_cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(shorteventid, event_id.clone());
|
||||
|
||||
Ok(event_id)
|
||||
}
|
||||
|
||||
/// Returns the full room state.
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn room_state_full(
|
||||
|
@ -1116,17 +1151,7 @@ impl Rooms {
|
|||
state: &StateMap<Arc<PduEvent>>,
|
||||
globals: &super::globals::Globals,
|
||||
) -> Result<()> {
|
||||
let shorteventid = match self.eventid_shorteventid.get(event_id.as_bytes())? {
|
||||
Some(shorteventid) => shorteventid.to_vec(),
|
||||
None => {
|
||||
let shorteventid = globals.next_count()?;
|
||||
self.eventid_shorteventid
|
||||
.insert(event_id.as_bytes(), &shorteventid.to_be_bytes())?;
|
||||
self.shorteventid_eventid
|
||||
.insert(&shorteventid.to_be_bytes(), event_id.as_bytes())?;
|
||||
shorteventid.to_be_bytes().to_vec()
|
||||
}
|
||||
};
|
||||
let shorteventid = self.get_or_create_shorteventid(&event_id, globals)?;
|
||||
|
||||
let state_hash = self.calculate_hash(
|
||||
&state
|
||||
|
@ -1135,69 +1160,45 @@ impl Rooms {
|
|||
.collect::<Vec<_>>(),
|
||||
);
|
||||
|
||||
let shortstatehash = match self.statehash_shortstatehash.get(&state_hash)? {
|
||||
Some(shortstatehash) => {
|
||||
// State already existed in db
|
||||
self.shorteventid_shortstatehash
|
||||
.insert(&shorteventid, &*shortstatehash)?;
|
||||
return Ok(());
|
||||
}
|
||||
None => {
|
||||
let shortstatehash = globals.next_count()?;
|
||||
self.statehash_shortstatehash
|
||||
.insert(&state_hash, &shortstatehash.to_be_bytes())?;
|
||||
shortstatehash.to_be_bytes().to_vec()
|
||||
}
|
||||
};
|
||||
let (shortstatehash, already_existed) =
|
||||
self.get_or_create_shortstatehash(&state_hash, globals)?;
|
||||
|
||||
let batch = state
|
||||
.iter()
|
||||
.filter_map(|((event_type, state_key), pdu)| {
|
||||
let mut statekey = event_type.as_ref().as_bytes().to_vec();
|
||||
statekey.push(0xff);
|
||||
statekey.extend_from_slice(&state_key.as_bytes());
|
||||
if !already_existed {
|
||||
let batch = state
|
||||
.iter()
|
||||
.filter_map(|((event_type, state_key), pdu)| {
|
||||
let mut statekey = event_type.as_ref().as_bytes().to_vec();
|
||||
statekey.push(0xff);
|
||||
statekey.extend_from_slice(&state_key.as_bytes());
|
||||
|
||||
let shortstatekey = match self.statekey_shortstatekey.get(&statekey).ok()? {
|
||||
Some(shortstatekey) => shortstatekey.to_vec(),
|
||||
None => {
|
||||
let shortstatekey = globals.next_count().ok()?;
|
||||
self.statekey_shortstatekey
|
||||
.insert(&statekey, &shortstatekey.to_be_bytes())
|
||||
.ok()?;
|
||||
shortstatekey.to_be_bytes().to_vec()
|
||||
}
|
||||
};
|
||||
let shortstatekey = match self.statekey_shortstatekey.get(&statekey).ok()? {
|
||||
Some(shortstatekey) => shortstatekey.to_vec(),
|
||||
None => {
|
||||
let shortstatekey = globals.next_count().ok()?;
|
||||
self.statekey_shortstatekey
|
||||
.insert(&statekey, &shortstatekey.to_be_bytes())
|
||||
.ok()?;
|
||||
shortstatekey.to_be_bytes().to_vec()
|
||||
}
|
||||
};
|
||||
|
||||
let shorteventid = match self
|
||||
.eventid_shorteventid
|
||||
.get(pdu.event_id.as_bytes())
|
||||
.ok()?
|
||||
{
|
||||
Some(shorteventid) => shorteventid.to_vec(),
|
||||
None => {
|
||||
let shorteventid = globals.next_count().ok()?;
|
||||
self.eventid_shorteventid
|
||||
.insert(pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())
|
||||
.ok()?;
|
||||
self.shorteventid_eventid
|
||||
.insert(&shorteventid.to_be_bytes(), pdu.event_id.as_bytes())
|
||||
.ok()?;
|
||||
shorteventid.to_be_bytes().to_vec()
|
||||
}
|
||||
};
|
||||
let shorteventid = self
|
||||
.get_or_create_shorteventid(&pdu.event_id, globals)
|
||||
.ok()?;
|
||||
|
||||
let mut state_id = shortstatehash.clone();
|
||||
state_id.extend_from_slice(&shortstatekey);
|
||||
let mut state_id = shortstatehash.to_be_bytes().to_vec();
|
||||
state_id.extend_from_slice(&shortstatekey);
|
||||
|
||||
Some((state_id, shorteventid))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
Some((state_id, shorteventid.to_be_bytes().to_vec()))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
self.stateid_shorteventid
|
||||
.insert_batch(&mut batch.into_iter())?;
|
||||
self.stateid_shorteventid
|
||||
.insert_batch(&mut batch.into_iter())?;
|
||||
}
|
||||
|
||||
self.shorteventid_shortstatehash
|
||||
.insert(&shorteventid, &*shortstatehash)?;
|
||||
.insert(&shorteventid.to_be_bytes(), &shortstatehash.to_be_bytes())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -1212,26 +1213,16 @@ impl Rooms {
|
|||
new_pdu: &PduEvent,
|
||||
globals: &super::globals::Globals,
|
||||
) -> Result<u64> {
|
||||
let shorteventid = self.get_or_create_shorteventid(&new_pdu.event_id, globals)?;
|
||||
|
||||
let old_state = if let Some(old_shortstatehash) =
|
||||
self.roomid_shortstatehash.get(new_pdu.room_id.as_bytes())?
|
||||
{
|
||||
// Store state for event. The state does not include the event itself.
|
||||
// Instead it's the state before the pdu, so the room's old state.
|
||||
|
||||
let shorteventid = match self.eventid_shorteventid.get(new_pdu.event_id.as_bytes())? {
|
||||
Some(shorteventid) => shorteventid.to_vec(),
|
||||
None => {
|
||||
let shorteventid = globals.next_count()?;
|
||||
self.eventid_shorteventid
|
||||
.insert(new_pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?;
|
||||
self.shorteventid_eventid
|
||||
.insert(&shorteventid.to_be_bytes(), new_pdu.event_id.as_bytes())?;
|
||||
shorteventid.to_be_bytes().to_vec()
|
||||
}
|
||||
};
|
||||
|
||||
self.shorteventid_shortstatehash
|
||||
.insert(&shorteventid, &old_shortstatehash)?;
|
||||
.insert(&shorteventid.to_be_bytes(), &old_shortstatehash)?;
|
||||
|
||||
if new_pdu.state_key.is_none() {
|
||||
return utils::u64_from_bytes(&old_shortstatehash).map_err(|_| {
|
||||
Error::bad_database("Invalid shortstatehash in roomid_shortstatehash.")
|
||||
|
@ -1264,19 +1255,7 @@ impl Rooms {
|
|||
}
|
||||
};
|
||||
|
||||
let shorteventid = match self.eventid_shorteventid.get(new_pdu.event_id.as_bytes())? {
|
||||
Some(shorteventid) => shorteventid.to_vec(),
|
||||
None => {
|
||||
let shorteventid = globals.next_count()?;
|
||||
self.eventid_shorteventid
|
||||
.insert(new_pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?;
|
||||
self.shorteventid_eventid
|
||||
.insert(&shorteventid.to_be_bytes(), new_pdu.event_id.as_bytes())?;
|
||||
shorteventid.to_be_bytes().to_vec()
|
||||
}
|
||||
};
|
||||
|
||||
new_state.insert(shortstatekey, shorteventid);
|
||||
new_state.insert(shortstatekey, shorteventid.to_be_bytes().to_vec());
|
||||
|
||||
let new_state_hash = self.calculate_hash(
|
||||
&new_state
|
||||
|
@ -1516,11 +1495,7 @@ impl Rooms {
|
|||
);
|
||||
|
||||
// Generate short event id
|
||||
let shorteventid = db.globals.next_count()?;
|
||||
self.eventid_shorteventid
|
||||
.insert(pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?;
|
||||
self.shorteventid_eventid
|
||||
.insert(&shorteventid.to_be_bytes(), pdu.event_id.as_bytes())?;
|
||||
let _shorteventid = self.get_or_create_shorteventid(&pdu.event_id, &db.globals)?;
|
||||
|
||||
// We append to state before appending the pdu, so we don't have a moment in time with the
|
||||
// pdu without it's state. This is okay because append_pdu can't fail.
|
||||
|
@ -2655,9 +2630,7 @@ impl Rooms {
|
|||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn auth_chain_cache(
|
||||
&self,
|
||||
) -> std::sync::MutexGuard<'_, LruCache<Vec<EventId>, HashSet<EventId>>> {
|
||||
pub fn auth_chain_cache(&self) -> std::sync::MutexGuard<'_, LruCache<u64, HashSet<u64>>> {
|
||||
self.auth_chain_cache.lock().unwrap()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -668,7 +668,7 @@ pub async fn send_transaction_message_route(
|
|||
|
||||
let elapsed = start_time.elapsed();
|
||||
warn!(
|
||||
"Handling event {} took {}m{}s",
|
||||
"Handling transaction of event {} took {}m{}s",
|
||||
event_id,
|
||||
elapsed.as_secs() / 60,
|
||||
elapsed.as_secs() % 60
|
||||
|
@ -721,7 +721,8 @@ pub async fn send_transaction_message_route(
|
|||
&db.globals,
|
||||
)?;
|
||||
} else {
|
||||
warn!("No known event ids in read receipt: {:?}", user_updates);
|
||||
// TODO fetch missing events
|
||||
debug!("No known event ids in read receipt: {:?}", user_updates);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -849,6 +850,8 @@ pub fn handle_incoming_pdu<'a>(
|
|||
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
|
||||
) -> AsyncRecursiveType<'a, StdResult<Option<Vec<u8>>, String>> {
|
||||
Box::pin(async move {
|
||||
let start_time = Instant::now();
|
||||
|
||||
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
|
||||
match db.rooms.exists(&room_id) {
|
||||
Ok(true) => {}
|
||||
|
@ -1013,16 +1016,22 @@ pub fn handle_incoming_pdu<'a>(
|
|||
// 8. if not timeline event: stop
|
||||
if !is_timeline_event
|
||||
|| incoming_pdu.origin_server_ts
|
||||
< db.rooms
|
||||
.first_pdu_in_room(&room_id)
|
||||
.map_err(|_| "Error loading first room event.".to_owned())?
|
||||
.expect("Room exists")
|
||||
.origin_server_ts
|
||||
< (utils::millis_since_unix_epoch() - 1000 * 60 * 20)
|
||||
.try_into()
|
||||
.expect("time is valid")
|
||||
// Not older than 20 mins
|
||||
{
|
||||
let elapsed = start_time.elapsed();
|
||||
warn!(
|
||||
"Handling outlier event {} took {}m{}s",
|
||||
event_id,
|
||||
elapsed.as_secs() / 60,
|
||||
elapsed.as_secs() % 60
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Load missing prev events first
|
||||
// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
|
||||
fetch_and_handle_events(
|
||||
db,
|
||||
origin,
|
||||
|
@ -1033,8 +1042,6 @@ pub fn handle_incoming_pdu<'a>(
|
|||
)
|
||||
.await;
|
||||
|
||||
// TODO: 9. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
|
||||
|
||||
// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
|
||||
// doing all the checks in this list starting at 1. These are not timeline events.
|
||||
|
||||
|
@ -1046,13 +1053,16 @@ pub fn handle_incoming_pdu<'a>(
|
|||
|
||||
if incoming_pdu.prev_events.len() == 1 {
|
||||
let prev_event = &incoming_pdu.prev_events[0];
|
||||
let state = db
|
||||
let prev_event_sstatehash = db
|
||||
.rooms
|
||||
.pdu_shortstatehash(prev_event)
|
||||
.map_err(|_| "Failed talking to db".to_owned())?
|
||||
.map(|shortstatehash| db.rooms.state_full_ids(shortstatehash).ok())
|
||||
.flatten();
|
||||
if let Some(state) = state {
|
||||
.map_err(|_| "Failed talking to db".to_owned())?;
|
||||
|
||||
let state =
|
||||
prev_event_sstatehash.map(|shortstatehash| db.rooms.state_full_ids(shortstatehash));
|
||||
|
||||
if let Some(Ok(state)) = state {
|
||||
warn!("Using cached state");
|
||||
let mut state = fetch_and_handle_events(
|
||||
db,
|
||||
origin,
|
||||
|
@ -1090,6 +1100,7 @@ pub fn handle_incoming_pdu<'a>(
|
|||
}
|
||||
|
||||
if state_at_incoming_event.is_none() {
|
||||
warn!("Calling /state_ids");
|
||||
// Call /state_ids to find out what the state at this pdu is. We trust the server's
|
||||
// response to some extend, but we still do a lot of checks on the events
|
||||
match db
|
||||
|
@ -1309,7 +1320,8 @@ pub fn handle_incoming_pdu<'a>(
|
|||
for state in fork_states {
|
||||
auth_chain_sets.push(
|
||||
get_auth_chain(state.iter().map(|(_, id)| id.clone()).collect(), db)
|
||||
.map_err(|_| "Failed to load auth chain.".to_owned())?,
|
||||
.map_err(|_| "Failed to load auth chain.".to_owned())?
|
||||
.collect(),
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -1382,6 +1394,14 @@ pub fn handle_incoming_pdu<'a>(
|
|||
|
||||
// Event has passed all auth/stateres checks
|
||||
drop(state_lock);
|
||||
|
||||
let elapsed = start_time.elapsed();
|
||||
warn!(
|
||||
"Handling timeline event {} took {}m{}s",
|
||||
event_id,
|
||||
elapsed.as_secs() / 60,
|
||||
elapsed.as_secs() % 60
|
||||
);
|
||||
Ok(pdu_id)
|
||||
})
|
||||
}
|
||||
|
@ -1754,38 +1774,57 @@ fn append_incoming_pdu(
|
|||
Ok(pdu_id)
|
||||
}
|
||||
|
||||
fn get_auth_chain(starting_events: Vec<EventId>, db: &Database) -> Result<HashSet<EventId>> {
|
||||
fn get_auth_chain(
|
||||
starting_events: Vec<EventId>,
|
||||
db: &Database,
|
||||
) -> Result<impl Iterator<Item = EventId> + '_> {
|
||||
let mut full_auth_chain = HashSet::new();
|
||||
|
||||
let starting_events = starting_events
|
||||
.iter()
|
||||
.map(|id| {
|
||||
db.rooms
|
||||
.get_or_create_shorteventid(id, &db.globals)
|
||||
.map(|s| (s, id))
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let mut cache = db.rooms.auth_chain_cache();
|
||||
|
||||
for event_id in &starting_events {
|
||||
if let Some(cached) = cache.get_mut(&[event_id.clone()][..]) {
|
||||
for (sevent_id, event_id) in starting_events {
|
||||
if let Some(cached) = cache.get_mut(&sevent_id) {
|
||||
full_auth_chain.extend(cached.iter().cloned());
|
||||
} else {
|
||||
drop(cache);
|
||||
let mut auth_chain = HashSet::new();
|
||||
get_auth_chain_recursive(&event_id, &mut auth_chain, db)?;
|
||||
cache = db.rooms.auth_chain_cache();
|
||||
cache.insert(vec![event_id.clone()], auth_chain.clone());
|
||||
cache.insert(sevent_id, auth_chain.clone());
|
||||
full_auth_chain.extend(auth_chain);
|
||||
};
|
||||
}
|
||||
|
||||
Ok(full_auth_chain)
|
||||
drop(cache);
|
||||
|
||||
Ok(full_auth_chain
|
||||
.into_iter()
|
||||
.filter_map(move |sid| db.rooms.get_eventid_from_short(sid).ok()))
|
||||
}
|
||||
|
||||
fn get_auth_chain_recursive(
|
||||
event_id: &EventId,
|
||||
found: &mut HashSet<EventId>,
|
||||
found: &mut HashSet<u64>,
|
||||
db: &Database,
|
||||
) -> Result<()> {
|
||||
let r = db.rooms.get_pdu(&event_id);
|
||||
match r {
|
||||
Ok(Some(pdu)) => {
|
||||
for auth_event in &pdu.auth_events {
|
||||
if !found.contains(auth_event) {
|
||||
found.insert(auth_event.clone());
|
||||
let sauthevent = db
|
||||
.rooms
|
||||
.get_or_create_shorteventid(auth_event, &db.globals)?;
|
||||
if !found.contains(&sauthevent) {
|
||||
found.insert(sauthevent);
|
||||
get_auth_chain_recursive(&auth_event, found, db)?;
|
||||
}
|
||||
}
|
||||
|
@ -1892,7 +1931,6 @@ pub fn get_event_authorization_route(
|
|||
|
||||
Ok(get_event_authorization::v1::Response {
|
||||
auth_chain: auth_chain_ids
|
||||
.into_iter()
|
||||
.filter_map(|id| Some(db.rooms.get_pdu_json(&id).ok()??))
|
||||
.map(|event| PduEvent::convert_to_outgoing_federation_event(event))
|
||||
.collect(),
|
||||
|
@ -1936,7 +1974,6 @@ pub fn get_room_state_route(
|
|||
|
||||
Ok(get_room_state::v1::Response {
|
||||
auth_chain: auth_chain_ids
|
||||
.into_iter()
|
||||
.map(|id| {
|
||||
Ok::<_, Error>(PduEvent::convert_to_outgoing_federation_event(
|
||||
db.rooms.get_pdu_json(&id)?.unwrap(),
|
||||
|
@ -1979,7 +2016,7 @@ pub fn get_room_state_ids_route(
|
|||
let auth_chain_ids = get_auth_chain(vec![body.event_id.clone()], &db)?;
|
||||
|
||||
Ok(get_room_state_ids::v1::Response {
|
||||
auth_chain_ids: auth_chain_ids.into_iter().collect(),
|
||||
auth_chain_ids: auth_chain_ids.collect(),
|
||||
pdu_ids,
|
||||
}
|
||||
.into())
|
||||
|
@ -2248,7 +2285,6 @@ pub async fn create_join_event_route(
|
|||
Ok(create_join_event::v2::Response {
|
||||
room_state: RoomState {
|
||||
auth_chain: auth_chain_ids
|
||||
.iter()
|
||||
.filter_map(|id| db.rooms.get_pdu_json(&id).ok().flatten())
|
||||
.map(PduEvent::convert_to_outgoing_federation_event)
|
||||
.collect(),
|
||||
|
|
Loading…
Add table
Reference in a new issue