add remote fetch suite for authenticated media

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-08-27 11:18:57 +00:00
parent fcfb323cd5
commit ea2343850f
4 changed files with 381 additions and 6 deletions

View file

@ -1,5 +1,10 @@
use conduit::{debug, info, trace, warn, Result};
use ruma::{events::room::message::RoomMessageEventContent, EventId, Mxc, MxcUri, ServerName};
use std::time::Duration;
use conduit::{debug, info, trace, utils::time::parse_timepoint_ago, warn, Result};
use conduit_service::media::Dim;
use ruma::{
events::room::message::RoomMessageEventContent, EventId, Mxc, MxcUri, OwnedMxcUri, OwnedServerName, ServerName,
};
use crate::{admin_command, utils::parse_local_user_id};
@ -266,3 +271,44 @@ pub(super) async fn get_file_info(&self, mxc: OwnedMxcUri) -> Result<RoomMessage
Ok(RoomMessageEventContent::notice_markdown(format!("```\n{metadata:#?}\n```")))
}
#[admin_command]
pub(super) async fn get_remote_file(
&self, mxc: OwnedMxcUri, server: Option<OwnedServerName>, timeout: u32,
) -> Result<RoomMessageEventContent> {
let mxc: Mxc<'_> = mxc.as_str().try_into()?;
let timeout = Duration::from_millis(timeout.into());
let mut result = self
.services
.media
.fetch_remote_content(&mxc, None, server.as_deref(), timeout)
.await?;
// Grab the length of the content before clearing it to not flood the output
let len = result.content.as_ref().expect("content").len();
result.content.as_mut().expect("content").clear();
let out = format!("```\n{result:#?}\nreceived {len} bytes for file content.\n```");
Ok(RoomMessageEventContent::notice_markdown(out))
}
#[admin_command]
pub(super) async fn get_remote_thumbnail(
&self, mxc: OwnedMxcUri, server: Option<OwnedServerName>, timeout: u32, width: u32, height: u32,
) -> Result<RoomMessageEventContent> {
let mxc: Mxc<'_> = mxc.as_str().try_into()?;
let timeout = Duration::from_millis(timeout.into());
let dim = Dim::new(width, height, None);
let mut result = self
.services
.media
.fetch_remote_thumbnail(&mxc, None, server.as_deref(), timeout, &dim)
.await?;
// Grab the length of the content before clearing it to not flood the output
let len = result.content.as_ref().expect("content").len();
result.content.as_mut().expect("content").clear();
let out = format!("```\n{result:#?}\nreceived {len} bytes for file content.\n```");
Ok(RoomMessageEventContent::notice_markdown(out))
}

View file

@ -2,7 +2,7 @@ mod commands;
use clap::Subcommand;
use conduit::Result;
use ruma::{EventId, MxcUri, OwnedMxcUri, ServerName};
use ruma::{EventId, MxcUri, OwnedMxcUri, OwnedServerName, ServerName};
use crate::admin_command_dispatch;
@ -60,4 +60,32 @@ pub(super) enum MediaCommand {
/// The MXC URL to lookup info for.
mxc: OwnedMxcUri,
},
GetRemoteFile {
/// The MXC URL to fetch
mxc: OwnedMxcUri,
#[arg(short, long)]
server: Option<OwnedServerName>,
#[arg(short, long, default_value("10000"))]
timeout: u32,
},
GetRemoteThumbnail {
/// The MXC URL to fetch
mxc: OwnedMxcUri,
#[arg(short, long)]
server: Option<OwnedServerName>,
#[arg(short, long, default_value("10000"))]
timeout: u32,
#[arg(short, long)]
width: u32,
#[arg(short, long)]
height: u32,
},
}

View file

