diff --git a/src/database/key_value/room.rs b/src/database/key_value/room.rs deleted file mode 100644 index 8bd6648e..00000000 --- a/src/database/key_value/room.rs +++ /dev/null @@ -1 +0,0 @@ -asdf diff --git a/src/database/key_value/room/alias.rs b/src/database/key_value/rooms/alias.rs similarity index 100% rename from src/database/key_value/room/alias.rs rename to src/database/key_value/rooms/alias.rs diff --git a/src/database/key_value/room/directory.rs b/src/database/key_value/rooms/directory.rs similarity index 100% rename from src/database/key_value/room/directory.rs rename to src/database/key_value/rooms/directory.rs diff --git a/src/database/key_value/room/edus/presence.rs b/src/database/key_value/rooms/edus/presence.rs similarity index 100% rename from src/database/key_value/room/edus/presence.rs rename to src/database/key_value/rooms/edus/presence.rs diff --git a/src/database/key_value/room/edus/read_receipt.rs b/src/database/key_value/rooms/edus/read_receipt.rs similarity index 100% rename from src/database/key_value/room/edus/read_receipt.rs rename to src/database/key_value/rooms/edus/read_receipt.rs diff --git a/src/database/key_value/room/edus/typing.rs b/src/database/key_value/rooms/edus/typing.rs similarity index 100% rename from src/database/key_value/room/edus/typing.rs rename to src/database/key_value/rooms/edus/typing.rs diff --git a/src/database/key_value/room/lazy_load.rs b/src/database/key_value/rooms/lazy_load.rs similarity index 100% rename from src/database/key_value/room/lazy_load.rs rename to src/database/key_value/rooms/lazy_load.rs diff --git a/src/database/key_value/room/metadata.rs b/src/database/key_value/rooms/metadata.rs similarity index 100% rename from src/database/key_value/room/metadata.rs rename to src/database/key_value/rooms/metadata.rs diff --git a/src/database/key_value/room/mod.rs b/src/database/key_value/rooms/mod.rs similarity index 100% rename from src/database/key_value/room/mod.rs rename to src/database/key_value/rooms/mod.rs diff --git a/src/database/key_value/room/outlier.rs b/src/database/key_value/rooms/outlier.rs similarity index 100% rename from src/database/key_value/room/outlier.rs rename to src/database/key_value/rooms/outlier.rs diff --git a/src/database/key_value/room/pdu_metadata.rs b/src/database/key_value/rooms/pdu_metadata.rs similarity index 100% rename from src/database/key_value/room/pdu_metadata.rs rename to src/database/key_value/rooms/pdu_metadata.rs diff --git a/src/database/key_value/rooms/search.rs b/src/database/key_value/rooms/search.rs index 6a32e8b4..1ffffe56 100644 --- a/src/database/key_value/rooms/search.rs +++ b/src/database/key_value/rooms/search.rs @@ -1,956 +1,23 @@ - - /// Checks if a room exists. - #[tracing::instrument(skip(self))] - pub fn first_pdu_in_room(&self, room_id: &RoomId) -> Result>> { - let prefix = self - .get_shortroomid(room_id)? - .expect("room exists") - .to_be_bytes() - .to_vec(); - - // Look for PDUs in that room. - self.pduid_pdu - .iter_from(&prefix, false) - .filter(|(k, _)| k.starts_with(&prefix)) - .map(|(_, pdu)| { - serde_json::from_slice(&pdu) - .map_err(|_| Error::bad_database("Invalid first PDU in db.")) - .map(Arc::new) - }) - .next() - .transpose() - } - - #[tracing::instrument(skip(self))] - pub fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result { - match self - .lasttimelinecount_cache - .lock() - .unwrap() - .entry(room_id.to_owned()) - { - hash_map::Entry::Vacant(v) => { - if let Some(last_count) = self - .pdus_until(&sender_user, &room_id, u64::MAX)? - .filter_map(|r| { - // Filter out buggy events - if r.is_err() { - error!("Bad pdu in pdus_since: {:?}", r); - } - r.ok() - }) - .map(|(pduid, _)| self.pdu_count(&pduid)) - .next() - { - Ok(*v.insert(last_count?)) - } else { - Ok(0) - } - } - hash_map::Entry::Occupied(o) => Ok(*o.get()), - } - } - - // TODO Is this the same as the function above? - #[tracing::instrument(skip(self))] - pub fn latest_pdu_count(&self, room_id: &RoomId) -> Result { - let prefix = self - .get_shortroomid(room_id)? - .expect("room exists") - .to_be_bytes() - .to_vec(); - - let mut last_possible_key = prefix.clone(); - last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); - - self.pduid_pdu - .iter_from(&last_possible_key, true) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .next() - .map(|b| self.pdu_count(&b.0)) - .transpose() - .map(|op| op.unwrap_or_default()) - } - - - - /// Returns the `count` of this pdu's id. - pub fn get_pdu_count(&self, event_id: &EventId) -> Result> { - self.eventid_pduid - .get(event_id.as_bytes())? - .map(|pdu_id| self.pdu_count(&pdu_id)) - .transpose() - } - - /// Returns the json of a pdu. - pub fn get_pdu_json(&self, event_id: &EventId) -> Result> { - self.eventid_pduid - .get(event_id.as_bytes())? - .map_or_else( - || self.eventid_outlierpdu.get(event_id.as_bytes()), - |pduid| { - Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| { - Error::bad_database("Invalid pduid in eventid_pduid.") - })?)) - }, - )? - .map(|pdu| { - serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) - }) - .transpose() - } - - /// Returns the json of a pdu. - pub fn get_non_outlier_pdu_json( - &self, - event_id: &EventId, - ) -> Result> { - self.eventid_pduid - .get(event_id.as_bytes())? - .map(|pduid| { - self.pduid_pdu - .get(&pduid)? - .ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid.")) - }) - .transpose()? - .map(|pdu| { - serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) - }) - .transpose() - } - - /// Returns the pdu's id. - pub fn get_pdu_id(&self, event_id: &EventId) -> Result>> { - self.eventid_pduid.get(event_id.as_bytes()) - } - - /// Returns the pdu. - /// - /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. - pub fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result> { - self.eventid_pduid - .get(event_id.as_bytes())? - .map(|pduid| { - self.pduid_pdu - .get(&pduid)? - .ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid.")) - }) - .transpose()? - .map(|pdu| { - serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) - }) - .transpose() - } - - /// Returns the pdu. - /// - /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. - pub fn get_pdu(&self, event_id: &EventId) -> Result>> { - if let Some(p) = self.pdu_cache.lock().unwrap().get_mut(event_id) { - return Ok(Some(Arc::clone(p))); - } - - if let Some(pdu) = self - .eventid_pduid - .get(event_id.as_bytes())? - .map_or_else( - || self.eventid_outlierpdu.get(event_id.as_bytes()), - |pduid| { - Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| { - Error::bad_database("Invalid pduid in eventid_pduid.") - })?)) - }, - )? - .map(|pdu| { - serde_json::from_slice(&pdu) - .map_err(|_| Error::bad_database("Invalid PDU in db.")) - .map(Arc::new) - }) - .transpose()? - { - self.pdu_cache - .lock() - .unwrap() - .insert(event_id.to_owned(), Arc::clone(&pdu)); - Ok(Some(pdu)) - } else { - Ok(None) - } - } - - /// Returns the pdu. - /// - /// This does __NOT__ check the outliers `Tree`. - pub fn get_pdu_from_id(&self, pdu_id: &[u8]) -> Result> { - self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { - Ok(Some( - serde_json::from_slice(&pdu) - .map_err(|_| Error::bad_database("Invalid PDU in db."))?, - )) - }) - } - - /// Returns the pdu as a `BTreeMap`. - pub fn get_pdu_json_from_id(&self, pdu_id: &[u8]) -> Result> { - self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { - Ok(Some( - serde_json::from_slice(&pdu) - .map_err(|_| Error::bad_database("Invalid PDU in db."))?, - )) - }) - } - - /// Returns the `count` of this pdu's id. - pub fn pdu_count(&self, pdu_id: &[u8]) -> Result { - utils::u64_from_bytes(&pdu_id[pdu_id.len() - size_of::()..]) - .map_err(|_| Error::bad_database("PDU has invalid count bytes.")) - } - - /// Removes a pdu and creates a new one with the same id. - #[tracing::instrument(skip(self))] - fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()> { - if self.pduid_pdu.get(pdu_id)?.is_some() { - self.pduid_pdu.insert( - pdu_id, - &serde_json::to_vec(pdu).expect("PduEvent::to_vec always works"), - )?; - Ok(()) - } else { - Err(Error::BadRequest( - ErrorKind::NotFound, - "PDU does not exist.", - )) - } - } - - /// Creates a new persisted data unit and adds it to a room. - /// - /// By this point the incoming event should be fully authenticated, no auth happens - /// in `append_pdu`. - /// - /// Returns pdu id - #[tracing::instrument(skip(self, pdu, pdu_json, leaves, db))] - pub fn append_pdu<'a>( - &self, - pdu: &PduEvent, - mut pdu_json: CanonicalJsonObject, - leaves: impl IntoIterator + Debug, - db: &Database, - ) -> Result> { - let shortroomid = self.get_shortroomid(&pdu.room_id)?.expect("room exists"); - - // Make unsigned fields correct. This is not properly documented in the spec, but state - // events need to have previous content in the unsigned field, so clients can easily - // interpret things like membership changes - if let Some(state_key) = &pdu.state_key { - if let CanonicalJsonValue::Object(unsigned) = pdu_json - .entry("unsigned".to_owned()) - .or_insert_with(|| CanonicalJsonValue::Object(Default::default())) - { - if let Some(shortstatehash) = self.pdu_shortstatehash(&pdu.event_id).unwrap() { - if let Some(prev_state) = self - .state_get(shortstatehash, &pdu.kind.to_string().into(), state_key) - .unwrap() - { - unsigned.insert( - "prev_content".to_owned(), - CanonicalJsonValue::Object( - utils::to_canonical_object(prev_state.content.clone()) - .expect("event is valid, we just created it"), - ), - ); - } - } - } else { - error!("Invalid unsigned type in pdu."); - } - } - - // We must keep track of all events that have been referenced. - self.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?; - self.replace_pdu_leaves(&pdu.room_id, leaves)?; - - let mutex_insert = Arc::clone( - db.globals - .roomid_mutex_insert - .write() - .unwrap() - .entry(pdu.room_id.clone()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().unwrap(); - - let count1 = db.globals.next_count()?; - // Mark as read first so the sending client doesn't get a notification even if appending - // fails - self.edus - .private_read_set(&pdu.room_id, &pdu.sender, count1, &db.globals)?; - self.reset_notification_counts(&pdu.sender, &pdu.room_id)?; - - let count2 = db.globals.next_count()?; - let mut pdu_id = shortroomid.to_be_bytes().to_vec(); - pdu_id.extend_from_slice(&count2.to_be_bytes()); - - // There's a brief moment of time here where the count is updated but the pdu does not - // exist. This could theoretically lead to dropped pdus, but it's extremely rare - // - // Update: We fixed this using insert_lock - - self.pduid_pdu.insert( - &pdu_id, - &serde_json::to_vec(&pdu_json).expect("CanonicalJsonObject is always a valid"), - )?; - self.lasttimelinecount_cache - .lock() - .unwrap() - .insert(pdu.room_id.clone(), count2); - - self.eventid_pduid - .insert(pdu.event_id.as_bytes(), &pdu_id)?; - self.eventid_outlierpdu.remove(pdu.event_id.as_bytes())?; - - drop(insert_lock); - - // See if the event matches any known pushers - let power_levels: RoomPowerLevelsEventContent = db - .rooms - .room_state_get(&pdu.room_id, &StateEventType::RoomPowerLevels, "")? - .map(|ev| { - serde_json::from_str(ev.content.get()) - .map_err(|_| Error::bad_database("invalid m.room.power_levels event")) - }) - .transpose()? - .unwrap_or_default(); - - let sync_pdu = pdu.to_sync_room_event(); - - let mut notifies = Vec::new(); - let mut highlights = Vec::new(); - - for user in self.get_our_real_users(&pdu.room_id, db)?.iter() { - // Don't notify the user of their own events - if user == &pdu.sender { - continue; - } - - let rules_for_user = db - .account_data - .get( - None, - user, - GlobalAccountDataEventType::PushRules.to_string().into(), - )? - .map(|ev: PushRulesEvent| ev.content.global) - .unwrap_or_else(|| Ruleset::server_default(user)); - - let mut highlight = false; - let mut notify = false; - - for action in pusher::get_actions( - user, - &rules_for_user, - &power_levels, - &sync_pdu, - &pdu.room_id, - db, - )? { - match action { - Action::DontNotify => notify = false, - // TODO: Implement proper support for coalesce - Action::Notify | Action::Coalesce => notify = true, - Action::SetTweak(Tweak::Highlight(true)) => { - highlight = true; - } - _ => {} - }; - } - - let mut userroom_id = user.as_bytes().to_vec(); - userroom_id.push(0xff); - userroom_id.extend_from_slice(pdu.room_id.as_bytes()); - - if notify { - notifies.push(userroom_id.clone()); - } - - if highlight { - highlights.push(userroom_id); - } - - for senderkey in db.pusher.get_pusher_senderkeys(user) { - db.sending.send_push_pdu(&*pdu_id, senderkey)?; - } - } - - self.userroomid_notificationcount - .increment_batch(&mut notifies.into_iter())?; - self.userroomid_highlightcount - .increment_batch(&mut highlights.into_iter())?; - - match pdu.kind { - RoomEventType::RoomRedaction => { - if let Some(redact_id) = &pdu.redacts { - self.redact_pdu(redact_id, pdu)?; - } - } - RoomEventType::RoomMember => { - if let Some(state_key) = &pdu.state_key { - #[derive(Deserialize)] - struct ExtractMembership { - membership: MembershipState, - } - - // if the state_key fails - let target_user_id = UserId::parse(state_key.clone()) - .expect("This state_key was previously validated"); - - let content = serde_json::from_str::(pdu.content.get()) - .map_err(|_| Error::bad_database("Invalid content in pdu."))?; - - let invite_state = match content.membership { - MembershipState::Invite => { - let state = self.calculate_invite_state(pdu)?; - Some(state) - } - _ => None, - }; - - // Update our membership info, we do this here incase a user is invited - // and immediately leaves we need the DB to record the invite event for auth - self.update_membership( - &pdu.room_id, - &target_user_id, - content.membership, - &pdu.sender, - invite_state, - db, - true, - )?; - } - } - RoomEventType::RoomMessage => { - #[derive(Deserialize)] - struct ExtractBody<'a> { - #[serde(borrow)] - body: Option>, - } - - let content = serde_json::from_str::>(pdu.content.get()) - .map_err(|_| Error::bad_database("Invalid content in pdu."))?; - - if let Some(body) = content.body { - let mut batch = body - .split_terminator(|c: char| !c.is_alphanumeric()) - .filter(|s| !s.is_empty()) - .filter(|word| word.len() <= 50) - .map(str::to_lowercase) - .map(|word| { - let mut key = shortroomid.to_be_bytes().to_vec(); - key.extend_from_slice(word.as_bytes()); - key.push(0xff); - key.extend_from_slice(&pdu_id); - (key, Vec::new()) - }); - - self.tokenids.insert_batch(&mut batch)?; - - let admin_room = self.id_from_alias( - <&RoomAliasId>::try_from( - format!("#admins:{}", db.globals.server_name()).as_str(), - ) - .expect("#admins:server_name is a valid room alias"), - )?; - let server_user = format!("@conduit:{}", db.globals.server_name()); - - let to_conduit = body.starts_with(&format!("{}: ", server_user)); - - // This will evaluate to false if the emergency password is set up so that - // the administrator can execute commands as conduit - let from_conduit = - pdu.sender == server_user && db.globals.emergency_password().is_none(); - - if to_conduit && !from_conduit && admin_room.as_ref() == Some(&pdu.room_id) { - db.admin.process_message(body.to_string()); - } - } - } - _ => {} - } - - for appservice in db.appservice.all()? { - if self.appservice_in_room(room_id, &appservice, db)? { - db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; - continue; - } - - // If the RoomMember event has a non-empty state_key, it is targeted at someone. - // If it is our appservice user, we send this PDU to it. - if pdu.kind == RoomEventType::RoomMember { - if let Some(state_key_uid) = &pdu - .state_key - .as_ref() - .and_then(|state_key| UserId::parse(state_key.as_str()).ok()) - { - if let Some(appservice_uid) = appservice - .1 - .get("sender_localpart") - .and_then(|string| string.as_str()) - .and_then(|string| { - UserId::parse_with_server_name(string, db.globals.server_name()).ok() - }) - { - if state_key_uid == &appservice_uid { - db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; - continue; - } - } - } - } - - if let Some(namespaces) = appservice.1.get("namespaces") { - let users = namespaces - .get("users") - .and_then(|users| users.as_sequence()) - .map_or_else(Vec::new, |users| { - users - .iter() - .filter_map(|users| Regex::new(users.get("regex")?.as_str()?).ok()) - .collect::>() - }); - let aliases = namespaces - .get("aliases") - .and_then(|aliases| aliases.as_sequence()) - .map_or_else(Vec::new, |aliases| { - aliases - .iter() - .filter_map(|aliases| Regex::new(aliases.get("regex")?.as_str()?).ok()) - .collect::>() - }); - let rooms = namespaces - .get("rooms") - .and_then(|rooms| rooms.as_sequence()); - - let matching_users = |users: &Regex| { - users.is_match(pdu.sender.as_str()) - || pdu.kind == RoomEventType::RoomMember - && pdu - .state_key - .as_ref() - .map_or(false, |state_key| users.is_match(state_key)) - }; - let matching_aliases = |aliases: &Regex| { - self.room_aliases(room_id) - .filter_map(|r| r.ok()) - .any(|room_alias| aliases.is_match(room_alias.as_str())) - }; - - if aliases.iter().any(matching_aliases) - || rooms.map_or(false, |rooms| rooms.contains(&room_id.as_str().into())) - || users.iter().any(matching_users) - { - db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; - } - } - } - - - Ok(pdu_id) - } - - pub fn create_hash_and_sign_event( - &self, - pdu_builder: PduBuilder, - sender: &UserId, - room_id: &RoomId, - db: &Database, - _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex - ) -> (PduEvent, CanonicalJsonObj) { - let PduBuilder { - event_type, - content, - unsigned, - state_key, - redacts, - } = pdu_builder; - - let prev_events: Vec<_> = db - .rooms - .get_pdu_leaves(room_id)? - .into_iter() - .take(20) - .collect(); - - let create_event = db - .rooms - .room_state_get(room_id, &StateEventType::RoomCreate, "")?; - - let create_event_content: Option = create_event - .as_ref() - .map(|create_event| { - serde_json::from_str(create_event.content.get()).map_err(|e| { - warn!("Invalid create event: {}", e); - Error::bad_database("Invalid create event in db.") - }) - }) - .transpose()?; - - // If there was no create event yet, assume we are creating a room with the default - // version right now - let room_version_id = create_event_content - .map_or(db.globals.default_room_version(), |create_event| { - create_event.room_version +impl service::room::search::Data for KeyValueDatabase { + + fn index_pdu<'a>(&self, room_id: &RoomId, pdu_id: u64, message_body: String) -> Result<()> { + let mut batch = body + .split_terminator(|c: char| !c.is_alphanumeric()) + .filter(|s| !s.is_empty()) + .filter(|word| word.len() <= 50) + .map(str::to_lowercase) + .map(|word| { + let mut key = shortroomid.to_be_bytes().to_vec(); + key.extend_from_slice(word.as_bytes()); + key.push(0xff); + key.extend_from_slice(&pdu_id); + (key, Vec::new()) }); - let room_version = - RoomVersion::new(&room_version_id).expect("room version is supported"); - let auth_events = - self.get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content)?; - - // Our depth is the maximum depth of prev_events + 1 - let depth = prev_events - .iter() - .filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth)) - .max() - .unwrap_or_else(|| uint!(0)) - + uint!(1); - - let mut unsigned = unsigned.unwrap_or_default(); - - if let Some(state_key) = &state_key { - if let Some(prev_pdu) = - self.room_state_get(room_id, &event_type.to_string().into(), state_key)? - { - unsigned.insert( - "prev_content".to_owned(), - serde_json::from_str(prev_pdu.content.get()).expect("string is valid json"), - ); - unsigned.insert( - "prev_sender".to_owned(), - serde_json::to_value(&prev_pdu.sender).expect("UserId::to_value always works"), - ); - } - } - - let pdu = PduEvent { - event_id: ruma::event_id!("$thiswillbefilledinlater").into(), - room_id: room_id.to_owned(), - sender: sender_user.to_owned(), - origin_server_ts: utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"), - kind: event_type, - content, - state_key, - prev_events, - depth, - auth_events: auth_events - .iter() - .map(|(_, pdu)| pdu.event_id.clone()) - .collect(), - redacts, - unsigned: if unsigned.is_empty() { - None - } else { - Some(to_raw_value(&unsigned).expect("to_raw_value always works")) - }, - hashes: EventHash { - sha256: "aaa".to_owned(), - }, - signatures: None, - }; - - let auth_check = state_res::auth_check( - &room_version, - &pdu, - None::, // TODO: third_party_invite - |k, s| auth_events.get(&(k.clone(), s.to_owned())), - ) - .map_err(|e| { - error!("{:?}", e); - Error::bad_database("Auth check failed.") - })?; - - if !auth_check { - return Err(Error::BadRequest( - ErrorKind::Forbidden, - "Event is not authorized.", - )); - } - - // Hash and sign - let mut pdu_json = - utils::to_canonical_object(&pdu).expect("event is valid, we just created it"); - - pdu_json.remove("event_id"); - - // Add origin because synapse likes that (and it's required in the spec) - pdu_json.insert( - "origin".to_owned(), - to_canonical_value(db.globals.server_name()) - .expect("server name is a valid CanonicalJsonValue"), - ); - - match ruma::signatures::hash_and_sign_event( - db.globals.server_name().as_str(), - db.globals.keypair(), - &mut pdu_json, - &room_version_id, - ) { - Ok(_) => {} - Err(e) => { - return match e { - ruma::signatures::Error::PduSize => Err(Error::BadRequest( - ErrorKind::TooLarge, - "Message is too long", - )), - _ => Err(Error::BadRequest( - ErrorKind::Unknown, - "Signing event failed", - )), - } - } - } - - // Generate event id - pdu.event_id = EventId::parse_arc(format!( - "${}", - ruma::signatures::reference_hash(&pdu_json, &room_version_id) - .expect("ruma can calculate reference hashes") - )) - .expect("ruma's reference hashes are valid event ids"); - - pdu_json.insert( - "event_id".to_owned(), - CanonicalJsonValue::String(pdu.event_id.as_str().to_owned()), - ); - - // Generate short event id - let _shorteventid = self.get_or_create_shorteventid(&pdu.event_id, &db.globals)?; + self.tokenids.insert_batch(&mut batch)?; } - /// Creates a new persisted data unit and adds it to a room. This function takes a - /// roomid_mutex_state, meaning that only this function is able to mutate the room state. - #[tracing::instrument(skip(self, db, _mutex_lock))] - pub fn build_and_append_pdu( - &self, - pdu_builder: PduBuilder, - sender: &UserId, - room_id: &RoomId, - db: &Database, - _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex - ) -> Result> { - - let (pdu, pdu_json) = create_hash_and_sign_event()?; - - - // We append to state before appending the pdu, so we don't have a moment in time with the - // pdu without it's state. This is okay because append_pdu can't fail. - let statehashid = self.append_to_state(&pdu, &db.globals)?; - - let pdu_id = self.append_pdu( - &pdu, - pdu_json, - // Since this PDU references all pdu_leaves we can update the leaves - // of the room - iter::once(&*pdu.event_id), - db, - )?; - - // We set the room state after inserting the pdu, so that we never have a moment in time - // where events in the current room state do not exist - self.set_room_state(room_id, statehashid)?; - - let mut servers: HashSet> = - self.room_servers(room_id).filter_map(|r| r.ok()).collect(); - - // In case we are kicking or banning a user, we need to inform their server of the change - if pdu.kind == RoomEventType::RoomMember { - if let Some(state_key_uid) = &pdu - .state_key - .as_ref() - .and_then(|state_key| UserId::parse(state_key.as_str()).ok()) - { - servers.insert(Box::from(state_key_uid.server_name())); - } - } - - // Remove our server from the server list since it will be added to it by room_servers() and/or the if statement above - servers.remove(db.globals.server_name()); - - db.sending.send_pdu(servers.into_iter(), &pdu_id)?; - - Ok(pdu.event_id) - } - - /// Append the incoming event setting the state snapshot to the state from the - /// server that sent the event. - #[tracing::instrument(skip_all)] - fn append_incoming_pdu<'a>( - db: &Database, - pdu: &PduEvent, - pdu_json: CanonicalJsonObject, - new_room_leaves: impl IntoIterator + Clone + Debug, - state_ids_compressed: HashSet, - soft_fail: bool, - _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex - ) -> Result>> { - // We append to state before appending the pdu, so we don't have a moment in time with the - // pdu without it's state. This is okay because append_pdu can't fail. - db.rooms.set_event_state( - &pdu.event_id, - &pdu.room_id, - state_ids_compressed, - &db.globals, - )?; - - if soft_fail { - db.rooms - .mark_as_referenced(&pdu.room_id, &pdu.prev_events)?; - db.rooms.replace_pdu_leaves(&pdu.room_id, new_room_leaves)?; - return Ok(None); - } - - let pdu_id = db.rooms.append_pdu(pdu, pdu_json, new_room_leaves, db)?; - - Ok(Some(pdu_id)) - } - - /// Returns an iterator over all PDUs in a room. - #[tracing::instrument(skip(self))] - pub fn all_pdus<'a>( - &'a self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result, PduEvent)>> + 'a> { - self.pdus_since(user_id, room_id, 0) - } - - /// Returns an iterator over all events in a room that happened after the event with id `since` - /// in chronological order. - #[tracing::instrument(skip(self))] - pub fn pdus_since<'a>( - &'a self, - user_id: &UserId, - room_id: &RoomId, - since: u64, - ) -> Result, PduEvent)>> + 'a> { - let prefix = self - .get_shortroomid(room_id)? - .expect("room exists") - .to_be_bytes() - .to_vec(); - - // Skip the first pdu if it's exactly at since, because we sent that last time - let mut first_pdu_id = prefix.clone(); - first_pdu_id.extend_from_slice(&(since + 1).to_be_bytes()); - - let user_id = user_id.to_owned(); - - Ok(self - .pduid_pdu - .iter_from(&first_pdu_id, false) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(move |(pdu_id, v)| { - let mut pdu = serde_json::from_slice::(&v) - .map_err(|_| Error::bad_database("PDU in db is invalid."))?; - if pdu.sender != user_id { - pdu.remove_transaction_id()?; - } - Ok((pdu_id, pdu)) - })) - } - - /// Returns an iterator over all events and their tokens in a room that happened before the - /// event with id `until` in reverse-chronological order. - #[tracing::instrument(skip(self))] - pub fn pdus_until<'a>( - &'a self, - user_id: &UserId, - room_id: &RoomId, - until: u64, - ) -> Result, PduEvent)>> + 'a> { - // Create the first part of the full pdu id - let prefix = self - .get_shortroomid(room_id)? - .expect("room exists") - .to_be_bytes() - .to_vec(); - - let mut current = prefix.clone(); - current.extend_from_slice(&(until.saturating_sub(1)).to_be_bytes()); // -1 because we don't want event at `until` - - let current: &[u8] = ¤t; - - let user_id = user_id.to_owned(); - - Ok(self - .pduid_pdu - .iter_from(current, true) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(move |(pdu_id, v)| { - let mut pdu = serde_json::from_slice::(&v) - .map_err(|_| Error::bad_database("PDU in db is invalid."))?; - if pdu.sender != user_id { - pdu.remove_transaction_id()?; - } - Ok((pdu_id, pdu)) - })) - } - - /// Returns an iterator over all events and their token in a room that happened after the event - /// with id `from` in chronological order. - #[tracing::instrument(skip(self))] - pub fn pdus_after<'a>( - &'a self, - user_id: &UserId, - room_id: &RoomId, - from: u64, - ) -> Result, PduEvent)>> + 'a> { - // Create the first part of the full pdu id - let prefix = self - .get_shortroomid(room_id)? - .expect("room exists") - .to_be_bytes() - .to_vec(); - - let mut current = prefix.clone(); - current.extend_from_slice(&(from + 1).to_be_bytes()); // +1 so we don't send the base event - - let current: &[u8] = ¤t; - - let user_id = user_id.to_owned(); - - Ok(self - .pduid_pdu - .iter_from(current, false) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(move |(pdu_id, v)| { - let mut pdu = serde_json::from_slice::(&v) - .map_err(|_| Error::bad_database("PDU in db is invalid."))?; - if pdu.sender != user_id { - pdu.remove_transaction_id()?; - } - Ok((pdu_id, pdu)) - })) - } - - /// Replace a PDU with the redacted form. - #[tracing::instrument(skip(self, reason))] - pub fn redact_pdu(&self, event_id: &EventId, reason: &PduEvent) -> Result<()> { - if let Some(pdu_id) = self.get_pdu_id(event_id)? { - let mut pdu = self - .get_pdu_from_id(&pdu_id)? - .ok_or_else(|| Error::bad_database("PDU ID points to invalid PDU."))?; - pdu.redact(reason)?; - self.replace_pdu(&pdu_id, &pdu)?; - } - // If event does not exist, just noop - Ok(()) - } - - - #[tracing::instrument(skip(self))] - pub fn search_pdus<'a>( + fn search_pdus<'a>( &'a self, room_id: &RoomId, search_string: &str, @@ -997,4 +64,3 @@ ) })) } - diff --git a/src/database/key_value/room/state.rs b/src/database/key_value/rooms/state.rs similarity index 100% rename from src/database/key_value/room/state.rs rename to src/database/key_value/rooms/state.rs diff --git a/src/service/rooms/search/data.rs b/src/service/rooms/search/data.rs new file mode 100644 index 00000000..1601e0de --- /dev/null +++ b/src/service/rooms/search/data.rs @@ -0,0 +1,9 @@ +pub trait Data { + pub fn index_pdu<'a>(&self, room_id: &RoomId, pdu_id: u64, message_body: String) -> Result<()>; + + pub fn search_pdus<'a>( + &'a self, + room_id: &RoomId, + search_string: &str, + ) -> Result> + 'a, Vec)>>; +} diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index ce055058..5478273c 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -1,50 +1,19 @@ +mod data; +pub use data::Data; +use crate::service::*; + +pub struct Service { + db: D, +} + +impl Service<_> { #[tracing::instrument(skip(self))] pub fn search_pdus<'a>( &'a self, room_id: &RoomId, search_string: &str, ) -> Result> + 'a, Vec)>> { - let prefix = self - .get_shortroomid(room_id)? - .expect("room exists") - .to_be_bytes() - .to_vec(); - let prefix_clone = prefix.clone(); - - let words: Vec<_> = search_string - .split_terminator(|c: char| !c.is_alphanumeric()) - .filter(|s| !s.is_empty()) - .map(str::to_lowercase) - .collect(); - - let iterators = words.clone().into_iter().map(move |word| { - let mut prefix2 = prefix.clone(); - prefix2.extend_from_slice(word.as_bytes()); - prefix2.push(0xff); - - let mut last_possible_id = prefix2.clone(); - last_possible_id.extend_from_slice(&u64::MAX.to_be_bytes()); - - self.tokenids - .iter_from(&last_possible_id, true) // Newest pdus first - .take_while(move |(k, _)| k.starts_with(&prefix2)) - .map(|(key, _)| key[key.len() - size_of::()..].to_vec()) - }); - - Ok(utils::common_elements(iterators, |a, b| { - // We compare b with a because we reversed the iterator earlier - b.cmp(a) - }) - .map(|iter| { - ( - iter.map(move |id| { - let mut pduid = prefix_clone.clone(); - pduid.extend_from_slice(&id); - pduid - }), - words, - ) - })) + self.db.search_pdus(room_id, search_string) } - +} diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 6299b16c..5b423d2d 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -439,20 +439,7 @@ .map_err(|_| Error::bad_database("Invalid content in pdu."))?; if let Some(body) = content.body { - let mut batch = body - .split_terminator(|c: char| !c.is_alphanumeric()) - .filter(|s| !s.is_empty()) - .filter(|word| word.len() <= 50) - .map(str::to_lowercase) - .map(|word| { - let mut key = shortroomid.to_be_bytes().to_vec(); - key.extend_from_slice(word.as_bytes()); - key.push(0xff); - key.extend_from_slice(&pdu_id); - (key, Vec::new()) - }); - - self.tokenids.insert_batch(&mut batch)?; + DB.rooms.search.index_pdu(room_id, pdu_id, body)?; let admin_room = self.id_from_alias( <&RoomAliasId>::try_from(