mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-06-27 16:35:59 +00:00
Merge branch 'feat/s3' into 'next'
Draft: feat: janky s3 implementation See merge request famedly/conduit!751
This commit is contained in:
commit
b3aaab4bb1
7 changed files with 375 additions and 125 deletions
40
Cargo.lock
generated
40
Cargo.lock
generated
|
@ -500,6 +500,7 @@ dependencies = [
|
|||
"rusqlite",
|
||||
"rust-argon2",
|
||||
"rust-rocksdb",
|
||||
"rusty-s3",
|
||||
"sd-notify",
|
||||
"serde",
|
||||
"serde_html_form",
|
||||
|
@ -1733,6 +1734,16 @@ version = "0.7.3"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
|
||||
|
||||
[[package]]
|
||||
name = "md-5"
|
||||
version = "0.10.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"digest",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
version = "2.7.4"
|
||||
|
@ -2245,6 +2256,16 @@ version = "2.0.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3"
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.37.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.40"
|
||||
|
@ -2804,6 +2825,25 @@ version = "1.0.17"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6"
|
||||
|
||||
[[package]]
|
||||
name = "rusty-s3"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f51a5a6b15f25d3e10c068039ee13befb6110fcb36c2b26317bcbdc23484d96"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"hmac",
|
||||
"md-5",
|
||||
"percent-encoding",
|
||||
"quick-xml",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"time",
|
||||
"url",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ryu"
|
||||
version = "1.0.20"
|
||||
|
|
|
@ -152,6 +152,7 @@ tikv-jemallocator = { version = "0.6", features = [
|
|||
"unprefixed_malloc_on_supported_platforms",
|
||||
], optional = true }
|
||||
|
||||
rusty-s3 = "0.7.0"
|
||||
sd-notify = { version = "0.4", optional = true }
|
||||
|
||||
# Used for matrix spec type definitions and helpers
|
||||
|
|
|
@ -242,6 +242,41 @@ impl From<IncompleteConfig> for Config {
|
|||
}),
|
||||
directory_structure,
|
||||
},
|
||||
IncompleteMediaBackendConfig::S3 {
|
||||
directory_structure,
|
||||
endpoint,
|
||||
bucket,
|
||||
region,
|
||||
key,
|
||||
secret,
|
||||
path_prefix,
|
||||
path_style,
|
||||
expiration,
|
||||
} => {
|
||||
let path_style = if path_style {
|
||||
rusty_s3::UrlStyle::Path
|
||||
} else {
|
||||
rusty_s3::UrlStyle::VirtualHost
|
||||
};
|
||||
|
||||
let bucket = rusty_s3::Bucket::new(
|
||||
endpoint.clone(),
|
||||
path_style,
|
||||
bucket.clone(),
|
||||
region.clone(),
|
||||
)
|
||||
.expect("S3 config is valid");
|
||||
|
||||
let credentials = rusty_s3::Credentials::new(key, secret);
|
||||
|
||||
MediaBackendConfig::S3 {
|
||||
directory_structure,
|
||||
bucket,
|
||||
credentials,
|
||||
duration: expiration.unwrap_or_else(|| Duration::from_secs(30)),
|
||||
path_prefix: path_prefix.unwrap_or("/".to_owned()),
|
||||
}
|
||||
}
|
||||
},
|
||||
retention: media.retention.into(),
|
||||
};
|
||||
|
@ -481,6 +516,20 @@ pub enum IncompleteMediaBackendConfig {
|
|||
#[serde(default)]
|
||||
directory_structure: DirectoryStructure,
|
||||
},
|
||||
S3 {
|
||||
#[serde(default)]
|
||||
directory_structure: DirectoryStructure,
|
||||
endpoint: Url,
|
||||
bucket: String,
|
||||
region: String,
|
||||
key: String,
|
||||
secret: String,
|
||||
path_prefix: Option<String>,
|
||||
#[serde(default = "false_fn")]
|
||||
path_style: bool,
|
||||
#[serde(default, with = "humantime_serde::option")]
|
||||
expiration: Option<Duration>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Default for IncompleteMediaBackendConfig {
|
||||
|
@ -498,6 +547,13 @@ pub enum MediaBackendConfig {
|
|||
path: String,
|
||||
directory_structure: DirectoryStructure,
|
||||
},
|
||||
S3 {
|
||||
directory_structure: DirectoryStructure,
|
||||
bucket: rusty_s3::Bucket,
|
||||
credentials: rusty_s3::Credentials,
|
||||
duration: Duration,
|
||||
path_prefix: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
|
|
|
@ -1079,6 +1079,7 @@ impl KeyValueDatabase {
|
|||
depth: *depth,
|
||||
},
|
||||
file.file_name().to_str().unwrap(),
|
||||
true,
|
||||
)?,
|
||||
)
|
||||
.await?;
|
||||
|
|
|
@ -589,7 +589,8 @@ impl Service {
|
|||
RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
)
|
||||
}.into()
|
||||
}
|
||||
.into()
|
||||
}
|
||||
AdminCommand::UnregisterAppservice {
|
||||
appservice_identifier,
|
||||
|
@ -602,7 +603,8 @@ impl Service {
|
|||
Err(e) => RoomMessageEventContent::text_plain(format!(
|
||||
"Failed to unregister appservice: {e}"
|
||||
)),
|
||||
}.into(),
|
||||
}
|
||||
.into(),
|
||||
AdminCommand::ListAppservices => {
|
||||
let appservices = services().appservice.iter_ids().await;
|
||||
let output = format!(
|
||||
|
@ -640,7 +642,8 @@ impl Service {
|
|||
RoomMessageEventContent::text_plain(&msg)
|
||||
}
|
||||
Err(e) => RoomMessageEventContent::text_plain(e.to_string()),
|
||||
}.into(),
|
||||
}
|
||||
.into(),
|
||||
AdminCommand::IncomingFederation => {
|
||||
let map = services().globals.roomid_federationhandletime.read().await;
|
||||
let mut msg: String = format!("Handling {} incoming pdus:\n", map.len());
|
||||
|
@ -681,7 +684,8 @@ impl Service {
|
|||
))
|
||||
} else {
|
||||
RoomMessageEventContent::text_plain("Event not found.")
|
||||
}.into()
|
||||
}
|
||||
.into()
|
||||
}
|
||||
AdminCommand::ParsePdu => {
|
||||
if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```"
|
||||
|
@ -715,7 +719,8 @@ impl Service {
|
|||
}
|
||||
} else {
|
||||
RoomMessageEventContent::text_plain("Expected code block in command body.")
|
||||
}.into()
|
||||
}
|
||||
.into()
|
||||
}
|
||||
AdminCommand::GetPdu { event_id } => {
|
||||
let mut outlier = false;
|
||||
|
@ -753,7 +758,8 @@ impl Service {
|
|||
)
|
||||
}
|
||||
None => RoomMessageEventContent::text_plain("PDU not found."),
|
||||
}.into()
|
||||
}
|
||||
.into()
|
||||
}
|
||||
AdminCommand::MemoryUsage => {
|
||||
let response1 = services().memory_usage().await;
|
||||
|
@ -761,7 +767,8 @@ impl Service {
|
|||
|
||||
RoomMessageEventContent::text_plain(format!(
|
||||
"Services:\n{response1}\n\nDatabase:\n{response2}"
|
||||
)).into()
|
||||
))
|
||||
.into()
|
||||
}
|
||||
AdminCommand::ClearDatabaseCaches { amount } => {
|
||||
services().globals.db.clear_caches(amount);
|
||||
|
@ -786,7 +793,8 @@ impl Service {
|
|||
Err(e) => {
|
||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"The supplied username is not a valid username: {e}"
|
||||
)).into())
|
||||
))
|
||||
.into())
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -794,7 +802,8 @@ impl Service {
|
|||
if user_id.server_name() != services().globals.server_name() {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"The specified user is not from this server!",
|
||||
).into());
|
||||
)
|
||||
.into());
|
||||
};
|
||||
|
||||
// Check if the specified user is valid
|
||||
|
@ -808,7 +817,8 @@ impl Service {
|
|||
{
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"The specified user does not exist!",
|
||||
).into());
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
let new_password = utils::random_string(AUTO_GEN_PASSWORD_LENGTH);
|
||||
|
@ -823,7 +833,8 @@ impl Service {
|
|||
Err(e) => RoomMessageEventContent::text_plain(format!(
|
||||
"Couldn't reset the password for user {user_id}: {e}"
|
||||
)),
|
||||
}.into()
|
||||
}
|
||||
.into()
|
||||
}
|
||||
AdminCommand::CreateUser { username, password } => {
|
||||
let password =
|
||||
|
@ -837,7 +848,8 @@ impl Service {
|
|||
Err(e) => {
|
||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"The supplied username is not a valid username: {e}"
|
||||
)).into())
|
||||
))
|
||||
.into())
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -845,18 +857,21 @@ impl Service {
|
|||
if user_id.server_name() != services().globals.server_name() {
|
||||
return Ok(RoomMessageEventContent::text_plain(
|
||||
"The specified user is not from this server!",
|
||||
).into());
|
||||
)
|
||||
.into());
|
||||
};
|
||||
|
||||
if user_id.is_historical() {
|
||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Userid {user_id} is not allowed due to historical"
|
||||
)).into());
|
||||
))
|
||||
.into());
|
||||
}
|
||||
if services().users.exists(&user_id)? {
|
||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||
"Userid {user_id} already exists"
|
||||
)).into());
|
||||
))
|
||||
.into());
|
||||
}
|
||||
// Create user
|
||||
services().users.create(&user_id, Some(password.as_str()))?;
|
||||
|
@ -893,10 +908,10 @@ impl Service {
|
|||
// Inhibit login does not work for guests
|
||||
RoomMessageEventContent::text_plain(format!(
|
||||
"Created user with user_id: {user_id} and password: {password}"
|
||||
)).into()
|
||||
))
|
||||
.into()
|
||||
}
|
||||
AdminCommand::AllowRegistration { status } => {
|
||||
if let Some(status) = status {
|
||||
AdminCommand::AllowRegistration { status } => if let Some(status) = status {
|
||||
services().globals.set_registration(status).await;
|
||||
RoomMessageEventContent::text_plain(if status {
|
||||
"Registration is now enabled"
|
||||
|
@ -911,8 +926,8 @@ impl Service {
|
|||
"Registration is currently disabled"
|
||||
},
|
||||
)
|
||||
}.into()
|
||||
}
|
||||
.into(),
|
||||
AdminCommand::DisableRoom { room_id } => {
|
||||
services().rooms.metadata.disable_room(&room_id, true)?;
|
||||
RoomMessageEventContent::text_plain("Room disabled.").into()
|
||||
|
@ -955,6 +970,7 @@ impl Service {
|
|||
services()
|
||||
.media
|
||||
.purge_from_user(&user_id, purge_media.force_filehash, after)
|
||||
.await
|
||||
.len()
|
||||
} else {
|
||||
0
|
||||
|
@ -1023,13 +1039,17 @@ impl Service {
|
|||
failed_count += services()
|
||||
.media
|
||||
.purge_from_user(user_id, purge_media.force_filehash, after)
|
||||
.await
|
||||
.len();
|
||||
}
|
||||
}
|
||||
|
||||
let mut message = format!("Deactivated {deactivation_count} accounts.");
|
||||
if !admins.is_empty() {
|
||||
message.push_str(&format!("\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts",admins.join(", ")));
|
||||
message.push_str(&format!(
|
||||
"\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts",
|
||||
admins.join(", ")
|
||||
));
|
||||
}
|
||||
if failed_count != 0 {
|
||||
message.push_str(&format!(
|
||||
|
@ -1042,14 +1062,19 @@ impl Service {
|
|||
RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
)
|
||||
}.into()
|
||||
}
|
||||
.into()
|
||||
}
|
||||
AdminCommand::QueryMedia { mxc } => {
|
||||
let Ok((server_name, media_id)) = mxc.parts() else {
|
||||
return Ok(RoomMessageEventContent::text_plain("Invalid media MXC").into());
|
||||
};
|
||||
|
||||
let MediaQuery{ is_blocked, source_file, thumbnails } = services().media.query(server_name, media_id)?;
|
||||
let MediaQuery {
|
||||
is_blocked,
|
||||
source_file,
|
||||
thumbnails,
|
||||
} = services().media.query(server_name, media_id)?;
|
||||
let mut message = format!("Is blocked Media ID: {is_blocked}");
|
||||
|
||||
if let Some(MediaQueryFileInfo {
|
||||
|
@ -1060,14 +1085,16 @@ impl Service {
|
|||
unauthenticated_access_permitted,
|
||||
is_blocked_via_filehash,
|
||||
file_info: time_info,
|
||||
}) = source_file {
|
||||
}) = source_file
|
||||
{
|
||||
message.push_str("\n\nInformation on full (non-thumbnail) file:\n");
|
||||
|
||||
if let Some(FileInfo {
|
||||
creation,
|
||||
last_access,
|
||||
size,
|
||||
}) = time_info {
|
||||
}) = time_info
|
||||
{
|
||||
message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}",
|
||||
DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
|
||||
DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
|
||||
|
@ -1105,13 +1132,15 @@ impl Service {
|
|||
unauthenticated_access_permitted,
|
||||
is_blocked_via_filehash,
|
||||
file_info: time_info,
|
||||
} in thumbnails {
|
||||
} in thumbnails
|
||||
{
|
||||
message.push_str(&format!("\n\nDimensions: {width}x{height}"));
|
||||
if let Some(FileInfo {
|
||||
creation,
|
||||
last_access,
|
||||
size,
|
||||
}) = time_info {
|
||||
}) = time_info
|
||||
{
|
||||
message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}",
|
||||
DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
|
||||
DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
|
||||
|
@ -1145,7 +1174,8 @@ impl Service {
|
|||
file,
|
||||
content_type,
|
||||
content_disposition,
|
||||
} = client_server::media::get_content(server_name, media_id.to_owned(), true, true).await?;
|
||||
} = client_server::media::get_content(server_name, media_id.to_owned(), true, true)
|
||||
.await?;
|
||||
|
||||
if let Ok(image) = image::load_from_memory(&file) {
|
||||
let filename = content_disposition.and_then(|cd| cd.filename);
|
||||
|
@ -1185,10 +1215,7 @@ impl Service {
|
|||
}
|
||||
}
|
||||
AdminCommand::ListMedia {
|
||||
user_server_filter: ListMediaArgs {
|
||||
user,
|
||||
server,
|
||||
},
|
||||
user_server_filter: ListMediaArgs { user, server },
|
||||
include_thumbnails,
|
||||
content_type,
|
||||
uploaded_before,
|
||||
|
@ -1211,8 +1238,7 @@ impl Service {
|
|||
size,
|
||||
creation,
|
||||
} in services().media.list(
|
||||
user
|
||||
.map(ServerNameOrUserId::UserId)
|
||||
user.map(ServerNameOrUserId::UserId)
|
||||
.or_else(|| server.map(ServerNameOrUserId::ServerName)),
|
||||
include_thumbnails,
|
||||
content_type.as_deref(),
|
||||
|
@ -1227,15 +1253,20 @@ impl Service {
|
|||
.transpose()
|
||||
.map_err(|_| Error::AdminCommand("Timestamp must be after unix epoch"))?
|
||||
.as_ref()
|
||||
.map(Duration::as_secs)
|
||||
.map(Duration::as_secs),
|
||||
)? {
|
||||
|
||||
let user_id = uploader_localpart.map(|localpart| format!("@{localpart}:{server_name}")).unwrap_or_default();
|
||||
let user_id = uploader_localpart
|
||||
.map(|localpart| format!("@{localpart}:{server_name}"))
|
||||
.unwrap_or_default();
|
||||
let content_type = content_type.unwrap_or_default();
|
||||
let filename = filename.unwrap_or_default();
|
||||
let dimensions = dimensions.map(|(w, h)| format!("{w}x{h}")).unwrap_or_default();
|
||||
let dimensions = dimensions
|
||||
.map(|(w, h)| format!("{w}x{h}"))
|
||||
.unwrap_or_default();
|
||||
let size = ByteSize::b(size).display().si();
|
||||
let creation = DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range");
|
||||
let creation =
|
||||
DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX), 0)
|
||||
.expect("Timestamp is within range");
|
||||
|
||||
markdown_message
|
||||
.push_str(&format!("\n| mxc://{server_name}/{media_id} | {dimensions} | {creation} | {user_id} | {content_type} | {filename} | {size} |"));
|
||||
|
@ -1248,11 +1279,10 @@ impl Service {
|
|||
html_message.push_str("</tbody></table>");
|
||||
|
||||
RoomMessageEventContent::text_html(markdown_message, html_message).into()
|
||||
},
|
||||
AdminCommand::PurgeMedia => media_from_body(body).map_or_else(
|
||||
|message| message,
|
||||
|media| {
|
||||
let failed_count = services().media.purge(&media, true).len();
|
||||
}
|
||||
AdminCommand::PurgeMedia => match media_from_body(body) {
|
||||
Ok(media) => {
|
||||
let failed_count = services().media.purge(&media, true).await.len();
|
||||
|
||||
if failed_count == 0 {
|
||||
RoomMessageEventContent::text_plain("Successfully purged media")
|
||||
|
@ -1260,9 +1290,11 @@ impl Service {
|
|||
RoomMessageEventContent::text_plain(format!(
|
||||
"Failed to delete {failed_count} media, check logs for more details"
|
||||
))
|
||||
}.into()
|
||||
}
|
||||
.into()
|
||||
}
|
||||
Err(message) => message,
|
||||
},
|
||||
),
|
||||
AdminCommand::PurgeMediaFromUsers {
|
||||
from_last,
|
||||
force_filehash,
|
||||
|
@ -1282,6 +1314,7 @@ impl Service {
|
|||
failed_count += services()
|
||||
.media
|
||||
.purge_from_user(user_id, force_filehash, after)
|
||||
.await
|
||||
.len();
|
||||
}
|
||||
|
||||
|
@ -1296,7 +1329,8 @@ impl Service {
|
|||
RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
)
|
||||
}.into()
|
||||
}
|
||||
.into()
|
||||
}
|
||||
AdminCommand::PurgeMediaFromServer {
|
||||
server_id: server_name,
|
||||
|
@ -1314,6 +1348,7 @@ impl Service {
|
|||
let failed_count = services()
|
||||
.media
|
||||
.purge_from_server(&server_name, force_filehash, after)
|
||||
.await
|
||||
.len();
|
||||
|
||||
if failed_count == 0 {
|
||||
|
@ -1324,14 +1359,15 @@ impl Service {
|
|||
RoomMessageEventContent::text_plain(format!(
|
||||
"Failed to purge {failed_count} media, check logs for more details"
|
||||
))
|
||||
}.into()
|
||||
}
|
||||
AdminCommand::BlockMedia { and_purge, reason } => media_from_body(body).map_or_else(
|
||||
|message| message,
|
||||
|media| {
|
||||
.into()
|
||||
}
|
||||
AdminCommand::BlockMedia { and_purge, reason } => match media_from_body(body) {
|
||||
Err(message) => message,
|
||||
Ok(media) => {
|
||||
let failed_count = services().media.block(&media, reason).len();
|
||||
let failed_purge_count = if and_purge {
|
||||
services().media.purge(&media, true).len()
|
||||
services().media.purge(&media, true).await.len()
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
@ -1346,10 +1382,11 @@ impl Service {
|
|||
)),
|
||||
(false, false) => RoomMessageEventContent::text_plain(format!(
|
||||
"Failed to block {failed_count}, and purge {failed_purge_count} media, check logs for more details"
|
||||
))
|
||||
}.into()
|
||||
)),
|
||||
}
|
||||
.into()
|
||||
}
|
||||
},
|
||||
),
|
||||
AdminCommand::BlockMediaFromUsers { from_last, reason } => {
|
||||
let after = from_last.map(unix_secs_from_duration).transpose()?;
|
||||
|
||||
|
@ -1385,7 +1422,8 @@ impl Service {
|
|||
RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
)
|
||||
}.into()
|
||||
}
|
||||
.into()
|
||||
}
|
||||
AdminCommand::ListBlockedMedia => {
|
||||
let mut markdown_message = String::from(
|
||||
|
@ -1402,7 +1440,8 @@ impl Service {
|
|||
unix_secs,
|
||||
reason,
|
||||
sha256_hex,
|
||||
}) = media else {
|
||||
}) = media
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
||||
|
@ -1415,8 +1454,9 @@ impl Service {
|
|||
.flatten()
|
||||
.expect("Time is valid");
|
||||
|
||||
markdown_message
|
||||
.push_str(&format!("\n| {sha256_hex} | mxc://{server_name}/{media_id} | {time} | {reason} |"));
|
||||
markdown_message.push_str(&format!(
|
||||
"\n| {sha256_hex} | mxc://{server_name}/{media_id} | {time} | {reason} |"
|
||||
));
|
||||
|
||||
html_message.push_str(&format!(
|
||||
"<tr><td>{sha256_hex}</td><td>mxc://{server_name}/{media_id}</td><td>{time}</td><td>{reason}</td></tr>",
|
||||
|
@ -1438,7 +1478,8 @@ impl Service {
|
|||
RoomMessageEventContent::text_plain(format!(
|
||||
"Failed to unblock {failed_count} media, check logs for more details"
|
||||
))
|
||||
}.into()
|
||||
}
|
||||
.into()
|
||||
},
|
||||
),
|
||||
AdminCommand::SignJson => {
|
||||
|
@ -1463,7 +1504,8 @@ impl Service {
|
|||
RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
)
|
||||
}.into()
|
||||
}
|
||||
.into()
|
||||
}
|
||||
AdminCommand::VerifyJson => {
|
||||
if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```"
|
||||
|
@ -1524,7 +1566,8 @@ impl Service {
|
|||
RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
)
|
||||
}.into()
|
||||
}
|
||||
.into()
|
||||
}
|
||||
AdminCommand::HashAndSignEvent { room_version_id } => {
|
||||
if body.len() > 2
|
||||
|
@ -1539,7 +1582,10 @@ impl Service {
|
|||
services().globals.server_name().as_str(),
|
||||
services().globals.keypair(),
|
||||
&mut value,
|
||||
&room_version_id.rules().expect("Supported room version has rules").redaction,
|
||||
&room_version_id
|
||||
.rules()
|
||||
.expect("Supported room version has rules")
|
||||
.redaction,
|
||||
) {
|
||||
RoomMessageEventContent::text_plain(format!("Invalid event: {e}"))
|
||||
} else {
|
||||
|
@ -1554,7 +1600,8 @@ impl Service {
|
|||
RoomMessageEventContent::text_plain(
|
||||
"Expected code block in command body. Add --help for details.",
|
||||
)
|
||||
}.into()
|
||||
}
|
||||
.into()
|
||||
}
|
||||
AdminCommand::RemoveAlias { alias } => {
|
||||
if alias.server_name() != services().globals.server_name() {
|
||||
|
@ -1579,7 +1626,8 @@ impl Service {
|
|||
.alias
|
||||
.remove_alias(&alias, services().globals.server_user())?;
|
||||
RoomMessageEventContent::text_plain("Alias removed successfully")
|
||||
}.into()
|
||||
}
|
||||
.into()
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -230,8 +230,6 @@ impl Service {
|
|||
shutdown: AtomicBool::new(false),
|
||||
};
|
||||
|
||||
// Remove this exception once other media backends are added
|
||||
#[allow(irrefutable_let_patterns)]
|
||||
if let MediaBackendConfig::FileSystem { path, .. } = &s.config.media.backend {
|
||||
fs::create_dir_all(path)?;
|
||||
}
|
||||
|
@ -489,6 +487,7 @@ impl Service {
|
|||
media_directory: &str,
|
||||
directory_structure: &DirectoryStructure,
|
||||
sha256_hex: &str,
|
||||
create: bool,
|
||||
) -> Result<PathBuf> {
|
||||
let mut r = PathBuf::new();
|
||||
r.push(media_directory);
|
||||
|
@ -501,8 +500,10 @@ impl Service {
|
|||
r.push(current_path);
|
||||
}
|
||||
|
||||
if create {
|
||||
// Create all directories leading up to file
|
||||
fs::create_dir_all(&r).inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?;
|
||||
}
|
||||
|
||||
r.push(filename);
|
||||
} else {
|
||||
|
@ -512,6 +513,16 @@ impl Service {
|
|||
Ok(r)
|
||||
}
|
||||
|
||||
pub fn get_media_path_string(
|
||||
&self,
|
||||
media_directory: &str,
|
||||
directory_structure: &DirectoryStructure,
|
||||
sha256_hex: &str,
|
||||
) -> Result<String> {
|
||||
let path = self.get_media_path(media_directory, directory_structure, sha256_hex, false)?;
|
||||
Ok(path.to_str().expect("Media path is valid UTF-8").to_owned())
|
||||
}
|
||||
|
||||
pub fn shutdown(&self) {
|
||||
self.shutdown.store(true, atomic::Ordering::Relaxed);
|
||||
// On shutdown
|
||||
|
|
|
@ -8,13 +8,14 @@ use ruma::{
|
|||
OwnedServerName, ServerName, UserId,
|
||||
};
|
||||
use sha2::{digest::Output, Digest, Sha256};
|
||||
use tracing::{error, info};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::{
|
||||
config::{DirectoryStructure, MediaBackendConfig},
|
||||
services, utils, Error, Result,
|
||||
};
|
||||
use image::imageops::FilterType;
|
||||
use rusty_s3::S3Action;
|
||||
|
||||
pub struct DbFileMeta {
|
||||
pub sha256_digest: Vec<u8>,
|
||||
|
@ -142,7 +143,7 @@ impl Service {
|
|||
let count = files.iter().filter(|res| res.is_ok()).count();
|
||||
info!("Found {count} media files to delete");
|
||||
|
||||
purge_files(files);
|
||||
purge_files(files).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -159,11 +160,14 @@ impl Service {
|
|||
) -> Result<()> {
|
||||
let (sha256_digest, sha256_hex) = generate_digests(file);
|
||||
|
||||
for error in self.clear_required_space(
|
||||
for error in self
|
||||
.clear_required_space(
|
||||
&sha256_digest,
|
||||
MediaType::new(servername, false),
|
||||
size(file)?,
|
||||
)? {
|
||||
)
|
||||
.await?
|
||||
{
|
||||
error!(
|
||||
"Error deleting file to clear space when downloading/creating new media file: {error}"
|
||||
)
|
||||
|
@ -207,7 +211,8 @@ impl Service {
|
|||
&sha256_digest,
|
||||
MediaType::new(servername, true),
|
||||
size(file)?,
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
|
||||
self.db.create_thumbnail_metadata(
|
||||
sha256_digest,
|
||||
|
@ -444,10 +449,14 @@ impl Service {
|
|||
/// purged have that sha256 hash.
|
||||
///
|
||||
/// Returns errors for all the files that were failed to be deleted, if any.
|
||||
pub fn purge(&self, media: &[(OwnedServerName, String)], force_filehash: bool) -> Vec<Error> {
|
||||
pub async fn purge(
|
||||
&self,
|
||||
media: &[(OwnedServerName, String)],
|
||||
force_filehash: bool,
|
||||
) -> Vec<Error> {
|
||||
let hashes = self.db.purge_and_get_hashes(media, force_filehash);
|
||||
|
||||
purge_files(hashes)
|
||||
purge_files(hashes).await
|
||||
}
|
||||
|
||||
/// Purges all (past a certain time in unix seconds, if specified) media
|
||||
|
@ -462,7 +471,7 @@ impl Service {
|
|||
///
|
||||
/// Note: it only currently works for local users, as we cannot determine who
|
||||
/// exactly uploaded the file when it comes to remove users.
|
||||
pub fn purge_from_user(
|
||||
pub async fn purge_from_user(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
force_filehash: bool,
|
||||
|
@ -472,7 +481,7 @@ impl Service {
|
|||
.db
|
||||
.purge_and_get_hashes_from_user(user_id, force_filehash, after);
|
||||
|
||||
purge_files(hashes)
|
||||
purge_files(hashes).await
|
||||
}
|
||||
|
||||
/// Purges all (past a certain time in unix seconds, if specified) media
|
||||
|
@ -484,7 +493,7 @@ impl Service {
|
|||
/// purged have that sha256 hash.
|
||||
///
|
||||
/// Returns errors for all the files that were failed to be deleted, if any.
|
||||
pub fn purge_from_server(
|
||||
pub async fn purge_from_server(
|
||||
&self,
|
||||
server_name: &ServerName,
|
||||
force_filehash: bool,
|
||||
|
@ -494,7 +503,7 @@ impl Service {
|
|||
.db
|
||||
.purge_and_get_hashes_from_server(server_name, force_filehash, after);
|
||||
|
||||
purge_files(hashes)
|
||||
purge_files(hashes).await
|
||||
}
|
||||
|
||||
/// Checks whether the media has been blocked by administrators, returning either
|
||||
|
@ -558,7 +567,7 @@ impl Service {
|
|||
self.db.list_blocked()
|
||||
}
|
||||
|
||||
pub fn clear_required_space(
|
||||
pub async fn clear_required_space(
|
||||
&self,
|
||||
sha256_digest: &[u8],
|
||||
media_type: MediaType,
|
||||
|
@ -577,7 +586,7 @@ impl Service {
|
|||
info!("Deleting {} files to clear space for new media file", count);
|
||||
}
|
||||
|
||||
Ok(purge_files(files))
|
||||
Ok(purge_files(files).await)
|
||||
}
|
||||
|
||||
/// Fetches the file from the configured media backend, as well as updating the "last accessed"
|
||||
|
@ -599,6 +608,7 @@ impl Service {
|
|||
path,
|
||||
directory_structure,
|
||||
&hex::encode(sha256_digest),
|
||||
true,
|
||||
)?;
|
||||
|
||||
let mut file = Vec::new();
|
||||
|
@ -606,6 +616,28 @@ impl Service {
|
|||
|
||||
file
|
||||
}
|
||||
MediaBackendConfig::S3 {
|
||||
directory_structure,
|
||||
bucket,
|
||||
credentials,
|
||||
duration,
|
||||
path_prefix,
|
||||
} => {
|
||||
let path = services().globals.get_media_path_string(
|
||||
path_prefix,
|
||||
directory_structure,
|
||||
&hex::encode(sha256_digest),
|
||||
)?;
|
||||
|
||||
let url = bucket.get_object(Some(credentials), &path).sign(*duration);
|
||||
let response = services().globals.default_client().get(url).send().await?;
|
||||
if !response.status().is_success() {
|
||||
warn!("S3 request error:\n{}", response.text().await?);
|
||||
return Err(Error::bad_database("Cannot read media file"));
|
||||
}
|
||||
|
||||
response.bytes().await?.to_vec()
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((server_name, media_id)) = original_file_id {
|
||||
|
@ -627,13 +659,42 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> {
|
|||
path,
|
||||
directory_structure,
|
||||
} => {
|
||||
let path = services()
|
||||
let path =
|
||||
services()
|
||||
.globals
|
||||
.get_media_path(path, directory_structure, sha256_hex)?;
|
||||
.get_media_path(path, directory_structure, sha256_hex, true)?;
|
||||
|
||||
let mut f = File::create(path).await?;
|
||||
f.write_all(file).await?;
|
||||
}
|
||||
MediaBackendConfig::S3 {
|
||||
directory_structure,
|
||||
bucket,
|
||||
credentials,
|
||||
duration,
|
||||
path_prefix,
|
||||
} => {
|
||||
let path = services().globals.get_media_path_string(
|
||||
path_prefix,
|
||||
directory_structure,
|
||||
sha256_hex,
|
||||
)?;
|
||||
|
||||
let url = bucket.put_object(Some(credentials), &path).sign(*duration);
|
||||
let resp = services()
|
||||
.globals
|
||||
.default_client()
|
||||
.put(url)
|
||||
// todo: to_owned is probably awful for performance
|
||||
.body(file.to_owned())
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
warn!("S3 request error:\n{}", resp.text().await?);
|
||||
return Err(Error::bad_database("Cannot create media file"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -643,21 +704,25 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> {
|
|||
/// Returns a `Vec` of errors that occurred when attempting to delete the files
|
||||
///
|
||||
/// Note: this does NOT remove the related metadata from the database
|
||||
fn purge_files(hashes: Vec<Result<String>>) -> Vec<Error> {
|
||||
hashes
|
||||
.into_iter()
|
||||
.map(|hash| match hash {
|
||||
Ok(v) => delete_file(&v),
|
||||
Err(e) => Err(e),
|
||||
})
|
||||
.filter_map(|r| if let Err(e) = r { Some(e) } else { None })
|
||||
.collect()
|
||||
async fn purge_files(hashes: Vec<Result<String>>) -> Vec<Error> {
|
||||
let mut errors = Vec::new();
|
||||
for hash in hashes {
|
||||
match hash {
|
||||
Ok(v) => {
|
||||
if let Err(e) = delete_file(&v).await {
|
||||
errors.push(e);
|
||||
}
|
||||
}
|
||||
Err(e) => errors.push(e),
|
||||
}
|
||||
}
|
||||
errors
|
||||
}
|
||||
|
||||
/// Deletes the given file from the media backend
|
||||
///
|
||||
/// Note: this does NOT remove the related metadata from the database
|
||||
fn delete_file(sha256_hex: &str) -> Result<()> {
|
||||
async fn delete_file(sha256_hex: &str) -> Result<()> {
|
||||
match &services().globals.config.media.backend {
|
||||
MediaBackendConfig::FileSystem {
|
||||
path,
|
||||
|
@ -666,7 +731,7 @@ fn delete_file(sha256_hex: &str) -> Result<()> {
|
|||
let mut path =
|
||||
services()
|
||||
.globals
|
||||
.get_media_path(path, directory_structure, sha256_hex)?;
|
||||
.get_media_path(path, directory_structure, sha256_hex, true)?;
|
||||
|
||||
if let Err(e) = fs::remove_file(&path) {
|
||||
// Multiple files with the same filehash might be requseted to be deleted
|
||||
|
@ -696,6 +761,34 @@ fn delete_file(sha256_hex: &str) -> Result<()> {
|
|||
}
|
||||
}
|
||||
}
|
||||
MediaBackendConfig::S3 {
|
||||
directory_structure,
|
||||
bucket,
|
||||
credentials,
|
||||
duration,
|
||||
path_prefix,
|
||||
} => {
|
||||
let path = services().globals.get_media_path_string(
|
||||
path_prefix,
|
||||
directory_structure,
|
||||
sha256_hex,
|
||||
)?;
|
||||
let url = bucket
|
||||
.delete_object(Some(credentials), &path)
|
||||
.sign(*duration);
|
||||
|
||||
let resp = services()
|
||||
.globals
|
||||
.default_client()
|
||||
.delete(url)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
warn!("S3 request error:\n{}", resp.text().await?);
|
||||
return Err(Error::bad_database("Cannot delete media file"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue