de-global some services()

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-07-15 10:37:16 +00:00
parent d67f19a55d
commit 4430e4dee0

View file

@ -1,5 +1,6 @@
use std::{collections::BTreeMap, net::IpAddr, time::Instant}; use std::{collections::BTreeMap, net::IpAddr, time::Instant};
use axum::extract::State;
use axum_client_ip::InsecureClientIp; use axum_client_ip::InsecureClientIp;
use conduit::{debug, debug_warn, err, trace, warn, Err}; use conduit::{debug, debug_warn, err, trace, warn, Err};
use ruma::{ use ruma::{
@ -21,7 +22,7 @@ use tokio::sync::RwLock;
use crate::{ use crate::{
service::rooms::event_handler::parse_incoming_pdu, service::rooms::event_handler::parse_incoming_pdu,
services, services::Services,
utils::{self}, utils::{self},
Error, Result, Ruma, Error, Result, Ruma,
}; };
@ -33,7 +34,8 @@ type ResolvedMap = BTreeMap<OwnedEventId, Result<(), Error>>;
/// Push EDUs and PDUs to this server. /// Push EDUs and PDUs to this server.
#[tracing::instrument(skip_all, fields(%client), name = "send")] #[tracing::instrument(skip_all, fields(%client), name = "send")]
pub(crate) async fn send_transaction_message_route( pub(crate) async fn send_transaction_message_route(
InsecureClientIp(client): InsecureClientIp, body: Ruma<send_transaction_message::v1::Request>, State(services): State<&Services>, InsecureClientIp(client): InsecureClientIp,
body: Ruma<send_transaction_message::v1::Request>,
) -> Result<send_transaction_message::v1::Response> { ) -> Result<send_transaction_message::v1::Response> {
let origin = body.origin.as_ref().expect("server is authenticated"); let origin = body.origin.as_ref().expect("server is authenticated");
@ -61,8 +63,8 @@ pub(crate) async fn send_transaction_message_route(
"Starting txn", "Starting txn",
); );
let resolved_map = handle_pdus(&client, &body, origin, &txn_start_time).await?; let resolved_map = handle_pdus(services, &client, &body, origin, &txn_start_time).await?;
handle_edus(&client, &body, origin).await?; handle_edus(services, &client, &body, origin).await?;
debug!( debug!(
pdus = ?body.pdus.len(), pdus = ?body.pdus.len(),
@ -82,7 +84,8 @@ pub(crate) async fn send_transaction_message_route(
} }
async fn handle_pdus( async fn handle_pdus(
_client: &IpAddr, body: &Ruma<send_transaction_message::v1::Request>, origin: &ServerName, txn_start_time: &Instant, services: &Services, _client: &IpAddr, body: &Ruma<send_transaction_message::v1::Request>, origin: &ServerName,
txn_start_time: &Instant,
) -> Result<ResolvedMap> { ) -> Result<ResolvedMap> {
let mut parsed_pdus = Vec::with_capacity(body.pdus.len()); let mut parsed_pdus = Vec::with_capacity(body.pdus.len());
for pdu in &body.pdus { for pdu in &body.pdus {
@ -102,7 +105,7 @@ async fn handle_pdus(
// corresponding signing keys // corresponding signing keys
let pub_key_map = RwLock::new(BTreeMap::new()); let pub_key_map = RwLock::new(BTreeMap::new());
if !parsed_pdus.is_empty() { if !parsed_pdus.is_empty() {
services() services
.rooms .rooms
.event_handler .event_handler
.fetch_required_signing_keys(parsed_pdus.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map) .fetch_required_signing_keys(parsed_pdus.iter().map(|(_event_id, event, _room_id)| event), &pub_key_map)
@ -118,7 +121,7 @@ async fn handle_pdus(
let mut resolved_map = BTreeMap::new(); let mut resolved_map = BTreeMap::new();
for (event_id, value, room_id) in parsed_pdus { for (event_id, value, room_id) in parsed_pdus {
let pdu_start_time = Instant::now(); let pdu_start_time = Instant::now();
let mutex_lock = services() let mutex_lock = services
.rooms .rooms
.event_handler .event_handler
.mutex_federation .mutex_federation
@ -126,7 +129,7 @@ async fn handle_pdus(
.await; .await;
resolved_map.insert( resolved_map.insert(
event_id.clone(), event_id.clone(),
services() services
.rooms .rooms
.event_handler .event_handler
.handle_incoming_pdu(origin, &room_id, &event_id, value, true, &pub_key_map) .handle_incoming_pdu(origin, &room_id, &event_id, value, true, &pub_key_map)
@ -154,7 +157,7 @@ async fn handle_pdus(
} }
async fn handle_edus( async fn handle_edus(
client: &IpAddr, body: &Ruma<send_transaction_message::v1::Request>, origin: &ServerName, services: &Services, client: &IpAddr, body: &Ruma<send_transaction_message::v1::Request>, origin: &ServerName,
) -> Result<()> { ) -> Result<()> {
for edu in body for edu in body
.edus .edus
@ -162,12 +165,12 @@ async fn handle_edus(
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok()) .filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
{ {
match edu { match edu {
Edu::Presence(presence) => handle_edu_presence(client, origin, presence).await?, Edu::Presence(presence) => handle_edu_presence(services, client, origin, presence).await?,
Edu::Receipt(receipt) => handle_edu_receipt(client, origin, receipt).await?, Edu::Receipt(receipt) => handle_edu_receipt(services, client, origin, receipt).await?,
Edu::Typing(typing) => handle_edu_typing(client, origin, typing).await?, Edu::Typing(typing) => handle_edu_typing(services, client, origin, typing).await?,
Edu::DeviceListUpdate(content) => handle_edu_device_list_update(client, origin, content).await?, Edu::DeviceListUpdate(content) => handle_edu_device_list_update(services, client, origin, content).await?,
Edu::DirectToDevice(content) => handle_edu_direct_to_device(client, origin, content).await?, Edu::DirectToDevice(content) => handle_edu_direct_to_device(services, client, origin, content).await?,
Edu::SigningKeyUpdate(content) => handle_edu_signing_key_update(client, origin, content).await?, Edu::SigningKeyUpdate(content) => handle_edu_signing_key_update(services, client, origin, content).await?,
Edu::_Custom(ref _custom) => { Edu::_Custom(ref _custom) => {
debug_warn!(?body.edus, "received custom/unknown EDU"); debug_warn!(?body.edus, "received custom/unknown EDU");
}, },
@ -177,8 +180,10 @@ async fn handle_edus(
Ok(()) Ok(())
} }
async fn handle_edu_presence(_client: &IpAddr, origin: &ServerName, presence: PresenceContent) -> Result<()> { async fn handle_edu_presence(
if !services().globals.allow_incoming_presence() { services: &Services, _client: &IpAddr, origin: &ServerName, presence: PresenceContent,
) -> Result<()> {
if !services.globals.allow_incoming_presence() {
return Ok(()); return Ok(());
} }
@ -191,7 +196,7 @@ async fn handle_edu_presence(_client: &IpAddr, origin: &ServerName, presence: Pr
continue; continue;
} }
services().presence.set_presence( services.presence.set_presence(
&update.user_id, &update.user_id,
&update.presence, &update.presence,
Some(update.currently_active), Some(update.currently_active),
@ -203,13 +208,15 @@ async fn handle_edu_presence(_client: &IpAddr, origin: &ServerName, presence: Pr
Ok(()) Ok(())
} }
async fn handle_edu_receipt(_client: &IpAddr, origin: &ServerName, receipt: ReceiptContent) -> Result<()> { async fn handle_edu_receipt(
if !services().globals.allow_incoming_read_receipts() { services: &Services, _client: &IpAddr, origin: &ServerName, receipt: ReceiptContent,
) -> Result<()> {
if !services.globals.allow_incoming_read_receipts() {
return Ok(()); return Ok(());
} }
for (room_id, room_updates) in receipt.receipts { for (room_id, room_updates) in receipt.receipts {
if services() if services
.rooms .rooms
.event_handler .event_handler
.acl_check(origin, &room_id) .acl_check(origin, &room_id)
@ -231,7 +238,7 @@ async fn handle_edu_receipt(_client: &IpAddr, origin: &ServerName, receipt: Rece
continue; continue;
} }
if services() if services
.rooms .rooms
.state_cache .state_cache
.room_members(&room_id) .room_members(&room_id)
@ -247,7 +254,7 @@ async fn handle_edu_receipt(_client: &IpAddr, origin: &ServerName, receipt: Rece
room_id: room_id.clone(), room_id: room_id.clone(),
}; };
services() services
.rooms .rooms
.read_receipt .read_receipt
.readreceipt_update(&user_id, &room_id, &event)?; .readreceipt_update(&user_id, &room_id, &event)?;
@ -265,8 +272,10 @@ async fn handle_edu_receipt(_client: &IpAddr, origin: &ServerName, receipt: Rece
Ok(()) Ok(())
} }
async fn handle_edu_typing(_client: &IpAddr, origin: &ServerName, typing: TypingContent) -> Result<()> { async fn handle_edu_typing(
if !services().globals.config.allow_incoming_typing { services: &Services, _client: &IpAddr, origin: &ServerName, typing: TypingContent,
) -> Result<()> {
if !services.globals.config.allow_incoming_typing {
return Ok(()); return Ok(());
} }
@ -278,7 +287,7 @@ async fn handle_edu_typing(_client: &IpAddr, origin: &ServerName, typing: Typing
return Ok(()); return Ok(());
} }
if services() if services
.rooms .rooms
.event_handler .event_handler
.acl_check(typing.user_id.server_name(), &typing.room_id) .acl_check(typing.user_id.server_name(), &typing.room_id)
@ -291,26 +300,26 @@ async fn handle_edu_typing(_client: &IpAddr, origin: &ServerName, typing: Typing
return Ok(()); return Ok(());
} }
if services() if services
.rooms .rooms
.state_cache .state_cache
.is_joined(&typing.user_id, &typing.room_id)? .is_joined(&typing.user_id, &typing.room_id)?
{ {
if typing.typing { if typing.typing {
let timeout = utils::millis_since_unix_epoch().saturating_add( let timeout = utils::millis_since_unix_epoch().saturating_add(
services() services
.globals .globals
.config .config
.typing_federation_timeout_s .typing_federation_timeout_s
.saturating_mul(1000), .saturating_mul(1000),
); );
services() services
.rooms .rooms
.typing .typing
.typing_add(&typing.user_id, &typing.room_id, timeout) .typing_add(&typing.user_id, &typing.room_id, timeout)
.await?; .await?;
} else { } else {
services() services
.rooms .rooms
.typing .typing
.typing_remove(&typing.user_id, &typing.room_id) .typing_remove(&typing.user_id, &typing.room_id)
@ -328,7 +337,7 @@ async fn handle_edu_typing(_client: &IpAddr, origin: &ServerName, typing: Typing
} }
async fn handle_edu_device_list_update( async fn handle_edu_device_list_update(
_client: &IpAddr, origin: &ServerName, content: DeviceListUpdateContent, services: &Services, _client: &IpAddr, origin: &ServerName, content: DeviceListUpdateContent,
) -> Result<()> { ) -> Result<()> {
let DeviceListUpdateContent { let DeviceListUpdateContent {
user_id, user_id,
@ -343,13 +352,13 @@ async fn handle_edu_device_list_update(
return Ok(()); return Ok(());
} }
services().users.mark_device_key_update(&user_id)?; services.users.mark_device_key_update(&user_id)?;
Ok(()) Ok(())
} }
async fn handle_edu_direct_to_device( async fn handle_edu_direct_to_device(
_client: &IpAddr, origin: &ServerName, content: DirectDeviceContent, services: &Services, _client: &IpAddr, origin: &ServerName, content: DirectDeviceContent,
) -> Result<()> { ) -> Result<()> {
let DirectDeviceContent { let DirectDeviceContent {
sender, sender,
@ -367,7 +376,7 @@ async fn handle_edu_direct_to_device(
} }
// Check if this is a new transaction id // Check if this is a new transaction id
if services() if services
.transaction_ids .transaction_ids
.existing_txnid(&sender, None, &message_id)? .existing_txnid(&sender, None, &message_id)?
.is_some() .is_some()
@ -379,7 +388,7 @@ async fn handle_edu_direct_to_device(
for (target_device_id_maybe, event) in map { for (target_device_id_maybe, event) in map {
match target_device_id_maybe { match target_device_id_maybe {
DeviceIdOrAllDevices::DeviceId(target_device_id) => { DeviceIdOrAllDevices::DeviceId(target_device_id) => {
services().users.add_to_device_event( services.users.add_to_device_event(
&sender, &sender,
target_user_id, target_user_id,
target_device_id, target_device_id,
@ -391,8 +400,8 @@ async fn handle_edu_direct_to_device(
}, },
DeviceIdOrAllDevices::AllDevices => { DeviceIdOrAllDevices::AllDevices => {
for target_device_id in services().users.all_device_ids(target_user_id) { for target_device_id in services.users.all_device_ids(target_user_id) {
services().users.add_to_device_event( services.users.add_to_device_event(
&sender, &sender,
target_user_id, target_user_id,
&target_device_id?, &target_device_id?,
@ -408,7 +417,7 @@ async fn handle_edu_direct_to_device(
} }
// Save transaction id with empty data // Save transaction id with empty data
services() services
.transaction_ids .transaction_ids
.add_txnid(&sender, None, &message_id, &[])?; .add_txnid(&sender, None, &message_id, &[])?;
@ -416,7 +425,7 @@ async fn handle_edu_direct_to_device(
} }
async fn handle_edu_signing_key_update( async fn handle_edu_signing_key_update(
_client: &IpAddr, origin: &ServerName, content: SigningKeyUpdateContent, services: &Services, _client: &IpAddr, origin: &ServerName, content: SigningKeyUpdateContent,
) -> Result<()> { ) -> Result<()> {
let SigningKeyUpdateContent { let SigningKeyUpdateContent {
user_id, user_id,
@ -433,7 +442,7 @@ async fn handle_edu_signing_key_update(
} }
if let Some(master_key) = master_key { if let Some(master_key) = master_key {
services() services
.users .users
.add_cross_signing_keys(&user_id, &master_key, &self_signing_key, &None, true)?; .add_cross_signing_keys(&user_id, &master_key, &self_signing_key, &None, true)?;
} }