diff --git a/Cargo.lock b/Cargo.lock index d44c49ed..719cf32a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -487,7 +487,7 @@ checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b" [[package]] name = "conduit" -version = "0.9.0-alpha" +version = "0.10.0-alpha" dependencies = [ "async-trait", "axum 0.7.5", @@ -1194,6 +1194,15 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-auth" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "643c9bbf6a4ea8a656d6b4cd53d34f79e3f841ad5203c1a55fb7d761923bc255" +dependencies = [ + "memchr", +] + [[package]] name = "http-body" version = "0.4.6" @@ -1230,9 +1239,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.9.3" +version = "1.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0e7a4dd27b9476dc40cb050d3632d3bba3a70ddbff012285f7f8559a1e7e545" +checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9" [[package]] name = "httpdate" @@ -2486,7 +2495,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.10.1" -source = "git+https://github.com/ruma/ruma#c21817436979acbe66d43064498920a6d289b562" +source = "git+https://github.com/ruma/ruma#c06af4385e0e30c48a8e9ca3d488da32102d0db9" dependencies = [ "assign", "js_int", @@ -2507,7 +2516,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.10.0" -source = "git+https://github.com/ruma/ruma#c21817436979acbe66d43064498920a6d289b562" +source = "git+https://github.com/ruma/ruma#c06af4385e0e30c48a8e9ca3d488da32102d0db9" dependencies = [ "js_int", "ruma-common", @@ -2519,7 +2528,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.18.0" -source = "git+https://github.com/ruma/ruma#c21817436979acbe66d43064498920a6d289b562" +source = "git+https://github.com/ruma/ruma#c06af4385e0e30c48a8e9ca3d488da32102d0db9" dependencies = [ "as_variant", "assign", @@ -2542,7 +2551,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.13.0" -source = "git+https://github.com/ruma/ruma#c21817436979acbe66d43064498920a6d289b562" +source = "git+https://github.com/ruma/ruma#c06af4385e0e30c48a8e9ca3d488da32102d0db9" dependencies = [ "as_variant", "base64 0.22.1", @@ -2572,7 +2581,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.28.1" -source = "git+https://github.com/ruma/ruma#c21817436979acbe66d43064498920a6d289b562" +source = "git+https://github.com/ruma/ruma#c06af4385e0e30c48a8e9ca3d488da32102d0db9" dependencies = [ "as_variant", "indexmap 2.2.6", @@ -2588,15 +2597,22 @@ dependencies = [ "thiserror", "tracing", "url", + "web-time", "wildmatch", ] [[package]] name = "ruma-federation-api" version = "0.9.0" -source = "git+https://github.com/ruma/ruma#c21817436979acbe66d43064498920a6d289b562" +source = "git+https://github.com/ruma/ruma#c06af4385e0e30c48a8e9ca3d488da32102d0db9" dependencies = [ + "bytes", + "http 1.1.0", + "httparse", "js_int", + "memchr", + "mime", + "rand", "ruma-common", "ruma-events", "serde", @@ -2606,7 +2622,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.9.5" -source = "git+https://github.com/ruma/ruma#c21817436979acbe66d43064498920a6d289b562" +source = "git+https://github.com/ruma/ruma#c06af4385e0e30c48a8e9ca3d488da32102d0db9" dependencies = [ "js_int", "thiserror", @@ -2615,7 +2631,7 @@ dependencies = [ [[package]] name = "ruma-identity-service-api" version = "0.9.0" -source = "git+https://github.com/ruma/ruma#c21817436979acbe66d43064498920a6d289b562" +source = "git+https://github.com/ruma/ruma#c06af4385e0e30c48a8e9ca3d488da32102d0db9" dependencies = [ "js_int", "ruma-common", @@ -2625,8 +2641,9 @@ dependencies = [ [[package]] name = "ruma-macros" version = "0.13.0" -source = "git+https://github.com/ruma/ruma#c21817436979acbe66d43064498920a6d289b562" +source = "git+https://github.com/ruma/ruma#c06af4385e0e30c48a8e9ca3d488da32102d0db9" dependencies = [ + "cfg-if", "once_cell", "proc-macro-crate", "proc-macro2", @@ -2640,7 +2657,7 @@ dependencies = [ [[package]] name = "ruma-push-gateway-api" version = "0.9.0" -source = "git+https://github.com/ruma/ruma#c21817436979acbe66d43064498920a6d289b562" +source = "git+https://github.com/ruma/ruma#c06af4385e0e30c48a8e9ca3d488da32102d0db9" dependencies = [ "js_int", "ruma-common", @@ -2652,18 +2669,20 @@ dependencies = [ [[package]] name = "ruma-server-util" version = "0.3.0" -source = "git+https://github.com/ruma/ruma#c21817436979acbe66d43064498920a6d289b562" +source = "git+https://github.com/ruma/ruma#c06af4385e0e30c48a8e9ca3d488da32102d0db9" dependencies = [ "headers", + "http 1.1.0", + "http-auth", "ruma-common", + "thiserror", "tracing", - "yap", ] [[package]] name = "ruma-signatures" version = "0.15.0" -source = "git+https://github.com/ruma/ruma#c21817436979acbe66d43064498920a6d289b562" +source = "git+https://github.com/ruma/ruma#c06af4385e0e30c48a8e9ca3d488da32102d0db9" dependencies = [ "base64 0.22.1", "ed25519-dalek", @@ -2679,7 +2698,7 @@ dependencies = [ [[package]] name = "ruma-state-res" version = "0.11.0" -source = "git+https://github.com/ruma/ruma#c21817436979acbe66d43064498920a6d289b562" +source = "git+https://github.com/ruma/ruma#c06af4385e0e30c48a8e9ca3d488da32102d0db9" dependencies = [ "itertools", "js_int", @@ -4143,12 +4162,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" -[[package]] -name = "yap" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfe269e7b803a5e8e20cbd97860e136529cd83bf2c9c6d37b142467e7e1f051f" - [[package]] name = "yoke" version = "0.7.4" diff --git a/Cargo.toml b/Cargo.toml index ad87827d..d440acce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,10 +16,10 @@ license = "Apache-2.0" name = "conduit" readme = "README.md" repository = "https://gitlab.com/famedly/conduit" -version = "0.9.0-alpha" +version = "0.10.0-alpha" # See also `rust-toolchain.toml` -rust-version = "1.78.0" +rust-version = "1.79.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/complement/Dockerfile b/complement/Dockerfile index 341470af..e7cde40e 100644 --- a/complement/Dockerfile +++ b/complement/Dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.78.0 +FROM rust:1.79.0 WORKDIR /workdir diff --git a/docs/configuration.md b/docs/configuration.md index d903a21e..9687ead1 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -6,6 +6,8 @@ > **Note:** If you update the configuration file, you must restart Conduit for the changes to take effect +> **Note:** You can also configure Conduit by using `CONDUIT_{field_name}` environment variables. To set values inside a table, use `CONDUIT_{table_name}_{field_name}`. Example: `CONDUIT_WELL_KNOWN_CLIENT="https://matrix.example.org"` + Conduit's configuration file is divided into the following sections: - [Global](#global) diff --git a/docs/deploying/docker.md b/docs/deploying/docker.md index a45c6704..f914427a 100644 --- a/docs/deploying/docker.md +++ b/docs/deploying/docker.md @@ -64,6 +64,7 @@ docker run -d -p 8448:6167 \ -e CONDUIT_MAX_REQUEST_SIZE="20000000" \ -e CONDUIT_TRUSTED_SERVERS="[\"matrix.org\"]" \ -e CONDUIT_MAX_CONCURRENT_REQUESTS="100" \ + -e CONDUIT_PORT="6167" \ --name conduit ``` diff --git a/docs/turn.md b/docs/turn.md index 11a7180f..94d32db1 100644 --- a/docs/turn.md +++ b/docs/turn.md @@ -2,7 +2,7 @@ ## General instructions -* It is assumed you have a [Coturn server](https://github.com/coturn/coturn) up and running. See [Synapse reference implementation](https://github.com/matrix-org/synapse/blob/develop/docs/turn-howto.md). +* It is assumed you have a [Coturn server](https://github.com/coturn/coturn) up and running. See [Synapse reference implementation](https://github.com/element-hq/synapse/blob/develop/docs/turn-howto.md). ## Edit/Add a few settings to your existing conduit.toml diff --git a/flake.nix b/flake.nix index 91325443..f36f7e7a 100644 --- a/flake.nix +++ b/flake.nix @@ -59,7 +59,7 @@ file = ./rust-toolchain.toml; # See also `rust-toolchain.toml` - sha256 = "sha256-opUgs6ckUQCyDxcB9Wy51pqhd0MPGHUVbwRKKPGiwZU="; + sha256 = "sha256-Ngiz76YP4HTY75GGdH2P+APE/DEIx2R/Dn+BwwOyzZU="; }; }); in diff --git a/nix/shell.nix b/nix/shell.nix index bd070fe6..584a6bb3 100644 --- a/nix/shell.nix +++ b/nix/shell.nix @@ -23,7 +23,7 @@ mkShell { }; # Development tools - nativeBuildInputs = default.nativeBuildInputs ++ [ + nativeBuildInputs = [ # Always use nightly rustfmt because most of its options are unstable # # This needs to come before `toolchain` in this list, otherwise @@ -57,5 +57,5 @@ mkShell { # Useful for editing the book locally mdbook - ]; + ] ++ default.nativeBuildInputs ; } diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 3ffd3a5e..957c8f41 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -2,7 +2,6 @@ # # Other files that need upkeep when this changes: # -# * `.gitlab-ci.yml` # * `Cargo.toml` # * `flake.nix` # @@ -10,7 +9,7 @@ # If you're having trouble making the relevant changes, bug a maintainer. [toolchain] -channel = "1.78.0" +channel = "1.79.0" components = [ # For rust-analyzer "rust-src", diff --git a/src/api/client_server/account.rs b/src/api/client_server/account.rs index 36640b54..47ccdc83 100644 --- a/src/api/client_server/account.rs +++ b/src/api/client_server/account.rs @@ -315,7 +315,11 @@ pub async fn register_route(body: Ruma) -> Result, ) -> Result { - let sender_user = body.sender_user.as_ref().expect("user is authenticated"); + let sender_user = body + .sender_user + .as_ref() + // In the future password changes could be performed with UIA with 3PIDs, but we don't support that currently + .ok_or_else(|| Error::BadRequest(ErrorKind::MissingToken, "Missing access token."))?; let sender_device = body.sender_device.as_ref().expect("user is authenticated"); let mut uiaainfo = UiaaInfo { @@ -402,7 +406,11 @@ pub async fn whoami_route(body: Ruma) -> Result, ) -> Result { - let sender_user = body.sender_user.as_ref().expect("user is authenticated"); + let sender_user = body + .sender_user + .as_ref() + // In the future password changes could be performed with UIA with SSO, but we don't support that currently + .ok_or_else(|| Error::BadRequest(ErrorKind::MissingToken, "Missing access token."))?; let sender_device = body.sender_device.as_ref().expect("user is authenticated"); let mut uiaainfo = UiaaInfo { diff --git a/src/api/client_server/media.rs b/src/api/client_server/media.rs index 8b37adf0..77db7f83 100644 --- a/src/api/client_server/media.rs +++ b/src/api/client_server/media.rs @@ -1,12 +1,26 @@ +// Unauthenticated media is deprecated +#![allow(deprecated)] + use std::time::Duration; use crate::{service::media::{FileMeta, UrlPreviewData}, services, utils, Error, Result, Ruma}; -use ruma::api::client::{ - error::{ErrorKind, RetryAfter}, - media::{ - create_content, get_content, get_content_as_filename, get_content_thumbnail, - get_media_config, get_media_preview +use http::header::{CONTENT_DISPOSITION, CONTENT_TYPE}; +use ruma::{ + api::{ + client::{ + authenticated_media::{ + get_content, get_content_as_filename, get_content_thumbnail, get_media_config, + }, + error::{ErrorKind, RetryAfter}, + media::{ + self, create_content, get_media_preview, + }, + }, + federation::authenticated_media::{self as federation_media, FileOrLocation}, }, + http_headers::{ContentDisposition, ContentDispositionType}, + media::Method, + ServerName, UInt, }; use { @@ -22,9 +36,20 @@ const MXC_LENGTH: usize = 32; /// /// Returns max upload size. pub async fn get_media_config_route( - _body: Ruma, -) -> Result { - Ok(get_media_config::v3::Response { + _body: Ruma, +) -> Result { + Ok(media::get_media_config::v3::Response { + upload_size: services().globals.max_request_size().into(), + }) +} + +/// # `GET /_matrix/client/v1/media/config` +/// +/// Returns max upload size. +pub async fn get_media_config_auth_route( + _body: Ruma, +) -> Result { + Ok(get_media_config::v1::Response { upload_size: services().globals.max_request_size().into(), }) } @@ -301,10 +326,10 @@ pub async fn create_content_route( .media .create( mxc.clone(), - body.filename - .as_ref() - .map(|filename| "inline; filename=".to_owned() + filename) - .as_deref(), + Some( + ContentDisposition::new(ContentDispositionType::Inline) + .with_filename(body.filename.clone()), + ), body.content_type.as_deref(), &body.file, ) @@ -318,28 +343,67 @@ pub async fn create_content_route( pub async fn get_remote_content( mxc: &str, - server_name: &ruma::ServerName, + server_name: &ServerName, media_id: String, -) -> Result { - let content_response = services() +) -> Result { + let content_response = match services() .sending .send_federation_request( server_name, - get_content::v3::Request { - allow_remote: false, - server_name: server_name.to_owned(), - media_id, + federation_media::get_content::v1::Request { + media_id: media_id.clone(), timeout_ms: Duration::from_secs(20), - allow_redirect: false, }, ) - .await?; + .await + { + Ok(federation_media::get_content::v1::Response { + metadata: _, + content: FileOrLocation::File(content), + }) => get_content::v1::Response { + file: content.file, + content_type: content.content_type, + content_disposition: content.content_disposition, + }, + + Ok(federation_media::get_content::v1::Response { + metadata: _, + content: FileOrLocation::Location(url), + }) => get_location_content(url).await?, + Err(Error::BadRequest(ErrorKind::Unrecognized, _)) => { + let media::get_content::v3::Response { + file, + content_type, + content_disposition, + .. + } = services() + .sending + .send_federation_request( + server_name, + media::get_content::v3::Request { + server_name: server_name.to_owned(), + media_id, + timeout_ms: Duration::from_secs(20), + allow_remote: false, + allow_redirect: true, + }, + ) + .await?; + + get_content::v1::Response { + file, + content_type, + content_disposition, + } + } + Err(e) => return Err(e), + }; services() .media .create( mxc.to_owned(), - content_response.content_disposition.as_deref(), + content_response.content_disposition.clone(), content_response.content_type.as_deref(), &content_response.file, ) @@ -354,31 +418,57 @@ pub async fn get_remote_content( /// /// - Only allows federation if `allow_remote` is true pub async fn get_content_route( - body: Ruma, -) -> Result { - let mxc = format!("mxc://{}/{}", body.server_name, body.media_id); + body: Ruma, +) -> Result { + let get_content::v1::Response { + file, + content_disposition, + content_type, + } = get_content(&body.server_name, body.media_id.clone(), body.allow_remote).await?; - if let Some(FileMeta { + Ok(media::get_content::v3::Response { + file, + content_type, + content_disposition, + cross_origin_resource_policy: Some("cross-origin".to_owned()), + }) +} + +/// # `GET /_matrix/client/v1/media/download/{serverName}/{mediaId}` +/// +/// Load media from our server or over federation. +pub async fn get_content_auth_route( + body: Ruma, +) -> Result { + get_content(&body.server_name, body.media_id.clone(), true).await +} + +async fn get_content( + server_name: &ServerName, + media_id: String, + allow_remote: bool, +) -> Result { + let mxc = format!("mxc://{}/{}", server_name, media_id); + + if let Ok(Some(FileMeta { content_disposition, content_type, file, - }) = services().media.get(mxc.clone()).await? + })) = services().media.get(mxc.clone()).await { - Ok(get_content::v3::Response { + Ok(get_content::v1::Response { file, content_type, - content_disposition, - cross_origin_resource_policy: Some("cross-origin".to_owned()), + content_disposition: Some(content_disposition), }) - } else if &*body.server_name != services().globals.server_name() && body.allow_remote { + } else if server_name != services().globals.server_name() && allow_remote { let remote_content_response = - get_remote_content(&mxc, &body.server_name, body.media_id.clone()).await?; + get_remote_content(&mxc, server_name, media_id.clone()).await?; - Ok(get_content::v3::Response { + Ok(get_content::v1::Response { content_disposition: remote_content_response.content_disposition, content_type: remote_content_response.content_type, file: remote_content_response.file, - cross_origin_resource_policy: Some("cross-origin".to_owned()), }) } else { Err(Error::BadRequest(ErrorKind::NotFound, "Media not found.")) @@ -391,29 +481,74 @@ pub async fn get_content_route( /// /// - Only allows federation if `allow_remote` is true pub async fn get_content_as_filename_route( - body: Ruma, -) -> Result { - let mxc = format!("mxc://{}/{}", body.server_name, body.media_id); + body: Ruma, +) -> Result { + let get_content_as_filename::v1::Response { + file, + content_type, + content_disposition, + } = get_content_as_filename( + &body.server_name, + body.media_id.clone(), + body.filename.clone(), + body.allow_remote, + ) + .await?; - if let Some(FileMeta { + Ok(media::get_content_as_filename::v3::Response { + file, + content_type, + content_disposition, + cross_origin_resource_policy: Some("cross-origin".to_owned()), + }) +} + +/// # `GET /_matrix/client/v1/media/download/{serverName}/{mediaId}/{fileName}` +/// +/// Load media from our server or over federation, permitting desired filename. +pub async fn get_content_as_filename_auth_route( + body: Ruma, +) -> Result { + get_content_as_filename( + &body.server_name, + body.media_id.clone(), + body.filename.clone(), + true, + ) + .await +} + +async fn get_content_as_filename( + server_name: &ServerName, + media_id: String, + filename: String, + allow_remote: bool, +) -> Result { + let mxc = format!("mxc://{}/{}", server_name, media_id); + + if let Ok(Some(FileMeta { file, content_type, .. - }) = services().media.get(mxc.clone()).await? + })) = services().media.get(mxc.clone()).await { - Ok(get_content_as_filename::v3::Response { + Ok(get_content_as_filename::v1::Response { file, content_type, - content_disposition: Some(format!("inline; filename={}", body.filename)), - cross_origin_resource_policy: Some("cross-origin".to_owned()), + content_disposition: Some( + ContentDisposition::new(ContentDispositionType::Inline) + .with_filename(Some(filename.clone())), + ), }) - } else if &*body.server_name != services().globals.server_name() && body.allow_remote { + } else if server_name != services().globals.server_name() && allow_remote { let remote_content_response = - get_remote_content(&mxc, &body.server_name, body.media_id.clone()).await?; + get_remote_content(&mxc, server_name, media_id.clone()).await?; - Ok(get_content_as_filename::v3::Response { - content_disposition: Some(format!("inline: filename={}", body.filename)), + Ok(get_content_as_filename::v1::Response { + content_disposition: Some( + ContentDisposition::new(ContentDispositionType::Inline) + .with_filename(Some(filename.clone())), + ), content_type: remote_content_response.content_type, file: remote_content_response.file, - cross_origin_resource_policy: Some("cross-origin".to_owned()), }) } else { Err(Error::BadRequest(ErrorKind::NotFound, "Media not found.")) @@ -426,62 +561,169 @@ pub async fn get_content_as_filename_route( /// /// - Only allows federation if `allow_remote` is true pub async fn get_content_thumbnail_route( - body: Ruma, -) -> Result { - let mxc = format!("mxc://{}/{}", body.server_name, body.media_id); + body: Ruma, +) -> Result { + let get_content_thumbnail::v1::Response { file, content_type } = get_content_thumbnail( + &body.server_name, + body.media_id.clone(), + body.height, + body.width, + body.method.clone(), + body.animated, + body.allow_remote, + ) + .await?; - if let Some(FileMeta { + Ok(media::get_content_thumbnail::v3::Response { + file, + content_type, + cross_origin_resource_policy: Some("cross-origin".to_owned()), + }) +} + +/// # `GET /_matrix/client/v1/media/thumbnail/{serverName}/{mediaId}` +/// +/// Load media thumbnail from our server or over federation. +pub async fn get_content_thumbnail_auth_route( + body: Ruma, +) -> Result { + get_content_thumbnail( + &body.server_name, + body.media_id.clone(), + body.height, + body.width, + body.method.clone(), + body.animated, + true, + ) + .await +} + +async fn get_content_thumbnail( + server_name: &ServerName, + media_id: String, + height: UInt, + width: UInt, + method: Option, + animated: Option, + allow_remote: bool, +) -> Result { + let mxc = format!("mxc://{}/{}", server_name, media_id); + + if let Ok(Some(FileMeta { file, content_type, .. - }) = services() + })) = services() .media .get_thumbnail( mxc.clone(), - body.width + width .try_into() .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Width is invalid."))?, - body.height + height .try_into() - .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Width is invalid."))?, + .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Height is invalid."))?, ) - .await? + .await { - Ok(get_content_thumbnail::v3::Response { - file, - content_type, - cross_origin_resource_policy: Some("cross-origin".to_owned()), - }) - } else if &*body.server_name != services().globals.server_name() && body.allow_remote { - let get_thumbnail_response = services() + Ok(get_content_thumbnail::v1::Response { file, content_type }) + } else if server_name != services().globals.server_name() && allow_remote { + let thumbnail_response = match services() .sending .send_federation_request( - &body.server_name, - get_content_thumbnail::v3::Request { - allow_remote: false, - height: body.height, - width: body.width, - method: body.method.clone(), - server_name: body.server_name.clone(), - media_id: body.media_id.clone(), + server_name, + federation_media::get_content_thumbnail::v1::Request { + height, + width, + method: method.clone(), + media_id: media_id.clone(), timeout_ms: Duration::from_secs(20), - allow_redirect: false, + animated, }, ) - .await?; + .await + { + Ok(federation_media::get_content_thumbnail::v1::Response { + metadata: _, + content: FileOrLocation::File(content), + }) => get_content_thumbnail::v1::Response { + file: content.file, + content_type: content.content_type, + }, + + Ok(federation_media::get_content_thumbnail::v1::Response { + metadata: _, + content: FileOrLocation::Location(url), + }) => { + let get_content::v1::Response { + file, content_type, .. + } = get_location_content(url).await?; + + get_content_thumbnail::v1::Response { file, content_type } + } + Err(Error::BadRequest(ErrorKind::Unrecognized, _)) => { + let media::get_content_thumbnail::v3::Response { + file, content_type, .. + } = services() + .sending + .send_federation_request( + server_name, + media::get_content_thumbnail::v3::Request { + height, + width, + method: method.clone(), + server_name: server_name.to_owned(), + media_id: media_id.clone(), + timeout_ms: Duration::from_secs(20), + allow_redirect: false, + animated, + allow_remote: false, + }, + ) + .await?; + + get_content_thumbnail::v1::Response { file, content_type } + } + Err(e) => return Err(e), + }; services() .media .upload_thumbnail( mxc, - None, - get_thumbnail_response.content_type.as_deref(), - body.width.try_into().expect("all UInts are valid u32s"), - body.height.try_into().expect("all UInts are valid u32s"), - &get_thumbnail_response.file, + thumbnail_response.content_type.as_deref(), + width.try_into().expect("all UInts are valid u32s"), + height.try_into().expect("all UInts are valid u32s"), + &thumbnail_response.file, ) .await?; - Ok(get_thumbnail_response) + Ok(thumbnail_response) } else { Err(Error::BadRequest(ErrorKind::NotFound, "Media not found.")) } } + +async fn get_location_content(url: String) -> Result { + let client = services().globals.default_client(); + let response = client.get(url).send().await?; + let headers = response.headers(); + + let content_type = headers + .get(CONTENT_TYPE) + .and_then(|header| header.to_str().ok()) + .map(ToOwned::to_owned); + + let content_disposition = headers + .get(CONTENT_DISPOSITION) + .map(|header| header.as_bytes()) + .map(TryFrom::try_from) + .and_then(Result::ok); + + let file = response.bytes().await?.to_vec(); + + Ok(get_content::v1::Response { + file, + content_type, + content_disposition, + }) +} diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index 1ca711e2..baf2f239 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -97,7 +97,7 @@ pub async fn join_room_by_id_or_alias_route( let (servers, room_id) = match OwnedRoomId::try_from(body.room_id_or_alias) { Ok(room_id) => { - let mut servers = body.server_name.clone(); + let mut servers = body.via.clone(); servers.extend( services() .rooms @@ -241,6 +241,7 @@ pub async fn kick_user_route( unsigned: None, state_key: Some(body.user_id.to_string()), redacts: None, + timestamp: None, }, sender_user, &body.room_id, @@ -313,6 +314,7 @@ pub async fn ban_user_route(body: Ruma) -> Result::try_from(event_id.as_str()) .expect("ruma's reference hashes are valid event ids"); @@ -938,6 +941,7 @@ async fn join_room_by_id_helper( unsigned: None, state_key: Some(sender_user.to_string()), redacts: None, + timestamp: None, }, sender_user, room_id, @@ -1141,7 +1145,7 @@ async fn validate_and_add_event_id( let event_id = EventId::parse(format!( "${}", ruma::signatures::reference_hash(&value, room_version) - .expect("ruma can calculate reference hashes") + .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid PDU format"))? )) .expect("ruma's reference hashes are valid event ids"); @@ -1260,6 +1264,7 @@ pub(crate) async fn invite_helper<'a>( unsigned: None, state_key: Some(user_id.to_string()), redacts: None, + timestamp: None, }, sender_user, room_id, @@ -1379,6 +1384,7 @@ pub(crate) async fn invite_helper<'a>( unsigned: None, state_key: Some(user_id.to_string()), redacts: None, + timestamp: None, }, sender_user, room_id, @@ -1506,6 +1512,7 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId, reason: Option Result<()> { let event_id = EventId::parse(format!( "${}", ruma::signatures::reference_hash(&leave_event_stub, &room_version_id) - .expect("ruma can calculate reference hashes") + .expect("Event format validated when event was hashed") )) .expect("ruma's reference hashes are valid event ids"); diff --git a/src/api/client_server/message.rs b/src/api/client_server/message.rs index c9b39f16..fbdda077 100644 --- a/src/api/client_server/message.rs +++ b/src/api/client_server/message.rs @@ -84,6 +84,11 @@ pub async fn send_message_event_route( unsigned: Some(unsigned), state_key: None, redacts: None, + timestamp: if body.appservice_info.is_some() { + body.timestamp + } else { + None + }, }, sender_user, &body.room_id, diff --git a/src/api/client_server/profile.rs b/src/api/client_server/profile.rs index cf1db2d7..b90ae00e 100644 --- a/src/api/client_server/profile.rs +++ b/src/api/client_server/profile.rs @@ -65,6 +65,7 @@ pub async fn set_displayname_route( unsigned: None, state_key: Some(sender_user.to_string()), redacts: None, + timestamp: None, }, room_id, )) @@ -200,6 +201,7 @@ pub async fn set_avatar_url_route( unsigned: None, state_key: Some(sender_user.to_string()), redacts: None, + timestamp: None, }, room_id, )) diff --git a/src/api/client_server/redact.rs b/src/api/client_server/redact.rs index f0603f4b..fd6ac7b0 100644 --- a/src/api/client_server/redact.rs +++ b/src/api/client_server/redact.rs @@ -44,6 +44,7 @@ pub async fn redact_event_route( unsigned: None, state_key: None, redacts: Some(body.event_id.into()), + timestamp: None, }, sender_user, &body.room_id, diff --git a/src/api/client_server/room.rs b/src/api/client_server/room.rs index 890ff9cb..9c49d6a4 100644 --- a/src/api/client_server/room.rs +++ b/src/api/client_server/room.rs @@ -230,6 +230,7 @@ pub async fn create_room_route( unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, sender_user, &room_id, @@ -258,6 +259,7 @@ pub async fn create_room_route( unsigned: None, state_key: Some(sender_user.to_string()), redacts: None, + timestamp: None, }, sender_user, &room_id, @@ -311,6 +313,7 @@ pub async fn create_room_route( unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, sender_user, &room_id, @@ -334,6 +337,7 @@ pub async fn create_room_route( unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, sender_user, &room_id, @@ -360,6 +364,7 @@ pub async fn create_room_route( unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, sender_user, &room_id, @@ -381,6 +386,7 @@ pub async fn create_room_route( unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, sender_user, &room_id, @@ -403,6 +409,7 @@ pub async fn create_room_route( unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, sender_user, &room_id, @@ -447,6 +454,7 @@ pub async fn create_room_route( unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, sender_user, &room_id, @@ -469,6 +477,7 @@ pub async fn create_room_route( unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, sender_user, &room_id, @@ -629,6 +638,7 @@ pub async fn upgrade_room_route( unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, sender_user, &body.room_id, @@ -730,6 +740,7 @@ pub async fn upgrade_room_route( unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, sender_user, &replacement_room, @@ -758,6 +769,7 @@ pub async fn upgrade_room_route( unsigned: None, state_key: Some(sender_user.to_string()), redacts: None, + timestamp: None, }, sender_user, &replacement_room, @@ -800,6 +812,7 @@ pub async fn upgrade_room_route( unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, sender_user, &replacement_room, @@ -850,6 +863,7 @@ pub async fn upgrade_room_route( unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, sender_user, &body.room_id, diff --git a/src/api/client_server/space.rs b/src/api/client_server/space.rs index e2ea8c34..0bf9c567 100644 --- a/src/api/client_server/space.rs +++ b/src/api/client_server/space.rs @@ -1,5 +1,10 @@ -use crate::{services, Result, Ruma}; -use ruma::api::client::space::get_hierarchy; +use std::str::FromStr; + +use crate::{service::rooms::spaces::PagnationToken, services, Error, Result, Ruma}; +use ruma::{ + api::client::{error::ErrorKind, space::get_hierarchy}, + UInt, +}; /// # `GET /_matrix/client/v1/rooms/{room_id}/hierarchy`` /// @@ -9,25 +14,42 @@ pub async fn get_hierarchy_route( ) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let skip = body + let limit = body + .limit + .unwrap_or(UInt::from(10_u32)) + .min(UInt::from(100_u32)); + let max_depth = body + .max_depth + .unwrap_or(UInt::from(3_u32)) + .min(UInt::from(10_u32)); + + let key = body .from .as_ref() - .and_then(|s| s.parse::().ok()) - .unwrap_or(0); + .and_then(|s| PagnationToken::from_str(s).ok()); - let limit = body.limit.map_or(10, u64::from).min(100) as usize; - - let max_depth = body.max_depth.map_or(3, u64::from).min(10) as usize + 1; // +1 to skip the space room itself + // Should prevent unexpected behaviour in (bad) clients + if let Some(token) = &key { + if token.suggested_only != body.suggested_only || token.max_depth != max_depth { + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "suggested_only and max_depth cannot change on paginated requests", + )); + } + } services() .rooms .spaces - .get_hierarchy( + .get_client_hierarchy( sender_user, &body.room_id, - limit, - skip, - max_depth, + usize::try_from(limit) + .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Limit is too great"))?, + key.map_or(vec![], |token| token.short_room_ids), + usize::try_from(max_depth).map_err(|_| { + Error::BadRequest(ErrorKind::InvalidParam, "Max depth is too great") + })?, body.suggested_only, ) .await diff --git a/src/api/client_server/state.rs b/src/api/client_server/state.rs index 8a47161c..03c6abc7 100644 --- a/src/api/client_server/state.rs +++ b/src/api/client_server/state.rs @@ -10,7 +10,7 @@ use ruma::{ room::canonical_alias::RoomCanonicalAliasEventContent, AnyStateEventContent, StateEventType, }, serde::Raw, - EventId, RoomId, UserId, + EventId, MilliSecondsSinceUnixEpoch, RoomId, UserId, }; use tracing::log::warn; @@ -32,6 +32,11 @@ pub async fn send_state_event_for_key_route( &body.event_type, &body.body.body, // Yes, I hate it too body.state_key.to_owned(), + if body.appservice_info.is_some() { + body.timestamp + } else { + None + }, ) .await?; @@ -65,6 +70,11 @@ pub async fn send_state_event_for_empty_key_route( &body.event_type.to_string().into(), &body.body.body, body.state_key.to_owned(), + if body.appservice_info.is_some() { + body.timestamp + } else { + None + }, ) .await?; @@ -190,6 +200,7 @@ async fn send_state_event_for_key_helper( event_type: &StateEventType, json: &Raw, state_key: String, + timestamp: Option, ) -> Result> { let sender_user = sender; @@ -243,6 +254,7 @@ async fn send_state_event_for_key_helper( unsigned: None, state_key: Some(state_key), redacts: None, + timestamp, }, sender_user, room_id, diff --git a/src/api/client_server/unversioned.rs b/src/api/client_server/unversioned.rs index 7706afee..6ddc1327 100644 --- a/src/api/client_server/unversioned.rs +++ b/src/api/client_server/unversioned.rs @@ -27,7 +27,10 @@ pub async fn get_supported_versions_route( "v1.4".to_owned(), "v1.5".to_owned(), ], - unstable_features: BTreeMap::from_iter([("org.matrix.e2e_cross_signing".to_owned(), true)]), + unstable_features: BTreeMap::from_iter([ + ("org.matrix.e2e_cross_signing".to_owned(), true), + ("org.matrix.msc3916.stable".to_owned(), true), + ]), }; Ok(resp) diff --git a/src/api/ruma_wrapper/axum.rs b/src/api/ruma_wrapper/axum.rs index 047f7dcf..2c5da21b 100644 --- a/src/api/ruma_wrapper/axum.rs +++ b/src/api/ruma_wrapper/axum.rs @@ -7,8 +7,11 @@ use axum::{ response::{IntoResponse, Response}, RequestExt, RequestPartsExt, }; -use axum_extra::headers::authorization::Bearer; -use axum_extra::{headers::Authorization, typed_header::TypedHeaderRejectionReason, TypedHeader}; +use axum_extra::{ + headers::{authorization::Bearer, Authorization}, + typed_header::TypedHeaderRejectionReason, + TypedHeader, +}; use bytes::{BufMut, BytesMut}; use http::{Request, StatusCode}; use ruma::{ @@ -186,7 +189,7 @@ where let origin_signatures = BTreeMap::from_iter([( x_matrix.key.clone(), - CanonicalJsonValue::String(x_matrix.sig), + CanonicalJsonValue::String(x_matrix.sig.to_string()), )]); let signatures = BTreeMap::from_iter([( diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 605a4672..f8afcf39 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -2,17 +2,25 @@ use crate::{ api::client_server::{self, claim_keys_helper, get_keys_helper}, - service::pdu::{gen_event_id_canonical_json, PduBuilder}, + service::{ + globals::SigningKeys, + media::FileMeta, + pdu::{gen_event_id_canonical_json, PduBuilder}, + }, services, utils, Error, PduEvent, Result, Ruma, }; use axum::{response::IntoResponse, Json}; +use axum_extra::headers::{CacheControl, Header}; use get_profile_information::v1::ProfileField; -use http::header::{HeaderValue, AUTHORIZATION}; +use http::header::AUTHORIZATION; use ruma::{ api::{ client::error::{Error as RumaError, ErrorKind}, federation::{ + authenticated_media::{ + get_content, get_content_thumbnail, Content, ContentMetadata, FileOrLocation, + }, authorization::get_event_authorization, backfill::get_backfill, device::get_devices::{self, v1::UserDevice}, @@ -26,6 +34,7 @@ use ruma::{ membership::{create_invite, create_join_event, prepare_join_event}, openid::get_openid_userinfo, query::{get_profile_information, get_room_information}, + space::get_hierarchy, transactions::{ edu::{DeviceListUpdateContent, DirectDeviceContent, Edu, SigningKeyUpdateContent}, send_transaction_message, @@ -94,13 +103,6 @@ impl FedDest { } } - fn into_uri_string(self) -> String { - match self { - Self::Literal(addr) => addr.to_string(), - Self::Named(host, ref port) => host + port, - } - } - fn hostname(&self) -> String { match &self { Self::Literal(addr) => addr.ip().to_string(), @@ -136,8 +138,6 @@ where debug!("Preparing to send request to {destination}"); - let mut write_destination_to_cache = false; - let cached_result = services() .globals .actual_destination_cache @@ -146,14 +146,63 @@ where .get(destination) .cloned(); - let (actual_destination, host) = if let Some(result) = cached_result { - result + let actual_destination = if let Some(DestinationResponse { + actual_destination, + dest_type, + }) = cached_result + { + match dest_type { + DestType::IsIpOrHasPort => actual_destination, + DestType::LookupFailed { + well_known_retry, + well_known_backoff_mins, + } => { + if well_known_retry < Instant::now() { + find_actual_destination(destination, None, false, Some(well_known_backoff_mins)) + .await + } else { + actual_destination + } + } + + DestType::WellKnown { expires } => { + if expires < Instant::now() { + find_actual_destination(destination, None, false, None).await + } else { + actual_destination + } + } + DestType::WellKnownSrv { + srv_expires, + well_known_expires, + well_known_host, + } => { + if well_known_expires < Instant::now() { + find_actual_destination(destination, None, false, None).await + } else if srv_expires < Instant::now() { + find_actual_destination(destination, Some(well_known_host), true, None).await + } else { + actual_destination + } + } + DestType::Srv { + well_known_retry, + well_known_backoff_mins, + srv_expires, + } => { + if well_known_retry < Instant::now() { + find_actual_destination(destination, None, false, Some(well_known_backoff_mins)) + .await + } else if srv_expires < Instant::now() { + find_actual_destination(destination, None, true, Some(well_known_backoff_mins)) + .await + } else { + actual_destination + } + } + } } else { - write_destination_to_cache = true; - - let result = find_actual_destination(destination).await; - - (result.0, result.1.into_uri_string()) + find_actual_destination(destination, None, false, None).await }; let actual_destination_str = actual_destination.clone().into_https_string(); @@ -162,7 +211,7 @@ where .try_into_http_request::>( &actual_destination_str, SendAccessToken::IfRequired(""), - &[MatrixVersion::V1_4], + &[MatrixVersion::V1_11], ) .map_err(|e| { warn!( @@ -226,13 +275,14 @@ where for s in signature_server { http_request.headers_mut().insert( AUTHORIZATION, - HeaderValue::from_str(&format!( + format!( "X-Matrix origin=\"{}\",destination=\"{}\",key=\"{}\",sig=\"{}\"", services().globals.server_name(), destination, s.0, s.1 - )) + ) + .try_into() .unwrap(), ); } @@ -290,21 +340,10 @@ where if status == 200 { debug!("Parsing response bytes from {destination}"); let response = T::IncomingResponse::try_from_http_response(http_response); - if response.is_ok() && write_destination_to_cache { - services() - .globals - .actual_destination_cache - .write() - .await - .insert( - OwnedServerName::from(destination), - (actual_destination, host), - ); - } response.map_err(|e| { warn!( - "Invalid 200 response from {} on: {} {}", + "Invalid 200 response from {} on: {} {:?}", &destination, url, e ); Error::BadServerResponse("Server returned bad 200 response.") @@ -345,142 +384,225 @@ fn add_port_to_hostname(destination_str: &str) -> FedDest { FedDest::Named(host.to_owned(), port.to_owned()) } -/// Returns: actual_destination, host header -/// Implemented according to the specification at +#[derive(Clone)] +pub struct DestinationResponse { + pub actual_destination: FedDest, + pub dest_type: DestType, +} + +#[derive(Clone)] +pub enum DestType { + WellKnownSrv { + srv_expires: Instant, + well_known_expires: Instant, + well_known_host: String, + }, + WellKnown { + expires: Instant, + }, + Srv { + srv_expires: Instant, + well_known_retry: Instant, + well_known_backoff_mins: u16, + }, + IsIpOrHasPort, + LookupFailed { + well_known_retry: Instant, + well_known_backoff_mins: u16, + }, +} + +/// Implemented according to the specification at /// Numbers in comments below refer to bullet points in linked section of specification -async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDest) { +async fn find_actual_destination( + destination: &'_ ServerName, + // The host used to potentially lookup SRV records against, only used when only_request_srv is true + well_known_dest: Option, + // Should be used when only the SRV lookup has expired + only_request_srv: bool, + // The backoff time for the last well known failure, if any + well_known_backoff_mins: Option, +) -> FedDest { debug!("Finding actual destination for {destination}"); - let destination_str = destination.as_str().to_owned(); - let mut hostname = destination_str.clone(); - let actual_destination = match get_ip_with_port(&destination_str) { - Some(host_port) => { - debug!("1: IP literal with provided or default port"); - host_port - } - None => { - if let Some(pos) = destination_str.find(':') { - debug!("2: Hostname with included port"); - let (host, port) = destination_str.split_at(pos); - FedDest::Named(host.to_owned(), port.to_owned()) + let destination_str = destination.to_string(); + let next_backoff_mins = well_known_backoff_mins + // Errors are recommended to be cached for up to an hour + .map(|mins| (mins * 2).min(60)) + .unwrap_or(1); + + let (actual_destination, dest_type) = if only_request_srv { + let destination_str = well_known_dest.unwrap_or(destination_str); + let (dest, expires) = get_srv_destination(destination_str).await; + let well_known_retry = + Instant::now() + Duration::from_secs((60 * next_backoff_mins).into()); + ( + dest, + if let Some(expires) = expires { + DestType::Srv { + well_known_backoff_mins: next_backoff_mins, + srv_expires: expires, + + well_known_retry, + } } else { - debug!("Requesting well known for {destination}"); - match request_well_known(destination.as_str()).await { - Some(delegated_hostname) => { - debug!("3: A .well-known file is available"); - hostname = add_port_to_hostname(&delegated_hostname).into_uri_string(); - match get_ip_with_port(&delegated_hostname) { - Some(host_and_port) => host_and_port, // 3.1: IP literal in .well-known file - None => { - if let Some(pos) = delegated_hostname.find(':') { - debug!("3.2: Hostname with port in .well-known file"); - let (host, port) = delegated_hostname.split_at(pos); - FedDest::Named(host.to_owned(), port.to_owned()) - } else { - debug!("Delegated hostname has no port in this branch"); - if let Some(hostname_override) = - query_srv_record(&delegated_hostname).await - { - debug!("3.3: SRV lookup successful"); - let force_port = hostname_override.port(); - - if let Ok(override_ip) = services() - .globals - .dns_resolver() - .lookup_ip(hostname_override.hostname()) - .await - { - services() - .globals - .tls_name_override - .write() - .unwrap() - .insert( - delegated_hostname.clone(), - ( - override_ip.iter().collect(), - force_port.unwrap_or(8448), - ), - ); - } else { - warn!("Using SRV record, but could not resolve to IP"); - } - - if let Some(port) = force_port { - FedDest::Named(delegated_hostname, format!(":{port}")) - } else { - add_port_to_hostname(&delegated_hostname) - } + DestType::LookupFailed { + well_known_retry, + well_known_backoff_mins: next_backoff_mins, + } + }, + ) + } else { + match get_ip_with_port(&destination_str) { + Some(host_port) => { + debug!("1: IP literal with provided or default port"); + (host_port, DestType::IsIpOrHasPort) + } + None => { + if let Some(pos) = destination_str.find(':') { + debug!("2: Hostname with included port"); + let (host, port) = destination_str.split_at(pos); + ( + FedDest::Named(host.to_owned(), port.to_owned()), + DestType::IsIpOrHasPort, + ) + } else { + debug!("Requesting well known for {destination_str}"); + match request_well_known(destination_str.as_str()).await { + Some((delegated_hostname, timestamp)) => { + debug!("3: A .well-known file is available"); + match get_ip_with_port(&delegated_hostname) { + // 3.1: IP literal in .well-known file + Some(host_and_port) => { + (host_and_port, DestType::WellKnown { expires: timestamp }) + } + None => { + if let Some(pos) = delegated_hostname.find(':') { + debug!("3.2: Hostname with port in .well-known file"); + let (host, port) = delegated_hostname.split_at(pos); + ( + FedDest::Named(host.to_owned(), port.to_owned()), + DestType::WellKnown { expires: timestamp }, + ) } else { - debug!("3.4: No SRV records, just use the hostname from .well-known"); - add_port_to_hostname(&delegated_hostname) + debug!("Delegated hostname has no port in this branch"); + let (dest, srv_expires) = + get_srv_destination(delegated_hostname.clone()).await; + ( + dest, + if let Some(srv_expires) = srv_expires { + DestType::WellKnownSrv { + srv_expires, + well_known_expires: timestamp, + well_known_host: delegated_hostname, + } + } else { + DestType::WellKnown { expires: timestamp } + }, + ) } } } } - } - None => { - debug!("4: No .well-known or an error occured"); - match query_srv_record(&destination_str).await { - Some(hostname_override) => { - debug!("4: SRV record found"); - let force_port = hostname_override.port(); - - if let Ok(override_ip) = services() - .globals - .dns_resolver() - .lookup_ip(hostname_override.hostname()) - .await - { - services() - .globals - .tls_name_override - .write() - .unwrap() - .insert( - hostname.clone(), - ( - override_ip.iter().collect(), - force_port.unwrap_or(8448), - ), - ); + None => { + debug!("4: No .well-known or an error occured"); + let (dest, expires) = get_srv_destination(destination_str).await; + let well_known_retry = Instant::now() + + Duration::from_secs((60 * next_backoff_mins).into()); + ( + dest, + if let Some(expires) = expires { + DestType::Srv { + srv_expires: expires, + well_known_retry, + well_known_backoff_mins: next_backoff_mins, + } } else { - warn!("Using SRV record, but could not resolve to IP"); - } - - if let Some(port) = force_port { - FedDest::Named(hostname.clone(), format!(":{port}")) - } else { - add_port_to_hostname(&hostname) - } - } - None => { - debug!("5: No SRV record found"); - add_port_to_hostname(&destination_str) - } + DestType::LookupFailed { + well_known_retry, + well_known_backoff_mins: next_backoff_mins, + } + }, + ) } } } } } }; + debug!("Actual destination: {actual_destination:?}"); - // Can't use get_ip_with_port here because we don't want to add a port - // to an IP address if it wasn't specified - let hostname = if let Ok(addr) = hostname.parse::() { - FedDest::Literal(addr) - } else if let Ok(addr) = hostname.parse::() { - FedDest::Named(addr.to_string(), ":8448".to_owned()) - } else if let Some(pos) = hostname.find(':') { - let (host, port) = hostname.split_at(pos); - FedDest::Named(host.to_owned(), port.to_owned()) - } else { - FedDest::Named(hostname, ":8448".to_owned()) + let response = DestinationResponse { + actual_destination, + dest_type, }; - (actual_destination, hostname) + + services() + .globals + .actual_destination_cache + .write() + .await + .insert(destination.to_owned(), response.clone()); + + response.actual_destination } -async fn query_given_srv_record(record: &str) -> Option { +/// Looks up the SRV records for federation usage +/// +/// If no timestamp is returned, that means no SRV record was found +async fn get_srv_destination(delegated_hostname: String) -> (FedDest, Option) { + if let Some((hostname_override, timestamp)) = query_srv_record(&delegated_hostname).await { + debug!("SRV lookup successful"); + let force_port = hostname_override.port(); + + if let Ok(override_ip) = services() + .globals + .dns_resolver() + .lookup_ip(hostname_override.hostname()) + .await + { + services() + .globals + .tls_name_override + .write() + .unwrap() + .insert( + delegated_hostname.clone(), + (override_ip.iter().collect(), force_port.unwrap_or(8448)), + ); + } else { + // Removing in case there was previously a SRV record + services() + .globals + .tls_name_override + .write() + .unwrap() + .remove(&delegated_hostname); + warn!("Using SRV record, but could not resolve to IP"); + } + + if let Some(port) = force_port { + ( + FedDest::Named(delegated_hostname, format!(":{port}")), + Some(timestamp), + ) + } else { + (add_port_to_hostname(&delegated_hostname), Some(timestamp)) + } + } else { + // Removing in case there was previously a SRV record + services() + .globals + .tls_name_override + .write() + .unwrap() + .remove(&delegated_hostname); + debug!("No SRV records found"); + (add_port_to_hostname(&delegated_hostname), None) + } +} + +async fn query_given_srv_record(record: &str) -> Option<(FedDest, Instant)> { services() .globals .dns_resolver() @@ -488,16 +610,19 @@ async fn query_given_srv_record(record: &str) -> Option { .await .map(|srv| { srv.iter().next().map(|result| { - FedDest::Named( - result.target().to_string().trim_end_matches('.').to_owned(), - format!(":{}", result.port()), + ( + FedDest::Named( + result.target().to_string().trim_end_matches('.').to_owned(), + format!(":{}", result.port()), + ), + srv.as_lookup().valid_until(), ) }) }) .unwrap_or(None) } -async fn query_srv_record(hostname: &'_ str) -> Option { +async fn query_srv_record(hostname: &'_ str) -> Option<(FedDest, Instant)> { let hostname = hostname.trim_end_matches('.'); if let Some(host_port) = query_given_srv_record(&format!("_matrix-fed._tcp.{hostname}.")).await @@ -508,7 +633,7 @@ async fn query_srv_record(hostname: &'_ str) -> Option { } } -async fn request_well_known(destination: &str) -> Option { +async fn request_well_known(destination: &str) -> Option<(String, Instant)> { let response = services() .globals .default_client() @@ -516,14 +641,40 @@ async fn request_well_known(destination: &str) -> Option { .send() .await; debug!("Got well known response"); - if let Err(e) = &response { - debug!("Well known error: {e:?}"); - return None; - } - let text = response.ok()?.text().await; + let response = match response { + Err(e) => { + debug!("Well known error: {e:?}"); + return None; + } + Ok(r) => r, + }; + + let mut headers = response.headers().values(); + + let cache_for = CacheControl::decode(&mut headers) + .ok() + .and_then(|cc| { + // Servers should respect the cache control headers present on the response, or use a sensible default when headers are not present. + if cc.no_store() || cc.no_cache() { + Some(Duration::ZERO) + } else { + cc.max_age() + // Servers should additionally impose a maximum cache time for responses: 48 hours is recommended. + .map(|age| age.min(Duration::from_secs(60 * 60 * 48))) + } + }) + // The recommended sensible default is 24 hours. + .unwrap_or_else(|| Duration::from_secs(60 * 60 * 24)); + + let text = response.text().await; debug!("Got well known response text"); - let body: serde_json::Value = serde_json::from_str(&text.ok()?).ok()?; - Some(body.get("m.server")?.as_str()?.to_owned()) + + let host = || { + let body: serde_json::Value = serde_json::from_str(&text.ok()?).ok()?; + body.get("m.server")?.as_str().map(ToOwned::to_owned) + }; + + host().map(|host| (host, Instant::now() + cache_for)) } /// # `GET /_matrix/federation/v1/version` @@ -663,17 +814,78 @@ pub fn parse_incoming_pdu( let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) { Ok(t) => t, - Err(_) => { + Err(e) => { // Event could not be converted to canonical json - return Err(Error::BadRequest( - ErrorKind::InvalidParam, - "Could not convert event to canonical json.", - )); + return Err(e); } }; Ok((event_id, value, room_id)) } +/// Attempts to parse and append PDU to timeline. +/// If no event ID is returned, then the PDU was failed to be parsed. +/// If the Ok(()) is returned, then the PDU was successfully appended to the timeline. +async fn handle_pdu_in_transaction( + origin: &ServerName, + pub_key_map: &RwLock>, + pdu: &RawJsonValue, +) -> (Option, Result<()>) { + let (event_id, value, room_id) = match parse_incoming_pdu(pdu) { + Ok(t) => t, + Err(e) => { + warn!("Could not parse PDU: {e}"); + warn!("Full PDU: {:?}", &pdu); + return (None, Err(Error::BadServerResponse("Could not parse PDU"))); + } + }; + + // Makes use of the m.room.create event. If we cannot fetch this event, + // we must have never been in that room. + if services().rooms.state.get_room_version(&room_id).is_err() { + debug!("Room {room_id} is not known to this server"); + return ( + Some(event_id), + Err(Error::BadServerResponse("Room is not known to this server")), + ); + } + + // We do not add the event_id field to the pdu here because of signature and hashes checks + + let mutex = Arc::clone( + services() + .globals + .roomid_mutex_federation + .write() + .await + .entry(room_id.to_owned()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + let start_time = Instant::now(); + + if let Err(e) = services() + .rooms + .event_handler + .handle_incoming_pdu(origin, &event_id, &room_id, value, true, pub_key_map) + .await + { + warn!("Error appending PDU to timeline: {}: {:?}", e, pdu); + return (Some(event_id), Err(e)); + } + + drop(mutex_lock); + + let elapsed = start_time.elapsed(); + debug!( + "Handling transaction of event {} took {}m{}s", + event_id, + elapsed.as_secs() / 60, + elapsed.as_secs() % 60 + ); + + (Some(event_id), Ok(())) +} + /// # `PUT /_matrix/federation/v1/send/{txnId}` /// /// Push EDUs and PDUs to this server. @@ -698,77 +910,11 @@ pub async fn send_transaction_message_route( // let mut auth_cache = EventMap::new(); for pdu in &body.pdus { - let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| { - warn!("Error parsing incoming event {:?}: {:?}", pdu, e); - Error::BadServerResponse("Invalid PDU in server response") - })?; - let room_id: OwnedRoomId = value - .get("room_id") - .and_then(|id| RoomId::parse(id.as_str()?).ok()) - .ok_or(Error::BadRequest( - ErrorKind::InvalidParam, - "Invalid room id in pdu", - ))?; + let (event_id, result) = + handle_pdu_in_transaction(sender_servername, &pub_key_map, pdu).await; - if services().rooms.state.get_room_version(&room_id).is_err() { - debug!("Server is not in room {room_id}"); - continue; - } - - let r = parse_incoming_pdu(pdu); - let (event_id, value, room_id) = match r { - Ok(t) => t, - Err(e) => { - warn!("Could not parse PDU: {e}"); - warn!("Full PDU: {:?}", &pdu); - continue; - } - }; - // We do not add the event_id field to the pdu here because of signature and hashes checks - - let mutex = Arc::clone( - services() - .globals - .roomid_mutex_federation - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - let mutex_lock = mutex.lock().await; - let start_time = Instant::now(); - resolved_map.insert( - event_id.clone(), - services() - .rooms - .event_handler - .handle_incoming_pdu( - sender_servername, - &event_id, - &room_id, - value, - true, - &pub_key_map, - ) - .await - .map(|_| ()), - ); - drop(mutex_lock); - - let elapsed = start_time.elapsed(); - debug!( - "Handling transaction of event {} took {}m{}s", - event_id, - elapsed.as_secs() / 60, - elapsed.as_secs() % 60 - ); - } - - for pdu in &resolved_map { - if let Err(e) = pdu.1 { - if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) { - warn!("Incoming PDU failed {:?}", pdu); - } + if let Some(event_id) = event_id { + resolved_map.insert(event_id.clone(), result.map_err(|e| e.sanitized_error())); } } @@ -937,12 +1083,7 @@ pub async fn send_transaction_message_route( } } - Ok(send_transaction_message::v1::Response { - pdus: resolved_map - .into_iter() - .map(|(e, r)| (e, r.map_err(|e| e.sanitized_error()))) - .collect(), - }) + Ok(send_transaction_message::v1::Response { pdus: resolved_map }) } /// # `GET /_matrix/federation/v1/event/{eventId}` @@ -1445,6 +1586,7 @@ pub async fn create_join_event_template_route( unsigned: None, state_key: Some(body.user_id.to_string()), redacts: None, + timestamp: None, }, &body.user_id, &body.room_id, @@ -1684,7 +1826,7 @@ pub async fn create_invite_route( let event_id = EventId::parse(format!( "${}", ruma::signatures::reference_hash(&signed_event, &body.room_version) - .expect("ruma can calculate reference hashes") + .expect("Event format validated when event was hashed") )) .expect("ruma's reference hashes are valid event ids"); @@ -1753,6 +1895,90 @@ pub async fn create_invite_route( }) } +/// # `GET /_matrix/federation/v1/media/download/{serverName}/{mediaId}` +/// +/// Load media from our server. +pub async fn get_content_route( + body: Ruma, +) -> Result { + let mxc = format!( + "mxc://{}/{}", + services().globals.server_name(), + body.media_id + ); + + if let Some(FileMeta { + content_disposition, + content_type, + file, + }) = services().media.get(mxc.clone()).await? + { + Ok(get_content::v1::Response::new( + ContentMetadata::new(), + FileOrLocation::File(Content { + file, + content_type, + content_disposition: Some(content_disposition), + }), + )) + } else { + Err(Error::BadRequest(ErrorKind::NotFound, "Media not found.")) + } +} + +/// # `GET /_matrix/federation/v1/media/thumbnail/{serverName}/{mediaId}` +/// +/// Load media thumbnail from our server or over federation. +pub async fn get_content_thumbnail_route( + body: Ruma, +) -> Result { + let mxc = format!( + "mxc://{}/{}", + services().globals.server_name(), + body.media_id + ); + + let Some(FileMeta { + file, + content_type, + content_disposition, + }) = services() + .media + .get_thumbnail( + mxc.clone(), + body.width + .try_into() + .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Width is invalid."))?, + body.height + .try_into() + .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Width is invalid."))?, + ) + .await? + else { + return Err(Error::BadRequest(ErrorKind::NotFound, "Media not found.")); + }; + + services() + .media + .upload_thumbnail( + mxc, + content_type.as_deref(), + body.width.try_into().expect("all UInts are valid u32s"), + body.height.try_into().expect("all UInts are valid u32s"), + &file, + ) + .await?; + + Ok(get_content_thumbnail::v1::Response::new( + ContentMetadata::new(), + FileOrLocation::File(Content { + file, + content_type, + content_disposition: Some(content_disposition), + }), + )) +} + /// # `GET /_matrix/federation/v1/user/devices/{userId}` /// /// Gets information on all devices of the user. @@ -1937,6 +2163,31 @@ pub async fn get_openid_userinfo_route( )) } +/// # `GET /_matrix/federation/v1/hierarchy/{roomId}` +/// +/// Gets the space tree in a depth-first manner to locate child rooms of a given space. +pub async fn get_hierarchy_route( + body: Ruma, +) -> Result { + let sender_servername = body + .sender_servername + .as_ref() + .expect("server is authenticated"); + + if services().rooms.metadata.exists(&body.room_id)? { + services() + .rooms + .spaces + .get_federation_hierarchy(&body.room_id, sender_servername, body.suggested_only) + .await + } else { + Err(Error::BadRequest( + ErrorKind::NotFound, + "Room does not exist.", + )) + } +} + /// # `GET /.well-known/matrix/server` /// /// Returns the federation server discovery information. diff --git a/src/database/key_value/media.rs b/src/database/key_value/media.rs index 6d05a9f4..cd67a583 100644 --- a/src/database/key_value/media.rs +++ b/src/database/key_value/media.rs @@ -1,4 +1,4 @@ -use ruma::api::client::error::ErrorKind; +use ruma::{api::client::error::ErrorKind, http_headers::ContentDisposition}; use crate::{database::KeyValueDatabase, service::{self, media::UrlPreviewData}, utils, Error, Result}; @@ -8,7 +8,7 @@ impl service::media::Data for KeyValueDatabase { mxc: String, width: u32, height: u32, - content_disposition: Option<&str>, + content_disposition: &ContentDisposition, content_type: Option<&str>, ) -> Result> { let mut key = mxc.as_bytes().to_vec(); @@ -16,12 +16,7 @@ impl service::media::Data for KeyValueDatabase { key.extend_from_slice(&width.to_be_bytes()); key.extend_from_slice(&height.to_be_bytes()); key.push(0xff); - key.extend_from_slice( - content_disposition - .as_ref() - .map(|f| f.as_bytes()) - .unwrap_or_default(), - ); + key.extend_from_slice(content_disposition.to_string().as_bytes()); key.push(0xff); key.extend_from_slice( content_type @@ -40,7 +35,7 @@ impl service::media::Data for KeyValueDatabase { mxc: String, width: u32, height: u32, - ) -> Result<(Option, Option, Vec)> { + ) -> Result<(ContentDisposition, Option, Vec)> { let mut prefix = mxc.as_bytes().to_vec(); prefix.push(0xff); prefix.extend_from_slice(&width.to_be_bytes()); @@ -68,15 +63,9 @@ impl service::media::Data for KeyValueDatabase { .next() .ok_or_else(|| Error::bad_database("Media ID in db is invalid."))?; - let content_disposition = if content_disposition_bytes.is_empty() { - None - } else { - Some( - utils::string_from_bytes(content_disposition_bytes).map_err(|_| { - Error::bad_database("Content Disposition in mediaid_file is invalid unicode.") - })?, - ) - }; + let content_disposition = content_disposition_bytes.try_into().unwrap_or_else(|_| { + ContentDisposition::new(ruma::http_headers::ContentDispositionType::Inline) + }); Ok((content_disposition, content_type, key)) } diff --git a/src/database/mod.rs b/src/database/mod.rs index 35dbb202..83632d47 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -6,6 +6,7 @@ use crate::{ SERVICES, }; use abstraction::{KeyValueDatabaseEngine, KvTree}; +use base64::{engine::general_purpose, Engine}; use directories::ProjectDirs; use lru_cache::LruCache; @@ -426,7 +427,7 @@ impl KeyValueDatabase { } // If the database has any data, perform data migrations before starting - let latest_database_version = 13; + let latest_database_version = 16; if services().users.count()? > 0 { // MIGRATIONS @@ -943,6 +944,84 @@ impl KeyValueDatabase { warn!("Migration: 12 -> 13 finished"); } + if services().globals.database_version()? < 16 { + // Reconstruct all media using the filesystem + db.mediaid_file.clear().unwrap(); + + for file in fs::read_dir(services().globals.get_media_folder()).unwrap() { + let file = file.unwrap(); + let mediaid = general_purpose::URL_SAFE_NO_PAD + .decode(file.file_name().into_string().unwrap()) + .unwrap(); + + let mut parts = mediaid.rsplit(|&b| b == 0xff); + + let mut removed_bytes = 0; + + let content_type_bytes = parts.next().unwrap(); + removed_bytes += content_type_bytes.len() + 1; + + let content_disposition_bytes = parts + .next() + .ok_or_else(|| Error::bad_database("Media ID in db is invalid."))?; + removed_bytes += content_disposition_bytes.len(); + + let mut content_disposition = + utils::string_from_bytes(content_disposition_bytes).map_err(|_| { + Error::bad_database("Content Disposition in mediaid_file is invalid.") + })?; + + if content_disposition.contains("filename=") + && !content_disposition.contains("filename=\"") + { + content_disposition = + content_disposition.replacen("filename=", "filename=\"", 1); + content_disposition.push('"'); + + let mut new_key = mediaid[..(mediaid.len() - removed_bytes)].to_vec(); + assert!(*new_key.last().unwrap() == 0xff); + + let mut shorter_key = new_key.clone(); + shorter_key.extend( + ruma::http_headers::ContentDisposition::new( + ruma::http_headers::ContentDispositionType::Inline, + ) + .to_string() + .as_bytes(), + ); + shorter_key.push(0xff); + shorter_key.extend_from_slice(content_type_bytes); + + new_key.extend_from_slice(content_disposition.to_string().as_bytes()); + new_key.push(0xff); + new_key.extend_from_slice(content_type_bytes); + + // Some file names are too long. Ignore those. + match fs::rename( + services().globals.get_media_file(&mediaid), + services().globals.get_media_file(&new_key), + ) { + Ok(_) => { + db.mediaid_file.insert(&new_key, &[])?; + } + Err(_) => { + fs::rename( + services().globals.get_media_file(&mediaid), + services().globals.get_media_file(&shorter_key), + ) + .unwrap(); + db.mediaid_file.insert(&shorter_key, &[])?; + } + } + } else { + db.mediaid_file.insert(&mediaid, &[])?; + } + } + services().globals.bump_database_version(16)?; + + warn!("Migration: 13 -> 16 finished"); + } + assert_eq!( services().globals.database_version().unwrap(), latest_database_version diff --git a/src/main.rs b/src/main.rs index 232aa2cd..831f3896 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,7 @@ use axum_server::{bind, bind_rustls, tls_rustls::RustlsConfig, Handle as ServerH use conduit::api::{client_server, server_server}; use figment::{ providers::{Env, Format, Toml}, + value::Uncased, Figment, }; use http::{ @@ -44,6 +45,8 @@ use tikv_jemallocator::Jemalloc; #[global_allocator] static GLOBAL: Jemalloc = Jemalloc; +static SUB_TABLES: [&str; 2] = ["well_known", "tls"]; // Not doing `proxy` cause setting that with env vars would be a pain + #[tokio::main] async fn main() { clap::parse(); @@ -57,7 +60,20 @@ async fn main() { )) .nested(), ) - .merge(Env::prefixed("CONDUIT_").global()); + .merge(Env::prefixed("CONDUIT_").global().map(|k| { + let mut key: Uncased = k.into(); + + for table in SUB_TABLES { + if k.starts_with(&(table.to_owned() + "_")) { + key = Uncased::from( + table.to_owned() + "." + k[table.len() + 1..k.len()].as_str(), + ); + break; + } + } + + key + })); let config = match raw_config.extract::() { Ok(s) => s, @@ -379,11 +395,15 @@ fn routes(config: &Config) -> Router { .ruma_route(client_server::turn_server_route) .ruma_route(client_server::send_event_to_device_route) .ruma_route(client_server::get_media_config_route) + .ruma_route(client_server::get_media_config_auth_route) .ruma_route(client_server::get_media_preview_route) .ruma_route(client_server::create_content_route) .ruma_route(client_server::get_content_route) + .ruma_route(client_server::get_content_auth_route) .ruma_route(client_server::get_content_as_filename_route) + .ruma_route(client_server::get_content_as_filename_auth_route) .ruma_route(client_server::get_content_thumbnail_route) + .ruma_route(client_server::get_content_thumbnail_auth_route) .ruma_route(client_server::get_devices_route) .ruma_route(client_server::get_device_route) .ruma_route(client_server::update_device_route) @@ -441,11 +461,14 @@ fn routes(config: &Config) -> Router { .ruma_route(server_server::create_join_event_v2_route) .ruma_route(server_server::create_invite_route) .ruma_route(server_server::get_devices_route) + .ruma_route(server_server::get_content_route) + .ruma_route(server_server::get_content_thumbnail_route) .ruma_route(server_server::get_room_information_route) .ruma_route(server_server::get_profile_information_route) .ruma_route(server_server::get_keys_route) .ruma_route(server_server::claim_keys_route) .ruma_route(server_server::get_openid_userinfo_route) + .ruma_route(server_server::get_hierarchy_route) .ruma_route(server_server::well_known_server) } else { router diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 70c63381..583bfcd1 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -246,6 +246,7 @@ impl Service { unsigned: None, state_key: None, redacts: None, + timestamp: None, }, conduit_user, &conduit_room, @@ -1105,6 +1106,7 @@ impl Service { unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, conduit_user, &room_id, @@ -1133,6 +1135,7 @@ impl Service { unsigned: None, state_key: Some(conduit_user.to_string()), redacts: None, + timestamp: None, }, conduit_user, &room_id, @@ -1158,6 +1161,7 @@ impl Service { unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, conduit_user, &room_id, @@ -1177,6 +1181,7 @@ impl Service { unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, conduit_user, &room_id, @@ -1198,6 +1203,7 @@ impl Service { unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, conduit_user, &room_id, @@ -1219,6 +1225,7 @@ impl Service { unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, conduit_user, &room_id, @@ -1239,6 +1246,7 @@ impl Service { unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, conduit_user, &room_id, @@ -1259,6 +1267,7 @@ impl Service { unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, conduit_user, &room_id, @@ -1283,6 +1292,7 @@ impl Service { unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, conduit_user, &room_id, @@ -1352,6 +1362,7 @@ impl Service { unsigned: None, state_key: Some(user_id.to_string()), redacts: None, + timestamp: None, }, conduit_user, &room_id, @@ -1378,6 +1389,7 @@ impl Service { unsigned: None, state_key: Some(user_id.to_string()), redacts: None, + timestamp: None, }, user_id, &room_id, @@ -1404,6 +1416,7 @@ impl Service { unsigned: None, state_key: Some("".to_owned()), redacts: None, + timestamp: None, }, conduit_user, &room_id, @@ -1423,6 +1436,7 @@ impl Service { unsigned: None, state_key: None, redacts: None, + timestamp: None, }, conduit_user, &room_id, diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index e1762054..134b797b 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -1,11 +1,11 @@ mod data; -pub use data::Data; -pub use data::SigningKeys; -use ruma::MilliSecondsSinceUnixEpoch; -use ruma::{serde::Base64, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedServerName, OwnedUserId}; -use ruma::{OwnedRoomAliasId, RoomAliasId}; +pub use data::{Data, SigningKeys}; +use ruma::{ + serde::Base64, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedRoomAliasId, + OwnedRoomId, OwnedServerName, OwnedUserId, RoomAliasId, +}; -use crate::api::server_server::FedDest; +use crate::api::server_server::DestinationResponse; use crate::{services, Config, Error, Result}; use futures_util::FutureExt; @@ -16,7 +16,6 @@ use ruma::{ api::{client::sync::sync_events, federation::discovery::ServerSigningKeys}, DeviceId, RoomVersionId, ServerName, UserId, }; -use std::str::FromStr; use std::{ collections::{BTreeMap, HashMap}, error::Error as StdError, @@ -25,6 +24,7 @@ use std::{ iter, net::{IpAddr, SocketAddr}, path::PathBuf, + str::FromStr, sync::{ atomic::{self, AtomicBool}, Arc, RwLock as StdRwLock, @@ -37,7 +37,7 @@ use tracing::{error, info}; use base64::{engine::general_purpose, Engine as _}; -type WellKnownMap = HashMap; +type WellKnownMap = HashMap; type TlsNameMap = HashMap, u16)>; type RateLimitState = (Instant, u32); // Time if last failed try, number of failed tries type SyncHandle = ( diff --git a/src/service/media/data.rs b/src/service/media/data.rs index 6e13cad0..1d0f87ab 100644 --- a/src/service/media/data.rs +++ b/src/service/media/data.rs @@ -1,3 +1,5 @@ +use ruma::http_headers::ContentDisposition; + use crate::Result; pub trait Data: Send + Sync { @@ -6,7 +8,7 @@ pub trait Data: Send + Sync { mxc: String, width: u32, height: u32, - content_disposition: Option<&str>, + content_disposition: &ContentDisposition, content_type: Option<&str>, ) -> Result>; @@ -16,7 +18,7 @@ pub trait Data: Send + Sync { mxc: String, width: u32, height: u32, - ) -> Result<(Option, Option, Vec)>; + ) -> Result<(ContentDisposition, Option, Vec)>; fn remove_url_preview( &self, diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 3b02b919..d0d98def 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -7,6 +7,7 @@ use std::{ }; pub use data::Data; +use ruma::http_headers::{ContentDisposition, ContentDispositionType}; use crate::{services, Result}; use image::imageops::FilterType; @@ -19,7 +20,7 @@ use tokio::{ use serde::Serialize; pub struct FileMeta { - pub content_disposition: Option, + pub content_disposition: ContentDisposition, pub content_type: Option, pub file: Vec, } @@ -68,14 +69,17 @@ impl Service { pub async fn create( &self, mxc: String, - content_disposition: Option<&str>, + content_disposition: Option, content_type: Option<&str>, file: &[u8], ) -> Result<()> { + let content_disposition = + content_disposition.unwrap_or(ContentDisposition::new(ContentDispositionType::Inline)); + // Width, Height = 0 if it's not a thumbnail let key = self .db - .create_file_metadata(mxc, 0, 0, content_disposition, content_type)?; + .create_file_metadata(mxc, 0, 0, &content_disposition, content_type)?; let path = services().globals.get_media_file(&key); let mut f = File::create(path).await?; @@ -88,15 +92,18 @@ impl Service { pub async fn upload_thumbnail( &self, mxc: String, - content_disposition: Option<&str>, content_type: Option<&str>, width: u32, height: u32, file: &[u8], ) -> Result<()> { - let key = - self.db - .create_file_metadata(mxc, width, height, content_disposition, content_type)?; + let key = self.db.create_file_metadata( + mxc, + width, + height, + &ContentDisposition::new(ContentDispositionType::Inline), + content_type, + )?; let path = services().globals.get_media_file(&key); let mut f = File::create(path).await?; @@ -208,22 +215,20 @@ impl Service { / u64::from(original_height) }; if use_width { - if intermediate <= u64::from(::std::u32::MAX) { + if intermediate <= u64::from(u32::MAX) { (width, intermediate as u32) } else { ( - (u64::from(width) * u64::from(::std::u32::MAX) / intermediate) - as u32, - ::std::u32::MAX, + (u64::from(width) * u64::from(u32::MAX) / intermediate) as u32, + u32::MAX, ) } - } else if intermediate <= u64::from(::std::u32::MAX) { + } else if intermediate <= u64::from(u32::MAX) { (intermediate as u32, height) } else { ( - ::std::u32::MAX, - (u64::from(height) * u64::from(::std::u32::MAX) / intermediate) - as u32, + u32::MAX, + (u64::from(height) * u64::from(u32::MAX) / intermediate) as u32, ) } }; @@ -242,7 +247,7 @@ impl Service { mxc, width, height, - content_disposition.as_deref(), + &content_disposition, content_type.as_deref(), )?; diff --git a/src/service/mod.rs b/src/service/mod.rs index ec6b69a4..cb418254 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -105,7 +105,7 @@ impl Services { }, threads: rooms::threads::Service { db }, spaces: rooms::spaces::Service { - roomid_spacechunk_cache: Mutex::new(LruCache::new(200)), + roomid_spacehierarchy_cache: Mutex::new(LruCache::new(200)), }, user: rooms::user::Service { db }, }, @@ -157,7 +157,13 @@ impl Services { .lock() .await .len(); - let roomid_spacechunk_cache = self.rooms.spaces.roomid_spacechunk_cache.lock().await.len(); + let roomid_spacehierarchy_cache = self + .rooms + .spaces + .roomid_spacehierarchy_cache + .lock() + .await + .len(); format!( "\ @@ -166,7 +172,7 @@ server_visibility_cache: {server_visibility_cache} user_visibility_cache: {user_visibility_cache} stateinfo_cache: {stateinfo_cache} lasttimelinecount_cache: {lasttimelinecount_cache} -roomid_spacechunk_cache: {roomid_spacechunk_cache}\ +roomid_spacechunk_cache: {roomid_spacehierarchy_cache}\ " ) } @@ -214,7 +220,7 @@ roomid_spacechunk_cache: {roomid_spacechunk_cache}\ if amount > 5 { self.rooms .spaces - .roomid_spacechunk_cache + .roomid_spacehierarchy_cache .lock() .await .clear(); diff --git a/src/service/pdu.rs b/src/service/pdu.rs index 6991a083..7934909b 100644 --- a/src/service/pdu.rs +++ b/src/service/pdu.rs @@ -1,5 +1,6 @@ use crate::Error; use ruma::{ + api::client::error::ErrorKind, canonical_json::redact_content_in_place, events::{ room::{member::RoomMemberEventContent, redaction::RoomRedactionEventContent}, @@ -443,7 +444,7 @@ pub(crate) fn gen_event_id_canonical_json( "${}", // Anything higher than version3 behaves the same ruma::signatures::reference_hash(&value, room_version_id) - .expect("ruma can calculate reference hashes") + .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid PDU format"))? ) .try_into() .expect("ruma's reference hashes are valid event ids"); @@ -460,4 +461,8 @@ pub struct PduBuilder { pub unsigned: Option>, pub state_key: Option, pub redacts: Option>, + /// For timestamped messaging, should only be used for appservices + /// + /// Will be set to current time if None + pub timestamp: Option, } diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 0bdfd4ae..0dd405c7 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -1477,7 +1477,7 @@ impl Service { let event_id = format!( "${}", ruma::signatures::reference_hash(&value, room_version) - .expect("ruma can calculate reference hashes") + .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid PDU format"))? ); let event_id = <&EventId>::try_from(event_id.as_str()) .expect("ruma's reference hashes are valid event ids"); @@ -1687,11 +1687,6 @@ impl Service { } }; - if acl_event_content.allow.is_empty() { - // Ignore broken acl events - return Ok(()); - } - if acl_event_content.is_allowed(server_name) { Ok(()) } else { diff --git a/src/service/rooms/spaces/mod.rs b/src/service/rooms/spaces/mod.rs index a78296b6..26a40f9f 100644 --- a/src/service/rooms/spaces/mod.rs +++ b/src/service/rooms/spaces/mod.rs @@ -1,318 +1,430 @@ -use std::sync::Arc; +use std::{ + collections::VecDeque, + fmt::{Display, Formatter}, + str::FromStr, +}; use lru_cache::LruCache; use ruma::{ api::{ - client::{ - error::ErrorKind, - space::{get_hierarchy, SpaceHierarchyRoomsChunk}, + client::{self, error::ErrorKind, space::SpaceHierarchyRoomsChunk}, + federation::{ + self, + space::{SpaceHierarchyChildSummary, SpaceHierarchyParentSummary}, }, - federation, }, events::{ room::{ avatar::RoomAvatarEventContent, canonical_alias::RoomCanonicalAliasEventContent, create::RoomCreateEventContent, - guest_access::{GuestAccess, RoomGuestAccessEventContent}, - history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent}, - join_rules::{self, AllowRule, JoinRule, RoomJoinRulesEventContent}, + join_rules::{JoinRule, RoomJoinRulesEventContent}, topic::RoomTopicEventContent, }, - space::child::SpaceChildEventContent, + space::child::{HierarchySpaceChildEvent, SpaceChildEventContent}, StateEventType, }, + serde::Raw, space::SpaceRoomJoinRule, - OwnedRoomId, RoomId, UserId, + OwnedRoomId, OwnedServerName, RoomId, ServerName, UInt, UserId, }; use tokio::sync::Mutex; +use tracing::{debug, error, info, warn}; -use tracing::{debug, error, warn}; +use crate::{services, Error, Result}; -use crate::{services, Error, PduEvent, Result}; - -pub enum CachedJoinRule { - //Simplified(SpaceRoomJoinRule), - Full(JoinRule), +pub struct CachedSpaceHierarchySummary { + summary: SpaceHierarchyParentSummary, } -pub struct CachedSpaceChunk { - chunk: SpaceHierarchyRoomsChunk, - children: Vec, - join_rule: CachedJoinRule, +pub enum SummaryAccessibility { + Accessible(Box), + Inaccessible, +} + +// Note: perhaps use some better form of token rather than just room count +#[derive(Debug, PartialEq)] +pub struct PagnationToken { + /// Path down the hierarchy of the room to start the response at, + /// excluding the root space. + pub short_room_ids: Vec, + pub limit: UInt, + pub max_depth: UInt, + pub suggested_only: bool, +} + +impl FromStr for PagnationToken { + fn from_str(value: &str) -> Result { + let mut values = value.split('_'); + + let mut pag_tok = || { + let mut rooms = vec![]; + + for room in values.next()?.split(',') { + rooms.push(u64::from_str(room).ok()?) + } + + Some(PagnationToken { + short_room_ids: rooms, + limit: UInt::from_str(values.next()?).ok()?, + max_depth: UInt::from_str(values.next()?).ok()?, + suggested_only: { + let slice = values.next()?; + + if values.next().is_none() { + if slice == "true" { + true + } else if slice == "false" { + false + } else { + None? + } + } else { + None? + } + }, + }) + }; + + if let Some(token) = pag_tok() { + Ok(token) + } else { + Err(Error::BadRequest(ErrorKind::InvalidParam, "invalid token")) + } + } + + type Err = Error; +} + +impl Display for PagnationToken { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}_{}_{}_{}", + self.short_room_ids + .iter() + .map(|b| b.to_string()) + .collect::>() + .join(","), + self.limit, + self.max_depth, + self.suggested_only + ) + } +} + +/// Identifier used to check if rooms are accessible +/// +/// None is used if you want to return the room, no matter if accessible or not +pub enum Identifier<'a> { + UserId(&'a UserId), + ServerName(&'a ServerName), } pub struct Service { - pub roomid_spacechunk_cache: Mutex>>, + pub roomid_spacehierarchy_cache: + Mutex>>, +} + +// Here because cannot implement `From` across ruma-federation-api and ruma-client-api types +impl From for SpaceHierarchyRoomsChunk { + fn from(value: CachedSpaceHierarchySummary) -> Self { + let SpaceHierarchyParentSummary { + canonical_alias, + name, + num_joined_members, + room_id, + topic, + world_readable, + guest_can_join, + avatar_url, + join_rule, + room_type, + children_state, + .. + } = value.summary; + + SpaceHierarchyRoomsChunk { + canonical_alias, + name, + num_joined_members, + room_id, + topic, + world_readable, + guest_can_join, + avatar_url, + join_rule, + room_type, + children_state, + } + } } impl Service { - pub async fn get_hierarchy( + ///Gets the response for the space hierarchy over federation request + /// + ///Panics if the room does not exist, so a check if the room exists should be done + pub async fn get_federation_hierarchy( &self, - sender_user: &UserId, room_id: &RoomId, - limit: usize, - skip: usize, - max_depth: usize, + server_name: &ServerName, suggested_only: bool, - ) -> Result { - let mut left_to_skip = skip; + ) -> Result { + match self + .get_summary_and_children_local( + &room_id.to_owned(), + Identifier::ServerName(server_name), + ) + .await? + { + Some(SummaryAccessibility::Accessible(room)) => { + let mut children = Vec::new(); + let mut inaccessible_children = Vec::new(); - let mut rooms_in_path = Vec::new(); - let mut stack = vec![vec![room_id.to_owned()]]; - let mut results = Vec::new(); + for (child, _via) in get_parent_children_via(*room.clone(), suggested_only) { + match self + .get_summary_and_children_local(&child, Identifier::ServerName(server_name)) + .await? + { + Some(SummaryAccessibility::Accessible(summary)) => { + children.push((*summary).into()); + } + Some(SummaryAccessibility::Inaccessible) => { + inaccessible_children.push(child); + } + None => (), + } + } - while let Some(current_room) = { - while stack.last().map_or(false, |s| s.is_empty()) { - stack.pop(); + Ok(federation::space::get_hierarchy::v1::Response { + room: *room, + children, + inaccessible_children, + }) } - if !stack.is_empty() { - stack.last_mut().and_then(|s| s.pop()) + Some(SummaryAccessibility::Inaccessible) => Err(Error::BadRequest( + ErrorKind::NotFound, + "The requested room is inaccessible", + )), + None => Err(Error::BadRequest( + ErrorKind::NotFound, + "The requested room was not found", + )), + } + } + + /// Gets the summary of a space using solely local information + async fn get_summary_and_children_local( + &self, + current_room: &OwnedRoomId, + identifier: Identifier<'_>, + ) -> Result> { + if let Some(cached) = self + .roomid_spacehierarchy_cache + .lock() + .await + .get_mut(¤t_room.to_owned()) + .as_ref() + { + return Ok(if let Some(cached) = cached { + if is_accessable_child( + current_room, + &cached.summary.join_rule, + &identifier, + &cached.summary.allowed_room_ids, + ) { + Some(SummaryAccessibility::Accessible(Box::new( + cached.summary.clone(), + ))) + } else { + Some(SummaryAccessibility::Inaccessible) + } } else { None - } - } { - rooms_in_path.push(current_room.clone()); - if results.len() >= limit { - break; - } + }); + } - if let Some(cached) = self - .roomid_spacechunk_cache - .lock() - .await - .get_mut(¤t_room.to_owned()) - .as_ref() - { - if let Some(cached) = cached { - let allowed = match &cached.join_rule { - //CachedJoinRule::Simplified(s) => { - //self.handle_simplified_join_rule(s, sender_user, ¤t_room)? - //} - CachedJoinRule::Full(f) => { - self.handle_join_rule(f, sender_user, ¤t_room)? - } - }; - if allowed { - if left_to_skip > 0 { - left_to_skip -= 1; - } else { - results.push(cached.chunk.clone()); - } - if rooms_in_path.len() < max_depth { - stack.push(cached.children.clone()); - } - } - } - continue; - } - - if let Some(current_shortstatehash) = services() - .rooms - .state - .get_room_shortstatehash(¤t_room)? - { - let state = services() - .rooms - .state_accessor - .state_full_ids(current_shortstatehash) - .await?; - - let mut children_ids = Vec::new(); - let mut children_pdus = Vec::new(); - for (key, id) in state { - let (event_type, state_key) = - services().rooms.short.get_statekey_from_short(key)?; - if event_type != StateEventType::SpaceChild { - continue; - } - - let pdu = services() - .rooms - .timeline - .get_pdu(&id)? - .ok_or_else(|| Error::bad_database("Event in space state not found"))?; - - if serde_json::from_str::(pdu.content.get()) - .ok() - .map(|c| c.via) - .map_or(true, |v| v.is_empty()) - { - continue; - } - - if let Ok(room_id) = OwnedRoomId::try_from(state_key) { - children_ids.push(room_id); - children_pdus.push(pdu); - } - } - - // TODO: Sort children - children_ids.reverse(); - - let chunk = self.get_room_chunk(sender_user, ¤t_room, children_pdus); - if let Ok(chunk) = chunk { - if left_to_skip > 0 { - left_to_skip -= 1; - } else { - results.push(chunk.clone()); - } - let join_rule = services() - .rooms - .state_accessor - .room_state_get(¤t_room, &StateEventType::RoomJoinRules, "")? - .map(|s| { - serde_json::from_str(s.content.get()) - .map(|c: RoomJoinRulesEventContent| c.join_rule) - .map_err(|e| { - error!("Invalid room join rule event in database: {}", e); - Error::BadDatabase("Invalid room join rule event in database.") - }) - }) - .transpose()? - .unwrap_or(JoinRule::Invite); - - self.roomid_spacechunk_cache.lock().await.insert( + Ok( + if let Some(children_pdus) = get_stripped_space_child_events(current_room).await? { + let summary = self.get_room_summary(current_room, children_pdus, identifier); + if let Ok(summary) = summary { + self.roomid_spacehierarchy_cache.lock().await.insert( current_room.clone(), - Some(CachedSpaceChunk { - chunk, - children: children_ids.clone(), - join_rule: CachedJoinRule::Full(join_rule), + Some(CachedSpaceHierarchySummary { + summary: summary.clone(), }), ); - } - if rooms_in_path.len() < max_depth { - stack.push(children_ids); + Some(SummaryAccessibility::Accessible(Box::new(summary))) + } else { + None } } else { - let server = current_room - .server_name() - .expect("Room IDs should always have a server name"); - if server == services().globals.server_name() { - continue; - } - if !results.is_empty() { - // Early return so the client can see some data already - break; - } - debug!("Asking {server} for /hierarchy"); - if let Ok(response) = services() - .sending - .send_federation_request( - server, - federation::space::get_hierarchy::v1::Request { - room_id: current_room.to_owned(), - suggested_only, - }, - ) - .await - { - warn!("Got response from {server} for /hierarchy\n{response:?}"); - let chunk = SpaceHierarchyRoomsChunk { - canonical_alias: response.room.canonical_alias, - name: response.room.name, - num_joined_members: response.room.num_joined_members, - room_id: response.room.room_id, - topic: response.room.topic, - world_readable: response.room.world_readable, - guest_can_join: response.room.guest_can_join, - avatar_url: response.room.avatar_url, - join_rule: response.room.join_rule.clone(), - room_type: response.room.room_type, - children_state: response.room.children_state, - }; - let children = response - .children - .iter() - .map(|c| c.room_id.clone()) - .collect::>(); + None + }, + ) + } - let join_rule = match response.room.join_rule { - SpaceRoomJoinRule::Invite => JoinRule::Invite, - SpaceRoomJoinRule::Knock => JoinRule::Knock, - SpaceRoomJoinRule::Private => JoinRule::Private, - SpaceRoomJoinRule::Restricted => { - JoinRule::Restricted(join_rules::Restricted { - allow: response - .room - .allowed_room_ids - .into_iter() - .map(AllowRule::room_membership) - .collect(), - }) - } - SpaceRoomJoinRule::KnockRestricted => { - JoinRule::KnockRestricted(join_rules::Restricted { - allow: response - .room - .allowed_room_ids - .into_iter() - .map(AllowRule::room_membership) - .collect(), - }) - } - SpaceRoomJoinRule::Public => JoinRule::Public, - _ => return Err(Error::BadServerResponse("Unknown join rule")), - }; - if self.handle_join_rule(&join_rule, sender_user, ¤t_room)? { - if left_to_skip > 0 { - left_to_skip -= 1; - } else { - results.push(chunk.clone()); - } - if rooms_in_path.len() < max_depth { - stack.push(children.clone()); - } - } + /// Gets the summary of a space using solely federation + async fn get_summary_and_children_federation( + &self, + current_room: &OwnedRoomId, + suggested_only: bool, + user_id: &UserId, + via: &Vec, + ) -> Result> { + for server in via { + info!("Asking {server} for /hierarchy"); + if let Ok(response) = services() + .sending + .send_federation_request( + server, + federation::space::get_hierarchy::v1::Request { + room_id: current_room.to_owned(), + suggested_only, + }, + ) + .await + { + info!("Got response from {server} for /hierarchy\n{response:?}"); + let summary = response.room.clone(); - self.roomid_spacechunk_cache.lock().await.insert( - current_room.clone(), - Some(CachedSpaceChunk { - chunk, - children, - join_rule: CachedJoinRule::Full(join_rule), - }), - ); + self.roomid_spacehierarchy_cache.lock().await.insert( + current_room.clone(), + Some(CachedSpaceHierarchySummary { + summary: summary.clone(), + }), + ); - /* TODO: - for child in response.children { - roomid_spacechunk_cache.insert( + for child in response.children { + let mut guard = self.roomid_spacehierarchy_cache.lock().await; + if !guard.contains_key(current_room) { + guard.insert( current_room.clone(), - CachedSpaceChunk { - chunk: child.chunk, - children, - join_rule, - }, + Some(CachedSpaceHierarchySummary { + summary: { + let SpaceHierarchyChildSummary { + canonical_alias, + name, + num_joined_members, + room_id, + topic, + world_readable, + guest_can_join, + avatar_url, + join_rule, + room_type, + allowed_room_ids, + } = child; + + SpaceHierarchyParentSummary { + canonical_alias, + name, + num_joined_members, + room_id: room_id.clone(), + topic, + world_readable, + guest_can_join, + avatar_url, + join_rule, + room_type, + children_state: get_stripped_space_child_events(&room_id) + .await? + .unwrap(), + allowed_room_ids, + } + }, + }), ); } - */ + } + if is_accessable_child( + current_room, + &response.room.join_rule, + &Identifier::UserId(user_id), + &response.room.allowed_room_ids, + ) { + return Ok(Some(SummaryAccessibility::Accessible(Box::new( + summary.clone(), + )))); } else { - self.roomid_spacechunk_cache - .lock() - .await - .insert(current_room.clone(), None); + return Ok(Some(SummaryAccessibility::Inaccessible)); } } } - Ok(get_hierarchy::v1::Response { - next_batch: if results.is_empty() { - None - } else { - Some((skip + results.len()).to_string()) - }, - rooms: results, - }) + self.roomid_spacehierarchy_cache + .lock() + .await + .insert(current_room.clone(), None); + + Ok(None) } - fn get_room_chunk( + /// Gets the summary of a space using either local or remote (federation) sources + async fn get_summary_and_children_client( &self, - sender_user: &UserId, - room_id: &RoomId, - children: Vec>, - ) -> Result { - Ok(SpaceHierarchyRoomsChunk { + current_room: &OwnedRoomId, + suggested_only: bool, + user_id: &UserId, + via: &Vec, + ) -> Result> { + if let Ok(Some(response)) = self + .get_summary_and_children_local(current_room, Identifier::UserId(user_id)) + .await + { + Ok(Some(response)) + } else { + self.get_summary_and_children_federation(current_room, suggested_only, user_id, via) + .await + } + } + + fn get_room_summary( + &self, + current_room: &OwnedRoomId, + children_state: Vec>, + identifier: Identifier<'_>, + ) -> Result { + let room_id: &RoomId = current_room; + + let join_rule = services() + .rooms + .state_accessor + .room_state_get(room_id, &StateEventType::RoomJoinRules, "")? + .map(|s| { + serde_json::from_str(s.content.get()) + .map(|c: RoomJoinRulesEventContent| c.join_rule) + .map_err(|e| { + error!("Invalid room join rule event in database: {}", e); + Error::BadDatabase("Invalid room join rule event in database.") + }) + }) + .transpose()? + .unwrap_or(JoinRule::Invite); + + let allowed_room_ids = services() + .rooms + .state_accessor + .allowed_room_ids(join_rule.clone()); + + if !is_accessable_child( + current_room, + &join_rule.clone().into(), + &identifier, + &allowed_room_ids, + ) { + debug!("User is not allowed to see room {room_id}"); + // This error will be caught later + return Err(Error::BadRequest( + ErrorKind::forbidden(), + "User is not allowed to see the room", + )); + } + + let join_rule = join_rule.into(); + + Ok(SpaceHierarchyParentSummary { canonical_alias: services() .rooms .state_accessor @@ -348,34 +460,8 @@ impl Service { Error::bad_database("Invalid room topic event in database.") }) })?, - world_readable: services() - .rooms - .state_accessor - .room_state_get(room_id, &StateEventType::RoomHistoryVisibility, "")? - .map_or(Ok(false), |s| { - serde_json::from_str(s.content.get()) - .map(|c: RoomHistoryVisibilityEventContent| { - c.history_visibility == HistoryVisibility::WorldReadable - }) - .map_err(|_| { - Error::bad_database( - "Invalid room history visibility event in database.", - ) - }) - })?, - guest_can_join: services() - .rooms - .state_accessor - .room_state_get(room_id, &StateEventType::RoomGuestAccess, "")? - .map_or(Ok(false), |s| { - serde_json::from_str(s.content.get()) - .map(|c: RoomGuestAccessEventContent| { - c.guest_access == GuestAccess::CanJoin - }) - .map_err(|_| { - Error::bad_database("Invalid room guest access event in database.") - }) - })?, + world_readable: services().rooms.state_accessor.world_readable(room_id)?, + guest_can_join: services().rooms.state_accessor.guest_can_join(room_id)?, avatar_url: services() .rooms .state_accessor @@ -388,33 +474,7 @@ impl Service { .transpose()? // url is now an Option so we must flatten .flatten(), - join_rule: { - let join_rule = services() - .rooms - .state_accessor - .room_state_get(room_id, &StateEventType::RoomJoinRules, "")? - .map(|s| { - serde_json::from_str(s.content.get()) - .map(|c: RoomJoinRulesEventContent| c.join_rule) - .map_err(|e| { - error!("Invalid room join rule event in database: {}", e); - Error::BadDatabase("Invalid room join rule event in database.") - }) - }) - .transpose()? - .unwrap_or(JoinRule::Invite); - - if !self.handle_join_rule(&join_rule, sender_user, room_id)? { - debug!("User is not allowed to see room {room_id}"); - // This error will be caught later - return Err(Error::BadRequest( - ErrorKind::forbidden(), - "User is not allowed to see the room", - )); - } - - self.translate_joinrule(&join_rule)? - }, + join_rule, room_type: services() .rooms .state_accessor @@ -427,79 +487,477 @@ impl Service { }) .transpose()? .and_then(|e| e.room_type), - children_state: children - .into_iter() - .map(|pdu| pdu.to_stripped_spacechild_state_event()) - .collect(), + children_state, + allowed_room_ids, }) } - fn translate_joinrule(&self, join_rule: &JoinRule) -> Result { - match join_rule { - JoinRule::Invite => Ok(SpaceRoomJoinRule::Invite), - JoinRule::Knock => Ok(SpaceRoomJoinRule::Knock), - JoinRule::Private => Ok(SpaceRoomJoinRule::Private), - JoinRule::Restricted(_) => Ok(SpaceRoomJoinRule::Restricted), - JoinRule::KnockRestricted(_) => Ok(SpaceRoomJoinRule::KnockRestricted), - JoinRule::Public => Ok(SpaceRoomJoinRule::Public), - _ => Err(Error::BadServerResponse("Unknown join rule")), - } - } - - fn handle_simplified_join_rule( + pub async fn get_client_hierarchy( &self, - join_rule: &SpaceRoomJoinRule, sender_user: &UserId, room_id: &RoomId, - ) -> Result { - let allowed = match join_rule { - SpaceRoomJoinRule::Public => true, - SpaceRoomJoinRule::Knock => true, - SpaceRoomJoinRule::Invite => services() + limit: usize, + short_room_ids: Vec, + max_depth: usize, + suggested_only: bool, + ) -> Result { + let mut parents = VecDeque::new(); + + // Don't start populating the results if we have to start at a specific room. + let mut populate_results = short_room_ids.is_empty(); + + let mut stack = vec![vec![( + room_id.to_owned(), + match room_id.server_name() { + Some(server_name) => vec![server_name.into()], + None => vec![], + }, + )]]; + + let mut results = Vec::new(); + + while let Some((current_room, via)) = { next_room_to_traverse(&mut stack, &mut parents) } { + if limit > results.len() { + match ( + self.get_summary_and_children_client( + ¤t_room, + suggested_only, + sender_user, + &via, + ) + .await?, + current_room == room_id, + ) { + (Some(SummaryAccessibility::Accessible(summary)), _) => { + let mut children: Vec<(OwnedRoomId, Vec)> = + get_parent_children_via(*summary.clone(), suggested_only) + .into_iter() + .filter(|(room, _)| parents.iter().all(|parent| parent != room)) + .rev() + .collect(); + + if populate_results { + results.push(summary_to_chunk(*summary.clone())) + } else { + children = children + .into_iter() + .rev() + .skip_while(|(room, _)| { + if let Ok(short) = services().rooms.short.get_shortroomid(room) + { + short.as_ref() != short_room_ids.get(parents.len()) + } else { + false + } + }) + .collect::>() + // skip_while doesn't implement DoubleEndedIterator, which is needed for rev + .into_iter() + .rev() + .collect(); + + if children.is_empty() { + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "Short room ids in token were not found.", + )); + } + + // We have reached the room after where we last left off + if parents.len() + 1 == short_room_ids.len() { + populate_results = true; + } + } + + if !children.is_empty() && parents.len() < max_depth { + parents.push_back(current_room.clone()); + stack.push(children); + } + // Root room in the space hierarchy, we return an error if this one fails. + } + (Some(SummaryAccessibility::Inaccessible), true) => { + return Err(Error::BadRequest( + ErrorKind::forbidden(), + "The requested room is inaccessible", + )); + } + (None, true) => { + return Err(Error::BadRequest( + ErrorKind::forbidden(), + "The requested room was not found", + )); + } + // Just ignore other unavailable rooms + (None | Some(SummaryAccessibility::Inaccessible), false) => (), + } + } else { + break; + } + } + + Ok(client::space::get_hierarchy::v1::Response { + next_batch: if let Some((room, _)) = next_room_to_traverse(&mut stack, &mut parents) { + parents.pop_front(); + parents.push_back(room); + + let mut short_room_ids = vec![]; + + for room in parents { + short_room_ids.push(services().rooms.short.get_or_create_shortroomid(&room)?); + } + + Some( + PagnationToken { + short_room_ids, + limit: UInt::new(max_depth as u64) + .expect("When sent in request it must have been valid UInt"), + max_depth: UInt::new(max_depth as u64) + .expect("When sent in request it must have been valid UInt"), + suggested_only, + } + .to_string(), + ) + } else { + None + }, + rooms: results, + }) + } +} + +fn next_room_to_traverse( + stack: &mut Vec)>>, + parents: &mut VecDeque, +) -> Option<(OwnedRoomId, Vec)> { + while stack.last().map_or(false, |s| s.is_empty()) { + stack.pop(); + parents.pop_back(); + } + + stack.last_mut().and_then(|s| s.pop()) +} + +/// Simply returns the stripped m.space.child events of a room +async fn get_stripped_space_child_events( + room_id: &RoomId, +) -> Result>>, Error> { + if let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? { + let state = services() + .rooms + .state_accessor + .state_full_ids(current_shortstatehash) + .await?; + let mut children_pdus = Vec::new(); + for (key, id) in state { + let (event_type, state_key) = services().rooms.short.get_statekey_from_short(key)?; + if event_type != StateEventType::SpaceChild { + continue; + } + + let pdu = services() + .rooms + .timeline + .get_pdu(&id)? + .ok_or_else(|| Error::bad_database("Event in space state not found"))?; + + if serde_json::from_str::(pdu.content.get()) + .ok() + .map(|c| c.via) + .map_or(true, |v| v.is_empty()) + { + continue; + } + + if OwnedRoomId::try_from(state_key).is_ok() { + children_pdus.push(pdu.to_stripped_spacechild_state_event()); + } + } + Ok(Some(children_pdus)) + } else { + Ok(None) + } +} + +/// With the given identifier, checks if a room is accessable +fn is_accessable_child( + current_room: &OwnedRoomId, + join_rule: &SpaceRoomJoinRule, + identifier: &Identifier<'_>, + allowed_room_ids: &Vec, +) -> bool { + // Note: unwrap_or_default for bool means false + match identifier { + Identifier::ServerName(server_name) => { + let room_id: &RoomId = current_room; + + // Checks if ACLs allow for the server to participate + if services() + .rooms + .event_handler + .acl_check(server_name, room_id) + .is_err() + { + return false; + } + } + Identifier::UserId(user_id) => { + if services() .rooms .state_cache - .is_joined(sender_user, room_id)?, - _ => false, - }; - - Ok(allowed) - } - - fn handle_join_rule( - &self, - join_rule: &JoinRule, - sender_user: &UserId, - room_id: &RoomId, - ) -> Result { - if self.handle_simplified_join_rule( - &self.translate_joinrule(join_rule)?, - sender_user, - room_id, - )? { - return Ok(true); + .is_joined(user_id, current_room) + .unwrap_or_default() + || services() + .rooms + .state_cache + .is_invited(user_id, current_room) + .unwrap_or_default() + { + return true; + } } - - match join_rule { - JoinRule::Restricted(r) => { - for rule in &r.allow { - if let AllowRule::RoomMembership(rm) = rule { - if let Ok(true) = services() + } // Takes care of joinrules + match join_rule { + SpaceRoomJoinRule::Restricted => { + for room in allowed_room_ids { + match identifier { + Identifier::UserId(user) => { + if services() .rooms .state_cache - .is_joined(sender_user, &rm.room_id) + .is_joined(user, room) + .unwrap_or_default() { - return Ok(true); + return true; + } + } + Identifier::ServerName(server) => { + if services() + .rooms + .state_cache + .server_in_room(server, room) + .unwrap_or_default() + { + return true; } } } - - Ok(false) } - JoinRule::KnockRestricted(_) => { - // TODO: Check rules - Ok(false) - } - _ => Ok(false), + false } + SpaceRoomJoinRule::Public + | SpaceRoomJoinRule::Knock + | SpaceRoomJoinRule::KnockRestricted => true, + SpaceRoomJoinRule::Invite | SpaceRoomJoinRule::Private => false, + // Custom join rule + _ => false, + } +} + +// Here because cannot implement `From` across ruma-federation-api and ruma-client-api types +fn summary_to_chunk(summary: SpaceHierarchyParentSummary) -> SpaceHierarchyRoomsChunk { + let SpaceHierarchyParentSummary { + canonical_alias, + name, + num_joined_members, + room_id, + topic, + world_readable, + guest_can_join, + avatar_url, + join_rule, + room_type, + children_state, + .. + } = summary; + + SpaceHierarchyRoomsChunk { + canonical_alias, + name, + num_joined_members, + room_id, + topic, + world_readable, + guest_can_join, + avatar_url, + join_rule, + room_type, + children_state, + } +} + +/// Returns the children of a SpaceHierarchyParentSummary, making use of the children_state field +fn get_parent_children_via( + parent: SpaceHierarchyParentSummary, + suggested_only: bool, +) -> Vec<(OwnedRoomId, Vec)> { + parent + .children_state + .iter() + .filter_map(|raw_ce| { + raw_ce.deserialize().map_or(None, |ce| { + if suggested_only && !ce.content.suggested { + None + } else { + Some((ce.state_key, ce.content.via)) + } + }) + }) + .collect() +} + +#[cfg(test)] +mod tests { + use ruma::{ + api::federation::space::SpaceHierarchyParentSummaryInit, owned_room_id, owned_server_name, + }; + + use super::*; + + #[test] + fn get_summary_children() { + let summary: SpaceHierarchyParentSummary = SpaceHierarchyParentSummaryInit { + num_joined_members: UInt::from(1_u32), + room_id: owned_room_id!("!root:example.org"), + world_readable: true, + guest_can_join: true, + join_rule: SpaceRoomJoinRule::Public, + children_state: vec![ + serde_json::from_str( + r#"{ + "content": { + "via": [ + "example.org" + ], + "suggested": false + }, + "origin_server_ts": 1629413349153, + "sender": "@alice:example.org", + "state_key": "!foo:example.org", + "type": "m.space.child" + }"#, + ) + .unwrap(), + serde_json::from_str( + r#"{ + "content": { + "via": [ + "example.org" + ], + "suggested": true + }, + "origin_server_ts": 1629413349157, + "sender": "@alice:example.org", + "state_key": "!bar:example.org", + "type": "m.space.child" + }"#, + ) + .unwrap(), + serde_json::from_str( + r#"{ + "content": { + "via": [ + "example.org" + ] + }, + "origin_server_ts": 1629413349160, + "sender": "@alice:example.org", + "state_key": "!baz:example.org", + "type": "m.space.child" + }"#, + ) + .unwrap(), + ], + allowed_room_ids: vec![], + } + .into(); + + assert_eq!( + get_parent_children_via(summary.clone(), false), + vec![ + ( + owned_room_id!("!foo:example.org"), + vec![owned_server_name!("example.org")] + ), + ( + owned_room_id!("!bar:example.org"), + vec![owned_server_name!("example.org")] + ), + ( + owned_room_id!("!baz:example.org"), + vec![owned_server_name!("example.org")] + ) + ] + ); + assert_eq!( + get_parent_children_via(summary, true), + vec![( + owned_room_id!("!bar:example.org"), + vec![owned_server_name!("example.org")] + )] + ); + } + + #[test] + fn invalid_pagnation_tokens() { + fn token_is_err(token: &str) { + let token: Result = PagnationToken::from_str(token); + assert!(token.is_err()); + } + + token_is_err("231_2_noabool"); + token_is_err(""); + token_is_err("111_3_"); + token_is_err("foo_not_int"); + token_is_err("11_4_true_"); + token_is_err("___"); + token_is_err("__false"); + } + + #[test] + fn valid_pagnation_tokens() { + assert_eq!( + PagnationToken { + short_room_ids: vec![5383, 42934, 283, 423], + limit: UInt::from(20_u32), + max_depth: UInt::from(1_u32), + suggested_only: true + }, + PagnationToken::from_str("5383,42934,283,423_20_1_true").unwrap() + ); + + assert_eq!( + PagnationToken { + short_room_ids: vec![740], + limit: UInt::from(97_u32), + max_depth: UInt::from(10539_u32), + suggested_only: false + }, + PagnationToken::from_str("740_97_10539_false").unwrap() + ); + } + + #[test] + fn pagnation_token_to_string() { + assert_eq!( + PagnationToken { + short_room_ids: vec![740], + limit: UInt::from(97_u32), + max_depth: UInt::from(10539_u32), + suggested_only: false + } + .to_string(), + "740_97_10539_false" + ); + + assert_eq!( + PagnationToken { + short_room_ids: vec![9, 34], + limit: UInt::from(3_u32), + max_depth: UInt::from(1_u32), + suggested_only: true + } + .to_string(), + "9,34_3_1_true" + ); } } diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index f6581bb5..f5bd7e9f 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -93,7 +93,7 @@ impl Service { services() .rooms .spaces - .roomid_spacechunk_cache + .roomid_spacehierarchy_cache .lock() .await .remove(&pdu.room_id); diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 53e3176f..f1dcb3dc 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -10,15 +10,18 @@ use ruma::{ events::{ room::{ avatar::RoomAvatarEventContent, + guest_access::{GuestAccess, RoomGuestAccessEventContent}, history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent}, + join_rules::{AllowRule, JoinRule, RoomJoinRulesEventContent, RoomMembership}, member::{MembershipState, RoomMemberEventContent}, name::RoomNameEventContent, power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent}, }, StateEventType, }, + space::SpaceRoomJoinRule, state_res::Event, - EventId, JsOption, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, + EventId, JsOption, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, }; use serde_json::value::to_raw_value; use tokio::sync::MutexGuard; @@ -321,6 +324,7 @@ impl Service { unsigned: None, state_key: Some(target_user.into()), redacts: None, + timestamp: None, }; Ok(services() @@ -395,4 +399,70 @@ impl Service { } }) } + + /// Checks if guests are able to join a given room + pub fn guest_can_join(&self, room_id: &RoomId) -> Result { + self.room_state_get(room_id, &StateEventType::RoomGuestAccess, "")? + .map_or(Ok(false), |s| { + serde_json::from_str(s.content.get()) + .map(|c: RoomGuestAccessEventContent| c.guest_access == GuestAccess::CanJoin) + .map_err(|_| { + Error::bad_database("Invalid room guest access event in database.") + }) + }) + } + + /// Checks if guests are able to view room content without joining + pub fn world_readable(&self, room_id: &RoomId) -> Result { + self.room_state_get(room_id, &StateEventType::RoomHistoryVisibility, "")? + .map_or(Ok(false), |s| { + serde_json::from_str(s.content.get()) + .map(|c: RoomHistoryVisibilityEventContent| { + c.history_visibility == HistoryVisibility::WorldReadable + }) + .map_err(|_| { + Error::bad_database("Invalid room history visibility event in database.") + }) + }) + } + + /// Returns the join rule for a given room + pub fn get_join_rule( + &self, + current_room: &RoomId, + ) -> Result<(SpaceRoomJoinRule, Vec), Error> { + Ok(self + .room_state_get(current_room, &StateEventType::RoomJoinRules, "")? + .map(|s| { + serde_json::from_str(s.content.get()) + .map(|c: RoomJoinRulesEventContent| { + ( + c.join_rule.clone().into(), + self.allowed_room_ids(c.join_rule), + ) + }) + .map_err(|e| { + error!("Invalid room join rule event in database: {}", e); + Error::BadDatabase("Invalid room join rule event in database.") + }) + }) + .transpose()? + .unwrap_or((SpaceRoomJoinRule::Invite, vec![]))) + } + + /// Returns an empty vec if not a restricted room + pub fn allowed_room_ids(&self, join_rule: JoinRule) -> Vec { + let mut room_ids = vec![]; + if let JoinRule::Restricted(r) | JoinRule::KnockRestricted(r) = join_rule { + for rule in r.allow { + if let AllowRule::RoomMembership(RoomMembership { + room_id: membership, + }) = rule + { + room_ids.push(membership.to_owned()); + } + } + } + room_ids + } } diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index c108695d..1604a14a 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -248,11 +248,13 @@ impl Service { self.db.room_members(room_id) } + /// Returns the number of users which are currently in a room #[tracing::instrument(skip(self))] pub fn room_joined_count(&self, room_id: &RoomId) -> Result> { self.db.room_joined_count(room_id) } + /// Returns the number of users which are currently invited to a room #[tracing::instrument(skip(self))] pub fn room_invited_count(&self, room_id: &RoomId) -> Result> { self.db.room_invited_count(room_id) diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 29d8339d..80690663 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -22,8 +22,8 @@ use ruma::{ }, push::{Action, Ruleset, Tweak}, state_res::{self, Event, RoomVersion}, - uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, - OwnedServerName, RoomId, RoomVersionId, ServerName, UserId, + uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, + OwnedEventId, OwnedRoomId, OwnedServerName, RoomId, RoomVersionId, ServerName, UserId, }; use serde::Deserialize; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; @@ -430,7 +430,7 @@ impl Service { services() .rooms .spaces - .roomid_spacechunk_cache + .roomid_spacehierarchy_cache .lock() .await .remove(&pdu.room_id); @@ -665,6 +665,7 @@ impl Service { unsigned, state_key, redacts, + timestamp, } = pdu_builder; let prev_events: Vec<_> = services() @@ -734,9 +735,9 @@ impl Service { event_id: ruma::event_id!("$thiswillbefilledinlater").into(), room_id: room_id.to_owned(), sender: sender.to_owned(), - origin_server_ts: utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"), + origin_server_ts: timestamp + .map(|ts| ts.get()) + .unwrap_or_else(|| MilliSecondsSinceUnixEpoch::now().get()), kind: event_type, content, state_key, @@ -814,7 +815,7 @@ impl Service { pdu.event_id = EventId::parse_arc(format!( "${}", ruma::signatures::reference_hash(&pdu_json, &room_version_id) - .expect("ruma can calculate reference hashes") + .expect("Event format validated when event was hashed") )) .expect("ruma's reference hashes are valid event ids");