From 1124334e5f2ff0cc239d592ddf4706236b043441 Mon Sep 17 00:00:00 2001 From: Samuel Meenzen Date: Sat, 31 May 2025 03:00:03 +0200 Subject: [PATCH] feat: janky s3 implementation --- Cargo.lock | 40 +++++++ Cargo.toml | 1 + src/config/mod.rs | 56 +++++++++ src/database/mod.rs | 1 + src/service/admin/mod.rs | 230 ++++++++++++++++++++++--------------- src/service/globals/mod.rs | 19 ++- src/service/media/mod.rs | 153 +++++++++++++++++++----- 7 files changed, 375 insertions(+), 125 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 353e882a..998fcc80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -500,6 +500,7 @@ dependencies = [ "rusqlite", "rust-argon2", "rust-rocksdb", + "rusty-s3", "sd-notify", "serde", "serde_html_form", @@ -1732,6 +1733,16 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.7.4" @@ -2244,6 +2255,16 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" +[[package]] +name = "quick-xml" +version = "0.37.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.40" @@ -2803,6 +2824,25 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" +[[package]] +name = "rusty-s3" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f51a5a6b15f25d3e10c068039ee13befb6110fcb36c2b26317bcbdc23484d96" +dependencies = [ + "base64 0.22.1", + "hmac", + "md-5", + "percent-encoding", + "quick-xml", + "serde", + "serde_json", + "sha2", + "time", + "url", + "zeroize", +] + [[package]] name = "ryu" version = "1.0.20" diff --git a/Cargo.toml b/Cargo.toml index cad4bf15..95a4bdde 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,6 +151,7 @@ tikv-jemallocator = { version = "0.6", features = [ "unprefixed_malloc_on_supported_platforms", ], optional = true } +rusty-s3 = "0.7.0" sd-notify = { version = "0.4", optional = true } # Used for matrix spec type definitions and helpers diff --git a/src/config/mod.rs b/src/config/mod.rs index 95f1c76e..847ff005 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -242,6 +242,41 @@ impl From for Config { }), directory_structure, }, + IncompleteMediaBackendConfig::S3 { + directory_structure, + endpoint, + bucket, + region, + key, + secret, + path_prefix, + path_style, + expiration, + } => { + let path_style = if path_style { + rusty_s3::UrlStyle::Path + } else { + rusty_s3::UrlStyle::VirtualHost + }; + + let bucket = rusty_s3::Bucket::new( + endpoint.clone(), + path_style, + bucket.clone(), + region.clone(), + ) + .expect("S3 config is valid"); + + let credentials = rusty_s3::Credentials::new(key, secret); + + MediaBackendConfig::S3 { + directory_structure, + bucket, + credentials, + duration: expiration.unwrap_or_else(|| Duration::from_secs(30)), + path_prefix: path_prefix.unwrap_or("/".to_owned()), + } + } }, retention: media.retention.into(), }; @@ -481,6 +516,20 @@ pub enum IncompleteMediaBackendConfig { #[serde(default)] directory_structure: DirectoryStructure, }, + S3 { + #[serde(default)] + directory_structure: DirectoryStructure, + endpoint: Url, + bucket: String, + region: String, + key: String, + secret: String, + path_prefix: Option, + #[serde(default = "false_fn")] + path_style: bool, + #[serde(default, with = "humantime_serde::option")] + expiration: Option, + }, } impl Default for IncompleteMediaBackendConfig { @@ -498,6 +547,13 @@ pub enum MediaBackendConfig { path: String, directory_structure: DirectoryStructure, }, + S3 { + directory_structure: DirectoryStructure, + bucket: rusty_s3::Bucket, + credentials: rusty_s3::Credentials, + duration: Duration, + path_prefix: String, + }, } #[derive(Debug, Clone, Deserialize)] diff --git a/src/database/mod.rs b/src/database/mod.rs index 30d4231a..a17f001a 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1079,6 +1079,7 @@ impl KeyValueDatabase { depth: *depth, }, file.file_name().to_str().unwrap(), + true, )?, ) .await?; diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index c88dc044..4e20c663 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -586,7 +586,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::UnregisterAppservice { appservice_identifier, @@ -599,7 +600,8 @@ impl Service { Err(e) => RoomMessageEventContent::text_plain(format!( "Failed to unregister appservice: {e}" )), - }.into(), + } + .into(), AdminCommand::ListAppservices => { let appservices = services().appservice.iter_ids().await; let output = format!( @@ -637,7 +639,8 @@ impl Service { RoomMessageEventContent::text_plain(&msg) } Err(e) => RoomMessageEventContent::text_plain(e.to_string()), - }.into(), + } + .into(), AdminCommand::IncomingFederation => { let map = services().globals.roomid_federationhandletime.read().await; let mut msg: String = format!("Handling {} incoming pdus:\n", map.len()); @@ -678,7 +681,8 @@ impl Service { )) } else { RoomMessageEventContent::text_plain("Event not found.") - }.into() + } + .into() } AdminCommand::ParsePdu => { if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```" @@ -712,7 +716,8 @@ impl Service { } } else { RoomMessageEventContent::text_plain("Expected code block in command body.") - }.into() + } + .into() } AdminCommand::GetPdu { event_id } => { let mut outlier = false; @@ -750,7 +755,8 @@ impl Service { ) } None => RoomMessageEventContent::text_plain("PDU not found."), - }.into() + } + .into() } AdminCommand::MemoryUsage => { let response1 = services().memory_usage().await; @@ -758,7 +764,8 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Services:\n{response1}\n\nDatabase:\n{response2}" - )).into() + )) + .into() } AdminCommand::ClearDatabaseCaches { amount } => { services().globals.db.clear_caches(amount); @@ -783,7 +790,8 @@ impl Service { Err(e) => { return Ok(RoomMessageEventContent::text_plain(format!( "The supplied username is not a valid username: {e}" - )).into()) + )) + .into()) } }; @@ -791,7 +799,8 @@ impl Service { if user_id.server_name() != services().globals.server_name() { return Ok(RoomMessageEventContent::text_plain( "The specified user is not from this server!", - ).into()); + ) + .into()); }; // Check if the specified user is valid @@ -805,7 +814,8 @@ impl Service { { return Ok(RoomMessageEventContent::text_plain( "The specified user does not exist!", - ).into()); + ) + .into()); } let new_password = utils::random_string(AUTO_GEN_PASSWORD_LENGTH); @@ -820,7 +830,8 @@ impl Service { Err(e) => RoomMessageEventContent::text_plain(format!( "Couldn't reset the password for user {user_id}: {e}" )), - }.into() + } + .into() } AdminCommand::CreateUser { username, password } => { let password = @@ -834,7 +845,8 @@ impl Service { Err(e) => { return Ok(RoomMessageEventContent::text_plain(format!( "The supplied username is not a valid username: {e}" - )).into()) + )) + .into()) } }; @@ -842,18 +854,21 @@ impl Service { if user_id.server_name() != services().globals.server_name() { return Ok(RoomMessageEventContent::text_plain( "The specified user is not from this server!", - ).into()); + ) + .into()); }; if user_id.is_historical() { return Ok(RoomMessageEventContent::text_plain(format!( "Userid {user_id} is not allowed due to historical" - )).into()); + )) + .into()); } if services().users.exists(&user_id)? { return Ok(RoomMessageEventContent::text_plain(format!( "Userid {user_id} already exists" - )).into()); + )) + .into()); } // Create user services().users.create(&user_id, Some(password.as_str()))?; @@ -890,26 +905,26 @@ impl Service { // Inhibit login does not work for guests RoomMessageEventContent::text_plain(format!( "Created user with user_id: {user_id} and password: {password}" - )).into() + )) + .into() } - AdminCommand::AllowRegistration { status } => { - if let Some(status) = status { - services().globals.set_registration(status).await; - RoomMessageEventContent::text_plain(if status { - "Registration is now enabled" - } else { - "Registration is now disabled" - }) + AdminCommand::AllowRegistration { status } => if let Some(status) = status { + services().globals.set_registration(status).await; + RoomMessageEventContent::text_plain(if status { + "Registration is now enabled" } else { - RoomMessageEventContent::text_plain( - if services().globals.allow_registration().await { - "Registration is currently enabled" - } else { - "Registration is currently disabled" - }, - ) - }.into() + "Registration is now disabled" + }) + } else { + RoomMessageEventContent::text_plain( + if services().globals.allow_registration().await { + "Registration is currently enabled" + } else { + "Registration is currently disabled" + }, + ) } + .into(), AdminCommand::DisableRoom { room_id } => { services().rooms.metadata.disable_room(&room_id, true)?; RoomMessageEventContent::text_plain("Room disabled.").into() @@ -952,6 +967,7 @@ impl Service { services() .media .purge_from_user(&user_id, purge_media.force_filehash, after) + .await .len() } else { 0 @@ -1020,13 +1036,17 @@ impl Service { failed_count += services() .media .purge_from_user(user_id, purge_media.force_filehash, after) + .await .len(); } } let mut message = format!("Deactivated {deactivation_count} accounts."); if !admins.is_empty() { - message.push_str(&format!("\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts",admins.join(", "))); + message.push_str(&format!( + "\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts", + admins.join(", ") + )); } if failed_count != 0 { message.push_str(&format!( @@ -1039,14 +1059,19 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::QueryMedia { mxc } => { let Ok((server_name, media_id)) = mxc.parts() else { return Ok(RoomMessageEventContent::text_plain("Invalid media MXC").into()); }; - let MediaQuery{ is_blocked, source_file, thumbnails } = services().media.query(server_name, media_id)?; + let MediaQuery { + is_blocked, + source_file, + thumbnails, + } = services().media.query(server_name, media_id)?; let mut message = format!("Is blocked Media ID: {is_blocked}"); if let Some(MediaQueryFileInfo { @@ -1057,14 +1082,16 @@ impl Service { unauthenticated_access_permitted, is_blocked_via_filehash, file_info: time_info, - }) = source_file { + }) = source_file + { message.push_str("\n\nInformation on full (non-thumbnail) file:\n"); if let Some(FileInfo { creation, last_access, size, - }) = time_info { + }) = time_info + { message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}", DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), @@ -1093,7 +1120,7 @@ impl Service { message.push_str("\n\nInformation on thumbnails of media:"); } - for MediaQueryThumbInfo{ + for MediaQueryThumbInfo { width, height, sha256_hex, @@ -1102,13 +1129,15 @@ impl Service { unauthenticated_access_permitted, is_blocked_via_filehash, file_info: time_info, - } in thumbnails { + } in thumbnails + { message.push_str(&format!("\n\nDimensions: {width}x{height}")); if let Some(FileInfo { creation, last_access, size, - }) = time_info { + }) = time_info + { message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}", DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"), @@ -1142,7 +1171,8 @@ impl Service { file, content_type, content_disposition, - } = client_server::media::get_content(server_name, media_id.to_owned(), true, true).await?; + } = client_server::media::get_content(server_name, media_id.to_owned(), true, true) + .await?; if let Ok(image) = image::load_from_memory(&file) { let filename = content_disposition.and_then(|cd| cd.filename); @@ -1182,10 +1212,7 @@ impl Service { } } AdminCommand::ListMedia { - user_server_filter: ListMediaArgs { - user, - server, - }, + user_server_filter: ListMediaArgs { user, server }, include_thumbnails, content_type, uploaded_before, @@ -1198,7 +1225,7 @@ impl Service { r#""#, ); - for MediaListItem{ + for MediaListItem { server_name, media_id, uploader_localpart, @@ -1208,9 +1235,8 @@ impl Service { size, creation, } in services().media.list( - user - .map(ServerNameOrUserId::UserId) - .or_else(|| server.map(ServerNameOrUserId::ServerName)), + user.map(ServerNameOrUserId::UserId) + .or_else(|| server.map(ServerNameOrUserId::ServerName)), include_thumbnails, content_type.as_deref(), uploaded_before @@ -1224,15 +1250,20 @@ impl Service { .transpose() .map_err(|_| Error::AdminCommand("Timestamp must be after unix epoch"))? .as_ref() - .map(Duration::as_secs) + .map(Duration::as_secs), )? { - - let user_id = uploader_localpart.map(|localpart| format!("@{localpart}:{server_name}")).unwrap_or_default(); + let user_id = uploader_localpart + .map(|localpart| format!("@{localpart}:{server_name}")) + .unwrap_or_default(); let content_type = content_type.unwrap_or_default(); let filename = filename.unwrap_or_default(); - let dimensions = dimensions.map(|(w, h)| format!("{w}x{h}")).unwrap_or_default(); + let dimensions = dimensions + .map(|(w, h)| format!("{w}x{h}")) + .unwrap_or_default(); let size = ByteSize::b(size).display().si(); - let creation = DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"); + let creation = + DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX), 0) + .expect("Timestamp is within range"); markdown_message .push_str(&format!("\n| mxc://{server_name}/{media_id} | {dimensions} | {creation} | {user_id} | {content_type} | {filename} | {size} |")); @@ -1245,11 +1276,10 @@ impl Service { html_message.push_str("
MXC URIDimensions (if thumbnail)Created/Downloaded atUploaderContent-TypeFilenameSize
"); RoomMessageEventContent::text_html(markdown_message, html_message).into() - }, - AdminCommand::PurgeMedia => media_from_body(body).map_or_else( - |message| message, - |media| { - let failed_count = services().media.purge(&media, true).len(); + } + AdminCommand::PurgeMedia => match media_from_body(body) { + Ok(media) => { + let failed_count = services().media.purge(&media, true).await.len(); if failed_count == 0 { RoomMessageEventContent::text_plain("Successfully purged media") @@ -1257,9 +1287,11 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Failed to delete {failed_count} media, check logs for more details" )) - }.into() - }, - ), + } + .into() + } + Err(message) => message, + }, AdminCommand::PurgeMediaFromUsers { from_last, force_filehash, @@ -1279,6 +1311,7 @@ impl Service { failed_count += services() .media .purge_from_user(user_id, force_filehash, after) + .await .len(); } @@ -1293,7 +1326,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::PurgeMediaFromServer { server_id: server_name, @@ -1311,6 +1345,7 @@ impl Service { let failed_count = services() .media .purge_from_server(&server_name, force_filehash, after) + .await .len(); if failed_count == 0 { @@ -1321,32 +1356,34 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Failed to purge {failed_count} media, check logs for more details" )) - }.into() + } + .into() } - AdminCommand::BlockMedia { and_purge, reason } => media_from_body(body).map_or_else( - |message| message, - |media| { + AdminCommand::BlockMedia { and_purge, reason } => match media_from_body(body) { + Err(message) => message, + Ok(media) => { let failed_count = services().media.block(&media, reason).len(); let failed_purge_count = if and_purge { - services().media.purge(&media, true).len() + services().media.purge(&media, true).await.len() } else { 0 }; match (failed_count == 0, failed_purge_count == 0) { - (true, true) => RoomMessageEventContent::text_plain("Successfully blocked media"), - (false, true) => RoomMessageEventContent::text_plain(format!( - "Failed to block {failed_count} media, check logs for more details" - )), - (true, false ) => RoomMessageEventContent::text_plain(format!( - "Failed to purge {failed_purge_count} media, check logs for more details" - )), - (false, false) => RoomMessageEventContent::text_plain(format!( - "Failed to block {failed_count}, and purge {failed_purge_count} media, check logs for more details" - )) - }.into() - }, - ), + (true, true) => RoomMessageEventContent::text_plain("Successfully blocked media"), + (false, true) => RoomMessageEventContent::text_plain(format!( + "Failed to block {failed_count} media, check logs for more details" + )), + (true, false) => RoomMessageEventContent::text_plain(format!( + "Failed to purge {failed_purge_count} media, check logs for more details" + )), + (false, false) => RoomMessageEventContent::text_plain(format!( + "Failed to block {failed_count}, and purge {failed_purge_count} media, check logs for more details" + )), + } + .into() + } + }, AdminCommand::BlockMediaFromUsers { from_last, reason } => { let after = from_last.map(unix_secs_from_duration).transpose()?; @@ -1382,7 +1419,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::ListBlockedMedia => { let mut markdown_message = String::from( @@ -1399,7 +1437,8 @@ impl Service { unix_secs, reason, sha256_hex, - }) = media else { + }) = media + else { continue; }; @@ -1412,8 +1451,9 @@ impl Service { .flatten() .expect("Time is valid"); - markdown_message - .push_str(&format!("\n| {sha256_hex} | mxc://{server_name}/{media_id} | {time} | {reason} |")); + markdown_message.push_str(&format!( + "\n| {sha256_hex} | mxc://{server_name}/{media_id} | {time} | {reason} |" + )); html_message.push_str(&format!( "{sha256_hex}mxc://{server_name}/{media_id}{time}{reason}", @@ -1435,7 +1475,8 @@ impl Service { RoomMessageEventContent::text_plain(format!( "Failed to unblock {failed_count} media, check logs for more details" )) - }.into() + } + .into() }, ), AdminCommand::SignJson => { @@ -1460,7 +1501,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::VerifyJson => { if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```" @@ -1521,7 +1563,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::HashAndSignEvent { room_version_id } => { if body.len() > 2 @@ -1536,7 +1579,10 @@ impl Service { services().globals.server_name().as_str(), services().globals.keypair(), &mut value, - &room_version_id.rules().expect("Supported room version has rules").redaction, + &room_version_id + .rules() + .expect("Supported room version has rules") + .redaction, ) { RoomMessageEventContent::text_plain(format!("Invalid event: {e}")) } else { @@ -1551,7 +1597,8 @@ impl Service { RoomMessageEventContent::text_plain( "Expected code block in command body. Add --help for details.", ) - }.into() + } + .into() } AdminCommand::RemoveAlias { alias } => { if alias.server_name() != services().globals.server_name() { @@ -1576,7 +1623,8 @@ impl Service { .alias .remove_alias(&alias, services().globals.server_user())?; RoomMessageEventContent::text_plain("Alias removed successfully") - }.into() + } + .into() } }; diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 05c3eab1..75f3670e 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -230,8 +230,6 @@ impl Service { shutdown: AtomicBool::new(false), }; - // Remove this exception once other media backends are added - #[allow(irrefutable_let_patterns)] if let MediaBackendConfig::FileSystem { path, .. } = &s.config.media.backend { fs::create_dir_all(path)?; } @@ -489,6 +487,7 @@ impl Service { media_directory: &str, directory_structure: &DirectoryStructure, sha256_hex: &str, + create: bool, ) -> Result { let mut r = PathBuf::new(); r.push(media_directory); @@ -501,8 +500,10 @@ impl Service { r.push(current_path); } - // Create all directories leading up to file - fs::create_dir_all(&r).inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?; + if create { + // Create all directories leading up to file + fs::create_dir_all(&r).inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?; + } r.push(filename); } else { @@ -512,6 +513,16 @@ impl Service { Ok(r) } + pub fn get_media_path_string( + &self, + media_directory: &str, + directory_structure: &DirectoryStructure, + sha256_hex: &str, + ) -> Result { + let path = self.get_media_path(media_directory, directory_structure, sha256_hex, false)?; + Ok(path.to_str().expect("Media path is valid UTF-8").to_owned()) + } + pub fn shutdown(&self) { self.shutdown.store(true, atomic::Ordering::Relaxed); // On shutdown diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 16060dfb..29b9ed2b 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -8,13 +8,14 @@ use ruma::{ OwnedServerName, ServerName, UserId, }; use sha2::{digest::Output, Digest, Sha256}; -use tracing::{error, info}; +use tracing::{error, info, warn}; use crate::{ config::{DirectoryStructure, MediaBackendConfig}, services, utils, Error, Result, }; use image::imageops::FilterType; +use rusty_s3::S3Action; pub struct DbFileMeta { pub sha256_digest: Vec, @@ -142,7 +143,7 @@ impl Service { let count = files.iter().filter(|res| res.is_ok()).count(); info!("Found {count} media files to delete"); - purge_files(files); + purge_files(files).await; Ok(()) } @@ -159,11 +160,14 @@ impl Service { ) -> Result<()> { let (sha256_digest, sha256_hex) = generate_digests(file); - for error in self.clear_required_space( - &sha256_digest, - MediaType::new(servername, false), - size(file)?, - )? { + for error in self + .clear_required_space( + &sha256_digest, + MediaType::new(servername, false), + size(file)?, + ) + .await? + { error!( "Error deleting file to clear space when downloading/creating new media file: {error}" ) @@ -207,7 +211,8 @@ impl Service { &sha256_digest, MediaType::new(servername, true), size(file)?, - )?; + ) + .await?; self.db.create_thumbnail_metadata( sha256_digest, @@ -444,10 +449,14 @@ impl Service { /// purged have that sha256 hash. /// /// Returns errors for all the files that were failed to be deleted, if any. - pub fn purge(&self, media: &[(OwnedServerName, String)], force_filehash: bool) -> Vec { + pub async fn purge( + &self, + media: &[(OwnedServerName, String)], + force_filehash: bool, + ) -> Vec { let hashes = self.db.purge_and_get_hashes(media, force_filehash); - purge_files(hashes) + purge_files(hashes).await } /// Purges all (past a certain time in unix seconds, if specified) media @@ -462,7 +471,7 @@ impl Service { /// /// Note: it only currently works for local users, as we cannot determine who /// exactly uploaded the file when it comes to remove users. - pub fn purge_from_user( + pub async fn purge_from_user( &self, user_id: &UserId, force_filehash: bool, @@ -472,7 +481,7 @@ impl Service { .db .purge_and_get_hashes_from_user(user_id, force_filehash, after); - purge_files(hashes) + purge_files(hashes).await } /// Purges all (past a certain time in unix seconds, if specified) media @@ -484,7 +493,7 @@ impl Service { /// purged have that sha256 hash. /// /// Returns errors for all the files that were failed to be deleted, if any. - pub fn purge_from_server( + pub async fn purge_from_server( &self, server_name: &ServerName, force_filehash: bool, @@ -494,7 +503,7 @@ impl Service { .db .purge_and_get_hashes_from_server(server_name, force_filehash, after); - purge_files(hashes) + purge_files(hashes).await } /// Checks whether the media has been blocked by administrators, returning either @@ -558,7 +567,7 @@ impl Service { self.db.list_blocked() } - pub fn clear_required_space( + pub async fn clear_required_space( &self, sha256_digest: &[u8], media_type: MediaType, @@ -577,7 +586,7 @@ impl Service { info!("Deleting {} files to clear space for new media file", count); } - Ok(purge_files(files)) + Ok(purge_files(files).await) } /// Fetches the file from the configured media backend, as well as updating the "last accessed" @@ -599,6 +608,7 @@ impl Service { path, directory_structure, &hex::encode(sha256_digest), + true, )?; let mut file = Vec::new(); @@ -606,6 +616,28 @@ impl Service { file } + MediaBackendConfig::S3 { + directory_structure, + bucket, + credentials, + duration, + path_prefix, + } => { + let path = services().globals.get_media_path_string( + path_prefix, + directory_structure, + &hex::encode(sha256_digest), + )?; + + let url = bucket.get_object(Some(credentials), &path).sign(*duration); + let response = services().globals.default_client().get(url).send().await?; + if !response.status().is_success() { + warn!("S3 request error:\n{}", response.text().await?); + return Err(Error::bad_database("Cannot read media file")); + } + + response.bytes().await?.to_vec() + } }; if let Some((server_name, media_id)) = original_file_id { @@ -627,13 +659,42 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { path, directory_structure, } => { - let path = services() - .globals - .get_media_path(path, directory_structure, sha256_hex)?; + let path = + services() + .globals + .get_media_path(path, directory_structure, sha256_hex, true)?; let mut f = File::create(path).await?; f.write_all(file).await?; } + MediaBackendConfig::S3 { + directory_structure, + bucket, + credentials, + duration, + path_prefix, + } => { + let path = services().globals.get_media_path_string( + path_prefix, + directory_structure, + sha256_hex, + )?; + + let url = bucket.put_object(Some(credentials), &path).sign(*duration); + let resp = services() + .globals + .default_client() + .put(url) + // todo: to_owned is probably awful for performance + .body(file.to_owned()) + .send() + .await?; + + if !resp.status().is_success() { + warn!("S3 request error:\n{}", resp.text().await?); + return Err(Error::bad_database("Cannot create media file")); + } + } } Ok(()) @@ -643,21 +704,25 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { /// Returns a `Vec` of errors that occurred when attempting to delete the files /// /// Note: this does NOT remove the related metadata from the database -fn purge_files(hashes: Vec>) -> Vec { - hashes - .into_iter() - .map(|hash| match hash { - Ok(v) => delete_file(&v), - Err(e) => Err(e), - }) - .filter_map(|r| if let Err(e) = r { Some(e) } else { None }) - .collect() +async fn purge_files(hashes: Vec>) -> Vec { + let mut errors = Vec::new(); + for hash in hashes { + match hash { + Ok(v) => { + if let Err(e) = delete_file(&v).await { + errors.push(e); + } + } + Err(e) => errors.push(e), + } + } + errors } /// Deletes the given file from the media backend /// /// Note: this does NOT remove the related metadata from the database -fn delete_file(sha256_hex: &str) -> Result<()> { +async fn delete_file(sha256_hex: &str) -> Result<()> { match &services().globals.config.media.backend { MediaBackendConfig::FileSystem { path, @@ -666,7 +731,7 @@ fn delete_file(sha256_hex: &str) -> Result<()> { let mut path = services() .globals - .get_media_path(path, directory_structure, sha256_hex)?; + .get_media_path(path, directory_structure, sha256_hex, true)?; if let Err(e) = fs::remove_file(&path) { // Multiple files with the same filehash might be requseted to be deleted @@ -696,6 +761,34 @@ fn delete_file(sha256_hex: &str) -> Result<()> { } } } + MediaBackendConfig::S3 { + directory_structure, + bucket, + credentials, + duration, + path_prefix, + } => { + let path = services().globals.get_media_path_string( + path_prefix, + directory_structure, + sha256_hex, + )?; + let url = bucket + .delete_object(Some(credentials), &path) + .sign(*duration); + + let resp = services() + .globals + .default_client() + .delete(url) + .send() + .await?; + + if !resp.status().is_success() { + warn!("S3 request error:\n{}", resp.text().await?); + return Err(Error::bad_database("Cannot delete media file")); + } + } } Ok(())