split GetPdu and GetRemotePdu, handle response as incoming PDU
Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
parent
10336f9af6
commit
9dc4290438
2 changed files with 152 additions and 191 deletions
|
@ -746,7 +746,7 @@ pub async fn send_transaction_message_route(
|
|||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
warn!("Could not parse PDU: {e}");
|
||||
warn!("Full PDU: {:?}", &pdu);
|
||||
info!("Full PDU: {:?}", &pdu);
|
||||
continue;
|
||||
},
|
||||
};
|
||||
|
|
|
@ -29,7 +29,10 @@ use tracing::{debug, error, info, warn};
|
|||
|
||||
use super::pdu::PduBuilder;
|
||||
use crate::{
|
||||
api::client_server::{get_alias_helper, leave_all_rooms, leave_room, AUTO_GEN_PASSWORD_LENGTH},
|
||||
api::{
|
||||
client_server::{get_alias_helper, leave_all_rooms, leave_room, AUTO_GEN_PASSWORD_LENGTH},
|
||||
server_server::parse_incoming_pdu,
|
||||
},
|
||||
services,
|
||||
utils::{self, HtmlEscape},
|
||||
Error, PduEvent, Result,
|
||||
|
@ -375,26 +378,26 @@ enum DebugCommand {
|
|||
/// the command.
|
||||
ParsePdu,
|
||||
|
||||
/// - Retrieve and print a PDU by ID from the conduwuit database or from a
|
||||
/// remote server
|
||||
/// - Retrieve and print a PDU by ID from the conduwuit database
|
||||
GetPdu {
|
||||
/// An event ID (a $ followed by the base64 reference hash)
|
||||
event_id: Box<EventId>,
|
||||
|
||||
/// Optional argument for us to attempt to fetch the event from the
|
||||
/// specified remote server. Inserts it into our database/timeline if
|
||||
/// found.
|
||||
/// Use --fetch-first to fetch from the remote server first instead of
|
||||
/// checking our database first.
|
||||
server: Option<Box<ServerName>>,
|
||||
|
||||
/// Optional argument alongside `server` to attempt to fetch the event
|
||||
/// from the remote server first instead of checking our database first.
|
||||
#[arg(short, long)]
|
||||
fetch_first: bool,
|
||||
},
|
||||
|
||||
/// - Forces device lists for all the local users to be updated
|
||||
/// - Attempts to retrieve a PDU from a remote server. Inserts it into our
|
||||
/// database/timeline if found and we do not have this PDU already
|
||||
/// (following normal event auth rules, handles it as an incoming PDU).
|
||||
GetRemotePdu {
|
||||
/// An event ID (a $ followed by the base64 reference hash)
|
||||
event_id: Box<EventId>,
|
||||
|
||||
/// Argument for us to attempt to fetch the event from the
|
||||
/// specified remote server.
|
||||
server: Box<ServerName>,
|
||||
},
|
||||
|
||||
/// - Forces device lists for all local and remote users to be updated (as
|
||||
/// having new keys available)
|
||||
ForceDeviceListUpdates,
|
||||
}
|
||||
|
||||
|
@ -1921,51 +1924,7 @@ impl Service {
|
|||
},
|
||||
DebugCommand::GetPdu {
|
||||
event_id,
|
||||
server,
|
||||
fetch_first,
|
||||
} => {
|
||||
if fetch_first {
|
||||
if let Some(destination) = server {
|
||||
match services()
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&destination,
|
||||
ruma::api::federation::event::get_event::v1::Request {
|
||||
event_id: event_id.to_owned().into(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(response) => {
|
||||
let json: CanonicalJsonObject =
|
||||
serde_json::from_str(response.pdu.get()).map_err(|e| {
|
||||
warn!(
|
||||
"Requested event ID {event_id} from server but failed to convert from \
|
||||
RawValue to CanonicalJsonObject (malformed event/response?): {e}"
|
||||
);
|
||||
Error::BadRequest(
|
||||
ErrorKind::Unknown,
|
||||
"Received response from server but failed to parse PDU",
|
||||
)
|
||||
})?;
|
||||
|
||||
let json_text =
|
||||
serde_json::to_string_pretty(&json).expect("canonical json is valid json");
|
||||
|
||||
return Ok(RoomMessageEventContent::text_html(
|
||||
format!("{}\n```json\n{}\n```", "Got PDU from specified server", json_text),
|
||||
format!(
|
||||
"<p>{}</p>\n<pre><code class=\"language-json\">{}\n</code></pre>\n",
|
||||
"Got PDU from specified server",
|
||||
HtmlEscape(&json_text)
|
||||
),
|
||||
));
|
||||
},
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed sending request to {destination} for PDU {event_id}, checking if we \
|
||||
have it locally. Error message: {e}"
|
||||
);
|
||||
let mut outlier = false;
|
||||
let mut pdu_json = services().rooms.timeline.get_non_outlier_pdu_json(&event_id)?;
|
||||
if pdu_json.is_none() {
|
||||
|
@ -1974,8 +1933,7 @@ impl Service {
|
|||
}
|
||||
match pdu_json {
|
||||
Some(json) => {
|
||||
let json_text = serde_json::to_string_pretty(&json)
|
||||
.expect("canonical json is valid json");
|
||||
let json_text = serde_json::to_string_pretty(&json).expect("canonical json is valid json");
|
||||
return Ok(RoomMessageEventContent::text_html(
|
||||
format!(
|
||||
"{}\n```json\n{}\n```",
|
||||
|
@ -1998,111 +1956,114 @@ impl Service {
|
|||
));
|
||||
},
|
||||
None => {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"PDU not found locally and failed sending request to server for PDU.",
|
||||
));
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
} else {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"--fetch-first was specified but with no server. Please specify a server.",
|
||||
));
|
||||
}
|
||||
} else {
|
||||
debug!("Attempting to get PDU {event_id} locally");
|
||||
let mut outlier = false;
|
||||
let mut pdu_json = services().rooms.timeline.get_non_outlier_pdu_json(&event_id)?;
|
||||
if pdu_json.is_none() {
|
||||
outlier = true;
|
||||
pdu_json = services().rooms.timeline.get_pdu_json(&event_id)?;
|
||||
}
|
||||
match pdu_json {
|
||||
Some(json) => {
|
||||
let json_text =
|
||||
serde_json::to_string_pretty(&json).expect("canonical json is valid json");
|
||||
return Ok(RoomMessageEventContent::text_html(
|
||||
format!(
|
||||
"{}\n```json\n{}\n```",
|
||||
if outlier {
|
||||
"Outlier PDU found in our database"
|
||||
} else {
|
||||
"PDU found in our database"
|
||||
},
|
||||
json_text
|
||||
),
|
||||
format!(
|
||||
"<p>{}</p>\n<pre><code class=\"language-json\">{}\n</code></pre>\n",
|
||||
if outlier {
|
||||
"Outlier PDU found in our database"
|
||||
} else {
|
||||
"PDU found in our database"
|
||||
},
|
||||
HtmlEscape(&json_text)
|
||||
),
|
||||
));
|
||||
},
|
||||
None => {
|
||||
if let Some(destination) = server {
|
||||
debug!(
|
||||
"We don't have PDU {event_id} in our database and server was specified, \
|
||||
fetching from remote server {destination}"
|
||||
);
|
||||
match services()
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&destination,
|
||||
ruma::api::federation::event::get_event::v1::Request {
|
||||
event_id: event_id.to_owned().into(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(response) => {
|
||||
let json: CanonicalJsonObject = serde_json::from_str(response.pdu.get())
|
||||
.map_err(|e| {
|
||||
warn!(
|
||||
"Requested event ID {event_id} from server but failed to \
|
||||
convert from RawValue to CanonicalJsonObject (malformed \
|
||||
event/response?): {e}"
|
||||
);
|
||||
Error::BadRequest(
|
||||
ErrorKind::Unknown,
|
||||
"Received response from server but failed to parse PDU",
|
||||
)
|
||||
})?;
|
||||
|
||||
let json_text = serde_json::to_string_pretty(&json)
|
||||
.expect("canonical json is valid json");
|
||||
|
||||
return Ok(RoomMessageEventContent::text_html(
|
||||
format!(
|
||||
"{}\n```json\n{}\n```",
|
||||
"Got PDU from specified server", json_text
|
||||
),
|
||||
format!(
|
||||
"<p>{}</p>\n<pre><code class=\"language-json\">{}\n</code></pre>\n",
|
||||
"Got PDU from specified server",
|
||||
HtmlEscape(&json_text)
|
||||
),
|
||||
));
|
||||
},
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed sending request to {destination} for PDU {event_id}. Error \
|
||||
message: {e}"
|
||||
);
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"PDU not found locally and failed sending request to server for PDU.",
|
||||
));
|
||||
},
|
||||
}
|
||||
} else {
|
||||
return Ok(RoomMessageEventContent::text_plain("PDU not found locally."));
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
DebugCommand::GetRemotePdu {
|
||||
event_id,
|
||||
server,
|
||||
} => {
|
||||
// TODO: use Futures as some requests may take a while so we dont block the
|
||||
// admin room
|
||||
match services()
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&server,
|
||||
ruma::api::federation::event::get_event::v1::Request {
|
||||
event_id: event_id.to_owned().into(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(response) => {
|
||||
let json: CanonicalJsonObject = serde_json::from_str(response.pdu.get()).map_err(|e| {
|
||||
warn!(
|
||||
"Requested event ID {event_id} from server but failed to convert from RawValue to \
|
||||
CanonicalJsonObject (malformed event/response?): {e}"
|
||||
);
|
||||
Error::BadRequest(
|
||||
ErrorKind::Unknown,
|
||||
"Received response from server but failed to parse PDU",
|
||||
)
|
||||
})?;
|
||||
|
||||
debug!("Attempting to parse PDU: {:?}", &response.pdu);
|
||||
let parsed_pdu = {
|
||||
let parsed_result = parse_incoming_pdu(&response.pdu);
|
||||
let (event_id, value, room_id) = match parsed_result {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
warn!("Failed to parse PDU: {e}");
|
||||
info!("Full PDU: {:?}", &response.pdu);
|
||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Failed to parse PDU remote server {server} sent us: {e}"
|
||||
)));
|
||||
},
|
||||
};
|
||||
|
||||
vec![(event_id, value, room_id)]
|
||||
};
|
||||
|
||||
let pub_key_map = RwLock::new(BTreeMap::new());
|
||||
|
||||
debug!("Attempting to fetch homeserver signing keys for {server}");
|
||||
services()
|
||||
.rooms
|
||||
.event_handler
|
||||
.fetch_required_signing_keys(
|
||||
parsed_pdu.iter().map(|(_event_id, event, _room_id)| event),
|
||||
&pub_key_map,
|
||||
)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
warn!("Could not fetch all signatures for PDUs from {server}: {e:?}");
|
||||
});
|
||||
|
||||
info!("Attempting to handle event ID {event_id} as incoming PDU");
|
||||
for (event_id, value, room_id) in parsed_pdu {
|
||||
let mutex = Arc::clone(
|
||||
services()
|
||||
.globals
|
||||
.roomid_mutex_federation
|
||||
.write()
|
||||
.await
|
||||
.entry(room_id.clone())
|
||||
.or_default(),
|
||||
);
|
||||
let mutex_lock = mutex.lock().await;
|
||||
|
||||
services()
|
||||
.rooms
|
||||
.event_handler
|
||||
.handle_incoming_pdu(&server, &event_id, &room_id, value, false, &pub_key_map)
|
||||
.await?;
|
||||
|
||||
drop(mutex_lock);
|
||||
}
|
||||
|
||||
let json_text = serde_json::to_string_pretty(&json).expect("canonical json is valid json");
|
||||
|
||||
return Ok(RoomMessageEventContent::text_html(
|
||||
format!(
|
||||
"{}\n```json\n{}\n```",
|
||||
"Got PDU from specified server and handled as incoming PDU successfully. Event \
|
||||
body:",
|
||||
json_text
|
||||
),
|
||||
format!(
|
||||
"<p>{}</p>\n<pre><code class=\"language-json\">{}\n</code></pre>\n",
|
||||
"Got PDU from specified server and handled as incoming PDU successfully. Event \
|
||||
body:",
|
||||
HtmlEscape(&json_text)
|
||||
),
|
||||
));
|
||||
},
|
||||
Err(_) => {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"Remote server did not have PDU or failed sending request to remote server.",
|
||||
));
|
||||
},
|
||||
}
|
||||
},
|
||||
DebugCommand::ForceDeviceListUpdates => {
|
||||
|
|
Loading…
Add table
Reference in a new issue