1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-06-27 16:35:59 +00:00
conduit/src/api/server_server.rs
2022-12-21 10:45:26 +01:00

2250 lines
72 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use crate::{
api::client_server::{self, claim_keys_helper, get_keys_helper},
service::pdu::{gen_event_id_canonical_json, PduBuilder},
services, utils, Error, PduEvent, Result, Ruma,
};
use axum::{response::IntoResponse, Json};
use get_profile_information::v1::ProfileField;
use http::header::{HeaderValue, AUTHORIZATION};
use ruma::{
api::{
client::error::{Error as RumaError, ErrorKind},
federation::{
authorization::get_event_authorization,
backfill::get_backfill,
device::get_devices::{self, v1::UserDevice},
directory::{get_public_rooms, get_public_rooms_filtered},
discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey},
event::{get_event, get_missing_events, get_room_state, get_room_state_ids},
keys::{claim_keys, get_keys},
membership::{
create_invite,
create_join_event::{self, RoomState},
prepare_join_event,
},
query::{get_profile_information, get_room_information},
transactions::{
edu::{DeviceListUpdateContent, DirectDeviceContent, Edu, SigningKeyUpdateContent},
send_transaction_message,
},
},
EndpointError, IncomingResponse, MatrixVersion, OutgoingRequest, OutgoingResponse,
SendAccessToken,
},
directory::{Filter, RoomNetwork},
events::{
receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
room::{
join_rules::{JoinRule, RoomJoinRulesEventContent},
member::{MembershipState, RoomMemberEventContent},
},
RoomEventType, StateEventType,
},
serde::{Base64, JsonObject, Raw},
to_device::DeviceIdOrAllDevices,
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, UInt,
};
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use std::{
collections::{BTreeMap, HashSet, VecDeque},
fmt::Debug,
mem,
net::{IpAddr, SocketAddr},
sync::{Arc, RwLock},
time::{Duration, Instant, SystemTime},
};
use tracing::{debug, error, info, warn};
/// Wraps either an literal IP address plus port, or a hostname plus complement
/// (colon-plus-port if it was specified).
///
/// Note: A `FedDest::Named` might contain an IP address in string form if there
/// was no port specified to construct a SocketAddr with.
///
/// # Examples:
/// ```rust
/// # use conduit::api::server_server::FedDest;
/// # fn main() -> Result<(), std::net::AddrParseError> {
/// FedDest::Literal("198.51.100.3:8448".parse()?);
/// FedDest::Literal("[2001:db8::4:5]:443".parse()?);
/// FedDest::Named("matrix.example.org".to_owned(), "".to_owned());
/// FedDest::Named("matrix.example.org".to_owned(), ":8448".to_owned());
/// FedDest::Named("198.51.100.5".to_owned(), "".to_owned());
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FedDest {
Literal(SocketAddr),
Named(String, String),
}
impl FedDest {
fn into_https_string(self) -> String {
match self {
Self::Literal(addr) => format!("https://{addr}"),
Self::Named(host, port) => format!("https://{host}{port}"),
}
}
fn into_uri_string(self) -> String {
match self {
Self::Literal(addr) => addr.to_string(),
Self::Named(host, ref port) => host + port,
}
}
fn hostname(&self) -> String {
match &self {
Self::Literal(addr) => addr.ip().to_string(),
Self::Named(host, _) => host.clone(),
}
}
fn port(&self) -> Option<u16> {
match &self {
Self::Literal(addr) => Some(addr.port()),
Self::Named(_, port) => port[1..].parse().ok(),
}
}
}
#[tracing::instrument(skip(request))]
pub(crate) async fn send_request<T: OutgoingRequest>(
destination: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: Debug,
{
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let mut write_destination_to_cache = false;
let cached_result = services()
.globals
.actual_destination_cache
.read()
.unwrap()
.get(destination)
.cloned();
let (actual_destination, host) = if let Some(result) = cached_result {
result
} else {
write_destination_to_cache = true;
let result = find_actual_destination(destination).await;
(result.0, result.1.into_uri_string())
};
let actual_destination_str = actual_destination.clone().into_https_string();
let mut http_request = request
.try_into_http_request::<Vec<u8>>(
&actual_destination_str,
SendAccessToken::IfRequired(""),
&[MatrixVersion::V1_0],
)
.map_err(|e| {
warn!(
"Failed to find destination {}: {}",
actual_destination_str, e
);
Error::BadServerResponse("Invalid destination")
})?;
let mut request_map = serde_json::Map::new();
if !http_request.body().is_empty() {
request_map.insert(
"content".to_owned(),
serde_json::from_slice(http_request.body())
.expect("body is valid json, we just created it"),
);
};
request_map.insert("method".to_owned(), T::METADATA.method.to_string().into());
request_map.insert(
"uri".to_owned(),
http_request
.uri()
.path_and_query()
.expect("all requests have a path")
.to_string()
.into(),
);
request_map.insert(
"origin".to_owned(),
services().globals.server_name().as_str().into(),
);
request_map.insert("destination".to_owned(), destination.as_str().into());
let mut request_json =
serde_json::from_value(request_map.into()).expect("valid JSON is valid BTreeMap");
ruma::signatures::sign_json(
services().globals.server_name().as_str(),
services().globals.keypair(),
&mut request_json,
)
.expect("our request json is what ruma expects");
let request_json: serde_json::Map<String, serde_json::Value> =
serde_json::from_slice(&serde_json::to_vec(&request_json).unwrap()).unwrap();
let signatures = request_json["signatures"]
.as_object()
.unwrap()
.values()
.map(|v| {
v.as_object()
.unwrap()
.iter()
.map(|(k, v)| (k, v.as_str().unwrap()))
});
for signature_server in signatures {
for s in signature_server {
http_request.headers_mut().insert(
AUTHORIZATION,
HeaderValue::from_str(&format!(
"X-Matrix origin={},key=\"{}\",sig=\"{}\"",
services().globals.server_name(),
s.0,
s.1
))
.unwrap(),
);
}
}
let reqwest_request = reqwest::Request::try_from(http_request)
.expect("all http requests are valid reqwest requests");
let url = reqwest_request.url().clone();
let response = services()
.globals
.federation_client()
.execute(reqwest_request)
.await;
match response {
Ok(mut response) => {
// reqwest::Response -> http::Response conversion
let status = response.status();
let mut http_response_builder = http::Response::builder()
.status(status)
.version(response.version());
mem::swap(
response.headers_mut(),
http_response_builder
.headers_mut()
.expect("http::response::Builder is usable"),
);
let body = response.bytes().await.unwrap_or_else(|e| {
warn!("server error {}", e);
Vec::new().into()
}); // TODO: handle timeout
if status != 200 {
warn!(
"{} {}: {}",
url,
status,
String::from_utf8_lossy(&body)
.lines()
.collect::<Vec<_>>()
.join(" ")
);
}
let http_response = http_response_builder
.body(body)
.expect("reqwest body is valid http body");
if status == 200 {
let response = T::IncomingResponse::try_from_http_response(http_response);
if response.is_ok() && write_destination_to_cache {
services()
.globals
.actual_destination_cache
.write()
.unwrap()
.insert(
OwnedServerName::from(destination),
(actual_destination, host),
);
}
response.map_err(|e| {
warn!(
"Invalid 200 response from {} on: {} {}",
&destination, url, e
);
Error::BadServerResponse("Server returned bad 200 response.")
})
} else {
Err(Error::FederationError(
destination.to_owned(),
RumaError::from_http_response(http_response),
))
}
}
Err(e) => {
warn!(
"Could not send request to {} at {}: {}",
destination, actual_destination_str, e
);
Err(e.into())
}
}
}
fn get_ip_with_port(destination_str: &str) -> Option<FedDest> {
if let Ok(destination) = destination_str.parse::<SocketAddr>() {
Some(FedDest::Literal(destination))
} else if let Ok(ip_addr) = destination_str.parse::<IpAddr>() {
Some(FedDest::Literal(SocketAddr::new(ip_addr, 8448)))
} else {
None
}
}
fn add_port_to_hostname(destination_str: &str) -> FedDest {
let (host, port) = match destination_str.find(':') {
None => (destination_str, ":8448"),
Some(pos) => destination_str.split_at(pos),
};
FedDest::Named(host.to_owned(), port.to_owned())
}
/// Returns: actual_destination, host header
/// Implemented according to the specification at https://matrix.org/docs/spec/server_server/r0.1.4#resolving-server-names
/// Numbers in comments below refer to bullet points in linked section of specification
async fn find_actual_destination(destination: &'_ ServerName) -> (FedDest, FedDest) {
let destination_str = destination.as_str().to_owned();
let mut hostname = destination_str.clone();
let actual_destination = match get_ip_with_port(&destination_str) {
Some(host_port) => {
// 1: IP literal with provided or default port
host_port
}
None => {
if let Some(pos) = destination_str.find(':') {
// 2: Hostname with included port
let (host, port) = destination_str.split_at(pos);
FedDest::Named(host.to_owned(), port.to_owned())
} else {
match request_well_known(destination.as_str()).await {
// 3: A .well-known file is available
Some(delegated_hostname) => {
hostname = add_port_to_hostname(&delegated_hostname).into_uri_string();
match get_ip_with_port(&delegated_hostname) {
Some(host_and_port) => host_and_port, // 3.1: IP literal in .well-known file
None => {
if let Some(pos) = delegated_hostname.find(':') {
// 3.2: Hostname with port in .well-known file
let (host, port) = delegated_hostname.split_at(pos);
FedDest::Named(host.to_owned(), port.to_owned())
} else {
// Delegated hostname has no port in this branch
if let Some(hostname_override) =
query_srv_record(&delegated_hostname).await
{
// 3.3: SRV lookup successful
let force_port = hostname_override.port();
if let Ok(override_ip) = services()
.globals
.dns_resolver()
.lookup_ip(hostname_override.hostname())
.await
{
services()
.globals
.tls_name_override
.write()
.unwrap()
.insert(
delegated_hostname.clone(),
(
override_ip.iter().collect(),
force_port.unwrap_or(8448),
),
);
} else {
warn!("Using SRV record, but could not resolve to IP");
}
if let Some(port) = force_port {
FedDest::Named(delegated_hostname, format!(":{port}"))
} else {
add_port_to_hostname(&delegated_hostname)
}
} else {
// 3.4: No SRV records, just use the hostname from .well-known
add_port_to_hostname(&delegated_hostname)
}
}
}
}
}
// 4: No .well-known or an error occured
None => {
match query_srv_record(&destination_str).await {
// 4: SRV record found
Some(hostname_override) => {
let force_port = hostname_override.port();
if let Ok(override_ip) = services()
.globals
.dns_resolver()
.lookup_ip(hostname_override.hostname())
.await
{
services()
.globals
.tls_name_override
.write()
.unwrap()
.insert(
hostname.clone(),
(
override_ip.iter().collect(),
force_port.unwrap_or(8448),
),
);
} else {
warn!("Using SRV record, but could not resolve to IP");
}
if let Some(port) = force_port {
FedDest::Named(hostname.clone(), format!(":{port}"))
} else {
add_port_to_hostname(&hostname)
}
}
// 5: No SRV record found
None => add_port_to_hostname(&destination_str),
}
}
}
}
}
};
// Can't use get_ip_with_port here because we don't want to add a port
// to an IP address if it wasn't specified
let hostname = if let Ok(addr) = hostname.parse::<SocketAddr>() {
FedDest::Literal(addr)
} else if let Ok(addr) = hostname.parse::<IpAddr>() {
FedDest::Named(addr.to_string(), ":8448".to_owned())
} else if let Some(pos) = hostname.find(':') {
let (host, port) = hostname.split_at(pos);
FedDest::Named(host.to_owned(), port.to_owned())
} else {
FedDest::Named(hostname, ":8448".to_owned())
};
(actual_destination, hostname)
}
async fn query_srv_record(hostname: &'_ str) -> Option<FedDest> {
if let Ok(Some(host_port)) = services()
.globals
.dns_resolver()
.srv_lookup(format!("_matrix._tcp.{hostname}"))
.await
.map(|srv| {
srv.iter().next().map(|result| {
FedDest::Named(
result.target().to_string().trim_end_matches('.').to_owned(),
format!(":{}", result.port()),
)
})
})
{
Some(host_port)
} else {
None
}
}
async fn request_well_known(destination: &str) -> Option<String> {
let body: serde_json::Value = serde_json::from_str(
&services()
.globals
.default_client()
.get(&format!("https://{destination}/.well-known/matrix/server"))
.send()
.await
.ok()?
.text()
.await
.ok()?,
)
.ok()?;
Some(body.get("m.server")?.as_str()?.to_owned())
}
/// # `GET /_matrix/federation/v1/version`
///
/// Get version information on this server.
pub async fn get_server_version_route(
_body: Ruma<get_server_version::v1::Request>,
) -> Result<get_server_version::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
Ok(get_server_version::v1::Response {
server: Some(get_server_version::v1::Server {
name: Some("Conduit".to_owned()),
version: Some(env!("CARGO_PKG_VERSION").to_owned()),
}),
})
}
/// # `GET /_matrix/key/v2/server`
///
/// Gets the public signing keys of this server.
///
/// - Matrix does not support invalidating public keys, so the key returned by this will be valid
/// forever.
// Response type for this endpoint is Json because we need to calculate a signature for the response
pub async fn get_server_keys_route() -> Result<impl IntoResponse> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let mut verify_keys: BTreeMap<OwnedServerSigningKeyId, VerifyKey> = BTreeMap::new();
verify_keys.insert(
format!("ed25519:{}", services().globals.keypair().version())
.try_into()
.expect("found invalid server signing keys in DB"),
VerifyKey {
key: Base64::new(services().globals.keypair().public_key().to_vec()),
},
);
let mut response = serde_json::from_slice(
get_server_keys::v2::Response {
server_key: Raw::new(&ServerSigningKeys {
server_name: services().globals.server_name().to_owned(),
verify_keys,
old_verify_keys: BTreeMap::new(),
signatures: BTreeMap::new(),
valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time(
SystemTime::now() + Duration::from_secs(86400 * 7),
)
.expect("time is valid"),
})
.expect("static conversion, no errors"),
}
.try_into_http_response::<Vec<u8>>()
.unwrap()
.body(),
)
.unwrap();
ruma::signatures::sign_json(
services().globals.server_name().as_str(),
services().globals.keypair(),
&mut response,
)
.unwrap();
Ok(Json(response))
}
/// # `GET /_matrix/key/v2/server/{keyId}`
///
/// Gets the public signing keys of this server.
///
/// - Matrix does not support invalidating public keys, so the key returned by this will be valid
/// forever.
pub async fn get_server_keys_deprecated_route() -> impl IntoResponse {
get_server_keys_route().await
}
/// # `POST /_matrix/federation/v1/publicRooms`
///
/// Lists the public rooms on this server.
pub async fn get_public_rooms_filtered_route(
body: Ruma<get_public_rooms_filtered::v1::Request>,
) -> Result<get_public_rooms_filtered::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let response = client_server::get_public_rooms_filtered_helper(
None,
body.limit,
body.since.as_deref(),
&body.filter,
&body.room_network,
)
.await?;
Ok(get_public_rooms_filtered::v1::Response {
chunk: response.chunk,
prev_batch: response.prev_batch,
next_batch: response.next_batch,
total_room_count_estimate: response.total_room_count_estimate,
})
}
/// # `GET /_matrix/federation/v1/publicRooms`
///
/// Lists the public rooms on this server.
pub async fn get_public_rooms_route(
body: Ruma<get_public_rooms::v1::Request>,
) -> Result<get_public_rooms::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let response = client_server::get_public_rooms_filtered_helper(
None,
body.limit,
body.since.as_deref(),
&Filter::default(),
&RoomNetwork::Matrix,
)
.await?;
Ok(get_public_rooms::v1::Response {
chunk: response.chunk,
prev_batch: response.prev_batch,
next_batch: response.next_batch,
total_room_count_estimate: response.total_room_count_estimate,
})
}
/// # `PUT /_matrix/federation/v1/send/{txnId}`
///
/// Push EDUs and PDUs to this server.
pub async fn send_transaction_message_route(
body: Ruma<send_transaction_message::v1::Request>,
) -> Result<send_transaction_message::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let sender_servername = body
.sender_servername
.as_ref()
.expect("server is authenticated");
let mut resolved_map = BTreeMap::new();
let pub_key_map = RwLock::new(BTreeMap::new());
// This is all the auth_events that have been recursively fetched so they don't have to be
// deserialized over and over again.
// TODO: make this persist across requests but not in a DB Tree (in globals?)
// TODO: This could potentially also be some sort of trie (suffix tree) like structure so
// that once an auth event is known it would know (using indexes maybe) all of the auth
// events that it references.
// let mut auth_cache = EventMap::new();
for pdu in &body.pdus {
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
Error::BadServerResponse("Invalid PDU in server response")
})?;
let room_id: OwnedRoomId = match value
.get("room_id")
.and_then(|id| RoomId::parse(id.as_str()?).ok())
{
Some(id) => id,
None => {
// Event is invalid
continue;
}
};
let room_version_id = match services().rooms.state.get_room_version(&room_id) {
Ok(v) => v,
Err(_) => {
continue;
}
};
let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) {
Ok(t) => t,
Err(_) => {
// Event could not be converted to canonical json
continue;
}
};
// We do not add the event_id field to the pdu here because of signature and hashes checks
services()
.rooms
.event_handler
.acl_check(sender_servername, &room_id)?;
let mutex = Arc::clone(
services()
.globals
.roomid_mutex_federation
.write()
.unwrap()
.entry(room_id.to_owned())
.or_default(),
);
let mutex_lock = mutex.lock().await;
let start_time = Instant::now();
resolved_map.insert(
event_id.clone(),
services()
.rooms
.event_handler
.handle_incoming_pdu(
sender_servername,
&event_id,
&room_id,
value,
true,
&pub_key_map,
)
.await
.map(|_| ()),
);
drop(mutex_lock);
let elapsed = start_time.elapsed();
debug!(
"Handling transaction of event {} took {}m{}s",
event_id,
elapsed.as_secs() / 60,
elapsed.as_secs() % 60
);
}
for pdu in &resolved_map {
if let Err(e) = pdu.1 {
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
warn!("Incoming PDU failed {:?}", pdu);
}
}
}
for edu in body
.edus
.iter()
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
{
match edu {
Edu::Presence(_) => {}
Edu::Receipt(receipt) => {
for (room_id, room_updates) in receipt.receipts {
for (user_id, user_updates) in room_updates.read {
if let Some((event_id, _)) = user_updates
.event_ids
.iter()
.filter_map(|id| {
services()
.rooms
.timeline
.get_pdu_count(id)
.ok()
.flatten()
.map(|r| (id, r))
})
.max_by_key(|(_, count)| *count)
{
let mut user_receipts = BTreeMap::new();
user_receipts.insert(user_id.clone(), user_updates.data);
let mut receipts = BTreeMap::new();
receipts.insert(ReceiptType::Read, user_receipts);
let mut receipt_content = BTreeMap::new();
receipt_content.insert(event_id.to_owned(), receipts);
let event = ReceiptEvent {
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
};
services()
.rooms
.edus
.read_receipt
.readreceipt_update(&user_id, &room_id, event)?;
} else {
// TODO fetch missing events
info!("No known event ids in read receipt: {:?}", user_updates);
}
}
}
}
Edu::Typing(typing) => {
if services()
.rooms
.state_cache
.is_joined(&typing.user_id, &typing.room_id)?
{
if typing.typing {
services().rooms.edus.typing.typing_add(
&typing.user_id,
&typing.room_id,
3000 + utils::millis_since_unix_epoch(),
)?;
} else {
services()
.rooms
.edus
.typing
.typing_remove(&typing.user_id, &typing.room_id)?;
}
}
}
Edu::DeviceListUpdate(DeviceListUpdateContent { user_id, .. }) => {
services().users.mark_device_key_update(&user_id)?;
}
Edu::DirectToDevice(DirectDeviceContent {
sender,
ev_type,
message_id,
messages,
}) => {
// Check if this is a new transaction id
if services()
.transaction_ids
.existing_txnid(&sender, None, &message_id)?
.is_some()
{
continue;
}
for (target_user_id, map) in &messages {
for (target_device_id_maybe, event) in map {
match target_device_id_maybe {
DeviceIdOrAllDevices::DeviceId(target_device_id) => {
services().users.add_to_device_event(
&sender,
target_user_id,
target_device_id,
&ev_type.to_string(),
event.deserialize_as().map_err(|e| {
warn!("To-Device event is invalid: {event:?} {e}");
Error::BadRequest(
ErrorKind::InvalidParam,
"Event is invalid",
)
})?,
)?
}
DeviceIdOrAllDevices::AllDevices => {
for target_device_id in
services().users.all_device_ids(target_user_id)
{
services().users.add_to_device_event(
&sender,
target_user_id,
&target_device_id?,
&ev_type.to_string(),
event.deserialize_as().map_err(|_| {
Error::BadRequest(
ErrorKind::InvalidParam,
"Event is invalid",
)
})?,
)?;
}
}
}
}
}
// Save transaction id with empty data
services()
.transaction_ids
.add_txnid(&sender, None, &message_id, &[])?;
}
Edu::SigningKeyUpdate(SigningKeyUpdateContent {
user_id,
master_key,
self_signing_key,
}) => {
if user_id.server_name() != sender_servername {
continue;
}
if let Some(master_key) = master_key {
services().users.add_cross_signing_keys(
&user_id,
&master_key,
&self_signing_key,
&None,
)?;
}
}
Edu::_Custom(_) => {}
}
}
Ok(send_transaction_message::v1::Response {
pdus: resolved_map
.into_iter()
.map(|(e, r)| (e, r.map_err(|e| e.to_string())))
.collect(),
})
}
/// # `GET /_matrix/federation/v1/event/{eventId}`
///
/// Retrieves a single event from the server.
///
/// - Only works if a user of this server is currently invited or joined the room
pub async fn get_event_route(
body: Ruma<get_event::v1::Request>,
) -> Result<get_event::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let sender_servername = body
.sender_servername
.as_ref()
.expect("server is authenticated");
let event = services()
.rooms
.timeline
.get_pdu_json(&body.event_id)?
.ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?;
let room_id_str = event
.get("room_id")
.and_then(|val| val.as_str())
.ok_or_else(|| Error::bad_database("Invalid event in database"))?;
let room_id = <&RoomId>::try_from(room_id_str)
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
if !services()
.rooms
.state_cache
.server_in_room(sender_servername, room_id)?
{
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room",
));
}
Ok(get_event::v1::Response {
origin: services().globals.server_name().to_owned(),
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
pdu: PduEvent::convert_to_outgoing_federation_event(event),
})
}
/// # `GET /_matrix/federation/v1/backfill/<room_id>`
///
/// Retrieves events from before the sender joined the room, if the room's
/// history visibility allows.
pub async fn get_backfill_route(
body: Ruma<get_backfill::v1::Request>,
) -> Result<get_backfill::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let sender_servername = body
.sender_servername
.as_ref()
.expect("server is authenticated");
info!("Got backfill request from: {}", sender_servername);
if !services()
.rooms
.state_cache
.server_in_room(sender_servername, &body.room_id)?
{
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room.",
));
}
services()
.rooms
.event_handler
.acl_check(sender_servername, &body.room_id)?;
let origin = services().globals.server_name().to_owned();
let earliest_events = &[];
let events = get_missing_events(
sender_servername,
&body.room_id,
earliest_events,
&body.v,
body.limit,
)?;
Ok(get_backfill::v1::Response {
origin,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
pdus: events,
})
}
/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
///
/// Retrieves events that the sender is missing.
pub async fn get_missing_events_route(
body: Ruma<get_missing_events::v1::Request>,
) -> Result<get_missing_events::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let sender_servername = body
.sender_servername
.as_ref()
.expect("server is authenticated");
if !services()
.rooms
.state_cache
.server_in_room(sender_servername, &body.room_id)?
{
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room",
));
}
services()
.rooms
.event_handler
.acl_check(sender_servername, &body.room_id)?;
let events = get_missing_events(
sender_servername,
&body.room_id,
&body.earliest_events,
&body.latest_events,
body.limit,
)?;
Ok(get_missing_events::v1::Response { events })
}
/// Fetch events starting from `latest_events`, going backwards
/// through each event's `prev_events` until reaching the `earliest_events`.
///
/// Used by the federation /backfill and /get_missing_events routes.
fn get_missing_events(
sender_servername: &ServerName,
room_id: &RoomId,
earliest_events: &[OwnedEventId],
latest_events: &[OwnedEventId],
limit: UInt,
) -> Result<Vec<Box<RawJsonValue>>> {
let (room_members, room_errors): (Vec<_>, Vec<_>) = services()
.rooms
.state_cache
.room_members(room_id)
.partition(Result::is_ok);
// Just log errors and continue with correct users
if !room_errors.is_empty() {
warn!(?room_id, "Some errors occurred when fetching room members");
}
let current_server_members: Vec<OwnedUserId> = room_members
.into_iter()
.map(Result::unwrap)
.filter(|member| member.server_name() == sender_servername)
.collect();
let event_filter = |event_id: &EventId| {
services()
.rooms
.state_accessor
.server_can_see_event(
sender_servername,
current_server_members.as_slice(),
event_id,
)
.unwrap_or_default()
};
let pdu_filter = |pdu: &CanonicalJsonObject| {
let event_room_id = pdu
.get("room_id")
.and_then(|val| val.as_str())
.and_then(|room_id_str| <&RoomId>::try_from(room_id_str).ok());
match event_room_id {
Some(event_room_id) => {
let valid_event = event_room_id == room_id;
if !valid_event {
error!(?room_id, ?event_room_id, "An evil event detected");
}
valid_event
}
None => {
error!(?pdu, "Can't extract valid `room_id` from pdu");
false
}
}
};
#[inline]
fn get_pdu(event: &EventId) -> Option<CanonicalJsonObject> {
services()
.rooms
.timeline
.get_pdu_json(event)
.unwrap_or_default()
}
let events = linearize_previous_events(
latest_events.into_iter().cloned(),
earliest_events.into_iter().cloned(),
limit,
get_pdu,
event_filter,
pdu_filter,
);
Ok(events)
}
/// Unwinds previous events by doing a breadth-first walk from given roots
///
/// # Arguments
///
/// * `roots`: Starting point to unwind event history
/// * `excluded`: Skipped events
/// * `limit`: How many events to extract
/// * `pdu_extractor`: Closure to extract PDU for given event_id, for example, from DB.
/// * `event_filter`: Closure to filter event by it's visiblity. It may or may not hit DB.
/// * `pdu_filter`: Closure to get basic validation against malformed PDUs.
///
/// # Returns
///
/// The previous events for given roots, without any `excluded` events, up to the provided `limit`.
///
/// # Note
///
/// In matrix specification, «Server-Server API», paragraph 8 there is no mention of previous events for excluded events.
/// Therefore, algorithm below excludes **only** events itself, but allows to process their history.
fn linearize_previous_events<E, L, F, V, P>(
roots: E,
excluded: E,
limit: L,
pdu_extractor: P,
event_filter: F,
pdu_filter: V,
) -> Vec<Box<RawJsonValue>>
where
E: IntoIterator<Item = OwnedEventId>,
F: Fn(&EventId) -> bool,
L: Into<u64>,
V: Fn(&CanonicalJsonObject) -> bool,
P: Fn(&EventId) -> Option<CanonicalJsonObject>,
{
let limit = limit.into() as usize;
assert!(limit > 0, "Limit should be > 0");
#[inline]
fn get_previous_events(pdu: &CanonicalJsonObject) -> Option<Vec<OwnedEventId>> {
match pdu.get("prev_events") {
None => {
error!(?pdu, "A stored event has no 'prev_events' field");
return None;
}
Some(prev_events) => {
let val = prev_events.clone().into();
let events = serde_json::from_value::<Vec<OwnedEventId>>(val);
if let Err(error) = events {
error!(?prev_events, ?error, "Broken 'prev_events' field");
return None;
}
Some(events.unwrap_or_default())
}
}
}
let mut visited: HashSet<OwnedEventId> = Default::default();
let mut history: Vec<Box<RawJsonValue>> = Default::default();
let mut queue: VecDeque<OwnedEventId> = Default::default();
let excluded: HashSet<_> = excluded.into_iter().collect();
// Add all roots into processing queue
for root in roots {
queue.push_back(root);
}
while let Some(current_event) = queue.pop_front() {
// Return all collected events if reached limit
if history.len() >= limit {
return history;
}
// Skip an entire branch containing incorrect events
if !event_filter(&current_event) {
continue;
}
// Process PDU from a current event if it exists and valid
if let Some(pdu) = pdu_extractor(&current_event).filter(&pdu_filter) {
if !&excluded.contains(&current_event) {
history.push(PduEvent::convert_to_outgoing_federation_event(pdu.clone()));
}
// Fetch previous events, if they exists
if let Some(previous_events) = get_previous_events(&pdu) {
for previous_event in previous_events {
if !visited.contains(&previous_event) {
visited.insert(previous_event.clone());
queue.push_back(previous_event);
}
}
}
}
}
// All done, return collected events
history
}
/// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}`
///
/// Retrieves the auth chain for a given event.
///
/// - This does not include the event itself
pub async fn get_event_authorization_route(
body: Ruma<get_event_authorization::v1::Request>,
) -> Result<get_event_authorization::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let sender_servername = body
.sender_servername
.as_ref()
.expect("server is authenticated");
if !services()
.rooms
.state_cache
.server_in_room(sender_servername, &body.room_id)?
{
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room.",
));
}
services()
.rooms
.event_handler
.acl_check(sender_servername, &body.room_id)?;
let event = services()
.rooms
.timeline
.get_pdu_json(&body.event_id)?
.ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?;
let room_id_str = event
.get("room_id")
.and_then(|val| val.as_str())
.ok_or_else(|| Error::bad_database("Invalid event in database"))?;
let room_id = <&RoomId>::try_from(room_id_str)
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
let auth_chain_ids = services()
.rooms
.auth_chain
.get_auth_chain(room_id, vec![Arc::from(&*body.event_id)])
.await?;
Ok(get_event_authorization::v1::Response {
auth_chain: auth_chain_ids
.filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok()?)
.map(PduEvent::convert_to_outgoing_federation_event)
.collect(),
})
}
/// # `GET /_matrix/federation/v1/state/{roomId}`
///
/// Retrieves the current state of the room.
pub async fn get_room_state_route(
body: Ruma<get_room_state::v1::Request>,
) -> Result<get_room_state::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let sender_servername = body
.sender_servername
.as_ref()
.expect("server is authenticated");
if !services()
.rooms
.state_cache
.server_in_room(sender_servername, &body.room_id)?
{
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room.",
));
}
services()
.rooms
.event_handler
.acl_check(sender_servername, &body.room_id)?;
let shortstatehash = services()
.rooms
.state_accessor
.pdu_shortstatehash(&body.event_id)?
.ok_or(Error::BadRequest(
ErrorKind::NotFound,
"Pdu state not found.",
))?;
let pdus = services()
.rooms
.state_accessor
.state_full_ids(shortstatehash)
.await?
.into_values()
.map(|id| {
PduEvent::convert_to_outgoing_federation_event(
services()
.rooms
.timeline
.get_pdu_json(&id)
.unwrap()
.unwrap(),
)
})
.collect();
let auth_chain_ids = services()
.rooms
.auth_chain
.get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)])
.await?;
Ok(get_room_state::v1::Response {
auth_chain: auth_chain_ids
.filter_map(
|id| match services().rooms.timeline.get_pdu_json(&id).ok()? {
Some(json) => Some(PduEvent::convert_to_outgoing_federation_event(json)),
None => {
error!("Could not find event json for {id} in db.");
None
}
},
)
.collect(),
pdus,
})
}
/// # `GET /_matrix/federation/v1/state_ids/{roomId}`
///
/// Retrieves the current state of the room.
pub async fn get_room_state_ids_route(
body: Ruma<get_room_state_ids::v1::Request>,
) -> Result<get_room_state_ids::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let sender_servername = body
.sender_servername
.as_ref()
.expect("server is authenticated");
if !services()
.rooms
.state_cache
.server_in_room(sender_servername, &body.room_id)?
{
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room.",
));
}
services()
.rooms
.event_handler
.acl_check(sender_servername, &body.room_id)?;
let shortstatehash = services()
.rooms
.state_accessor
.pdu_shortstatehash(&body.event_id)?
.ok_or(Error::BadRequest(
ErrorKind::NotFound,
"Pdu state not found.",
))?;
let pdu_ids = services()
.rooms
.state_accessor
.state_full_ids(shortstatehash)
.await?
.into_values()
.map(|id| (*id).to_owned())
.collect();
let auth_chain_ids = services()
.rooms
.auth_chain
.get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)])
.await?;
Ok(get_room_state_ids::v1::Response {
auth_chain_ids: auth_chain_ids.map(|id| (*id).to_owned()).collect(),
pdu_ids,
})
}
/// # `GET /_matrix/federation/v1/make_join/{roomId}/{userId}`
///
/// Creates a join template.
pub async fn create_join_event_template_route(
body: Ruma<prepare_join_event::v1::Request>,
) -> Result<prepare_join_event::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
if !services().rooms.metadata.exists(&body.room_id)? {
return Err(Error::BadRequest(
ErrorKind::NotFound,
"Room is unknown to this server.",
));
}
let sender_servername = body
.sender_servername
.as_ref()
.expect("server is authenticated");
services()
.rooms
.event_handler
.acl_check(sender_servername, &body.room_id)?;
let mutex_state = Arc::clone(
services()
.globals
.roomid_mutex_state
.write()
.unwrap()
.entry(body.room_id.to_owned())
.or_default(),
);
let state_lock = mutex_state.lock().await;
// TODO: Conduit does not implement restricted join rules yet, we always reject
let join_rules_event = services().rooms.state_accessor.room_state_get(
&body.room_id,
&StateEventType::RoomJoinRules,
"",
)?;
let join_rules_event_content: Option<RoomJoinRulesEventContent> = join_rules_event
.as_ref()
.map(|join_rules_event| {
serde_json::from_str(join_rules_event.content.get()).map_err(|e| {
warn!("Invalid join rules event: {}", e);
Error::bad_database("Invalid join rules event in db.")
})
})
.transpose()?;
if let Some(join_rules_event_content) = join_rules_event_content {
if matches!(
join_rules_event_content.join_rule,
JoinRule::Restricted { .. } | JoinRule::KnockRestricted { .. }
) {
return Err(Error::BadRequest(
ErrorKind::UnableToAuthorizeJoin,
"Conduit does not support restricted rooms yet.",
));
}
}
let room_version_id = services().rooms.state.get_room_version(&body.room_id)?;
if !body.ver.contains(&room_version_id) {
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion {
room_version: room_version_id,
},
"Room version not supported.",
));
}
let content = to_raw_value(&RoomMemberEventContent {
avatar_url: None,
blurhash: None,
displayname: None,
is_direct: None,
membership: MembershipState::Join,
third_party_invite: None,
reason: None,
join_authorized_via_users_server: None,
})
.expect("member event is valid value");
let (_pdu, mut pdu_json) = services().rooms.timeline.create_hash_and_sign_event(
PduBuilder {
event_type: RoomEventType::RoomMember,
content,
unsigned: None,
state_key: Some(body.user_id.to_string()),
redacts: None,
},
&body.user_id,
&body.room_id,
&state_lock,
)?;
drop(state_lock);
pdu_json.remove("event_id");
Ok(prepare_join_event::v1::Response {
room_version: Some(room_version_id),
event: to_raw_value(&pdu_json).expect("CanonicalJson can be serialized to JSON"),
})
}
async fn create_join_event(
sender_servername: &ServerName,
room_id: &RoomId,
pdu: &RawJsonValue,
) -> Result<RoomState> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
if !services().rooms.metadata.exists(room_id)? {
return Err(Error::BadRequest(
ErrorKind::NotFound,
"Room is unknown to this server.",
));
}
services()
.rooms
.event_handler
.acl_check(sender_servername, room_id)?;
// TODO: Conduit does not implement restricted join rules yet, we always reject
let join_rules_event = services().rooms.state_accessor.room_state_get(
room_id,
&StateEventType::RoomJoinRules,
"",
)?;
let join_rules_event_content: Option<RoomJoinRulesEventContent> = join_rules_event
.as_ref()
.map(|join_rules_event| {
serde_json::from_str(join_rules_event.content.get()).map_err(|e| {
warn!("Invalid join rules event: {}", e);
Error::bad_database("Invalid join rules event in db.")
})
})
.transpose()?;
if let Some(join_rules_event_content) = join_rules_event_content {
if matches!(
join_rules_event_content.join_rule,
JoinRule::Restricted { .. } | JoinRule::KnockRestricted { .. }
) {
return Err(Error::BadRequest(
ErrorKind::UnableToAuthorizeJoin,
"Conduit does not support restricted rooms yet.",
));
}
}
// We need to return the state prior to joining, let's keep a reference to that here
let shortstatehash = services()
.rooms
.state
.get_room_shortstatehash(room_id)?
.ok_or(Error::BadRequest(
ErrorKind::NotFound,
"Pdu state not found.",
))?;
let pub_key_map = RwLock::new(BTreeMap::new());
// let mut auth_cache = EventMap::new();
// We do not add the event_id field to the pdu here because of signature and hashes checks
let room_version_id = services().rooms.state.get_room_version(room_id)?;
let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) {
Ok(t) => t,
Err(_) => {
// Event could not be converted to canonical json
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not convert event to canonical json.",
));
}
};
let origin: OwnedServerName = serde_json::from_value(
serde_json::to_value(value.get("origin").ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Event needs an origin field.",
))?)
.expect("CanonicalJson is valid json value"),
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?;
let mutex = Arc::clone(
services()
.globals
.roomid_mutex_federation
.write()
.unwrap()
.entry(room_id.to_owned())
.or_default(),
);
let mutex_lock = mutex.lock().await;
let pdu_id: Vec<u8> = services()
.rooms
.event_handler
.handle_incoming_pdu(&origin, &event_id, room_id, value, true, &pub_key_map)
.await?
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not accept incoming PDU as timeline event.",
))?;
drop(mutex_lock);
let state_ids = services()
.rooms
.state_accessor
.state_full_ids(shortstatehash)
.await?;
let auth_chain_ids = services()
.rooms
.auth_chain
.get_auth_chain(room_id, state_ids.values().cloned().collect())
.await?;
let servers = services()
.rooms
.state_cache
.room_servers(room_id)
.filter_map(|r| r.ok())
.filter(|server| &**server != services().globals.server_name());
services().sending.send_pdu(servers, &pdu_id)?;
Ok(RoomState {
auth_chain: auth_chain_ids
.filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event)
.collect(),
state: state_ids
.iter()
.filter_map(|(_, id)| services().rooms.timeline.get_pdu_json(id).ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event)
.collect(),
event: None, // TODO: handle restricted joins
})
}
/// # `PUT /_matrix/federation/v1/send_join/{roomId}/{eventId}`
///
/// Submits a signed join event.
pub async fn create_join_event_v1_route(
body: Ruma<create_join_event::v1::Request>,
) -> Result<create_join_event::v1::Response> {
let sender_servername = body
.sender_servername
.as_ref()
.expect("server is authenticated");
let room_state = create_join_event(sender_servername, &body.room_id, &body.pdu).await?;
Ok(create_join_event::v1::Response { room_state })
}
/// # `PUT /_matrix/federation/v2/send_join/{roomId}/{eventId}`
///
/// Submits a signed join event.
pub async fn create_join_event_v2_route(
body: Ruma<create_join_event::v2::Request>,
) -> Result<create_join_event::v2::Response> {
let sender_servername = body
.sender_servername
.as_ref()
.expect("server is authenticated");
let room_state = create_join_event(sender_servername, &body.room_id, &body.pdu).await?;
Ok(create_join_event::v2::Response { room_state })
}
/// # `PUT /_matrix/federation/v2/invite/{roomId}/{eventId}`
///
/// Invites a remote user to a room.
pub async fn create_invite_route(
body: Ruma<create_invite::v2::Request>,
) -> Result<create_invite::v2::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let sender_servername = body
.sender_servername
.as_ref()
.expect("server is authenticated");
services()
.rooms
.event_handler
.acl_check(sender_servername, &body.room_id)?;
if !services()
.globals
.supported_room_versions()
.contains(&body.room_version)
{
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion {
room_version: body.room_version.clone(),
},
"Server does not support this room version.",
));
}
let mut signed_event = utils::to_canonical_object(&body.event)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invite event is invalid."))?;
ruma::signatures::hash_and_sign_event(
services().globals.server_name().as_str(),
services().globals.keypair(),
&mut signed_event,
&body.room_version,
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Failed to sign event."))?;
// Generate event id
let event_id = EventId::parse(format!(
"${}",
ruma::signatures::reference_hash(&signed_event, &body.room_version)
.expect("ruma can calculate reference hashes")
))
.expect("ruma's reference hashes are valid event ids");
// Add event_id back
signed_event.insert(
"event_id".to_owned(),
CanonicalJsonValue::String(event_id.to_string()),
);
let sender: OwnedUserId = serde_json::from_value(
signed_event
.get("sender")
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Event had no sender field.",
))?
.clone()
.into(),
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "sender is not a user id."))?;
let invited_user: Box<_> = serde_json::from_value(
signed_event
.get("state_key")
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Event had no state_key field.",
))?
.clone()
.into(),
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "state_key is not a user id."))?;
let mut invite_state = body.invite_room_state.clone();
let mut event: JsonObject = serde_json::from_str(body.event.get())
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid invite event bytes."))?;
event.insert("event_id".to_owned(), "$dummy".into());
let pdu: PduEvent = serde_json::from_value(event.into()).map_err(|e| {
warn!("Invalid invite event: {}", e);
Error::BadRequest(ErrorKind::InvalidParam, "Invalid invite event.")
})?;
invite_state.push(pdu.to_stripped_state_event());
// If we are active in the room, the remote server will notify us about the join via /send
if !services()
.rooms
.state_cache
.server_in_room(services().globals.server_name(), &body.room_id)?
{
services().rooms.state_cache.update_membership(
&body.room_id,
&invited_user,
MembershipState::Invite,
&sender,
Some(invite_state),
true,
)?;
}
Ok(create_invite::v2::Response {
event: PduEvent::convert_to_outgoing_federation_event(signed_event),
})
}
/// # `GET /_matrix/federation/v1/user/devices/{userId}`
///
/// Gets information on all devices of the user.
pub async fn get_devices_route(
body: Ruma<get_devices::v1::Request>,
) -> Result<get_devices::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let sender_servername = body
.sender_servername
.as_ref()
.expect("server is authenticated");
Ok(get_devices::v1::Response {
user_id: body.user_id.clone(),
stream_id: services()
.users
.get_devicelist_version(&body.user_id)?
.unwrap_or(0)
.try_into()
.expect("version will not grow that large"),
devices: services()
.users
.all_devices_metadata(&body.user_id)
.filter_map(|r| r.ok())
.filter_map(|metadata| {
Some(UserDevice {
keys: services()
.users
.get_device_keys(&body.user_id, &metadata.device_id)
.ok()??,
device_id: metadata.device_id,
device_display_name: metadata.display_name,
})
})
.collect(),
master_key: services()
.users
.get_master_key(&body.user_id, &|u| u.server_name() == sender_servername)?,
self_signing_key: services()
.users
.get_self_signing_key(&body.user_id, &|u| u.server_name() == sender_servername)?,
})
}
/// # `GET /_matrix/federation/v1/query/directory`
///
/// Resolve a room alias to a room id.
pub async fn get_room_information_route(
body: Ruma<get_room_information::v1::Request>,
) -> Result<get_room_information::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let room_id = services()
.rooms
.alias
.resolve_local_alias(&body.room_alias)?
.ok_or(Error::BadRequest(
ErrorKind::NotFound,
"Room alias not found.",
))?;
Ok(get_room_information::v1::Response {
room_id,
servers: vec![services().globals.server_name().to_owned()],
})
}
/// # `GET /_matrix/federation/v1/query/profile`
///
/// Gets information on a profile.
pub async fn get_profile_information_route(
body: Ruma<get_profile_information::v1::Request>,
) -> Result<get_profile_information::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let mut displayname = None;
let mut avatar_url = None;
let mut blurhash = None;
match &body.field {
Some(ProfileField::DisplayName) => {
displayname = services().users.displayname(&body.user_id)?
}
Some(ProfileField::AvatarUrl) => {
avatar_url = services().users.avatar_url(&body.user_id)?;
blurhash = services().users.blurhash(&body.user_id)?
}
// TODO: what to do with custom
Some(_) => {}
None => {
displayname = services().users.displayname(&body.user_id)?;
avatar_url = services().users.avatar_url(&body.user_id)?;
blurhash = services().users.blurhash(&body.user_id)?;
}
}
Ok(get_profile_information::v1::Response {
blurhash,
displayname,
avatar_url,
})
}
/// # `POST /_matrix/federation/v1/user/keys/query`
///
/// Gets devices and identity keys for the given users.
pub async fn get_keys_route(body: Ruma<get_keys::v1::Request>) -> Result<get_keys::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let result = get_keys_helper(None, &body.device_keys, |u| {
Some(u.server_name()) == body.sender_servername.as_deref()
})
.await?;
Ok(get_keys::v1::Response {
device_keys: result.device_keys,
master_keys: result.master_keys,
self_signing_keys: result.self_signing_keys,
})
}
/// # `POST /_matrix/federation/v1/user/keys/claim`
///
/// Claims one-time keys.
pub async fn claim_keys_route(
body: Ruma<claim_keys::v1::Request>,
) -> Result<claim_keys::v1::Response> {
if !services().globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let result = claim_keys_helper(&body.one_time_keys).await?;
Ok(claim_keys::v1::Response {
one_time_keys: result.one_time_keys,
})
}
#[cfg(test)]
mod tests {
use super::linearize_previous_events;
use super::{add_port_to_hostname, get_ip_with_port, FedDest};
use ruma::{CanonicalJsonObject, CanonicalJsonValue, OwnedEventId};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use serde_json::Value;
use std::collections::HashMap;
#[test]
fn ips_get_default_ports() {
assert_eq!(
get_ip_with_port("1.1.1.1"),
Some(FedDest::Literal("1.1.1.1:8448".parse().unwrap()))
);
assert_eq!(
get_ip_with_port("dead:beef::"),
Some(FedDest::Literal("[dead:beef::]:8448".parse().unwrap()))
);
}
#[test]
fn ips_keep_custom_ports() {
assert_eq!(
get_ip_with_port("1.1.1.1:1234"),
Some(FedDest::Literal("1.1.1.1:1234".parse().unwrap()))
);
assert_eq!(
get_ip_with_port("[dead::beef]:8933"),
Some(FedDest::Literal("[dead::beef]:8933".parse().unwrap()))
);
}
#[test]
fn hostnames_get_default_ports() {
assert_eq!(
add_port_to_hostname("example.com"),
FedDest::Named(String::from("example.com"), String::from(":8448"))
)
}
#[test]
fn hostnames_keep_custom_ports() {
assert_eq!(
add_port_to_hostname("example.com:1337"),
FedDest::Named(String::from("example.com"), String::from(":1337"))
)
}
type PduStorage = HashMap<OwnedEventId, CanonicalJsonObject>;
#[derive(Debug, Serialize, Deserialize)]
struct MockPDU {
content: i32,
prev_events: Vec<OwnedEventId>,
}
fn mock_event_id(id: &i32) -> OwnedEventId {
const DOMAIN: &str = "canterlot.eq";
<OwnedEventId>::try_from(format!("${id}:{DOMAIN}")).unwrap()
}
fn create_graph(data: Vec<(i32, Vec<i32>)>) -> PduStorage {
data.iter()
.map(|(head, tail)| {
let key = mock_event_id(head);
let pdu = MockPDU {
content: *head,
prev_events: tail.iter().map(mock_event_id).collect(),
};
let value = serde_json::to_value(pdu).unwrap();
let value: CanonicalJsonValue = value.try_into().unwrap();
(key, value.as_object().unwrap().to_owned())
})
.collect()
}
fn mock_full_graph() -> PduStorage {
/*
(1)
__________|___________
/ / \ \
(2) (3) (10) (11)
/ \ / \ | |
(4) (5) (6) (7) (12) (13)
| | |
(8) (9) (14)
\ /
(15)
|
(16)
*/
create_graph(vec![
(1, vec![2, 3, 10, 11]),
(2, vec![4, 5]),
(3, vec![6, 7]),
(4, vec![]),
(5, vec![8]),
(6, vec![9]),
(7, vec![]),
(8, vec![15]),
(9, vec![15]),
(10, vec![12]),
(11, vec![13]),
(12, vec![]),
(13, vec![14]),
(14, vec![]),
(15, vec![16]),
(16, vec![16]),
])
}
fn extract_events_payload(events: Vec<Box<RawValue>>) -> Vec<i32> {
events
.iter()
.map(|e| serde_json::from_str(e.get()).unwrap())
.map(|p: MockPDU| p.content)
.collect()
}
#[test]
fn backfill_empty() {
let events = linearize_previous_events(
vec![],
vec![],
16u64,
|_| unreachable!(),
|_| true,
|_| true,
);
assert!(events.is_empty());
}
#[test]
fn backfill_limit() {
/*
(5) → (4) → (3) → (2) → (1) → ×
*/
let events = create_graph(vec![
(1, vec![]),
(2, vec![1]),
(3, vec![2]),
(4, vec![3]),
(5, vec![4]),
]);
let roots = vec![mock_event_id(&5)];
let result = linearize_previous_events(
roots,
vec![],
3u64,
|e| events.get(e).cloned(),
|_| true,
|_| true,
);
assert_eq!(extract_events_payload(result), vec![5, 4, 3])
}
#[test]
fn backfill_bfs() {
let events = mock_full_graph();
let roots = vec![mock_event_id(&1)];
let result = linearize_previous_events(
roots,
vec![],
100u64,
|e| events.get(e).cloned(),
|_| true,
|_| true,
);
assert_eq!(
extract_events_payload(result),
vec![1, 2, 3, 10, 11, 4, 5, 6, 7, 12, 13, 8, 9, 14, 15, 16]
)
}
#[test]
fn backfill_subgraph() {
let events = mock_full_graph();
let roots = vec![mock_event_id(&3)];
let result = linearize_previous_events(
roots,
vec![],
100u64,
|e| events.get(e).cloned(),
|_| true,
|_| true,
);
assert_eq!(extract_events_payload(result), vec![3, 6, 7, 9, 15, 16])
}
#[test]
fn backfill_two_roots() {
let events = mock_full_graph();
let roots = vec![mock_event_id(&3), mock_event_id(&11)];
let result = linearize_previous_events(
roots,
vec![],
100u64,
|e| events.get(e).cloned(),
|_| true,
|_| true,
);
assert_eq!(
extract_events_payload(result),
vec![3, 11, 6, 7, 13, 9, 14, 15, 16]
)
}
#[test]
fn backfill_exclude_events() {
let events = mock_full_graph();
let roots = vec![mock_event_id(&1)];
let excluded_events = vec![
mock_event_id(&14),
mock_event_id(&15),
mock_event_id(&16),
mock_event_id(&3),
];
let result = linearize_previous_events(
roots,
excluded_events,
100u64,
|e| events.get(e).cloned(),
|_| true,
|_| true,
);
assert_eq!(
extract_events_payload(result),
vec![1, 2, 10, 11, 4, 5, 6, 7, 12, 13, 8, 9]
)
}
#[test]
fn backfill_exclude_branch_with_evil_event() {
let events = mock_full_graph();
let roots = vec![mock_event_id(&1)];
let result = linearize_previous_events(
roots,
vec![],
100u64,
|e| events.get(e).cloned(),
|_| true,
|e| {
let value: Value = CanonicalJsonValue::Object(e.clone()).into();
let pdu: MockPDU = serde_json::from_value(value).unwrap();
pdu.content != 3
},
);
assert_eq!(
extract_events_payload(result),
vec![1, 2, 10, 11, 4, 5, 12, 13, 8, 14, 15, 16]
)
}
#[test]
fn backfill_exclude_branch_with_inaccessible_event() {
let events = mock_full_graph();
let roots = vec![mock_event_id(&1)];
let result = linearize_previous_events(
roots,
vec![],
100u64,
|e| events.get(e).cloned(),
|e| e != mock_event_id(&3),
|_| true,
);
assert_eq!(
extract_events_payload(result),
vec![1, 2, 10, 11, 4, 5, 12, 13, 8, 14, 15, 16]
)
}
}