1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-06-27 16:35:59 +00:00

refactor: media.delete_file is async

This commit is contained in:
AndSDev 2025-06-04 16:15:42 +03:00
parent a1886a1396
commit 30d578a81a
2 changed files with 55 additions and 33 deletions

View file

@ -9,6 +9,7 @@ use std::{
use bytesize::ByteSize; use bytesize::ByteSize;
use chrono::DateTime; use chrono::DateTime;
use clap::{Args, Parser}; use clap::{Args, Parser};
use futures_util::future::Either;
use image::GenericImageView; use image::GenericImageView;
use regex::Regex; use regex::Regex;
use ruma::{ use ruma::{
@ -955,6 +956,7 @@ impl Service {
services() services()
.media .media
.purge_from_user(&user_id, purge_media.force_filehash, after) .purge_from_user(&user_id, purge_media.force_filehash, after)
.await
.len() .len()
} else { } else {
0 0
@ -1023,6 +1025,7 @@ impl Service {
failed_count += services() failed_count += services()
.media .media
.purge_from_user(user_id, purge_media.force_filehash, after) .purge_from_user(user_id, purge_media.force_filehash, after)
.await
.len(); .len();
} }
} }
@ -1250,9 +1253,9 @@ impl Service {
RoomMessageEventContent::text_html(markdown_message, html_message).into() RoomMessageEventContent::text_html(markdown_message, html_message).into()
}, },
AdminCommand::PurgeMedia => media_from_body(body).map_or_else( AdminCommand::PurgeMedia => media_from_body(body).map_or_else(
|message| message, |message| Either::Left(async move { message }),
|media| { |media| Either::Right(async move {
let failed_count = services().media.purge(&media, true).len(); let failed_count = services().media.purge(&media, true).await.len();
if failed_count == 0 { if failed_count == 0 {
RoomMessageEventContent::text_plain("Successfully purged media") RoomMessageEventContent::text_plain("Successfully purged media")
@ -1261,8 +1264,8 @@ impl Service {
"Failed to delete {failed_count} media, check logs for more details" "Failed to delete {failed_count} media, check logs for more details"
)) ))
}.into() }.into()
}, }),
), ).await,
AdminCommand::PurgeMediaFromUsers { AdminCommand::PurgeMediaFromUsers {
from_last, from_last,
force_filehash, force_filehash,
@ -1282,6 +1285,7 @@ impl Service {
failed_count += services() failed_count += services()
.media .media
.purge_from_user(user_id, force_filehash, after) .purge_from_user(user_id, force_filehash, after)
.await
.len(); .len();
} }
@ -1314,6 +1318,7 @@ impl Service {
let failed_count = services() let failed_count = services()
.media .media
.purge_from_server(&server_name, force_filehash, after) .purge_from_server(&server_name, force_filehash, after)
.await
.len(); .len();
if failed_count == 0 { if failed_count == 0 {
@ -1327,11 +1332,11 @@ impl Service {
}.into() }.into()
} }
AdminCommand::BlockMedia { and_purge, reason } => media_from_body(body).map_or_else( AdminCommand::BlockMedia { and_purge, reason } => media_from_body(body).map_or_else(
|message| message, |message| Either::Left(async move { message }),
|media| { |media| Either::Right(async move {
let failed_count = services().media.block(&media, reason).len(); let failed_count = services().media.block(&media, reason).len();
let failed_purge_count = if and_purge { let failed_purge_count = if and_purge {
services().media.purge(&media, true).len() services().media.purge(&media, true).await.len()
} else { } else {
0 0
}; };
@ -1348,8 +1353,8 @@ impl Service {
"Failed to block {failed_count}, and purge {failed_purge_count} media, check logs for more details" "Failed to block {failed_count}, and purge {failed_purge_count} media, check logs for more details"
)) ))
}.into() }.into()
}, }),
), ).await,
AdminCommand::BlockMediaFromUsers { from_last, reason } => { AdminCommand::BlockMediaFromUsers { from_last, reason } => {
let after = from_last.map(unix_secs_from_duration).transpose()?; let after = from_last.map(unix_secs_from_duration).transpose()?;

View file

@ -2,6 +2,7 @@ mod data;
use std::{fs, io::Cursor, sync::Arc}; use std::{fs, io::Cursor, sync::Arc};
pub use data::Data; pub use data::Data;
use futures_util::{stream, StreamExt};
use ruma::{ use ruma::{
api::client::{error::ErrorKind, media::is_safe_inline_content_type}, api::client::{error::ErrorKind, media::is_safe_inline_content_type},
http_headers::{ContentDisposition, ContentDispositionType}, http_headers::{ContentDisposition, ContentDispositionType},
@ -142,7 +143,7 @@ impl Service {
let count = files.iter().filter(|res| res.is_ok()).count(); let count = files.iter().filter(|res| res.is_ok()).count();
info!("Found {count} media files to delete"); info!("Found {count} media files to delete");
purge_files(files); purge_files(files).await;
Ok(()) Ok(())
} }
@ -159,11 +160,14 @@ impl Service {
) -> Result<()> { ) -> Result<()> {
let (sha256_digest, sha256_hex) = generate_digests(file); let (sha256_digest, sha256_hex) = generate_digests(file);
for error in self.clear_required_space( for error in self
.clear_required_space(
&sha256_digest, &sha256_digest,
MediaType::new(servername, false), MediaType::new(servername, false),
size(file)?, size(file)?,
)? { )
.await?
{
error!( error!(
"Error deleting file to clear space when downloading/creating new media file: {error}" "Error deleting file to clear space when downloading/creating new media file: {error}"
) )
@ -207,7 +211,8 @@ impl Service {
&sha256_digest, &sha256_digest,
MediaType::new(servername, true), MediaType::new(servername, true),
size(file)?, size(file)?,
)?; )
.await?;
self.db.create_thumbnail_metadata( self.db.create_thumbnail_metadata(
sha256_digest, sha256_digest,
@ -444,10 +449,14 @@ impl Service {
/// purged have that sha256 hash. /// purged have that sha256 hash.
/// ///
/// Returns errors for all the files that were failed to be deleted, if any. /// Returns errors for all the files that were failed to be deleted, if any.
pub fn purge(&self, media: &[(OwnedServerName, String)], force_filehash: bool) -> Vec<Error> { pub async fn purge(
&self,
media: &[(OwnedServerName, String)],
force_filehash: bool,
) -> Vec<Error> {
let hashes = self.db.purge_and_get_hashes(media, force_filehash); let hashes = self.db.purge_and_get_hashes(media, force_filehash);
purge_files(hashes) purge_files(hashes).await
} }
/// Purges all (past a certain time in unix seconds, if specified) media /// Purges all (past a certain time in unix seconds, if specified) media
@ -462,7 +471,7 @@ impl Service {
/// ///
/// Note: it only currently works for local users, as we cannot determine who /// Note: it only currently works for local users, as we cannot determine who
/// exactly uploaded the file when it comes to remove users. /// exactly uploaded the file when it comes to remove users.
pub fn purge_from_user( pub async fn purge_from_user(
&self, &self,
user_id: &UserId, user_id: &UserId,
force_filehash: bool, force_filehash: bool,
@ -472,7 +481,7 @@ impl Service {
.db .db
.purge_and_get_hashes_from_user(user_id, force_filehash, after); .purge_and_get_hashes_from_user(user_id, force_filehash, after);
purge_files(hashes) purge_files(hashes).await
} }
/// Purges all (past a certain time in unix seconds, if specified) media /// Purges all (past a certain time in unix seconds, if specified) media
@ -484,7 +493,7 @@ impl Service {
/// purged have that sha256 hash. /// purged have that sha256 hash.
/// ///
/// Returns errors for all the files that were failed to be deleted, if any. /// Returns errors for all the files that were failed to be deleted, if any.
pub fn purge_from_server( pub async fn purge_from_server(
&self, &self,
server_name: &ServerName, server_name: &ServerName,
force_filehash: bool, force_filehash: bool,
@ -494,7 +503,7 @@ impl Service {
.db .db
.purge_and_get_hashes_from_server(server_name, force_filehash, after); .purge_and_get_hashes_from_server(server_name, force_filehash, after);
purge_files(hashes) purge_files(hashes).await
} }
/// Checks whether the media has been blocked by administrators, returning either /// Checks whether the media has been blocked by administrators, returning either
@ -558,7 +567,7 @@ impl Service {
self.db.list_blocked() self.db.list_blocked()
} }
pub fn clear_required_space( pub async fn clear_required_space(
&self, &self,
sha256_digest: &[u8], sha256_digest: &[u8],
media_type: MediaType, media_type: MediaType,
@ -577,7 +586,7 @@ impl Service {
info!("Deleting {} files to clear space for new media file", count); info!("Deleting {} files to clear space for new media file", count);
} }
Ok(purge_files(files)) Ok(purge_files(files).await)
} }
/// Fetches the file from the configured media backend, as well as updating the "last accessed" /// Fetches the file from the configured media backend, as well as updating the "last accessed"
@ -643,21 +652,29 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> {
/// Returns a `Vec` of errors that occurred when attempting to delete the files /// Returns a `Vec` of errors that occurred when attempting to delete the files
/// ///
/// Note: this does NOT remove the related metadata from the database /// Note: this does NOT remove the related metadata from the database
fn purge_files(hashes: Vec<Result<String>>) -> Vec<Error> { async fn purge_files(hashes: Vec<Result<String>>) -> Vec<Error> {
hashes stream::iter(hashes)
.into_iter() .then(|hash| async move {
.map(|hash| match hash { match hash {
Ok(v) => delete_file(&v), Ok(v) => delete_file(&v).await,
Err(e) => Err(e), Err(e) => Err(e),
}
})
.filter_map(|r| async {
if let Err(e) = r {
Some(e)
} else {
None
}
}) })
.filter_map(|r| if let Err(e) = r { Some(e) } else { None })
.collect() .collect()
.await
} }
/// Deletes the given file from the media backend /// Deletes the given file from the media backend
/// ///
/// Note: this does NOT remove the related metadata from the database /// Note: this does NOT remove the related metadata from the database
fn delete_file(sha256_hex: &str) -> Result<()> { async fn delete_file(sha256_hex: &str) -> Result<()> {
match &services().globals.config.media.backend { match &services().globals.config.media.backend {
MediaBackendConfig::FileSystem { MediaBackendConfig::FileSystem {
path, path,