diff --git a/Cargo.lock b/Cargo.lock
index c5d5695b..9bd25569 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -402,6 +402,15 @@ version = "1.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
 
+[[package]]
+name = "bytesize"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a3c8f83209414aacf0eeae3cf730b18d6981697fba62f200fcfb92b9f082acba"
+dependencies = [
+ "serde",
+]
+
 [[package]]
 name = "bzip2-sys"
 version = "0.1.11+1.0.8"
@@ -524,6 +533,7 @@ dependencies = [
  "axum-server",
  "base64 0.22.1",
  "bytes",
+ "bytesize",
  "chrono",
  "clap",
  "directories",
@@ -534,6 +544,7 @@ dependencies = [
  "hmac",
  "http 1.1.0",
  "humantime",
+ "humantime-serde",
  "hyper 1.3.1",
  "hyper-util",
  "image",
@@ -1232,6 +1243,16 @@ version = "2.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f"
 
+[[package]]
+name = "humantime-serde"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c"
+dependencies = [
+ "humantime",
+ "serde",
+]
+
 [[package]]
 name = "hyper"
 version = "0.14.29"
diff --git a/Cargo.toml b/Cargo.toml
index c70055f1..511a6f4c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -88,6 +88,9 @@ image = { version = "0.25", default-features = false, features = [
 # Used for creating media filenames
 hex = "0.4"
 sha2 = "0.10"
+# Used for parsing media retention policies from the config
+bytesize = { version = "2", features = ["serde"] }
+humantime-serde = "1"
 # Used to encode server public key
 base64 = "0.22"
 # Used when hashing the state
diff --git a/docs/configuration.md b/docs/configuration.md
index 3323fb64..145c2c48 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -99,6 +99,61 @@ depth = 4
 length = 2
 ```
 
+#### Retention policies
+Over time, the amount of stored media keeps growing, even if each file was only accessed once.
+Retention policies allow media files to be deleted automatically once they meet certain criteria,
+saving disk space.
+
+This can be configured via the `retention` field of the media config, which is an array of
+policies, each of which may specify a scope:
+- `scope`: specifies what type of media this policy applies to. If unset, this policy acts as the
+  default for all scopes that are not explicitly configured. Possible values: `"local"`, `"remote"`,
+  `"thumbnail"`
+- `accessed`: the maximum amount of time since the media was last accessed,
+  in the form specified by [`humantime::parse_duration`](https://docs.rs/humantime/2.2.0/humantime/fn.parse_duration.html)
+  (e.g. `"240h"`, `"1400min"`, `"2months"`, etc.)
+- `created`: the maximum amount of time since the media was created, in the same format as
+  `accessed` above.
+- `space`: the maximum amount of space all of the media in this scope can occupy (if no scope is
+  specified, this becomes the total for **all** media). If creating or downloading new media would
+  cause this limit to be exceeded, the least recently accessed media is deleted repeatedly until
+  there is enough space for the new media. The format is specified by [`ByteSize`](https://docs.rs/bytesize/2.0.1/bytesize/index.html)
+  (e.g. `"10000MB"`, `"15GiB"`, `"1.5TB"`, etc.)
+
+Media must meet **all** of the specified requirements to be kept; otherwise, it will be deleted.
+This means that thumbnails have to meet both the `"thumbnail"` requirements and either the
+`"local"` or `"remote"` requirements in order to be kept.
+
+If media does not meet the `accessed` or `created` requirement, it will be deleted during a
+periodic cleanup, which runs every 1/10th of the shortest configured retention time, clamped to
+run at most once every minute and at least once every 24 hours. For example, if the `accessed`
+time for all media is set to `"2months"` but overridden to `"48h"` for thumbnails, the cleanup
+runs every 4.8 hours.
+
+##### Example
+```toml
+# Total of 40GB for all media
+[[global.media.retention]] # Note the double "[[]]": each policy is a table in an array
+space = "40G"
+
+# Delete remote media not accessed for 30 days, or older than 90 days
+[[global.media.retention]]
+scope = "remote"
+accessed = "30d"
+created = "90days" # the long and short duration formats can be mixed freely
+
+# Delete local media not accessed for 1 year
+[[global.media.retention]]
+scope = "local"
+accessed = "1y"
+
+# Only store 1GB of thumbnails
+[[global.media.retention]]
+scope = "thumbnail"
+space = "1GB"
+```
+
 ### TLS
 The `tls` table contains the following fields:
 - `certs`: The path to the public PEM certificate
diff --git a/src/config/mod.rs b/src/config/mod.rs
index 46dcf7e0..bfe4065e 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -1,13 +1,16 @@
 use std::{
-    collections::BTreeMap,
+    collections::{BTreeMap, HashMap, HashSet},
     fmt,
     net::{IpAddr, Ipv4Addr},
     num::NonZeroU8,
     path::PathBuf,
+    time::Duration,
 };
 
+use bytesize::ByteSize;
 use ruma::{OwnedServerName, RoomVersionId};
 use serde::{de::IgnoredAny, Deserialize};
+use tokio::time::{interval, Interval};
 use tracing::warn;
 use url::Url;
 
@@ -221,23 +224,26 @@ impl From<IncompleteConfig> for Config {
             server: well_known_server,
         };
 
-        let media = match media {
-            IncompleteMediaConfig::FileSystem {
-                path,
-                directory_structure,
-            } => MediaConfig::FileSystem {
-                path: path.unwrap_or_else(|| {
-                    // We do this as we don't know if the path has a trailing slash, or even if the
-                    // path separator is a forward or backward slash
-                    [&database_path, "media"]
-                        .iter()
-                        .collect::<PathBuf>()
-                        .into_os_string()
-                        .into_string()
-                        .expect("Both inputs are valid UTF-8")
-                }),
-                directory_structure,
+        let media = MediaConfig {
+            backend: match media.backend {
+                IncompleteMediaBackendConfig::FileSystem {
+                    path,
+                    directory_structure,
+                } => MediaBackendConfig::FileSystem {
+                    path: path.unwrap_or_else(|| {
+                        // We do this as we don't know if the path has a trailing slash, or even if the
+                        // path separator is a forward or backward slash
+                        [&database_path, "media"]
+                            .iter()
+                            .collect::<PathBuf>()
+                            .into_os_string()
+                            .into_string()
+                            .expect("Both inputs are valid UTF-8")
+                    }),
+                    directory_structure,
+                },
             },
+            retention: media.retention.into(),
         };
 
         Config {
@@ -317,9 +323,159 @@ pub struct WellKnownConfig {
     pub server: OwnedServerName,
 }
 
+#[derive(Deserialize, Default)]
+pub struct IncompleteMediaConfig {
+    #[serde(flatten, default)]
+    pub backend: IncompleteMediaBackendConfig,
+    pub retention: IncompleteMediaRetentionConfig,
+}
+
+#[derive(Clone, Debug)]
+pub struct MediaConfig {
+    pub backend: MediaBackendConfig,
+    pub retention: MediaRetentionConfig,
+}
+
+type IncompleteMediaRetentionConfig = Option<HashSet<IncompleteScopedMediaRetentionConfig>>;
+
+#[derive(Clone, Debug)]
+pub struct MediaRetentionConfig {
+    pub scoped: HashMap<MediaRetentionScope, ScopedMediaRetentionConfig>,
+    pub global_space: Option<ByteSize>,
+}
+
+impl MediaRetentionConfig {
+    /// Interval for the duration-based retention policies to be checked & enforced
+    pub fn cleanup_interval(&self) -> Option<Interval> {
+        self.scoped
+            .values()
+            .filter_map(|scoped| match (scoped.created, scoped.accessed) {
+                (None, accessed) => accessed,
+                (created, None) => created,
+                (created, accessed) => created.min(accessed),
+            })
+            .map(|dur| {
+                dur.mul_f32(0.1)
+                    .clamp(Duration::from_secs(60), Duration::from_secs(60 * 60 * 24))
+            })
+            .min()
+            .map(interval)
+    }
+}
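As a sanity check of the interval arithmetic above, here is a standalone sketch (not part of the diff; the clamping logic is re-created in a free function) showing that the docs' example of a 48-hour shortest retention works out to a 4.8-hour cleanup period:

```rust
use std::time::Duration;

// Re-creation of the clamping in `cleanup_interval`, for illustration only:
// one tenth of the shortest retention time, clamped to [1 minute, 24 hours].
fn cleanup_period(shortest_retention: Duration) -> Duration {
    shortest_retention
        .mul_f32(0.1)
        .clamp(Duration::from_secs(60), Duration::from_secs(60 * 60 * 24))
}

fn main() {
    // 48h retention -> cleanup every 4.8h (17280s), matching the docs example
    assert_eq!(
        cleanup_period(Duration::from_secs(48 * 60 * 60)),
        Duration::from_secs(17_280)
    );
    // Very short retention is clamped up to one run per minute
    assert_eq!(cleanup_period(Duration::from_secs(30)), Duration::from_secs(60));
    // Very long retention is clamped down to one run per day
    assert_eq!(
        cleanup_period(Duration::from_secs(10 * 365 * 24 * 60 * 60)),
        Duration::from_secs(24 * 60 * 60)
    );
}
```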
+
+#[derive(Deserialize)]
+pub struct IncompleteScopedMediaRetentionConfig {
+    pub scope: Option<MediaRetentionScope>,
+    #[serde(default, with = "humantime_serde::option")]
+    pub accessed: Option<Duration>,
+    #[serde(default, with = "humantime_serde::option")]
+    pub created: Option<Duration>,
+    pub space: Option<ByteSize>,
+}
+
+impl From<IncompleteMediaRetentionConfig> for MediaRetentionConfig {
+    fn from(value: IncompleteMediaRetentionConfig) -> Self {
+        {
+            let mut scoped = HashMap::from([
+                (
+                    MediaRetentionScope::Remote,
+                    ScopedMediaRetentionConfig::default(),
+                ),
+                (
+                    MediaRetentionScope::Thumbnail,
+                    ScopedMediaRetentionConfig::default(),
+                ),
+            ]);
+            let mut fallback = None;
+
+            if let Some(retention) = value {
+                for IncompleteScopedMediaRetentionConfig {
+                    scope,
+                    accessed,
+                    space,
+                    created,
+                } in retention
+                {
+                    if let Some(scope) = scope {
+                        scoped.insert(
+                            scope,
+                            ScopedMediaRetentionConfig {
+                                accessed,
+                                space,
+                                created,
+                            },
+                        );
+                    } else {
+                        fallback = Some(ScopedMediaRetentionConfig {
+                            accessed,
+                            space,
+                            created,
+                        })
+                    }
+                }
+            }
+
+            if let Some(fallback) = fallback.clone() {
+                for scope in [
+                    MediaRetentionScope::Remote,
+                    MediaRetentionScope::Local,
+                    MediaRetentionScope::Thumbnail,
+                ] {
+                    scoped.entry(scope).or_insert_with(|| fallback.clone());
+                }
+            }
+
+            Self {
+                global_space: fallback.and_then(|global| global.space),
+                scoped,
+            }
+        }
+    }
+}
+
+impl std::hash::Hash for IncompleteScopedMediaRetentionConfig {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.scope.hash(state);
+    }
+}
+
+impl PartialEq for IncompleteScopedMediaRetentionConfig {
+    fn eq(&self, other: &Self) -> bool {
+        self.scope == other.scope
+    }
+}
+
+impl Eq for IncompleteScopedMediaRetentionConfig {}
+
+#[derive(Debug, Clone)]
+pub struct ScopedMediaRetentionConfig {
+    pub accessed: Option<Duration>,
+    pub created: Option<Duration>,
+    pub space: Option<ByteSize>,
+}
+
+impl Default for ScopedMediaRetentionConfig {
+    fn default() -> Self {
+        Self {
+            // 30 days
+            accessed: Some(Duration::from_secs(60 * 60 * 24 * 30)),
+            created: None,
+            space: None,
+        }
+    }
+}
+
+#[derive(Deserialize, Clone, Debug, Hash, PartialEq, Eq)]
+#[serde(rename_all = "lowercase")]
+pub enum MediaRetentionScope {
+    Remote,
+    Local,
+    Thumbnail,
+}
+
 #[derive(Deserialize)]
 #[serde(tag = "backend", rename_all = "lowercase")]
-pub enum IncompleteMediaConfig {
+pub enum IncompleteMediaBackendConfig {
     FileSystem {
         path: Option<String>,
         #[serde(default)]
@@ -327,7 +483,7 @@
     },
 }
 
-impl Default for IncompleteMediaConfig {
+impl Default for IncompleteMediaBackendConfig {
     fn default() -> Self {
         Self::FileSystem {
             path: None,
@@ -337,7 +493,7 @@
         }
     }
 }
 
 #[derive(Debug, Clone)]
-pub enum MediaConfig {
+pub enum MediaBackendConfig {
     FileSystem {
         path: String,
         directory_structure: DirectoryStructure,
     },
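Before the database changes below, it may help to see the serde attributes above in action. A standalone sketch with simplified stand-in types (not the real conduit structs; assumes the `toml` crate in addition to the `bytesize` and `humantime-serde` dependencies added in Cargo.toml):

```rust
use std::time::Duration;

use bytesize::ByteSize;
use serde::Deserialize;

// Simplified stand-in for `IncompleteScopedMediaRetentionConfig`
#[derive(Deserialize, Debug)]
struct RetentionEntry {
    scope: Option<String>, // the real field is an `Option<MediaRetentionScope>`
    #[serde(default, with = "humantime_serde::option")]
    accessed: Option<Duration>,
    #[serde(default, with = "humantime_serde::option")]
    created: Option<Duration>,
    space: Option<ByteSize>,
}

#[derive(Deserialize, Debug)]
struct Retention {
    retention: Vec<RetentionEntry>,
}

fn main() {
    let parsed: Retention = toml::from_str(
        r#"
            [[retention]]
            scope = "remote"
            accessed = "30d"
            space = "15GiB"
        "#,
    )
    .unwrap();

    let entry = &parsed.retention[0];
    // humantime parses "30d" as 30 days
    assert_eq!(entry.accessed, Some(Duration::from_secs(30 * 24 * 60 * 60)));
    // bytesize parses binary suffixes such as "GiB"
    assert_eq!(entry.space, Some(ByteSize::gib(15)));
    assert_eq!(entry.created, None);
}
```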
diff --git a/src/database/key_value/media.rs b/src/database/key_value/media.rs
index f1a3f6e8..27a239fd 100644
--- a/src/database/key_value/media.rs
+++ b/src/database/key_value/media.rs
@@ -1,16 +1,18 @@
 use std::{collections::BTreeMap, ops::Range, slice::Split};
 
+use bytesize::ByteSize;
 use ruma::{api::client::error::ErrorKind, OwnedServerName, ServerName, UserId};
 use sha2::{digest::Output, Sha256};
 use tracing::error;
 
 use crate::{
+    config::{MediaRetentionConfig, MediaRetentionScope},
     database::KeyValueDatabase,
     service::{
         self,
-        media::{BlockedMediaInfo, DbFileMeta},
+        media::{BlockedMediaInfo, Data as _, DbFileMeta, MediaType},
     },
-    utils, Error, Result,
+    services, utils, Error, Result,
 };
 
 impl service::media::Data for KeyValueDatabase {
@@ -773,6 +775,140 @@ impl service::media::Data for KeyValueDatabase {
         Ok(false)
     }
 
+    fn files_to_delete(
+        &self,
+        sha256_digest: &[u8],
+        retention: &MediaRetentionConfig,
+        media_type: MediaType,
+        new_size: u64,
+    ) -> Result<Vec<Result<String>>> {
+        // If the file already exists, no space needs to be cleared
+        if self.filehash_metadata.get(sha256_digest)?.is_some() {
+            return Ok(Vec::new());
+        }
+
+        let scoped_space = |scope| retention.scoped.get(&scope).and_then(|policy| policy.space);
+
+        let mut files_to_delete = Vec::new();
+
+        if media_type.is_thumb() {
+            if let Some(mut f) = self.purge_if_necessary(
+                scoped_space(MediaRetentionScope::Thumbnail),
+                |k| self.file_is_thumb(k),
+                &new_size,
+            ) {
+                files_to_delete.append(&mut f);
+            }
+        }
+
+        match media_type {
+            MediaType::LocalMedia { thumbnail: _ } => {
+                if let Some(mut f) = self.purge_if_necessary(
+                    scoped_space(MediaRetentionScope::Local),
+                    |k| self.file_is_local(k).unwrap_or(true),
+                    &new_size,
+                ) {
+                    files_to_delete.append(&mut f);
+                }
+            }
+            MediaType::RemoteMedia { thumbnail: _ } => {
+                if let Some(mut f) = self.purge_if_necessary(
+                    scoped_space(MediaRetentionScope::Remote),
+                    |k| !self.file_is_local(k).unwrap_or(true),
+                    &new_size,
+                ) {
+                    files_to_delete.append(&mut f);
+                }
+            }
+        }
+
+        if let Some(mut f) = self.purge_if_necessary(retention.global_space, |_| true, &new_size) {
+            files_to_delete.append(&mut f);
+        }
+
+        Ok(files_to_delete)
+    }
+
+    fn cleanup_time_retention(&self, retention: &MediaRetentionConfig) -> Vec<Result<String>> {
+        let now = utils::secs_since_unix_epoch();
+
+        let should_be_deleted = |k: &[u8], metadata: &FilehashMetadata| {
+            let check_policy = |retention_scope| {
+                if let Some(scoped_retention) = retention.scoped.get(&retention_scope) {
+                    if let Some(created_policy) = scoped_retention.created {
+                        if now - metadata.creation(k)? > created_policy.as_secs() {
+                            return Ok(true);
+                        }
+                    }
+
+                    if let Some(accessed_policy) = scoped_retention.accessed {
+                        if now - metadata.last_access(k)? > accessed_policy.as_secs() {
+                            return Ok(true);
+                        }
+                    }
+                }
+                Ok(false)
+            };
+
+            if self.file_is_thumb(k) && check_policy(MediaRetentionScope::Thumbnail)? {
+                return Ok(true);
+            }
+
+            if self.file_is_local(k)? {
+                check_policy(MediaRetentionScope::Local)
+            } else {
+                check_policy(MediaRetentionScope::Remote)
+            }
+        };
+
+        let mut files_to_delete = Vec::new();
+        let mut errors_and_hashes = Vec::new();
+
+        for (k, v) in self.filehash_metadata.iter() {
+            match should_be_deleted(&k, &FilehashMetadata::from_vec(v)) {
+                Ok(true) => files_to_delete.push(k),
+                Ok(false) => (),
+                Err(e) => errors_and_hashes.push(Err(e)),
+            }
+        }
+
+        errors_and_hashes.append(&mut self.purge(files_to_delete));
+
+        errors_and_hashes
+    }
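The `creation` and `last_access` reads above, and the `update_last_access` write further down, only make sense against the value layout of `filehash_metadata`. The following is my reading of the accessors in this file (an inferred sketch, not a documented format): three big-endian `u64`s.

```rust
// Inferred layout of a `filehash_metadata` value (all fields big-endian u64,
// matching the `to_be_bytes()` writes and the `0..8`/`8..16`/`16..24` reads):
//
//   bytes  0..8   file size in bytes
//   bytes  8..16  creation time  (seconds since the UNIX epoch)
//   bytes 16..24  last access    (seconds since the UNIX epoch)
fn encode_metadata(size: u64, created: u64, last_access: u64) -> Vec<u8> {
    let mut value = Vec::with_capacity(24);
    value.extend_from_slice(&size.to_be_bytes());
    value.extend_from_slice(&created.to_be_bytes());
    value.extend_from_slice(&last_access.to_be_bytes());
    value
}
```

This is why `update_last_access` can simply truncate the value to 16 bytes and append the current time: only the last field changes.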
+
+    fn update_last_accessed(&self, server_name: &ServerName, media_id: &str) -> Result<()> {
+        let mut key = server_name.as_bytes().to_vec();
+        key.push(0xff);
+        key.extend_from_slice(media_id.as_bytes());
+
+        if let Some(mut meta) = self.servernamemediaid_metadata.get(&key)? {
+            meta.truncate(32);
+            let sha256_digest = meta;
+
+            self.update_last_accessed_filehash(&sha256_digest)
+        } else {
+            // File was probably deleted just as we were fetching it, so nothing to do
+            Ok(())
+        }
+    }
+
+    fn update_last_accessed_filehash(&self, sha256_digest: &[u8]) -> Result<()> {
+        if let Some(mut metadata) = self
+            .filehash_metadata
+            .get(sha256_digest)?
+            .map(FilehashMetadata::from_vec)
+        {
+            metadata.update_last_access();
+
+            self.filehash_metadata
+                .insert(sha256_digest, metadata.value())
+        } else {
+            // File was probably deleted just as we were fetching it, so nothing to do
+            Ok(())
+        }
+    }
 }
 
 impl KeyValueDatabase {
@@ -930,6 +1066,119 @@ impl KeyValueDatabase {
 
         self.filehash_metadata.remove(&sha256_digest)
     }
+
+    fn file_is_local(&self, k: &[u8]) -> Result<bool> {
+        for (k, _) in self.filehash_servername_mediaid.scan_prefix(k.to_vec()) {
+            let mut parts = k
+                .get(32..)
+                .map(|k| k.split(|&b| b == 0xff))
+                .ok_or_else(|| {
+                    Error::bad_database("Invalid format of key in filehash_servername_mediaid")
+                })?;
+
+            let Some(server_name) = parts.next() else {
+                return Err(Error::bad_database(
+                    "Invalid format of key in filehash_servername_mediaid",
+                ));
+            };
+
+            if utils::string_from_bytes(server_name).map_err(|_| {
+                Error::bad_database("Invalid UTF-8 servername in filehash_servername_mediaid")
+            })? == services().globals.server_name().as_str()
+            {
+                return Ok(true);
+            }
+        }
+
+        Ok(false)
+    }
+
+    fn file_is_thumb(&self, k: &[u8]) -> bool {
+        self.filehash_thumbnailid
+            .scan_prefix(k.to_vec())
+            .next()
+            .is_some()
+            && self
+                .filehash_servername_mediaid
+                .scan_prefix(k.to_vec())
+                .next()
+                .is_none()
+    }
+
+    fn purge_if_necessary(
+        &self,
+        space: Option<ByteSize>,
+        filter: impl Fn(&[u8]) -> bool,
+        new_size: &u64,
+    ) -> Option<Vec<Result<String>>> {
+        if let Some(space) = space {
+            let mut candidate_files_to_delete = Vec::new();
+            let mut errors_and_hashes = Vec::new();
+            let mut total_size = 0;
+
+            let parse_value = |k: Vec<u8>, v: &FilehashMetadata| {
+                let last_access = v.last_access(&k)?;
+                let size = v.size(&k)?;
+                Ok((k, last_access, size))
+            };
+
+            for (k, v) in self.filehash_metadata.iter().filter(|(k, _)| filter(k)) {
+                match parse_value(k, &FilehashMetadata::from_vec(v)) {
+                    Ok(x) => {
+                        total_size += x.2;
+                        candidate_files_to_delete.push(x)
+                    }
+                    Err(e) => errors_and_hashes.push(Err(e)),
+                }
+            }
+
+            if let Some(required_to_delete) = (total_size + *new_size).checked_sub(space.as_u64()) {
+                // Delete the least recently accessed files first
+                candidate_files_to_delete.sort_by_key(|(_, last_access, _)| *last_access);
+
+                let mut size_sum = 0;
+                let mut take = candidate_files_to_delete.len();
+
+                for (i, (_, _, file_size)) in candidate_files_to_delete.iter().enumerate() {
+                    size_sum += file_size;
+                    if size_sum >= required_to_delete {
+                        take = i + 1;
+                        break;
+                    }
+                }
+
+                errors_and_hashes.append(
+                    &mut self.purge(
+                        candidate_files_to_delete
+                            .into_iter()
+                            .take(take)
+                            .map(|(hash, _, _)| hash)
+                            .collect(),
+                    ),
+                );
+
+                Some(errors_and_hashes)
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }
+
+    fn purge(&self, hashes: Vec<Vec<u8>>) -> Vec<Result<String>> {
+        hashes
+            .into_iter()
+            .map(|sha256_digest| {
+                let sha256_hex = hex::encode(&sha256_digest);
+                let is_blocked = self.is_blocked_filehash(&sha256_digest)?;
+
+                self.purge_filehash(sha256_digest, is_blocked)?;
+
+                Ok(sha256_hex)
+            })
+            .collect()
+    }
 }
 
 fn parse_metadata(value: &[u8]) -> Result<DbFileMeta> {
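To make the accounting in `purge_if_necessary` concrete, here is the same arithmetic as a standalone sketch (a hypothetical helper, not the real database code): given `(last_access, size)` pairs, it frees at least `total_size + new_size - space_limit` bytes, starting with the least recently accessed files.

```rust
/// Pick files to evict, oldest access time first, until at least
/// `total_size + new_size - space_limit` bytes would be freed.
fn pick_evictions(
    mut candidates: Vec<(u64, u64)>, // (last_access, size)
    total_size: u64,
    new_size: u64,
    space_limit: u64,
) -> Vec<(u64, u64)> {
    let Some(required) = (total_size + new_size).checked_sub(space_limit) else {
        return Vec::new(); // already within the limit, nothing to purge
    };

    // Least recently accessed first
    candidates.sort_by_key(|&(last_access, _)| last_access);

    let mut freed = 0;
    candidates
        .into_iter()
        .take_while(|&(_, size)| {
            let still_needed = freed < required;
            freed += size;
            still_needed
        })
        .collect()
}

fn main() {
    // Three 10-byte files, a 10-byte upload, and a 25-byte limit:
    // 15 bytes must go, so the two oldest files (20 bytes) are evicted.
    let evicted = pick_evictions(vec![(300, 10), (100, 10), (200, 10)], 30, 10, 25);
    assert_eq!(evicted, vec![(100, 10), (200, 10)]);
}
```

As in the code above, `checked_sub` doubles as the over-the-limit test: `None` means the new file fits without evicting anything.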
@@ -994,6 +1243,12 @@ impl FilehashMetadata {
         Self { value: vec }
     }
 
+    pub fn update_last_access(&mut self) {
+        let now = utils::secs_since_unix_epoch().to_be_bytes();
+        self.value.truncate(16);
+        self.value.extend_from_slice(&now);
+    }
+
     pub fn value(&self) -> &[u8] {
         &self.value
     }
@@ -1025,6 +1280,15 @@ impl FilehashMetadata {
         })
     }
 
+    pub fn size(&self, sha256_digest: &[u8]) -> Result<u64> {
+        self.get_u64_val(
+            0..8,
+            "file size",
+            sha256_digest,
+            "Invalid file size in filehash_metadata",
+        )
+    }
+
     pub fn creation(&self, sha256_digest: &[u8]) -> Result<u64> {
         self.get_u64_val(
             8..16,
             "creation time",
             sha256_digest,
@@ -1033,4 +1297,13 @@
             "Invalid creation time in filehash_metadata",
         )
     }
+
+    pub fn last_access(&self, sha256_digest: &[u8]) -> Result<u64> {
+        self.get_u64_val(
+            16..24,
+            "last access time",
+            sha256_digest,
+            "Invalid last access time in filehash_metadata",
+        )
+    }
 }
diff --git a/src/database/mod.rs b/src/database/mod.rs
index b564833b..e1389bc7 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -1103,6 +1103,8 @@ impl KeyValueDatabase {
 
         services().sending.start_handler();
 
+        services().media.start_time_retention_checker();
+
         Self::start_cleanup_task().await;
         if services().globals.allow_check_for_updates() {
             Self::start_check_for_updates_task();
diff --git a/src/main.rs b/src/main.rs
index 01af9ad2..1a00e9ff 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -49,7 +49,7 @@ static SUB_TABLES: [&str; 3] = ["well_known", "tls", "media"]; // Not doing `pro
 // Yeah, I know it's terrible, but since it seems the container users dont want syntax like A[B][C]="...",
 // this is what we have to deal with. Also see: https://github.com/SergioBenitez/Figment/issues/12#issuecomment-801449465
-static SUB_SUB_TABLES: [&str; 1] = ["directory_structure"];
+static SUB_SUB_TABLES: [&str; 2] = ["directory_structure", "retention"];
 
 #[tokio::main]
 async fn main() {
diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs
index ac77afe9..069adef3 100644
--- a/src/service/globals/mod.rs
+++ b/src/service/globals/mod.rs
@@ -8,7 +8,7 @@ use ruma::{
 
 use crate::api::server_server::DestinationResponse;
 use crate::{
-    config::{DirectoryStructure, MediaConfig, TurnConfig},
+    config::{DirectoryStructure, MediaBackendConfig, TurnConfig},
     services, Config, Error, Result,
 };
 use futures_util::FutureExt;
@@ -230,7 +230,7 @@ impl Service {
 
         // Remove this exception once other media backends are added
         #[allow(irrefutable_let_patterns)]
-        if let MediaConfig::FileSystem { path, .. } = &s.config.media {
+        if let MediaBackendConfig::FileSystem { path, .. } = &s.config.media.backend {
             fs::create_dir_all(path)?;
         }
diff --git a/src/service/media/data.rs b/src/service/media/data.rs
index f6da1788..9f1d48c9 100644
--- a/src/service/media/data.rs
+++ b/src/service/media/data.rs
@@ -1,11 +1,9 @@
 use ruma::{OwnedServerName, ServerName, UserId};
 use sha2::{digest::Output, Sha256};
 
-use crate::{Error, Result};
+use crate::{config::MediaRetentionConfig, Error, Result};
 
-use super::BlockedMediaInfo;
-
-use super::DbFileMeta;
+use super::{BlockedMediaInfo, DbFileMeta, MediaType};
 
 pub trait Data: Send + Sync {
     #[allow(clippy::too_many_arguments)]
@@ -93,4 +91,24 @@
     fn list_blocked(&self) -> Vec<Result<BlockedMediaInfo>>;
 
     fn is_blocked_filehash(&self, sha256_digest: &[u8]) -> Result<bool>;
+
+    /// Gets the files that need to be deleted from the media backend in order to meet the `space`
+    /// requirements, as specified in the retention config. Calling this also causes those files'
+    /// metadata to be deleted from the database.
+    fn files_to_delete(
+        &self,
+        sha256_digest: &[u8],
+        retention: &MediaRetentionConfig,
+        media_type: MediaType,
+        new_size: u64,
+    ) -> Result<Vec<Result<String>>>;
+
+    /// Gets the files that need to be deleted from the media backend in order to meet the
+    /// time-based requirements (`created` and `accessed`), as specified in the retention config.
+    /// Calling this also causes those files' metadata to be deleted from the database.
+    fn cleanup_time_retention(&self, retention: &MediaRetentionConfig) -> Vec<Result<String>>;
+
+    fn update_last_accessed(&self, server_name: &ServerName, media_id: &str) -> Result<()>;
+
+    fn update_last_accessed_filehash(&self, sha256_digest: &[u8]) -> Result<()>;
 }
diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs
index d9ae2b22..a26e615d 100644
--- a/src/service/media/mod.rs
+++ b/src/service/media/mod.rs
@@ -1,5 +1,5 @@
 mod data;
-use std::{fs, io::Cursor};
+use std::{fs, io::Cursor, sync::Arc};
 
 pub use data::Data;
 use ruma::{
@@ -8,10 +8,10 @@ use ruma::{
     OwnedServerName, ServerName, UserId,
 };
 use sha2::{digest::Output, Digest, Sha256};
-use tracing::error;
+use tracing::{error, info};
 
 use crate::{
-    config::{DirectoryStructure, MediaConfig},
+    config::{DirectoryStructure, MediaBackendConfig},
     services, utils, Error, Result,
 };
 use image::imageops::FilterType;
@@ -34,6 +34,29 @@ pub struct FileMeta {
     pub file: Vec<u8>,
 }
 
+pub enum MediaType {
+    LocalMedia { thumbnail: bool },
+    RemoteMedia { thumbnail: bool },
+}
+
+impl MediaType {
+    pub fn new(server_name: &ServerName, thumbnail: bool) -> Self {
+        if server_name == services().globals.server_name() {
+            Self::LocalMedia { thumbnail }
+        } else {
+            Self::RemoteMedia { thumbnail }
+        }
+    }
+
+    pub fn is_thumb(&self) -> bool {
+        match self {
+            MediaType::LocalMedia { thumbnail } | MediaType::RemoteMedia { thumbnail } => {
+                *thumbnail
+            }
+        }
+    }
+}
+
 pub struct Service {
     pub db: &'static dyn Data,
 }
@@ -47,6 +70,34 @@ pub struct BlockedMediaInfo {
 }
 
 impl Service {
+    pub fn start_time_retention_checker(self: &Arc<Self>) {
+        let self2 = Arc::clone(self);
+        if let Some(cleanup_interval) = services().globals.config.media.retention.cleanup_interval()
+        {
+            tokio::spawn(async move {
+                let mut i = cleanup_interval;
+                loop {
+                    i.tick().await;
+                    let _ = self2.try_purge_time_retention().await;
+                }
+            });
+        }
+    }
+
+    async fn try_purge_time_retention(&self) -> Result<()> {
+        info!("Checking if any media should be deleted due to time-based retention policies");
+        let files = self
+            .db
+            .cleanup_time_retention(&services().globals.config.media.retention);
+
+        let count = files.iter().filter(|res| res.is_ok()).count();
+        info!("Found {count} media files to delete");
+
+        purge_files(files);
+
+        Ok(())
+    }
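One property of `tokio::time::interval` worth keeping in mind for the loop above: the first `tick()` completes immediately, so a retention pass runs once at startup and then once per period. A minimal standalone sketch of the spawn-and-tick pattern (with a stand-in cleanup function, not the real service):

```rust
use std::time::Duration;

// Stand-in for `try_purge_time_retention`
async fn purge_expired_media() {
    // ... scan metadata and delete expired files ...
}

fn spawn_retention_task(period: Duration) {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(period);
        loop {
            // The first tick resolves immediately; subsequent ticks fire
            // once per `period`, so cleanup also runs right after startup.
            interval.tick().await;
            purge_expired_media().await;
        }
    });
}
```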
+
     /// Uploads a file.
     pub async fn create(
         &self,
@@ -59,6 +110,16 @@
     ) -> Result<()> {
         let (sha256_digest, sha256_hex) = generate_digests(file);
 
+        for error in self.clear_required_space(
+            &sha256_digest,
+            MediaType::new(servername, false),
+            size(file)?,
+        )? {
+            error!(
+                "Error deleting file to clear space when downloading/creating new media file: {error}"
+            )
+        }
+
         self.db.create_file_metadata(
             sha256_digest,
             size(file)?,
@@ -93,6 +154,12 @@
     ) -> Result<()> {
         let (sha256_digest, sha256_hex) = generate_digests(file);
 
+        self.clear_required_space(
+            &sha256_digest,
+            MediaType::new(servername, true),
+            size(file)?,
+        )?;
+
         self.db.create_thumbnail_metadata(
             sha256_digest,
             size(file)?,
@@ -125,7 +192,7 @@
             return Ok(None);
         }
 
-        let file = get_file(&hex::encode(sha256_digest)).await?;
+        let file = self.get_file(&sha256_digest, None).await?;
 
         Ok(Some(FileMeta {
             content_disposition: content_disposition(filename, &content_type),
@@ -180,7 +247,9 @@
         }
 
         // Using saved thumbnail
-        let file = get_file(&hex::encode(sha256_digest)).await?;
+        let file = self
+            .get_file(&sha256_digest, Some((servername, media_id)))
+            .await?;
 
         Ok(Some(FileMeta {
             content_disposition: content_disposition(filename, &content_type),
@@ -202,7 +271,7 @@
         let content_disposition = content_disposition(filename.clone(), &content_type);
 
         // Generate a thumbnail
-        let file = get_file(&hex::encode(sha256_digest)).await?;
+        let file = self.get_file(&sha256_digest, None).await?;
 
         if let Ok(image) = image::load_from_memory(&file) {
             let original_width = image.width();
@@ -303,7 +372,7 @@
             return Ok(None);
         }
 
-        let file = get_file(&hex::encode(sha256_digest)).await?;
+        let file = self.get_file(&sha256_digest, None).await?;
 
         Ok(Some(FileMeta {
             content_disposition: content_disposition(filename, &content_type),
@@ -416,14 +485,73 @@
     pub fn list_blocked(&self) -> Vec<Result<BlockedMediaInfo>> {
         self.db.list_blocked()
     }
+
+    pub fn clear_required_space(
+        &self,
+        sha256_digest: &[u8],
+        media_type: MediaType,
+        new_size: u64,
+    ) -> Result<Vec<Error>> {
+        let files = self.db.files_to_delete(
+            sha256_digest,
+            &services().globals.config.media.retention,
+            media_type,
+            new_size,
+        )?;
+
+        let count = files.iter().filter(|r| r.is_ok()).count();
+
+        if count != 0 {
+            info!("Deleting {} files to clear space for new media file", count);
+        }
+
+        Ok(purge_files(files))
+    }
+
+    /// Fetches the file from the configured media backend and updates the "last accessed"
+    /// part of the file's metadata
+    ///
+    /// If specified, the original file will also have its last accessed time updated, if present
+    /// (use when accessing thumbnails)
+    async fn get_file(
+        &self,
+        sha256_digest: &[u8],
+        original_file_id: Option<(&ServerName, &str)>,
+    ) -> Result<Vec<u8>> {
+        let file = match &services().globals.config.media.backend {
+            MediaBackendConfig::FileSystem {
+                path,
+                directory_structure,
+            } => {
+                let path = services().globals.get_media_path(
+                    path,
+                    directory_structure,
+                    &hex::encode(sha256_digest),
+                )?;
+
+                let mut file = Vec::new();
+                File::open(path).await?.read_to_end(&mut file).await?;
+
+                file
+            }
+        };
+
+        if let Some((server_name, media_id)) = original_file_id {
+            self.db.update_last_accessed(server_name, media_id)?;
+        }
+
+        self.db
+            .update_last_accessed_filehash(sha256_digest)
+            .map(|_| file)
+    }
 }
 
 /// Creates the media file, using the configured media backend
 ///
 /// Note: this function does NOT set the metadata related to the file
 pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> {
-    match &services().globals.config.media {
-        MediaConfig::FileSystem {
+    match &services().globals.config.media.backend {
+        MediaBackendConfig::FileSystem {
             path,
             directory_structure,
         } => {
@@ -439,25 +567,6 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> {
     Ok(())
 }
 
-/// Fetches the file from the configured media backend
-async fn get_file(sha256_hex: &str) -> Result<Vec<u8>> {
-    Ok(match &services().globals.config.media {
-        MediaConfig::FileSystem {
-            path,
-            directory_structure,
-        } => {
-            let path = services()
-                .globals
-                .get_media_path(path, directory_structure, sha256_hex)?;
-
-            let mut file = Vec::new();
-            File::open(path).await?.read_to_end(&mut file).await?;
-
-            file
-        }
-    })
-}
-
 /// Purges the given files from the media backend
 /// Returns a `Vec` of errors that occurred when attempting to delete the files
 ///
@@ -477,8 +586,8 @@ fn purge_files(hashes: Vec<Result<String>>) -> Vec<Error> {
 ///
 /// Note: this does NOT remove the related metadata from the database
 fn delete_file(sha256_hex: &str) -> Result<()> {
-    match &services().globals.config.media {
-        MediaConfig::FileSystem {
+    match &services().globals.config.media.backend {
+        MediaBackendConfig::FileSystem {
             path,
             directory_structure,
         } => {
diff --git a/src/service/mod.rs b/src/service/mod.rs
index c328bf7e..832ca8ae 100644
--- a/src/service/mod.rs
+++ b/src/service/mod.rs
@@ -34,7 +34,7 @@ pub struct Services {
     pub admin: Arc<admin::Service>,
     pub globals: globals::Service,
     pub key_backups: key_backups::Service,
-    pub media: media::Service,
+    pub media: Arc<media::Service>,
     pub sending: Arc<sending::Service>,
 }
 
@@ -119,7 +119,7 @@ impl Services {
             account_data: account_data::Service { db },
             admin: admin::Service::build(),
             key_backups: key_backups::Service { db },
-            media: media::Service { db },
+            media: Arc::new(media::Service { db }),
             sending: sending::Service::build(db, &config),
             globals: globals::Service::load(db, config)?,