From 30d578a81aa359a21d2cf1f4d89f41bc23784307 Mon Sep 17 00:00:00 2001 From: AndSDev Date: Wed, 4 Jun 2025 16:15:42 +0300 Subject: [PATCH 01/16] refactor: `media.delete_file` is async --- src/service/admin/mod.rs | 25 +++++++++------- src/service/media/mod.rs | 63 +++++++++++++++++++++++++--------------- 2 files changed, 55 insertions(+), 33 deletions(-) diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index e7f169fb..afb3cef6 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -9,6 +9,7 @@ use std::{ use bytesize::ByteSize; use chrono::DateTime; use clap::{Args, Parser}; +use futures_util::future::Either; use image::GenericImageView; use regex::Regex; use ruma::{ @@ -955,6 +956,7 @@ impl Service { services() .media .purge_from_user(&user_id, purge_media.force_filehash, after) + .await .len() } else { 0 @@ -1023,6 +1025,7 @@ impl Service { failed_count += services() .media .purge_from_user(user_id, purge_media.force_filehash, after) + .await .len(); } } @@ -1250,9 +1253,9 @@ impl Service { RoomMessageEventContent::text_html(markdown_message, html_message).into() }, AdminCommand::PurgeMedia => media_from_body(body).map_or_else( - |message| message, - |media| { - let failed_count = services().media.purge(&media, true).len(); + |message| Either::Left(async move { message }), + |media| Either::Right(async move { + let failed_count = services().media.purge(&media, true).await.len(); if failed_count == 0 { RoomMessageEventContent::text_plain("Successfully purged media") @@ -1261,8 +1264,8 @@ impl Service { "Failed to delete {failed_count} media, check logs for more details" )) }.into() - }, - ), + }), + ).await, AdminCommand::PurgeMediaFromUsers { from_last, force_filehash, @@ -1282,6 +1285,7 @@ impl Service { failed_count += services() .media .purge_from_user(user_id, force_filehash, after) + .await .len(); } @@ -1314,6 +1318,7 @@ impl Service { let failed_count = services() .media .purge_from_server(&server_name, force_filehash, after) + .await .len(); if failed_count == 0 { @@ -1327,11 +1332,11 @@ impl Service { }.into() } AdminCommand::BlockMedia { and_purge, reason } => media_from_body(body).map_or_else( - |message| message, - |media| { + |message| Either::Left(async move { message }), + |media| Either::Right(async move { let failed_count = services().media.block(&media, reason).len(); let failed_purge_count = if and_purge { - services().media.purge(&media, true).len() + services().media.purge(&media, true).await.len() } else { 0 }; @@ -1348,8 +1353,8 @@ impl Service { "Failed to block {failed_count}, and purge {failed_purge_count} media, check logs for more details" )) }.into() - }, - ), + }), + ).await, AdminCommand::BlockMediaFromUsers { from_last, reason } => { let after = from_last.map(unix_secs_from_duration).transpose()?; diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 16060dfb..81eb30df 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -2,6 +2,7 @@ mod data; use std::{fs, io::Cursor, sync::Arc}; pub use data::Data; +use futures_util::{stream, StreamExt}; use ruma::{ api::client::{error::ErrorKind, media::is_safe_inline_content_type}, http_headers::{ContentDisposition, ContentDispositionType}, @@ -142,7 +143,7 @@ impl Service { let count = files.iter().filter(|res| res.is_ok()).count(); info!("Found {count} media files to delete"); - purge_files(files); + purge_files(files).await; Ok(()) } @@ -159,11 +160,14 @@ impl Service { ) -> Result<()> { let (sha256_digest, sha256_hex) = generate_digests(file); - 
for error in self.clear_required_space( - &sha256_digest, - MediaType::new(servername, false), - size(file)?, - )? { + for error in self + .clear_required_space( + &sha256_digest, + MediaType::new(servername, false), + size(file)?, + ) + .await? + { error!( "Error deleting file to clear space when downloading/creating new media file: {error}" ) @@ -207,7 +211,8 @@ impl Service { &sha256_digest, MediaType::new(servername, true), size(file)?, - )?; + ) + .await?; self.db.create_thumbnail_metadata( sha256_digest, @@ -444,10 +449,14 @@ impl Service { /// purged have that sha256 hash. /// /// Returns errors for all the files that were failed to be deleted, if any. - pub fn purge(&self, media: &[(OwnedServerName, String)], force_filehash: bool) -> Vec { + pub async fn purge( + &self, + media: &[(OwnedServerName, String)], + force_filehash: bool, + ) -> Vec { let hashes = self.db.purge_and_get_hashes(media, force_filehash); - purge_files(hashes) + purge_files(hashes).await } /// Purges all (past a certain time in unix seconds, if specified) media @@ -462,7 +471,7 @@ impl Service { /// /// Note: it only currently works for local users, as we cannot determine who /// exactly uploaded the file when it comes to remove users. - pub fn purge_from_user( + pub async fn purge_from_user( &self, user_id: &UserId, force_filehash: bool, @@ -472,7 +481,7 @@ impl Service { .db .purge_and_get_hashes_from_user(user_id, force_filehash, after); - purge_files(hashes) + purge_files(hashes).await } /// Purges all (past a certain time in unix seconds, if specified) media @@ -484,7 +493,7 @@ impl Service { /// purged have that sha256 hash. /// /// Returns errors for all the files that were failed to be deleted, if any. - pub fn purge_from_server( + pub async fn purge_from_server( &self, server_name: &ServerName, force_filehash: bool, @@ -494,7 +503,7 @@ impl Service { .db .purge_and_get_hashes_from_server(server_name, force_filehash, after); - purge_files(hashes) + purge_files(hashes).await } /// Checks whether the media has been blocked by administrators, returning either @@ -558,7 +567,7 @@ impl Service { self.db.list_blocked() } - pub fn clear_required_space( + pub async fn clear_required_space( &self, sha256_digest: &[u8], media_type: MediaType, @@ -577,7 +586,7 @@ impl Service { info!("Deleting {} files to clear space for new media file", count); } - Ok(purge_files(files)) + Ok(purge_files(files).await) } /// Fetches the file from the configured media backend, as well as updating the "last accessed" @@ -643,21 +652,29 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { /// Returns a `Vec` of errors that occurred when attempting to delete the files /// /// Note: this does NOT remove the related metadata from the database -fn purge_files(hashes: Vec>) -> Vec { - hashes - .into_iter() - .map(|hash| match hash { - Ok(v) => delete_file(&v), - Err(e) => Err(e), +async fn purge_files(hashes: Vec>) -> Vec { + stream::iter(hashes) + .then(|hash| async move { + match hash { + Ok(v) => delete_file(&v).await, + Err(e) => Err(e), + } + }) + .filter_map(|r| async { + if let Err(e) = r { + Some(e) + } else { + None + } }) - .filter_map(|r| if let Err(e) = r { Some(e) } else { None }) .collect() + .await } /// Deletes the given file from the media backend /// /// Note: this does NOT remove the related metadata from the database -fn delete_file(sha256_hex: &str) -> Result<()> { +async fn delete_file(sha256_hex: &str) -> Result<()> { match &services().globals.config.media.backend { 
MediaBackendConfig::FileSystem { path, From 65414026090750a217a6ed3e604f4ea9a45df1f3 Mon Sep 17 00:00:00 2001 From: AndSDev Date: Fri, 6 Jun 2025 08:56:19 +0300 Subject: [PATCH 02/16] feat(service/media): add S3 support --- Cargo.lock | 40 +++++++++++++++++++++++++++ Cargo.toml | 2 ++ src/config/mod.rs | 46 +++++++++++++++++++++++++++++++ src/service/globals/mod.rs | 2 -- src/service/media/mod.rs | 56 +++++++++++++++++++++++++++++++++++++- 5 files changed, 143 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ed4c7638..8f9b1811 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -500,6 +500,7 @@ dependencies = [ "rusqlite", "rust-argon2", "rust-rocksdb", + "rusty-s3", "sd-notify", "serde", "serde_html_form", @@ -1733,6 +1734,16 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.7.4" @@ -2245,6 +2256,16 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" +[[package]] +name = "quick-xml" +version = "0.37.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.40" @@ -2804,6 +2825,25 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" +[[package]] +name = "rusty-s3" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f51a5a6b15f25d3e10c068039ee13befb6110fcb36c2b26317bcbdc23484d96" +dependencies = [ + "base64 0.22.1", + "hmac", + "md-5", + "percent-encoding", + "quick-xml", + "serde", + "serde_json", + "sha2", + "time", + "url", + "zeroize", +] + [[package]] name = "ryu" version = "1.0.20" diff --git a/Cargo.toml b/Cargo.toml index ebdabcd9..3634808f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -154,6 +154,8 @@ tikv-jemallocator = { version = "0.6", features = [ sd-notify = { version = "0.4", optional = true } +rusty-s3 = "0.7.0" + # Used for matrix spec type definitions and helpers [dependencies.ruma] features = [ diff --git a/src/config/mod.rs b/src/config/mod.rs index 95f1c76e..c16b89a6 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -242,6 +242,32 @@ impl From for Config { }), directory_structure, }, + IncompleteMediaBackendConfig::S3 { + endpoint, + bucket, + region, + key, + secret, + duration, + bucket_use_path, + } => { + let path_style = if bucket_use_path { + rusty_s3::UrlStyle::Path + } else { + rusty_s3::UrlStyle::VirtualHost + }; + + let bucket = rusty_s3::Bucket::new(endpoint, path_style, bucket, region) + .expect("Invalid S3 config"); + + let credentials = rusty_s3::Credentials::new(key, secret); + + MediaBackendConfig::S3 { + bucket: bucket, + credentials: credentials, + duration: Duration::from_secs(duration), + } + } }, retention: media.retention.into(), }; @@ -481,6 +507,17 @@ pub enum IncompleteMediaBackendConfig { #[serde(default)] directory_structure: DirectoryStructure, }, + S3 { + endpoint: Url, + bucket: String, + 
region: String, + key: String, + secret: String, + #[serde(default = "default_s3_duration")] + duration: u64, + #[serde(default = "false_fn")] + bucket_use_path: bool, + }, } impl Default for IncompleteMediaBackendConfig { @@ -498,6 +535,11 @@ pub enum MediaBackendConfig { path: String, directory_structure: DirectoryStructure, }, + S3 { + bucket: rusty_s3::Bucket, + credentials: rusty_s3::Credentials, + duration: Duration, + }, } #[derive(Debug, Clone, Deserialize)] @@ -727,3 +769,7 @@ fn default_openid_token_ttl() -> u64 { pub fn default_default_room_version() -> RoomVersionId { RoomVersionId::V10 } + +fn default_s3_duration() -> u64 { + 30 +} diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 05c3eab1..0ab3a936 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -230,8 +230,6 @@ impl Service { shutdown: AtomicBool::new(false), }; - // Remove this exception once other media backends are added - #[allow(irrefutable_let_patterns)] if let MediaBackendConfig::FileSystem { path, .. } = &s.config.media.backend { fs::create_dir_all(path)?; } diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 81eb30df..dce07405 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -8,8 +8,9 @@ use ruma::{ http_headers::{ContentDisposition, ContentDispositionType}, OwnedServerName, ServerName, UserId, }; +use rusty_s3::S3Action; use sha2::{digest::Output, Digest, Sha256}; -use tracing::{error, info}; +use tracing::{error, info, warn}; use crate::{ config::{DirectoryStructure, MediaBackendConfig}, @@ -615,6 +616,25 @@ impl Service { file } + MediaBackendConfig::S3 { + bucket, + credentials, + duration, + } => { + let url = bucket + .get_object(Some(credentials), &hex::encode(sha256_digest)) + .sign(*duration); + + let client = services().globals.default_client(); + let resp = client.get(url).send().await?; + + if !resp.status().is_success() { + warn!("S3 request error:\n{}", resp.text().await?); + return Err(Error::bad_database("Cannot read media file")); + } + + resp.bytes().await?.to_vec() + } }; if let Some((server_name, media_id)) = original_file_id { @@ -643,6 +663,23 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { let mut f = File::create(path).await?; f.write_all(file).await?; } + MediaBackendConfig::S3 { + bucket, + credentials, + duration, + } => { + let url = bucket + .put_object(Some(credentials), sha256_hex) + .sign(*duration); + + let client = services().globals.default_client(); + let resp = client.put(url).body(file.to_vec()).send().await?; + + if !resp.status().is_success() { + warn!("S3 request error:\n{}", resp.text().await?); + return Err(Error::bad_database("Cannot write media file")); + } + } } Ok(()) @@ -713,6 +750,23 @@ async fn delete_file(sha256_hex: &str) -> Result<()> { } } } + MediaBackendConfig::S3 { + bucket, + credentials, + duration, + } => { + let url = bucket + .delete_object(Some(credentials), sha256_hex) + .sign(*duration); + + let client = services().globals.default_client(); + let resp = client.delete(url).send().await?; + + if !resp.status().is_success() { + warn!("S3 request error:\n{}", resp.text().await?); + return Err(Error::bad_database("Cannot delete media file")); + } + } } Ok(()) From 0b5e93eeffebe896401dd640014dfa816bfe09be Mon Sep 17 00:00:00 2001 From: AndSDev Date: Fri, 6 Jun 2025 10:34:02 +0300 Subject: [PATCH 03/16] fix(service/media): create directory for media file only on new file creation Any access to a media file created a directory for it, which 
led to unnecessary operations. Especially if this media file had already been deleted (as well as its directory). --- src/service/globals/mod.rs | 3 --- src/service/media/mod.rs | 5 +++++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 0ab3a936..45def51a 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -499,9 +499,6 @@ impl Service { r.push(current_path); } - // Create all directories leading up to file - fs::create_dir_all(&r).inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?; - r.push(filename); } else { r.push(sha256_hex); diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index dce07405..8e0e199f 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -660,6 +660,11 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { .globals .get_media_path(path, directory_structure, sha256_hex)?; + // Create all directories leading up to file + if let Some(parent) = path.parent() { + fs::create_dir_all(&parent).inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?; + } + let mut f = File::create(path).await?; f.write_all(file).await?; } From e7822f0332c59e68f5272fd0e769ad8e71023990 Mon Sep 17 00:00:00 2001 From: AndSDev Date: Fri, 6 Jun 2025 10:46:58 +0300 Subject: [PATCH 04/16] refactor(service/media): make all fs operations async --- src/service/media/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 8e0e199f..fd695277 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -1,5 +1,5 @@ mod data; -use std::{fs, io::Cursor, sync::Arc}; +use std::{io::Cursor, sync::Arc}; pub use data::Data; use futures_util::{stream, StreamExt}; @@ -26,7 +26,7 @@ pub struct DbFileMeta { } use tokio::{ - fs::File, + fs::{self, File}, io::{AsyncReadExt, AsyncWriteExt}, }; @@ -662,7 +662,7 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { // Create all directories leading up to file if let Some(parent) = path.parent() { - fs::create_dir_all(&parent).inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?; + fs::create_dir_all(&parent).await.inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?; } let mut f = File::create(path).await?; @@ -727,7 +727,7 @@ async fn delete_file(sha256_hex: &str) -> Result<()> { .globals .get_media_path(path, directory_structure, sha256_hex)?; - if let Err(e) = fs::remove_file(&path) { + if let Err(e) = fs::remove_file(&path).await { // Multiple files with the same filehash might be requseted to be deleted if e.kind() != std::io::ErrorKind::NotFound { error!("Error removing media from filesystem: {e}"); @@ -742,7 +742,7 @@ async fn delete_file(sha256_hex: &str) -> Result<()> { // Here at the start so that the first time, the file gets removed from the path path.pop(); - if let Err(e) = fs::remove_dir(&path) { + if let Err(e) = fs::remove_dir(&path).await { if e.kind() == std::io::ErrorKind::DirectoryNotEmpty { break; } else { From 2c47045659e1750affda4347d5906272e8965d07 Mon Sep 17 00:00:00 2001 From: AndSDev Date: Fri, 6 Jun 2025 12:59:27 +0300 Subject: [PATCH 05/16] feat(service/media): improve S3 error processing --- src/service/media/mod.rs | 22 +++++++++++++++------- 
src/utils/error.rs | 7 +++++++ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index fd695277..36817562 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -3,6 +3,7 @@ use std::{io::Cursor, sync::Arc}; pub use data::Data; use futures_util::{stream, StreamExt}; +use http::StatusCode; use ruma::{ api::client::{error::ErrorKind, media::is_safe_inline_content_type}, http_headers::{ContentDisposition, ContentDispositionType}, @@ -621,16 +622,23 @@ impl Service { credentials, duration, } => { + let sha256_hex = hex::encode(sha256_digest); let url = bucket - .get_object(Some(credentials), &hex::encode(sha256_digest)) + .get_object(Some(credentials), &sha256_hex) .sign(*duration); let client = services().globals.default_client(); let resp = client.get(url).send().await?; + if resp.status() == StatusCode::NOT_FOUND { + return Err(Error::BadRequest( + ErrorKind::NotFound, + "File does not exist", + )); + } if !resp.status().is_success() { - warn!("S3 request error:\n{}", resp.text().await?); - return Err(Error::bad_database("Cannot read media file")); + warn!("S3 request error ({}):\n{}", sha256_hex, resp.text().await?); + return Err(Error::bad_s3_response("Cannot read media file")); } resp.bytes().await?.to_vec() @@ -681,8 +689,8 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { let resp = client.put(url).body(file.to_vec()).send().await?; if !resp.status().is_success() { - warn!("S3 request error:\n{}", resp.text().await?); - return Err(Error::bad_database("Cannot write media file")); + warn!("S3 request error ({}):\n{}", sha256_hex, resp.text().await?); + return Err(Error::bad_s3_response("Cannot write media file")); } } } @@ -768,8 +776,8 @@ async fn delete_file(sha256_hex: &str) -> Result<()> { let resp = client.delete(url).send().await?; if !resp.status().is_success() { - warn!("S3 request error:\n{}", resp.text().await?); - return Err(Error::bad_database("Cannot delete media file")); + warn!("S3 request error ({}):\n{}", sha256_hex, resp.text().await?); + return Err(Error::bad_s3_response("Cannot delete media file")); } } } diff --git a/src/utils/error.rs b/src/utils/error.rs index 1b1a26db..1c1302e7 100644 --- a/src/utils/error.rs +++ b/src/utils/error.rs @@ -52,6 +52,8 @@ pub enum Error { source: std::io::Error, }, #[error("{0}")] + BadS3Response(&'static str), + #[error("{0}")] BadServerResponse(&'static str), #[error("{0}")] BadConfig(&'static str), @@ -91,6 +93,11 @@ impl Error { error!("BadConfig: {}", message); Self::BadConfig(message) } + + pub fn bad_s3_response(message: &'static str) -> Self { + info!("BadS3Response: {}", message); + Self::BadS3Response(message) + } } impl Error { From 59e4f4d8f125cacf713d80752def422cbb0a179e Mon Sep 17 00:00:00 2001 From: AndSDev Date: Fri, 6 Jun 2025 17:19:18 +0300 Subject: [PATCH 06/16] feat(service/media): add deep hashed directory structure for S3 --- src/config/mod.rs | 13 +++++++++-- src/service/globals/mod.rs | 45 ++++++++++++++++++++++++++------------ src/service/media/mod.rs | 25 ++++++++++++++++++--- 3 files changed, 64 insertions(+), 19 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index c16b89a6..57c50b0a 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -250,6 +250,8 @@ impl From for Config { secret, duration, bucket_use_path, + path, + directory_structure, } => { let path_style = if bucket_use_path { rusty_s3::UrlStyle::Path @@ -263,9 +265,11 @@ impl From for Config { let credentials = 
rusty_s3::Credentials::new(key, secret); MediaBackendConfig::S3 { - bucket: bucket, - credentials: credentials, + bucket, + credentials, duration: Duration::from_secs(duration), + path, + directory_structure, } } }, @@ -517,6 +521,9 @@ pub enum IncompleteMediaBackendConfig { duration: u64, #[serde(default = "false_fn")] bucket_use_path: bool, + path: Option, + #[serde(default)] + directory_structure: DirectoryStructure, }, } @@ -539,6 +546,8 @@ pub enum MediaBackendConfig { bucket: rusty_s3::Bucket, credentials: rusty_s3::Credentials, duration: Duration, + path: Option, + directory_structure: DirectoryStructure, }, } diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 45def51a..8ed92cae 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -488,23 +488,40 @@ impl Service { directory_structure: &DirectoryStructure, sha256_hex: &str, ) -> Result { - let mut r = PathBuf::new(); - r.push(media_directory); + Ok(PathBuf::from_iter(self.split_media_path( + Some(media_directory), + directory_structure, + sha256_hex, + )?)) + } - if let DirectoryStructure::Deep { length, depth } = directory_structure { - let mut filename = sha256_hex; - for _ in 0..depth.get() { - let (current_path, next) = filename.split_at(length.get().into()); - filename = next; - r.push(current_path); + pub fn split_media_path<'a>( + &self, + media_directory: Option<&'a str>, + directory_structure: &DirectoryStructure, + sha256_hex: &'a str, + ) -> Result> { + match directory_structure { + DirectoryStructure::Flat => match media_directory { + Some(path) => Ok(vec![path, sha256_hex]), + None => Ok(vec![sha256_hex]), + }, + DirectoryStructure::Deep { length, depth } => { + let mut r: Vec<&'a str> = Vec::with_capacity((depth.get() + 2).into()); + if let Some(path) = media_directory { + r.push(path); + } + let mut filename = sha256_hex; + for _ in 0..depth.get() { + let (current_path, next) = filename.split_at(length.get().into()); + filename = next; + r.push(current_path); + } + r.push(filename); + + Ok(r) } - - r.push(filename); - } else { - r.push(sha256_hex); } - - Ok(r) } pub fn shutdown(&self) { diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 36817562..71a7dd9f 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -621,10 +621,16 @@ impl Service { bucket, credentials, duration, + path, + directory_structure, } => { let sha256_hex = hex::encode(sha256_digest); + let file_name = services() + .globals + .split_media_path(path.as_deref(), directory_structure, &sha256_hex)? + .join("/"); let url = bucket - .get_object(Some(credentials), &sha256_hex) + .get_object(Some(credentials), &file_name) .sign(*duration); let client = services().globals.default_client(); @@ -680,9 +686,16 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { bucket, credentials, duration, + path, + directory_structure, } => { + let file_name = services() + .globals + .split_media_path(path.as_deref(), directory_structure, sha256_hex)? + .join("/"); + let url = bucket - .put_object(Some(credentials), sha256_hex) + .put_object(Some(credentials), &file_name) .sign(*duration); let client = services().globals.default_client(); @@ -767,9 +780,15 @@ async fn delete_file(sha256_hex: &str) -> Result<()> { bucket, credentials, duration, + path, + directory_structure, } => { + let file_name = services() + .globals + .split_media_path(path.as_deref(), directory_structure, sha256_hex)? 
+                .join("/");
                 let url = bucket
-                    .delete_object(Some(credentials), sha256_hex)
+                    .delete_object(Some(credentials), &file_name)
                     .sign(*duration);

                 let client = services().globals.default_client();
                 let resp = client.delete(url).send().await?;

From 521f46d764468f731a6c87c8113fcd90e890fc4b Mon Sep 17 00:00:00 2001
From: AndSDev
Date: Mon, 9 Jun 2025 10:50:56 +0300
Subject: [PATCH 07/16] docs: add S3 connection details

---
 docs/configuration.md | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/docs/configuration.md b/docs/configuration.md
index 145c2c48..114d685e 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -99,6 +99,33 @@ depth = 4
 length = 2
 ```
 
+#### S3 backend
+The S3 backend has the following fields:
+- `endpoint`: The URL of the S3 endpoint to connect to
+- `bucket`: The name of the S3 bucket to use for storage. This bucket must already exist and your credentials must have access to it
+- `region`: The region where your S3 bucket is located
+- `key`: Your Access Key ID
+- `secret`: Your Secret Access Key
+- `duration`: The time (in seconds) that signed requests to the S3 bucket will be valid (default: `30`)
+- `bucket_use_path`: Controls the structure of the path to files in S3. If `true`, the bucket name will be included as part of the file path. If `false` (or omitted), it will be used as the bucket name in the domain name
+- `path`: The base directory where all the media files will be stored (defaults to
+  root of the bucket)
+- `directory_structure`: This is a table, used to configure how files are to be distributed within
+  the media directory (see [Filesystem backend](#filesystem-backend) for details)
+
+##### Example:
+```toml
+[global.media]
+backend = "s3"
+endpoint = "http://minio:9000"
+bucket = "test"
+region = "minio"
+key = ""
+secret = ""
+duration = 15
+bucket_use_path = true
+```
+
 #### Retention policies
 Over time, the amount of media will keep growing, even if they were only accessed once.
Retention policies allow for media files to automatically be deleted if they meet certain crietia, From 263b424de375c0e7980eefd1b054f2cf61ee33a9 Mon Sep 17 00:00:00 2001 From: AndSDev Date: Mon, 9 Jun 2025 11:00:09 +0300 Subject: [PATCH 08/16] refactor: reduce the size of the S3 configuration struct --- src/config/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 57c50b0a..7d4fe69a 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -265,8 +265,8 @@ impl From for Config { let credentials = rusty_s3::Credentials::new(key, secret); MediaBackendConfig::S3 { - bucket, - credentials, + bucket: Box::new(bucket), + credentials: Box::new(credentials), duration: Duration::from_secs(duration), path, directory_structure, @@ -543,8 +543,8 @@ pub enum MediaBackendConfig { directory_structure: DirectoryStructure, }, S3 { - bucket: rusty_s3::Bucket, - credentials: rusty_s3::Credentials, + bucket: Box, + credentials: Box, duration: Duration, path: Option, directory_structure: DirectoryStructure, From a30e0537e44a22b2ffa7f4580f46a779d64f16e5 Mon Sep 17 00:00:00 2001 From: AndSDev Date: Tue, 10 Jun 2025 06:56:06 +0000 Subject: [PATCH 09/16] refactor: group S3 config fields together --- docs/configuration.md | 4 ++-- src/config/mod.rs | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 114d685e..39ca6f79 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -104,12 +104,12 @@ The S3 backend has the following fields: - `endpoint`: The URL of the S3 endpoint to connect to - `bucket`: The name of the S3 bucket to use for storage. This bucket must already exist and your credentials must have access to it - `region`: The region where your S3 bucket is located +- `path`: The base directory where all the media files will be stored (defaults to + root of the bucket) - `key`: Your Access Key ID - `secret`: Your Secret Access Key - `duration`: The time (in seconds) that signed requests to the S3 bucket will be valid (default: `30`) - `bucket_use_path`: Controls the structure of the path to files in S3. If `true`, the bucket name will be included as part of the file path. 
If `false` (or omitted), it will be used as the bucket name in the domain name -- `path`: The base directory where all the media files will be stored (defaults to - root of the bucket) - `directory_structure`: This is a table, used to configure how files are to be distributed within the media directory (see [Filesystem backend](#filesystem-backend) for details) diff --git a/src/config/mod.rs b/src/config/mod.rs index 7d4fe69a..7e658647 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -515,13 +515,15 @@ pub enum IncompleteMediaBackendConfig { endpoint: Url, bucket: String, region: String, + path: Option, + key: String, secret: String, + #[serde(default = "default_s3_duration")] duration: u64, #[serde(default = "false_fn")] bucket_use_path: bool, - path: Option, #[serde(default)] directory_structure: DirectoryStructure, }, From efe774b3c196365e1978db5addefa2d30ce9426b Mon Sep 17 00:00:00 2001 From: AndSDev Date: Tue, 10 Jun 2025 07:55:59 +0000 Subject: [PATCH 10/16] refactor: improve error processing for S3 --- src/service/media/mod.rs | 12 ++++++------ src/utils/error.rs | 5 ----- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 71a7dd9f..17b232ec 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -643,8 +643,8 @@ impl Service { )); } if !resp.status().is_success() { - warn!("S3 request error ({}):\n{}", sha256_hex, resp.text().await?); - return Err(Error::bad_s3_response("Cannot read media file")); + error!("Failed to get file with sha256 hash of \"{}\" from S3 bucket: {}", sha256_hex, resp.text().await?); + return Err(Error::BadS3Response("Failed to get media file from S3 bucket")); } resp.bytes().await?.to_vec() @@ -702,8 +702,8 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { let resp = client.put(url).body(file.to_vec()).send().await?; if !resp.status().is_success() { - warn!("S3 request error ({}):\n{}", sha256_hex, resp.text().await?); - return Err(Error::bad_s3_response("Cannot write media file")); + error!("Failed to upload file with sha256 hash of \"{}\" to S3 bucket: {}", sha256_hex, resp.text().await?); + return Err(Error::BadS3Response("Failed to upload media file to S3 bucket")); } } } @@ -795,8 +795,8 @@ async fn delete_file(sha256_hex: &str) -> Result<()> { let resp = client.delete(url).send().await?; if !resp.status().is_success() { - warn!("S3 request error ({}):\n{}", sha256_hex, resp.text().await?); - return Err(Error::bad_s3_response("Cannot delete media file")); + error!("Failed to delete file with sha256 hash of \"{}\" from S3 bucket: {}", sha256_hex, resp.text().await?); + return Err(Error::BadS3Response("Failed to delete media file from S3 bucket")); } } } diff --git a/src/utils/error.rs b/src/utils/error.rs index 1c1302e7..c9e83843 100644 --- a/src/utils/error.rs +++ b/src/utils/error.rs @@ -93,11 +93,6 @@ impl Error { error!("BadConfig: {}", message); Self::BadConfig(message) } - - pub fn bad_s3_response(message: &'static str) -> Self { - info!("BadS3Response: {}", message); - Self::BadS3Response(message) - } } impl Error { From b2a4e398d804b51448ed5b8e5f8ce565b55797c9 Mon Sep 17 00:00:00 2001 From: AndSDev Date: Tue, 10 Jun 2025 11:01:07 +0300 Subject: [PATCH 11/16] style: fix code fmt --- src/config/mod.rs | 2 +- src/service/media/mod.rs | 32 +++++++++++++++++++++++++------- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 7e658647..03052879 100644 --- a/src/config/mod.rs +++ 
b/src/config/mod.rs @@ -246,11 +246,11 @@ impl From for Config { endpoint, bucket, region, + path, key, secret, duration, bucket_use_path, - path, directory_structure, } => { let path_style = if bucket_use_path { diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 17b232ec..3cececf6 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -11,7 +11,7 @@ use ruma::{ }; use rusty_s3::S3Action; use sha2::{digest::Output, Digest, Sha256}; -use tracing::{error, info, warn}; +use tracing::{error, info}; use crate::{ config::{DirectoryStructure, MediaBackendConfig}, @@ -643,8 +643,14 @@ impl Service { )); } if !resp.status().is_success() { - error!("Failed to get file with sha256 hash of \"{}\" from S3 bucket: {}", sha256_hex, resp.text().await?); - return Err(Error::BadS3Response("Failed to get media file from S3 bucket")); + error!( + "Failed to get file with sha256 hash of \"{}\" from S3 bucket: {}", + sha256_hex, + resp.text().await? + ); + return Err(Error::BadS3Response( + "Failed to get media file from S3 bucket", + )); } resp.bytes().await?.to_vec() @@ -702,8 +708,14 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { let resp = client.put(url).body(file.to_vec()).send().await?; if !resp.status().is_success() { - error!("Failed to upload file with sha256 hash of \"{}\" to S3 bucket: {}", sha256_hex, resp.text().await?); - return Err(Error::BadS3Response("Failed to upload media file to S3 bucket")); + error!( + "Failed to upload file with sha256 hash of \"{}\" to S3 bucket: {}", + sha256_hex, + resp.text().await? + ); + return Err(Error::BadS3Response( + "Failed to upload media file to S3 bucket", + )); } } } @@ -795,8 +807,14 @@ async fn delete_file(sha256_hex: &str) -> Result<()> { let resp = client.delete(url).send().await?; if !resp.status().is_success() { - error!("Failed to delete file with sha256 hash of \"{}\" from S3 bucket: {}", sha256_hex, resp.text().await?); - return Err(Error::BadS3Response("Failed to delete media file from S3 bucket")); + error!( + "Failed to delete file with sha256 hash of \"{}\" from S3 bucket: {}", + sha256_hex, + resp.text().await? 
+ ); + return Err(Error::BadS3Response( + "Failed to delete media file from S3 bucket", + )); } } } From 4982aa1098744582e5b44e869cbae50b74b01b1e Mon Sep 17 00:00:00 2001 From: AndSDev Date: Tue, 10 Jun 2025 11:25:39 +0300 Subject: [PATCH 12/16] style: organize S3 deps in cargo.toml --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3634808f..0900c6f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,8 @@ image = { version = "0.25", default-features = false, features = [ # Used for creating media filenames hex = "0.4" sha2 = "0.10" +# Used for S3 media backend +rusty-s3 = "0.7.0" # Used for parsing media retention policies from the config bytesize = { version = "2", features = ["serde"] } humantime-serde = "1" @@ -154,8 +156,6 @@ tikv-jemallocator = { version = "0.6", features = [ sd-notify = { version = "0.4", optional = true } -rusty-s3 = "0.7.0" - # Used for matrix spec type definitions and helpers [dependencies.ruma] features = [ From 39c09f865a00568bbdefc74a93ffc284187fb8a9 Mon Sep 17 00:00:00 2001 From: AndSDev Date: Tue, 10 Jun 2025 15:13:58 +0300 Subject: [PATCH 13/16] refactor: Create the directory for media only if the `directory_structure` is `Deep` --- src/service/media/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 3cececf6..502996e9 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -681,8 +681,10 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { .get_media_path(path, directory_structure, sha256_hex)?; // Create all directories leading up to file - if let Some(parent) = path.parent() { - fs::create_dir_all(&parent).await.inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?; + if let DirectoryStructure::Deep { .. 
} = directory_structure { + if let Some(parent) = path.parent() { + fs::create_dir_all(&parent).await.inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?; + } } let mut f = File::create(path).await?; From d079df63c34cab16849cd977db948050bfc888e8 Mon Sep 17 00:00:00 2001 From: AndSDev Date: Tue, 10 Jun 2025 17:34:42 +0300 Subject: [PATCH 14/16] refactor(service/admin): improve readability for command processing --- src/service/admin/mod.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index afb3cef6..60269d90 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -9,7 +9,6 @@ use std::{ use bytesize::ByteSize; use chrono::DateTime; use clap::{Args, Parser}; -use futures_util::future::Either; use image::GenericImageView; use regex::Regex; use ruma::{ @@ -1252,9 +1251,8 @@ impl Service { RoomMessageEventContent::text_html(markdown_message, html_message).into() }, - AdminCommand::PurgeMedia => media_from_body(body).map_or_else( - |message| Either::Left(async move { message }), - |media| Either::Right(async move { + AdminCommand::PurgeMedia => match media_from_body(body) { + Ok(media) => { let failed_count = services().media.purge(&media, true).await.len(); if failed_count == 0 { @@ -1264,8 +1262,9 @@ impl Service { "Failed to delete {failed_count} media, check logs for more details" )) }.into() - }), - ).await, + }, + Err(message) => message, + } AdminCommand::PurgeMediaFromUsers { from_last, force_filehash, @@ -1331,9 +1330,8 @@ impl Service { )) }.into() } - AdminCommand::BlockMedia { and_purge, reason } => media_from_body(body).map_or_else( - |message| Either::Left(async move { message }), - |media| Either::Right(async move { + AdminCommand::BlockMedia { and_purge, reason } => match media_from_body(body) { + Ok(media) =>{ let failed_count = services().media.block(&media, reason).len(); let failed_purge_count = if and_purge { services().media.purge(&media, true).await.len() @@ -1353,8 +1351,9 @@ impl Service { "Failed to block {failed_count}, and purge {failed_purge_count} media, check logs for more details" )) }.into() - }), - ).await, + }, + Err(message) => message, + } AdminCommand::BlockMediaFromUsers { from_last, reason } => { let after = from_last.map(unix_secs_from_duration).transpose()?; From 9da99bb3b5997f7ab70b0cb1230307ffcdb20f14 Mon Sep 17 00:00:00 2001 From: AndSDev Date: Tue, 10 Jun 2025 17:41:05 +0300 Subject: [PATCH 15/16] style: reformat code --- src/service/admin/mod.rs | 189 ++++++++++++++++++++++++--------------- 1 file changed, 116 insertions(+), 73 deletions(-) diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 60269d90..72ea506a 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -589,7 +589,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. 
Add --help for details.", ) - }.into() + } + .into() } AdminCommand::UnregisterAppservice { appservice_identifier, @@ -602,7 +603,8 @@ impl Service { Err(e) => RoomMessageEventContent::text_plain(format!( "Failed to unregister appservice: {e}" )), - }.into(), + } + .into(), AdminCommand::ListAppservices => { let appservices = services().appservice.iter_ids().await; let output = format!( @@ -640,7 +642,8 @@ impl Service { RoomMessageEventContent::text_plain(&msg) } Err(e) => RoomMessageEventContent::text_plain(e.to_string()), - }.into(), + } + .into(), AdminCommand::IncomingFederation => { let map = services().globals.roomid_federationhandletime.read().await; let mut msg: String = format!("Handling {} incoming pdus:\n", map.len()); @@ -681,7 +684,8 @@ impl Service { )) } else { RoomMessageEventContent::text_plain("Event not found.") - }.into() + } + .into() } AdminCommand::ParsePdu => { if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```" @@ -715,7 +719,8 @@ impl Service { } } else { RoomMessageEventContent::text_plain("Expected code block in command body.") - }.into() + } + .into() } AdminCommand::GetPdu { event_id } => { let mut outlier = false; @@ -753,7 +758,8 @@ impl Service { ) } None => RoomMessageEventContent::text_plain("PDU not found."), - }.into() + } + .into() } AdminCommand::MemoryUsage => { let response1 = services().memory_usage().await; @@ -761,7 +767,8 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Services:\n{response1}\n\nDatabase:\n{response2}" - )).into() + )) + .into() } AdminCommand::ClearDatabaseCaches { amount } => { services().globals.db.clear_caches(amount); @@ -786,7 +793,8 @@ impl Service { Err(e) => { return Ok(RoomMessageEventContent::text_plain(format!( "The supplied username is not a valid username: {e}" - )).into()) + )) + .into()) } }; @@ -794,7 +802,8 @@ impl Service { if user_id.server_name() != services().globals.server_name() { return Ok(RoomMessageEventContent::text_plain( "The specified user is not from this server!", - ).into()); + ) + .into()); }; // Check if the specified user is valid @@ -808,7 +817,8 @@ impl Service { { return Ok(RoomMessageEventContent::text_plain( "The specified user does not exist!", - ).into()); + ) + .into()); } let new_password = utils::random_string(AUTO_GEN_PASSWORD_LENGTH); @@ -823,7 +833,8 @@ impl Service { Err(e) => RoomMessageEventContent::text_plain(format!( "Couldn't reset the password for user {user_id}: {e}" )), - }.into() + } + .into() } AdminCommand::CreateUser { username, password } => { let password = @@ -837,7 +848,8 @@ impl Service { Err(e) => { return Ok(RoomMessageEventContent::text_plain(format!( "The supplied username is not a valid username: {e}" - )).into()) + )) + .into()) } }; @@ -845,18 +857,21 @@ impl Service { if user_id.server_name() != services().globals.server_name() { return Ok(RoomMessageEventContent::text_plain( "The specified user is not from this server!", - ).into()); + ) + .into()); }; if user_id.is_historical() { return Ok(RoomMessageEventContent::text_plain(format!( "Userid {user_id} is not allowed due to historical" - )).into()); + )) + .into()); } if services().users.exists(&user_id)? 
{ return Ok(RoomMessageEventContent::text_plain(format!( "Userid {user_id} already exists" - )).into()); + )) + .into()); } // Create user services().users.create(&user_id, Some(password.as_str()))?; @@ -893,26 +908,26 @@ impl Service { // Inhibit login does not work for guests RoomMessageEventContent::text_plain(format!( "Created user with user_id: {user_id} and password: {password}" - )).into() + )) + .into() } - AdminCommand::AllowRegistration { status } => { - if let Some(status) = status { - services().globals.set_registration(status).await; - RoomMessageEventContent::text_plain(if status { - "Registration is now enabled" - } else { - "Registration is now disabled" - }) + AdminCommand::AllowRegistration { status } => if let Some(status) = status { + services().globals.set_registration(status).await; + RoomMessageEventContent::text_plain(if status { + "Registration is now enabled" } else { - RoomMessageEventContent::text_plain( - if services().globals.allow_registration().await { - "Registration is currently enabled" - } else { - "Registration is currently disabled" - }, - ) - }.into() + "Registration is now disabled" + }) + } else { + RoomMessageEventContent::text_plain( + if services().globals.allow_registration().await { + "Registration is currently enabled" + } else { + "Registration is currently disabled" + }, + ) } + .into(), AdminCommand::DisableRoom { room_id } => { services().rooms.metadata.disable_room(&room_id, true)?; RoomMessageEventContent::text_plain("Room disabled.").into() @@ -1031,7 +1046,10 @@ impl Service { let mut message = format!("Deactivated {deactivation_count} accounts."); if !admins.is_empty() { - message.push_str(&format!("\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts",admins.join(", "))); + message.push_str(&format!( + "\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts", + admins.join(", ") + )); } if failed_count != 0 { message.push_str(&format!( @@ -1044,14 +1062,19 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. 
Add --help for details.", ) - }.into() + } + .into() } AdminCommand::QueryMedia { mxc } => { let Ok((server_name, media_id)) = mxc.parts() else { return Ok(RoomMessageEventContent::text_plain("Invalid media MXC").into()); }; - let MediaQuery{ is_blocked, source_file, thumbnails } = services().media.query(server_name, media_id)?; + let MediaQuery { + is_blocked, + source_file, + thumbnails, + } = services().media.query(server_name, media_id)?; let mut message = format!("Is blocked Media ID: {is_blocked}"); if let Some(MediaQueryFileInfo { @@ -1062,14 +1085,16 @@ impl Service { unauthenticated_access_permitted, is_blocked_via_filehash, file_info: time_info, - }) = source_file { + }) = source_file + { message.push_str("\n\nInformation on full (non-thumbnail) file:\n"); if let Some(FileInfo { creation, last_access, size, - }) = time_info { + }) = time_info + { message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}", DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), @@ -1098,7 +1123,7 @@ impl Service { message.push_str("\n\nInformation on thumbnails of media:"); } - for MediaQueryThumbInfo{ + for MediaQueryThumbInfo { width, height, sha256_hex, @@ -1107,13 +1132,15 @@ impl Service { unauthenticated_access_permitted, is_blocked_via_filehash, file_info: time_info, - } in thumbnails { + } in thumbnails + { message.push_str(&format!("\n\nDimensions: {width}x{height}")); if let Some(FileInfo { creation, last_access, size, - }) = time_info { + }) = time_info + { message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}", DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), @@ -1147,7 +1174,8 @@ impl Service { file, content_type, content_disposition, - } = client_server::media::get_content(server_name, media_id.to_owned(), true, true).await?; + } = client_server::media::get_content(server_name, media_id.to_owned(), true, true) + .await?; if let Ok(image) = image::load_from_memory(&file) { let filename = content_disposition.and_then(|cd| cd.filename); @@ -1187,10 +1215,7 @@ impl Service { } } AdminCommand::ListMedia { - user_server_filter: ListMediaArgs { - user, - server, - }, + user_server_filter: ListMediaArgs { user, server }, include_thumbnails, content_type, uploaded_before, @@ -1203,7 +1228,7 @@ impl Service { r#""#, ); - for MediaListItem{ + for MediaListItem { server_name, media_id, uploader_localpart, @@ -1213,9 +1238,8 @@ impl Service { size, creation, } in services().media.list( - user - .map(ServerNameOrUserId::UserId) - .or_else(|| server.map(ServerNameOrUserId::ServerName)), + user.map(ServerNameOrUserId::UserId) + .or_else(|| server.map(ServerNameOrUserId::ServerName)), include_thumbnails, content_type.as_deref(), uploaded_before @@ -1229,15 +1253,20 @@ impl Service { .transpose() .map_err(|_| Error::AdminCommand("Timestamp must be after unix epoch"))? .as_ref() - .map(Duration::as_secs) + .map(Duration::as_secs), )? 
{ - - let user_id = uploader_localpart.map(|localpart| format!("@{localpart}:{server_name}")).unwrap_or_default(); + let user_id = uploader_localpart + .map(|localpart| format!("@{localpart}:{server_name}")) + .unwrap_or_default(); let content_type = content_type.unwrap_or_default(); let filename = filename.unwrap_or_default(); - let dimensions = dimensions.map(|(w, h)| format!("{w}x{h}")).unwrap_or_default(); + let dimensions = dimensions + .map(|(w, h)| format!("{w}x{h}")) + .unwrap_or_default(); let size = ByteSize::b(size).display().si(); - let creation = DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"); + let creation = + DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX), 0) + .expect("Timestamp is within range"); markdown_message .push_str(&format!("\n| mxc://{server_name}/{media_id} | {dimensions} | {creation} | {user_id} | {content_type} | {filename} | {size} |")); @@ -1250,7 +1279,7 @@ impl Service { html_message.push_str("
MXC URIDimensions (if thumbnail)Created/Downloaded atUploaderContent-TypeFilenameSize
"); RoomMessageEventContent::text_html(markdown_message, html_message).into() - }, + } AdminCommand::PurgeMedia => match media_from_body(body) { Ok(media) => { let failed_count = services().media.purge(&media, true).await.len(); @@ -1261,10 +1290,11 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Failed to delete {failed_count} media, check logs for more details" )) - }.into() - }, + } + .into() + } Err(message) => message, - } + }, AdminCommand::PurgeMediaFromUsers { from_last, force_filehash, @@ -1299,7 +1329,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::PurgeMediaFromServer { server_id: server_name, @@ -1328,10 +1359,11 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Failed to purge {failed_count} media, check logs for more details" )) - }.into() + } + .into() } AdminCommand::BlockMedia { and_purge, reason } => match media_from_body(body) { - Ok(media) =>{ + Ok(media) => { let failed_count = services().media.block(&media, reason).len(); let failed_purge_count = if and_purge { services().media.purge(&media, true).await.len() @@ -1351,9 +1383,9 @@ impl Service { "Failed to block {failed_count}, and purge {failed_purge_count} media, check logs for more details" )) }.into() - }, + } Err(message) => message, - } + }, AdminCommand::BlockMediaFromUsers { from_last, reason } => { let after = from_last.map(unix_secs_from_duration).transpose()?; @@ -1389,7 +1421,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::ListBlockedMedia => { let mut markdown_message = String::from( @@ -1406,7 +1439,8 @@ impl Service { unix_secs, reason, sha256_hex, - }) = media else { + }) = media + else { continue; }; @@ -1419,8 +1453,9 @@ impl Service { .flatten() .expect("Time is valid"); - markdown_message - .push_str(&format!("\n| {sha256_hex} | mxc://{server_name}/{media_id} | {time} | {reason} |")); + markdown_message.push_str(&format!( + "\n| {sha256_hex} | mxc://{server_name}/{media_id} | {time} | {reason} |" + )); html_message.push_str(&format!( "{sha256_hex}mxc://{server_name}/{media_id}{time}{reason}", @@ -1442,7 +1477,8 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Failed to unblock {failed_count} media, check logs for more details" )) - }.into() + } + .into() }, ), AdminCommand::SignJson => { @@ -1467,7 +1503,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::VerifyJson => { if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```" @@ -1528,7 +1565,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::HashAndSignEvent { room_version_id } => { if body.len() > 2 @@ -1543,7 +1581,10 @@ impl Service { services().globals.server_name().as_str(), services().globals.keypair(), &mut value, - &room_version_id.rules().expect("Supported room version has rules").redaction, + &room_version_id + .rules() + .expect("Supported room version has rules") + .redaction, ) { RoomMessageEventContent::text_plain(format!("Invalid event: {e}")) } else { @@ -1558,7 +1599,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. 
Add --help for details.", ) - }.into() + } + .into() } AdminCommand::RemoveAlias { alias } => { if alias.server_name() != services().globals.server_name() { @@ -1583,7 +1625,8 @@ impl Service { .alias .remove_alias(&alias, services().globals.server_user())?; RoomMessageEventContent::text_plain("Alias removed successfully") - }.into() + } + .into() } }; From 02ddf2725871997671f973791736e66025c29c10 Mon Sep 17 00:00:00 2001 From: AndSDev Date: Tue, 10 Jun 2025 20:06:39 +0300 Subject: [PATCH 16/16] refactor: improve S3 config reading --- src/config/mod.rs | 106 ++++++++++++++++++++++++++-------------------- 1 file changed, 60 insertions(+), 46 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 03052879..887a93c5 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -242,36 +242,13 @@ impl From for Config { }), directory_structure, }, - IncompleteMediaBackendConfig::S3 { - endpoint, - bucket, - region, - path, - key, - secret, - duration, - bucket_use_path, - directory_structure, - } => { - let path_style = if bucket_use_path { - rusty_s3::UrlStyle::Path - } else { - rusty_s3::UrlStyle::VirtualHost - }; - - let bucket = rusty_s3::Bucket::new(endpoint, path_style, bucket, region) - .expect("Invalid S3 config"); - - let credentials = rusty_s3::Credentials::new(key, secret); - - MediaBackendConfig::S3 { - bucket: Box::new(bucket), - credentials: Box::new(credentials), - duration: Duration::from_secs(duration), - path, - directory_structure, - } - } + IncompleteMediaBackendConfig::S3(value) => MediaBackendConfig::S3 { + bucket: value.bucket, + credentials: value.credentials, + duration: value.duration, + path: value.path, + directory_structure: value.directory_structure, + }, }, retention: media.retention.into(), }; @@ -511,22 +488,7 @@ pub enum IncompleteMediaBackendConfig { #[serde(default)] directory_structure: DirectoryStructure, }, - S3 { - endpoint: Url, - bucket: String, - region: String, - path: Option, - - key: String, - secret: String, - - #[serde(default = "default_s3_duration")] - duration: u64, - #[serde(default = "false_fn")] - bucket_use_path: bool, - #[serde(default)] - directory_structure: DirectoryStructure, - }, + S3(S3MediaBackend), } impl Default for IncompleteMediaBackendConfig { @@ -607,6 +569,58 @@ impl TryFrom for DirectoryStructure { } } +#[derive(Deserialize)] +struct ShadowS3MediaBackend { + endpoint: Url, + bucket: String, + region: String, + path: Option, + + key: String, + secret: String, + + #[serde(default = "default_s3_duration")] + duration: u64, + #[serde(default = "false_fn")] + bucket_use_path: bool, + #[serde(default)] + directory_structure: DirectoryStructure, +} + +impl TryFrom for S3MediaBackend { + type Error = Error; + + fn try_from(value: ShadowS3MediaBackend) -> Result { + let path_style = if value.bucket_use_path { + rusty_s3::UrlStyle::Path + } else { + rusty_s3::UrlStyle::VirtualHost + }; + let credentials = rusty_s3::Credentials::new(value.key, value.secret); + + match rusty_s3::Bucket::new(value.endpoint, path_style, value.bucket, value.region) { + Ok(bucket) => Ok(S3MediaBackend { + bucket: Box::new(bucket), + credentials: Box::new(credentials), + duration: Duration::from_secs(value.duration), + path: value.path, + directory_structure: value.directory_structure, + }), + Err(_) => Err(Error::bad_config("Invalid S3 config")), + } + } +} + +#[derive(Deserialize, Debug, Clone)] +#[serde(try_from = "ShadowS3MediaBackend")] +pub struct S3MediaBackend { + pub bucket: Box, + pub credentials: Box, + pub duration: Duration, + pub 
path: Option<String>,
+    pub directory_structure: DirectoryStructure,
+}
+
 const DEPRECATED_KEYS: &[&str] = &[
     "cache_capacity",
     "turn_username",