diff --git a/Cargo.lock b/Cargo.lock index b205f7f2..ca347020 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -533,6 +533,7 @@ dependencies = [ "rusqlite", "rust-argon2", "rust-rocksdb", + "rusty-s3", "sd-notify", "serde", "serde_html_form", @@ -1582,6 +1583,30 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jiff" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49" +dependencies = [ + "jiff-static", + "log", + "portable-atomic", + "portable-atomic-util", + "serde", +] + +[[package]] +name = "jiff-static" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "jobserver" version = "0.1.33" @@ -1680,7 +1705,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -1808,6 +1833,16 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.7.5" @@ -2213,6 +2248,15 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" +[[package]] +name = "portable-atomic-util" 
+version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +dependencies = [ + "portable-atomic", +] + [[package]] name = "potential_utf" version = "0.1.2" @@ -2313,6 +2357,16 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" +[[package]] +name = "quick-xml" +version = "0.38.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9845d9dccf565065824e69f9f235fafba1587031eda353c1f1561cd6a6be78f4" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.40" @@ -2890,6 +2944,25 @@ version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d" +[[package]] +name = "rusty-s3" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fac2edd2f0b56bd79a7343f49afc01c2d41010df480538a510e0abc56044f66c" +dependencies = [ + "base64 0.22.1", + "hmac", + "jiff", + "md-5", + "percent-encoding", + "quick-xml", + "serde", + "serde_json", + "sha2", + "url", + "zeroize", +] + [[package]] name = "ryu" version = "1.0.20" diff --git a/Cargo.toml b/Cargo.toml index 7f87ff0d..a8c5918c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -154,6 +154,8 @@ tikv-jemallocator = { version = "0.6", features = [ sd-notify = { version = "0.4", optional = true } # Used for inspecting request errors http-body-util = "0.1.3" +# Used for S3 media backend +rusty-s3 = "0.8.1" # Used for matrix spec type definitions and helpers [dependencies.ruma] diff --git a/docs/configuration.md b/docs/configuration.md index 816fc006..5ed64a77 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -99,6 +99,33 @@ depth = 4 length = 2 ``` +#### S3 backend +The S3 backend has the following fields: +- `endpoint`: 
The URL of the S3 endpoint to connect to +- `bucket`: The name of the S3 bucket to use for storage. This bucket must already exist and your credentials must have access to it +- `region`: The region where your S3 bucket is located +- `path`: The base directory where all the media files will be stored (defaults to + the root of the bucket) +- `key`: Your Access Key ID +- `secret`: Your Secret Access Key +- `duration`: The time (in seconds) that signed requests to the S3 bucket will be valid (default: `30`) +- `bucket_use_path`: Controls how the bucket is addressed in request URLs. If `true`, the bucket name is included as the first segment of the URL path (path-style). If `false` (or omitted), the bucket name is used as a subdomain of the endpoint's host (virtual-host-style) +- `directory_structure`: This is a table, used to configure how files are to be distributed within + the media directory (see [Filesystem backend](#filesystem-backend) for details) + +##### Example: +```toml +[global.media] +backend = "s3" +endpoint = "http://minio:9000" +bucket = "test" +region = "minio" +key = "" +secret = "" +duration = 15 +bucket_use_path = false +``` + #### Retention policies Over time, the amount of media will keep growing, even if they were only accessed once. 
Retention policies allow for media files to automatically be deleted if they meet certain crietia, diff --git a/src/config/mod.rs b/src/config/mod.rs index 88215691..098dc20d 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -242,6 +242,7 @@ impl From for Config { }), directory_structure, }, + IncompleteMediaBackendConfig::S3(value) => MediaBackendConfig::S3(value), }, retention: media.retention.into(), }; @@ -481,6 +482,7 @@ pub enum IncompleteMediaBackendConfig { #[serde(default)] directory_structure: DirectoryStructure, }, + S3(S3MediaBackend), } impl Default for IncompleteMediaBackendConfig { @@ -498,6 +500,7 @@ pub enum MediaBackendConfig { path: String, directory_structure: DirectoryStructure, }, + S3(S3MediaBackend), } #[derive(Debug, Clone, Deserialize)] @@ -554,6 +557,58 @@ impl TryFrom for DirectoryStructure { } } +#[derive(Deserialize)] +struct ShadowS3MediaBackend { + endpoint: Url, + bucket: String, + region: String, + path: Option, + + key: String, + secret: String, + + #[serde(default = "default_s3_duration")] + duration: u64, + #[serde(default = "false_fn")] + bucket_use_path: bool, + #[serde(default)] + directory_structure: DirectoryStructure, +} + +impl TryFrom for S3MediaBackend { + type Error = Error; + + fn try_from(value: ShadowS3MediaBackend) -> Result { + let path_style = if value.bucket_use_path { + rusty_s3::UrlStyle::Path + } else { + rusty_s3::UrlStyle::VirtualHost + }; + let credentials = rusty_s3::Credentials::new(value.key, value.secret); + + match rusty_s3::Bucket::new(value.endpoint, path_style, value.bucket, value.region) { + Ok(bucket) => Ok(S3MediaBackend { + bucket: Box::new(bucket), + credentials: Box::new(credentials), + duration: Duration::from_secs(value.duration), + path: value.path, + directory_structure: value.directory_structure, + }), + Err(_) => Err(Error::bad_config("Invalid S3 config")), + } + } +} + +#[derive(Deserialize, Debug, Clone)] +#[serde(try_from = "ShadowS3MediaBackend")] +pub struct S3MediaBackend 
{ + pub bucket: Box, + pub credentials: Box, + pub duration: Duration, + pub path: Option, + pub directory_structure: DirectoryStructure, +} + const DEPRECATED_KEYS: &[&str] = &[ "cache_capacity", "turn_username", @@ -727,3 +782,7 @@ fn default_openid_token_ttl() -> u64 { pub fn default_default_room_version() -> RoomVersionId { RoomVersionId::V12 } + +fn default_s3_duration() -> u64 { + 30 +} diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index df633b11..0a3f87b9 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -589,7 +589,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::UnregisterAppservice { appservice_identifier, @@ -602,7 +603,8 @@ impl Service { Err(e) => RoomMessageEventContent::text_plain(format!( "Failed to unregister appservice: {e}" )), - }.into(), + } + .into(), AdminCommand::ListAppservices => { let appservices = services().appservice.iter_ids().await; let output = format!( @@ -640,7 +642,8 @@ impl Service { RoomMessageEventContent::text_plain(&msg) } Err(e) => RoomMessageEventContent::text_plain(e.to_string()), - }.into(), + } + .into(), AdminCommand::IncomingFederation => { let map = services().globals.roomid_federationhandletime.read().await; let mut msg: String = format!("Handling {} incoming pdus:\n", map.len()); @@ -681,7 +684,8 @@ impl Service { )) } else { RoomMessageEventContent::text_plain("Event not found.") - }.into() + } + .into() } AdminCommand::ParsePdu => { if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```" @@ -715,7 +719,8 @@ impl Service { } } else { RoomMessageEventContent::text_plain("Expected code block in command body.") - }.into() + } + .into() } AdminCommand::GetPdu { event_id } => { let mut outlier = false; @@ -753,7 +758,8 @@ impl Service { ) } None => RoomMessageEventContent::text_plain("PDU not found."), - }.into() + } + .into() } 
AdminCommand::MemoryUsage => { let response1 = services().memory_usage().await; @@ -761,7 +767,8 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Services:\n{response1}\n\nDatabase:\n{response2}" - )).into() + )) + .into() } AdminCommand::ClearDatabaseCaches { amount } => { services().globals.db.clear_caches(amount); @@ -786,7 +793,8 @@ impl Service { Err(e) => { return Ok(RoomMessageEventContent::text_plain(format!( "The supplied username is not a valid username: {e}" - )).into()) + )) + .into()) } }; @@ -794,7 +802,8 @@ impl Service { if user_id.server_name() != services().globals.server_name() { return Ok(RoomMessageEventContent::text_plain( "The specified user is not from this server!", - ).into()); + ) + .into()); }; // Check if the specified user is valid @@ -808,7 +817,8 @@ impl Service { { return Ok(RoomMessageEventContent::text_plain( "The specified user does not exist!", - ).into()); + ) + .into()); } let new_password = utils::random_string(AUTO_GEN_PASSWORD_LENGTH); @@ -823,7 +833,8 @@ impl Service { Err(e) => RoomMessageEventContent::text_plain(format!( "Couldn't reset the password for user {user_id}: {e}" )), - }.into() + } + .into() } AdminCommand::CreateUser { username, password } => { let password = @@ -837,7 +848,8 @@ impl Service { Err(e) => { return Ok(RoomMessageEventContent::text_plain(format!( "The supplied username is not a valid username: {e}" - )).into()) + )) + .into()) } }; @@ -845,18 +857,21 @@ impl Service { if user_id.server_name() != services().globals.server_name() { return Ok(RoomMessageEventContent::text_plain( "The specified user is not from this server!", - ).into()); + ) + .into()); }; if user_id.is_historical() { return Ok(RoomMessageEventContent::text_plain(format!( "Userid {user_id} is not allowed due to historical" - )).into()); + )) + .into()); } if services().users.exists(&user_id)? 
{ return Ok(RoomMessageEventContent::text_plain(format!( "Userid {user_id} already exists" - )).into()); + )) + .into()); } // Create user services().users.create(&user_id, Some(password.as_str()))?; @@ -893,26 +908,26 @@ impl Service { // Inhibit login does not work for guests RoomMessageEventContent::text_plain(format!( "Created user with user_id: {user_id} and password: {password}" - )).into() + )) + .into() } - AdminCommand::AllowRegistration { status } => { - if let Some(status) = status { - services().globals.set_registration(status).await; - RoomMessageEventContent::text_plain(if status { - "Registration is now enabled" - } else { - "Registration is now disabled" - }) + AdminCommand::AllowRegistration { status } => if let Some(status) = status { + services().globals.set_registration(status).await; + RoomMessageEventContent::text_plain(if status { + "Registration is now enabled" } else { - RoomMessageEventContent::text_plain( - if services().globals.allow_registration().await { - "Registration is currently enabled" - } else { - "Registration is currently disabled" - }, - ) - }.into() + "Registration is now disabled" + }) + } else { + RoomMessageEventContent::text_plain( + if services().globals.allow_registration().await { + "Registration is currently enabled" + } else { + "Registration is currently disabled" + }, + ) } + .into(), AdminCommand::DisableRoom { room_id } => { services().rooms.metadata.disable_room(&room_id, true)?; RoomMessageEventContent::text_plain("Room disabled.").into() @@ -955,6 +970,7 @@ impl Service { services() .media .purge_from_user(&user_id, purge_media.force_filehash, after) + .await .len() } else { 0 @@ -1023,13 +1039,17 @@ impl Service { failed_count += services() .media .purge_from_user(user_id, purge_media.force_filehash, after) + .await .len(); } } let mut message = format!("Deactivated {deactivation_count} accounts."); if !admins.is_empty() { - message.push_str(&format!("\nSkipped admin accounts: {:?}. 
Use --force to deactivate admin accounts",admins.join(", "))); + message.push_str(&format!( + "\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts", + admins.join(", ") + )); } if failed_count != 0 { message.push_str(&format!( @@ -1042,14 +1062,19 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::QueryMedia { mxc } => { let Ok((server_name, media_id)) = mxc.parts() else { return Ok(RoomMessageEventContent::text_plain("Invalid media MXC").into()); }; - let MediaQuery{ is_blocked, source_file, thumbnails } = services().media.query(server_name, media_id)?; + let MediaQuery { + is_blocked, + source_file, + thumbnails, + } = services().media.query(server_name, media_id)?; let mut message = format!("Is blocked Media ID: {is_blocked}"); if let Some(MediaQueryFileInfo { @@ -1060,14 +1085,16 @@ impl Service { unauthenticated_access_permitted, is_blocked_via_filehash, file_info: time_info, - }) = source_file { + }) = source_file + { message.push_str("\n\nInformation on full (non-thumbnail) file:\n"); if let Some(FileInfo { creation, last_access, size, - }) = time_info { + }) = time_info + { message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}", DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), @@ -1096,7 +1123,7 @@ impl Service { message.push_str("\n\nInformation on thumbnails of media:"); } - for MediaQueryThumbInfo{ + for MediaQueryThumbInfo { width, height, sha256_hex, @@ -1105,13 +1132,15 @@ impl Service { unauthenticated_access_permitted, is_blocked_via_filehash, file_info: time_info, - } in thumbnails { + } in thumbnails + { message.push_str(&format!("\n\nDimensions: {width}x{height}")); if let Some(FileInfo { creation, 
last_access, size, - }) = time_info { + }) = time_info + { message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}", DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), @@ -1145,7 +1174,8 @@ impl Service { file, content_type, content_disposition, - } = client_server::media::get_content(server_name, media_id.to_owned(), true, true).await?; + } = client_server::media::get_content(server_name, media_id.to_owned(), true, true) + .await?; if let Ok(image) = image::load_from_memory(&file) { let filename = content_disposition.and_then(|cd| cd.filename); @@ -1185,10 +1215,7 @@ impl Service { } } AdminCommand::ListMedia { - user_server_filter: ListMediaArgs { - user, - server, - }, + user_server_filter: ListMediaArgs { user, server }, include_thumbnails, content_type, uploaded_before, @@ -1201,7 +1228,7 @@ impl Service { r#""#, ); - for MediaListItem{ + for MediaListItem { server_name, media_id, uploader_localpart, @@ -1211,9 +1238,8 @@ impl Service { size, creation, } in services().media.list( - user - .map(ServerNameOrUserId::UserId) - .or_else(|| server.map(ServerNameOrUserId::ServerName)), + user.map(ServerNameOrUserId::UserId) + .or_else(|| server.map(ServerNameOrUserId::ServerName)), include_thumbnails, content_type.as_deref(), uploaded_before @@ -1227,15 +1253,20 @@ impl Service { .transpose() .map_err(|_| Error::AdminCommand("Timestamp must be after unix epoch"))? .as_ref() - .map(Duration::as_secs) + .map(Duration::as_secs), )? 
{ - - let user_id = uploader_localpart.map(|localpart| format!("@{localpart}:{server_name}")).unwrap_or_default(); + let user_id = uploader_localpart + .map(|localpart| format!("@{localpart}:{server_name}")) + .unwrap_or_default(); let content_type = content_type.unwrap_or_default(); let filename = filename.unwrap_or_default(); - let dimensions = dimensions.map(|(w, h)| format!("{w}x{h}")).unwrap_or_default(); + let dimensions = dimensions + .map(|(w, h)| format!("{w}x{h}")) + .unwrap_or_default(); let size = ByteSize::b(size).display().si(); - let creation = DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"); + let creation = + DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX), 0) + .expect("Timestamp is within range"); markdown_message .push_str(&format!("\n| mxc://{server_name}/{media_id} | {dimensions} | {creation} | {user_id} | {content_type} | {filename} | {size} |")); @@ -1248,11 +1279,10 @@ impl Service { html_message.push_str("
MXC URIDimensions (if thumbnail)Created/Downloaded atUploaderContent-TypeFilenameSize
"); RoomMessageEventContent::text_html(markdown_message, html_message).into() - }, - AdminCommand::PurgeMedia => media_from_body(body).map_or_else( - |message| message, - |media| { - let failed_count = services().media.purge(&media, true).len(); + } + AdminCommand::PurgeMedia => match media_from_body(body) { + Ok(media) => { + let failed_count = services().media.purge(&media, true).await.len(); if failed_count == 0 { RoomMessageEventContent::text_plain("Successfully purged media") @@ -1260,9 +1290,11 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Failed to delete {failed_count} media, check logs for more details" )) - }.into() - }, - ), + } + .into() + } + Err(message) => message, + }, AdminCommand::PurgeMediaFromUsers { from_last, force_filehash, @@ -1282,6 +1314,7 @@ impl Service { failed_count += services() .media .purge_from_user(user_id, force_filehash, after) + .await .len(); } @@ -1296,7 +1329,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. 
Add --help for details.", ) - }.into() + } + .into() } AdminCommand::PurgeMediaFromServer { server_id: server_name, @@ -1314,6 +1348,7 @@ impl Service { let failed_count = services() .media .purge_from_server(&server_name, force_filehash, after) + .await .len(); if failed_count == 0 { @@ -1324,14 +1359,14 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Failed to purge {failed_count} media, check logs for more details" )) - }.into() + } + .into() } - AdminCommand::BlockMedia { and_purge, reason } => media_from_body(body).map_or_else( - |message| message, - |media| { + AdminCommand::BlockMedia { and_purge, reason } => match media_from_body(body) { + Ok(media) => { let failed_count = services().media.block(&media, reason).len(); let failed_purge_count = if and_purge { - services().media.purge(&media, true).len() + services().media.purge(&media, true).await.len() } else { 0 }; @@ -1348,8 +1383,9 @@ impl Service { "Failed to block {failed_count}, and purge {failed_purge_count} media, check logs for more details" )) }.into() - }, - ), + } + Err(message) => message, + }, AdminCommand::BlockMediaFromUsers { from_last, reason } => { let after = from_last.map(unix_secs_from_duration).transpose()?; @@ -1385,7 +1421,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. 
Add --help for details.", ) - }.into() + } + .into() } AdminCommand::ListBlockedMedia => { let mut markdown_message = String::from( @@ -1402,7 +1439,8 @@ impl Service { unix_secs, reason, sha256_hex, - }) = media else { + }) = media + else { continue; }; @@ -1415,8 +1453,9 @@ impl Service { .flatten() .expect("Time is valid"); - markdown_message - .push_str(&format!("\n| {sha256_hex} | mxc://{server_name}/{media_id} | {time} | {reason} |")); + markdown_message.push_str(&format!( + "\n| {sha256_hex} | mxc://{server_name}/{media_id} | {time} | {reason} |" + )); html_message.push_str(&format!( "{sha256_hex}mxc://{server_name}/{media_id}{time}{reason}", @@ -1438,7 +1477,8 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Failed to unblock {failed_count} media, check logs for more details" )) - }.into() + } + .into() }, ), AdminCommand::SignJson => { @@ -1463,7 +1503,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::VerifyJson => { if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```" @@ -1524,7 +1565,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::HashAndSignEvent { room_version_id } => { if body.len() > 2 @@ -1539,7 +1581,10 @@ impl Service { services().globals.server_name().as_str(), services().globals.keypair(), &mut value, - &room_version_id.rules().expect("Supported room version has rules").redaction, + &room_version_id + .rules() + .expect("Supported room version has rules") + .redaction, ) { RoomMessageEventContent::text_plain(format!("Invalid event: {e}")) } else { @@ -1554,7 +1599,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. 
Add --help for details.", ) - }.into() + } + .into() } AdminCommand::RemoveAlias { alias } => { if alias.server_name() != services().globals.server_name() { @@ -1579,7 +1625,8 @@ impl Service { .alias .remove_alias(&alias, services().globals.server_user())?; RoomMessageEventContent::text_plain("Alias removed successfully") - }.into() + } + .into() } }; diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index d634f080..27d84d9a 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -231,8 +231,6 @@ impl Service { shutdown: AtomicBool::new(false), }; - // Remove this exception once other media backends are added - #[allow(irrefutable_let_patterns)] if let MediaBackendConfig::FileSystem { path, .. } = &s.config.media.backend { fs::create_dir_all(path)?; } @@ -488,26 +486,40 @@ impl Service { directory_structure: &DirectoryStructure, sha256_hex: &str, ) -> Result { - let mut r = PathBuf::new(); - r.push(media_directory); + Ok(PathBuf::from_iter(self.split_media_path( + Some(media_directory), + directory_structure, + sha256_hex, + ))) + } - if let DirectoryStructure::Deep { length, depth } = directory_structure { - let mut filename = sha256_hex; - for _ in 0..depth.get() { - let (current_path, next) = filename.split_at(length.get().into()); - filename = next; - r.push(current_path); + pub fn split_media_path<'a>( + &self, + media_directory: Option<&'a str>, + directory_structure: &DirectoryStructure, + sha256_hex: &'a str, + ) -> Vec<&'a str> { + match directory_structure { + DirectoryStructure::Flat => match media_directory { + Some(path) => vec![path, sha256_hex], + None => vec![sha256_hex], + }, + DirectoryStructure::Deep { length, depth } => { + let mut r: Vec<&'a str> = Vec::with_capacity((depth.get() + 2).into()); + if let Some(path) = media_directory { + r.push(path); + } + let mut filename = sha256_hex; + for _ in 0..depth.get() { + let (current_path, next) = filename.split_at(length.get().into()); + filename = next; + 
r.push(current_path); + } + r.push(filename); + + r } - - // Create all directories leading up to file - fs::create_dir_all(&r).inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?; - - r.push(filename); - } else { - r.push(sha256_hex); } - - Ok(r) } pub async fn shutdown(&self) { diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 16060dfb..2f5c814d 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -1,17 +1,22 @@ mod data; -use std::{fs, io::Cursor, sync::Arc}; +use std::{io::Cursor, sync::Arc}; pub use data::Data; +use http::StatusCode; use ruma::{ api::client::{error::ErrorKind, media::is_safe_inline_content_type}, http_headers::{ContentDisposition, ContentDispositionType}, OwnedServerName, ServerName, UserId, }; +use rusty_s3::{ + actions::{DeleteObjectsResponse, ObjectIdentifier}, + S3Action, +}; use sha2::{digest::Output, Digest, Sha256}; -use tracing::{error, info}; +use tracing::{error, info, warn}; use crate::{ - config::{DirectoryStructure, MediaBackendConfig}, + config::{DirectoryStructure, MediaBackendConfig, S3MediaBackend}, services, utils, Error, Result, }; use image::imageops::FilterType; @@ -24,7 +29,7 @@ pub struct DbFileMeta { } use tokio::{ - fs::File, + fs::{self, File}, io::{AsyncReadExt, AsyncWriteExt}, }; @@ -142,7 +147,7 @@ impl Service { let count = files.iter().filter(|res| res.is_ok()).count(); info!("Found {count} media files to delete"); - purge_files(files); + purge_files(files).await; Ok(()) } @@ -159,11 +164,14 @@ impl Service { ) -> Result<()> { let (sha256_digest, sha256_hex) = generate_digests(file); - for error in self.clear_required_space( - &sha256_digest, - MediaType::new(servername, false), - size(file)?, - )? { + for error in self + .clear_required_space( + &sha256_digest, + MediaType::new(servername, false), + size(file)?, + ) + .await? 
+ { error!( "Error deleting file to clear space when downloading/creating new media file: {error}" ) @@ -207,7 +215,8 @@ impl Service { &sha256_digest, MediaType::new(servername, true), size(file)?, - )?; + ) + .await?; self.db.create_thumbnail_metadata( sha256_digest, @@ -444,10 +453,14 @@ impl Service { /// purged have that sha256 hash. /// /// Returns errors for all the files that were failed to be deleted, if any. - pub fn purge(&self, media: &[(OwnedServerName, String)], force_filehash: bool) -> Vec { + pub async fn purge( + &self, + media: &[(OwnedServerName, String)], + force_filehash: bool, + ) -> Vec { let hashes = self.db.purge_and_get_hashes(media, force_filehash); - purge_files(hashes) + purge_files(hashes).await } /// Purges all (past a certain time in unix seconds, if specified) media @@ -462,7 +475,7 @@ impl Service { /// /// Note: it only currently works for local users, as we cannot determine who /// exactly uploaded the file when it comes to remove users. - pub fn purge_from_user( + pub async fn purge_from_user( &self, user_id: &UserId, force_filehash: bool, @@ -472,7 +485,7 @@ impl Service { .db .purge_and_get_hashes_from_user(user_id, force_filehash, after); - purge_files(hashes) + purge_files(hashes).await } /// Purges all (past a certain time in unix seconds, if specified) media @@ -484,7 +497,7 @@ impl Service { /// purged have that sha256 hash. /// /// Returns errors for all the files that were failed to be deleted, if any. 
- pub fn purge_from_server( + pub async fn purge_from_server( &self, server_name: &ServerName, force_filehash: bool, @@ -494,7 +507,7 @@ impl Service { .db .purge_and_get_hashes_from_server(server_name, force_filehash, after); - purge_files(hashes) + purge_files(hashes).await } /// Checks whether the media has been blocked by administrators, returning either @@ -558,7 +571,7 @@ impl Service { self.db.list_blocked() } - pub fn clear_required_space( + pub async fn clear_required_space( &self, sha256_digest: &[u8], media_type: MediaType, @@ -577,7 +590,7 @@ impl Service { info!("Deleting {} files to clear space for new media file", count); } - Ok(purge_files(files)) + Ok(purge_files(files).await) } /// Fetches the file from the configured media backend, as well as updating the "last accessed" @@ -606,6 +619,39 @@ impl Service { file } + MediaBackendConfig::S3(s3) => { + let sha256_hex = hex::encode(sha256_digest); + let file_name = services() + .globals + .split_media_path(s3.path.as_deref(), &s3.directory_structure, &sha256_hex) + .join("/"); + let url = s3 + .bucket + .get_object(Some(&s3.credentials), &file_name) + .sign(s3.duration); + + let client = services().globals.default_client(); + let resp = client.get(url).send().await?; + + if resp.status() == StatusCode::NOT_FOUND { + return Err(Error::BadRequest( + ErrorKind::NotFound, + "File does not exist", + )); + } + if !resp.status().is_success() { + error!( + "Failed to get file with sha256 hash of \"{}\" from S3 bucket: {}", + sha256_hex, + resp.text().await? + ); + return Err(Error::BadS3Response( + "Failed to get media file from S3 bucket", + )); + } + + resp.bytes().await?.to_vec() + } }; if let Some((server_name, media_id)) = original_file_id { @@ -631,76 +677,187 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { .globals .get_media_path(path, directory_structure, sha256_hex)?; + // Create all directories leading up to file + if let DirectoryStructure::Deep { .. 
} = directory_structure { + if let Some(parent) = path.parent() { + fs::create_dir_all(&parent).await.inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?; + } + } + let mut f = File::create(path).await?; f.write_all(file).await?; } + MediaBackendConfig::S3(s3) => { + let file_name = services() + .globals + .split_media_path(s3.path.as_deref(), &s3.directory_structure, sha256_hex) + .join("/"); + + let url = s3 + .bucket + .put_object(Some(&s3.credentials), &file_name) + .sign(s3.duration); + + let client = services().globals.default_client(); + let resp = client.put(url).body(file.to_vec()).send().await?; + + if !resp.status().is_success() { + error!( + "Failed to upload file with sha256 hash of \"{}\" to S3 bucket: {}", + sha256_hex, + resp.text().await? + ); + return Err(Error::BadS3Response( + "Failed to upload media file to S3 bucket", + )); + } + } } Ok(()) } +/// The size of a chunk for S3 delete operation. +const S3_CHUNK_SIZE: usize = 1000; + /// Purges the given files from the media backend /// Returns a `Vec` of errors that occurred when attempting to delete the files /// /// Note: this does NOT remove the related metadata from the database -fn purge_files(hashes: Vec>) -> Vec { - hashes - .into_iter() - .map(|hash| match hash { - Ok(v) => delete_file(&v), - Err(e) => Err(e), - }) - .filter_map(|r| if let Err(e) = r { Some(e) } else { None }) - .collect() -} +async fn purge_files(hashes: Vec>) -> Vec { + let (ok_values, err_values): (Vec<_>, Vec<_>) = + hashes.into_iter().partition(|result| result.is_ok()); + + let mut result: Vec = err_values.into_iter().map(Result::unwrap_err).collect(); + + let to_delete: Vec = ok_values.into_iter().map(Result::unwrap).collect(); -/// Deletes the given file from the media backend -/// -/// Note: this does NOT remove the related metadata from the database -fn delete_file(sha256_hex: &str) -> Result<()> { match &services().globals.config.media.backend { 
MediaBackendConfig::FileSystem { path, directory_structure, } => { - let mut path = - services() - .globals - .get_media_path(path, directory_structure, sha256_hex)?; - - if let Err(e) = fs::remove_file(&path) { - // Multiple files with the same filehash might be requseted to be deleted - if e.kind() != std::io::ErrorKind::NotFound { - error!("Error removing media from filesystem: {e}"); - Err(e)?; + for v in to_delete { + if let Err(err) = delete_file_fs(path, directory_structure, &v).await { + result.push(err); } } - - if let DirectoryStructure::Deep { length: _, depth } = directory_structure { - let mut depth = depth.get(); - - while depth > 0 { - // Here at the start so that the first time, the file gets removed from the path - path.pop(); - - if let Err(e) = fs::remove_dir(&path) { - if e.kind() == std::io::ErrorKind::DirectoryNotEmpty { - break; - } else { - error!("Error removing empty media directories: {e}"); - Err(e)?; - } + } + MediaBackendConfig::S3(s3) => { + for chunk in to_delete.chunks(S3_CHUNK_SIZE) { + match delete_files_s3(s3, chunk).await { + Ok(errors) => { + result.extend(errors); + } + Err(error) => { + result.push(error); } - - depth -= 1; } } } } + result +} + +/// Deletes the given file from the fs media backend +/// +/// Note: this does NOT remove the related metadata from the database +async fn delete_file_fs( + path: &str, + directory_structure: &DirectoryStructure, + sha256_hex: &str, +) -> Result<()> { + let mut path = services() + .globals + .get_media_path(path, directory_structure, sha256_hex)?; + + if let Err(e) = fs::remove_file(&path).await { + // Multiple files with the same filehash might be requseted to be deleted + if e.kind() != std::io::ErrorKind::NotFound { + error!("Error removing media from filesystem: {e}"); + Err(e)?; + } + } + + if let DirectoryStructure::Deep { length: _, depth } = directory_structure { + let mut depth = depth.get(); + + while depth > 0 { + // Here at the start so that the first time, the file gets 
removed from the path + path.pop(); + + if let Err(e) = fs::remove_dir(&path).await { + if e.kind() == std::io::ErrorKind::DirectoryNotEmpty { + break; + } else { + error!("Error removing empty media directories: {e}"); + Err(e)?; + } + } + + depth -= 1; + } + } + Ok(()) } +/// Deletes the given files from the s3 media backend +/// +/// Note: this does NOT remove the related metadata from the database +async fn delete_files_s3(s3: &S3MediaBackend, files: &[String]) -> Result> { + let objects: Vec = files + .iter() + .map(|v| { + services() + .globals + .split_media_path(s3.path.as_deref(), &s3.directory_structure, v) + .join("/") + }) + .map(|v| ObjectIdentifier::new(v.to_string())) + .collect(); + + let mut request = s3 + .bucket + .delete_objects(Some(&s3.credentials), objects.iter()); + request.set_quiet(true); + + let url = request.sign(s3.duration); + let (body, md5) = request.body_with_md5(); + + let client = services().globals.default_client(); + let resp = client + .post(url) + .header("Content-MD5", md5) + .body(body) + .send() + .await?; + + if !resp.status().is_success() { + error!( + "Failed to delete files from S3 bucket: {}", + resp.text().await? 
+ ); + return Err(Error::BadS3Response( + "Failed to delete media files from S3 bucket", + )); + } + + let parsed = DeleteObjectsResponse::parse(resp.text().await?).map_err(|e| { + warn!("Cannot parse S3 response: {}", e); + Error::BadS3Response("Cannot parse S3 response") + })?; + + let result = parsed + .errors + .into_iter() + .map(|v| Error::CannotDeleteS3File(v.message)) + .collect(); + + Ok(result) +} + /// Creates a content disposition with the given `filename`, using the `content_type` to determine whether /// the disposition should be `inline` or `attachment` fn content_disposition( diff --git a/src/utils/error.rs b/src/utils/error.rs index 1b1a26db..00fbc0a6 100644 --- a/src/utils/error.rs +++ b/src/utils/error.rs @@ -52,6 +52,10 @@ pub enum Error { source: std::io::Error, }, #[error("{0}")] + BadS3Response(&'static str), + #[error("Could not delete S3 file: {0}")] + CannotDeleteS3File(String), // This is only needed when an S3 deletion fails + #[error("{0}")] BadServerResponse(&'static str), #[error("{0}")] BadConfig(&'static str),