1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-09-05 18:41:00 +00:00

feat(service/media): add S3 support

This commit is contained in:
AndSDev 2025-06-06 08:56:19 +03:00 committed by Matthias Ahouansou
parent 470e4770cc
commit 6d227019ec
No known key found for this signature in database
7 changed files with 381 additions and 68 deletions

75
Cargo.lock generated
View file

@ -533,6 +533,7 @@ dependencies = [
"rusqlite", "rusqlite",
"rust-argon2", "rust-argon2",
"rust-rocksdb", "rust-rocksdb",
"rusty-s3",
"sd-notify", "sd-notify",
"serde", "serde",
"serde_html_form", "serde_html_form",
@ -1582,6 +1583,30 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "jiff"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49"
dependencies = [
"jiff-static",
"log",
"portable-atomic",
"portable-atomic-util",
"serde",
]
[[package]]
name = "jiff-static"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "jobserver" name = "jobserver"
version = "0.1.33" version = "0.1.33"
@ -1680,7 +1705,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"windows-targets 0.48.5", "windows-targets 0.52.6",
] ]
[[package]] [[package]]
@ -1808,6 +1833,16 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
[[package]]
name = "md-5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
dependencies = [
"cfg-if",
"digest",
]
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.7.5" version = "2.7.5"
@ -2213,6 +2248,15 @@ version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483"
[[package]]
name = "portable-atomic-util"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
dependencies = [
"portable-atomic",
]
[[package]] [[package]]
name = "potential_utf" name = "potential_utf"
version = "0.1.2" version = "0.1.2"
@ -2313,6 +2357,16 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3"
[[package]]
name = "quick-xml"
version = "0.38.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9845d9dccf565065824e69f9f235fafba1587031eda353c1f1561cd6a6be78f4"
dependencies = [
"memchr",
"serde",
]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.40" version = "1.0.40"
@ -2890,6 +2944,25 @@ version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d" checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d"
[[package]]
name = "rusty-s3"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fac2edd2f0b56bd79a7343f49afc01c2d41010df480538a510e0abc56044f66c"
dependencies = [
"base64 0.22.1",
"hmac",
"jiff",
"md-5",
"percent-encoding",
"quick-xml",
"serde",
"serde_json",
"sha2",
"url",
"zeroize",
]
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.20" version = "1.0.20"

View file

@ -154,6 +154,8 @@ tikv-jemallocator = { version = "0.6", features = [
sd-notify = { version = "0.4", optional = true } sd-notify = { version = "0.4", optional = true }
# Used for inspecting request errors # Used for inspecting request errors
http-body-util = "0.1.3" http-body-util = "0.1.3"
# Used for S3 media backend
rusty-s3 = "0.8.1"
# Used for matrix spec type definitions and helpers # Used for matrix spec type definitions and helpers
[dependencies.ruma] [dependencies.ruma]

View file

@ -99,6 +99,33 @@ depth = 4
length = 2 length = 2
``` ```
#### S3 backend
The S3 backend has the following fields:
- `endpoint`: The URL of the S3 endpoint to connect to
- `bucket`: The name of the S3 bucket to use for storage. This bucket must already exist and your credentials must have access to it
- `region`: The region where your S3 bucket is located
- `path`: The base directory where all the media files will be stored (defaults to
root of the bucket)
- `key`: Your Access Key ID
- `secret`: Your Secret Access Key
- `duration`: The time (in seconds) that signed requests to the S3 bucket will be valid (default: `30`)
- `bucket_use_path`: Controls the structure of the path to files in S3. If `true`, the bucket name will be included as part of the file path. If `false` (or omitted), it will be used as the bucket name in the domain name
- `directory_structure`: This is a table, used to configure how files are to be distributed within
the media directory (see [Filesystem backend](#filesystem-backend) for details)
##### Example:
```toml
[global.media]
backend = "s3"
endpoint = "http://minio:9000"
bucket = "test"
region = "minio"
key = "<s3_key>"
secret = "<s3_secret>"
duration = 15
bucket_use_path = false
```
#### Retention policies #### Retention policies
Over time, the amount of media will keep growing, even if they were only accessed once. Over time, the amount of media will keep growing, even if they were only accessed once.
Retention policies allow for media files to automatically be deleted if they meet certain criteria, Retention policies allow for media files to automatically be deleted if they meet certain criteria,

View file

@ -242,6 +242,7 @@ impl From<IncompleteConfig> for Config {
}), }),
directory_structure, directory_structure,
}, },
IncompleteMediaBackendConfig::S3(value) => MediaBackendConfig::S3(value),
}, },
retention: media.retention.into(), retention: media.retention.into(),
}; };
@ -481,6 +482,7 @@ pub enum IncompleteMediaBackendConfig {
#[serde(default)] #[serde(default)]
directory_structure: DirectoryStructure, directory_structure: DirectoryStructure,
}, },
S3(S3MediaBackend),
} }
impl Default for IncompleteMediaBackendConfig { impl Default for IncompleteMediaBackendConfig {
@ -498,6 +500,7 @@ pub enum MediaBackendConfig {
path: String, path: String,
directory_structure: DirectoryStructure, directory_structure: DirectoryStructure,
}, },
S3(S3MediaBackend),
} }
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
@ -554,6 +557,58 @@ impl TryFrom<ShadowDirectoryStructure> for DirectoryStructure {
} }
} }
/// Deserialization mirror of [`S3MediaBackend`].
///
/// Serde cannot construct the `rusty_s3` types directly from the config
/// file, so the raw values are read into this struct first and then
/// validated and converted via the `TryFrom` impl below.
#[derive(Deserialize)]
struct ShadowS3MediaBackend {
    // URL of the S3-compatible endpoint to connect to
    endpoint: Url,
    // Name of the (pre-existing) bucket used for media storage
    bucket: String,
    // Region the bucket is located in
    region: String,
    // Optional base directory ("prefix") inside the bucket;
    // `None` means media is stored at the bucket root
    path: Option<String>,
    // Access Key ID
    key: String,
    // Secret Access Key
    secret: String,
    // Validity period (in seconds) of presigned requests; defaults to 30
    #[serde(default = "default_s3_duration")]
    duration: u64,
    // If true, use path-style addressing (bucket name in the URL path);
    // otherwise virtual-hosted-style (bucket name in the domain name)
    #[serde(default = "false_fn")]
    bucket_use_path: bool,
    // How media files are laid out under `path` (shared with the
    // filesystem backend's directory-structure configuration)
    #[serde(default)]
    directory_structure: DirectoryStructure,
}
impl TryFrom<ShadowS3MediaBackend> for S3MediaBackend {
type Error = Error;
fn try_from(value: ShadowS3MediaBackend) -> Result<Self, Self::Error> {
let path_style = if value.bucket_use_path {
rusty_s3::UrlStyle::Path
} else {
rusty_s3::UrlStyle::VirtualHost
};
let credentials = rusty_s3::Credentials::new(value.key, value.secret);
match rusty_s3::Bucket::new(value.endpoint, path_style, value.bucket, value.region) {
Ok(bucket) => Ok(S3MediaBackend {
bucket: Box::new(bucket),
credentials: Box::new(credentials),
duration: Duration::from_secs(value.duration),
path: value.path,
directory_structure: value.directory_structure,
}),
Err(_) => Err(Error::bad_config("Invalid S3 config")),
}
}
}
/// Validated S3 media backend configuration.
///
/// Built from [`ShadowS3MediaBackend`] through its `TryFrom` impl
/// (wired up by the `#[serde(try_from = ...)]` attribute), so a value of
/// this type always holds a successfully constructed bucket handle.
#[derive(Deserialize, Debug, Clone)]
#[serde(try_from = "ShadowS3MediaBackend")]
pub struct S3MediaBackend {
    // Boxed — presumably to keep the containing config enum small; confirm
    pub bucket: Box<rusty_s3::Bucket>,
    pub credentials: Box<rusty_s3::Credentials>,
    // Validity period for presigned requests
    pub duration: Duration,
    // Optional base directory ("prefix") inside the bucket
    pub path: Option<String>,
    pub directory_structure: DirectoryStructure,
}
const DEPRECATED_KEYS: &[&str] = &[ const DEPRECATED_KEYS: &[&str] = &[
"cache_capacity", "cache_capacity",
"turn_username", "turn_username",
@ -727,3 +782,7 @@ fn default_openid_token_ttl() -> u64 {
pub fn default_default_room_version() -> RoomVersionId { pub fn default_default_room_version() -> RoomVersionId {
RoomVersionId::V12 RoomVersionId::V12
} }
/// Default validity period, in seconds, for presigned S3 requests.
fn default_s3_duration() -> u64 {
    const DEFAULT_SECS: u64 = 30;
    DEFAULT_SECS
}

View file

@ -231,8 +231,6 @@ impl Service {
shutdown: AtomicBool::new(false), shutdown: AtomicBool::new(false),
}; };
// Remove this exception once other media backends are added
#[allow(irrefutable_let_patterns)]
if let MediaBackendConfig::FileSystem { path, .. } = &s.config.media.backend { if let MediaBackendConfig::FileSystem { path, .. } = &s.config.media.backend {
fs::create_dir_all(path)?; fs::create_dir_all(path)?;
} }
@ -488,23 +486,40 @@ impl Service {
directory_structure: &DirectoryStructure, directory_structure: &DirectoryStructure,
sha256_hex: &str, sha256_hex: &str,
) -> Result<PathBuf> { ) -> Result<PathBuf> {
let mut r = PathBuf::new(); Ok(PathBuf::from_iter(self.split_media_path(
r.push(media_directory); Some(media_directory),
directory_structure,
sha256_hex,
)))
}
if let DirectoryStructure::Deep { length, depth } = directory_structure { pub fn split_media_path<'a>(
&self,
media_directory: Option<&'a str>,
directory_structure: &DirectoryStructure,
sha256_hex: &'a str,
) -> Vec<&'a str> {
match directory_structure {
DirectoryStructure::Flat => match media_directory {
Some(path) => vec![path, sha256_hex],
None => vec![sha256_hex],
},
DirectoryStructure::Deep { length, depth } => {
let mut r: Vec<&'a str> = Vec::with_capacity((depth.get() + 2).into());
if let Some(path) = media_directory {
r.push(path);
}
let mut filename = sha256_hex; let mut filename = sha256_hex;
for _ in 0..depth.get() { for _ in 0..depth.get() {
let (current_path, next) = filename.split_at(length.get().into()); let (current_path, next) = filename.split_at(length.get().into());
filename = next; filename = next;
r.push(current_path); r.push(current_path);
} }
r.push(filename); r.push(filename);
} else {
r.push(sha256_hex);
}
Ok(r) r
}
}
} }
pub async fn shutdown(&self) { pub async fn shutdown(&self) {

View file

@ -2,17 +2,21 @@ mod data;
use std::{io::Cursor, sync::Arc}; use std::{io::Cursor, sync::Arc};
pub use data::Data; pub use data::Data;
use futures_util::{stream, StreamExt}; use http::StatusCode;
use ruma::{ use ruma::{
api::client::{error::ErrorKind, media::is_safe_inline_content_type}, api::client::{error::ErrorKind, media::is_safe_inline_content_type},
http_headers::{ContentDisposition, ContentDispositionType}, http_headers::{ContentDisposition, ContentDispositionType},
OwnedServerName, ServerName, UserId, OwnedServerName, ServerName, UserId,
}; };
use rusty_s3::{
actions::{DeleteObjectsResponse, ObjectIdentifier},
S3Action,
};
use sha2::{digest::Output, Digest, Sha256}; use sha2::{digest::Output, Digest, Sha256};
use tracing::{error, info}; use tracing::{error, info, warn};
use crate::{ use crate::{
config::{DirectoryStructure, MediaBackendConfig}, config::{DirectoryStructure, MediaBackendConfig, S3MediaBackend},
services, utils, Error, Result, services, utils, Error, Result,
}; };
use image::imageops::FilterType; use image::imageops::FilterType;
@ -615,6 +619,39 @@ impl Service {
file file
} }
MediaBackendConfig::S3(s3) => {
let sha256_hex = hex::encode(sha256_digest);
let file_name = services()
.globals
.split_media_path(s3.path.as_deref(), &s3.directory_structure, &sha256_hex)
.join("/");
let url = s3
.bucket
.get_object(Some(&s3.credentials), &file_name)
.sign(s3.duration);
let client = services().globals.default_client();
let resp = client.get(url).send().await?;
if resp.status() == StatusCode::NOT_FOUND {
return Err(Error::BadRequest(
ErrorKind::NotFound,
"File does not exist",
));
}
if !resp.status().is_success() {
error!(
"Failed to get file with sha256 hash of \"{}\" from S3 bucket: {}",
sha256_hex,
resp.text().await?
);
return Err(Error::BadS3Response(
"Failed to get media file from S3 bucket",
));
}
resp.bytes().await?.to_vec()
}
}; };
if let Some((server_name, media_id)) = original_file_id { if let Some((server_name, media_id)) = original_file_id {
@ -650,45 +687,88 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> {
let mut f = File::create(path).await?; let mut f = File::create(path).await?;
f.write_all(file).await?; f.write_all(file).await?;
} }
MediaBackendConfig::S3(s3) => {
let file_name = services()
.globals
.split_media_path(s3.path.as_deref(), &s3.directory_structure, sha256_hex)
.join("/");
let url = s3
.bucket
.put_object(Some(&s3.credentials), &file_name)
.sign(s3.duration);
let client = services().globals.default_client();
let resp = client.put(url).body(file.to_vec()).send().await?;
if !resp.status().is_success() {
error!(
"Failed to upload file with sha256 hash of \"{}\" to S3 bucket: {}",
sha256_hex,
resp.text().await?
);
return Err(Error::BadS3Response(
"Failed to upload media file to S3 bucket",
));
}
}
} }
Ok(()) Ok(())
} }
/// The size of a chunk for S3 delete operation.
/// The S3 `DeleteObjects` API accepts at most 1000 keys per request, so
/// deletions are batched into chunks of this size.
const S3_CHUNK_SIZE: usize = 1000;
/// Purges the given files from the media backend /// Purges the given files from the media backend
/// Returns a `Vec` of errors that occurred when attempting to delete the files /// Returns a `Vec` of errors that occurred when attempting to delete the files
/// ///
/// Note: this does NOT remove the related metadata from the database /// Note: this does NOT remove the related metadata from the database
async fn purge_files(hashes: Vec<Result<String>>) -> Vec<Error> { async fn purge_files(hashes: Vec<Result<String>>) -> Vec<Error> {
stream::iter(hashes) let (ok_values, err_values): (Vec<_>, Vec<_>) =
.then(|hash| async move { hashes.into_iter().partition(|result| result.is_ok());
match hash {
Ok(v) => delete_file(&v).await, let mut result: Vec<Error> = err_values.into_iter().map(Result::unwrap_err).collect();
Err(e) => Err(e),
} let to_delete: Vec<String> = ok_values.into_iter().map(Result::unwrap).collect();
})
.filter_map(|r| async {
if let Err(e) = r {
Some(e)
} else {
None
}
})
.collect()
.await
}
/// Deletes the given file from the media backend
///
/// Note: this does NOT remove the related metadata from the database
async fn delete_file(sha256_hex: &str) -> Result<()> {
match &services().globals.config.media.backend { match &services().globals.config.media.backend {
MediaBackendConfig::FileSystem { MediaBackendConfig::FileSystem {
path, path,
directory_structure, directory_structure,
} => { } => {
let mut path = for v in to_delete {
services() if let Err(err) = delete_file_fs(path, directory_structure, &v).await {
result.push(err);
}
}
}
MediaBackendConfig::S3(s3) => {
for chunk in to_delete.chunks(S3_CHUNK_SIZE) {
match delete_files_s3(s3, chunk).await {
Ok(errors) => {
result.extend(errors);
}
Err(error) => {
result.push(error);
}
}
}
}
}
result
}
/// Deletes the given file from the fs media backend
///
/// Note: this does NOT remove the related metadata from the database
async fn delete_file_fs(
path: &str,
directory_structure: &DirectoryStructure,
sha256_hex: &str,
) -> Result<()> {
let mut path = services()
.globals .globals
.get_media_path(path, directory_structure, sha256_hex)?; .get_media_path(path, directory_structure, sha256_hex)?;
@ -719,12 +799,65 @@ async fn delete_file(sha256_hex: &str) -> Result<()> {
depth -= 1; depth -= 1;
} }
} }
}
}
Ok(()) Ok(())
} }
/// Deletes the given files from the s3 media backend
///
/// Note: this does NOT remove the related metadata from the database
/// Deletes the given files from the s3 media backend with a single
/// `DeleteObjects` batch request.
///
/// Returns the per-object errors reported by S3 (an empty `Vec` means
/// every object was deleted), or `Err` if the request itself failed or
/// the response could not be parsed.
///
/// Note: this does NOT remove the related metadata from the database
async fn delete_files_s3(s3: &S3MediaBackend, files: &[String]) -> Result<Vec<Error>> {
    // Map each sha256 hex name to its full object key inside the bucket.
    let objects: Vec<ObjectIdentifier> = files
        .iter()
        .map(|v| {
            let key = services()
                .globals
                .split_media_path(s3.path.as_deref(), &s3.directory_structure, v)
                .join("/");
            // `join` already yields an owned `String`; constructing the
            // identifier from it directly avoids a redundant copy.
            ObjectIdentifier::new(key)
        })
        .collect();

    let mut request = s3
        .bucket
        .delete_objects(Some(&s3.credentials), objects.iter());
    // Quiet mode: only failed deletions are listed in the response body.
    request.set_quiet(true);

    let url = request.sign(s3.duration);
    let (body, md5) = request.body_with_md5();

    let client = services().globals.default_client();
    let resp = client
        .post(url)
        // DeleteObjects requires a Content-MD5 header of the request body
        .header("Content-MD5", md5)
        .body(body)
        .send()
        .await?;

    if !resp.status().is_success() {
        error!(
            "Failed to delete files from S3 bucket: {}",
            resp.text().await?
        );
        return Err(Error::BadS3Response(
            "Failed to delete media files from S3 bucket",
        ));
    }

    let parsed = DeleteObjectsResponse::parse(resp.text().await?).map_err(|e| {
        warn!("Cannot parse S3 response: {}", e);
        Error::BadS3Response("Cannot parse S3 response")
    })?;

    // Each entry corresponds to one object S3 failed to delete.
    let result = parsed
        .errors
        .into_iter()
        .map(|v| Error::CannotDeleteS3File(v.message))
        .collect();

    Ok(result)
}
/// Creates a content disposition with the given `filename`, using the `content_type` to determine whether /// Creates a content disposition with the given `filename`, using the `content_type` to determine whether
/// the disposition should be `inline` or `attachment` /// the disposition should be `inline` or `attachment`
fn content_disposition( fn content_disposition(

View file

@ -52,6 +52,10 @@ pub enum Error {
source: std::io::Error, source: std::io::Error,
}, },
#[error("{0}")] #[error("{0}")]
BadS3Response(&'static str),
#[error("Could not delete S3 file: {0}")]
CannotDeleteS3File(String), // This is only needed when an S3 deletion fails
#[error("{0}")]
BadServerResponse(&'static str), BadServerResponse(&'static str),
#[error("{0}")] #[error("{0}")]
BadConfig(&'static str), BadConfig(&'static str),