diff --git a/Cargo.lock b/Cargo.lock index cf5e656d..6764c0c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -503,6 +503,7 @@ dependencies = [ "hickory-resolver", "hmac", "http 1.1.0", + "humantime", "hyper 1.3.1", "hyper-util", "image", @@ -1195,6 +1196,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f" + [[package]] name = "hyper" version = "0.14.29" diff --git a/Cargo.toml b/Cargo.toml index d352ce7a..7ee97b4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -131,6 +131,8 @@ clap = { version = "4.3.0", default-features = false, features = [ "string", "usage", ] } +humantime = "2" + futures-util = { version = "0.3.28", default-features = false } # Used for reading the configuration from conduit.toml & environment variables figment = { version = "0.10.8", features = ["env", "toml"] } diff --git a/src/database/key_value/media.rs b/src/database/key_value/media.rs index 8ab9046b..6f835b5b 100644 --- a/src/database/key_value/media.rs +++ b/src/database/key_value/media.rs @@ -1,4 +1,6 @@ -use ruma::{api::client::error::ErrorKind, ServerName, UserId}; +use std::{collections::BTreeMap, ops::Range}; + +use ruma::{api::client::error::ErrorKind, OwnedServerName, ServerName, UserId}; use sha2::{digest::Output, Sha256}; use tracing::error; @@ -153,6 +155,403 @@ impl service::media::Data for KeyValueDatabase { .map(|_| metadata) .ok_or_else(|| Error::BadRequest(ErrorKind::NotFound, "Media not found.")) } + + fn purge_and_get_hashes( + &self, + media: &[(OwnedServerName, String)], + force_filehash: bool, + ) -> Vec> { + let mut files = Vec::new(); + + let purge = |mut value: Vec| { + value.truncate(32); + let sha256_digest = value; + + let sha256_hex = hex::encode(&sha256_digest); + + self.purge_filehash(sha256_digest, false)?; + + Ok(sha256_hex) + }; + + for (server_name, media_id) in media { + if force_filehash { + let mut key = server_name.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(media_id.as_bytes()); + + match self.servernamemediaid_metadata.get(&key) { + Ok(Some(value)) => { + files.push(purge(value)); + } + Ok(None) => (), + Err(e) => { + files.push(Err(e)); + } + } + + key.push(0xff); + for (_, value) in self.thumbnailid_metadata.scan_prefix(key) { + files.push(purge(value)); + } + } else { + match self.purge_mediaid(server_name, media_id, false) { + Ok(f) => { + files.append(&mut f.into_iter().map(Ok).collect()); + } + Err(e) => files.push(Err(e)), + } + } + } + + files + } + + fn purge_and_get_hashes_from_user( + &self, + user_id: &UserId, + force_filehash: bool, + after: Option, + ) -> Vec> { + let mut files = Vec::new(); + let mut prefix = user_id.server_name().as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(user_id.localpart().as_bytes()); + prefix.push(0xff); + + let purge_filehash = |sha256_digest: Vec| { + let sha256_hex = hex::encode(&sha256_digest); + + self.purge_filehash(sha256_digest, false)?; + + Ok(sha256_hex) + }; + + for (k, _) in self.servername_userlocalpart_mediaid.scan_prefix(prefix) { + let metadata = || { + let mut parts = k.rsplit(|&b| b == 0xff); + let media_id_bytes = parts.next().ok_or_else(|| { + Error::bad_database( + "Invalid format for key of servername_userlocalpart_mediaid", + ) + })?; + + let media_id = 
utils::string_from_bytes(media_id_bytes).map_err(|_| { + Error::bad_database( + "Invalid media_id string in servername_userlocalpart_mediaid", + ) + })?; + + let mut key = user_id.server_name().as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(media_id.as_bytes()); + + Ok(( + self.servernamemediaid_metadata.get(&key)?.ok_or_else(|| { + error!( + "Missing metadata for \"mxc://{}/{media_id}\", despite storing it's uploader", + user_id.server_name() + ); + Error::BadDatabase("Missing metadata for media id and server_name") + })?, + media_id, + )) + }; + + let (mut metadata, media_id) = match metadata() { + Ok(v) => v, + Err(e) => { + files.push(Err(e)); + continue; + } + }; + + metadata.truncate(32); + let sha256_digest = metadata; + + if let Some(after) = after { + let metadata = match self + .filehash_metadata + .get(&sha256_digest) + .map(|opt| opt.map(FilehashMetadata::from_vec)) + { + Ok(Some(metadata)) => metadata, + // If the media has already been deleted, we shouldn't treat that as an error + Ok(None) => continue, + Err(e) => { + files.push(Err(e)); + continue; + } + }; + + let creation = match metadata.creation(&sha256_digest) { + Ok(c) => c, + Err(e) => { + files.push(Err(e)); + continue; + } + }; + + if creation < after { + continue; + } + } + + if force_filehash { + files.push(purge_filehash(sha256_digest)); + + let mut prefix = user_id.server_name().as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(media_id.as_bytes()); + prefix.push(0xff); + for (_, mut metadata) in self.thumbnailid_metadata.scan_prefix(prefix) { + metadata.truncate(32); + let sha256_digest = metadata; + files.push(purge_filehash(sha256_digest)); + } + } else { + match self.purge_mediaid(user_id.server_name(), &media_id, false) { + Ok(f) => { + files.append(&mut f.into_iter().map(Ok).collect()); + } + Err(e) => files.push(Err(e)), + } + } + } + + files + } + + fn purge_and_get_hashes_from_server( + &self, + server_name: &ServerName, + force_filehash: bool, + after: Option, + ) -> Vec> { + let mut prefix = server_name.as_bytes().to_vec(); + prefix.push(0xff); + + let mut files = Vec::new(); + + // Purges all references to the given media in the database, + // returning a Vec of hex sha256 digests + let purge_sha256 = |files: &mut Vec>, mut metadata: Vec| { + metadata.truncate(32); + let sha256_digest = metadata; + + if let Some(after) = after { + let Some(metadata) = self + .filehash_metadata + .get(&sha256_digest)? + .map(FilehashMetadata::from_vec) + else { + // If the media has already been deleted, we shouldn't treat that as an error + return Ok(()); + }; + + if metadata.creation(&sha256_digest)? < after { + return Ok(()); + } + } + + let sha256_hex = hex::encode(&sha256_digest); + + self.purge_filehash(sha256_digest, false)?; + + files.push(Ok(sha256_hex)); + Ok(()) + }; + + let purge_mediaid = |files: &mut Vec>, key: Vec| { + let mut parts = key.split(|&b| b == 0xff); + + let server_name = parts + .next() + .ok_or_else(|| Error::bad_database("Invalid format of metadata key")) + .map(utils::string_from_bytes)? + .map_err(|_| Error::bad_database("Invalid ServerName String in metadata key")) + .map(OwnedServerName::try_from)? + .map_err(|_| Error::bad_database("Invalid ServerName String in metadata key"))?; + + let media_id = parts + .next() + .ok_or_else(|| Error::bad_database("Invalid format of metadata key")) + .map(utils::string_from_bytes)? 
+ .map_err(|_| Error::bad_database("Invalid Media ID String in metadata key"))?; + + files.append( + &mut self + .purge_mediaid(&server_name, &media_id, false)? + .into_iter() + .map(Ok) + .collect(), + ); + + Ok(()) + }; + + for (key, value) in self + .servernamemediaid_metadata + .scan_prefix(prefix.clone()) + .chain(self.thumbnailid_metadata.scan_prefix(prefix.clone())) + { + if let Err(e) = if force_filehash { + purge_sha256(&mut files, value) + } else { + purge_mediaid(&mut files, key) + } { + files.push(Err(e)); + } + } + + files + } +} + +impl KeyValueDatabase { + fn purge_mediaid( + &self, + server_name: &ServerName, + media_id: &str, + only_filehash_metadata: bool, + ) -> Result> { + let mut files = Vec::new(); + + let count_required_to_purge = if only_filehash_metadata { 1 } else { 0 }; + + let mut key = server_name.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(media_id.as_bytes()); + + if let Some(sha256_digest) = self.servernamemediaid_metadata.get(&key)?.map(|mut value| { + value.truncate(32); + value + }) { + if !only_filehash_metadata { + if let Some(localpart) = self.servernamemediaid_userlocalpart.get(&key)? { + self.servernamemediaid_userlocalpart.remove(&key)?; + + let mut key = server_name.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(&localpart); + key.push(0xff); + key.extend_from_slice(media_id.as_bytes()); + + self.servername_userlocalpart_mediaid.remove(&key)?; + }; + + self.servernamemediaid_metadata.remove(&key)?; + + let mut key = sha256_digest.clone(); + key.extend_from_slice(server_name.as_bytes()); + key.push(0xff); + key.extend_from_slice(media_id.as_bytes()); + + self.filehash_servername_mediaid.remove(&key)?; + } + + if self + .filehash_servername_mediaid + .scan_prefix(sha256_digest.clone()) + .count() + <= count_required_to_purge + && self + .filehash_thumbnailid + .scan_prefix(sha256_digest.clone()) + .next() + .is_none() + { + self.filehash_metadata.remove(&sha256_digest)?; + files.push(hex::encode(sha256_digest)); + } + } + + key.push(0xff); + + let mut thumbnails = BTreeMap::new(); + + for (thumbnail_id, mut value) in self.thumbnailid_metadata.scan_prefix(key) { + value.truncate(32); + let sha256_digest = value; + + let entry = thumbnails + .entry(sha256_digest.clone()) + .and_modify(|v| *v += 1) + .or_insert(1); + + if !only_filehash_metadata { + self.filehash_thumbnailid.remove(&sha256_digest)?; + self.thumbnailid_metadata.remove(&thumbnail_id)?; + } + + // Basically, if this is the only media pointing to the filehash, get rid of it. + // It's a little complicated due to how blocking works. 
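+            //
+            // When `only_filehash_metadata` is set, this media's own index entries are kept,
+            // so the thresholds below tolerate them: up to one filehash_servername_mediaid
+            // entry (the original upload, if it happens to share this thumbnail's hash) and
+            // the `*entry` thumbnail ids of this media counted so far. Otherwise this media's
+            // entries were already removed above, so any remaining reference belongs to a
+            // different media id and the filehash metadata has to stay.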
+ if self + .filehash_servername_mediaid + .scan_prefix(sha256_digest.clone()) + .count() + <= count_required_to_purge + && self + .filehash_thumbnailid + .scan_prefix(sha256_digest.clone()) + .count() + <= if only_filehash_metadata { *entry } else { 0 } + { + self.filehash_metadata.remove(&sha256_digest)?; + files.push(hex::encode(sha256_digest)); + } + } + + Ok(files) + } + + fn purge_filehash(&self, sha256_digest: Vec, only_filehash_metadata: bool) -> Result<()> { + let handle_error = || { + error!( + "Invalid format of key in filehash_servername_mediaid for media with sha256 content hash of {}", + hex::encode(&sha256_digest) + ); + Error::BadDatabase("Invalid format of key in filehash_servername_mediaid") + }; + + if !only_filehash_metadata { + for (key, _) in self.filehash_thumbnailid.scan_prefix(sha256_digest.clone()) { + self.filehash_thumbnailid.remove(&key)?; + let (_, key) = key.split_at(32); + self.thumbnailid_metadata.remove(key)?; + } + + for (k, _) in self + .filehash_servername_mediaid + .scan_prefix(sha256_digest.clone()) + { + let (_, servername_mediaid) = k.split_at_checked(32).ok_or_else(handle_error)?; + + self.servernamemediaid_metadata.remove(servername_mediaid)?; + self.filehash_servername_mediaid.remove(&k)?; + + if let Some(localpart) = self + .servernamemediaid_userlocalpart + .get(servername_mediaid)? + { + self.servernamemediaid_userlocalpart + .remove(servername_mediaid)?; + + let mut parts = servername_mediaid.split(|b: &u8| *b == 0xff); + + let mut key = parts.next().ok_or_else(handle_error)?.to_vec(); + key.push(0xff); + key.extend_from_slice(&localpart); + key.push(0xff); + key.extend_from_slice(parts.next().ok_or_else(handle_error)?); + + self.servername_userlocalpart_mediaid.remove(&key)?; + }; + } + } + + self.filehash_metadata.remove(&sha256_digest) + } } fn parse_metadata(value: &[u8]) -> Result { @@ -213,7 +612,47 @@ impl FilehashMetadata { Self { value } } + pub fn from_vec(vec: Vec) -> Self { + Self { value: vec } + } + pub fn value(&self) -> &[u8] { &self.value } + + fn get_u64_val( + &self, + range: Range, + name: &str, + sha256_digest: &[u8], + invalid_error: &'static str, + ) -> Result { + self.value + .get(range) + .ok_or_else(|| { + error!( + "Invalid format of metadata for media with sha256 content hash of {}", + hex::encode(sha256_digest) + ); + Error::BadDatabase("Invalid format of metadata in filehash_metadata") + })? 
+ .try_into() + .map(u64::from_be_bytes) + .map_err(|_| { + error!( + "Invalid {name} for media with sha256 content hash of {}", + hex::encode(sha256_digest) + ); + Error::BadDatabase(invalid_error) + }) + } + + pub fn creation(&self, sha256_digest: &[u8]) -> Result { + self.get_u64_val( + 8..16, + "creation time", + sha256_digest, + "Invalid creation time in filehash_metadata", + ) + } } diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 690da984..0e3cef1e 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -1,6 +1,11 @@ -use std::{collections::BTreeMap, convert::TryFrom, sync::Arc, time::Instant}; +use std::{ + collections::BTreeMap, + convert::TryFrom, + sync::Arc, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; -use clap::Parser; +use clap::{Args, Parser}; use regex::Regex; use ruma::{ api::appservice::Registration, @@ -19,8 +24,8 @@ use ruma::{ }, TimelineEventType, }, - EventId, MilliSecondsSinceUnixEpoch, OwnedRoomAliasId, OwnedRoomId, RoomAliasId, RoomId, - RoomVersionId, ServerName, UserId, + EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedRoomAliasId, OwnedRoomId, OwnedServerName, + RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, }; use serde_json::value::to_raw_value; use tokio::sync::{mpsc, Mutex, RwLock}; @@ -82,11 +87,13 @@ enum AdminCommand { /// Deactivate a user /// /// User will not be removed from all rooms by default. - /// Use --leave-rooms to force the user to leave all rooms + /// Use --leave-rooms to force the user to leave all rooms. DeactivateUser { #[arg(short, long)] leave_rooms: bool, user_id: Box, + #[command(flatten)] + purge_media: DeactivatePurgeMediaArgs, }, #[command(verbatim_doc_comment)] @@ -94,6 +101,8 @@ enum AdminCommand { /// /// Recommended to use in conjunction with list-local-users. /// + /// Use either --purge-all-media or --purge-media-from-last to either delete all media uploaded + /// by them (in the last {specified timeframe}, if any) /// Users will not be removed from joined rooms by default. /// Can be overridden with --leave-rooms flag. /// Removing a mass amount of users from a room may cause a significant amount of leave events. @@ -107,9 +116,68 @@ enum AdminCommand { #[arg(short, long)] /// Remove users from their joined rooms leave_rooms: bool, - #[arg(short, long)] + #[arg(short = 'F', long)] /// Also deactivate admin accounts force: bool, + #[command(flatten)] + purge_media: DeactivatePurgeMediaArgs, + }, + + /// Purge a list of media, formatted as MXC URIs + /// There should be one URI per line, all contained within a code-block + /// + /// Note: This will also delete media with the same sha256 hash, so + /// only use this when you are certain all the media is undesirable + PurgeMedia, + + /// Purges all media uploaded by the local users listed in a code-block. + /// + /// Note: This will also delete identical media uploaded by other users, so + /// only use this when all the media they uploaded is undesirable + PurgeMediaFromUsers { + #[arg( + long, short = 't', + value_parser = humantime::parse_duration + )] + /// Only purge media uploaded in the last {timeframe} + /// + /// Should be in the form specified by humantime::parse_duration + /// (e.g. 48h, 60min, 10days etc.) 
+ // --help is unformatted + #[allow(rustdoc::bare_urls)] + /// https://docs.rs/humantime/2.2.0/humantime/fn.parse_duration.html + from_last: Option, + + #[arg(long, short)] + /// Also deletes other media with the same SHA256 hash, ensuring that the file is removed from + /// the media backend, so only use this when all the media they uploaded is undesirable + force_filehash: bool, + }, + + /// Purges all media from the specified server + /// + /// Note: This will also delete identical media uploaded by local users, so + /// only use this when all the media from that server is undesirable (or if + /// you know that no media on the remote server is also uploaded locally) + PurgeMediaFromServer { + server_id: Box, + #[arg( + long, short = 't', + value_parser = humantime::parse_duration + )] + /// Only purge media uploaded in the last {timeframe} + /// + /// Should be in the form specified by humantime::parse_duration + /// (e.g. 48h, 60min, 10days etc.) + // --help is unformatted + #[allow(rustdoc::bare_urls)] + /// https://docs.rs/humantime/2.2.0/humantime/fn.parse_duration.html + from_last: Option, + + #[arg(long, short)] + /// Also deletes other media with the same SHA256 hash, ensuring that the file is removed from + /// the media backend, so only use this when all the media they uploaded is undesirable + force_filehash: bool, }, /// Get the auth_chain of a PDU @@ -181,6 +249,37 @@ enum AdminCommand { HashAndSignEvent { room_version_id: RoomVersionId }, } +#[derive(Args, Debug)] +#[group(multiple = true, required = false)] +pub struct DeactivatePurgeMediaArgs { + #[arg(long, short = 'm')] + /// Purges all media uploaded by the user(s) after deactivating their account + purge_media: bool, + + #[arg( + long, short = 't', + value_parser = humantime::parse_duration, + requires = "purge_media" + )] + /// If the --purge-media is present, it only purges media uploaded in the last {time-period} + /// + /// Should be in the form specified by humantime::parse_duration + /// (e.g. 48h, 60min, 10days etc.) + // --help is unformatted + #[allow(rustdoc::bare_urls)] + /// https://docs.rs/humantime/2.2.0/humantime/fn.parse_duration.html + /// + /// Note: This will also delete identical media uploaded by other users, so + /// only use this when all the media they uploaded in this timeframe is undesirable + media_from_last: Option, + + #[arg(long, short = 'f', requires = "purge_media")] + /// If the --purge-media is present, it will also delete identical media uploaded by other + /// users, ensuring that the file is removed from the media backend, so only use this when all + /// the media they uploaded is undesirable + force_filehash: bool, +} + #[derive(Debug)] pub enum AdminRoomEvent { ProcessMessage(String), @@ -690,6 +789,7 @@ impl Service { AdminCommand::DeactivateUser { leave_rooms, user_id, + purge_media, } => { let user_id = Arc::::from(user_id); if !services().users.exists(&user_id)? 
{ @@ -711,78 +811,42 @@ impl Service { leave_all_rooms(&user_id).await?; } - RoomMessageEventContent::text_plain(format!( - "User {user_id} has been deactivated" + let failed_purged_media = if purge_media.purge_media { + let after = purge_media + .media_from_last + .map(unix_secs_from_duration) + .transpose()?; + + services() + .media + .purge_from_user(&user_id, purge_media.force_filehash, after) + .len() + } else { + 0 + }; + + if failed_purged_media == 0 { + RoomMessageEventContent::text_plain(format!( + "User {user_id} has been deactivated" + )) + } else { + RoomMessageEventContent ::text_plain(format!( + "User {user_id} has been deactivated, but {failed_purged_media} media failed to be purged, check the logs for more details" )) + } } } - AdminCommand::DeactivateAll { leave_rooms, force } => { + AdminCommand::DeactivateAll { + leave_rooms, + force, + purge_media, + } => { if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```" { - let users = body.clone().drain(1..body.len() - 1).collect::>(); - - let mut user_ids = Vec::new(); - let mut remote_ids = Vec::new(); - let mut non_existent_ids = Vec::new(); - let mut invalid_users = Vec::new(); - - for &user in &users { - match <&UserId>::try_from(user) { - Ok(user_id) => { - if user_id.server_name() != services().globals.server_name() { - remote_ids.push(user_id) - } else if !services().users.exists(user_id)? { - non_existent_ids.push(user_id) - } else { - user_ids.push(user_id) - } - } - Err(_) => { - invalid_users.push(user); - } - } - } - - let mut markdown_message = String::new(); - let mut html_message = String::new(); - if !invalid_users.is_empty() { - markdown_message.push_str("The following user ids are not valid:\n```\n"); - html_message.push_str("The following user ids are not valid:\n
\n");
-                        for invalid_user in invalid_users {
-                            markdown_message.push_str(&format!("{invalid_user}\n"));
-                            html_message.push_str(&format!("{invalid_user}\n"));
-                        }
-                        markdown_message.push_str("```\n\n");
-                        html_message.push_str("</pre>
\n\n"); - } - if !remote_ids.is_empty() { - markdown_message - .push_str("The following users are not from this server:\n```\n"); - html_message - .push_str("The following users are not from this server:\n
\n");
-                        for remote_id in remote_ids {
-                            markdown_message.push_str(&format!("{remote_id}\n"));
-                            html_message.push_str(&format!("{remote_id}\n"));
-                        }
-                        markdown_message.push_str("```\n\n");
-                        html_message.push_str("</pre>
\n\n"); - } - if !non_existent_ids.is_empty() { - markdown_message.push_str("The following users do not exist:\n```\n"); - html_message.push_str("The following users do not exist:\n
\n");
-                        for non_existent_id in non_existent_ids {
-                            markdown_message.push_str(&format!("{non_existent_id}\n"));
-                            html_message.push_str(&format!("{non_existent_id}\n"));
-                        }
-                        markdown_message.push_str("```\n\n");
-                        html_message.push_str("</pre>
\n\n"); - } - if !markdown_message.is_empty() { - return Ok(RoomMessageEventContent::text_html( - markdown_message, - html_message, - )); - } + let mut user_ids = match userids_from_body(&body)? { + Ok(v) => v, + Err(message) => return Ok(message), + }; let mut deactivation_count = 0; let mut admins = Vec::new(); @@ -812,12 +876,81 @@ impl Service { } } - if admins.is_empty() { - RoomMessageEventContent::text_plain(format!( - "Deactivated {deactivation_count} accounts." + let mut failed_count = 0; + + if purge_media.purge_media { + let after = purge_media + .media_from_last + .map(unix_secs_from_duration) + .transpose()?; + + for user_id in user_ids { + failed_count += services() + .media + .purge_from_user(user_id, purge_media.force_filehash, after) + .len(); + } + } + + let mut message = format!("Deactivated {deactivation_count} accounts."); + if !admins.is_empty() { + message.push_str(&format!("\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts",admins.join(", "))); + } + if failed_count != 0 { + message.push_str(&format!( + "\nFailed to delete {failed_count} media, check logs for more details" )) + } + + RoomMessageEventContent::text_plain(message) + } else { + RoomMessageEventContent::text_plain( + "Expected code block in command body. Add --help for details.", + ) + } + } + AdminCommand::PurgeMedia => media_from_body(body).map_or_else( + |message| message, + |media| { + let failed_count = services().media.purge(&media, true).len(); + + if failed_count == 0 { + RoomMessageEventContent::text_plain("Successfully purged media") } else { - RoomMessageEventContent::text_plain(format!("Deactivated {} accounts.\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts", deactivation_count, admins.join(", "))) + RoomMessageEventContent::text_plain(format!( + "Failed to delete {failed_count} media, check logs for more details" + )) + } + }, + ), + AdminCommand::PurgeMediaFromUsers { + from_last, + force_filehash, + } => { + let after = from_last.map(unix_secs_from_duration).transpose()?; + + if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```" + { + let user_ids = match userids_from_body(&body)? 
{ + Ok(v) => v, + Err(message) => return Ok(message), + }; + + let mut failed_count = 0; + + for user_id in user_ids { + failed_count += services() + .media + .purge_from_user(user_id, force_filehash, after) + .len(); + } + + if failed_count == 0 { + RoomMessageEventContent::text_plain("Successfully purged media") + } else { + RoomMessageEventContent::text_plain(format!( + "Failed to purge {failed_count} media, check logs for more details" + )) } } else { RoomMessageEventContent::text_plain( @@ -825,6 +958,34 @@ impl Service { ) } } + AdminCommand::PurgeMediaFromServer { + server_id: server_name, + from_last, + force_filehash, + } => { + if server_name == services().globals.server_name() { + return Err(Error::AdminCommand( + "Cannot purge all media from your own homeserver", + )); + } + + let after = from_last.map(unix_secs_from_duration).transpose()?; + + let failed_count = services() + .media + .purge_from_server(&server_name, force_filehash, after) + .len(); + + if failed_count == 0 { + RoomMessageEventContent::text_plain(format!( + "Media from {server_name} has successfully been purged" + )) + } else { + RoomMessageEventContent::text_plain(format!( + "Failed to purge {failed_count} media, check logs for more details" + )) + } + } AdminCommand::SignJson => { if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```" { @@ -1456,6 +1617,105 @@ impl Service { } } +fn userids_from_body<'a>( + body: &'a [&'a str], +) -> Result, RoomMessageEventContent>, Error> { + let users = body.to_owned().drain(1..body.len() - 1).collect::>(); + + let mut user_ids = Vec::new(); + let mut remote_ids = Vec::new(); + let mut non_existent_ids = Vec::new(); + let mut invalid_users = Vec::new(); + + for &user in &users { + match <&UserId>::try_from(user) { + Ok(user_id) => { + if user_id.server_name() != services().globals.server_name() { + remote_ids.push(user_id) + } else if !services().users.exists(user_id)? { + non_existent_ids.push(user_id) + } else { + user_ids.push(user_id) + } + } + Err(_) => { + invalid_users.push(user); + } + } + } + + let mut markdown_message = String::new(); + let mut html_message = String::new(); + if !invalid_users.is_empty() { + markdown_message.push_str("The following user ids are not valid:\n```\n"); + html_message.push_str("The following user ids are not valid:\n
\n");
+        for invalid_user in invalid_users {
+            markdown_message.push_str(&format!("{invalid_user}\n"));
+            html_message.push_str(&format!("{invalid_user}\n"));
+        }
+        markdown_message.push_str("```\n\n");
+        html_message.push_str("</pre>
\n\n"); + } + if !remote_ids.is_empty() { + markdown_message.push_str("The following users are not from this server:\n```\n"); + html_message.push_str("The following users are not from this server:\n
\n");
+        for remote_id in remote_ids {
+            markdown_message.push_str(&format!("{remote_id}\n"));
+            html_message.push_str(&format!("{remote_id}\n"));
+        }
+        markdown_message.push_str("```\n\n");
+        html_message.push_str("</pre>
\n\n"); + } + if !non_existent_ids.is_empty() { + markdown_message.push_str("The following users do not exist:\n```\n"); + html_message.push_str("The following users do not exist:\n
\n");
+        for non_existent_id in non_existent_ids {
+            markdown_message.push_str(&format!("{non_existent_id}\n"));
+            html_message.push_str(&format!("{non_existent_id}\n"));
+        }
+        markdown_message.push_str("```\n\n");
+        html_message.push_str("</pre>
\n\n"); + } + if !markdown_message.is_empty() { + return Ok(Err(RoomMessageEventContent::text_html( + markdown_message, + html_message, + ))); + } + + Ok(Ok(user_ids)) +} + +fn media_from_body( + body: Vec<&str>, +) -> Result, RoomMessageEventContent> { + if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```" { + Ok(body + .clone() + .drain(1..body.len() - 1) + .map(>::from) + .filter_map(|mxc| { + mxc.parts() + .map(|(server_name, media_id)| (server_name.to_owned(), media_id.to_owned())) + .ok() + }) + .collect::>()) + } else { + Err(RoomMessageEventContent::text_plain( + "Expected code block in command body. Add --help for details.", + )) + } +} + +fn unix_secs_from_duration(duration: Duration) -> Result { + SystemTime::now() + .checked_sub(duration).ok_or_else(||Error::AdminCommand("Given timeframe cannot be represented as system time, please try again with a shorter time-frame")) + .map(|time| time + .duration_since(UNIX_EPOCH) + .expect("Time is after unix epoch") + .as_secs()) +} + #[cfg(test)] mod test { use super::*; diff --git a/src/service/media/data.rs b/src/service/media/data.rs index 2301ebbc..97074d30 100644 --- a/src/service/media/data.rs +++ b/src/service/media/data.rs @@ -1,4 +1,4 @@ -use ruma::{ServerName, UserId}; +use ruma::{OwnedServerName, ServerName, UserId}; use sha2::{digest::Output, Sha256}; use crate::Result; @@ -42,4 +42,24 @@ pub trait Data: Send + Sync { width: u32, height: u32, ) -> Result; + + fn purge_and_get_hashes( + &self, + media: &[(OwnedServerName, String)], + force_filehash: bool, + ) -> Vec>; + + fn purge_and_get_hashes_from_user( + &self, + user_id: &UserId, + force_filehash: bool, + after: Option, + ) -> Vec>; + + fn purge_and_get_hashes_from_server( + &self, + server_name: &ServerName, + force_filehash: bool, + after: Option, + ) -> Vec>; } diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index b325f507..8cf1d6b5 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -1,15 +1,19 @@ mod data; -use std::io::Cursor; +use std::{fs, io::Cursor}; pub use data::Data; use ruma::{ api::client::{error::ErrorKind, media::is_safe_inline_content_type}, http_headers::{ContentDisposition, ContentDispositionType}, - ServerName, UserId, + OwnedServerName, ServerName, UserId, }; use sha2::{digest::Output, Digest, Sha256}; +use tracing::error; -use crate::{config::MediaConfig, services, Error, Result}; +use crate::{ + config::{DirectoryStructure, MediaConfig}, + services, Error, Result, +}; use image::imageops::FilterType; pub struct DbFileMeta { @@ -293,6 +297,67 @@ impl Service { })) } } + + /// Purges all of the specified media. + /// + /// If `force_filehash` is true, all media and/or thumbnails which share sha256 content hashes + /// with the purged media will also be purged, meaning that the media is guaranteed to be deleted + /// from the media backend. Otherwise, it will be deleted if only the media IDs requested to be + /// purged have that sha256 hash. + /// + /// Returns errors for all the files that were failed to be deleted, if any. + pub fn purge(&self, media: &[(OwnedServerName, String)], force_filehash: bool) -> Vec { + let hashes = self.db.purge_and_get_hashes(media, force_filehash); + + purge_files(hashes) + } + + /// Purges all (past a certain time in unix seconds, if specified) media + /// sent by a user. 
+ /// + /// If `force_filehash` is true, all media and/or thumbnails which share sha256 content hashes + /// with the purged media will also be purged, meaning that the media is guaranteed to be deleted + /// from the media backend. Otherwise, it will be deleted if only the media IDs requested to be + /// purged have that sha256 hash. + /// + /// Returns errors for all the files that were failed to be deleted, if any. + /// + /// Note: it only currently works for local users, as we cannot determine who + /// exactly uploaded the file when it comes to remove users. + pub fn purge_from_user( + &self, + user_id: &UserId, + force_filehash: bool, + after: Option, + ) -> Vec { + let hashes = self + .db + .purge_and_get_hashes_from_user(user_id, force_filehash, after); + + purge_files(hashes) + } + + /// Purges all (past a certain time in unix seconds, if specified) media + /// obtained from the specified server (due to the MXC URI). + /// + /// If `force_filehash` is true, all media and/or thumbnails which share sha256 content hashes + /// with the purged media will also be purged, meaning that the media is guaranteed to be deleted + /// from the media backend. Otherwise, it will be deleted if only the media IDs requested to be + /// purged have that sha256 hash. + /// + /// Returns errors for all the files that were failed to be deleted, if any. + pub fn purge_from_server( + &self, + server_name: &ServerName, + force_filehash: bool, + after: Option, + ) -> Vec { + let hashes = self + .db + .purge_and_get_hashes_from_server(server_name, force_filehash, after); + + purge_files(hashes) + } } /// Creates the media file, using the configured media backend @@ -335,6 +400,68 @@ async fn get_file(sha256_hex: &str) -> Result> { }) } +/// Purges the given files from the media backend +/// Returns a `Vec` of errors that occurred when attempting to delete the files +/// +/// Note: this does NOT remove the related metadata from the database +fn purge_files(hashes: Vec>) -> Vec { + hashes + .into_iter() + .map(|hash| match hash { + Ok(v) => delete_file(&v), + Err(e) => Err(e), + }) + .filter_map(|r| if let Err(e) = r { Some(e) } else { None }) + .collect() +} + +/// Deletes the given file from the media backend +/// +/// Note: this does NOT remove the related metadata from the database +fn delete_file(sha256_hex: &str) -> Result<()> { + match &services().globals.config.media { + MediaConfig::FileSystem { + path, + directory_structure, + } => { + let mut path = + services() + .globals + .get_media_path(path, directory_structure, sha256_hex)?; + + if let Err(e) = fs::remove_file(&path) { + // Multiple files with the same filehash might be requseted to be deleted + if e.kind() != std::io::ErrorKind::NotFound { + error!("Error removing media from filesystem: {e}"); + Err(e)?; + } + } + + if let DirectoryStructure::Deep { length: _, depth } = directory_structure { + let mut depth = depth.get(); + + while depth > 0 { + // Here at the start so that the first time, the file gets removed from the path + path.pop(); + + if let Err(e) = fs::remove_dir(&path) { + if e.kind() == std::io::ErrorKind::DirectoryNotEmpty { + break; + } else { + error!("Error removing empty media directories: {e}"); + Err(e)?; + } + } + + depth -= 1; + } + } + } + } + + Ok(()) +} + /// Creates a content disposition with the given `filename`, using the `content_type` to determine whether /// the disposition should be `inline` or `attachment` fn content_disposition(