1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-06-27 16:35:59 +00:00

feat: janky s3 implementation

This commit is contained in:
Samuel Meenzen 2025-05-31 03:00:03 +02:00
parent 18a83ea292
commit 1124334e5f
No known key found for this signature in database
7 changed files with 375 additions and 125 deletions

40
Cargo.lock generated
View file

@ -500,6 +500,7 @@ dependencies = [
"rusqlite",
"rust-argon2",
"rust-rocksdb",
"rusty-s3",
"sd-notify",
"serde",
"serde_html_form",
@ -1732,6 +1733,16 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "md-5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
dependencies = [
"cfg-if",
"digest",
]
[[package]]
name = "memchr"
version = "2.7.4"
@ -2244,6 +2255,16 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3"
[[package]]
name = "quick-xml"
version = "0.37.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "quote"
version = "1.0.40"
@ -2803,6 +2824,25 @@ version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6"
[[package]]
name = "rusty-s3"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f51a5a6b15f25d3e10c068039ee13befb6110fcb36c2b26317bcbdc23484d96"
dependencies = [
"base64 0.22.1",
"hmac",
"md-5",
"percent-encoding",
"quick-xml",
"serde",
"serde_json",
"sha2",
"time",
"url",
"zeroize",
]
[[package]]
name = "ryu"
version = "1.0.20"

View file

@ -151,6 +151,7 @@ tikv-jemallocator = { version = "0.6", features = [
"unprefixed_malloc_on_supported_platforms",
], optional = true }
rusty-s3 = "0.7.0"
sd-notify = { version = "0.4", optional = true }
# Used for matrix spec type definitions and helpers

View file

@ -242,6 +242,41 @@ impl From<IncompleteConfig> for Config {
}),
directory_structure,
},
IncompleteMediaBackendConfig::S3 {
directory_structure,
endpoint,
bucket,
region,
key,
secret,
path_prefix,
path_style,
expiration,
} => {
let path_style = if path_style {
rusty_s3::UrlStyle::Path
} else {
rusty_s3::UrlStyle::VirtualHost
};
let bucket = rusty_s3::Bucket::new(
endpoint.clone(),
path_style,
bucket.clone(),
region.clone(),
)
.expect("S3 config is valid");
let credentials = rusty_s3::Credentials::new(key, secret);
MediaBackendConfig::S3 {
directory_structure,
bucket,
credentials,
duration: expiration.unwrap_or_else(|| Duration::from_secs(30)),
path_prefix: path_prefix.unwrap_or("/".to_owned()),
}
}
},
retention: media.retention.into(),
};
@ -481,6 +516,20 @@ pub enum IncompleteMediaBackendConfig {
#[serde(default)]
directory_structure: DirectoryStructure,
},
S3 {
#[serde(default)]
directory_structure: DirectoryStructure,
endpoint: Url,
bucket: String,
region: String,
key: String,
secret: String,
path_prefix: Option<String>,
#[serde(default = "false_fn")]
path_style: bool,
#[serde(default, with = "humantime_serde::option")]
expiration: Option<Duration>,
},
}
impl Default for IncompleteMediaBackendConfig {
@ -498,6 +547,13 @@ pub enum MediaBackendConfig {
path: String,
directory_structure: DirectoryStructure,
},
S3 {
directory_structure: DirectoryStructure,
bucket: rusty_s3::Bucket,
credentials: rusty_s3::Credentials,
duration: Duration,
path_prefix: String,
},
}
#[derive(Debug, Clone, Deserialize)]

View file

@ -1079,6 +1079,7 @@ impl KeyValueDatabase {
depth: *depth,
},
file.file_name().to_str().unwrap(),
true,
)?,
)
.await?;

View file