@ -8,6 +8,7 @@ use crate::{resolver, service};
pub struct Service {
pub default: reqwest::Client,
pub url_preview: reqwest::Client,
pub extern_media: reqwest::Client,
pub well_known: reqwest::Client,
pub federation: reqwest::Client,
pub sender: reqwest::Client,
@ -30,6 +31,11 @@ impl crate::Service for Service {
.redirect(redirect::Policy::limited(3))
.build()?,
extern_media: base(config)?
.dns_resolver(resolver.resolver.clone())
.redirect(redirect::Policy::limited(3))
.build()?,
well_known: base(config)?
.dns_resolver(resolver.resolver.hooked.clone())
.connect_timeout(Duration::from_secs(config.well_known_conn_timeout))

View file

@ -1,9 +1,304 @@
use std::time::Duration;
use std::{fmt::Debug, time::Duration};
use conduit::{debug_warn, err, implement, utils::content_disposition::make_content_disposition, Err, Error, Result};
use ruma::{api::client::media, Mxc};
use http::header::{HeaderValue, CONTENT_DISPOSITION, CONTENT_TYPE};
use ruma::{
api::{
client::{
error::ErrorKind::{NotFound, Unrecognized},
media,
},
federation,
federation::authenticated_media::{Content, FileOrLocation},
OutgoingRequest,
},
Mxc, ServerName, UserId,
};
use super::Dim;
use super::{Dim, FileMeta};
#[implement(super::Service)]
pub async fn fetch_remote_thumbnail(
&self, mxc: &Mxc<'_>, user: Option<&UserId>, server: Option<&ServerName>, timeout_ms: Duration, dim: &Dim,
) -> Result<FileMeta> {
self.check_fetch_authorized(mxc)?;
let result = self
.fetch_thumbnail_unauthenticated(mxc, user, server, timeout_ms, dim)
.await;
if let Err(Error::Request(NotFound, ..)) = &result {
return self
.fetch_thumbnail_authenticated(mxc, user, server, timeout_ms, dim)
.await;
}
result
}
#[implement(super::Service)]
pub async fn fetch_remote_content(
&self, mxc: &Mxc<'_>, user: Option<&UserId>, server: Option<&ServerName>, timeout_ms: Duration,
) -> Result<FileMeta> {
self.check_fetch_authorized(mxc)?;
let result = self
.fetch_content_unauthenticated(mxc, user, server, timeout_ms)
.await;
if let Err(Error::Request(NotFound, ..)) = &result {
return self
.fetch_content_authenticated(mxc, user, server, timeout_ms)
.await;
}
result
}
#[implement(super::Service)]
async fn fetch_thumbnail_authenticated(
&self, mxc: &Mxc<'_>, user: Option<&UserId>, server: Option<&ServerName>, timeout_ms: Duration, dim: &Dim,
) -> Result<FileMeta> {
use federation::authenticated_media::get_content_thumbnail::v1::{Request, Response};
let request = Request {
media_id: mxc.media_id.into(),
method: dim.method.clone().into(),
width: dim.width.into(),
height: dim.height.into(),
animated: true.into(),
timeout_ms,
};
let Response {
content,
..
} = self.federation_request(mxc, user, server, request).await?;
match content {
FileOrLocation::File(content) => self.handle_thumbnail_file(mxc, user, dim, content).await,
FileOrLocation::Location(location) => self.handle_location(mxc, user, &location).await,
}
}
#[implement(super::Service)]
async fn fetch_content_authenticated(
&self, mxc: &Mxc<'_>, user: Option<&UserId>, server: Option<&ServerName>, timeout_ms: Duration,
) -> Result<FileMeta> {
use federation::authenticated_media::get_content::v1::{Request, Response};
let request = Request {
media_id: mxc.media_id.into(),
timeout_ms,
};
let Response {
content,
..
} = self.federation_request(mxc, user, server, request).await?;
match content {
FileOrLocation::File(content) => self.handle_content_file(mxc, user, content).await,
FileOrLocation::Location(location) => self.handle_location(mxc, user, &location).await,
}
}
#[allow(deprecated)]
#[implement(super::Service)]
async fn fetch_thumbnail_unauthenticated(
&self, mxc: &Mxc<'_>, user: Option<&UserId>, server: Option<&ServerName>, timeout_ms: Duration, dim: &Dim,
) -> Result<FileMeta> {
use media::get_content_thumbnail::v3::{Request, Response};
let request = Request {
allow_remote: true,
allow_redirect: true,
animated: true.into(),
method: dim.method.clone().into(),
width: dim.width.into(),
height: dim.height.into(),
server_name: mxc.server_name.into(),
media_id: mxc.media_id.into(),
timeout_ms,
};
let Response {
file,
content_type,
content_disposition,
..
} = self.federation_request(mxc, user, server, request).await?;
let content = Content {
file,
content_type,
content_disposition,
};
self.handle_thumbnail_file(mxc, user, dim, content).await
}
#[allow(deprecated)]
#[implement(super::Service)]
async fn fetch_content_unauthenticated(
&self, mxc: &Mxc<'_>, user: Option<&UserId>, server: Option<&ServerName>, timeout_ms: Duration,
) -> Result<FileMeta> {
use media::get_content::v3::{Request, Response};
let request = Request {
allow_remote: true,
allow_redirect: true,
server_name: mxc.server_name.into(),
media_id: mxc.media_id.into(),
timeout_ms,
};
let Response {
file,
content_type,
content_disposition,
..
} = self.federation_request(mxc, user, server, request).await?;
let content = Content {
file,
content_type,
content_disposition,
};
self.handle_content_file(mxc, user, content).await
}
#[implement(super::Service)]
async fn handle_thumbnail_file(
&self, mxc: &Mxc<'_>, user: Option<&UserId>, dim: &Dim, content: Content,
) -> Result<FileMeta> {
let content_disposition =
make_content_disposition(content.content_disposition.as_ref(), content.content_type.as_deref(), None);
self.upload_thumbnail(
mxc,
user,
Some(&content_disposition),
content.content_type.as_deref(),
dim,
&content.file,
)
.await
.map(|()| FileMeta {
content: Some(content.file),
content_type: content.content_type.map(Into::into),
content_disposition: Some(content_disposition),
})
}
#[implement(super::Service)]
async fn handle_content_file(&self, mxc: &Mxc<'_>, user: Option<&UserId>, content: Content) -> Result<FileMeta> {
let content_disposition =
make_content_disposition(content.content_disposition.as_ref(), content.content_type.as_deref(), None);
self.create(
mxc,
user,
Some(&content_disposition),
content.content_type.as_deref(),
&content.file,
)
.await
.map(|()| FileMeta {
content: Some(content.file),
content_type: content.content_type.map(Into::into),
content_disposition: Some(content_disposition),
})
}
#[implement(super::Service)]
async fn handle_location(&self, mxc: &Mxc<'_>, user: Option<&UserId>, location: &str) -> Result<FileMeta> {
self.location_request(location).await.map_err(|error| {
err!(Request(NotFound(
debug_warn!(%mxc, ?user, ?location, ?error, "Fetching media from location failed")
)))
})
}
#[implement(super::Service)]
async fn location_request(&self, location: &str) -> Result<FileMeta> {
let response = self
.services
.client
.extern_media
.get(location)
.send()
.await?;
let content_type = response
.headers()
.get(CONTENT_TYPE)
.map(HeaderValue::to_str)
.and_then(Result::ok)
.map(str::to_owned);
let content_disposition = response
.headers()
.get(CONTENT_DISPOSITION)
.map(HeaderValue::as_bytes)
.map(TryFrom::try_from)
.and_then(Result::ok);
response
.bytes()
.await
.map(Vec::from)
.map_err(Into::into)
.map(|content| FileMeta {
content: Some(content),
content_type: content_type.clone().map(Into::into),
content_disposition: Some(make_content_disposition(
content_disposition.as_ref(),
content_type.as_deref(),
None,
)),
})
}
#[implement(super::Service)]
async fn federation_request<Request>(
&self, mxc: &Mxc<'_>, user: Option<&UserId>, server: Option<&ServerName>, request: Request,
) -> Result<Request::IncomingResponse>
where
Request: OutgoingRequest + Send + Debug,
{
self.services
.sending
.send_federation_request(server.unwrap_or(mxc.server_name), request)
.await
.map_err(|error| handle_federation_error(mxc, user, server, error))
}
// Handles and adjusts the error for the caller to determine if they should
// request the fallback endpoint or give up.
fn handle_federation_error(mxc: &Mxc<'_>, user: Option<&UserId>, server: Option<&ServerName>, error: Error) -> Error {
let fallback = || {
err!(Request(NotFound(
debug_error!(%mxc, ?user, ?server, ?error, "Remote media not found")
)))
};
// Matrix server responses for fallback always taken.
if error.kind() == NotFound || error.kind() == Unrecognized {
return fallback();
}
// If we get these from any middleware we'll try the other endpoint rather than
// giving up too early.
if error.status_code().is_client_error() || error.status_code().is_redirection() {
return fallback();
}
// Reached for 5xx errors. This is where we don't fallback given the likelyhood
// the other endpoint will also be a 5xx and we're wasting time.
error
}
#[implement(super::Service)]
#[allow(deprecated)]