2022-10-08 13:02:52 +02:00
mod data ;
pub use data ::Data ;
2020-12-19 16:00:11 +01:00
use std ::{
2021-08-24 19:10:31 +02:00
collections ::{ BTreeMap , HashMap , HashSet } ,
2021-02-11 13:16:14 +01:00
fmt ::Debug ,
2020-12-19 16:00:11 +01:00
sync ::Arc ,
2022-10-08 13:02:52 +02:00
time ::{ Duration , Instant } , iter ,
2020-12-19 16:00:11 +01:00
} ;
2020-09-15 16:13:54 +02:00
2021-03-15 09:48:19 +01:00
use crate ::{
2022-10-05 20:34:31 +02:00
api ::{ appservice_server , server_server } ,
services ,
utils ::{ self , calculate_hash } ,
2022-10-08 13:02:52 +02:00
Error , PduEvent , Result , Config ,
2021-03-15 09:48:19 +01:00
} ;
2020-09-23 15:23:29 +02:00
use federation ::transactions ::send_transaction_message ;
2022-01-20 11:51:31 +01:00
use futures_util ::{ stream ::FuturesUnordered , StreamExt } ;
2022-10-05 20:41:05 +02:00
2020-12-08 10:33:44 +01:00
use ruma ::{
2021-05-17 10:25:27 +02:00
api ::{
appservice ,
federation ::{
self ,
2021-08-24 19:10:31 +02:00
transactions ::edu ::{
DeviceListUpdateContent , Edu , ReceiptContent , ReceiptData , ReceiptMap ,
} ,
2021-05-17 10:25:27 +02:00
} ,
OutgoingRequest ,
} ,
2021-08-24 19:10:31 +02:00
device_id ,
2022-04-07 13:22:10 +02:00
events ::{ push_rules ::PushRulesEvent , AnySyncEphemeralRoomEvent , GlobalAccountDataEventType } ,
2021-05-20 23:46:52 +02:00
push ,
receipt ::ReceiptType ,
2021-08-24 19:10:31 +02:00
uint , MilliSecondsSinceUnixEpoch , ServerName , UInt , UserId ,
2020-12-08 10:33:44 +01:00
} ;
2021-07-14 07:07:08 +00:00
use tokio ::{
select ,
2022-10-05 20:41:05 +02:00
sync ::{ mpsc , Semaphore } ,
2021-07-14 07:07:08 +00:00
} ;
2021-07-29 08:36:01 +02:00
use tracing ::{ error , warn } ;
2021-01-26 21:54:35 -05:00
#[ derive(Clone, Debug, PartialEq, Eq, Hash) ]
pub enum OutgoingKind {
2022-03-02 23:48:01 +08:00
Appservice ( String ) ,
2022-10-08 13:02:52 +02:00
Push ( Box < UserId > , String ) , // user and pushkey
2021-01-26 21:54:35 -05:00
Normal ( Box < ServerName > ) ,
}
2021-05-12 20:04:28 +02:00
impl OutgoingKind {
2021-07-29 08:36:01 +02:00
#[ tracing::instrument(skip(self)) ]
2021-05-12 20:04:28 +02:00
pub fn get_prefix ( & self ) -> Vec < u8 > {
let mut prefix = match self {
OutgoingKind ::Appservice ( server ) = > {
let mut p = b " + " . to_vec ( ) ;
p . extend_from_slice ( server . as_bytes ( ) ) ;
p
}
OutgoingKind ::Push ( user , pushkey ) = > {
let mut p = b " $ " . to_vec ( ) ;
2022-10-08 13:02:52 +02:00
p . extend_from_slice ( user . as_bytes ( ) ) ;
2021-05-12 20:04:28 +02:00
p . push ( 0xff ) ;
2022-10-08 13:02:52 +02:00
p . extend_from_slice ( pushkey . as_bytes ( ) ) ;
2021-05-12 20:04:28 +02:00
p
}
OutgoingKind ::Normal ( server ) = > {
let mut p = Vec ::new ( ) ;
p . extend_from_slice ( server . as_bytes ( ) ) ;
p
}
} ;
prefix . push ( 0xff ) ;
prefix
}
}
#[ derive(Clone, Debug, PartialEq, Eq, Hash) ]
pub enum SendingEventType {
2022-10-08 13:02:52 +02:00
Pdu ( Vec < u8 > ) , // pduid
Edu ( Vec < u8 > ) , // pdu json
2021-05-12 20:04:28 +02:00
}
2022-09-07 13:25:51 +02:00
pub struct Service {
2022-10-08 13:02:52 +02:00
db : & 'static dyn Data ,
2020-09-15 16:13:54 +02:00
/// The state for a given state hash.
2020-12-19 16:00:11 +01:00
pub ( super ) maximum_requests : Arc < Semaphore > ,
2022-10-08 13:02:52 +02:00
pub sender : mpsc ::UnboundedSender < ( OutgoingKind , SendingEventType , Vec < u8 > ) > ,
2020-09-15 16:13:54 +02:00
}
2021-04-24 18:01:05 +02:00
enum TransactionStatus {
Running ,
Failed ( u32 , Instant ) , // number of times failed, time of last failure
Retrying ( u32 ) , // number of times failed
}
2022-09-07 13:25:51 +02:00
impl Service {
2022-10-08 13:02:52 +02:00
pub fn build ( db : & 'static dyn Data , config : & Config ) -> Arc < Self > {
let ( sender , receiver ) = mpsc ::unbounded_channel ( ) ;
let self1 = Arc ::new ( Self { db , sender , maximum_requests : Arc ::new ( Semaphore ::new ( config . max_concurrent_requests as usize ) ) } ) ;
let self2 = Arc ::clone ( & self1 ) ;
2020-09-15 16:13:54 +02:00
tokio ::spawn ( async move {
2022-10-08 13:02:52 +02:00
self2 . start_handler ( receiver ) . await . unwrap ( ) ;
} ) ;
2020-09-23 15:23:29 +02:00
2022-10-08 13:02:52 +02:00
self1
}
2020-10-21 16:08:54 +02:00
2022-10-08 13:02:52 +02:00
async fn start_handler ( & self , mut receiver : mpsc ::UnboundedReceiver < ( OutgoingKind , SendingEventType , Vec < u8 > ) > ) -> Result < ( ) > {
let mut futures = FuturesUnordered ::new ( ) ;
2021-07-14 07:07:08 +00:00
2022-10-08 13:02:52 +02:00
let mut current_transaction_status = HashMap ::< OutgoingKind , TransactionStatus > ::new ( ) ;
2021-02-26 13:24:07 +01:00
2022-10-08 13:02:52 +02:00
// Retry requests we could not finish yet
let mut initial_transactions = HashMap ::< OutgoingKind , Vec < SendingEventType > > ::new ( ) ;
2020-10-21 16:08:54 +02:00
2022-10-08 13:02:52 +02:00
for ( key , outgoing_kind , event ) in self . db . active_requests ( ) . filter_map ( | r | r . ok ( ) )
{
let entry = initial_transactions
. entry ( outgoing_kind . clone ( ) )
. or_insert_with ( Vec ::new ) ;
if entry . len ( ) > 30 {
warn! (
" Dropping some current events: {:?} {:?} {:?} " ,
key , outgoing_kind , event
) ;
self . db . delete_active_request ( key ) ? ;
continue ;
2020-10-21 16:08:54 +02:00
}
2022-10-08 13:02:52 +02:00
entry . push ( event ) ;
}
for ( outgoing_kind , events ) in initial_transactions {
current_transaction_status
. insert ( outgoing_kind . clone ( ) , TransactionStatus ::Running ) ;
futures . push ( Self ::handle_events ( outgoing_kind . clone ( ) , events ) ) ;
}
loop {
select! {
Some ( response ) = futures . next ( ) = > {
match response {
Ok ( outgoing_kind ) = > {
self . db . delete_all_active_requests_for ( & outgoing_kind ) ? ;
// Find events that have been added since starting the last request
let new_events = self . db . queued_requests ( & outgoing_kind ) . filter_map ( | r | r . ok ( ) ) . take ( 30 ) . collect ::< Vec < _ > > ( ) ;
// TODO: find edus
if ! new_events . is_empty ( ) {
// Insert pdus we found
self . db . mark_as_active ( & new_events ) ? ;
futures . push (
Self ::handle_events (
outgoing_kind . clone ( ) ,
new_events . into_iter ( ) . map ( | ( event , _ ) | event ) . collect ( ) ,
)
) ;
} else {
current_transaction_status . remove ( & outgoing_kind ) ;
2020-09-15 16:13:54 +02:00
}
2020-09-23 15:23:29 +02:00
}
2022-10-08 13:02:52 +02:00
Err ( ( outgoing_kind , _ ) ) = > {
current_transaction_status . entry ( outgoing_kind ) . and_modify ( | e | * e = match e {
TransactionStatus ::Running = > TransactionStatus ::Failed ( 1 , Instant ::now ( ) ) ,
TransactionStatus ::Retrying ( n ) = > TransactionStatus ::Failed ( * n + 1 , Instant ::now ( ) ) ,
TransactionStatus ::Failed ( _ , _ ) = > {
error! ( " Request that was not even running failed?! " ) ;
return
} ,
} ) ;
}
} ;
} ,
Some ( ( outgoing_kind , event , key ) ) = receiver . recv ( ) = > {
if let Ok ( Some ( events ) ) = self . select_events (
& outgoing_kind ,
vec! [ ( event , key ) ] ,
& mut current_transaction_status ,
) {
futures . push ( Self ::handle_events ( outgoing_kind , events ) ) ;
2020-09-23 15:23:29 +02:00
}
2020-09-15 16:13:54 +02:00
}
}
2022-10-08 13:02:52 +02:00
}
2020-09-15 16:13:54 +02:00
}
2022-10-08 13:02:52 +02:00
#[ tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status)) ]
2021-05-12 20:04:28 +02:00
fn select_events (
2022-10-08 13:02:52 +02:00
& self ,
2021-05-12 20:04:28 +02:00
outgoing_kind : & OutgoingKind ,
2021-06-08 18:10:00 +02:00
new_events : Vec < ( SendingEventType , Vec < u8 > ) > , // Events we want to send: event and full key
2022-10-08 13:02:52 +02:00
current_transaction_status : & mut HashMap < OutgoingKind , TransactionStatus > ,
2021-06-08 18:10:00 +02:00
) -> Result < Option < Vec < SendingEventType > > > {
2021-05-12 20:04:28 +02:00
let mut retry = false ;
let mut allow = true ;
2022-10-08 13:02:52 +02:00
let entry = current_transaction_status . entry ( outgoing_kind . clone ( ) ) ;
2021-05-12 20:04:28 +02:00
entry
. and_modify ( | e | match e {
TransactionStatus ::Running | TransactionStatus ::Retrying ( _ ) = > {
allow = false ; // already running
}
TransactionStatus ::Failed ( tries , time ) = > {
// Fail if a request has failed recently (exponential backoff)
let mut min_elapsed_duration = Duration ::from_secs ( 30 ) * ( * tries ) * ( * tries ) ;
if min_elapsed_duration > Duration ::from_secs ( 60 * 60 * 24 ) {
min_elapsed_duration = Duration ::from_secs ( 60 * 60 * 24 ) ;
}
if time . elapsed ( ) < min_elapsed_duration {
allow = false ;
} else {
retry = true ;
* e = TransactionStatus ::Retrying ( * tries ) ;
}
}
} )
. or_insert ( TransactionStatus ::Running ) ;
if ! allow {
2021-06-08 18:10:00 +02:00
return Ok ( None ) ;
2021-05-12 20:04:28 +02:00
}
let mut events = Vec ::new ( ) ;
if retry {
// We retry the previous transaction
2022-10-08 13:02:52 +02:00
for ( _ , e ) in self . db . active_requests_for ( outgoing_kind ) . filter_map ( | r | r . ok ( ) ) {
events . push ( e ) ;
2021-05-12 20:04:28 +02:00
}
} else {
2022-10-08 13:02:52 +02:00
self . db . mark_as_active ( & new_events ) ? ;
for ( e , _ ) in new_events {
2021-05-12 20:04:28 +02:00
events . push ( e ) ;
}
2021-05-17 10:25:27 +02:00
2021-05-20 23:46:52 +02:00
if let OutgoingKind ::Normal ( server_name ) = outgoing_kind {
2022-10-08 13:02:52 +02:00
if let Ok ( ( select_edus , last_count ) ) = self . select_edus ( server_name ) {
2021-07-20 21:17:15 +02:00
events . extend ( select_edus . into_iter ( ) . map ( SendingEventType ::Edu ) ) ;
2022-10-08 13:02:52 +02:00
self . db . set_latest_educount ( server_name , last_count ) ? ;
2021-05-17 10:25:27 +02:00
}
}
2021-05-12 20:04:28 +02:00
}
2021-06-08 18:10:00 +02:00
Ok ( Some ( events ) )
2021-05-12 20:04:28 +02:00
}
2022-10-08 13:02:52 +02:00
#[ tracing::instrument(skip(self, server_name)) ]
pub fn select_edus ( & self , server_name : & ServerName ) -> Result < ( Vec < Vec < u8 > > , u64 ) > {
2021-05-17 10:25:27 +02:00
// u64: count of last edu
2022-10-08 13:02:52 +02:00
let since = self . db . get_latest_educount ( server_name ) ? ;
2021-05-17 10:25:27 +02:00
let mut events = Vec ::new ( ) ;
let mut max_edu_count = since ;
2021-08-24 19:10:31 +02:00
let mut device_list_changes = HashSet ::new ( ) ;
2022-10-08 13:02:52 +02:00
' outer : for room_id in services ( ) . rooms . state_cache . server_rooms ( server_name ) {
2021-05-17 10:25:27 +02:00
let room_id = room_id ? ;
2021-08-24 19:10:31 +02:00
// Look for device list updates in this room
device_list_changes . extend (
2022-10-05 20:34:31 +02:00
services ( )
. users
2021-08-24 19:10:31 +02:00
. keys_changed ( & room_id . to_string ( ) , since , None )
. filter_map ( | r | r . ok ( ) )
2022-09-07 13:25:51 +02:00
. filter ( | user_id | user_id . server_name ( ) = = services ( ) . globals . server_name ( ) ) ,
2021-08-24 19:10:31 +02:00
) ;
// Look for read receipts in this room
2022-10-08 13:02:52 +02:00
for r in services ( ) . rooms . edus . read_receipt . readreceipts_since ( & room_id , since ) {
2021-05-17 10:25:27 +02:00
let ( user_id , count , read_receipt ) = r ? ;
if count > max_edu_count {
max_edu_count = count ;
}
2022-09-07 13:25:51 +02:00
if user_id . server_name ( ) ! = services ( ) . globals . server_name ( ) {
2021-05-17 10:25:27 +02:00
continue ;
}
2021-10-13 11:51:30 +02:00
let event : AnySyncEphemeralRoomEvent =
serde_json ::from_str ( read_receipt . json ( ) . get ( ) )
2021-05-17 10:25:27 +02:00
. map_err ( | _ | Error ::bad_database ( " Invalid edu event in read_receipts. " ) ) ? ;
let federation_event = match event {
AnySyncEphemeralRoomEvent ::Receipt ( r ) = > {
let mut read = BTreeMap ::new ( ) ;
2021-05-20 23:46:52 +02:00
let ( event_id , mut receipt ) = r
2021-05-17 10:25:27 +02:00
. content
. 0
. into_iter ( )
. next ( )
. expect ( " we only use one event per read receipt " ) ;
let receipt = receipt
2021-05-20 23:46:52 +02:00
. remove ( & ReceiptType ::Read )
2021-05-17 10:25:27 +02:00
. expect ( " our read receipts always set this " )
. remove ( & user_id )
. expect ( " our read receipts always have the user here " ) ;
read . insert (
user_id ,
ReceiptData {
data : receipt . clone ( ) ,
event_ids : vec ! [ event_id . clone ( ) ] ,
} ,
) ;
let receipt_map = ReceiptMap { read } ;
let mut receipts = BTreeMap ::new ( ) ;
receipts . insert ( room_id . clone ( ) , receipt_map ) ;
Edu ::Receipt ( ReceiptContent { receipts } )
}
_ = > {
Error ::bad_database ( " Invalid event type in read_receipts " ) ;
continue ;
}
} ;
2021-07-20 21:17:15 +02:00
events . push ( serde_json ::to_vec ( & federation_event ) . expect ( " json can be serialized " ) ) ;
2021-05-17 10:25:27 +02:00
if events . len ( ) > = 20 {
break 'outer ;
}
}
}
2021-08-24 19:10:31 +02:00
for user_id in device_list_changes {
// Empty prev id forces synapse to resync: https://github.com/matrix-org/synapse/blob/98aec1cc9da2bd6b8e34ffb282c85abf9b8b42ca/synapse/handlers/device.py#L767
// Because synapse resyncs, we can just insert dummy data
let edu = Edu ::DeviceListUpdate ( DeviceListUpdateContent {
user_id ,
2021-11-26 20:36:40 +01:00
device_id : device_id ! ( " dummy " ) . to_owned ( ) ,
2021-10-13 10:16:45 +02:00
device_display_name : Some ( " Dummy " . to_owned ( ) ) ,
2021-08-24 19:10:31 +02:00
stream_id : uint ! ( 1 ) ,
prev_id : Vec ::new ( ) ,
deleted : None ,
keys : None ,
} ) ;
events . push ( serde_json ::to_vec ( & edu ) . expect ( " json can be serialized " ) ) ;
}
2021-05-17 10:25:27 +02:00
Ok ( ( events , max_edu_count ) )
}
2022-10-08 13:02:52 +02:00
#[ tracing::instrument(skip(self, pdu_id, user, pushkey)) ]
pub fn send_push_pdu ( & self , pdu_id : & [ u8 ] , user : & UserId , pushkey : String ) -> Result < ( ) > {
let outgoing_kind = OutgoingKind ::Push ( user . to_owned ( ) , pushkey ) ;
let event = SendingEventType ::Pdu ( pdu_id . to_owned ( ) ) ;
let keys = self . db . queue_requests ( & [ ( & outgoing_kind , event . clone ( ) ) ] ) ? ;
self . sender . send ( ( outgoing_kind , event , keys . into_iter ( ) . next ( ) . unwrap ( ) ) ) . unwrap ( ) ;
2021-01-26 21:54:35 -05:00
Ok ( ( ) )
}
2021-09-14 14:23:43 +02:00
#[ tracing::instrument(skip(self, servers, pdu_id)) ]
pub fn send_pdu < I : Iterator < Item = Box < ServerName > > > (
& self ,
servers : I ,
pdu_id : & [ u8 ] ,
) -> Result < ( ) > {
2022-10-08 13:02:52 +02:00
let requests = servers . into_iter ( ) . map ( | server | ( OutgoingKind ::Normal ( server ) , SendingEventType ::Pdu ( pdu_id . to_owned ( ) ) ) ) . collect ::< Vec < _ > > ( ) ;
let keys = self . db . queue_requests ( & requests . iter ( ) . map ( | ( o , e ) | ( o , e . clone ( ) ) ) . collect ::< Vec < _ > > ( ) ) ? ;
for ( ( outgoing_kind , event ) , key ) in requests . into_iter ( ) . zip ( keys ) {
self . sender . send ( ( outgoing_kind . to_owned ( ) , event , key ) ) . unwrap ( ) ;
}
2021-07-20 21:17:15 +02:00
Ok ( ( ) )
}
2021-07-29 08:36:01 +02:00
#[ tracing::instrument(skip(self, server, serialized)) ]
2021-08-25 14:42:46 +02:00
pub fn send_reliable_edu (
& self ,
server : & ServerName ,
serialized : Vec < u8 > ,
id : u64 ,
) -> Result < ( ) > {
2022-10-08 13:02:52 +02:00
let outgoing_kind = OutgoingKind ::Normal ( server . to_owned ( ) ) ;
let event = SendingEventType ::Edu ( serialized ) ;
let keys = self . db . queue_requests ( & [ ( & outgoing_kind , event . clone ( ) ) ] ) ? ;
self . sender . send ( ( outgoing_kind , event , keys . into_iter ( ) . next ( ) . unwrap ( ) ) ) . unwrap ( ) ;
2020-09-15 16:13:54 +02:00
Ok ( ( ) )
}
2020-09-23 15:23:29 +02:00
2021-02-28 12:41:03 +01:00
#[ tracing::instrument(skip(self)) ]
2022-10-08 13:02:52 +02:00
pub fn send_pdu_appservice ( & self , appservice_id : String , pdu_id : Vec < u8 > ) -> Result < ( ) > {
let outgoing_kind = OutgoingKind ::Appservice ( appservice_id ) ;
let event = SendingEventType ::Pdu ( pdu_id ) ;
let keys = self . db . queue_requests ( & [ ( & outgoing_kind , event . clone ( ) ) ] ) ? ;
self . sender . send ( ( outgoing_kind , event , keys . into_iter ( ) . next ( ) . unwrap ( ) ) ) . unwrap ( ) ;
2020-12-08 10:33:44 +01:00
Ok ( ( ) )
}
2022-01-31 09:27:31 +01:00
/// Cleanup event data
/// Used for instance after we remove an appservice registration
///
#[ tracing::instrument(skip(self)) ]
2022-10-08 13:02:52 +02:00
pub fn cleanup_events ( & self , appservice_id : String ) -> Result < ( ) > {
self . db . delete_all_requests_for ( & OutgoingKind ::Appservice ( appservice_id ) ) ? ;
2022-01-31 09:27:31 +01:00
Ok ( ( ) )
}
2022-09-07 13:25:51 +02:00
#[ tracing::instrument(skip(events, kind)) ]
2021-05-12 20:04:28 +02:00
async fn handle_events (
2021-01-26 21:54:35 -05:00
kind : OutgoingKind ,
2021-05-12 20:04:28 +02:00
events : Vec < SendingEventType > ,
2021-10-13 11:56:46 +02:00
) -> Result < OutgoingKind , ( OutgoingKind , Error ) > {
2021-03-16 18:00:26 +01:00
match & kind {
2022-03-02 23:48:01 +08:00
OutgoingKind ::Appservice ( id ) = > {
2021-05-12 20:04:28 +02:00
let mut pdu_jsons = Vec ::new ( ) ;
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) = > {
2022-10-08 13:02:52 +02:00
pdu_jsons . push ( services ( ) . rooms . timeline
2021-09-13 19:45:56 +02:00
. get_pdu_from_id ( pdu_id )
2021-05-12 20:04:28 +02:00
. map_err ( | e | ( kind . clone ( ) , e ) ) ?
2021-01-26 21:54:35 -05:00
. ok_or_else ( | | {
(
2021-05-12 20:04:28 +02:00
kind . clone ( ) ,
2021-01-26 21:54:35 -05:00
Error ::bad_database (
2021-07-29 20:17:47 +02:00
" [Appservice] Event in servernameevent_data not found in db. " ,
2021-01-26 21:54:35 -05:00
) ,
)
} ) ?
2021-05-20 23:46:52 +02:00
. to_room_event ( ) )
2021-05-12 20:04:28 +02:00
}
SendingEventType ::Edu ( _ ) = > {
// Appservices don't need EDUs (?)
}
}
}
2022-09-07 13:25:51 +02:00
let permit = services ( ) . sending . maximum_requests . acquire ( ) . await ;
2021-03-22 14:04:11 +01:00
2021-03-15 09:48:19 +01:00
let response = appservice_server ::send_request (
2022-10-05 20:34:31 +02:00
services ( )
. appservice
2022-03-02 23:48:01 +08:00
. get_registration ( & id )
2022-01-31 09:27:31 +01:00
. map_err ( | e | ( kind . clone ( ) , e ) ) ?
. ok_or_else ( | | {
(
kind . clone ( ) ,
Error ::bad_database (
" [Appservice] Could not load registration from db. " ,
) ,
)
} ) ? ,
2021-01-26 21:54:35 -05:00
appservice ::event ::push_events ::v1 ::Request {
events : & pdu_jsons ,
2022-01-17 14:35:38 +01:00
txn_id : ( & * base64 ::encode_config (
2022-10-08 13:02:52 +02:00
calculate_hash (
2021-05-12 20:04:28 +02:00
& events
. iter ( )
. map ( | e | match e {
SendingEventType ::Edu ( b ) | SendingEventType ::Pdu ( b ) = > & * * b ,
} )
. collect ::< Vec < _ > > ( ) ,
) ,
2021-03-15 09:48:19 +01:00
base64 ::URL_SAFE_NO_PAD ,
2022-01-17 14:39:37 +01:00
) )
. into ( ) ,
2021-01-26 21:54:35 -05:00
} ,
)
. await
2021-03-15 09:48:19 +01:00
. map ( | _response | kind . clone ( ) )
. map_err ( | e | ( kind , e ) ) ;
2021-02-26 13:24:07 +01:00
2021-03-15 09:48:19 +01:00
drop ( permit ) ;
2021-02-26 13:24:07 +01:00
2021-03-15 09:48:19 +01:00
response
2021-01-26 21:54:35 -05:00
}
2022-10-08 13:02:52 +02:00
OutgoingKind ::Push ( userid , pushkey ) = > {
2021-05-12 20:04:28 +02:00
let mut pdus = Vec ::new ( ) ;
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) = > {
pdus . push (
2022-09-07 13:25:51 +02:00
services ( ) . rooms
2022-10-08 13:02:52 +02:00
. timeline
2021-09-13 19:45:56 +02:00
. get_pdu_from_id ( pdu_id )
2021-05-12 20:04:28 +02:00
. map_err ( | e | ( kind . clone ( ) , e ) ) ?
. ok_or_else ( | | {
(
kind . clone ( ) ,
Error ::bad_database (
2021-07-29 20:17:47 +02:00
" [Push] Event in servernamevent_datas not found in db. " ,
2021-05-12 20:04:28 +02:00
) ,
)
} ) ? ,
) ;
}
SendingEventType ::Edu ( _ ) = > {
// Push gateways don't need EDUs (?)
}
}
}
2021-01-29 10:14:09 -05:00
2021-03-16 18:00:26 +01:00
for pdu in pdus {
2021-01-29 10:14:09 -05:00
// Redacted events are not notification targets (we don't send push for them)
2021-10-13 10:16:45 +02:00
if let Some ( unsigned ) = & pdu . unsigned {
if let Ok ( unsigned ) =
serde_json ::from_str ::< serde_json ::Value > ( unsigned . get ( ) )
{
if unsigned . get ( " redacted_because " ) . is_some ( ) {
continue ;
}
}
2021-01-29 10:14:09 -05:00
}
2022-09-07 13:25:51 +02:00
let pusher = match services ( )
2021-03-22 14:04:11 +01:00
. pusher
2022-10-08 13:02:52 +02:00
. get_pusher ( & userid , pushkey )
. map_err ( | e | ( OutgoingKind ::Push ( userid . clone ( ) , pushkey . clone ( ) ) , e ) ) ?
2021-03-22 14:04:11 +01:00
{
Some ( pusher ) = > pusher ,
None = > continue ,
} ;
2022-09-07 13:25:51 +02:00
let rules_for_user = services ( )
2021-03-22 14:04:11 +01:00
. account_data
2022-04-06 21:31:29 +02:00
. get (
None ,
& userid ,
GlobalAccountDataEventType ::PushRules . to_string ( ) . into ( ) ,
)
2021-04-12 12:40:16 +02:00
. unwrap_or_default ( )
2022-10-08 13:02:52 +02:00
. and_then ( | event | serde_json ::from_str ::< PushRulesEvent > ( event . get ( ) ) . ok ( ) )
2021-10-13 11:51:30 +02:00
. map ( | ev : PushRulesEvent | ev . content . global )
2021-04-05 21:25:10 +02:00
. unwrap_or_else ( | | push ::Ruleset ::server_default ( & userid ) ) ;
2021-03-22 14:04:11 +01:00
2022-09-07 13:25:51 +02:00
let unread : UInt = services ( )
2021-03-22 14:04:11 +01:00
. rooms
2022-10-08 13:02:52 +02:00
. user
2021-04-12 12:40:16 +02:00
. notification_count ( & userid , & pdu . room_id )
. map_err ( | e | ( kind . clone ( ) , e ) ) ?
. try_into ( )
. expect ( " notifiation count can't go that high " ) ;
2021-03-22 14:04:11 +01:00
2022-09-07 13:25:51 +02:00
let permit = services ( ) . sending . maximum_requests . acquire ( ) . await ;
2021-03-22 14:04:11 +01:00
2022-10-05 20:34:31 +02:00
let _response = services ( )
. pusher
. send_push_notice ( & userid , unread , & pusher , rules_for_user , & pdu )
. await
. map ( | _response | kind . clone ( ) )
. map_err ( | e | ( kind . clone ( ) , e ) ) ;
2021-03-22 14:04:11 +01:00
drop ( permit ) ;
2021-01-26 21:54:35 -05:00
}
2022-10-08 13:02:52 +02:00
Ok ( OutgoingKind ::Push ( userid . clone ( ) , pushkey . clone ( ) ) )
2021-01-26 21:54:35 -05:00
}
OutgoingKind ::Normal ( server ) = > {
2021-05-12 20:04:28 +02:00
let mut edu_jsons = Vec ::new ( ) ;
let mut pdu_jsons = Vec ::new ( ) ;
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) = > {
2021-01-26 21:54:35 -05:00
// TODO: check room version and remove event_id if needed
2021-07-30 18:05:26 +02:00
let raw = PduEvent ::convert_to_outgoing_federation_event (
2022-09-07 13:25:51 +02:00
services ( ) . rooms
2022-10-08 13:02:52 +02:00
. timeline
2021-09-13 19:45:56 +02:00
. get_pdu_json_from_id ( pdu_id )
2021-07-30 18:05:26 +02:00
. map_err ( | e | ( OutgoingKind ::Normal ( server . clone ( ) ) , e ) ) ?
. ok_or_else ( | | {
(
OutgoingKind ::Normal ( server . clone ( ) ) ,
Error ::bad_database (
" [Normal] Event in servernamevent_datas not found in db. " ,
) ,
)
} ) ? ,
) ;
pdu_jsons . push ( raw ) ;
2021-05-12 20:04:28 +02:00
}
SendingEventType ::Edu ( edu ) = > {
2021-07-30 18:05:26 +02:00
if let Ok ( raw ) = serde_json ::from_slice ( edu ) {
edu_jsons . push ( raw ) ;
}
2021-05-12 20:04:28 +02:00
}
}
}
2021-01-26 21:54:35 -05:00
2022-09-07 13:25:51 +02:00
let permit = services ( ) . sending . maximum_requests . acquire ( ) . await ;
2021-03-15 09:48:19 +01:00
let response = server_server ::send_request (
2021-01-26 21:54:35 -05:00
& * server ,
send_transaction_message ::v1 ::Request {
2022-09-07 13:25:51 +02:00
origin : services ( ) . globals . server_name ( ) ,
2021-01-26 21:54:35 -05:00
pdus : & pdu_jsons ,
2021-05-12 20:04:28 +02:00
edus : & edu_jsons ,
2021-05-20 23:46:52 +02:00
origin_server_ts : MilliSecondsSinceUnixEpoch ::now ( ) ,
2022-01-17 14:35:38 +01:00
transaction_id : ( & * base64 ::encode_config (
2022-10-05 20:33:55 +02:00
calculate_hash (
2021-05-12 20:04:28 +02:00
& events
. iter ( )
. map ( | e | match e {
SendingEventType ::Edu ( b ) | SendingEventType ::Pdu ( b ) = > & * * b ,
} )
. collect ::< Vec < _ > > ( ) ,
) ,
2021-03-15 09:48:19 +01:00
base64 ::URL_SAFE_NO_PAD ,
2022-01-17 14:39:37 +01:00
) )
. into ( ) ,
2021-01-26 21:54:35 -05:00
} ,
)
. await
2021-03-15 09:48:19 +01:00
. map ( | response | {
2021-03-26 11:10:45 +01:00
for pdu in response . pdus {
if pdu . 1. is_err ( ) {
warn! ( " Failed to send to {}: {:?} " , server , pdu ) ;
}
}
2021-03-15 09:48:19 +01:00
kind . clone ( )
2020-12-08 10:33:44 +01:00
} )
2021-03-15 09:48:19 +01:00
. map_err ( | e | ( kind , e ) ) ;
2021-02-26 13:24:07 +01:00
2021-03-15 09:48:19 +01:00
drop ( permit ) ;
2021-02-26 13:24:07 +01:00
2021-03-15 09:48:19 +01:00
response
2021-01-26 21:54:35 -05:00
}
2020-12-08 10:33:44 +01:00
}
2020-09-23 15:23:29 +02:00
}
2020-12-19 16:00:11 +01:00
2022-09-07 13:25:51 +02:00
#[ tracing::instrument(skip(self, destination, request)) ]
2020-12-19 16:00:11 +01:00
pub async fn send_federation_request < T : OutgoingRequest > (
& self ,
2021-01-14 14:39:56 -05:00
destination : & ServerName ,
2020-12-19 16:00:11 +01:00
request : T ,
) -> Result < T ::IncomingResponse >
where
T : Debug ,
{
let permit = self . maximum_requests . acquire ( ) . await ;
2022-09-07 13:25:51 +02:00
let response = server_server ::send_request ( destination , request ) . await ;
2020-12-19 16:00:11 +01:00
drop ( permit ) ;
response
}
2022-09-07 13:25:51 +02:00
#[ tracing::instrument(skip(self, registration, request)) ]
2020-12-19 16:00:11 +01:00
pub async fn send_appservice_request < T : OutgoingRequest > (
& self ,
registration : serde_yaml ::Value ,
request : T ,
) -> Result < T ::IncomingResponse >
where
T : Debug ,
{
let permit = self . maximum_requests . acquire ( ) . await ;
2022-09-07 13:25:51 +02:00
let response = appservice_server ::send_request ( registration , request ) . await ;
2020-12-19 16:00:11 +01:00
drop ( permit ) ;
response
}
2020-09-15 16:13:54 +02:00
}