@ -586,7 +586,8 @@ impl Service {
RoomMessageEventContent::text_plain(
"Expected code block in command body. Add --help for details.",
)
}.into()
}
.into()
}
AdminCommand::UnregisterAppservice {
appservice_identifier,
@ -599,7 +600,8 @@ impl Service {
Err(e) => RoomMessageEventContent::text_plain(format!(
"Failed to unregister appservice: {e}"
)),
}.into(),
}
.into(),
AdminCommand::ListAppservices => {
let appservices = services().appservice.iter_ids().await;
let output = format!(
@ -637,7 +639,8 @@ impl Service {
RoomMessageEventContent::text_plain(&msg)
}
Err(e) => RoomMessageEventContent::text_plain(e.to_string()),
}.into(),
}
.into(),
AdminCommand::IncomingFederation => {
let map = services().globals.roomid_federationhandletime.read().await;
let mut msg: String = format!("Handling {} incoming pdus:\n", map.len());
@ -678,7 +681,8 @@ impl Service {
))
} else {
RoomMessageEventContent::text_plain("Event not found.")
}.into()
}
.into()
}
AdminCommand::ParsePdu => {
if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```"
@ -712,7 +716,8 @@ impl Service {
}
} else {
RoomMessageEventContent::text_plain("Expected code block in command body.")
}.into()
}
.into()
}
AdminCommand::GetPdu { event_id } => {
let mut outlier = false;
@ -750,7 +755,8 @@ impl Service {
)
}
None => RoomMessageEventContent::text_plain("PDU not found."),
}.into()
}
.into()
}
AdminCommand::MemoryUsage => {
let response1 = services().memory_usage().await;
@ -758,7 +764,8 @@ impl Service {
RoomMessageEventContent::text_plain(format!(
"Services:\n{response1}\n\nDatabase:\n{response2}"
)).into()
))
.into()
}
AdminCommand::ClearDatabaseCaches { amount } => {
services().globals.db.clear_caches(amount);
@ -783,7 +790,8 @@ impl Service {
Err(e) => {
return Ok(RoomMessageEventContent::text_plain(format!(
"The supplied username is not a valid username: {e}"
)).into())
))
.into())
}
};
@ -791,7 +799,8 @@ impl Service {
if user_id.server_name() != services().globals.server_name() {
return Ok(RoomMessageEventContent::text_plain(
"The specified user is not from this server!",
).into());
)
.into());
};
// Check if the specified user is valid
@ -805,7 +814,8 @@ impl Service {
{
return Ok(RoomMessageEventContent::text_plain(
"The specified user does not exist!",
).into());
)
.into());
}
let new_password = utils::random_string(AUTO_GEN_PASSWORD_LENGTH);
@ -820,7 +830,8 @@ impl Service {
Err(e) => RoomMessageEventContent::text_plain(format!(
"Couldn't reset the password for user {user_id}: {e}"
)),
}.into()
}
.into()
}
AdminCommand::CreateUser { username, password } => {
let password =
@ -834,7 +845,8 @@ impl Service {
Err(e) => {
return Ok(RoomMessageEventContent::text_plain(format!(
"The supplied username is not a valid username: {e}"
)).into())
))
.into())
}
};
@ -842,18 +854,21 @@ impl Service {
if user_id.server_name() != services().globals.server_name() {
return Ok(RoomMessageEventContent::text_plain(
"The specified user is not from this server!",
).into());
)
.into());
};
if user_id.is_historical() {
return Ok(RoomMessageEventContent::text_plain(format!(
"Userid {user_id} is not allowed due to historical"
)).into());
))
.into());
}
if services().users.exists(&user_id)? {
return Ok(RoomMessageEventContent::text_plain(format!(
"Userid {user_id} already exists"
)).into());
))
.into());
}
// Create user
services().users.create(&user_id, Some(password.as_str()))?;
@ -890,10 +905,10 @@ impl Service {
// Inhibit login does not work for guests
RoomMessageEventContent::text_plain(format!(
"Created user with user_id: {user_id} and password: {password}"
)).into()
))
.into()
}
AdminCommand::AllowRegistration { status } => {
if let Some(status) = status {
AdminCommand::AllowRegistration { status } => if let Some(status) = status {
services().globals.set_registration(status).await;
RoomMessageEventContent::text_plain(if status {
"Registration is now enabled"
@ -908,8 +923,8 @@ impl Service {
"Registration is currently disabled"
},
)
}.into()
}
.into(),
AdminCommand::DisableRoom { room_id } => {
services().rooms.metadata.disable_room(&room_id, true)?;
RoomMessageEventContent::text_plain("Room disabled.").into()
@ -952,6 +967,7 @@ impl Service {
services()
.media
.purge_from_user(&user_id, purge_media.force_filehash, after)
.await
.len()
} else {
0
@ -1020,13 +1036,17 @@ impl Service {
failed_count += services()
.media
.purge_from_user(user_id, purge_media.force_filehash, after)
.await
.len();
}
}
let mut message = format!("Deactivated {deactivation_count} accounts.");
if !admins.is_empty() {
message.push_str(&format!("\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts",admins.join(", ")));
message.push_str(&format!(
"\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts",
admins.join(", ")
));
}
if failed_count != 0 {
message.push_str(&format!(
@ -1039,14 +1059,19 @@ impl Service {
RoomMessageEventContent::text_plain(
"Expected code block in command body. Add --help for details.",
)
}.into()
}
.into()
}
AdminCommand::QueryMedia { mxc } => {
let Ok((server_name, media_id)) = mxc.parts() else {
return Ok(RoomMessageEventContent::text_plain("Invalid media MXC").into());
};
let MediaQuery{ is_blocked, source_file, thumbnails } = services().media.query(server_name, media_id)?;
let MediaQuery {
is_blocked,
source_file,
thumbnails,
} = services().media.query(server_name, media_id)?;
let mut message = format!("Is blocked Media ID: {is_blocked}");
if let Some(MediaQueryFileInfo {
@ -1057,14 +1082,16 @@ impl Service {
unauthenticated_access_permitted,
is_blocked_via_filehash,
file_info: time_info,
}) = source_file {
}) = source_file
{
message.push_str("\n\nInformation on full (non-thumbnail) file:\n");
if let Some(FileInfo {
creation,
last_access,
size,
}) = time_info {
}) = time_info
{
message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}",
DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
@ -1093,7 +1120,7 @@ impl Service {
message.push_str("\n\nInformation on thumbnails of media:");
}
for MediaQueryThumbInfo{
for MediaQueryThumbInfo {
width,
height,
sha256_hex,
@ -1102,13 +1129,15 @@ impl Service {
unauthenticated_access_permitted,
is_blocked_via_filehash,
file_info: time_info,
} in thumbnails {
} in thumbnails
{
message.push_str(&format!("\n\nDimensions: {width}x{height}"));
if let Some(FileInfo {
creation,
last_access,
size,
}) = time_info {
}) = time_info
{
message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}",
DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
@ -1142,7 +1171,8 @@ impl Service {
file,
content_type,
content_disposition,
} = client_server::media::get_content(server_name, media_id.to_owned(), true, true).await?;
} = client_server::media::get_content(server_name, media_id.to_owned(), true, true)
.await?;
if let Ok(image) = image::load_from_memory(&file) {
let filename = content_disposition.and_then(|cd| cd.filename);
@ -1182,10 +1212,7 @@ impl Service {
}
}
AdminCommand::ListMedia {
user_server_filter: ListMediaArgs {
user,
server,
},
user_server_filter: ListMediaArgs { user, server },
include_thumbnails,
content_type,
uploaded_before,
@ -1198,7 +1225,7 @@ impl Service {
r#"<table><thead><tr><th scope="col">MXC URI</th><th scope="col">Dimensions (if thumbnail)</th><th scope="col">Created/Downloaded at</th><th scope="col">Uploader</th><th scope="col">Content-Type</th><th scope="col">Filename</th><th scope="col">Size</th></tr></thead><tbody>"#,
);
for MediaListItem{
for MediaListItem {
server_name,
media_id,
uploader_localpart,
@ -1208,8 +1235,7 @@ impl Service {
size,
creation,
} in services().media.list(
user
.map(ServerNameOrUserId::UserId)
user.map(ServerNameOrUserId::UserId)
.or_else(|| server.map(ServerNameOrUserId::ServerName)),
include_thumbnails,
content_type.as_deref(),
@ -1224,15 +1250,20 @@ impl Service {
.transpose()
.map_err(|_| Error::AdminCommand("Timestamp must be after unix epoch"))?
.as_ref()
.map(Duration::as_secs)
.map(Duration::as_secs),
)? {
let user_id = uploader_localpart.map(|localpart| format!("@{localpart}:{server_name}")).unwrap_or_default();
let user_id = uploader_localpart
.map(|localpart| format!("@{localpart}:{server_name}"))
.unwrap_or_default();
let content_type = content_type.unwrap_or_default();
let filename = filename.unwrap_or_default();
let dimensions = dimensions.map(|(w, h)| format!("{w}x{h}")).unwrap_or_default();
let dimensions = dimensions
.map(|(w, h)| format!("{w}x{h}"))
.unwrap_or_default();
let size = ByteSize::b(size).display().si();
let creation = DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range");
let creation =
DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX), 0)
.expect("Timestamp is within range");
markdown_message
.push_str(&format!("\n| mxc://{server_name}/{media_id} | {dimensions} | {creation} | {user_id} | {content_type} | {filename} | {size} |"));
@ -1245,11 +1276,10 @@ impl Service {
html_message.push_str("</tbody></table>");
RoomMessageEventContent::text_html(markdown_message, html_message).into()
},
AdminCommand::PurgeMedia => media_from_body(body).map_or_else(
|message| message,
|media| {
let failed_count = services().media.purge(&media, true).len();
}
AdminCommand::PurgeMedia => match media_from_body(body) {
Ok(media) => {
let failed_count = services().media.purge(&media, true).await.len();
if failed_count == 0 {
RoomMessageEventContent::text_plain("Successfully purged media")
@ -1257,9 +1287,11 @@ impl Service {
RoomMessageEventContent::text_plain(format!(
"Failed to delete {failed_count} media, check logs for more details"
))
}.into()
}
.into()
}
Err(message) => message,
},
),
AdminCommand::PurgeMediaFromUsers {
from_last,
force_filehash,
@ -1279,6 +1311,7 @@ impl Service {
failed_count += services()
.media
.purge_from_user(user_id, force_filehash, after)
.await
.len();
}
@ -1293,7 +1326,8 @@ impl Service {
RoomMessageEventContent::text_plain(
"Expected code block in command body. Add --help for details.",
)
}.into()
}
.into()
}
AdminCommand::PurgeMediaFromServer {
server_id: server_name,
@ -1311,6 +1345,7 @@ impl Service {
let failed_count = services()
.media
.purge_from_server(&server_name, force_filehash, after)
.await
.len();
if failed_count == 0 {
@ -1321,14 +1356,15 @@ impl Service {
RoomMessageEventContent::text_plain(format!(
"Failed to purge {failed_count} media, check logs for more details"
))
}.into()
}
AdminCommand::BlockMedia { and_purge, reason } => media_from_body(body).map_or_else(
|message| message,
|media| {
.into()
}
AdminCommand::BlockMedia { and_purge, reason } => match media_from_body(body) {
Err(message) => message,
Ok(media) => {
let failed_count = services().media.block(&media, reason).len();
let failed_purge_count = if and_purge {
services().media.purge(&media, true).len()
services().media.purge(&media, true).await.len()
} else {
0
};
@ -1338,15 +1374,16 @@ impl Service {
(false, true) => RoomMessageEventContent::text_plain(format!(
"Failed to block {failed_count} media, check logs for more details"
)),
(true, false ) => RoomMessageEventContent::text_plain(format!(
(true, false) => RoomMessageEventContent::text_plain(format!(
"Failed to purge {failed_purge_count} media, check logs for more details"
)),
(false, false) => RoomMessageEventContent::text_plain(format!(
"Failed to block {failed_count}, and purge {failed_purge_count} media, check logs for more details"
))
}.into()
)),
}
.into()
}
},
),
AdminCommand::BlockMediaFromUsers { from_last, reason } => {
let after = from_last.map(unix_secs_from_duration).transpose()?;
@ -1382,7 +1419,8 @@ impl Service {
RoomMessageEventContent::text_plain(
"Expected code block in command body. Add --help for details.",
)
}.into()
}
.into()
}
AdminCommand::ListBlockedMedia => {
let mut markdown_message = String::from(
@ -1399,7 +1437,8 @@ impl Service {
unix_secs,
reason,
sha256_hex,
}) = media else {
}) = media
else {
continue;
};
@ -1412,8 +1451,9 @@ impl Service {
.flatten()
.expect("Time is valid");
markdown_message
.push_str(&format!("\n| {sha256_hex} | mxc://{server_name}/{media_id} | {time} | {reason} |"));
markdown_message.push_str(&format!(
"\n| {sha256_hex} | mxc://{server_name}/{media_id} | {time} | {reason} |"
));
html_message.push_str(&format!(
"<tr><td>{sha256_hex}</td><td>mxc://{server_name}/{media_id}</td><td>{time}</td><td>{reason}</td></tr>",
@ -1435,7 +1475,8 @@ impl Service {
RoomMessageEventContent::text_plain(format!(
"Failed to unblock {failed_count} media, check logs for more details"
))
}.into()
}
.into()
},
),
AdminCommand::SignJson => {
@ -1460,7 +1501,8 @@ impl Service {
RoomMessageEventContent::text_plain(
"Expected code block in command body. Add --help for details.",
)
}.into()
}
.into()
}
AdminCommand::VerifyJson => {
if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```"
@ -1521,7 +1563,8 @@ impl Service {
RoomMessageEventContent::text_plain(
"Expected code block in command body. Add --help for details.",
)
}.into()
}
.into()
}
AdminCommand::HashAndSignEvent { room_version_id } => {
if body.len() > 2
@ -1536,7 +1579,10 @@ impl Service {
services().globals.server_name().as_str(),
services().globals.keypair(),
&mut value,
&room_version_id.rules().expect("Supported room version has rules").redaction,
&room_version_id
.rules()
.expect("Supported room version has rules")
.redaction,
) {
RoomMessageEventContent::text_plain(format!("Invalid event: {e}"))
} else {
@ -1551,7 +1597,8 @@ impl Service {
RoomMessageEventContent::text_plain(
"Expected code block in command body. Add --help for details.",
)
}.into()
}
.into()
}
AdminCommand::RemoveAlias { alias } => {
if alias.server_name() != services().globals.server_name() {
@ -1576,7 +1623,8 @@ impl Service {
.alias
.remove_alias(&alias, services().globals.server_user())?;
RoomMessageEventContent::text_plain("Alias removed successfully")
}.into()
}
.into()
}
};

