impl crate::Service for Service

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-07-04 03:26:19 +00:00
parent 177c9e8bfa
commit e125af620e
44 changed files with 673 additions and 548 deletions

View file

@ -15,14 +15,18 @@ pub(super) async fn enable_room(_body: Vec<&str>, room_id: Box<RoomId>) -> Resul
}
pub(super) async fn incoming_federation(_body: Vec<&str>) -> Result<RoomMessageEventContent> {
let map = services().globals.roomid_federationhandletime.read().await;
let map = services()
.globals
.roomid_federationhandletime
.read()
.expect("locked");
let mut msg = format!("Handling {} incoming pdus:\n", map.len());
for (r, (e, i)) in map.iter() {
let elapsed = i.elapsed();
writeln!(msg, "{} {}: {}m{}s", r, e, elapsed.as_secs() / 60, elapsed.as_secs() % 60,)
.expect("should be able to write to string buffer");
writeln!(msg, "{} {}: {}m{}s", r, e, elapsed.as_secs() / 60, elapsed.as_secs() % 60)?;
}
Ok(RoomMessageEventContent::text_plain(&msg))
}

View file

@ -27,12 +27,12 @@ pub(super) async fn show_config(_body: Vec<&str>) -> Result<RoomMessageEventCont
}
pub(super) async fn memory_usage(_body: Vec<&str>) -> Result<RoomMessageEventContent> {
let response0 = services().memory_usage().await;
let response1 = services().globals.db.memory_usage();
let response0 = services().memory_usage().await?;
let response1 = services().db.db.memory_usage()?;
let response2 = conduit::alloc::memory_usage();
Ok(RoomMessageEventContent::text_plain(format!(
"Services:\n{response0}\n\nDatabase:\n{response1}\n{}",
"Services:\n{response0}\nDatabase:\n{response1}\n{}",
if !response2.is_empty() {
format!("Allocator:\n {response2}")
} else {
@ -41,14 +41,8 @@ pub(super) async fn memory_usage(_body: Vec<&str>) -> Result<RoomMessageEventCon
)))
}
pub(super) async fn clear_database_caches(_body: Vec<&str>, amount: u32) -> Result<RoomMessageEventContent> {
services().globals.db.clear_caches(amount);
Ok(RoomMessageEventContent::text_plain("Done."))
}
pub(super) async fn clear_service_caches(_body: Vec<&str>, amount: u32) -> Result<RoomMessageEventContent> {
services().clear_caches(amount).await;
pub(super) async fn clear_caches(_body: Vec<&str>) -> Result<RoomMessageEventContent> {
services().clear_cache().await;
Ok(RoomMessageEventContent::text_plain("Done."))
}

View file

@ -18,17 +18,8 @@ pub(super) enum ServerCommand {
/// - Print database memory usage statistics
MemoryUsage,
/// - Clears all of Conduit's database caches with index smaller than the
/// amount
ClearDatabaseCaches {
amount: u32,
},
/// - Clears all of Conduit's service caches with index smaller than the
/// amount
ClearServiceCaches {
amount: u32,
},
/// - Clears all of Conduwuit's caches
ClearCaches,
/// - Performs an online backup of the database (only available for RocksDB
/// at the moment)
@ -65,12 +56,7 @@ pub(super) async fn process(command: ServerCommand, body: Vec<&str>) -> Result<R
ServerCommand::Uptime => uptime(body).await?,
ServerCommand::ShowConfig => show_config(body).await?,
ServerCommand::MemoryUsage => memory_usage(body).await?,
ServerCommand::ClearDatabaseCaches {
amount,
} => clear_database_caches(body, amount).await?,
ServerCommand::ClearServiceCaches {
amount,
} => clear_service_caches(body, amount).await?,
ServerCommand::ClearCaches => clear_caches(body).await?,
ServerCommand::ListBackups => list_backups(body).await?,
ServerCommand::BackupDatabase => backup_database(body).await?,
ServerCommand::ListDatabaseFiles => list_database_files(body).await?,

View file

@ -334,7 +334,7 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool + Send>(
.globals
.bad_query_ratelimiter
.write()
.await
.expect("locked")
.entry(id)
{
hash_map::Entry::Vacant(e) => {
@ -353,7 +353,7 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool + Send>(
.globals
.bad_query_ratelimiter
.read()
.await
.expect("locked")
.get(server)
{
// Exponential backoff

View file

@ -1343,7 +1343,7 @@ pub async fn validate_and_add_event_id(
.globals
.bad_event_ratelimiter
.write()
.await
.expect("locked")
.entry(id)
{
Entry::Vacant(e) => {
@ -1359,7 +1359,7 @@ pub async fn validate_and_add_event_id(
.globals
.bad_event_ratelimiter
.read()
.await
.expect("locked")
.get(&event_id)
{
// Exponential backoff

View file

@ -2,9 +2,8 @@ mod data;
use std::{collections::HashMap, sync::Arc};
use conduit::{Result, Server};
use conduit::Result;
use data::Data;
use database::Database;
use ruma::{
events::{AnyEphemeralRoomEvent, RoomAccountDataEventType},
serde::Raw,
@ -15,13 +14,17 @@ pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Places one event in the account data of the user and removes the
/// previous entry.
#[allow(clippy::needless_pass_by_value)]

View file

@ -4,9 +4,9 @@ mod grant;
use std::{future::Future, pin::Pin, sync::Arc};
use conduit::{error, utils::mutex_map, Error, Result, Server};
use async_trait::async_trait;
use conduit::{error, utils::mutex_map, Error, Result};
pub use create::create_admin_room;
use database::Database;
pub use grant::make_user_admin;
use loole::{Receiver, Sender};
use ruma::{
@ -43,8 +43,9 @@ pub struct Command {
pub reply_id: Option<OwnedEventId>,
}
impl Service {
pub fn build(_server: &Arc<Server>, _db: &Arc<Database>) -> Result<Arc<Self>> {
#[async_trait]
impl crate::Service for Service {
fn build(_args: crate::Args<'_>) -> Result<Arc<Self>> {
let (sender, receiver) = loole::bounded(COMMAND_QUEUE_LIMIT);
Ok(Arc::new(Self {
sender,
@ -56,8 +57,8 @@ impl Service {
}))
}
pub async fn start_handler(self: &Arc<Self>) {
let self_ = Arc::clone(self);
async fn start(self: Arc<Self>) -> Result<()> {
let self_ = Arc::clone(&self);
let handle = services().server.runtime().spawn(async move {
self_
.handler()
@ -66,9 +67,11 @@ impl Service {
});
_ = self.handler_join.lock().await.insert(handle);
Ok(())
}
pub fn interrupt(&self) {
fn interrupt(&self) {
#[cfg(feature = "console")]
self.console.interrupt();
@ -77,7 +80,7 @@ impl Service {
}
}
pub async fn close(&self) {
async fn stop(&self) {
self.interrupt();
#[cfg(feature = "console")]
@ -90,6 +93,10 @@ impl Service {
}
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
pub async fn send_text(&self, body: &str) {
self.send_message(RoomMessageEventContent::text_markdown(body))
.await;

View file

@ -2,9 +2,8 @@ mod data;
use std::{collections::BTreeMap, sync::Arc};
use conduit::{Result, Server};
use conduit::Result;
use data::Data;
use database::Database;
use futures_util::Future;
use regex::RegexSet;
use ruma::{
@ -119,10 +118,10 @@ pub struct Service {
registration_info: RwLock<BTreeMap<String, RegistrationInfo>>,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let mut registration_info = BTreeMap::new();
let db = Data::new(db);
let db = Data::new(args.db);
// Inserting registrations into cache
for appservice in iter_ids(&db)? {
registration_info.insert(
@ -134,12 +133,16 @@ impl Service {
);
}
Ok(Self {
Ok(Arc::new(Self {
db,
registration_info: RwLock::new(registration_info),
})
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
pub fn all(&self) -> Result<Vec<(String, Registration)>> { iter_ids(&self.db) }
/// Registers an appservice and returns the ID to the caller

View file

@ -208,41 +208,6 @@ impl Data {
pub fn cleanup(&self) -> Result<()> { self.db.db.cleanup() }
pub fn memory_usage(&self) -> String {
let (auth_chain_cache, max_auth_chain_cache) = services().rooms.auth_chain.get_cache_usage();
let (appservice_in_room_cache, max_appservice_in_room_cache) = services()
.rooms
.state_cache
.get_appservice_in_room_cache_usage();
let (lasttimelinecount_cache, max_lasttimelinecount_cache) = services()
.rooms
.timeline
.get_lasttimelinecount_cache_usage();
format!(
"auth_chain_cache: {auth_chain_cache} / {max_auth_chain_cache}\nappservice_in_room_cache: \
{appservice_in_room_cache} / {max_appservice_in_room_cache}\nlasttimelinecount_cache: \
{lasttimelinecount_cache} / {max_lasttimelinecount_cache}\n\n{}",
self.db.db.memory_usage().unwrap_or_default()
)
}
#[allow(clippy::unused_self)]
pub fn clear_caches(&self, amount: u32) {
if amount > 1 {
services().rooms.auth_chain.clear_cache();
}
if amount > 2 {
services()
.rooms
.state_cache
.clear_appservice_in_room_cache();
}
if amount > 3 {
services().rooms.timeline.clear_lasttimelinecount_cache();
}
}
pub fn load_keypair(&self) -> Result<Ed25519KeyPair> {
let keypair_bytes = self.global.get(b"keypair")?.map_or_else(
|| {

View file

@ -7,13 +7,13 @@ pub(super) mod updates;
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
fmt::Write,
sync::{Arc, RwLock},
time::Instant,
};
use conduit::{error, trace, utils::MutexMap, Config, Result, Server};
use conduit::{error, trace, utils::MutexMap, Config, Result};
use data::Data;
use database::Database;
use ipaddress::IPAddress;
use regex::RegexSet;
use ruma::{
@ -25,10 +25,7 @@ use ruma::{
DeviceId, OwnedEventId, OwnedRoomAliasId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId,
RoomAliasId, RoomVersionId, ServerName, UserId,
};
use tokio::{
sync::{Mutex, RwLock},
task::JoinHandle,
};
use tokio::{sync::Mutex, task::JoinHandle};
use url::Url;
use crate::services;
@ -59,10 +56,10 @@ pub struct Service {
pub admin_alias: OwnedRoomAliasId,
}
impl Service {
pub fn build(server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
let config = &server.config;
let db = Data::new(db);
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let config = &args.server.config;
let db = Data::new(args.db);
let keypair = db.load_keypair();
let keypair = match keypair {
@ -133,9 +130,59 @@ impl Service {
s.config.default_room_version = crate::config::default_default_room_version();
};
Ok(s)
Ok(Arc::new(s))
}
fn memory_usage(&self, out: &mut dyn Write) -> Result<()> {
self.resolver.memory_usage(out)?;
let bad_event_ratelimiter = self
.bad_event_ratelimiter
.read()
.expect("locked for reading")
.len();
writeln!(out, "bad_event_ratelimiter: {bad_event_ratelimiter}")?;
let bad_query_ratelimiter = self
.bad_query_ratelimiter
.read()
.expect("locked for reading")
.len();
writeln!(out, "bad_query_ratelimiter: {bad_query_ratelimiter}")?;
let bad_signature_ratelimiter = self
.bad_signature_ratelimiter
.read()
.expect("locked for reading")
.len();
writeln!(out, "bad_signature_ratelimiter: {bad_signature_ratelimiter}")?;
Ok(())
}
fn clear_cache(&self) {
self.resolver.clear_cache();
self.bad_event_ratelimiter
.write()
.expect("locked for writing")
.clear();
self.bad_query_ratelimiter
.write()
.expect("locked for writing")
.clear();
self.bad_signature_ratelimiter
.write()
.expect("locked for writing")
.clear();
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Returns this server's keypair.
pub fn keypair(&self) -> &ruma::signatures::Ed25519KeyPair { &self.keypair }

View file

@ -1,12 +1,13 @@
use std::{
collections::HashMap,
fmt::Write,
future, iter,
net::{IpAddr, SocketAddr},
sync::{Arc, RwLock},
time::Duration,
};
use conduit::{error, Config, Error};
use conduit::{error, Config, Error, Result};
use hickory_resolver::TokioAsyncResolver;
use reqwest::dns::{Addrs, Name, Resolve, Resolving};
use ruma::OwnedServerName;
@ -30,7 +31,7 @@ pub struct Hooked {
impl Resolver {
#[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)]
pub fn new(config: &Config) -> Self {
pub(super) fn new(config: &Config) -> Self {
let (sys_conf, mut opts) = hickory_resolver::system_conf::read_system_conf()
.map_err(|e| {
error!("Failed to set up hickory dns resolver with system config: {e}");
@ -92,6 +93,22 @@ impl Resolver {
}),
}
}
pub(super) fn memory_usage(&self, out: &mut dyn Write) -> Result<()> {
let resolver_overrides_cache = self.overrides.read().expect("locked for reading").len();
writeln!(out, "resolver_overrides_cache: {resolver_overrides_cache}")?;
let resolver_destinations_cache = self.destinations.read().expect("locked for reading").len();
writeln!(out, "resolver_destinations_cache: {resolver_destinations_cache}")?;
Ok(())
}
pub(super) fn clear_cache(&self) {
self.overrides.write().expect("write locked").clear();
self.destinations.write().expect("write locked").clear();
self.resolver.clear_cache();
}
}
impl Resolve for Resolver {

View file

@ -2,9 +2,8 @@ mod data;
use std::{collections::BTreeMap, sync::Arc};
use conduit::{Result, Server};
use conduit::Result;
use data::Data;
use database::Database;
use ruma::{
api::client::backup::{BackupAlgorithm, KeyBackupData, RoomKeyBackup},
serde::Raw,
@ -15,13 +14,17 @@ pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
pub fn create_backup(&self, user_id: &UserId, backup_metadata: &Raw<BackupAlgorithm>) -> Result<String> {
self.db.create_backup(user_id, backup_metadata)
}

View file

@ -6,7 +6,6 @@ use std::{collections::HashMap, io::Cursor, path::PathBuf, sync::Arc, time::Syst
use base64::{engine::general_purpose, Engine as _};
use conduit::{debug, debug_error, error, utils, Error, Result, Server};
use data::Data;
use database::Database;
use image::imageops::FilterType;
use ruma::{OwnedMxcUri, OwnedUserId};
use serde::Serialize;
@ -48,15 +47,19 @@ pub struct Service {
pub url_preview_mutex: RwLock<HashMap<String, Arc<Mutex<()>>>>,
}
impl Service {
pub fn build(server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
server: server.clone(),
db: Data::new(db),
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
server: args.server.clone(),
db: Data::new(args.db),
url_preview_mutex: RwLock::new(HashMap::new()),
})
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Uploads a file.
pub async fn create(
&self, sender_user: Option<OwnedUserId>, mxc: &str, content_disposition: Option<&str>,

View file

@ -1,3 +1,6 @@
#![allow(refining_impl_trait)]
mod service;
pub mod services;
pub mod account_data;
@ -22,6 +25,7 @@ use std::sync::{Arc, RwLock};
pub(crate) use conduit::{config, debug_error, debug_info, debug_warn, utils, Config, Error, Result, Server};
pub use conduit::{pdu, PduBuilder, PduCount, PduEvent};
use database::Database;
pub(crate) use service::{Args, Service};
pub use crate::{
globals::{server_is_ours, user_is_local},
@ -36,7 +40,7 @@ static SERVICES: RwLock<Option<&Services>> = RwLock::new(None);
#[allow(clippy::let_underscore_must_use)]
pub async fn init(server: &Arc<Server>) -> Result<()> {
let d = Arc::new(Database::open(server).await?);
let s = Box::new(Services::build(server.clone(), d.clone()).await?);
let s = Box::new(Services::build(server.clone(), d)?);
_ = SERVICES.write().expect("write locked").insert(Box::leak(s));
Ok(())

View file

@ -2,9 +2,9 @@ mod data;
use std::{sync::Arc, time::Duration};
use conduit::{debug, error, utils, Error, Result, Server};
use async_trait::async_trait;
use conduit::{debug, error, utils, Error, Result};
use data::Data;
use database::Database;
use futures_util::{stream::FuturesUnordered, StreamExt};
use ruma::{
events::presence::{PresenceEvent, PresenceEventContent},
@ -81,12 +81,13 @@ pub struct Service {
timeout_remote_users: bool,
}
impl Service {
pub fn build(server: &Arc<Server>, db: &Arc<Database>) -> Result<Arc<Self>> {
let config = &server.config;
#[async_trait]
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let config = &args.server.config;
let (timer_sender, timer_receiver) = loole::unbounded();
Ok(Arc::new(Self {
db: Data::new(db),
db: Data::new(args.db),
timer_sender,
timer_receiver: Mutex::new(timer_receiver),
handler_join: Mutex::new(None),
@ -94,8 +95,10 @@ impl Service {
}))
}
pub async fn start_handler(self: &Arc<Self>) {
let self_ = Arc::clone(self);
async fn start(self: Arc<Self>) -> Result<()> {
//TODO: if self.globals.config.allow_local_presence { return; }
let self_ = Arc::clone(&self);
let handle = services().server.runtime().spawn(async move {
self_
.handler()
@ -104,9 +107,11 @@ impl Service {
});
_ = self.handler_join.lock().await.insert(handle);
Ok(())
}
pub async fn close(&self) {
async fn stop(&self) {
self.interrupt();
if let Some(handler_join) = self.handler_join.lock().await.take() {
if let Err(e) = handler_join.await {
@ -115,12 +120,16 @@ impl Service {
}
}
pub fn interrupt(&self) {
fn interrupt(&self) {
if !self.timer_sender.is_closed() {
self.timer_sender.close();
}
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Returns the latest presence event for the given user.
pub fn get_presence(&self, user_id: &UserId) -> Result<Option<PresenceEvent>> {
if let Some((_, presence)) = self.db.get_presence(user_id)? {

View file

@ -3,9 +3,8 @@ mod data;
use std::{fmt::Debug, mem, sync::Arc};
use bytes::BytesMut;
use conduit::{debug_info, info, trace, warn, Error, Result, Server};
use conduit::{debug_info, info, trace, warn, Error, Result};
use data::Data;
use database::Database;
use ipaddress::IPAddress;
use ruma::{
api::{
@ -30,13 +29,17 @@ pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
pub fn set_pusher(&self, sender: &UserId, pusher: &set_pusher::v3::PusherAction) -> Result<()> {
self.db.set_pusher(sender, pusher)
}

View file

@ -3,9 +3,8 @@ mod remote;
use std::sync::Arc;
use conduit::{Error, Result, Server};
use conduit::{Error, Result};
use data::Data;
use database::Database;
use ruma::{
api::{appservice, client::error::ErrorKind},
events::{
@ -21,13 +20,17 @@ pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
#[tracing::instrument(skip(self))]
pub fn set_alias(&self, alias: &RoomAliasId, room_id: &RoomId, user_id: &UserId) -> Result<()> {
if alias == services().globals.admin_alias && user_id != services().globals.server_user {

View file

@ -5,9 +5,8 @@ use std::{
sync::Arc,
};
use conduit::{debug, error, trace, warn, Error, Result, Server};
use conduit::{debug, error, trace, warn, Error, Result};
use data::Data;
use database::Database;
use ruma::{api::client::error::ErrorKind, EventId, RoomId};
use crate::services;
@ -16,13 +15,17 @@ pub struct Service {
db: Data,
}
impl Service {
pub fn build(server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(server, db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.server, args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
pub async fn event_ids_iter<'a>(
&self, room_id: &RoomId, starting_events_: Vec<Arc<EventId>>,
) -> Result<impl Iterator<Item = Arc<EventId>> + 'a> {

View file

@ -2,9 +2,7 @@ mod data;
use std::sync::Arc;
use conduit::Server;
use data::Data;
use database::Database;
use ruma::{OwnedRoomId, RoomId};
use crate::Result;
@ -13,13 +11,17 @@ pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
#[tracing::instrument(skip(self))]
pub fn set_public(&self, room_id: &RoomId) -> Result<()> { self.db.set_public(room_id) }

View file

@ -9,8 +9,7 @@ use std::{
time::{Duration, Instant},
};
use conduit::{debug_error, debug_info, Error, Result, Server};
use database::Database;
use conduit::{debug_error, debug_info, Error, Result};
use futures_util::Future;
pub use parse_incoming_pdu::parse_incoming_pdu;
use ruma::{
@ -45,9 +44,13 @@ type AsyncRecursiveCanonicalJsonVec<'a> =
type AsyncRecursiveCanonicalJsonResult<'a> =
AsyncRecursiveType<'a, Result<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>)>>;
impl Service {
pub fn build(_server: &Arc<Server>, _db: &Arc<Database>) -> Result<Self> { Ok(Self {}) }
impl crate::Service for Service {
fn build(_args: crate::Args<'_>) -> Result<Arc<Self>> { Ok(Arc::new(Self {})) }
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// When receiving an event one needs to:
/// 0. Check the server is in the room
/// 1. Skip the PDU if we already know about it
@ -180,7 +183,7 @@ impl Service {
.globals
.bad_event_ratelimiter
.write()
.await
.expect("locked")
.entry((*prev_id).to_owned())
{
hash_map::Entry::Vacant(e) => {
@ -200,7 +203,7 @@ impl Service {
.globals
.roomid_federationhandletime
.write()
.await
.expect("locked")
.insert(room_id.to_owned(), (event_id.to_owned(), start_time));
let r = self
@ -211,7 +214,7 @@ impl Service {
.globals
.roomid_federationhandletime
.write()
.await
.expect("locked")
.remove(&room_id.to_owned());
r
@ -245,7 +248,7 @@ impl Service {
.globals
.bad_event_ratelimiter
.read()
.await
.expect("locked")
.get(prev_id)
{
// Exponential backoff
@ -274,7 +277,7 @@ impl Service {
.globals
.roomid_federationhandletime
.write()
.await
.expect("locked")
.insert(room_id.to_owned(), ((*prev_id).to_owned(), start_time));
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id, pub_key_map)
@ -284,7 +287,7 @@ impl Service {
.globals
.roomid_federationhandletime
.write()
.await
.expect("locked")
.remove(&room_id.to_owned());
debug!(
@ -1043,7 +1046,7 @@ impl Service {
.globals
.bad_event_ratelimiter
.write()
.await
.expect("locked")
.entry(id)
{
hash_map::Entry::Vacant(e) => {
@ -1076,7 +1079,7 @@ impl Service {
.globals
.bad_event_ratelimiter
.read()
.await
.expect("locked")
.get(&*next_id)
{
// Exponential backoff
@ -1184,7 +1187,7 @@ impl Service {
.globals
.bad_event_ratelimiter
.read()
.await
.expect("locked")
.get(&**next_id)
{
// Exponential backoff

View file

@ -2,14 +2,12 @@ mod data;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
fmt::Write,
sync::{Arc, Mutex},
};
use conduit::Server;
use data::Data;
use database::Database;
use ruma::{DeviceId, OwnedDeviceId, OwnedRoomId, OwnedUserId, RoomId, UserId};
use tokio::sync::Mutex;
use crate::{PduCount, Result};
@ -20,14 +18,27 @@ pub struct Service {
pub lazy_load_waiting: Mutex<HashMap<(OwnedUserId, OwnedDeviceId, OwnedRoomId, PduCount), HashSet<OwnedUserId>>>,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
lazy_load_waiting: Mutex::new(HashMap::new()),
})
}))
}
fn memory_usage(&self, out: &mut dyn Write) -> Result<()> {
let lazy_load_waiting = self.lazy_load_waiting.lock().expect("locked").len();
writeln!(out, "lazy_load_waiting: {lazy_load_waiting}")?;
Ok(())
}
fn clear_cache(&self) { self.lazy_load_waiting.lock().expect("locked").clear(); }
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
#[tracing::instrument(skip(self))]
pub fn lazy_load_was_sent_before(
&self, user_id: &UserId, device_id: &DeviceId, room_id: &RoomId, ll_user: &UserId,
@ -43,7 +54,7 @@ impl Service {
) {
self.lazy_load_waiting
.lock()
.await
.expect("locked")
.insert((user_id.to_owned(), device_id.to_owned(), room_id.to_owned(), count), lazy_load);
}
@ -51,7 +62,7 @@ impl Service {
pub async fn lazy_load_confirm_delivery(
&self, user_id: &UserId, device_id: &DeviceId, room_id: &RoomId, since: PduCount,
) -> Result<()> {
if let Some(user_ids) = self.lazy_load_waiting.lock().await.remove(&(
if let Some(user_ids) = self.lazy_load_waiting.lock().expect("locked").remove(&(
user_id.to_owned(),
device_id.to_owned(),
room_id.to_owned(),

View file

@ -2,22 +2,25 @@ mod data;
use std::sync::Arc;
use conduit::{Result, Server};
use conduit::Result;
use data::Data;
use database::Database;
use ruma::{OwnedRoomId, RoomId};
pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Checks if a room exists.
#[tracing::instrument(skip(self))]
pub fn exists(&self, room_id: &RoomId) -> Result<bool> { self.db.exists(room_id) }

View file

@ -19,25 +19,27 @@ pub mod timeline;
pub mod typing;
pub mod user;
use std::sync::Arc;
pub struct Service {
pub alias: alias::Service,
pub auth_chain: auth_chain::Service,
pub directory: directory::Service,
pub event_handler: event_handler::Service,
pub lazy_loading: lazy_loading::Service,
pub metadata: metadata::Service,
pub outlier: outlier::Service,
pub pdu_metadata: pdu_metadata::Service,
pub read_receipt: read_receipt::Service,
pub search: search::Service,
pub short: short::Service,
pub state: state::Service,
pub state_accessor: state_accessor::Service,
pub state_cache: state_cache::Service,
pub state_compressor: state_compressor::Service,
pub timeline: timeline::Service,
pub threads: threads::Service,
pub typing: typing::Service,
pub spaces: spaces::Service,
pub user: user::Service,
pub alias: Arc<alias::Service>,
pub auth_chain: Arc<auth_chain::Service>,
pub directory: Arc<directory::Service>,
pub event_handler: Arc<event_handler::Service>,
pub lazy_loading: Arc<lazy_loading::Service>,
pub metadata: Arc<metadata::Service>,
pub outlier: Arc<outlier::Service>,
pub pdu_metadata: Arc<pdu_metadata::Service>,
pub read_receipt: Arc<read_receipt::Service>,
pub search: Arc<search::Service>,
pub short: Arc<short::Service>,
pub state: Arc<state::Service>,
pub state_accessor: Arc<state_accessor::Service>,
pub state_cache: Arc<state_cache::Service>,
pub state_compressor: Arc<state_compressor::Service>,
pub timeline: Arc<timeline::Service>,
pub threads: Arc<threads::Service>,
pub typing: Arc<typing::Service>,
pub spaces: Arc<spaces::Service>,
pub user: Arc<user::Service>,
}

View file

@ -2,9 +2,8 @@ mod data;
use std::sync::Arc;
use conduit::{Result, Server};
use conduit::Result;
use data::Data;
use database::Database;
use ruma::{CanonicalJsonObject, EventId};
use crate::PduEvent;
@ -13,13 +12,17 @@ pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Returns the pdu from the outlier tree.
pub fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
self.db.get_outlier_pdu_json(event_id)

View file

@ -2,9 +2,8 @@ mod data;
use std::sync::Arc;
use conduit::{Result, Server};
use conduit::Result;
use data::Data;
use database::Database;
use ruma::{
api::{client::relations::get_relating_events, Direction},
events::{relation::RelationType, TimelineEventType},
@ -28,13 +27,17 @@ struct ExtractRelatesToEventId {
relates_to: ExtractRelType,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
#[tracing::instrument(skip(self, from, to))]
pub fn add_relation(&self, from: PduCount, to: PduCount) -> Result<()> {
match (from, to) {

View file

@ -2,9 +2,8 @@ mod data;
use std::sync::Arc;
use conduit::{Result, Server};
use conduit::Result;
use data::Data;
use database::Database;
use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId};
use crate::services;
@ -13,13 +12,17 @@ pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Replaces the previous read receipt.
pub fn readreceipt_update(&self, user_id: &UserId, room_id: &RoomId, event: &ReceiptEvent) -> Result<()> {
self.db.readreceipt_update(user_id, room_id, event)?;

View file

@ -2,22 +2,25 @@ mod data;
use std::sync::Arc;
use conduit::{Result, Server};
use conduit::Result;
use data::Data;
use database::Database;
use ruma::RoomId;
pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
#[tracing::instrument(skip(self))]
pub fn index_pdu(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> Result<()> {
self.db.index_pdu(shortroomid, pdu_id, message_body)

View file

@ -2,22 +2,25 @@ mod data;
use std::sync::Arc;
use conduit::{Result, Server};
use conduit::Result;
use data::Data;
use database::Database;
use ruma::{events::StateEventType, EventId, RoomId};
pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
pub fn get_or_create_shorteventid(&self, event_id: &EventId) -> Result<u64> {
self.db.get_or_create_shorteventid(event_id)
}

View file

@ -7,8 +7,7 @@ use std::{
sync::Arc,
};
use conduit::{debug_info, Server};
use database::Database;
use conduit::debug_info;
use lru_cache::LruCache;
use ruma::{
api::{
@ -159,17 +158,21 @@ impl From<CachedSpaceHierarchySummary> for SpaceHierarchyRoomsChunk {
}
}
impl Service {
pub fn build(server: &Arc<Server>, _db: &Arc<Database>) -> Result<Self> {
let config = &server.config;
Ok(Self {
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let config = &args.server.config;
Ok(Arc::new(Self {
roomid_spacehierarchy_cache: Mutex::new(LruCache::new(
(f64::from(config.roomid_spacehierarchy_cache_capacity) * config.conduit_cache_capacity_modifier)
as usize,
)),
})
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Gets the response for the space hierarchy over federation request
///
/// Errors if the room does not exist, so a check if the room exists should

View file

@ -7,10 +7,9 @@ use std::{
use conduit::{
utils::{calculate_hash, mutex_map},
warn, Error, Result, Server,
warn, Error, Result,
};
use data::Data;
use database::Database;
use ruma::{
api::client::error::ErrorKind,
events::{
@ -29,13 +28,17 @@ pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Set the room to the given statehash and update caches.
pub async fn force_state(
&self,

View file

@ -2,12 +2,12 @@ mod data;
use std::{
collections::HashMap,
fmt::Write,
sync::{Arc, Mutex as StdMutex, Mutex},
};
use conduit::{error, utils::mutex_map, warn, Error, Result, Server};
use conduit::{error, utils::mutex_map, warn, Error, Result};
use data::Data;
use database::Database;
use lru_cache::LruCache;
use ruma::{
events::{
@ -41,20 +41,39 @@ pub struct Service {
pub user_visibility_cache: Mutex<LruCache<(OwnedUserId, u64), bool>>,
}
impl Service {
pub fn build(server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
let config = &server.config;
Ok(Self {
db: Data::new(db),
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let config = &args.server.config;
Ok(Arc::new(Self {
db: Data::new(args.db),
server_visibility_cache: StdMutex::new(LruCache::new(
(f64::from(config.server_visibility_cache_capacity) * config.conduit_cache_capacity_modifier) as usize,
)),
user_visibility_cache: StdMutex::new(LruCache::new(
(f64::from(config.user_visibility_cache_capacity) * config.conduit_cache_capacity_modifier) as usize,
)),
})
}))
}
fn memory_usage(&self, out: &mut dyn Write) -> Result<()> {
let server_visibility_cache = self.server_visibility_cache.lock().expect("locked").len();
writeln!(out, "server_visibility_cache: {server_visibility_cache}")?;
let user_visibility_cache = self.user_visibility_cache.lock().expect("locked").len();
writeln!(out, "user_visibility_cache: {user_visibility_cache}")?;
Ok(())
}
fn clear_cache(&self) {
self.server_visibility_cache.lock().expect("locked").clear();
self.user_visibility_cache.lock().expect("locked").clear();
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Builds a StateMap by iterating over all keys that start
/// with state_hash, this gives the full state for the given state_hash.
#[tracing::instrument(skip(self))]

View file

@ -2,9 +2,8 @@ mod data;
use std::sync::Arc;
use conduit::{error, warn, Error, Result, Server};
use conduit::{error, warn, Error, Result};
use data::Data;
use database::Database;
use itertools::Itertools;
use ruma::{
events::{
@ -28,13 +27,17 @@ pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Update current membership data.
#[tracing::instrument(skip(self, last_state))]
#[allow(clippy::too_many_arguments)]

View file

@ -2,13 +2,13 @@ mod data;
use std::{
collections::HashSet,
fmt::Write,
mem::size_of,
sync::{Arc, Mutex as StdMutex, Mutex},
};
use conduit::{utils, Result, Server};
use conduit::{utils, Result};
use data::Data;
use database::Database;
use lru_cache::LruCache;
use ruma::{EventId, RoomId};
@ -52,17 +52,30 @@ pub struct Service {
pub stateinfo_cache: StateInfoLruCache,
}
impl Service {
pub fn build(server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
let config = &server.config;
Ok(Self {
db: Data::new(db),
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let config = &args.server.config;
Ok(Arc::new(Self {
db: Data::new(args.db),
stateinfo_cache: StdMutex::new(LruCache::new(
(f64::from(config.stateinfo_cache_capacity) * config.conduit_cache_capacity_modifier) as usize,
)),
})
}))
}
fn memory_usage(&self, out: &mut dyn Write) -> Result<()> {
let stateinfo_cache = self.stateinfo_cache.lock().expect("locked").len();
writeln!(out, "stateinfo_cache: {stateinfo_cache}")?;
Ok(())
}
fn clear_cache(&self) { self.stateinfo_cache.lock().expect("locked").clear(); }
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Returns a stack with info on shortstatehash, full state, added diff and
/// removed diff for the selected shortstatehash and each parent layer.
#[tracing::instrument(skip(self))]

View file

@ -2,9 +2,8 @@ mod data;
use std::{collections::BTreeMap, sync::Arc};
use conduit::{Error, Result, Server};
use conduit::{Error, Result};
use data::Data;
use database::Database;
use ruma::{
api::client::{error::ErrorKind, threads::get_threads::v1::IncludeThreads},
events::relation::BundledThread,
@ -18,13 +17,17 @@ pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
pub fn threads_until<'a>(
&'a self, user_id: &'a UserId, room_id: &'a RoomId, until: u64, include: &'a IncludeThreads,
) -> Result<impl Iterator<Item = Result<(u64, PduEvent)>> + 'a> {

View file

@ -2,12 +2,12 @@ mod data;
use std::{
collections::{BTreeMap, HashSet},
fmt::Write,
sync::Arc,
};
use conduit::{debug, error, info, utils, utils::mutex_map, warn, Error, Result, Server};
use conduit::{debug, error, info, utils, utils::mutex_map, warn, Error, Result};
use data::Data;
use database::Database;
use itertools::Itertools;
use rand::prelude::SliceRandom;
use ruma::{
@ -68,13 +68,37 @@ pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn memory_usage(&self, out: &mut dyn Write) -> Result<()> {
let lasttimelinecount_cache = self
.db
.lasttimelinecount_cache
.lock()
.expect("locked")
.len();
writeln!(out, "lasttimelinecount_cache: {lasttimelinecount_cache}")?;
Ok(())
}
fn clear_cache(&self) {
self.db
.lasttimelinecount_cache
.lock()
.expect("locked")
.clear();
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
#[tracing::instrument(skip(self))]
pub fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> {
self.all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id)?
@ -1238,19 +1262,6 @@ impl Service {
debug!("Prepended backfill pdu");
Ok(())
}
pub fn get_lasttimelinecount_cache_usage(&self) -> (usize, usize) {
let cache = self.db.lasttimelinecount_cache.lock().expect("locked");
(cache.len(), cache.capacity())
}
pub fn clear_lasttimelinecount_cache(&self) {
self.db
.lasttimelinecount_cache
.lock()
.expect("locked")
.clear();
}
}
#[cfg(test)]

View file

@ -1,7 +1,6 @@
use std::{collections::BTreeMap, sync::Arc};
use conduit::{debug_info, trace, utils, Result, Server};
use database::Database;
use conduit::{debug_info, trace, utils, Result};
use ruma::{
api::federation::transactions::edu::{Edu, TypingContent},
events::SyncEphemeralRoomEvent,
@ -19,15 +18,19 @@ pub struct Service {
pub typing_update_sender: broadcast::Sender<OwnedRoomId>,
}
impl Service {
pub fn build(_server: &Arc<Server>, _db: &Arc<Database>) -> Result<Self> {
Ok(Self {
impl crate::Service for Service {
fn build(_args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
typing: RwLock::new(BTreeMap::new()),
last_typing_update: RwLock::new(BTreeMap::new()),
typing_update_sender: broadcast::channel(100).0,
})
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Sets a user as typing until the timeout timestamp is reached or
/// roomtyping_remove is called.
pub async fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> {

View file

@ -2,22 +2,25 @@ mod data;
use std::sync::Arc;
use conduit::{Result, Server};
use conduit::Result;
use data::Data;
use database::Database;
use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
pub struct Service {
db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
self.db.reset_notification_counts(user_id, room_id)
}

View file

@ -6,9 +6,9 @@ mod sender;
use std::{fmt::Debug, sync::Arc};
use conduit::{Error, Result, Server};
use async_trait::async_trait;
use conduit::{Error, Result};
use data::Data;
use database::Database;
pub use resolve::FedDest;
use ruma::{
api::{appservice::Registration, OutgoingRequest},
@ -53,12 +53,13 @@ pub enum SendingEvent {
Flush, // none
}
impl Service {
pub fn build(server: &Arc<Server>, db: &Arc<Database>) -> Result<Arc<Self>> {
let config = &server.config;
#[async_trait]
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
let config = &args.server.config;
let (sender, receiver) = loole::unbounded();
Ok(Arc::new(Self {
db: Data::new(db.clone()),
db: Data::new(args.db.clone()),
sender,
receiver: Mutex::new(receiver),
handler_join: Mutex::new(None),
@ -67,7 +68,13 @@ impl Service {
}))
}
pub async fn close(&self) {
async fn start(self: Arc<Self>) -> Result<()> {
self.start_handler().await;
Ok(())
}
async fn stop(&self) {
self.interrupt();
if let Some(handler_join) = self.handler_join.lock().await.take() {
if let Err(e) = handler_join.await {
@ -76,12 +83,16 @@ impl Service {
}
}
pub fn interrupt(&self) {
fn interrupt(&self) {
if !self.sender.is_closed() {
self.sender.close();
}
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
#[tracing::instrument(skip(self, pdu_id, user, pushkey))]
pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
let dest = Destination::Push(user.to_owned(), pushkey);

53
src/service/service.rs Normal file
View file

@ -0,0 +1,53 @@
use std::{collections::BTreeMap, fmt::Write, sync::Arc};
use async_trait::async_trait;
use conduit::{utils::string::split_once_infallible, Result, Server};
use database::Database;
#[async_trait]
pub(crate) trait Service: Send + Sync {
/// Implement the construction of the service instance. Services are
/// generally singletons so expect this to only be called once for a
/// service type. Note that it may be called again after a server reload,
/// but the prior instance will have been dropped first. Failure will
/// shutdown the server with an error.
fn build(args: Args<'_>) -> Result<Arc<impl Service>>
where
Self: Sized;
/// Start the service. Implement the spawning of any service workers. This
/// is called after all other services have been constructed. Failure will
/// shutdown the server with an error.
async fn start(self: Arc<Self>) -> Result<()> { Ok(()) }
/// Stop the service. Implement the joining of any service workers and
/// cleanup of any other state. This function is asynchronous to allow that
/// gracefully, but errors cannot propagate.
async fn stop(&self) {}
/// Interrupt the service. This may be sent prior to `stop()` as a
/// notification to improve the shutdown sequence. Implementations must be
/// robust to this being called multiple times.
fn interrupt(&self) {}
/// Clear any caches or similar runtime state.
fn clear_cache(&self) {}
/// Memory usage report in a markdown string.
fn memory_usage(&self, _out: &mut dyn Write) -> Result<()> { Ok(()) }
/// Return the name of the service.
/// i.e. `crate::service::make_name(std::module_path!())`
fn name(&self) -> &str;
}
pub(crate) struct Args<'a> {
pub(crate) server: &'a Arc<Server>,
pub(crate) db: &'a Arc<Database>,
pub(crate) _service: &'a Map,
}
pub(crate) type Map = BTreeMap<String, Arc<dyn Service>>;
#[inline]
pub(crate) fn make_name(module_path: &str) -> &str { split_once_infallible(module_path, "::").1 }

View file

@ -1,101 +1,97 @@
use std::sync::Arc;
use std::{collections::BTreeMap, fmt::Write, sync::Arc};
use conduit::{debug_info, Result, Server};
use conduit::{debug, debug_info, info, trace, Result, Server};
use database::Database;
use tracing::{debug, info, trace};
use crate::{
account_data, admin, appservice, globals, key_backups, media, presence, pusher, rooms, sending, transaction_ids,
uiaa, users,
account_data, admin, appservice, globals, key_backups, media, presence, pusher, rooms, sending,
service::{Args, Map, Service},
transaction_ids, uiaa, users,
};
pub struct Services {
pub rooms: rooms::Service,
pub appservice: appservice::Service,
pub pusher: pusher::Service,
pub transaction_ids: transaction_ids::Service,
pub uiaa: uiaa::Service,
pub users: users::Service,
pub account_data: account_data::Service,
pub appservice: Arc<appservice::Service>,
pub pusher: Arc<pusher::Service>,
pub transaction_ids: Arc<transaction_ids::Service>,
pub uiaa: Arc<uiaa::Service>,
pub users: Arc<users::Service>,
pub account_data: Arc<account_data::Service>,
pub presence: Arc<presence::Service>,
pub admin: Arc<admin::Service>,
pub globals: globals::Service,
pub key_backups: key_backups::Service,
pub media: media::Service,
pub globals: Arc<globals::Service>,
pub key_backups: Arc<key_backups::Service>,
pub media: Arc<media::Service>,
pub sending: Arc<sending::Service>,
pub(crate) service: Map,
pub server: Arc<Server>,
pub db: Arc<Database>,
}
impl Services {
pub async fn build(server: Arc<Server>, db: Arc<Database>) -> Result<Self> {
pub fn build(server: Arc<Server>, db: Arc<Database>) -> Result<Self> {
let mut service: Map = BTreeMap::new();
macro_rules! build {
($tyname:ty) => {{
let built = <$tyname>::build(Args {
server: &server,
db: &db,
_service: &service,
})?;
service.insert(built.name().to_owned(), built.clone());
built
}};
}
Ok(Self {
rooms: rooms::Service {
alias: rooms::alias::Service::build(&server, &db)?,
auth_chain: rooms::auth_chain::Service::build(&server, &db)?,
directory: rooms::directory::Service::build(&server, &db)?,
event_handler: rooms::event_handler::Service::build(&server, &db)?,
lazy_loading: rooms::lazy_loading::Service::build(&server, &db)?,
metadata: rooms::metadata::Service::build(&server, &db)?,
outlier: rooms::outlier::Service::build(&server, &db)?,
pdu_metadata: rooms::pdu_metadata::Service::build(&server, &db)?,
read_receipt: rooms::read_receipt::Service::build(&server, &db)?,
search: rooms::search::Service::build(&server, &db)?,
short: rooms::short::Service::build(&server, &db)?,
state: rooms::state::Service::build(&server, &db)?,
state_accessor: rooms::state_accessor::Service::build(&server, &db)?,
state_cache: rooms::state_cache::Service::build(&server, &db)?,
state_compressor: rooms::state_compressor::Service::build(&server, &db)?,
timeline: rooms::timeline::Service::build(&server, &db)?,
threads: rooms::threads::Service::build(&server, &db)?,
typing: rooms::typing::Service::build(&server, &db)?,
spaces: rooms::spaces::Service::build(&server, &db)?,
user: rooms::user::Service::build(&server, &db)?,
alias: build!(rooms::alias::Service),
auth_chain: build!(rooms::auth_chain::Service),
directory: build!(rooms::directory::Service),
event_handler: build!(rooms::event_handler::Service),
lazy_loading: build!(rooms::lazy_loading::Service),
metadata: build!(rooms::metadata::Service),
outlier: build!(rooms::outlier::Service),
pdu_metadata: build!(rooms::pdu_metadata::Service),
read_receipt: build!(rooms::read_receipt::Service),
search: build!(rooms::search::Service),
short: build!(rooms::short::Service),
state: build!(rooms::state::Service),
state_accessor: build!(rooms::state_accessor::Service),
state_cache: build!(rooms::state_cache::Service),
state_compressor: build!(rooms::state_compressor::Service),
timeline: build!(rooms::timeline::Service),
threads: build!(rooms::threads::Service),
typing: build!(rooms::typing::Service),
spaces: build!(rooms::spaces::Service),
user: build!(rooms::user::Service),
},
appservice: appservice::Service::build(&server, &db)?,
pusher: pusher::Service::build(&server, &db)?,
transaction_ids: transaction_ids::Service::build(&server, &db)?,
uiaa: uiaa::Service::build(&server, &db)?,
users: users::Service::build(&server, &db)?,
account_data: account_data::Service::build(&server, &db)?,
presence: presence::Service::build(&server, &db)?,
admin: admin::Service::build(&server, &db)?,
key_backups: key_backups::Service::build(&server, &db)?,
media: media::Service::build(&server, &db)?,
sending: sending::Service::build(&server, &db)?,
globals: globals::Service::build(&server, &db)?,
appservice: build!(appservice::Service),
pusher: build!(pusher::Service),
transaction_ids: build!(transaction_ids::Service),
uiaa: build!(uiaa::Service),
users: build!(users::Service),
account_data: build!(account_data::Service),
presence: build!(presence::Service),
admin: build!(admin::Service),
key_backups: build!(key_backups::Service),
media: build!(media::Service),
sending: build!(sending::Service),
globals: build!(globals::Service),
service,
server,
db,
})
}
pub async fn memory_usage(&self) -> String {
let lazy_load_waiting = self.rooms.lazy_loading.lazy_load_waiting.lock().await.len();
let server_visibility_cache = self
.rooms
.state_accessor
.server_visibility_cache
.lock()
.unwrap()
.len();
let user_visibility_cache = self
.rooms
.state_accessor
.user_visibility_cache
.lock()
.unwrap()
.len();
let stateinfo_cache = self
.rooms
.state_compressor
.stateinfo_cache
.lock()
.unwrap()
.len();
let lasttimelinecount_cache = self
.rooms
.timeline
.get_lasttimelinecount_cache_usage().0;
pub async fn memory_usage(&self) -> Result<String> {
let mut out = String::new();
for service in self.service.values() {
service.memory_usage(&mut out)?;
}
//TODO
let roomid_spacehierarchy_cache = self
.rooms
.spaces
@ -103,80 +99,17 @@ impl Services {
.lock()
.await
.len();
let resolver_overrides_cache = self
.globals
.resolver
.overrides
.read()
.expect("locked for reading")
.len();
let resolver_destinations_cache = self
.globals
.resolver
.destinations
.read()
.expect("locked for reading")
.len();
let bad_event_ratelimiter = self.globals.bad_event_ratelimiter.read().await.len();
let bad_query_ratelimiter = self.globals.bad_query_ratelimiter.read().await.len();
let bad_signature_ratelimiter = self.globals.bad_signature_ratelimiter.read().await.len();
writeln!(out, "roomid_spacehierarchy_cache: {roomid_spacehierarchy_cache}")?;
format!(
"\
lazy_load_waiting: {lazy_load_waiting}
server_visibility_cache: {server_visibility_cache}
user_visibility_cache: {user_visibility_cache}
stateinfo_cache: {stateinfo_cache}
lasttimelinecount_cache: {lasttimelinecount_cache}
roomid_spacehierarchy_cache: {roomid_spacehierarchy_cache}
resolver_overrides_cache: {resolver_overrides_cache}
resolver_destinations_cache: {resolver_destinations_cache}
bad_event_ratelimiter: {bad_event_ratelimiter}
bad_query_ratelimiter: {bad_query_ratelimiter}
bad_signature_ratelimiter: {bad_signature_ratelimiter}
"
)
Ok(out)
}
pub async fn clear_caches(&self, amount: u32) {
if amount > 0 {
self.rooms
.lazy_loading
.lazy_load_waiting
.lock()
.await
.clear();
pub async fn clear_cache(&self) {
for service in self.service.values() {
service.clear_cache();
}
if amount > 1 {
self.rooms
.state_accessor
.server_visibility_cache
.lock()
.unwrap()
.clear();
}
if amount > 2 {
self.rooms
.state_accessor
.user_visibility_cache
.lock()
.unwrap()
.clear();
}
if amount > 3 {
self.rooms
.state_compressor
.stateinfo_cache
.lock()
.unwrap()
.clear();
}
if amount > 4 {
self.rooms
.timeline
.clear_lasttimelinecount_cache();
}
if amount > 5 {
//TODO
self.rooms
.spaces
.roomid_spacehierarchy_cache
@ -184,33 +117,6 @@ bad_signature_ratelimiter: {bad_signature_ratelimiter}
.await
.clear();
}
if amount > 6 {
self.globals
.resolver
.overrides
.write()
.expect("locked for writing")
.clear();
self.globals
.resolver
.destinations
.write()
.expect("locked for writing")
.clear();
}
if amount > 7 {
self.globals.resolver.resolver.clear_cache();
}
if amount > 8 {
self.globals.bad_event_ratelimiter.write().await.clear();
}
if amount > 9 {
self.globals.bad_query_ratelimiter.write().await.clear();
}
if amount > 10 {
self.globals.bad_signature_ratelimiter.write().await.clear();
}
}
pub async fn start(&self) -> Result<()> {
debug_info!("Starting services");
@ -219,10 +125,9 @@ bad_signature_ratelimiter: {bad_signature_ratelimiter}
globals::migrations::migrations(&self.db, &self.globals.config).await?;
globals::emerg_access::init_emergency_access();
self.admin.start_handler().await;
self.sending.start_handler().await;
if self.globals.config.allow_local_presence {
self.presence.start_handler().await;
for (name, service) in &self.service {
debug!("Starting {name}");
service.clone().start().await?;
}
if self.globals.allow_check_for_updates() {
@ -238,18 +143,18 @@ bad_signature_ratelimiter: {bad_signature_ratelimiter}
Ok(())
}
pub async fn interrupt(&self) {
trace!("Interrupting services...");
self.sending.interrupt();
self.presence.interrupt();
self.admin.interrupt();
pub fn interrupt(&self) {
debug!("Interrupting services...");
trace!("Services interrupt complete.");
for (name, service) in &self.service {
trace!("Interrupting {name}");
service.interrupt();
}
}
pub async fn stop(&self) {
info!("Shutting down services");
self.interrupt().await;
self.interrupt();
debug!("Waiting for update worker...");
if let Some(updates_handle) = self.globals.updates_handle.lock().await.take() {
@ -261,14 +166,10 @@ bad_signature_ratelimiter: {bad_signature_ratelimiter}
}
}
debug!("Waiting for admin worker...");
self.admin.close().await;
debug!("Waiting for presence worker...");
self.presence.close().await;
debug!("Waiting for sender...");
self.sending.close().await;
for (name, service) in &self.service {
debug!("Waiting for {name} ...");
service.stop().await;
}
debug_info!("Services shutdown complete.");
}

View file

@ -2,22 +2,25 @@ mod data;
use std::sync::Arc;
use conduit::{Result, Server};
use conduit::Result;
use data::Data;
use database::Database;
use ruma::{DeviceId, TransactionId, UserId};
pub struct Service {
pub db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
pub fn add_txnid(
&self, user_id: &UserId, device_id: Option<&DeviceId>, txn_id: &TransactionId, data: &[u8],
) -> Result<()> {

View file

@ -2,9 +2,8 @@ mod data;
use std::sync::Arc;
use conduit::{utils, utils::hash, Error, Result, Server};
use conduit::{utils, utils::hash, Error, Result};
use data::Data;
use database::Database;
use ruma::{
api::client::{
error::ErrorKind,
@ -22,13 +21,17 @@ pub struct Service {
pub db: Data,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db),
})
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Creates a new Uiaa session. Make sure the session token is unique.
pub fn create(
&self, user_id: &UserId, device_id: &DeviceId, uiaainfo: &UiaaInfo, json_body: &CanonicalJsonValue,

View file

@ -6,9 +6,8 @@ use std::{
sync::{Arc, Mutex, Mutex as StdMutex},
};
use conduit::{Error, Result, Server};
use conduit::{Error, Result};
use data::Data;
use database::Database;
use ruma::{
api::client::{
device::Device,
@ -41,14 +40,18 @@ pub struct Service {
pub connections: DbConnections,
}
impl Service {
pub fn build(_server: &Arc<Server>, db: &Arc<Database>) -> Result<Self> {
Ok(Self {
db: Data::new(db.clone()),
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data::new(args.db.clone()),
connections: StdMutex::new(BTreeMap::new()),
})
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
/// Check if a user has an account on this homeserver.
pub fn exists(&self, user_id: &UserId) -> Result<bool> { self.db.exists(user_id) }