diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index e7f169fb..afb3cef6 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -9,6 +9,7 @@ use std::{ use bytesize::ByteSize; use chrono::DateTime; use clap::{Args, Parser}; +use futures_util::future::Either; use image::GenericImageView; use regex::Regex; use ruma::{ @@ -955,6 +956,7 @@ impl Service { services() .media .purge_from_user(&user_id, purge_media.force_filehash, after) + .await .len() } else { 0 @@ -1023,6 +1025,7 @@ impl Service { failed_count += services() .media .purge_from_user(user_id, purge_media.force_filehash, after) + .await .len(); } } @@ -1250,9 +1253,9 @@ impl Service { RoomMessageEventContent::text_html(markdown_message, html_message).into() }, AdminCommand::PurgeMedia => media_from_body(body).map_or_else( - |message| message, - |media| { - let failed_count = services().media.purge(&media, true).len(); + |message| Either::Left(async move { message }), + |media| Either::Right(async move { + let failed_count = services().media.purge(&media, true).await.len(); if failed_count == 0 { RoomMessageEventContent::text_plain("Successfully purged media") @@ -1261,8 +1264,8 @@ impl Service { "Failed to delete {failed_count} media, check logs for more details" )) }.into() - }, - ), + }), + ).await, AdminCommand::PurgeMediaFromUsers { from_last, force_filehash, @@ -1282,6 +1285,7 @@ impl Service { failed_count += services() .media .purge_from_user(user_id, force_filehash, after) + .await .len(); } @@ -1314,6 +1318,7 @@ impl Service { let failed_count = services() .media .purge_from_server(&server_name, force_filehash, after) + .await .len(); if failed_count == 0 { @@ -1327,11 +1332,11 @@ impl Service { }.into() } AdminCommand::BlockMedia { and_purge, reason } => media_from_body(body).map_or_else( - |message| message, - |media| { + |message| Either::Left(async move { message }), + |media| Either::Right(async move { let failed_count = services().media.block(&media, reason).len(); let failed_purge_count = if and_purge { - services().media.purge(&media, true).len() + services().media.purge(&media, true).await.len() } else { 0 }; @@ -1348,8 +1353,8 @@ impl Service { "Failed to block {failed_count}, and purge {failed_purge_count} media, check logs for more details" )) }.into() - }, - ), + }), + ).await, AdminCommand::BlockMediaFromUsers { from_last, reason } => { let after = from_last.map(unix_secs_from_duration).transpose()?; diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 16060dfb..81eb30df 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -2,6 +2,7 @@ mod data; use std::{fs, io::Cursor, sync::Arc}; pub use data::Data; +use futures_util::{stream, StreamExt}; use ruma::{ api::client::{error::ErrorKind, media::is_safe_inline_content_type}, http_headers::{ContentDisposition, ContentDispositionType}, @@ -142,7 +143,7 @@ impl Service { let count = files.iter().filter(|res| res.is_ok()).count(); info!("Found {count} media files to delete"); - purge_files(files); + purge_files(files).await; Ok(()) } @@ -159,11 +160,14 @@ impl Service { ) -> Result<()> { let (sha256_digest, sha256_hex) = generate_digests(file); - for error in self.clear_required_space( - &sha256_digest, - MediaType::new(servername, false), - size(file)?, - )? { + for error in self + .clear_required_space( + &sha256_digest, + MediaType::new(servername, false), + size(file)?, + ) + .await? + { error!( "Error deleting file to clear space when downloading/creating new media file: {error}" ) @@ -207,7 +211,8 @@ impl Service { &sha256_digest, MediaType::new(servername, true), size(file)?, - )?; + ) + .await?; self.db.create_thumbnail_metadata( sha256_digest, @@ -444,10 +449,14 @@ impl Service { /// purged have that sha256 hash. /// /// Returns errors for all the files that were failed to be deleted, if any. - pub fn purge(&self, media: &[(OwnedServerName, String)], force_filehash: bool) -> Vec { + pub async fn purge( + &self, + media: &[(OwnedServerName, String)], + force_filehash: bool, + ) -> Vec { let hashes = self.db.purge_and_get_hashes(media, force_filehash); - purge_files(hashes) + purge_files(hashes).await } /// Purges all (past a certain time in unix seconds, if specified) media @@ -462,7 +471,7 @@ impl Service { /// /// Note: it only currently works for local users, as we cannot determine who /// exactly uploaded the file when it comes to remove users. - pub fn purge_from_user( + pub async fn purge_from_user( &self, user_id: &UserId, force_filehash: bool, @@ -472,7 +481,7 @@ impl Service { .db .purge_and_get_hashes_from_user(user_id, force_filehash, after); - purge_files(hashes) + purge_files(hashes).await } /// Purges all (past a certain time in unix seconds, if specified) media @@ -484,7 +493,7 @@ impl Service { /// purged have that sha256 hash. /// /// Returns errors for all the files that were failed to be deleted, if any. - pub fn purge_from_server( + pub async fn purge_from_server( &self, server_name: &ServerName, force_filehash: bool, @@ -494,7 +503,7 @@ impl Service { .db .purge_and_get_hashes_from_server(server_name, force_filehash, after); - purge_files(hashes) + purge_files(hashes).await } /// Checks whether the media has been blocked by administrators, returning either @@ -558,7 +567,7 @@ impl Service { self.db.list_blocked() } - pub fn clear_required_space( + pub async fn clear_required_space( &self, sha256_digest: &[u8], media_type: MediaType, @@ -577,7 +586,7 @@ impl Service { info!("Deleting {} files to clear space for new media file", count); } - Ok(purge_files(files)) + Ok(purge_files(files).await) } /// Fetches the file from the configured media backend, as well as updating the "last accessed" @@ -643,21 +652,29 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> { /// Returns a `Vec` of errors that occurred when attempting to delete the files /// /// Note: this does NOT remove the related metadata from the database -fn purge_files(hashes: Vec>) -> Vec { - hashes - .into_iter() - .map(|hash| match hash { - Ok(v) => delete_file(&v), - Err(e) => Err(e), +async fn purge_files(hashes: Vec>) -> Vec { + stream::iter(hashes) + .then(|hash| async move { + match hash { + Ok(v) => delete_file(&v).await, + Err(e) => Err(e), + } + }) + .filter_map(|r| async { + if let Err(e) = r { + Some(e) + } else { + None + } }) - .filter_map(|r| if let Err(e) = r { Some(e) } else { None }) .collect() + .await } /// Deletes the given file from the media backend /// /// Note: this does NOT remove the related metadata from the database -fn delete_file(sha256_hex: &str) -> Result<()> { +async fn delete_file(sha256_hex: &str) -> Result<()> { match &services().globals.config.media.backend { MediaBackendConfig::FileSystem { path,