View file

@ -230,8 +230,6 @@ impl Service {
shutdown: AtomicBool::new(false),
};
// Remove this exception once other media backends are added
#[allow(irrefutable_let_patterns)]
if let MediaBackendConfig::FileSystem { path, .. } = &s.config.media.backend {
fs::create_dir_all(path)?;
}
@ -489,6 +487,7 @@ impl Service {
media_directory: &str,
directory_structure: &DirectoryStructure,
sha256_hex: &str,
create: bool,
) -> Result<PathBuf> {
let mut r = PathBuf::new();
r.push(media_directory);
@ -501,8 +500,10 @@ impl Service {
r.push(current_path);
}
if create {
// Create all directories leading up to file
fs::create_dir_all(&r).inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?;
}
r.push(filename);
} else {
@ -512,6 +513,16 @@ impl Service {
Ok(r)
}
pub fn get_media_path_string(
&self,
media_directory: &str,
directory_structure: &DirectoryStructure,
sha256_hex: &str,
) -> Result<String> {
let path = self.get_media_path(media_directory, directory_structure, sha256_hex, false)?;
Ok(path.to_str().expect("Media path is valid UTF-8").to_owned())
}
pub fn shutdown(&self) {
self.shutdown.store(true, atomic::Ordering::Relaxed);
// On shutdown

View file

@ -8,13 +8,14 @@ use ruma::{
OwnedServerName, ServerName, UserId,
};
use sha2::{digest::Output, Digest, Sha256};
use tracing::{error, info};
use tracing::{error, info, warn};
use crate::{
config::{DirectoryStructure, MediaBackendConfig},
services, utils, Error, Result,
};
use image::imageops::FilterType;
use rusty_s3::S3Action;
pub struct DbFileMeta {
pub sha256_digest: Vec<u8>,
@ -142,7 +143,7 @@ impl Service {
let count = files.iter().filter(|res| res.is_ok()).count();
info!("Found {count} media files to delete");
purge_files(files);
purge_files(files).await;
Ok(())
}
@ -159,11 +160,14 @@ impl Service {
) -> Result<()> {
let (sha256_digest, sha256_hex) = generate_digests(file);
for error in self.clear_required_space(
for error in self
.clear_required_space(
&sha256_digest,
MediaType::new(servername, false),
size(file)?,
)? {
)
.await?
{
error!(
"Error deleting file to clear space when downloading/creating new media file: {error}"
)
@ -207,7 +211,8 @@ impl Service {
&sha256_digest,
MediaType::new(servername, true),
size(file)?,
)?;
)
.await?;
self.db.create_thumbnail_metadata(
sha256_digest,
@ -444,10 +449,14 @@ impl Service {
/// purged have that sha256 hash.
///
/// Returns errors for all the files that were failed to be deleted, if any.
pub fn purge(&self, media: &[(OwnedServerName, String)], force_filehash: bool) -> Vec<Error> {
pub async fn purge(
&self,
media: &[(OwnedServerName, String)],
force_filehash: bool,
) -> Vec<Error> {
let hashes = self.db.purge_and_get_hashes(media, force_filehash);
purge_files(hashes)
purge_files(hashes).await
}
/// Purges all (past a certain time in unix seconds, if specified) media
@ -462,7 +471,7 @@ impl Service {
///
/// Note: it only currently works for local users, as we cannot determine who
/// exactly uploaded the file when it comes to remove users.
pub fn purge_from_user(
pub async fn purge_from_user(
&self,
user_id: &UserId,
force_filehash: bool,
@ -472,7 +481,7 @@ impl Service {
.db
.purge_and_get_hashes_from_user(user_id, force_filehash, after);
purge_files(hashes)
purge_files(hashes).await
}
/// Purges all (past a certain time in unix seconds, if specified) media
@ -484,7 +493,7 @@ impl Service {
/// purged have that sha256 hash.
///
/// Returns errors for all the files that were failed to be deleted, if any.
pub fn purge_from_server(
pub async fn purge_from_server(
&self,
server_name: &ServerName,
force_filehash: bool,
@ -494,7 +503,7 @@ impl Service {
.db
.purge_and_get_hashes_from_server(server_name, force_filehash, after);
purge_files(hashes)
purge_files(hashes).await
}
/// Checks whether the media has been blocked by administrators, returning either
@ -558,7 +567,7 @@ impl Service {
self.db.list_blocked()
}
pub fn clear_required_space(
pub async fn clear_required_space(
&self,
sha256_digest: &[u8],
media_type: MediaType,
@ -577,7 +586,7 @@ impl Service {
info!("Deleting {} files to clear space for new media file", count);
}
Ok(purge_files(files))
Ok(purge_files(files).await)
}
/// Fetches the file from the configured media backend, as well as updating the "last accessed"
@ -599,6 +608,7 @@ impl Service {
path,
directory_structure,
&hex::encode(sha256_digest),
true,
)?;
let mut file = Vec::new();
@ -606,6 +616,28 @@ impl Service {
file
}
MediaBackendConfig::S3 {
directory_structure,
bucket,
credentials,
duration,
path_prefix,
} => {
let path = services().globals.get_media_path_string(
path_prefix,
directory_structure,
&hex::encode(sha256_digest),
)?;
let url = bucket.get_object(Some(credentials), &path).sign(*duration);
let response = services().globals.default_client().get(url).send().await?;
if !response.status().is_success() {
warn!("S3 request error:\n{}", response.text().await?);
return Err(Error::bad_database("Cannot read media file"));
}
response.bytes().await?.to_vec()
}
};
if let Some((server_name, media_id)) = original_file_id {
@ -627,13 +659,42 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> {
path,
directory_structure,
} => {
let path = services()
let path =
services()
.globals
.get_media_path(path, directory_structure, sha256_hex)?;
.get_media_path(path, directory_structure, sha256_hex, true)?;
let mut f = File::create(path).await?;
f.write_all(file).await?;
}
MediaBackendConfig::S3 {
directory_structure,
bucket,
credentials,
duration,
path_prefix,
} => {
let path = services().globals.get_media_path_string(
path_prefix,
directory_structure,
sha256_hex,
)?;
let url = bucket.put_object(Some(credentials), &path).sign(*duration);
let resp = services()
.globals
.default_client()
.put(url)
// todo: to_owned is probably awful for performance
.body(file.to_owned())
.send()
.await?;
if !resp.status().is_success() {
warn!("S3 request error:\n{}", resp.text().await?);
return Err(Error::bad_database("Cannot create media file"));
}
}
}
Ok(())
@ -643,21 +704,25 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> {
/// Returns a `Vec` of errors that occurred when attempting to delete the files
///
/// Note: this does NOT remove the related metadata from the database
fn purge_files(hashes: Vec<Result<String>>) -> Vec<Error> {
hashes
.into_iter()
.map(|hash| match hash {
Ok(v) => delete_file(&v),
Err(e) => Err(e),
})
.filter_map(|r| if let Err(e) = r { Some(e) } else { None })
.collect()
async fn purge_files(hashes: Vec<Result<String>>) -> Vec<Error> {
let mut errors = Vec::new();
for hash in hashes {
match hash {
Ok(v) => {
if let Err(e) = delete_file(&v).await {
errors.push(e);
}
}
Err(e) => errors.push(e),
}
}
errors
}
/// Deletes the given file from the media backend
///
/// Note: this does NOT remove the related metadata from the database
fn delete_file(sha256_hex: &str) -> Result<()> {
async fn delete_file(sha256_hex: &str) -> Result<()> {
match &services().globals.config.media.backend {
MediaBackendConfig::FileSystem {
path,
@ -666,7 +731,7 @@ fn delete_file(sha256_hex: &str) -> Result<()> {
let mut path =
services()
.globals
.get_media_path(path, directory_structure, sha256_hex)?;
.get_media_path(path, directory_structure, sha256_hex, true)?;
if let Err(e) = fs::remove_file(&path) {
// Multiple files with the same filehash might be requseted to be deleted
@ -696,6 +761,34 @@ fn delete_file(sha256_hex: &str) -> Result<()> {
}
}
}
MediaBackendConfig::S3 {
directory_structure,
bucket,
credentials,
duration,
path_prefix,
} => {
let path = services().globals.get_media_path_string(
path_prefix,
directory_structure,
sha256_hex,
)?;
let url = bucket
.delete_object(Some(credentials), &path)
.sign(*duration);
let resp = services()
.globals
.default_client()
.delete(url)
.send()
.await?;
if !resp.status().is_success() {
warn!("S3 request error:\n{}", resp.text().await?);
return Err(Error::bad_database("Cannot delete media file"));
}
}
}
Ok(())