
feat(media): use file's sha256 for on-disk name & make directory configurable

In addition, metadata about the file, such as creation time, last access time, and
file size, is stored in the database.
Matthias Ahouansou 2025-03-16 17:40:55 +00:00
parent 937521fcf1
commit 70d7f77363
14 changed files with 840 additions and 286 deletions
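
In short: media content is now stored on disk under the hex-encoded sha256 of its bytes, inside a configurable media directory, rather than under a base64 encoding of its database key. A minimal sketch of the new naming scheme; `media_directory` stands in for the configurable path and is illustrative, not code from this commit:

use sha2::{Digest, Sha256};
use std::path::{Path, PathBuf};

fn sha256_media_path(media_directory: &Path, file_bytes: &[u8]) -> PathBuf {
    // The hex digest of the file content becomes the on-disk file name.
    let digest = Sha256::digest(file_bytes);
    media_directory.join(hex::encode(digest))
}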


@@ -2,12 +2,13 @@ pub mod abstraction;
pub mod key_value;
use crate::{
service::rooms::timeline::PduCount, services, utils, Config, Error, PduEvent, Result, Services,
SERVICES,
service::{globals, rooms::timeline::PduCount},
services, utils, Config, Error, PduEvent, Result, Services, SERVICES,
};
use abstraction::{KeyValueDatabaseEngine, KvTree};
use base64::{engine::general_purpose, Engine};
use directories::ProjectDirs;
use key_value::media::FilehashMetadata;
use lru_cache::LruCache;
use ruma::{
@@ -17,23 +18,50 @@ use ruma::{
GlobalAccountDataEvent, GlobalAccountDataEventType, StateEventType,
},
push::Ruleset,
CanonicalJsonValue, EventId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId,
UserId,
CanonicalJsonValue, EventId, OwnedDeviceId, OwnedEventId, OwnedMxcUri, OwnedRoomId,
OwnedUserId, RoomId, UserId,
};
use serde::Deserialize;
use sha2::{Digest, Sha256};
use std::{
collections::{BTreeMap, HashMap, HashSet},
fs::{self, remove_dir_all},
io::Write,
mem::size_of,
path::Path,
path::{Path, PathBuf},
sync::{Arc, Mutex, RwLock},
time::Duration,
time::{Duration, UNIX_EPOCH},
};
use tokio::time::interval;
use tokio::{io::AsyncReadExt, time::interval};
use tracing::{debug, error, info, warn};
/// This trait should only be used for migrations, and hence should never be made "pub"
trait GlobalsMigrationsExt {
/// As the name states, the old version of `get_media_file`, only for use in migrations
fn get_media_file_old_only_use_for_migrations(&self, key: &[u8]) -> PathBuf;
/// As the name states, this should only be used for migrations.
fn get_media_folder_only_use_for_migrations(&self) -> PathBuf;
}
impl GlobalsMigrationsExt for globals::Service {
fn get_media_file_old_only_use_for_migrations(&self, key: &[u8]) -> PathBuf {
let mut r = PathBuf::new();
r.push(self.config.database_path.clone());
r.push("media");
r.push(general_purpose::URL_SAFE_NO_PAD.encode(key));
r
}
fn get_media_folder_only_use_for_migrations(&self) -> PathBuf {
let mut r = PathBuf::new();
r.push(self.config.database_path.clone());
r.push("media");
r
}
}
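Under the old layout these helpers resolve to `<database_path>/media/<base64url(key)>`; for example, with a hypothetical key:

let path = services()
    .globals
    .get_media_file_old_only_use_for_migrations(b"mxc://example.org/abc123");
// => "<database_path>/media/" followed by the URL-safe, unpadded base64 of the key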
pub struct KeyValueDatabase {
_db: Arc<dyn KeyValueDatabaseEngine>,
@@ -148,7 +176,11 @@ pub struct KeyValueDatabase {
pub(super) roomusertype_roomuserdataid: Arc<dyn KvTree>, // RoomUserType = Room + User + Type
//pub media: media::Media,
pub(super) mediaid_file: Arc<dyn KvTree>, // MediaId = MXC + WidthHeight + ContentDisposition + ContentType
pub(super) servernamemediaid_metadata: Arc<dyn KvTree>, // Servername + MediaID -> content sha256 + Filename + ContentType + extra 0xff byte if media is allowed on unauthenticated endpoints
pub(super) filehash_servername_mediaid: Arc<dyn KvTree>, // sha256 of content + Servername + MediaID, used to delete dangling references to filehashes from servernamemediaid
pub(super) filehash_metadata: Arc<dyn KvTree>, // sha256 of content -> file size + creation time + last access time
pub(super) thumbnailid_metadata: Arc<dyn KvTree>, // ThumbnailId = Servername + MediaID + width + height -> Filename + ContentType + extra 0xff byte if media is allowed on unauthenticated endpoints
pub(super) filehash_thumbnailid: Arc<dyn KvTree>, // sha256 of content + "ThumbnailId", as defined above. Used to delete dangling references to filehashes from ThumbnailIds
//pub key_backups: key_backups::KeyBackups,
pub(super) backupid_algorithm: Arc<dyn KvTree>, // BackupId = UserId + Version(Count)
pub(super) backupid_etag: Arc<dyn KvTree>, // BackupId = UserId + Version(Count)
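As a reading aid for the schema comments above: a hypothetical servernamemediaid_metadata value for a file cat.png of type image/png would be built as follows (illustration only; it mirrors the migration code further down):

// Value layout: [32-byte sha256][filename]0xff[content type][trailing 0xff if
// the media is allowed on unauthenticated endpoints]
fn example_metadata_value(sha256_digest: &[u8; 32]) -> Vec<u8> {
    let mut value = sha256_digest.to_vec();
    value.extend_from_slice(b"cat.png"); // filename
    value.push(0xff);
    value.extend_from_slice(b"image/png"); // content type
    value.push(0xff); // marks the media as available unauthenticated
    value
}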
@@ -352,7 +384,11 @@ impl KeyValueDatabase {
referencedevents: builder.open_tree("referencedevents")?,
roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
roomusertype_roomuserdataid: builder.open_tree("roomusertype_roomuserdataid")?,
mediaid_file: builder.open_tree("mediaid_file")?,
servernamemediaid_metadata: builder.open_tree("servernamemediaid_metadata")?,
filehash_servername_mediaid: builder.open_tree("filehash_servername_mediaid")?,
filehash_metadata: builder.open_tree("filehash_metadata")?,
thumbnailid_metadata: builder.open_tree("thumbnailid_metadata")?,
filehash_thumbnailid: builder.open_tree("filehash_thumbnailid")?,
backupid_algorithm: builder.open_tree("backupid_algorithm")?,
backupid_etag: builder.open_tree("backupid_etag")?,
backupkeyid_backup: builder.open_tree("backupkeyid_backup")?,
@@ -415,7 +451,7 @@ impl KeyValueDatabase {
}
// If the database has any data, perform data migrations before starting
let latest_database_version = 16;
let latest_database_version = 17;
if services().users.count()? > 0 {
// MIGRATIONS
@@ -462,16 +498,19 @@ impl KeyValueDatabase {
}
if services().globals.database_version()? < 3 {
let tree = db._db.open_tree("mediaid_file")?;
// Move media to filesystem
for (key, content) in db.mediaid_file.iter() {
for (key, content) in tree.iter() {
if content.is_empty() {
continue;
}
let path = services().globals.get_media_file(&key);
let path = services()
.globals
.get_media_file_old_only_use_for_migrations(&key);
let mut file = fs::File::create(path)?;
file.write_all(&content)?;
db.mediaid_file.insert(&key, &[])?;
tree.insert(&key, &[])?;
}
services().globals.bump_database_version(3)?;
@@ -933,16 +972,23 @@ impl KeyValueDatabase {
}
if services().globals.database_version()? < 16 {
let tree = db._db.open_tree("mediaid_file")?;
// Reconstruct all media using the filesystem
db.mediaid_file.clear().unwrap();
tree.clear().unwrap();
for file in fs::read_dir(services().globals.get_media_folder()).unwrap() {
for file in fs::read_dir(
services()
.globals
.get_media_folder_only_use_for_migrations(),
)
.unwrap()
{
let file = file.unwrap();
let file_name = file.file_name().into_string().unwrap();
let mediaid = general_purpose::URL_SAFE_NO_PAD.decode(&file_name).unwrap();
if let Err(e) = migrate_content_disposition_format(mediaid, db) {
if let Err(e) = migrate_content_disposition_format(mediaid, &tree) {
error!("Error migrating media file with name \"{file_name}\": {e}");
return Err(e);
}
@@ -952,6 +998,55 @@ impl KeyValueDatabase {
warn!("Migration: 13 -> 16 finished");
}
if services().globals.database_version()? < 17 {
warn!("Migrating media repository to new format. If you have a lot of media stored, this may take a while, so please be patiant!");
let tree = db._db.open_tree("mediaid_file")?;
tree.clear().unwrap();
let mxc_prefix = general_purpose::URL_SAFE_NO_PAD.encode(b"mxc://");
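// Note: b"mxc://" is 6 bytes, a multiple of base64's 3-byte block size, so its
// unpadded encoding is an exact prefix of the encoding of any file name that
// begins with "mxc://", which is what makes the prefix filter below sound.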
for file in fs::read_dir(
services()
.globals
.get_media_folder_only_use_for_migrations(),
)
.unwrap()
.filter_map(Result::ok)
.filter(|result| {
result.file_type().unwrap().is_file()
&& result
.file_name()
.to_str()
.unwrap()
.starts_with(&mxc_prefix)
}) {
let file_name = file.file_name().into_string().unwrap();
if let Err(e) = migrate_to_sha256_media(
db,
&file_name,
file.metadata()
.ok()
.and_then(|meta| meta.created().ok())
.and_then(|time| time.duration_since(UNIX_EPOCH).ok())
.map(|dur| dur.as_secs()),
file.metadata()
.ok()
.and_then(|meta| meta.accessed().ok())
.and_then(|time| time.duration_since(UNIX_EPOCH).ok())
.map(|dur| dur.as_secs()),
)
.await
{
error!("Error migrating media file with name \"{file_name}\": {e}");
return Err(e);
}
}
services().globals.bump_database_version(17)?;
warn!("Migration: 16 -> 17 finished");
}
assert_eq!(
services().globals.database_version().unwrap(),
latest_database_version
@@ -1117,7 +1212,7 @@ impl KeyValueDatabase {
fn migrate_content_disposition_format(
mediaid: Vec<u8>,
db: &KeyValueDatabase,
tree: &Arc<dyn KvTree>,
) -> Result<(), Error> {
let mut parts = mediaid.rsplit(|&b| b == 0xff);
let mut removed_bytes = 0;
@@ -1153,28 +1248,165 @@ fn migrate_content_disposition_format(
// Some file names are too long. Ignore those.
match fs::rename(
services().globals.get_media_file(&mediaid),
services().globals.get_media_file(&new_key),
services()
.globals
.get_media_file_old_only_use_for_migrations(&mediaid),
services()
.globals
.get_media_file_old_only_use_for_migrations(&new_key),
) {
Ok(_) => {
db.mediaid_file.insert(&new_key, &[])?;
tree.insert(&new_key, &[])?;
}
Err(_) => {
fs::rename(
services().globals.get_media_file(&mediaid),
services().globals.get_media_file(&shorter_key),
services()
.globals
.get_media_file_old_only_use_for_migrations(&mediaid),
services()
.globals
.get_media_file_old_only_use_for_migrations(&shorter_key),
)
.unwrap();
db.mediaid_file.insert(&shorter_key, &[])?;
tree.insert(&shorter_key, &[])?;
}
}
} else {
db.mediaid_file.insert(&mediaid, &[])?;
tree.insert(&mediaid, &[])?;
};
Ok(())
}
async fn migrate_to_sha256_media(
db: &KeyValueDatabase,
file_name: &str,
creation: Option<u64>,
last_accessed: Option<u64>,
) -> Result<()> {
use crate::service::media::size;
let media_info = general_purpose::URL_SAFE_NO_PAD.decode(file_name).unwrap();
let mxc_dimension_splitter_pos = media_info
.iter()
.position(|&b| b == 0xff)
.ok_or_else(|| Error::BadDatabase("Invalid format of media info from file's name"))?;
let mxc = utils::string_from_bytes(&media_info[..mxc_dimension_splitter_pos])
.map(OwnedMxcUri::from)
.map_err(|_| Error::BadDatabase("MXC from file's name is invalid UTF-8."))?;
let (server_name, media_id) = mxc
.parts()
.map_err(|_| Error::BadDatabase("MXC from file's name is invalid."))?;
let width_height = media_info
.get(mxc_dimension_splitter_pos + 1..mxc_dimension_splitter_pos + 9)
.ok_or_else(|| Error::BadDatabase("Invalid format of media info from file's name"))?;
let mut parts = media_info
.get(mxc_dimension_splitter_pos + 10..)
.ok_or_else(|| Error::BadDatabase("Invalid format of media info from file's name"))?
.split(|&b| b == 0xff);
let content_disposition_bytes = parts.next().ok_or_else(|| {
Error::BadDatabase(
"Media ID parsed from file's name is invalid: Missing Content Disposition.",
)
})?;
let content_disposition = content_disposition_bytes.try_into().unwrap_or_else(|_| {
ruma::http_headers::ContentDisposition::new(
ruma::http_headers::ContentDispositionType::Inline,
)
});
let content_type = parts
.next()
.map(|bytes| {
utils::string_from_bytes(bytes)
.map_err(|_| Error::BadDatabase("Content type from file's name is invalid UTF-8."))
})
.transpose()?;
let mut path = services()
.globals
.get_media_folder_only_use_for_migrations();
path.push(file_name);
let mut file = Vec::new();
tokio::fs::File::open(&path)
.await?
.read_to_end(&mut file)
.await?;
let sha256_digest = Sha256::digest(&file);
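// In the legacy key, width and height are stored as two big-endian u32s;
// both being zero marks an original upload rather than a thumbnail.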
let mut zero_zero = 0u32.to_be_bytes().to_vec();
zero_zero.extend_from_slice(&0u32.to_be_bytes());
let mut key = sha256_digest.to_vec();
let now = utils::secs_since_unix_epoch();
let metadata = FilehashMetadata::new_with_times(
size(&file)?,
creation.unwrap_or(now),
last_accessed.unwrap_or(now),
);
db.filehash_metadata.insert(&key, metadata.value())?;
// If not a thumbnail
if width_height == zero_zero {
key.extend_from_slice(server_name.as_bytes());
key.push(0xff);
key.extend_from_slice(media_id.as_bytes());
db.filehash_servername_mediaid.insert(&key, &[])?;
let mut key = server_name.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(media_id.as_bytes());
let mut value = sha256_digest.to_vec();
value.extend_from_slice(content_disposition.filename.unwrap_or_default().as_bytes());
value.push(0xff);
value.extend_from_slice(content_type.unwrap_or_default().as_bytes());
// To mark as available on unauthenticated endpoints
value.push(0xff);
db.servernamemediaid_metadata.insert(&key, &value)?;
} else {
key.extend_from_slice(server_name.as_bytes());
key.push(0xff);
key.extend_from_slice(media_id.as_bytes());
key.push(0xff);
key.extend_from_slice(width_height);
db.filehash_thumbnailid.insert(&key, &[])?;
let mut key = server_name.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(media_id.as_bytes());
key.push(0xff);
key.extend_from_slice(width_height);
let mut value = sha256_digest.to_vec();
value.extend_from_slice(content_disposition.filename.unwrap_or_default().as_bytes());
value.push(0xff);
value.extend_from_slice(content_type.unwrap_or_default().as_bytes());
// To mark as available on unauthenticated endpoints
value.push(0xff);
db.thumbnailid_metadata.insert(&key, &value)?;
}
crate::service::media::create_file(&hex::encode(sha256_digest), &file).await?;
tokio::fs::remove_file(path).await?;
Ok(())
}
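For reference, the legacy file name unpacked above decodes (URL-safe base64, no padding) to the following byte layout, reconstructed from the parsing code:

// mxc://<server>/<media id> 0xff [width as u32 BE][height as u32 BE] 0xff
// [content disposition] 0xff [content type]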
/// Sets the emergency password and push rules for the @conduit account in case an emergency password is set
fn set_emergency_access() -> Result<bool> {
let conduit_user = services().globals.server_user();