From f919fa879b60c7fb2bf24d5af32acaacda7efdc8 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 23 Apr 2024 14:00:21 -0700 Subject: [PATCH] abbrev destination in sender Signed-off-by: Jason Volk --- src/service/sending/mod.rs | 155 ++++++++++++++++++------------------ src/service/sending/send.rs | 111 +++++++++++++------------- 2 files changed, 132 insertions(+), 134 deletions(-) diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index c330cfcc..7cde0941 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -86,12 +86,12 @@ impl Service { #[tracing::instrument(skip(self, pdu_id, user, pushkey))] pub(crate) fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> { - let destination = Destination::Push(user.to_owned(), pushkey); + let dest = Destination::Push(user.to_owned(), pushkey); let event = SendingEventType::Pdu(pdu_id.to_owned()); let _cork = services().globals.db.cork()?; - let keys = self.db.queue_requests(&[(&destination, event.clone())])?; + let keys = self.db.queue_requests(&[(&dest, event.clone())])?; self.sender - .send((destination, event, keys.into_iter().next().unwrap())) + .send((dest, event, keys.into_iter().next().unwrap())) .unwrap(); Ok(()) @@ -99,12 +99,12 @@ impl Service { #[tracing::instrument(skip(self))] pub(crate) fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec) -> Result<()> { - let destination = Destination::Appservice(appservice_id); + let dest = Destination::Appservice(appservice_id); let event = SendingEventType::Pdu(pdu_id); let _cork = services().globals.db.cork()?; - let keys = self.db.queue_requests(&[(&destination, event.clone())])?; + let keys = self.db.queue_requests(&[(&dest, event.clone())])?; self.sender - .send((destination, event, keys.into_iter().next().unwrap())) + .send((dest, event, keys.into_iter().next().unwrap())) .unwrap(); Ok(()) @@ -137,8 +137,8 @@ impl Service { .map(|(o, e)| (o, e.clone())) .collect::>(), )?; - for ((destination, event), key) in requests.into_iter().zip(keys) { - self.sender.send((destination.clone(), event, key)).unwrap(); + for ((dest, event), key) in requests.into_iter().zip(keys) { + self.sender.send((dest.clone(), event, key)).unwrap(); } Ok(()) @@ -146,12 +146,12 @@ impl Service { #[tracing::instrument(skip(self, server, serialized))] pub(crate) fn send_edu_server(&self, server: &ServerName, serialized: Vec) -> Result<()> { - let destination = Destination::Normal(server.to_owned()); + let dest = Destination::Normal(server.to_owned()); let event = SendingEventType::Edu(serialized); let _cork = services().globals.db.cork()?; - let keys = self.db.queue_requests(&[(&destination, event.clone())])?; + let keys = self.db.queue_requests(&[(&dest, event.clone())])?; self.sender - .send((destination, event, keys.into_iter().next().unwrap())) + .send((dest, event, keys.into_iter().next().unwrap())) .unwrap(); Ok(()) @@ -184,8 +184,8 @@ impl Service { .map(|(o, e)| (o, e.clone())) .collect::>(), )?; - for ((destination, event), key) in requests.into_iter().zip(keys) { - self.sender.send((destination.clone(), event, key)).unwrap(); + for ((dest, event), key) in requests.into_iter().zip(keys) { + self.sender.send((dest.clone(), event, key)).unwrap(); } Ok(()) @@ -207,9 +207,9 @@ impl Service { pub(crate) fn flush_servers>(&self, servers: I) -> Result<()> { let requests = servers.into_iter().map(Destination::Normal); - for destination in requests { + for dest in requests { self.sender - .send((destination, SendingEventType::Flush, Vec::::new())) + .send((dest, SendingEventType::Flush, Vec::::new())) .unwrap(); } @@ -282,13 +282,13 @@ impl Service { // Retry requests we could not finish yet if self.startup_netburst { let mut initial_transactions = HashMap::>::new(); - for (key, destination, event) in self.db.active_requests().filter_map(Result::ok) { - let entry = initial_transactions.entry(destination.clone()).or_default(); + for (key, dest, event) in self.db.active_requests().filter_map(Result::ok) { + let entry = initial_transactions.entry(dest.clone()).or_default(); if self.startup_netburst_keep >= 0 && entry.len() >= usize::try_from(self.startup_netburst_keep).unwrap() { - warn!("Dropping unsent event {:?} {:?}", destination, String::from_utf8_lossy(&key),); + warn!("Dropping unsent event {:?} {:?}", dest, String::from_utf8_lossy(&key),); self.db.delete_active_request(key)?; continue; } @@ -296,9 +296,9 @@ impl Service { entry.push(event); } - for (destination, events) in initial_transactions { - current_transaction_status.insert(destination.clone(), TransactionStatus::Running); - futures.push(handle_events(destination.clone(), events)); + for (dest, events) in initial_transactions { + current_transaction_status.insert(dest.clone(), TransactionStatus::Running); + futures.push(send_events(dest.clone(), events)); } } @@ -306,30 +306,30 @@ impl Service { tokio::select! { Some(response) = futures.next() => { match response { - Ok(destination) => { + Ok(dest) => { let _cork = services().globals.db.cork(); - self.db.delete_all_active_requests_for(&destination)?; + self.db.delete_all_active_requests_for(&dest)?; // Find events that have been added since starting the last request let new_events = self .db - .queued_requests(&destination) + .queued_requests(&dest) .filter_map(Result::ok) .take(30).collect::>(); if !new_events.is_empty() { // Insert pdus we found self.db.mark_as_active(&new_events)?; - futures.push(handle_events( - destination.clone(), + futures.push(send_events( + dest.clone(), new_events.into_iter().map(|(event, _)| event).collect(), )); } else { - current_transaction_status.remove(&destination); + current_transaction_status.remove(&dest); } } - Err((destination, _)) => { - current_transaction_status.entry(destination).and_modify(|e| *e = match e { + Err((dest, _)) => { + current_transaction_status.entry(dest).and_modify(|e| *e = match e { TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), TransactionStatus::Failed(_, _) => { @@ -342,13 +342,13 @@ impl Service { }, event = receiver.recv_async() => { - if let Ok((destination, event, key)) = event { + if let Ok((dest, event, key)) = event { if let Ok(Some(events)) = self.select_events( - &destination, + &dest, vec![(event, key)], &mut current_transaction_status, ) { - futures.push(handle_events(destination, events)); + futures.push(send_events(dest, events)); } } } @@ -356,14 +356,14 @@ impl Service { } } - #[tracing::instrument(skip(self, destination, new_events, current_transaction_status))] + #[tracing::instrument(skip(self, dest, new_events, current_transaction_status))] fn select_events( &self, - destination: &Destination, + dest: &Destination, new_events: Vec<(SendingEventType, Vec)>, // Events we want to send: event and full key current_transaction_status: &mut HashMap, ) -> Result>> { - let (allow, retry) = self.select_events_current(destination.clone(), current_transaction_status)?; + let (allow, retry) = self.select_events_current(dest.clone(), current_transaction_status)?; // Nothing can be done for this remote, bail out. if !allow { @@ -376,7 +376,7 @@ impl Service { // Must retry any previous transaction for this remote. if retry { self.db - .active_requests_for(destination) + .active_requests_for(dest) .filter_map(Result::ok) .for_each(|(_, e)| events.push(e)); @@ -393,7 +393,7 @@ impl Service { } // Add EDU's into the transaction - if let Destination::Normal(server_name) = destination { + if let Destination::Normal(server_name) = dest { if let Ok((select_edus, last_count)) = self.select_edus(server_name) { events.extend(select_edus.into_iter().map(SendingEventType::Edu)); self.db.set_latest_educount(server_name, last_count)?; @@ -403,13 +403,13 @@ impl Service { Ok(Some(events)) } - #[tracing::instrument(skip(self, destination, current_transaction_status))] + #[tracing::instrument(skip(self, dest, current_transaction_status))] fn select_events_current( - &self, destination: Destination, current_transaction_status: &mut HashMap, + &self, dest: Destination, current_transaction_status: &mut HashMap, ) -> Result<(bool, bool)> { let (mut allow, mut retry) = (true, false); current_transaction_status - .entry(destination) + .entry(dest) .and_modify(|e| match e { TransactionStatus::Failed(tries, time) => { // Fail if a request has failed recently (exponential backoff) @@ -595,21 +595,17 @@ pub(crate) fn select_edus_receipts( Ok(true) } -async fn handle_events( - destination: Destination, events: Vec, -) -> Result { - match destination { - Destination::Appservice(ref id) => handle_events_destination_appservice(&destination, id, events).await, - Destination::Push(ref userid, ref pushkey) => { - handle_events_destination_push(&destination, userid, pushkey, events).await - }, - Destination::Normal(ref server) => handle_events_destination_normal(&destination, server, events).await, +async fn send_events(dest: Destination, events: Vec) -> Result { + match dest { + Destination::Normal(ref server) => send_events_dest_normal(&dest, server, events).await, + Destination::Appservice(ref id) => send_events_dest_appservice(&dest, id, events).await, + Destination::Push(ref userid, ref pushkey) => send_events_dest_push(&dest, userid, pushkey, events).await, } } -#[tracing::instrument(skip(destination, events))] -async fn handle_events_destination_appservice( - destination: &Destination, id: &String, events: Vec, +#[tracing::instrument(skip(dest, events))] +async fn send_events_dest_appservice( + dest: &Destination, id: &String, events: Vec, ) -> Result { let mut pdu_jsons = Vec::new(); @@ -621,10 +617,10 @@ async fn handle_events_destination_appservice( .rooms .timeline .get_pdu_from_id(pdu_id) - .map_err(|e| (destination.clone(), e))? + .map_err(|e| (dest.clone(), e))? .ok_or_else(|| { ( - destination.clone(), + dest.clone(), Error::bad_database("[Appservice] Event in servernameevent_data not found in db."), ) })? @@ -647,7 +643,7 @@ async fn handle_events_destination_appservice( .await .ok_or_else(|| { ( - destination.clone(), + dest.clone(), Error::bad_database("[Appservice] Could not load registration from db."), ) })?, @@ -667,18 +663,18 @@ async fn handle_events_destination_appservice( ) .await { - Ok(_) => Ok(destination.clone()), - Err(e) => Err((destination.clone(), e)), - }; + Ok(_) => Ok(dest.clone()), + Err(e) => Err((dest.clone(), e)), + } drop(permit); response } -#[tracing::instrument(skip(destination, events))] -async fn handle_events_destination_push( - destination: &Destination, userid: &OwnedUserId, pushkey: &String, events: Vec, +#[tracing::instrument(skip(dest, events))] +async fn send_events_dest_push( + dest: &Destination, userid: &OwnedUserId, pushkey: &String, events: Vec, ) -> Result { let mut pdus = Vec::new(); @@ -690,10 +686,10 @@ async fn handle_events_destination_push( .rooms .timeline .get_pdu_from_id(pdu_id) - .map_err(|e| (destination.clone(), e))? + .map_err(|e| (dest.clone(), e))? .ok_or_else(|| { ( - destination.clone(), + dest.clone(), Error::bad_database("[Push] Event in servernamevent_datas not found in db."), ) })?, @@ -719,7 +715,7 @@ async fn handle_events_destination_push( let Some(pusher) = services() .pusher .get_pusher(userid, pushkey) - .map_err(|e| (destination.clone(), e))? + .map_err(|e| (dest.clone(), e))? else { continue; }; @@ -735,7 +731,7 @@ async fn handle_events_destination_push( .rooms .user .notification_count(userid, &pdu.room_id) - .map_err(|e| (destination.clone(), e))? + .map_err(|e| (dest.clone(), e))? .try_into() .expect("notification count can't go that high"); @@ -745,18 +741,18 @@ async fn handle_events_destination_push( .pusher .send_push_notice(userid, unread, &pusher, rules_for_user, &pdu) .await - .map(|_response| destination.clone()) - .map_err(|e| (destination.clone(), e)); + .map(|_response| dest.clone()) + .map_err(|e| (dest.clone(), e)); drop(permit); } - Ok(destination.clone()) + Ok(dest.clone()) } -#[tracing::instrument(skip(destination, events), name = "")] -async fn handle_events_destination_normal( - destination: &Destination, dest: &OwnedServerName, events: Vec, +#[tracing::instrument(skip(dest, events), name = "")] +async fn send_events_dest_normal( + dest: &Destination, server_name: &OwnedServerName, events: Vec, ) -> Result { let mut edu_jsons = Vec::new(); let mut pdu_jsons = Vec::new(); @@ -770,11 +766,16 @@ async fn handle_events_destination_normal( .rooms .timeline .get_pdu_json_from_id(pdu_id) - .map_err(|e| (destination.clone(), e))? + .map_err(|e| (dest.clone(), e))? .ok_or_else(|| { - error!("event not found: {dest} {pdu_id:?}"); + error!( + dest = ?dest, + server_name = ?server_name, + pdu_id = ?pdu_id, + "event not found" + ); ( - destination.clone(), + dest.clone(), Error::bad_database("[Normal] Event in servernamevent_datas not found in db."), ) })?, @@ -796,7 +797,7 @@ async fn handle_events_destination_normal( let client = &services().globals.client.sender; let response = send::send_request( client, - dest, + server_name, send_transaction_message::v1::Request { origin: services().globals.server_name().to_owned(), pdus: pdu_jsons, @@ -821,9 +822,9 @@ async fn handle_events_destination_normal( warn!("error for {} from remote: {:?}", pdu.0, pdu.1); } } - destination.clone() + dest.clone() }) - .map_err(|e| (destination.clone(), e)); + .map_err(|e| (dest.clone(), e)); drop(permit); diff --git a/src/service/sending/send.rs b/src/service/sending/send.rs index f08024e4..d9c52af2 100644 --- a/src/service/sending/send.rs +++ b/src/service/sending/send.rs @@ -43,15 +43,15 @@ pub(crate) enum FedDest { Named(String, String), } -struct ActualDestination { - destination: FedDest, +struct ActualDest { + dest: FedDest, host: String, string: String, cached: bool, } #[tracing::instrument(skip_all, name = "send")] -pub(crate) async fn send_request(client: &Client, destination: &ServerName, req: T) -> Result +pub(crate) async fn send_request(client: &Client, dest: &ServerName, req: T) -> Result where T: OutgoingRequest + Debug, { @@ -60,8 +60,8 @@ where } trace!("Preparing to send request"); - validate_destination(destination)?; - let actual = get_actual_destination(destination).await?; + validate_dest(dest)?; + let actual = get_actual_dest(dest).await?; let mut http_request = req .try_into_http_request::>(&actual.string, SendAccessToken::IfRequired(""), &[MatrixVersion::V1_5]) .map_err(|e| { @@ -69,7 +69,7 @@ where Error::BadServerResponse("Invalid destination") })?; - sign_request::(destination, &mut http_request); + sign_request::(dest, &mut http_request); let request = Request::try_from(http_request)?; let method = request.method().clone(); let url = request.url().clone(); @@ -81,13 +81,13 @@ where "Sending request", ); match client.execute(request).await { - Ok(response) => handle_response::(destination, actual, &method, &url, response).await, - Err(e) => handle_error::(destination, &actual, &method, &url, e), + Ok(response) => handle_response::(dest, actual, &method, &url, response).await, + Err(e) => handle_error::(dest, &actual, &method, &url, e), } } async fn handle_response( - destination: &ServerName, actual: ActualDestination, method: &Method, url: &Url, mut response: Response, + dest: &ServerName, actual: ActualDest, method: &Method, url: &Url, mut response: Response, ) -> Result where T: OutgoingRequest + Debug, @@ -116,10 +116,7 @@ where debug!("Got {status:?} for {method} {url}"); if !status.is_success() { - return Err(Error::Federation( - destination.to_owned(), - RumaError::from_http_response(http_response), - )); + return Err(Error::Federation(dest.to_owned(), RumaError::from_http_response(http_response))); } let response = T::IncomingResponse::try_from_http_response(http_response); @@ -129,7 +126,7 @@ where .actual_destinations() .write() .await - .insert(OwnedServerName::from(destination), (actual.destination, actual.host)); + .insert(OwnedServerName::from(dest), (actual.dest, actual.host)); } match response { @@ -139,7 +136,7 @@ where } fn handle_error( - _dest: &ServerName, actual: &ActualDestination, method: &Method, url: &Url, mut e: reqwest::Error, + _dest: &ServerName, actual: &ActualDest, method: &Method, url: &Url, mut e: reqwest::Error, ) -> Result where T: OutgoingRequest + Debug, @@ -164,7 +161,7 @@ where } #[tracing::instrument(skip_all, name = "resolve")] -async fn get_actual_destination(server_name: &ServerName) -> Result { +async fn get_actual_dest(server_name: &ServerName) -> Result { let cached; let cached_result = services() .globals @@ -174,17 +171,17 @@ async fn get_actual_destination(server_name: &ServerName) -> Result Result /// Numbers in comments below refer to bullet points in linked section of /// specification -async fn resolve_actual_destination(destination: &'_ ServerName) -> Result<(FedDest, String)> { - trace!("Finding actual destination for {destination}"); - let destination_str = destination.as_str().to_owned(); - let mut hostname = destination_str.clone(); - let actual_destination = match get_ip_with_port(&destination_str) { +async fn resolve_actual_dest(dest: &'_ ServerName) -> Result<(FedDest, String)> { + trace!("Finding actual destination for {dest}"); + let dest_str = dest.as_str().to_owned(); + let mut hostname = dest_str.clone(); + let actual_dest = match get_ip_with_port(&dest_str) { Some(host_port) => { debug!("1: IP literal with provided or default port"); host_port }, None => { - if let Some(pos) = destination_str.find(':') { + if let Some(pos) = dest_str.find(':') { debug!("2: Hostname with included port"); - let (host, port) = destination_str.split_at(pos); + let (host, port) = dest_str.split_at(pos); query_and_cache_override(host, host, port.parse::().unwrap_or(8448)).await?; FedDest::Named(host.to_owned(), port.to_owned()) } else { - trace!("Requesting well known for {destination}"); - if let Some(delegated_hostname) = request_well_known(destination.as_str()).await? { + trace!("Requesting well known for {dest}"); + if let Some(delegated_hostname) = request_well_known(dest.as_str()).await? { debug!("3: A .well-known file is available"); hostname = add_port_to_hostname(&delegated_hostname).into_uri_string(); match get_ip_with_port(&delegated_hostname) { @@ -255,7 +252,7 @@ async fn resolve_actual_destination(destination: &'_ ServerName) -> Result<(FedD } } else { trace!("4: No .well-known or an error occured"); - if let Some(hostname_override) = query_srv_record(&destination_str).await? { + if let Some(hostname_override) = query_srv_record(&dest_str).await? { debug!("4: No .well-known; SRV record found"); let force_port = hostname_override.port(); @@ -269,8 +266,8 @@ async fn resolve_actual_destination(destination: &'_ ServerName) -> Result<(FedD } } else { debug!("4: No .well-known; 5: No SRV record found"); - query_and_cache_override(&destination_str, &destination_str, 8448).await?; - add_port_to_hostname(&destination_str) + query_and_cache_override(&dest_str, &dest_str, 8448).await?; + add_port_to_hostname(&dest_str) } } } @@ -290,28 +287,28 @@ async fn resolve_actual_destination(destination: &'_ ServerName) -> Result<(FedD FedDest::Named(hostname, ":8448".to_owned()) }; - debug!("Actual destination: {actual_destination:?} hostname: {hostname:?}"); - Ok((actual_destination, hostname.into_uri_string())) + debug!("Actual destination: {actual_dest:?} hostname: {hostname:?}"); + Ok((actual_dest, hostname.into_uri_string())) } #[tracing::instrument(skip_all, name = "well-known")] -async fn request_well_known(destination: &str) -> Result> { +async fn request_well_known(dest: &str) -> Result> { if !services() .globals .resolver .overrides .read() .unwrap() - .contains_key(destination) + .contains_key(dest) { - query_and_cache_override(destination, destination, 8448).await?; + query_and_cache_override(dest, dest, 8448).await?; } let response = services() .globals .client .well_known - .get(&format!("https://{destination}/.well-known/matrix/server")) + .get(&format!("https://{dest}/.well-known/matrix/server")) .send() .await; @@ -347,7 +344,7 @@ async fn request_well_known(destination: &str) -> Result> { return Ok(None); } - debug_info!("{:?} found at {:?}", destination, m_server); + debug_info!("{:?} found at {:?}", dest, m_server); Ok(Some(m_server.to_owned())) } @@ -428,7 +425,7 @@ fn handle_resolve_error(e: &ResolveError) -> Result<()> { } } -fn sign_request(destination: &ServerName, http_request: &mut http::Request>) +fn sign_request(dest: &ServerName, http_request: &mut http::Request>) where T: OutgoingRequest + Debug, { @@ -451,7 +448,7 @@ where .into(), ); req_map.insert("origin".to_owned(), services().globals.server_name().as_str().into()); - req_map.insert("destination".to_owned(), destination.as_str().into()); + req_map.insert("destination".to_owned(), dest.as_str().into()); let mut req_json = serde_json::from_value(req_map.into()).expect("valid JSON is valid BTreeMap"); ruma::signatures::sign_json( @@ -502,25 +499,25 @@ fn validate_url(url: &Url) -> Result<()> { Ok(()) } -fn validate_destination(destination: &ServerName) -> Result<()> { - if destination == services().globals.server_name() { +fn validate_dest(dest: &ServerName) -> Result<()> { + if dest == services().globals.server_name() { return Err(Error::bad_config("Won't send federation request to ourselves")); } - if destination.is_ip_literal() || IPAddress::is_valid(destination.host()) { - validate_destination_ip_literal(destination)?; + if dest.is_ip_literal() || IPAddress::is_valid(dest.host()) { + validate_dest_ip_literal(dest)?; } Ok(()) } -fn validate_destination_ip_literal(destination: &ServerName) -> Result<()> { +fn validate_dest_ip_literal(dest: &ServerName) -> Result<()> { trace!("Destination is an IP literal, checking against IP range denylist.",); debug_assert!( - destination.is_ip_literal() || !IPAddress::is_valid(destination.host()), + dest.is_ip_literal() || !IPAddress::is_valid(dest.host()), "Destination is not an IP literal." ); - let ip = IPAddress::parse(destination.host()).map_err(|e| { + let ip = IPAddress::parse(dest.host()).map_err(|e| { debug_error!("Failed to parse IP literal from string: {}", e); Error::BadServerResponse("Invalid IP address") })?; @@ -538,20 +535,20 @@ fn validate_ip(ip: &IPAddress) -> Result<()> { Ok(()) } -fn get_ip_with_port(destination_str: &str) -> Option { - if let Ok(destination) = destination_str.parse::() { - Some(FedDest::Literal(destination)) - } else if let Ok(ip_addr) = destination_str.parse::() { +fn get_ip_with_port(dest_str: &str) -> Option { + if let Ok(dest) = dest_str.parse::() { + Some(FedDest::Literal(dest)) + } else if let Ok(ip_addr) = dest_str.parse::() { Some(FedDest::Literal(SocketAddr::new(ip_addr, 8448))) } else { None } } -fn add_port_to_hostname(destination_str: &str) -> FedDest { - let (host, port) = match destination_str.find(':') { - None => (destination_str, ":8448"), - Some(pos) => destination_str.split_at(pos), +fn add_port_to_hostname(dest_str: &str) -> FedDest { + let (host, port) = match dest_str.find(':') { + None => (dest_str, ":8448"), + Some(pos) => dest_str.split_at(pos), }; FedDest::Named(host.to_owned(), port.to_owned())