mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-06-27 16:35:59 +00:00
Merge branch 's3-media-storage' into 'next'
feat(media): add S3 backend See merge request famedly/conduit!384
This commit is contained in:
commit
bbed38eadc
8 changed files with 450 additions and 126 deletions
40
Cargo.lock
generated
40
Cargo.lock
generated
|
@ -532,6 +532,7 @@ dependencies = [
|
||||||
"rusqlite",
|
"rusqlite",
|
||||||
"rust-argon2",
|
"rust-argon2",
|
||||||
"rust-rocksdb",
|
"rust-rocksdb",
|
||||||
|
"rusty-s3",
|
||||||
"sd-notify",
|
"sd-notify",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_html_form",
|
"serde_html_form",
|
||||||
|
@ -1807,6 +1808,16 @@ version = "0.8.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
|
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "md-5"
|
||||||
|
version = "0.10.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"digest",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "memchr"
|
name = "memchr"
|
||||||
version = "2.7.5"
|
version = "2.7.5"
|
||||||
|
@ -2312,6 +2323,16 @@ version = "2.0.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3"
|
checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "quick-xml"
|
||||||
|
version = "0.37.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb"
|
||||||
|
dependencies = [
|
||||||
|
"memchr",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "quote"
|
name = "quote"
|
||||||
version = "1.0.40"
|
version = "1.0.40"
|
||||||
|
@ -2887,6 +2908,25 @@ version = "1.0.21"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d"
|
checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rusty-s3"
|
||||||
|
version = "0.7.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8f51a5a6b15f25d3e10c068039ee13befb6110fcb36c2b26317bcbdc23484d96"
|
||||||
|
dependencies = [
|
||||||
|
"base64 0.22.1",
|
||||||
|
"hmac",
|
||||||
|
"md-5",
|
||||||
|
"percent-encoding",
|
||||||
|
"quick-xml",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"sha2",
|
||||||
|
"time",
|
||||||
|
"url",
|
||||||
|
"zeroize",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ryu"
|
name = "ryu"
|
||||||
version = "1.0.20"
|
version = "1.0.20"
|
||||||
|
|
|
@ -89,6 +89,8 @@ image = { version = "0.25", default-features = false, features = [
|
||||||
# Used for creating media filenames
|
# Used for creating media filenames
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
sha2 = "0.10"
|
sha2 = "0.10"
|
||||||
|
# Used for S3 media backend
|
||||||
|
rusty-s3 = "0.7.0"
|
||||||
# Used for parsing media retention policies from the config
|
# Used for parsing media retention policies from the config
|
||||||
bytesize = { version = "2", features = ["serde"] }
|
bytesize = { version = "2", features = ["serde"] }
|
||||||
humantime-serde = "1"
|
humantime-serde = "1"
|
||||||
|
|
|
@ -99,6 +99,33 @@ depth = 4
|
||||||
length = 2
|
length = 2
|
||||||
```
|
```
|
||||||
|
|
||||||
|
#### S3 backend
|
||||||
|
The S3 backend has the following fields:
|
||||||
|
- `endpoint`: The URL of the S3 endpoint to connect to
|
||||||
|
- `bucket`: The name of the S3 bucket to use for storage. This bucket must already exist and your credentials must have access to it
|
||||||
|
- `region`: The region where your S3 bucket is located
|
||||||
|
- `path`: The base directory where all the media files will be stored (defaults to
|
||||||
|
root of the bucket)
|
||||||
|
- `key`: Your Access Key ID
|
||||||
|
- `secret`: Your Secret Access Key
|
||||||
|
- `duration`: The time (in seconds) that signed requests to the S3 bucket will be valid (default: `30`)
|
||||||
|
- `bucket_use_path`: Controls the structure of the path to files in S3. If `true`, the bucket name will be included as part of the file path. If `false` (or omitted), it will be used as the bucket name in the domain name
|
||||||
|
- `directory_structure`: This is a table, used to configure how files are to be distributed within
|
||||||
|
the media directory (see [Filesystem backend](#filesystem-backend) for details)
|
||||||
|
|
||||||
|
##### Example:
|
||||||
|
```toml
|
||||||
|
[global.media]
|
||||||
|
backend = "s3"
|
||||||
|
endpoint = "http://minio:9000",
|
||||||
|
bucket = "test",
|
||||||
|
region = "minio",
|
||||||
|
key = "<s3_key>",
|
||||||
|
secret = "<s3_secret>",
|
||||||
|
duration = "15",
|
||||||
|
bucket_use_path = "true",
|
||||||
|
```
|
||||||
|
|
||||||
#### Retention policies
|
#### Retention policies
|
||||||
Over time, the amount of media will keep growing, even if they were only accessed once.
|
Over time, the amount of media will keep growing, even if they were only accessed once.
|
||||||
Retention policies allow for media files to automatically be deleted if they meet certain crietia,
|
Retention policies allow for media files to automatically be deleted if they meet certain crietia,
|
||||||
|
|
|
@ -242,6 +242,13 @@ impl From<IncompleteConfig> for Config {
|
||||||
}),
|
}),
|
||||||
directory_structure,
|
directory_structure,
|
||||||
},
|
},
|
||||||
|
IncompleteMediaBackendConfig::S3(value) => MediaBackendConfig::S3 {
|
||||||
|
bucket: value.bucket,
|
||||||
|
credentials: value.credentials,
|
||||||
|
duration: value.duration,
|
||||||
|
path: value.path,
|
||||||
|
directory_structure: value.directory_structure,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
retention: media.retention.into(),
|
retention: media.retention.into(),
|
||||||
};
|
};
|
||||||
|
@ -481,6 +488,7 @@ pub enum IncompleteMediaBackendConfig {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
directory_structure: DirectoryStructure,
|
directory_structure: DirectoryStructure,
|
||||||
},
|
},
|
||||||
|
S3(S3MediaBackend),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for IncompleteMediaBackendConfig {
|
impl Default for IncompleteMediaBackendConfig {
|
||||||
|
@ -498,6 +506,13 @@ pub enum MediaBackendConfig {
|
||||||
path: String,
|
path: String,
|
||||||
directory_structure: DirectoryStructure,
|
directory_structure: DirectoryStructure,
|
||||||
},
|
},
|
||||||
|
S3 {
|
||||||
|
bucket: Box<rusty_s3::Bucket>,
|
||||||
|
credentials: Box<rusty_s3::Credentials>,
|
||||||
|
duration: Duration,
|
||||||
|
path: Option<String>,
|
||||||
|
directory_structure: DirectoryStructure,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
@ -554,6 +569,58 @@ impl TryFrom<ShadowDirectoryStructure> for DirectoryStructure {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct ShadowS3MediaBackend {
|
||||||
|
endpoint: Url,
|
||||||
|
bucket: String,
|
||||||
|
region: String,
|
||||||
|
path: Option<String>,
|
||||||
|
|
||||||
|
key: String,
|
||||||
|
secret: String,
|
||||||
|
|
||||||
|
#[serde(default = "default_s3_duration")]
|
||||||
|
duration: u64,
|
||||||
|
#[serde(default = "false_fn")]
|
||||||
|
bucket_use_path: bool,
|
||||||
|
#[serde(default)]
|
||||||
|
directory_structure: DirectoryStructure,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<ShadowS3MediaBackend> for S3MediaBackend {
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn try_from(value: ShadowS3MediaBackend) -> Result<Self, Self::Error> {
|
||||||
|
let path_style = if value.bucket_use_path {
|
||||||
|
rusty_s3::UrlStyle::Path
|
||||||
|
} else {
|
||||||
|
rusty_s3::UrlStyle::VirtualHost
|
||||||
|
};
|
||||||
|
let credentials = rusty_s3::Credentials::new(value.key, value.secret);
|
||||||
|
|
||||||
|
match rusty_s3::Bucket::new(value.endpoint, path_style, value.bucket, value.region) {
|
||||||
|
Ok(bucket) => Ok(S3MediaBackend {
|
||||||
|
bucket: Box::new(bucket),
|
||||||
|
credentials: Box::new(credentials),
|
||||||
|
duration: Duration::from_secs(value.duration),
|
||||||
|
path: value.path,
|
||||||
|
directory_structure: value.directory_structure,
|
||||||
|
}),
|
||||||
|
Err(_) => Err(Error::bad_config("Invalid S3 config")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
|
#[serde(try_from = "ShadowS3MediaBackend")]
|
||||||
|
pub struct S3MediaBackend {
|
||||||
|
pub bucket: Box<rusty_s3::Bucket>,
|
||||||
|
pub credentials: Box<rusty_s3::Credentials>,
|
||||||
|
pub duration: Duration,
|
||||||
|
pub path: Option<String>,
|
||||||
|
pub directory_structure: DirectoryStructure,
|
||||||
|
}
|
||||||
|
|
||||||
const DEPRECATED_KEYS: &[&str] = &[
|
const DEPRECATED_KEYS: &[&str] = &[
|
||||||
"cache_capacity",
|
"cache_capacity",
|
||||||
"turn_username",
|
"turn_username",
|
||||||
|
@ -727,3 +794,7 @@ fn default_openid_token_ttl() -> u64 {
|
||||||
pub fn default_default_room_version() -> RoomVersionId {
|
pub fn default_default_room_version() -> RoomVersionId {
|
||||||
RoomVersionId::V10
|
RoomVersionId::V10
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn default_s3_duration() -> u64 {
|
||||||
|
30
|
||||||
|
}
|
||||||
|
|
|
@ -589,7 +589,8 @@ impl Service {
|
||||||
RoomMessageEventContent::text_plain(
|
RoomMessageEventContent::text_plain(
|
||||||
"Expected code block in command body. Add --help for details.",
|
"Expected code block in command body. Add --help for details.",
|
||||||
)
|
)
|
||||||
}.into()
|
}
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
AdminCommand::UnregisterAppservice {
|
AdminCommand::UnregisterAppservice {
|
||||||
appservice_identifier,
|
appservice_identifier,
|
||||||
|
@ -602,7 +603,8 @@ impl Service {
|
||||||
Err(e) => RoomMessageEventContent::text_plain(format!(
|
Err(e) => RoomMessageEventContent::text_plain(format!(
|
||||||
"Failed to unregister appservice: {e}"
|
"Failed to unregister appservice: {e}"
|
||||||
)),
|
)),
|
||||||
}.into(),
|
}
|
||||||
|
.into(),
|
||||||
AdminCommand::ListAppservices => {
|
AdminCommand::ListAppservices => {
|
||||||
let appservices = services().appservice.iter_ids().await;
|
let appservices = services().appservice.iter_ids().await;
|
||||||
let output = format!(
|
let output = format!(
|
||||||
|
@ -640,7 +642,8 @@ impl Service {
|
||||||
RoomMessageEventContent::text_plain(&msg)
|
RoomMessageEventContent::text_plain(&msg)
|
||||||
}
|
}
|
||||||
Err(e) => RoomMessageEventContent::text_plain(e.to_string()),
|
Err(e) => RoomMessageEventContent::text_plain(e.to_string()),
|
||||||
}.into(),
|
}
|
||||||
|
.into(),
|
||||||
AdminCommand::IncomingFederation => {
|
AdminCommand::IncomingFederation => {
|
||||||
let map = services().globals.roomid_federationhandletime.read().await;
|
let map = services().globals.roomid_federationhandletime.read().await;
|
||||||
let mut msg: String = format!("Handling {} incoming pdus:\n", map.len());
|
let mut msg: String = format!("Handling {} incoming pdus:\n", map.len());
|
||||||
|
@ -681,7 +684,8 @@ impl Service {
|
||||||
))
|
))
|
||||||
} else {
|
} else {
|
||||||
RoomMessageEventContent::text_plain("Event not found.")
|
RoomMessageEventContent::text_plain("Event not found.")
|
||||||
}.into()
|
}
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
AdminCommand::ParsePdu => {
|
AdminCommand::ParsePdu => {
|
||||||
if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```"
|
if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```"
|
||||||
|
@ -715,7 +719,8 @@ impl Service {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
RoomMessageEventContent::text_plain("Expected code block in command body.")
|
RoomMessageEventContent::text_plain("Expected code block in command body.")
|
||||||
}.into()
|
}
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
AdminCommand::GetPdu { event_id } => {
|
AdminCommand::GetPdu { event_id } => {
|
||||||
let mut outlier = false;
|
let mut outlier = false;
|
||||||
|
@ -753,7 +758,8 @@ impl Service {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
None => RoomMessageEventContent::text_plain("PDU not found."),
|
None => RoomMessageEventContent::text_plain("PDU not found."),
|
||||||
}.into()
|
}
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
AdminCommand::MemoryUsage => {
|
AdminCommand::MemoryUsage => {
|
||||||
let response1 = services().memory_usage().await;
|
let response1 = services().memory_usage().await;
|
||||||
|
@ -761,7 +767,8 @@ impl Service {
|
||||||
|
|
||||||
RoomMessageEventContent::text_plain(format!(
|
RoomMessageEventContent::text_plain(format!(
|
||||||
"Services:\n{response1}\n\nDatabase:\n{response2}"
|
"Services:\n{response1}\n\nDatabase:\n{response2}"
|
||||||
)).into()
|
))
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
AdminCommand::ClearDatabaseCaches { amount } => {
|
AdminCommand::ClearDatabaseCaches { amount } => {
|
||||||
services().globals.db.clear_caches(amount);
|
services().globals.db.clear_caches(amount);
|
||||||
|
@ -786,7 +793,8 @@ impl Service {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||||
"The supplied username is not a valid username: {e}"
|
"The supplied username is not a valid username: {e}"
|
||||||
)).into())
|
))
|
||||||
|
.into())
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -794,7 +802,8 @@ impl Service {
|
||||||
if user_id.server_name() != services().globals.server_name() {
|
if user_id.server_name() != services().globals.server_name() {
|
||||||
return Ok(RoomMessageEventContent::text_plain(
|
return Ok(RoomMessageEventContent::text_plain(
|
||||||
"The specified user is not from this server!",
|
"The specified user is not from this server!",
|
||||||
).into());
|
)
|
||||||
|
.into());
|
||||||
};
|
};
|
||||||
|
|
||||||
// Check if the specified user is valid
|
// Check if the specified user is valid
|
||||||
|
@ -808,7 +817,8 @@ impl Service {
|
||||||
{
|
{
|
||||||
return Ok(RoomMessageEventContent::text_plain(
|
return Ok(RoomMessageEventContent::text_plain(
|
||||||
"The specified user does not exist!",
|
"The specified user does not exist!",
|
||||||
).into());
|
)
|
||||||
|
.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
let new_password = utils::random_string(AUTO_GEN_PASSWORD_LENGTH);
|
let new_password = utils::random_string(AUTO_GEN_PASSWORD_LENGTH);
|
||||||
|
@ -823,7 +833,8 @@ impl Service {
|
||||||
Err(e) => RoomMessageEventContent::text_plain(format!(
|
Err(e) => RoomMessageEventContent::text_plain(format!(
|
||||||
"Couldn't reset the password for user {user_id}: {e}"
|
"Couldn't reset the password for user {user_id}: {e}"
|
||||||
)),
|
)),
|
||||||
}.into()
|
}
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
AdminCommand::CreateUser { username, password } => {
|
AdminCommand::CreateUser { username, password } => {
|
||||||
let password =
|
let password =
|
||||||
|
@ -837,7 +848,8 @@ impl Service {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||||
"The supplied username is not a valid username: {e}"
|
"The supplied username is not a valid username: {e}"
|
||||||
)).into())
|
))
|
||||||
|
.into())
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -845,18 +857,21 @@ impl Service {
|
||||||
if user_id.server_name() != services().globals.server_name() {
|
if user_id.server_name() != services().globals.server_name() {
|
||||||
return Ok(RoomMessageEventContent::text_plain(
|
return Ok(RoomMessageEventContent::text_plain(
|
||||||
"The specified user is not from this server!",
|
"The specified user is not from this server!",
|
||||||
).into());
|
)
|
||||||
|
.into());
|
||||||
};
|
};
|
||||||
|
|
||||||
if user_id.is_historical() {
|
if user_id.is_historical() {
|
||||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||||
"Userid {user_id} is not allowed due to historical"
|
"Userid {user_id} is not allowed due to historical"
|
||||||
)).into());
|
))
|
||||||
|
.into());
|
||||||
}
|
}
|
||||||
if services().users.exists(&user_id)? {
|
if services().users.exists(&user_id)? {
|
||||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||||
"Userid {user_id} already exists"
|
"Userid {user_id} already exists"
|
||||||
)).into());
|
))
|
||||||
|
.into());
|
||||||
}
|
}
|
||||||
// Create user
|
// Create user
|
||||||
services().users.create(&user_id, Some(password.as_str()))?;
|
services().users.create(&user_id, Some(password.as_str()))?;
|
||||||
|
@ -893,26 +908,26 @@ impl Service {
|
||||||
// Inhibit login does not work for guests
|
// Inhibit login does not work for guests
|
||||||
RoomMessageEventContent::text_plain(format!(
|
RoomMessageEventContent::text_plain(format!(
|
||||||
"Created user with user_id: {user_id} and password: {password}"
|
"Created user with user_id: {user_id} and password: {password}"
|
||||||
)).into()
|
))
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
AdminCommand::AllowRegistration { status } => {
|
AdminCommand::AllowRegistration { status } => if let Some(status) = status {
|
||||||
if let Some(status) = status {
|
services().globals.set_registration(status).await;
|
||||||
services().globals.set_registration(status).await;
|
RoomMessageEventContent::text_plain(if status {
|
||||||
RoomMessageEventContent::text_plain(if status {
|
"Registration is now enabled"
|
||||||
"Registration is now enabled"
|
|
||||||
} else {
|
|
||||||
"Registration is now disabled"
|
|
||||||
})
|
|
||||||
} else {
|
} else {
|
||||||
RoomMessageEventContent::text_plain(
|
"Registration is now disabled"
|
||||||
if services().globals.allow_registration().await {
|
})
|
||||||
"Registration is currently enabled"
|
} else {
|
||||||
} else {
|
RoomMessageEventContent::text_plain(
|
||||||
"Registration is currently disabled"
|
if services().globals.allow_registration().await {
|
||||||
},
|
"Registration is currently enabled"
|
||||||
)
|
} else {
|
||||||
}.into()
|
"Registration is currently disabled"
|
||||||
|
},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
.into(),
|
||||||
AdminCommand::DisableRoom { room_id } => {
|
AdminCommand::DisableRoom { room_id } => {
|
||||||
services().rooms.metadata.disable_room(&room_id, true)?;
|
services().rooms.metadata.disable_room(&room_id, true)?;
|
||||||
RoomMessageEventContent::text_plain("Room disabled.").into()
|
RoomMessageEventContent::text_plain("Room disabled.").into()
|
||||||
|
@ -955,6 +970,7 @@ impl Service {
|
||||||
services()
|
services()
|
||||||
.media
|
.media
|
||||||
.purge_from_user(&user_id, purge_media.force_filehash, after)
|
.purge_from_user(&user_id, purge_media.force_filehash, after)
|
||||||
|
.await
|
||||||
.len()
|
.len()
|
||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
|
@ -1023,13 +1039,17 @@ impl Service {
|
||||||
failed_count += services()
|
failed_count += services()
|
||||||
.media
|
.media
|
||||||
.purge_from_user(user_id, purge_media.force_filehash, after)
|
.purge_from_user(user_id, purge_media.force_filehash, after)
|
||||||
|
.await
|
||||||
.len();
|
.len();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut message = format!("Deactivated {deactivation_count} accounts.");
|
let mut message = format!("Deactivated {deactivation_count} accounts.");
|
||||||
if !admins.is_empty() {
|
if !admins.is_empty() {
|
||||||
message.push_str(&format!("\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts",admins.join(", ")));
|
message.push_str(&format!(
|
||||||
|
"\nSkipped admin accounts: {:?}. Use --force to deactivate admin accounts",
|
||||||
|
admins.join(", ")
|
||||||
|
));
|
||||||
}
|
}
|
||||||
if failed_count != 0 {
|
if failed_count != 0 {
|
||||||
message.push_str(&format!(
|
message.push_str(&format!(
|
||||||
|
@ -1042,14 +1062,19 @@ impl Service {
|
||||||
RoomMessageEventContent::text_plain(
|
RoomMessageEventContent::text_plain(
|
||||||
"Expected code block in command body. Add --help for details.",
|
"Expected code block in command body. Add --help for details.",
|
||||||
)
|
)
|
||||||
}.into()
|
}
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
AdminCommand::QueryMedia { mxc } => {
|
AdminCommand::QueryMedia { mxc } => {
|
||||||
let Ok((server_name, media_id)) = mxc.parts() else {
|
let Ok((server_name, media_id)) = mxc.parts() else {
|
||||||
return Ok(RoomMessageEventContent::text_plain("Invalid media MXC").into());
|
return Ok(RoomMessageEventContent::text_plain("Invalid media MXC").into());
|
||||||
};
|
};
|
||||||
|
|
||||||
let MediaQuery{ is_blocked, source_file, thumbnails } = services().media.query(server_name, media_id)?;
|
let MediaQuery {
|
||||||
|
is_blocked,
|
||||||
|
source_file,
|
||||||
|
thumbnails,
|
||||||
|
} = services().media.query(server_name, media_id)?;
|
||||||
let mut message = format!("Is blocked Media ID: {is_blocked}");
|
let mut message = format!("Is blocked Media ID: {is_blocked}");
|
||||||
|
|
||||||
if let Some(MediaQueryFileInfo {
|
if let Some(MediaQueryFileInfo {
|
||||||
|
@ -1060,14 +1085,16 @@ impl Service {
|
||||||
unauthenticated_access_permitted,
|
unauthenticated_access_permitted,
|
||||||
is_blocked_via_filehash,
|
is_blocked_via_filehash,
|
||||||
file_info: time_info,
|
file_info: time_info,
|
||||||
}) = source_file {
|
}) = source_file
|
||||||
|
{
|
||||||
message.push_str("\n\nInformation on full (non-thumbnail) file:\n");
|
message.push_str("\n\nInformation on full (non-thumbnail) file:\n");
|
||||||
|
|
||||||
if let Some(FileInfo {
|
if let Some(FileInfo {
|
||||||
creation,
|
creation,
|
||||||
last_access,
|
last_access,
|
||||||
size,
|
size,
|
||||||
}) = time_info {
|
}) = time_info
|
||||||
|
{
|
||||||
message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}",
|
message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}",
|
||||||
DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
|
DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
|
||||||
DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
|
DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
|
||||||
|
@ -1096,7 +1123,7 @@ impl Service {
|
||||||
message.push_str("\n\nInformation on thumbnails of media:");
|
message.push_str("\n\nInformation on thumbnails of media:");
|
||||||
}
|
}
|
||||||
|
|
||||||
for MediaQueryThumbInfo{
|
for MediaQueryThumbInfo {
|
||||||
width,
|
width,
|
||||||
height,
|
height,
|
||||||
sha256_hex,
|
sha256_hex,
|
||||||
|
@ -1105,13 +1132,15 @@ impl Service {
|
||||||
unauthenticated_access_permitted,
|
unauthenticated_access_permitted,
|
||||||
is_blocked_via_filehash,
|
is_blocked_via_filehash,
|
||||||
file_info: time_info,
|
file_info: time_info,
|
||||||
} in thumbnails {
|
} in thumbnails
|
||||||
|
{
|
||||||
message.push_str(&format!("\n\nDimensions: {width}x{height}"));
|
message.push_str(&format!("\n\nDimensions: {width}x{height}"));
|
||||||
if let Some(FileInfo {
|
if let Some(FileInfo {
|
||||||
creation,
|
creation,
|
||||||
last_access,
|
last_access,
|
||||||
size,
|
size,
|
||||||
}) = time_info {
|
}) = time_info
|
||||||
|
{
|
||||||
message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}",
|
message.push_str(&format!("\nIs stored: true\nCreated at: {}\nLast accessed at: {}\nSize of file: {}",
|
||||||
DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
|
DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
|
||||||
DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
|
DateTime::from_timestamp(last_access.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range"),
|
||||||
|
@ -1145,7 +1174,8 @@ impl Service {
|
||||||
file,
|
file,
|
||||||
content_type,
|
content_type,
|
||||||
content_disposition,
|
content_disposition,
|
||||||
} = client_server::media::get_content(server_name, media_id.to_owned(), true, true).await?;
|
} = client_server::media::get_content(server_name, media_id.to_owned(), true, true)
|
||||||
|
.await?;
|
||||||
|
|
||||||
if let Ok(image) = image::load_from_memory(&file) {
|
if let Ok(image) = image::load_from_memory(&file) {
|
||||||
let filename = content_disposition.and_then(|cd| cd.filename);
|
let filename = content_disposition.and_then(|cd| cd.filename);
|
||||||
|
@ -1185,10 +1215,7 @@ impl Service {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
AdminCommand::ListMedia {
|
AdminCommand::ListMedia {
|
||||||
user_server_filter: ListMediaArgs {
|
user_server_filter: ListMediaArgs { user, server },
|
||||||
user,
|
|
||||||
server,
|
|
||||||
},
|
|
||||||
include_thumbnails,
|
include_thumbnails,
|
||||||
content_type,
|
content_type,
|
||||||
uploaded_before,
|
uploaded_before,
|
||||||
|
@ -1201,7 +1228,7 @@ impl Service {
|
||||||
r#"<table><thead><tr><th scope="col">MXC URI</th><th scope="col">Dimensions (if thumbnail)</th><th scope="col">Created/Downloaded at</th><th scope="col">Uploader</th><th scope="col">Content-Type</th><th scope="col">Filename</th><th scope="col">Size</th></tr></thead><tbody>"#,
|
r#"<table><thead><tr><th scope="col">MXC URI</th><th scope="col">Dimensions (if thumbnail)</th><th scope="col">Created/Downloaded at</th><th scope="col">Uploader</th><th scope="col">Content-Type</th><th scope="col">Filename</th><th scope="col">Size</th></tr></thead><tbody>"#,
|
||||||
);
|
);
|
||||||
|
|
||||||
for MediaListItem{
|
for MediaListItem {
|
||||||
server_name,
|
server_name,
|
||||||
media_id,
|
media_id,
|
||||||
uploader_localpart,
|
uploader_localpart,
|
||||||
|
@ -1211,9 +1238,8 @@ impl Service {
|
||||||
size,
|
size,
|
||||||
creation,
|
creation,
|
||||||
} in services().media.list(
|
} in services().media.list(
|
||||||
user
|
user.map(ServerNameOrUserId::UserId)
|
||||||
.map(ServerNameOrUserId::UserId)
|
.or_else(|| server.map(ServerNameOrUserId::ServerName)),
|
||||||
.or_else(|| server.map(ServerNameOrUserId::ServerName)),
|
|
||||||
include_thumbnails,
|
include_thumbnails,
|
||||||
content_type.as_deref(),
|
content_type.as_deref(),
|
||||||
uploaded_before
|
uploaded_before
|
||||||
|
@ -1227,15 +1253,20 @@ impl Service {
|
||||||
.transpose()
|
.transpose()
|
||||||
.map_err(|_| Error::AdminCommand("Timestamp must be after unix epoch"))?
|
.map_err(|_| Error::AdminCommand("Timestamp must be after unix epoch"))?
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(Duration::as_secs)
|
.map(Duration::as_secs),
|
||||||
)? {
|
)? {
|
||||||
|
let user_id = uploader_localpart
|
||||||
let user_id = uploader_localpart.map(|localpart| format!("@{localpart}:{server_name}")).unwrap_or_default();
|
.map(|localpart| format!("@{localpart}:{server_name}"))
|
||||||
|
.unwrap_or_default();
|
||||||
let content_type = content_type.unwrap_or_default();
|
let content_type = content_type.unwrap_or_default();
|
||||||
let filename = filename.unwrap_or_default();
|
let filename = filename.unwrap_or_default();
|
||||||
let dimensions = dimensions.map(|(w, h)| format!("{w}x{h}")).unwrap_or_default();
|
let dimensions = dimensions
|
||||||
|
.map(|(w, h)| format!("{w}x{h}"))
|
||||||
|
.unwrap_or_default();
|
||||||
let size = ByteSize::b(size).display().si();
|
let size = ByteSize::b(size).display().si();
|
||||||
let creation = DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX),0).expect("Timestamp is within range");
|
let creation =
|
||||||
|
DateTime::from_timestamp(creation.try_into().unwrap_or(i64::MAX), 0)
|
||||||
|
.expect("Timestamp is within range");
|
||||||
|
|
||||||
markdown_message
|
markdown_message
|
||||||
.push_str(&format!("\n| mxc://{server_name}/{media_id} | {dimensions} | {creation} | {user_id} | {content_type} | {filename} | {size} |"));
|
.push_str(&format!("\n| mxc://{server_name}/{media_id} | {dimensions} | {creation} | {user_id} | {content_type} | {filename} | {size} |"));
|
||||||
|
@ -1248,11 +1279,10 @@ impl Service {
|
||||||
html_message.push_str("</tbody></table>");
|
html_message.push_str("</tbody></table>");
|
||||||
|
|
||||||
RoomMessageEventContent::text_html(markdown_message, html_message).into()
|
RoomMessageEventContent::text_html(markdown_message, html_message).into()
|
||||||
},
|
}
|
||||||
AdminCommand::PurgeMedia => media_from_body(body).map_or_else(
|
AdminCommand::PurgeMedia => match media_from_body(body) {
|
||||||
|message| message,
|
Ok(media) => {
|
||||||
|media| {
|
let failed_count = services().media.purge(&media, true).await.len();
|
||||||
let failed_count = services().media.purge(&media, true).len();
|
|
||||||
|
|
||||||
if failed_count == 0 {
|
if failed_count == 0 {
|
||||||
RoomMessageEventContent::text_plain("Successfully purged media")
|
RoomMessageEventContent::text_plain("Successfully purged media")
|
||||||
|
@ -1260,9 +1290,11 @@ impl Service {
|
||||||
RoomMessageEventContent::text_plain(format!(
|
RoomMessageEventContent::text_plain(format!(
|
||||||
"Failed to delete {failed_count} media, check logs for more details"
|
"Failed to delete {failed_count} media, check logs for more details"
|
||||||
))
|
))
|
||||||
}.into()
|
}
|
||||||
},
|
.into()
|
||||||
),
|
}
|
||||||
|
Err(message) => message,
|
||||||
|
},
|
||||||
AdminCommand::PurgeMediaFromUsers {
|
AdminCommand::PurgeMediaFromUsers {
|
||||||
from_last,
|
from_last,
|
||||||
force_filehash,
|
force_filehash,
|
||||||
|
@ -1282,6 +1314,7 @@ impl Service {
|
||||||
failed_count += services()
|
failed_count += services()
|
||||||
.media
|
.media
|
||||||
.purge_from_user(user_id, force_filehash, after)
|
.purge_from_user(user_id, force_filehash, after)
|
||||||
|
.await
|
||||||
.len();
|
.len();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1296,7 +1329,8 @@ impl Service {
|
||||||
RoomMessageEventContent::text_plain(
|
RoomMessageEventContent::text_plain(
|
||||||
"Expected code block in command body. Add --help for details.",
|
"Expected code block in command body. Add --help for details.",
|
||||||
)
|
)
|
||||||
}.into()
|
}
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
AdminCommand::PurgeMediaFromServer {
|
AdminCommand::PurgeMediaFromServer {
|
||||||
server_id: server_name,
|
server_id: server_name,
|
||||||
|
@ -1314,6 +1348,7 @@ impl Service {
|
||||||
let failed_count = services()
|
let failed_count = services()
|
||||||
.media
|
.media
|
||||||
.purge_from_server(&server_name, force_filehash, after)
|
.purge_from_server(&server_name, force_filehash, after)
|
||||||
|
.await
|
||||||
.len();
|
.len();
|
||||||
|
|
||||||
if failed_count == 0 {
|
if failed_count == 0 {
|
||||||
|
@ -1324,14 +1359,14 @@ impl Service {
|
||||||
RoomMessageEventContent::text_plain(format!(
|
RoomMessageEventContent::text_plain(format!(
|
||||||
"Failed to purge {failed_count} media, check logs for more details"
|
"Failed to purge {failed_count} media, check logs for more details"
|
||||||
))
|
))
|
||||||
}.into()
|
}
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
AdminCommand::BlockMedia { and_purge, reason } => media_from_body(body).map_or_else(
|
AdminCommand::BlockMedia { and_purge, reason } => match media_from_body(body) {
|
||||||
|message| message,
|
Ok(media) => {
|
||||||
|media| {
|
|
||||||
let failed_count = services().media.block(&media, reason).len();
|
let failed_count = services().media.block(&media, reason).len();
|
||||||
let failed_purge_count = if and_purge {
|
let failed_purge_count = if and_purge {
|
||||||
services().media.purge(&media, true).len()
|
services().media.purge(&media, true).await.len()
|
||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
};
|
};
|
||||||
|
@ -1348,8 +1383,9 @@ impl Service {
|
||||||
"Failed to block {failed_count}, and purge {failed_purge_count} media, check logs for more details"
|
"Failed to block {failed_count}, and purge {failed_purge_count} media, check logs for more details"
|
||||||
))
|
))
|
||||||
}.into()
|
}.into()
|
||||||
},
|
}
|
||||||
),
|
Err(message) => message,
|
||||||
|
},
|
||||||
AdminCommand::BlockMediaFromUsers { from_last, reason } => {
|
AdminCommand::BlockMediaFromUsers { from_last, reason } => {
|
||||||
let after = from_last.map(unix_secs_from_duration).transpose()?;
|
let after = from_last.map(unix_secs_from_duration).transpose()?;
|
||||||
|
|
||||||
|
@ -1385,7 +1421,8 @@ impl Service {
|
||||||
RoomMessageEventContent::text_plain(
|
RoomMessageEventContent::text_plain(
|
||||||
"Expected code block in command body. Add --help for details.",
|
"Expected code block in command body. Add --help for details.",
|
||||||
)
|
)
|
||||||
}.into()
|
}
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
AdminCommand::ListBlockedMedia => {
|
AdminCommand::ListBlockedMedia => {
|
||||||
let mut markdown_message = String::from(
|
let mut markdown_message = String::from(
|
||||||
|
@ -1402,7 +1439,8 @@ impl Service {
|
||||||
unix_secs,
|
unix_secs,
|
||||||
reason,
|
reason,
|
||||||
sha256_hex,
|
sha256_hex,
|
||||||
}) = media else {
|
}) = media
|
||||||
|
else {
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1415,8 +1453,9 @@ impl Service {
|
||||||
.flatten()
|
.flatten()
|
||||||
.expect("Time is valid");
|
.expect("Time is valid");
|
||||||
|
|
||||||
markdown_message
|
markdown_message.push_str(&format!(
|
||||||
.push_str(&format!("\n| {sha256_hex} | mxc://{server_name}/{media_id} | {time} | {reason} |"));
|
"\n| {sha256_hex} | mxc://{server_name}/{media_id} | {time} | {reason} |"
|
||||||
|
));
|
||||||
|
|
||||||
html_message.push_str(&format!(
|
html_message.push_str(&format!(
|
||||||
"<tr><td>{sha256_hex}</td><td>mxc://{server_name}/{media_id}</td><td>{time}</td><td>{reason}</td></tr>",
|
"<tr><td>{sha256_hex}</td><td>mxc://{server_name}/{media_id}</td><td>{time}</td><td>{reason}</td></tr>",
|
||||||
|
@ -1438,7 +1477,8 @@ impl Service {
|
||||||
RoomMessageEventContent::text_plain(format!(
|
RoomMessageEventContent::text_plain(format!(
|
||||||
"Failed to unblock {failed_count} media, check logs for more details"
|
"Failed to unblock {failed_count} media, check logs for more details"
|
||||||
))
|
))
|
||||||
}.into()
|
}
|
||||||
|
.into()
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
AdminCommand::SignJson => {
|
AdminCommand::SignJson => {
|
||||||
|
@ -1463,7 +1503,8 @@ impl Service {
|
||||||
RoomMessageEventContent::text_plain(
|
RoomMessageEventContent::text_plain(
|
||||||
"Expected code block in command body. Add --help for details.",
|
"Expected code block in command body. Add --help for details.",
|
||||||
)
|
)
|
||||||
}.into()
|
}
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
AdminCommand::VerifyJson => {
|
AdminCommand::VerifyJson => {
|
||||||
if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```"
|
if body.len() > 2 && body[0].trim() == "```" && body.last().unwrap().trim() == "```"
|
||||||
|
@ -1524,7 +1565,8 @@ impl Service {
|
||||||
RoomMessageEventContent::text_plain(
|
RoomMessageEventContent::text_plain(
|
||||||
"Expected code block in command body. Add --help for details.",
|
"Expected code block in command body. Add --help for details.",
|
||||||
)
|
)
|
||||||
}.into()
|
}
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
AdminCommand::HashAndSignEvent { room_version_id } => {
|
AdminCommand::HashAndSignEvent { room_version_id } => {
|
||||||
if body.len() > 2
|
if body.len() > 2
|
||||||
|
@ -1539,7 +1581,10 @@ impl Service {
|
||||||
services().globals.server_name().as_str(),
|
services().globals.server_name().as_str(),
|
||||||
services().globals.keypair(),
|
services().globals.keypair(),
|
||||||
&mut value,
|
&mut value,
|
||||||
&room_version_id.rules().expect("Supported room version has rules").redaction,
|
&room_version_id
|
||||||
|
.rules()
|
||||||
|
.expect("Supported room version has rules")
|
||||||
|
.redaction,
|
||||||
) {
|
) {
|
||||||
RoomMessageEventContent::text_plain(format!("Invalid event: {e}"))
|
RoomMessageEventContent::text_plain(format!("Invalid event: {e}"))
|
||||||
} else {
|
} else {
|
||||||
|
@ -1554,7 +1599,8 @@ impl Service {
|
||||||
RoomMessageEventContent::text_plain(
|
RoomMessageEventContent::text_plain(
|
||||||
"Expected code block in command body. Add --help for details.",
|
"Expected code block in command body. Add --help for details.",
|
||||||
)
|
)
|
||||||
}.into()
|
}
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
AdminCommand::RemoveAlias { alias } => {
|
AdminCommand::RemoveAlias { alias } => {
|
||||||
if alias.server_name() != services().globals.server_name() {
|
if alias.server_name() != services().globals.server_name() {
|
||||||
|
@ -1579,7 +1625,8 @@ impl Service {
|
||||||
.alias
|
.alias
|
||||||
.remove_alias(&alias, services().globals.server_user())?;
|
.remove_alias(&alias, services().globals.server_user())?;
|
||||||
RoomMessageEventContent::text_plain("Alias removed successfully")
|
RoomMessageEventContent::text_plain("Alias removed successfully")
|
||||||
}.into()
|
}
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -230,8 +230,6 @@ impl Service {
|
||||||
shutdown: AtomicBool::new(false),
|
shutdown: AtomicBool::new(false),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Remove this exception once other media backends are added
|
|
||||||
#[allow(irrefutable_let_patterns)]
|
|
||||||
if let MediaBackendConfig::FileSystem { path, .. } = &s.config.media.backend {
|
if let MediaBackendConfig::FileSystem { path, .. } = &s.config.media.backend {
|
||||||
fs::create_dir_all(path)?;
|
fs::create_dir_all(path)?;
|
||||||
}
|
}
|
||||||
|
@ -490,26 +488,40 @@ impl Service {
|
||||||
directory_structure: &DirectoryStructure,
|
directory_structure: &DirectoryStructure,
|
||||||
sha256_hex: &str,
|
sha256_hex: &str,
|
||||||
) -> Result<PathBuf> {
|
) -> Result<PathBuf> {
|
||||||
let mut r = PathBuf::new();
|
Ok(PathBuf::from_iter(self.split_media_path(
|
||||||
r.push(media_directory);
|
Some(media_directory),
|
||||||
|
directory_structure,
|
||||||
|
sha256_hex,
|
||||||
|
)?))
|
||||||
|
}
|
||||||
|
|
||||||
if let DirectoryStructure::Deep { length, depth } = directory_structure {
|
pub fn split_media_path<'a>(
|
||||||
let mut filename = sha256_hex;
|
&self,
|
||||||
for _ in 0..depth.get() {
|
media_directory: Option<&'a str>,
|
||||||
let (current_path, next) = filename.split_at(length.get().into());
|
directory_structure: &DirectoryStructure,
|
||||||
filename = next;
|
sha256_hex: &'a str,
|
||||||
r.push(current_path);
|
) -> Result<Vec<&'a str>> {
|
||||||
|
match directory_structure {
|
||||||
|
DirectoryStructure::Flat => match media_directory {
|
||||||
|
Some(path) => Ok(vec![path, sha256_hex]),
|
||||||
|
None => Ok(vec![sha256_hex]),
|
||||||
|
},
|
||||||
|
DirectoryStructure::Deep { length, depth } => {
|
||||||
|
let mut r: Vec<&'a str> = Vec::with_capacity((depth.get() + 2).into());
|
||||||
|
if let Some(path) = media_directory {
|
||||||
|
r.push(path);
|
||||||
|
}
|
||||||
|
let mut filename = sha256_hex;
|
||||||
|
for _ in 0..depth.get() {
|
||||||
|
let (current_path, next) = filename.split_at(length.get().into());
|
||||||
|
filename = next;
|
||||||
|
r.push(current_path);
|
||||||
|
}
|
||||||
|
r.push(filename);
|
||||||
|
|
||||||
|
Ok(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create all directories leading up to file
|
|
||||||
fs::create_dir_all(&r).inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?;
|
|
||||||
|
|
||||||
r.push(filename);
|
|
||||||
} else {
|
|
||||||
r.push(sha256_hex);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(r)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn shutdown(&self) {
|
pub async fn shutdown(&self) {
|
||||||
|
|
|
@ -1,12 +1,15 @@
|
||||||
mod data;
|
mod data;
|
||||||
use std::{fs, io::Cursor, sync::Arc};
|
use std::{io::Cursor, sync::Arc};
|
||||||
|
|
||||||
pub use data::Data;
|
pub use data::Data;
|
||||||
|
use futures_util::{stream, StreamExt};
|
||||||
|
use http::StatusCode;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::client::{error::ErrorKind, media::is_safe_inline_content_type},
|
api::client::{error::ErrorKind, media::is_safe_inline_content_type},
|
||||||
http_headers::{ContentDisposition, ContentDispositionType},
|
http_headers::{ContentDisposition, ContentDispositionType},
|
||||||
OwnedServerName, ServerName, UserId,
|
OwnedServerName, ServerName, UserId,
|
||||||
};
|
};
|
||||||
|
use rusty_s3::S3Action;
|
||||||
use sha2::{digest::Output, Digest, Sha256};
|
use sha2::{digest::Output, Digest, Sha256};
|
||||||
use tracing::{error, info};
|
use tracing::{error, info};
|
||||||
|
|
||||||
|
@ -24,7 +27,7 @@ pub struct DbFileMeta {
|
||||||
}
|
}
|
||||||
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
fs::File,
|
fs::{self, File},
|
||||||
io::{AsyncReadExt, AsyncWriteExt},
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -142,7 +145,7 @@ impl Service {
|
||||||
let count = files.iter().filter(|res| res.is_ok()).count();
|
let count = files.iter().filter(|res| res.is_ok()).count();
|
||||||
info!("Found {count} media files to delete");
|
info!("Found {count} media files to delete");
|
||||||
|
|
||||||
purge_files(files);
|
purge_files(files).await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -159,11 +162,14 @@ impl Service {
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let (sha256_digest, sha256_hex) = generate_digests(file);
|
let (sha256_digest, sha256_hex) = generate_digests(file);
|
||||||
|
|
||||||
for error in self.clear_required_space(
|
for error in self
|
||||||
&sha256_digest,
|
.clear_required_space(
|
||||||
MediaType::new(servername, false),
|
&sha256_digest,
|
||||||
size(file)?,
|
MediaType::new(servername, false),
|
||||||
)? {
|
size(file)?,
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
error!(
|
error!(
|
||||||
"Error deleting file to clear space when downloading/creating new media file: {error}"
|
"Error deleting file to clear space when downloading/creating new media file: {error}"
|
||||||
)
|
)
|
||||||
|
@ -207,7 +213,8 @@ impl Service {
|
||||||
&sha256_digest,
|
&sha256_digest,
|
||||||
MediaType::new(servername, true),
|
MediaType::new(servername, true),
|
||||||
size(file)?,
|
size(file)?,
|
||||||
)?;
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
self.db.create_thumbnail_metadata(
|
self.db.create_thumbnail_metadata(
|
||||||
sha256_digest,
|
sha256_digest,
|
||||||
|
@ -444,10 +451,14 @@ impl Service {
|
||||||
/// purged have that sha256 hash.
|
/// purged have that sha256 hash.
|
||||||
///
|
///
|
||||||
/// Returns errors for all the files that were failed to be deleted, if any.
|
/// Returns errors for all the files that were failed to be deleted, if any.
|
||||||
pub fn purge(&self, media: &[(OwnedServerName, String)], force_filehash: bool) -> Vec<Error> {
|
pub async fn purge(
|
||||||
|
&self,
|
||||||
|
media: &[(OwnedServerName, String)],
|
||||||
|
force_filehash: bool,
|
||||||
|
) -> Vec<Error> {
|
||||||
let hashes = self.db.purge_and_get_hashes(media, force_filehash);
|
let hashes = self.db.purge_and_get_hashes(media, force_filehash);
|
||||||
|
|
||||||
purge_files(hashes)
|
purge_files(hashes).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Purges all (past a certain time in unix seconds, if specified) media
|
/// Purges all (past a certain time in unix seconds, if specified) media
|
||||||
|
@ -462,7 +473,7 @@ impl Service {
|
||||||
///
|
///
|
||||||
/// Note: it only currently works for local users, as we cannot determine who
|
/// Note: it only currently works for local users, as we cannot determine who
|
||||||
/// exactly uploaded the file when it comes to remove users.
|
/// exactly uploaded the file when it comes to remove users.
|
||||||
pub fn purge_from_user(
|
pub async fn purge_from_user(
|
||||||
&self,
|
&self,
|
||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
force_filehash: bool,
|
force_filehash: bool,
|
||||||
|
@ -472,7 +483,7 @@ impl Service {
|
||||||
.db
|
.db
|
||||||
.purge_and_get_hashes_from_user(user_id, force_filehash, after);
|
.purge_and_get_hashes_from_user(user_id, force_filehash, after);
|
||||||
|
|
||||||
purge_files(hashes)
|
purge_files(hashes).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Purges all (past a certain time in unix seconds, if specified) media
|
/// Purges all (past a certain time in unix seconds, if specified) media
|
||||||
|
@ -484,7 +495,7 @@ impl Service {
|
||||||
/// purged have that sha256 hash.
|
/// purged have that sha256 hash.
|
||||||
///
|
///
|
||||||
/// Returns errors for all the files that were failed to be deleted, if any.
|
/// Returns errors for all the files that were failed to be deleted, if any.
|
||||||
pub fn purge_from_server(
|
pub async fn purge_from_server(
|
||||||
&self,
|
&self,
|
||||||
server_name: &ServerName,
|
server_name: &ServerName,
|
||||||
force_filehash: bool,
|
force_filehash: bool,
|
||||||
|
@ -494,7 +505,7 @@ impl Service {
|
||||||
.db
|
.db
|
||||||
.purge_and_get_hashes_from_server(server_name, force_filehash, after);
|
.purge_and_get_hashes_from_server(server_name, force_filehash, after);
|
||||||
|
|
||||||
purge_files(hashes)
|
purge_files(hashes).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Checks whether the media has been blocked by administrators, returning either
|
/// Checks whether the media has been blocked by administrators, returning either
|
||||||
|
@ -558,7 +569,7 @@ impl Service {
|
||||||
self.db.list_blocked()
|
self.db.list_blocked()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn clear_required_space(
|
pub async fn clear_required_space(
|
||||||
&self,
|
&self,
|
||||||
sha256_digest: &[u8],
|
sha256_digest: &[u8],
|
||||||
media_type: MediaType,
|
media_type: MediaType,
|
||||||
|
@ -577,7 +588,7 @@ impl Service {
|
||||||
info!("Deleting {} files to clear space for new media file", count);
|
info!("Deleting {} files to clear space for new media file", count);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(purge_files(files))
|
Ok(purge_files(files).await)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetches the file from the configured media backend, as well as updating the "last accessed"
|
/// Fetches the file from the configured media backend, as well as updating the "last accessed"
|
||||||
|
@ -606,6 +617,44 @@ impl Service {
|
||||||
|
|
||||||
file
|
file
|
||||||
}
|
}
|
||||||
|
MediaBackendConfig::S3 {
|
||||||
|
bucket,
|
||||||
|
credentials,
|
||||||
|
duration,
|
||||||
|
path,
|
||||||
|
directory_structure,
|
||||||
|
} => {
|
||||||
|
let sha256_hex = hex::encode(sha256_digest);
|
||||||
|
let file_name = services()
|
||||||
|
.globals
|
||||||
|
.split_media_path(path.as_deref(), directory_structure, &sha256_hex)?
|
||||||
|
.join("/");
|
||||||
|
let url = bucket
|
||||||
|
.get_object(Some(credentials), &file_name)
|
||||||
|
.sign(*duration);
|
||||||
|
|
||||||
|
let client = services().globals.default_client();
|
||||||
|
let resp = client.get(url).send().await?;
|
||||||
|
|
||||||
|
if resp.status() == StatusCode::NOT_FOUND {
|
||||||
|
return Err(Error::BadRequest(
|
||||||
|
ErrorKind::NotFound,
|
||||||
|
"File does not exist",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if !resp.status().is_success() {
|
||||||
|
error!(
|
||||||
|
"Failed to get file with sha256 hash of \"{}\" from S3 bucket: {}",
|
||||||
|
sha256_hex,
|
||||||
|
resp.text().await?
|
||||||
|
);
|
||||||
|
return Err(Error::BadS3Response(
|
||||||
|
"Failed to get media file from S3 bucket",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
resp.bytes().await?.to_vec()
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some((server_name, media_id)) = original_file_id {
|
if let Some((server_name, media_id)) = original_file_id {
|
||||||
|
@ -631,9 +680,46 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> {
|
||||||
.globals
|
.globals
|
||||||
.get_media_path(path, directory_structure, sha256_hex)?;
|
.get_media_path(path, directory_structure, sha256_hex)?;
|
||||||
|
|
||||||
|
// Create all directories leading up to file
|
||||||
|
if let DirectoryStructure::Deep { .. } = directory_structure {
|
||||||
|
if let Some(parent) = path.parent() {
|
||||||
|
fs::create_dir_all(&parent).await.inspect_err(|e| error!("Error creating leading directories for media with sha256 hash of {sha256_hex}: {e}"))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let mut f = File::create(path).await?;
|
let mut f = File::create(path).await?;
|
||||||
f.write_all(file).await?;
|
f.write_all(file).await?;
|
||||||
}
|
}
|
||||||
|
MediaBackendConfig::S3 {
|
||||||
|
bucket,
|
||||||
|
credentials,
|
||||||
|
duration,
|
||||||
|
path,
|
||||||
|
directory_structure,
|
||||||
|
} => {
|
||||||
|
let file_name = services()
|
||||||
|
.globals
|
||||||
|
.split_media_path(path.as_deref(), directory_structure, sha256_hex)?
|
||||||
|
.join("/");
|
||||||
|
|
||||||
|
let url = bucket
|
||||||
|
.put_object(Some(credentials), &file_name)
|
||||||
|
.sign(*duration);
|
||||||
|
|
||||||
|
let client = services().globals.default_client();
|
||||||
|
let resp = client.put(url).body(file.to_vec()).send().await?;
|
||||||
|
|
||||||
|
if !resp.status().is_success() {
|
||||||
|
error!(
|
||||||
|
"Failed to upload file with sha256 hash of \"{}\" to S3 bucket: {}",
|
||||||
|
sha256_hex,
|
||||||
|
resp.text().await?
|
||||||
|
);
|
||||||
|
return Err(Error::BadS3Response(
|
||||||
|
"Failed to upload media file to S3 bucket",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -643,21 +729,29 @@ pub async fn create_file(sha256_hex: &str, file: &[u8]) -> Result<()> {
|
||||||
/// Returns a `Vec` of errors that occurred when attempting to delete the files
|
/// Returns a `Vec` of errors that occurred when attempting to delete the files
|
||||||
///
|
///
|
||||||
/// Note: this does NOT remove the related metadata from the database
|
/// Note: this does NOT remove the related metadata from the database
|
||||||
fn purge_files(hashes: Vec<Result<String>>) -> Vec<Error> {
|
async fn purge_files(hashes: Vec<Result<String>>) -> Vec<Error> {
|
||||||
hashes
|
stream::iter(hashes)
|
||||||
.into_iter()
|
.then(|hash| async move {
|
||||||
.map(|hash| match hash {
|
match hash {
|
||||||
Ok(v) => delete_file(&v),
|
Ok(v) => delete_file(&v).await,
|
||||||
Err(e) => Err(e),
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.filter_map(|r| async {
|
||||||
|
if let Err(e) = r {
|
||||||
|
Some(e)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.filter_map(|r| if let Err(e) = r { Some(e) } else { None })
|
|
||||||
.collect()
|
.collect()
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Deletes the given file from the media backend
|
/// Deletes the given file from the media backend
|
||||||
///
|
///
|
||||||
/// Note: this does NOT remove the related metadata from the database
|
/// Note: this does NOT remove the related metadata from the database
|
||||||
fn delete_file(sha256_hex: &str) -> Result<()> {
|
async fn delete_file(sha256_hex: &str) -> Result<()> {
|
||||||
match &services().globals.config.media.backend {
|
match &services().globals.config.media.backend {
|
||||||
MediaBackendConfig::FileSystem {
|
MediaBackendConfig::FileSystem {
|
||||||
path,
|
path,
|
||||||
|
@ -668,7 +762,7 @@ fn delete_file(sha256_hex: &str) -> Result<()> {
|
||||||
.globals
|
.globals
|
||||||
.get_media_path(path, directory_structure, sha256_hex)?;
|
.get_media_path(path, directory_structure, sha256_hex)?;
|
||||||
|
|
||||||
if let Err(e) = fs::remove_file(&path) {
|
if let Err(e) = fs::remove_file(&path).await {
|
||||||
// Multiple files with the same filehash might be requseted to be deleted
|
// Multiple files with the same filehash might be requseted to be deleted
|
||||||
if e.kind() != std::io::ErrorKind::NotFound {
|
if e.kind() != std::io::ErrorKind::NotFound {
|
||||||
error!("Error removing media from filesystem: {e}");
|
error!("Error removing media from filesystem: {e}");
|
||||||
|
@ -683,7 +777,7 @@ fn delete_file(sha256_hex: &str) -> Result<()> {
|
||||||
// Here at the start so that the first time, the file gets removed from the path
|
// Here at the start so that the first time, the file gets removed from the path
|
||||||
path.pop();
|
path.pop();
|
||||||
|
|
||||||
if let Err(e) = fs::remove_dir(&path) {
|
if let Err(e) = fs::remove_dir(&path).await {
|
||||||
if e.kind() == std::io::ErrorKind::DirectoryNotEmpty {
|
if e.kind() == std::io::ErrorKind::DirectoryNotEmpty {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -696,6 +790,35 @@ fn delete_file(sha256_hex: &str) -> Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
MediaBackendConfig::S3 {
|
||||||
|
bucket,
|
||||||
|
credentials,
|
||||||
|
duration,
|
||||||
|
path,
|
||||||
|
directory_structure,
|
||||||
|
} => {
|
||||||
|
let file_name = services()
|
||||||
|
.globals
|
||||||
|
.split_media_path(path.as_deref(), directory_structure, sha256_hex)?
|
||||||
|
.join("/");
|
||||||
|
let url = bucket
|
||||||
|
.delete_object(Some(credentials), &file_name)
|
||||||
|
.sign(*duration);
|
||||||
|
|
||||||
|
let client = services().globals.default_client();
|
||||||
|
let resp = client.delete(url).send().await?;
|
||||||
|
|
||||||
|
if !resp.status().is_success() {
|
||||||
|
error!(
|
||||||
|
"Failed to delete file with sha256 hash of \"{}\" from S3 bucket: {}",
|
||||||
|
sha256_hex,
|
||||||
|
resp.text().await?
|
||||||
|
);
|
||||||
|
return Err(Error::BadS3Response(
|
||||||
|
"Failed to delete media file from S3 bucket",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -52,6 +52,8 @@ pub enum Error {
|
||||||
source: std::io::Error,
|
source: std::io::Error,
|
||||||
},
|
},
|
||||||
#[error("{0}")]
|
#[error("{0}")]
|
||||||
|
BadS3Response(&'static str),
|
||||||
|
#[error("{0}")]
|
||||||
BadServerResponse(&'static str),
|
BadServerResponse(&'static str),
|
||||||
#[error("{0}")]
|
#[error("{0}")]
|
||||||
BadConfig(&'static str),
|
BadConfig(&'static str),
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue