From 9017efe45b6ba18bbf33bbd68ab7c415e79c36af Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 19 Jul 2025 20:22:29 +0100 Subject: [PATCH] feat(policy-server): Policy server following --- src/core/matrix/state_res/event_auth.rs | 7 +- .../rooms/event_handler/call_policyserv.rs | 71 +++++++++++++++++++ src/service/rooms/event_handler/mod.rs | 1 + .../event_handler/upgrade_outlier_pdu.rs | 34 ++++++--- 4 files changed, 102 insertions(+), 11 deletions(-) create mode 100644 src/service/rooms/event_handler/call_policyserv.rs diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 5c36ce03..819d05e2 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -5,7 +5,7 @@ use futures::{ future::{OptionFuture, join3}, }; use ruma::{ - Int, OwnedUserId, RoomVersionId, UserId, + EventId, Int, OwnedUserId, RoomVersionId, UserId, events::room::{ create::RoomCreateEventContent, join_rules::{JoinRule, RoomJoinRulesEventContent}, @@ -217,8 +217,9 @@ where } /* - // TODO: In the past this code caused problems federating with synapse, maybe this has been - // resolved already. Needs testing. + // TODO: In the past this code was commented as it caused problems with Synapse. This is no + // longer the case. This needs to be implemented. + // See also: https://github.com/ruma/ruma/pull/2064 // // 2. Reject if auth_events // a. auth_events cannot have duplicate keys since it's a BTree diff --git a/src/service/rooms/event_handler/call_policyserv.rs b/src/service/rooms/event_handler/call_policyserv.rs new file mode 100644 index 00000000..4a52227d --- /dev/null +++ b/src/service/rooms/event_handler/call_policyserv.rs @@ -0,0 +1,71 @@ +use conduwuit::{ + Err, Event, PduEvent, Result, debug, implement, utils::to_canonical_object, warn, +}; +use ruma::{ + RoomId, ServerName, + api::federation::room::policy::v1::Request as PolicyRequest, + canonical_json::to_canonical_value, + events::{StateEventType, room::policy::RoomPolicyEventContent}, +}; + +/// Returns Ok if the policy server allows the event +#[implement(super::Service)] +#[tracing::instrument(skip_all, level = "debug")] +pub async fn policyserv_check(&self, pdu: &PduEvent, room_id: &RoomId) -> Result { + let Ok(policyserver) = self + .services + .state_accessor + .room_state_get_content(room_id, &StateEventType::RoomPolicy, "") + .await + .map(|c: RoomPolicyEventContent| c) + else { + return Ok(()); + }; + + let via = match policyserver.via { + | Some(ref via) => ServerName::parse(via)?, + | None => { + debug!("No policy server configured for room {room_id}"); + return Ok(()); + }, + }; + // TODO: dont do *this* + let pdu_json = self.services.timeline.get_pdu_json(pdu.event_id()).await?; + let outgoing = self + .services + .sending + .convert_to_outgoing_federation_event(pdu_json) + .await; + // let s = match serde_json::to_string(outgoing.as_ref()) { + // | Ok(s) => s, + // | Err(e) => { + // warn!("Failed to convert pdu {} to outgoing federation event: {e}", + // pdu.event_id()); return Err!(Request(InvalidParam("Failed to convert PDU + // to outgoing event."))); }, + // }; + debug!("Checking pdu {outgoing:?} for spam with policy server {via} for room {room_id}"); + let response = self + .services + .sending + .send_federation_request(via, PolicyRequest { + event_id: pdu.event_id().to_owned(), + pdu: Some(outgoing), + }) + .await; + let response = match response { + | Ok(response) => response, + | Err(e) => { + warn!("Failed to contact policy server {via} for room {room_id}: {e}"); + return Ok(()); + }, + }; + if response.recommendation == "spam" { + warn!( + "Event {} in room {room_id} was marked as spam by policy server {via}", + pdu.event_id().to_owned() + ); + return Err!(Request(Forbidden("Event was marked as spam by policy server"))); + }; + + Ok(()) +} diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 4e59c207..ba5ad7e2 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -1,4 +1,5 @@ mod acl_check; +mod call_policyserv; mod fetch_and_handle_outliers; mod fetch_prev; mod fetch_state; diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 4093cb05..abb5c116 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -1,7 +1,7 @@ use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant}; use conduwuit::{ - Err, Result, debug, debug_info, err, implement, is_equal_to, + Err, Result, debug, debug_info, err, implement, info, is_equal_to, matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res}, trace, utils::stream::{BroadbandExt, ReadyExt}, @@ -47,7 +47,7 @@ where return Err!(Request(InvalidParam("Event has been soft failed"))); } - debug!("Upgrading to timeline pdu"); + debug!("Upgrading pdu {} from outlier to timeline pdu", incoming_pdu.event_id); let timer = Instant::now(); let room_version_id = get_room_version_id(create_event)?; @@ -55,7 +55,7 @@ where // backwards extremities doing all the checks in this list starting at 1. // These are not timeline events. - debug!("Resolving state at event"); + debug!("Resolving state at event {}", incoming_pdu.event_id); let mut state_at_incoming_event = if incoming_pdu.prev_events().count() == 1 { self.state_at_incoming_degree_one(&incoming_pdu).await? } else { @@ -74,7 +74,7 @@ where let room_version = to_room_version(&room_version_id); - debug!("Performing auth check"); + debug!("Performing auth check to upgrade {}", incoming_pdu.event_id); // 11. Check the auth of the event passes based on the state of the event let state_fetch_state = &state_at_incoming_event; let state_fetch = |k: StateEventType, s: StateKey| async move { @@ -84,6 +84,7 @@ where self.services.timeline.get_pdu(event_id).await.ok() }; + debug!("running auth check on {}", incoming_pdu.event_id); let auth_check = state_res::event_auth::auth_check( &room_version, &incoming_pdu, @@ -97,7 +98,7 @@ where return Err!(Request(Forbidden("Event has failed auth check with state at the event."))); } - debug!("Gathering auth events"); + debug!("Gathering auth events for {}", incoming_pdu.event_id); let auth_events = self .services .state @@ -115,6 +116,7 @@ where ready(auth_events.get(&key).map(ToOwned::to_owned)) }; + debug!("running auth check on {} with claimed state auth", incoming_pdu.event_id); let auth_check = state_res::event_auth::auth_check( &room_version, &incoming_pdu, @@ -125,8 +127,8 @@ where .map_err(|e| err!(Request(Forbidden("Auth check failed: {e:?}"))))?; // Soft fail check before doing state res - debug!("Performing soft-fail check"); - let soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) { + debug!("Performing soft-fail check on {}", incoming_pdu.event_id); + let mut soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) { | (false, _) => true, | (true, None) => false, | (true, Some(redact_id)) => @@ -219,10 +221,26 @@ where .await?; } + // 14-pre. If the event is not a state event, ask the policy server about it + if incoming_pdu.state_key.is_none() + && incoming_pdu.sender().server_name() != self.services.globals.server_name() + { + debug!("Checking policy server for event {}", incoming_pdu.event_id); + let policy = self.policyserv_check(&incoming_pdu, room_id); + if let Err(e) = policy.await { + warn!("Policy server check failed for event {}: {e}", incoming_pdu.event_id); + if !soft_fail { + soft_fail = true; + } + } + debug!("Policy server check passed for event {}", incoming_pdu.event_id); + } + // 14. Check if the event passes auth based on the "current state" of the room, // if not soft fail it if soft_fail { - debug!("Soft failing event"); + info!("Soft failing event {}", incoming_pdu.event_id); + // assert!(extremities.is_empty(), "soft_fail extremities empty"); let extremities = extremities.iter().map(Borrow::borrow); self.services