diff --git a/Cargo.lock b/Cargo.lock index 6bb2699d..829d0d68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -532,6 +532,7 @@ dependencies = [ "rusqlite", "rust-argon2", "rust-rocksdb", + "rusty-s3", "sd-notify", "serde", "serde_html_form", @@ -1807,6 +1808,16 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.7.5" @@ -2312,6 +2323,16 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" +[[package]] +name = "quick-xml" +version = "0.37.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.40" @@ -2887,6 +2908,25 @@ version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d" +[[package]] +name = "rusty-s3" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f51a5a6b15f25d3e10c068039ee13befb6110fcb36c2b26317bcbdc23484d96" +dependencies = [ + "base64 0.22.1", + "hmac", + "md-5", + "percent-encoding", + "quick-xml", + "serde", + "serde_json", + "sha2", + "time", + "url", + "zeroize", +] + [[package]] name = "ryu" version = "1.0.20" diff --git a/Cargo.toml b/Cargo.toml index 557b155b..f282461b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,6 +89,8 @@ image = { version = "0.25", default-features = false, features = [ # Used for creating media filenames hex = "0.4" sha2 = "0.10" +# Used for S3 media backend +rusty-s3 = "0.7.0" # Used for parsing media retention policies from the config bytesize = { version = "2", features = ["serde"] } humantime-serde = "1" diff --git a/docs/configuration.md b/docs/configuration.md index 145c2c48..39ca6f79 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -99,6 +99,33 @@ depth = 4 length = 2 ``` +#### S3 backend +The S3 backend has the following fields: +- `endpoint`: The URL of the S3 endpoint to connect to +- `bucket`: The name of the S3 bucket to use for storage. This bucket must already exist and your credentials must have access to it +- `region`: The region where your S3 bucket is located +- `path`: The base directory where all the media files will be stored (defaults to + root of the bucket) +- `key`: Your Access Key ID +- `secret`: Your Secret Access Key +- `duration`: The time (in seconds) that signed requests to the S3 bucket will be valid (default: `30`) +- `bucket_use_path`: Controls the structure of the path to files in S3. If `true`, the bucket name will be included as part of the file path. 
If `false` (or omitted), the bucket name will instead be part of the domain name (virtual-host style) - `directory_structure`: This is a table, used to configure how files are to be distributed within the media directory (see [Filesystem backend](#filesystem-backend) for details) ##### Example: ```toml [global.media] backend = "s3" endpoint = "http://minio:9000" bucket = "test" region = "minio" key = "" secret = "" duration = 15 bucket_use_path = true ``` #### Retention policies Over time, the amount of media will keep growing, even if they were only accessed once. Retention policies allow for media files to automatically be deleted if they meet certain criteria, diff --git a/src/config/mod.rs index 95f1c76e..887a93c5 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -242,6 +242,13 @@ impl From<IncompleteConfig> for Config { }), directory_structure, }, + IncompleteMediaBackendConfig::S3(value) => MediaBackendConfig::S3 { + bucket: value.bucket, + credentials: value.credentials, + duration: value.duration, + path: value.path, + directory_structure: value.directory_structure, + }, }, retention: media.retention.into(), }; @@ -481,6 +488,7 @@ pub enum IncompleteMediaBackendConfig { #[serde(default)] directory_structure: DirectoryStructure, }, + S3(S3MediaBackend), } impl Default for IncompleteMediaBackendConfig { @@ -498,6 +506,13 @@ pub enum MediaBackendConfig { path: String, directory_structure: DirectoryStructure, }, + S3 { + bucket: Box<rusty_s3::Bucket>, + credentials: Box<rusty_s3::Credentials>, + duration: Duration, + path: Option<String>, + directory_structure: DirectoryStructure, + }, } #[derive(Debug, Clone, Deserialize)] @@ -554,6 +569,58 @@ impl TryFrom for DirectoryStructure { } } +#[derive(Deserialize)] +struct ShadowS3MediaBackend { + endpoint: Url, + bucket: String, + region: String, + path: Option<String>, + + key: String, + secret: String, + + #[serde(default = "default_s3_duration")] + duration: u64, + #[serde(default = "false_fn")] + bucket_use_path: bool, + #[serde(default)] + directory_structure: DirectoryStructure, +} + +impl TryFrom<ShadowS3MediaBackend> for S3MediaBackend { + type Error = Error; + + fn try_from(value: ShadowS3MediaBackend) -> Result<Self, Self::Error> { + let path_style = if value.bucket_use_path { + rusty_s3::UrlStyle::Path + } else { + rusty_s3::UrlStyle::VirtualHost + }; + let credentials = rusty_s3::Credentials::new(value.key, value.secret); + + match rusty_s3::Bucket::new(value.endpoint, path_style, value.bucket, value.region) { + Ok(bucket) => Ok(S3MediaBackend { + bucket: Box::new(bucket), + credentials: Box::new(credentials), + duration: Duration::from_secs(value.duration), + path: value.path, + directory_structure: value.directory_structure, + }), + Err(_) => Err(Error::bad_config("Invalid S3 config")), + } + } +} + +#[derive(Deserialize, Debug, Clone)] +#[serde(try_from = "ShadowS3MediaBackend")] +pub struct S3MediaBackend { + pub bucket: Box<rusty_s3::Bucket>, + pub credentials: Box<rusty_s3::Credentials>, + pub duration: Duration, + pub path: Option<String>, + pub directory_structure: DirectoryStructure, +} + const DEPRECATED_KEYS: &[&str] = &[ "cache_capacity", "turn_username", @@ -727,3 +794,7 @@ fn default_openid_token_ttl() -> u64 { pub fn default_default_room_version() -> RoomVersionId { RoomVersionId::V10 } + +fn default_s3_duration() -> u64 { + 30 +} diff --git a/src/service/admin/mod.rs index e7f169fb..72ea506a 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -589,7 +589,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body.
Add --help for details.", ) - }.into() + } + .into() } AdminCommand::UnregisterAppservice { appservice_identifier, @@ -602,7 +603,8 @@ impl Service { Err(e) => RoomMessageEventContent::text_plain(format!( "Failed to unregister appservice: {e}" )), - }.into(), + } + .into(), AdminCommand::ListAppservices => { let appservices = services().appservice.iter_ids().await; let output = format!( @@ -640,7 +642,8 @@ impl Service { RoomMessageEventContent::text_plain(&msg) } Err(e) => RoomMessageEventContent::text_plain(e.to_string()), - }.into(), + } + .into(), AdminCommand::IncomingFederation => { let map = services().globals.roomid_federationhandletime.read().await; let mut msg: String = format!("Handling {} incoming pdus:\n", map.len()); @@ -681,7 +684,8 @@ impl Service { )) } else { RoomMessageEventContent::text_plain("Event not found.") - }.into() + } + .into() } AdminCommand::ParsePdu => { if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```" @@ -715,7 +719,8 @@ impl Service { } } else { RoomMessageEventContent::text_plain("Expected code block in command body.") - }.into() + } + .into() } AdminCommand::GetPdu { event_id } => { let mut outlier = false; @@ -753,7 +758,8 @@ impl Service { ) } None => RoomMessageEventContent::text_plain("PDU not found."), - }.into() + } + .into() } AdminCommand::MemoryUsage => { let response1 = services().memory_usage().await; @@ -761,7 +767,8 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Services:\n{response1}\n\nDatabase:\n{response2}" - )).into() + )) + .into() } AdminCommand::ClearDatabaseCaches { amount } => { services().globals.db.clear_caches(amount); @@ -786,7 +793,8 @@ impl Service { Err(e) => { return Ok(RoomMessageEventContent::text_plain(format!( "The supplied username is not a valid username: {e}" - )).into()) + )) + .into()) } }; @@ -794,7 +802,8 @@ impl Service { if user_id.server_name() != services().globals.server_name() { return Ok(RoomMessageEventContent::text_plain( "The specified user is not from this server!", - ).into()); + ) + .into()); }; // Check if the specified user is valid @@ -808,7 +817,8 @@ impl Service { { return Ok(RoomMessageEventContent::text_plain( "The specified user does not exist!", - ).into()); + ) + .into()); } let new_password = utils::random_string(AUTO_GEN_PASSWORD_LENGTH); @@ -823,7 +833,8 @@ impl Service { Err(e) => RoomMessageEventContent::text_plain(format!( "Couldn't reset the password for user {user_id}: {e}" )), - }.into() + } + .into() } AdminCommand::CreateUser { username, password } => { let password = @@ -837,7 +848,8 @@ impl Service { Err(e) => { return Ok(RoomMessageEventContent::text_plain(format!( "The supplied username is not a valid username: {e}" - )).into()) + )) + .into()) } }; @@ -845,18 +857,21 @@ impl Service { if user_id.server_name() != services().globals.server_name() { return Ok(RoomMessageEventContent::text_plain( "The specified user is not from this server!", - ).into()); + ) + .into()); }; if user_id.is_historical() { return Ok(RoomMessageEventContent::text_plain(format!( "Userid {user_id} is not allowed due to historical" - )).into()); + )) + .into()); } if services().users.exists(&user_id)? 
{ return Ok(RoomMessageEventContent::text_plain(format!( "Userid {user_id} already exists" - )).into()); + )) + .into()); } // Create user services().users.create(&user_id, Some(password.as_str()))?; @@ -893,26 +908,26 @@ impl Service { // Inhibit login does not work for guests RoomMessageEventContent::text_plain(format!( "Created user with user_id: {user_id} and password: {password}" - )).into() + )) + .into() } - AdminCommand::AllowRegistration { status } => { - if let Some(status) = status { - services().globals.set_registration(status).await; - RoomMessageEventContent::text_plain(if status { - "Registration is now enabled" - } else { - "Registration is now disabled" - }) + AdminCommand::AllowRegistration { status } => if let Some(status) = status { + services().globals.set_registration(status).await; + RoomMessageEventContent::text_plain(if status { + "Registration is now enabled" } else { - RoomMessageEventContent::text_plain( - if services().globals.allow_registration().await { - "Registration is currently enabled" - } else { - "Registration is currently disabled" - }, - ) - }.into() + "Registration is now disabled" + }) + } else { + RoomMessageEventContent::text_plain( + if services().globals.allow_registration().await { + "Registration is currently enabled" + } else { + "Registration is currently disabled" + }, + ) } + .into(), AdminCommand::DisableRoom { room_id } => { services().rooms.metadata.disable_room(&room_id, true)?; RoomMessageEventContent::text_plain("Room disabled.").into() @@ -955,6 +970,7 @@ impl Service { services() .media .purge_from_user(&user_id, purge_media.force_filehash, after) + .await .len() } else { 0 @@ -1023,13 +1039,17 @@ impl Service { failed_count += services() .media .purge_from_user(user_id, purge_media.force_filehash, after) + .await .len(); } } let mut message = format!("Deactivated {deactivation_count} accounts."); if !admins.is_empty() { - message.push_str(&format!("\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts",admins.join(", "))); + message.push_str(&format!( + "\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts", + admins.join(", ") + )); } if failed_count != 0 { message.push_str(&format!( @@ -1042,14 +1062,19 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. 
Add --help for details.", ) - }.into() + } + .into() } AdminCommand::QueryMedia { mxc } => { let Ok((server_name, media_id)) = mxc.parts() else { return Ok(RoomMessageEventContent::text_plain("Invalid media MXC").into()); }; - let MediaQuery{ is_blocked, source_file, thumbnails } = services().media.query(server_name, media_id)?; + let MediaQuery { + is_blocked, + source_file, + thumbnails, + } = services().media.query(server_name, media_id)?; let mut message = format!("Is blocked Media ID: {is_blocked}"); if let Some(MediaQueryFileInfo { @@ -1060,14 +1085,16 @@ impl Service { unauthenticated_access_permitted, is_blocked_via_filehash, file_info: time_info, - }) = source_file { + }) = source_file + { message.push_str("\n\nInformation on full (non-thumbnail) file:\n"); if let Some(FileInfo { creation, last_access, size, - }) = time_info { + }) = time_info + { message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}", DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), @@ -1096,7 +1123,7 @@ impl Service { message.push_str("\n\nInformation on thumbnails of media:"); } - for MediaQueryThumbInfo{ + for MediaQueryThumbInfo { width, height, sha256_hex, @@ -1105,13 +1132,15 @@ impl Service { unauthenticated_access_permitted, is_blocked_via_filehash, file_info: time_info, - } in thumbnails { + } in thumbnails + { message.push_str(&format!("\n\nDimensions: {width}x{height}")); if let Some(FileInfo { creation, last_access, size, - }) = time_info { + }) = time_info + { message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}", DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), @@ -1145,7 +1174,8 @@ impl Service { file, content_type, content_disposition, - } = client_server::media::get_content(server_name, media_id.to_owned(), true, true).await?; + } = client_server::media::get_content(server_name, media_id.to_owned(), true, true) + .await?; if let Ok(image) = image::load_from_memory(&file) { let filename = content_disposition.and_then(|cd| cd.filename); @@ -1185,10 +1215,7 @@ impl Service { } } AdminCommand::ListMedia { - user_server_filter: ListMediaArgs { - user, - server, - }, + user_server_filter: ListMediaArgs { user, server }, include_thumbnails, content_type, uploaded_before, @@ -1201,7 +1228,7 @@ impl Service { r#""#, ); - for MediaListItem{ + for MediaListItem { server_name, media_id, uploader_localpart, @@ -1211,9 +1238,8 @@ impl Service { size, creation, } in services().media.list( - user - .map(ServerNameOrUserId::UserId) - .or_else(|| server.map(ServerNameOrUserId::ServerName)), + user.map(ServerNameOrUserId::UserId) + .or_else(|| server.map(ServerNameOrUserId::ServerName)), include_thumbnails, content_type.as_deref(), uploaded_before @@ -1227,15 +1253,20 @@ impl Service { .transpose() .map_err(|_| Error::AdminCommand("Timestamp must be after unix epoch"))? .as_ref() - .map(Duration::as_secs) + .map(Duration::as_secs), )? 
{ - - let user_id = uploader_localpart.map(|localpart| format!("@{localpart}:{server_name}")).unwrap_or_default(); + let user_id = uploader_localpart + .map(|localpart| format!("@{localpart}:{server_name}")) + .unwrap_or_default(); let content_type = content_type.unwrap_or_default(); let filename = filename.unwrap_or_default(); - let dimensions = dimensions.map(|(w, h)| format!("{w}x{h}")).unwrap_or_default(); + let dimensions = dimensions + .map(|(w, h)| format!("{w}x{h}")) + .unwrap_or_default(); let size = ByteSize::b(size).display().si(); - let creation = DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"); + let creation = + DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX), 0) + .expect("Timestamp is within range"); markdown_message .push_str(&format!("\n| mxc://{server_name}/{media_id} | {dimensions} | {creation} | {user_id} | {content_type} | {filename} | {size} |")); @@ -1248,11 +1279,10 @@ impl Service { html_message.push_str("
<table><thead><tr><th>MXC URI</th><th>Dimensions (if thumbnail)</th><th>Created/Downloaded at</th><th>Uploader</th><th>Content-Type</th><th>Filename</th><th>Size</th></tr></thead><tbody>
"); RoomMessageEventContent::text_html(markdown_message, html_message).into() - }, - AdminCommand::PurgeMedia => media_from_body(body).map_or_else( - |message| message, - |media| { - let failed_count = services().media.purge(&media, true).len(); + } + AdminCommand::PurgeMedia => match media_from_body(body) { + Ok(media) => { + let failed_count = services().media.purge(&media, true).await.len(); if failed_count == 0 { RoomMessageEventContent::text_plain("Successfully purged media") @@ -1260,9 +1290,11 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Failed to delete {failed_count} media, check logs for more details" )) - }.into() - }, - ), + } + .into() + } + Err(message) => message, + }, AdminCommand::PurgeMediaFromUsers { from_last, force_filehash, @@ -1282,6 +1314,7 @@ impl Service { failed_count += services() .media .purge_from_user(user_id, force_filehash, after) + .await .len(); } @@ -1296,7 +1329,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::PurgeMediaFromServer { server_id: server_name, @@ -1314,6 +1348,7 @@ impl Service { let failed_count = services() .media .purge_from_server(&server_name, force_filehash, after) + .await .len(); if failed_count == 0 { @@ -1324,14 +1359,14 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Failed to purge {failed_count} media, check logs for more details" )) - }.into() + } + .into() } - AdminCommand::BlockMedia { and_purge, reason } => media_from_body(body).map_or_else( - |message| message, - |media| { + AdminCommand::BlockMedia { and_purge, reason } => match media_from_body(body) { + Ok(media) => { let failed_count = services().media.block(&media, reason).len(); let failed_purge_count = if and_purge { - services().media.purge(&media, true).len() + services().media.purge(&media, true).await.len() } else { 0 }; @@ -1348,8 +1383,9 @@ impl Service { "Failed to block {failed_count}, and purge {failed_purge_count} media, check logs for more details" )) }.into() - }, - ), + } + Err(message) => message, + }, AdminCommand::BlockMediaFromUsers { from_last, reason } => { let after = from_last.map(unix_secs_from_duration).transpose()?; @@ -1385,7 +1421,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::ListBlockedMedia => { let mut markdown_message = String::from( @@ -1402,7 +1439,8 @@ impl Service { unix_secs, reason, sha256_hex, - }) = media else { + }) = media + else { continue; }; @@ -1415,8 +1453,9 @@ impl Service { .flatten() .expect("Time is valid"); - markdown_message - .push_str(&format!("\n| {sha256_hex} | mxc://{server_name}/{media_id} | {time} | {reason} |")); + markdown_message.push_str(&format!( + "\n| {sha256_hex} | mxc://{server_name}/{media_id} | {time} | {reason} |" + )); html_message.push_str(&format!( "{sha256_hex}mxc://{server_name}/{media_id}{time}{reason}", @@ -1438,7 +1477,8 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Failed to unblock {failed_count} media, check logs for more details" )) - }.into() + } + .into() }, ), AdminCommand::SignJson => { @@ -1463,7 +1503,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. 
Add --help for details.", ) - }.into() + } + .into() } AdminCommand::VerifyJson => { if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```" @@ -1524,7 +1565,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::HashAndSignEvent { room_version_id } => { if body.len() > 2 @@ -1539,7 +1581,10 @@ impl Service { services().globals.server_name().as_str(), services().globals.keypair(), &mut value, - &room_version_id.rules().expect("Supported room version has rules").redaction, + &room_version_id + .rules() + .expect("Supported room version has rules") + .redaction, ) { RoomMessageEventContent::text_plain(format!("Invalid event: {e}")) } else { @@ -1554,7 +1599,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::RemoveAlias { alias } => { if alias.server_name() != services().globals.server_name() { @@ -1579,7 +1625,8 @@ impl Service { .alias .remove_alias(&alias, services().globals.server_user())?; RoomMessageEventContent::text_plain("Alias removed successfully") - }.into() + } + .into() } }; diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index f55e8c1a..dc2e1a88 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -230,8 +230,6 @@ impl Service { shutdown: AtomicBool::new(false), }; - // Remove this exception once other media backends are added - #[allow(irrefutable_let_patterns)] if let MediaBackendConfig::FileSystem { path, .. } = &s.config.media.backend { fs::create_dir_all(path)?; } @@ -490,26 +488,40 @@ impl Service { directory_structure: &DirectoryStructure, sha256_hex: &str, ) -> Result { - let mut r = PathBuf::new(); - r.push(media_directory); + Ok(PathBuf::from_iter(self.split_media_path( + Some(media_directory), + directory_structure, + sha256_hex, + )?)) + } - if let DirectoryStructure::Deep { length, depth } = directory_structure { - let mut filename = sha256_hex; - for _ in 0..depth.get() { - let (current_path, next) = filename.split_at(length.get().into()); - filename = next; - r.push(current_path); + pub fn split_media_path<'a>( + &self, + media_directory: Option<&'a str>, + directory_structure: &DirectoryStructure, + sha256_hex: &'a str, + ) -> Result> { + match directory_structure { + DirectoryStructure::Flat => match media_directory { + Some(path) => Ok(vec![path, sha256_hex]), + None => Ok(vec![sha256_hex]), + }, + DirectoryStructure::Deep { length, depth } => { + let mut r: Vec<&'a str> = Vec::with_capacity((depth.get() + 2).into()); + if let Some(path) = media_directory { + r.push(path); + } + let mut filename = sha256_hex; + for _ in 0..depth.get() { + let (current_path, next) = filename.split_at(length.get().into()); + filename = next; + r.push(current_path); + } + r.push(filename); + + Ok(r) } - - // Create all directories leading up to file - fs::create_dir_all(&r).inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?; - - r.push(filename); - } else { - r.push(sha256_hex); } - - Ok(r) } pub async fn shutdown(&self) { diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 16060dfb..502996e9 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -1,12 +1,15 @@ mod data; -use std::{fs, io::Cursor, sync::Arc}; +use std::{io::Cursor, sync::Arc}; pub use data::Data; +use futures_util::{stream, 
StreamExt}; +use http::StatusCode; use ruma::{ api::client::{error::ErrorKind, media::is_safe_inline_content_type}, http_headers::{ContentDisposition, ContentDispositionType}, OwnedServerName, ServerName, UserId, }; +use rusty_s3::S3Action; use sha2::{digest::Output, Digest, Sha256}; use tracing::{error, info}; @@ -24,7 +27,7 @@ pub struct DbFileMeta { } use tokio::{ - fs::File, + fs::{self, File}, io::{AsyncReadExt, AsyncWriteExt}, }; @@ -142,7 +145,7 @@ impl Service { let count = files.iter().filter(|res| res.is_ok()).count(); info!("Found {count} media files to delete"); - purge_files(files); + purge_files(files).await; Ok(()) } @@ -159,11 +162,14 @@ impl Service { ) -> Result<()> { let (sha256_digest, sha256_hex) = generate_digests(file); - for error in self.clear_required_space( - &sha256_digest, - MediaType::new(servername, false), - size(file)?, - )? { + for error in self + .clear_required_space( + &sha256_digest, + MediaType::new(servername, false), + size(file)?, + ) + .await? + { error!( "Error deleting file to clear space when downloading/creating new media file: {error}" ) @@ -207,7 +213,8 @@ impl Service { &sha256_digest, MediaType::new(servername, true), size(file)?, - )?; + ) + .await?; self.db.create_thumbnail_metadata( sha256_digest, @@ -444,10 +451,14 @@ impl Service { /// purged have that sha256 hash. /// /// Returns errors for all the files that were failed to be deleted, if any. - pub fn purge(&self, media: &[(OwnedServerName, String)], force_filehash: bool) -> Vec { + pub async fn purge( + &self, + media: &[(OwnedServerName, String)], + force_filehash: bool, + ) -> Vec { let hashes = self.db.purge_and_get_hashes(media, force_filehash); - purge_files(hashes) + purge_files(hashes).await } /// Purges all (past a certain time in unix seconds, if specified) media @@ -462,7 +473,7 @@ impl Service { /// /// Note: it only currently works for local users, as we cannot determine who /// exactly uploaded the file when it comes to remove users. - pub fn purge_from_user( + pub async fn purge_from_user( &self, user_id: &UserId, force_filehash: bool, @@ -472,7 +483,7 @@ impl Service { .db .purge_and_get_hashes_from_user(user_id, force_filehash, after); - purge_files(hashes) + purge_files(hashes).await } /// Purges all (past a certain time in unix seconds, if specified) media @@ -484,7 +495,7 @@ impl Service { /// purged have that sha256 hash. /// /// Returns errors for all the files that were failed to be deleted, if any. 
- pub fn purge_from_server( + pub async fn purge_from_server( &self, server_name: &ServerName, force_filehash: bool, @@ -494,7 +505,7 @@ impl Service { .db .purge_and_get_hashes_from_server(server_name, force_filehash, after); - purge_files(hashes) + purge_files(hashes).await } /// Checks whether the media has been blocked by administrators, returning either @@ -558,7 +569,7 @@ impl Service { self.db.list_blocked() } - pub fn clear_required_space( + pub async fn clear_required_space( &self, sha256_digest: &[u8], media_type: MediaType, @@ -577,7 +588,7 @@ impl Service { info!("Deleting {} files to clear space for new media file", count); } - Ok(purge_files(files)) + Ok(purge_files(files).await) } /// Fetches the file from the configured media backend, as well as updating the "last accessed" @@ -606,6 +617,44 @@ impl Service { file } + MediaBackendConfig::S3 { + bucket, + credentials, + duration, + path, + directory_structure, + } => { + let sha256_hex = hex::encode(sha256_digest); + let file_name = services() + .globals + .split_media_path(path.as_deref(), directory_structure, &sha256_hex)? + .join("/"); + let url = bucket + .get_object(Some(credentials), &file_name) + .sign(*duration); + + let client = services().globals.default_client(); + let resp = client.get(url).send().await?; + + if resp.status() == StatusCode::NOT_FOUND { + return Err(Error::BadRequest( + ErrorKind::NotFound, + "File does not exist", + )); + } + if !resp.status().is_success() { + error!( + "Failed to get file with sha256 hash of \"{}\" from S3 bucket: {}", + sha256_hex, + resp.text().await? + ); + return Err(Error::BadS3Response( + "Failed to get media file from S3 bucket", + )); + } + + resp.bytes().await?.to_vec() + } }; if let Some((server_name, media_id)) = original_file_id { @@ -631,9 +680,46 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { .globals .get_media_path(path, directory_structure, sha256_hex)?; + // Create all directories leading up to file + if let DirectoryStructure::Deep { .. } = directory_structure { + if let Some(parent) = path.parent() { + fs::create_dir_all(&parent).await.inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?; + } + } + let mut f = File::create(path).await?; f.write_all(file).await?; } + MediaBackendConfig::S3 { + bucket, + credentials, + duration, + path, + directory_structure, + } => { + let file_name = services() + .globals + .split_media_path(path.as_deref(), directory_structure, sha256_hex)? + .join("/"); + + let url = bucket + .put_object(Some(credentials), &file_name) + .sign(*duration); + + let client = services().globals.default_client(); + let resp = client.put(url).body(file.to_vec()).send().await?; + + if !resp.status().is_success() { + error!( + "Failed to upload file with sha256 hash of \"{}\" to S3 bucket: {}", + sha256_hex, + resp.text().await? 
+ ); + return Err(Error::BadS3Response( + "Failed to upload media file to S3 bucket", + )); + } + } } Ok(()) } @@ -643,21 +729,29 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { /// Returns a `Vec` of errors that occurred when attempting to delete the files /// /// Note: this does NOT remove the related metadata from the database -fn purge_files(hashes: Vec<Result<String>>) -> Vec<Error> { - hashes - .into_iter() - .map(|hash| match hash { - Ok(v) => delete_file(&v), - Err(e) => Err(e), +async fn purge_files(hashes: Vec<Result<String>>) -> Vec<Error> { + stream::iter(hashes) + .then(|hash| async move { + match hash { + Ok(v) => delete_file(&v).await, + Err(e) => Err(e), + } + }) + .filter_map(|r| async { + if let Err(e) = r { + Some(e) + } else { + None + } }) - .filter_map(|r| if let Err(e) = r { Some(e) } else { None }) .collect() + .await } /// Deletes the given file from the media backend /// /// Note: this does NOT remove the related metadata from the database -fn delete_file(sha256_hex: &str) -> Result<()> { +async fn delete_file(sha256_hex: &str) -> Result<()> { match &services().globals.config.media.backend { MediaBackendConfig::FileSystem { path, @@ -668,7 +762,7 @@ fn delete_file(sha256_hex: &str) -> Result<()> { .globals .get_media_path(path, directory_structure, sha256_hex)?; - if let Err(e) = fs::remove_file(&path) { + if let Err(e) = fs::remove_file(&path).await { // Multiple files with the same filehash might be requested to be deleted if e.kind() != std::io::ErrorKind::NotFound { error!("Error removing media from filesystem: {e}"); @@ -683,7 +777,7 @@ fn delete_file(sha256_hex: &str) -> Result<()> { // Here at the start so that the first time, the file gets removed from the path path.pop(); - if let Err(e) = fs::remove_dir(&path) { + if let Err(e) = fs::remove_dir(&path).await { if e.kind() == std::io::ErrorKind::DirectoryNotEmpty { break; } else { @@ -696,6 +790,35 @@ fn delete_file(sha256_hex: &str) -> Result<()> { } } } + MediaBackendConfig::S3 { + bucket, + credentials, + duration, + path, + directory_structure, + } => { + let file_name = services() + .globals + .split_media_path(path.as_deref(), directory_structure, sha256_hex)? + .join("/"); + let url = bucket + .delete_object(Some(credentials), &file_name) + .sign(*duration); + + let client = services().globals.default_client(); + let resp = client.delete(url).send().await?; + + if !resp.status().is_success() { + error!( + "Failed to delete file with sha256 hash of \"{}\" from S3 bucket: {}", + sha256_hex, + resp.text().await? + ); + return Err(Error::BadS3Response( + "Failed to delete media file from S3 bucket", + )); + } + } } Ok(()) diff --git a/src/utils/error.rs index 1b1a26db..c9e83843 100644 --- a/src/utils/error.rs +++ b/src/utils/error.rs @@ -52,6 +52,8 @@ pub enum Error { source: std::io::Error, }, #[error("{0}")] + BadS3Response(&'static str), + #[error("{0}")] BadServerResponse(&'static str), #[error("{0}")] BadConfig(&'static str),
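The S3 backend above never talks to an SDK client: every operation (GET in `fetch_file`, PUT in `create_file`, DELETE in `delete_file`) builds a presigned URL with rusty-s3 and sends it through the existing reqwest client. Below is a minimal standalone sketch of that flow, not part of the patch; the endpoint, bucket, region, credentials, and object key are placeholder values.

```rust
use std::time::Duration;

use rusty_s3::{Bucket, Credentials, S3Action, UrlStyle};
use url::Url;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder connection details; in Conduit these come from `[global.media]`.
    let endpoint: Url = "http://minio:9000".parse()?;
    let bucket = Bucket::new(endpoint, UrlStyle::VirtualHost, "test", "minio")?;
    let credentials = Credentials::new("access-key-id", "secret-access-key");

    // Object key as produced by `split_media_path` + `join("/")`,
    // e.g. "aa/bb/rest-of-sha256-hex" for a deep directory structure.
    let object_key = "aa/bb/rest-of-sha256-hex";

    // Presign a GET request that stays valid for 30 seconds (the `duration` setting).
    let url = bucket
        .get_object(Some(&credentials), object_key)
        .sign(Duration::from_secs(30));

    // The signed URL is plain HTTP, so any client can send it.
    let resp = reqwest::get(url).await?;
    println!("S3 responded with status {}", resp.status());
    Ok(())
}
```

Swapping `get_object` for `put_object` or `delete_object`, as `create_file` and `delete_file` do, yields the upload and deletion requests in exactly the same way.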