1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-07-02 16:38:36 +00:00

Merge branch 'timer-retry' into 'next'

make federation retry timer-based

Closes #142 and #135

See merge request famedly/conduit!181
This commit is contained in:
Jonathan de Jong 2025-06-17 18:24:24 +00:00
commit 5366070396
3 changed files with 279 additions and 103 deletions

View file

@ -4,7 +4,7 @@ use crate::{
database::KeyValueDatabase, database::KeyValueDatabase,
service::{ service::{
self, self,
sending::{OutgoingKind, SendingEventType}, sending::{OutgoingDestination, SendingEventType},
}, },
services, utils, Error, Result, services, utils, Error, Result,
}; };
@ -12,7 +12,8 @@ use crate::{
impl service::sending::Data for KeyValueDatabase { impl service::sending::Data for KeyValueDatabase {
fn active_requests<'a>( fn active_requests<'a>(
&'a self, &'a self,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a> { ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingDestination, SendingEventType)>> + 'a>
{
Box::new( Box::new(
self.servercurrentevent_data self.servercurrentevent_data
.iter() .iter()
@ -22,7 +23,7 @@ impl service::sending::Data for KeyValueDatabase {
fn active_requests_for<'a>( fn active_requests_for<'a>(
&'a self, &'a self,
outgoing_kind: &OutgoingKind, outgoing_kind: &OutgoingDestination,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a> { ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a> {
let prefix = outgoing_kind.get_prefix(); let prefix = outgoing_kind.get_prefix();
Box::new( Box::new(
@ -36,7 +37,7 @@ impl service::sending::Data for KeyValueDatabase {
self.servercurrentevent_data.remove(&key) self.servercurrentevent_data.remove(&key)
} }
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> { fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()> {
let prefix = outgoing_kind.get_prefix(); let prefix = outgoing_kind.get_prefix();
for (key, _) in self.servercurrentevent_data.scan_prefix(prefix) { for (key, _) in self.servercurrentevent_data.scan_prefix(prefix) {
self.servercurrentevent_data.remove(&key)?; self.servercurrentevent_data.remove(&key)?;
@ -45,7 +46,7 @@ impl service::sending::Data for KeyValueDatabase {
Ok(()) Ok(())
} }
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> { fn delete_all_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()> {
let prefix = outgoing_kind.get_prefix(); let prefix = outgoing_kind.get_prefix();
for (key, _) in self.servercurrentevent_data.scan_prefix(prefix.clone()) { for (key, _) in self.servercurrentevent_data.scan_prefix(prefix.clone()) {
self.servercurrentevent_data.remove(&key).unwrap(); self.servercurrentevent_data.remove(&key).unwrap();
@ -60,7 +61,7 @@ impl service::sending::Data for KeyValueDatabase {
fn queue_requests( fn queue_requests(
&self, &self,
requests: &[(&OutgoingKind, SendingEventType)], requests: &[(&OutgoingDestination, SendingEventType)],
) -> Result<Vec<Vec<u8>>> { ) -> Result<Vec<Vec<u8>>> {
let mut batch = Vec::new(); let mut batch = Vec::new();
let mut keys = Vec::new(); let mut keys = Vec::new();
@ -86,7 +87,7 @@ impl service::sending::Data for KeyValueDatabase {
fn queued_requests<'a>( fn queued_requests<'a>(
&'a self, &'a self,
outgoing_kind: &OutgoingKind, outgoing_kind: &OutgoingDestination,
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a> { ) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a> {
let prefix = outgoing_kind.get_prefix(); let prefix = outgoing_kind.get_prefix();
Box::new( Box::new(
@ -129,7 +130,7 @@ impl service::sending::Data for KeyValueDatabase {
fn parse_servercurrentevent( fn parse_servercurrentevent(
key: &[u8], key: &[u8],
value: Vec<u8>, value: Vec<u8>,
) -> Result<(OutgoingKind, SendingEventType)> { ) -> Result<(OutgoingDestination, SendingEventType)> {
// Appservices start with a plus // Appservices start with a plus
Ok::<_, Error>(if key.starts_with(b"+") { Ok::<_, Error>(if key.starts_with(b"+") {
let mut parts = key[1..].splitn(2, |&b| b == 0xff); let mut parts = key[1..].splitn(2, |&b| b == 0xff);
@ -144,7 +145,7 @@ fn parse_servercurrentevent(
})?; })?;
( (
OutgoingKind::Appservice(server), OutgoingDestination::Appservice(server),
if value.is_empty() { if value.is_empty() {
SendingEventType::Pdu(event.to_vec()) SendingEventType::Pdu(event.to_vec())
} else { } else {
@ -171,7 +172,7 @@ fn parse_servercurrentevent(
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
( (
OutgoingKind::Push(user_id, pushkey_string), OutgoingDestination::Push(user_id, pushkey_string),
if value.is_empty() { if value.is_empty() {
SendingEventType::Pdu(event.to_vec()) SendingEventType::Pdu(event.to_vec())
} else { } else {
@ -192,7 +193,7 @@ fn parse_servercurrentevent(
})?; })?;
( (
OutgoingKind::Normal(ServerName::parse(server).map_err(|_| { OutgoingDestination::Normal(ServerName::parse(server).map_err(|_| {
Error::bad_database("Invalid server string in server_currenttransaction") Error::bad_database("Invalid server string in server_currenttransaction")
})?), })?),
if value.is_empty() { if value.is_empty() {

View file

@ -2,27 +2,27 @@ use ruma::ServerName;
use crate::Result; use crate::Result;
use super::{OutgoingKind, SendingEventType}; use super::{OutgoingDestination, SendingEventType};
pub trait Data: Send + Sync { pub trait Data: Send + Sync {
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
fn active_requests<'a>( fn active_requests<'a>(
&'a self, &'a self,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a>; ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingDestination, SendingEventType)>> + 'a>;
fn active_requests_for<'a>( fn active_requests_for<'a>(
&'a self, &'a self,
outgoing_kind: &OutgoingKind, outgoing_kind: &OutgoingDestination,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a>; ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a>;
fn delete_active_request(&self, key: Vec<u8>) -> Result<()>; fn delete_active_request(&self, key: Vec<u8>) -> Result<()>;
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>; fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()>;
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>; fn delete_all_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()>;
fn queue_requests( fn queue_requests(
&self, &self,
requests: &[(&OutgoingKind, SendingEventType)], requests: &[(&OutgoingDestination, SendingEventType)],
) -> Result<Vec<Vec<u8>>>; ) -> Result<Vec<Vec<u8>>>;
fn queued_requests<'a>( fn queued_requests<'a>(
&'a self, &'a self,
outgoing_kind: &OutgoingKind, outgoing_kind: &OutgoingDestination,
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a>; ) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a>;
fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()>; fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()>;
fn set_latest_educount(&self, server_name: &ServerName, educount: u64) -> Result<()>; fn set_latest_educount(&self, server_name: &ServerName, educount: u64) -> Result<()>;

View file

@ -3,8 +3,9 @@ mod data;
pub use data::Data; pub use data::Data;
use std::{ use std::{
collections::{BTreeMap, HashMap, HashSet}, collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
fmt::Debug, fmt::Debug,
future::Future,
sync::Arc, sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -40,34 +41,34 @@ use ruma::{
}; };
use tokio::{ use tokio::{
select, select,
sync::{mpsc, Mutex, Semaphore}, sync::{mpsc, oneshot, Mutex, Semaphore},
}; };
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum OutgoingKind { pub enum OutgoingDestination {
Appservice(String), Appservice(String),
Push(OwnedUserId, String), // user and pushkey Push(OwnedUserId, String), // user and pushkey
Normal(OwnedServerName), Normal(OwnedServerName),
} }
impl OutgoingKind { impl OutgoingDestination {
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn get_prefix(&self) -> Vec<u8> { pub fn get_prefix(&self) -> Vec<u8> {
let mut prefix = match self { let mut prefix = match self {
OutgoingKind::Appservice(server) => { OutgoingDestination::Appservice(server) => {
let mut p = b"+".to_vec(); let mut p = b"+".to_vec();
p.extend_from_slice(server.as_bytes()); p.extend_from_slice(server.as_bytes());
p p
} }
OutgoingKind::Push(user, pushkey) => { OutgoingDestination::Push(user, pushkey) => {
let mut p = b"$".to_vec(); let mut p = b"$".to_vec();
p.extend_from_slice(user.as_bytes()); p.extend_from_slice(user.as_bytes());
p.push(0xff); p.push(0xff);
p.extend_from_slice(pushkey.as_bytes()); p.extend_from_slice(pushkey.as_bytes());
p p
} }
OutgoingKind::Normal(server) => { OutgoingDestination::Normal(server) => {
let mut p = Vec::new(); let mut p = Vec::new();
p.extend_from_slice(server.as_bytes()); p.extend_from_slice(server.as_bytes());
p p
@ -77,6 +78,47 @@ impl OutgoingKind {
prefix prefix
} }
// This wraps the OutgoingDestination key in an interruptible sleep future.
//
// The first return value is the future, the second is the oneshot that interrupts that future,
// and causes it to return instantly.
fn wrap_in_interruptible_sleep(
self,
at: Instant,
) -> (impl Future<Output = Self>, oneshot::Sender<()>) {
let (tx, rx) = oneshot::channel();
let at = tokio::time::Instant::from_std(at);
(
async move {
let _ = tokio::time::timeout_at(at, rx).await;
self
},
tx,
)
}
}
impl std::fmt::Display for OutgoingDestination {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OutgoingDestination::Appservice(appservice_id) => {
write!(f, "Appservice (ID {:?})", appservice_id)
}
OutgoingDestination::Push(user, push_key) => {
write!(
f,
"User Push Service (for {:?}, with key {:?})",
user, push_key
)
}
OutgoingDestination::Normal(server) => {
write!(f, "Matrix Server ({:?})", server)
}
}
}
} }
#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
@ -90,14 +132,29 @@ pub struct Service {
/// The state for a given state hash. /// The state for a given state hash.
pub(super) maximum_requests: Arc<Semaphore>, pub(super) maximum_requests: Arc<Semaphore>,
pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>, pub sender: mpsc::UnboundedSender<(OutgoingDestination, SendingEventType, Vec<u8>)>,
receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>>, receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingDestination, SendingEventType, Vec<u8>)>>,
} }
enum TransactionStatus { enum TransactionStatus {
// Currently running (for the first time)
Running, Running,
Failed(u32, Instant), // number of times failed, time of last failure // Failed, backing off for a retry
Retrying(u32), // number of times failed Failed {
failures: u32,
waker: Option<oneshot::Sender<()>>,
},
// Currently retrying
Retrying {
failures: u32,
}, // number of times failed
}
// A control-flow enum to dictate what the handler should do after (trying to) prepare a transaction
enum TransactionPrepOutcome {
Send(Vec<SendingEventType>),
Wake(OutgoingDestination),
Nothing,
} }
impl Service { impl Service {
@ -119,14 +176,17 @@ impl Service {
} }
async fn handler(&self) -> Result<()> { async fn handler(&self) -> Result<()> {
let mut receiver = self.receiver.lock().await; let mut new_transactions = self.receiver.lock().await;
let (waking_sender, mut waking_receiver) = mpsc::unbounded_channel();
let mut futures = FuturesUnordered::new(); let mut outgoing = FuturesUnordered::new();
let mut retrying = FuturesUnordered::new();
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new(); let mut current_transaction_status =
HashMap::<OutgoingDestination, TransactionStatus>::new();
// Retry requests we could not finish yet // Retry requests we could not finish yet
let mut initial_transactions = HashMap::<OutgoingKind, Vec<SendingEventType>>::new(); let mut initial_transactions = HashMap::<OutgoingDestination, Vec<SendingEventType>>::new();
for (key, outgoing_kind, event) in self.db.active_requests().filter_map(|r| r.ok()) { for (key, outgoing_kind, event) in self.db.active_requests().filter_map(|r| r.ok()) {
let entry = initial_transactions let entry = initial_transactions
@ -147,12 +207,34 @@ impl Service {
for (outgoing_kind, events) in initial_transactions { for (outgoing_kind, events) in initial_transactions {
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
futures.push(Self::handle_events(outgoing_kind.clone(), events)); outgoing.push(Self::handle_events(outgoing_kind.clone(), events));
} }
loop { loop {
select! { select! {
Some(response) = futures.next() => { // New transactions to be sent out (from server/user activity)
Some((dest, event, key)) = new_transactions.recv() => {
match self.prepare_transaction(
&dest,
vec![(event, key)],
&mut current_transaction_status,
true,
) {
Ok(TransactionPrepOutcome::Send(events)) => {
outgoing.push(Self::handle_events(dest, events));
},
Ok(TransactionPrepOutcome::Wake(dest)) => {
waking_sender.send(dest).expect("nothing closes this channel but ourselves");
},
Ok(TransactionPrepOutcome::Nothing) => {},
Err(err) => {
error!("Ignoring error in (fresh) outgoing request ({}) handler: {}", dest, err)
}
}
},
Some(response) = outgoing.next() => {
// Outgoing transaction succeeded
match response { match response {
Ok(outgoing_kind) => { Ok(outgoing_kind) => {
self.db.delete_all_active_requests_for(&outgoing_kind)?; self.db.delete_all_active_requests_for(&outgoing_kind)?;
@ -164,9 +246,12 @@ impl Service {
// Insert pdus we found // Insert pdus we found
self.db.mark_as_active(&new_events)?; self.db.mark_as_active(&new_events)?;
futures.push( // Clear retries
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
outgoing.push(
Self::handle_events( Self::handle_events(
outgoing_kind.clone(), outgoing_kind,
new_events.into_iter().map(|(event, _)| event).collect(), new_events.into_iter().map(|(event, _)| event).collect(),
) )
); );
@ -174,72 +259,160 @@ impl Service {
current_transaction_status.remove(&outgoing_kind); current_transaction_status.remove(&outgoing_kind);
} }
} }
Err((outgoing_kind, _)) => {
current_transaction_status.entry(outgoing_kind).and_modify(|e| *e = match e { // Outgoing transaction failed
TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), Err((destination, err)) => {
TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), // Set status to Failed, create timer
TransactionStatus::Failed(_, _) => { let timer = Self::mark_failed_and_backoff(&mut current_transaction_status, destination.clone());
error!("Request that was not even running failed?!");
return // Add timer to loop
}, retrying.push(timer);
});
warn!("Outgoing request to {} failed: {}", destination, err);
} }
}; };
}, },
Some((outgoing_kind, event, key)) = receiver.recv() => {
if let Ok(Some(events)) = self.select_events( // Transaction retry timers firing
&outgoing_kind, Some(dest) = retrying.next() => {
vec![(event, key)], // Transition Failed => Retrying, return pending old transaction events
match self.prepare_transaction(
&dest,
vec![], // will be ignored because fresh == false
&mut current_transaction_status, &mut current_transaction_status,
false,
) { ) {
futures.push(Self::handle_events(outgoing_kind, events)); Ok(TransactionPrepOutcome::Send(events)) => {
outgoing.push(Self::handle_events(dest, events));
}
Ok(_) => {
// Unreachable because fresh == false
unreachable!("prepare_transaction on a stale transaction {} did not return ::Send", dest)
}
Err(err) => {
error!("Ignoring error in (stale) outgoing request ({}) handler: {}", dest, err);
// transaction dropped, so drop destination as well.
current_transaction_status.remove(&dest);
}
} }
} },
// Explicit wakeups, makes a backoff timer return immediately
Some(outgoing) = waking_receiver.recv() => {
if let Some(TransactionStatus::Failed { waker, .. }) = current_transaction_status.get_mut(&outgoing) {
if let Some(waker) = waker.take() {
let _ = waker.send(());
}
}
},
} }
} }
} }
/// Generates timer/oneshot, alters status to reflect Failed
///
/// Returns timer/oneshot future to wake up loop for next retry
fn mark_failed_and_backoff(
status: &mut HashMap<OutgoingDestination, TransactionStatus>,
dest: OutgoingDestination,
) -> impl Future<Output = OutgoingDestination> {
let now = Instant::now();
let entry = status
.get_mut(&dest)
.expect("guaranteed to be set before this function");
let failures = match entry {
// Running -> Failed
TransactionStatus::Running => 1,
// Retrying -> Failed
TransactionStatus::Retrying { failures } => *failures + 1,
// The transition of Failed -> Retrying is handled by handle_events
TransactionStatus::Failed { .. } => {
unreachable!("TransactionStatus in inconsistent state: Expected either Running or Retrying, got Failed, bailing...")
}
};
const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24);
// Exponential backoff, clamp upper value to one day
let next_wakeup = now + (Duration::from_secs(30) * failures * failures).min(ONE_DAY);
let (fut, waker) = dest.wrap_in_interruptible_sleep(next_wakeup);
*entry = TransactionStatus::Failed {
failures,
waker: Some(waker),
};
fut
}
// This prepares a transaction, checks the transaction state, and selects appropriate events.
#[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))] #[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))]
fn select_events( fn prepare_transaction(
&self, &self,
outgoing_kind: &OutgoingKind, outgoing_kind: &OutgoingDestination,
new_events: Vec<(SendingEventType, Vec<u8>)>, // Events we want to send: event and full key new_events: Vec<(SendingEventType, Vec<u8>)>, // Events we want to send: event and full key
current_transaction_status: &mut HashMap<OutgoingKind, TransactionStatus>, current_transaction_status: &mut HashMap<OutgoingDestination, TransactionStatus>,
) -> Result<Option<Vec<SendingEventType>>> { fresh: bool, // Wether or not this transaction came from server activity.
let mut retry = false; ) -> Result<TransactionPrepOutcome> {
let mut allow = true; let mut allowed = true;
let mut retrying = false;
let mut wake_up = false;
let entry = current_transaction_status.entry(outgoing_kind.clone()); let entry = current_transaction_status.entry(outgoing_kind.clone());
entry if fresh {
.and_modify(|e| match e { // If its fresh, we initialise the status if we need to.
TransactionStatus::Running | TransactionStatus::Retrying(_) => { //
allow = false; // already running // We do nothing if it is already running or retrying.
} //
TransactionStatus::Failed(tries, time) => { // We return with a wake if it is in the Failed state.
// Fail if a request has failed recently (exponential backoff) entry
let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); .and_modify(|e| match e {
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { TransactionStatus::Running | TransactionStatus::Retrying { .. } => {
min_elapsed_duration = Duration::from_secs(60 * 60 * 24); // already running
allowed = false;
} }
TransactionStatus::Failed { .. } => {
if time.elapsed() < min_elapsed_duration { // Currently sleeping, time to call the kool-aid man
allow = false; wake_up = true;
} else {
retry = true;
*e = TransactionStatus::Retrying(*tries);
} }
} })
}) .or_insert(TransactionStatus::Running);
.or_insert(TransactionStatus::Running); } else {
// If it's not fresh, we expect an entry.
//
// We also expect us to be the only one who are touching this destination right now, and its a stale transaction, so it must be in the Failed state
match entry {
Entry::Occupied(mut e) => {
let e = e.get_mut();
match e {
TransactionStatus::Failed { failures, .. } => {
*e = TransactionStatus::Retrying { failures: *failures };
retrying = true;
},
if !allow { _ => unreachable!("Encountered bad state when preparing stale transaction: expected Failed state, got Running or Retrying")
return Ok(None); }
},
Entry::Vacant(_) => unreachable!("Encountered bad state when preparing stale transaction: expected Failed state, got vacant state"),
}
}
if wake_up {
return Ok(TransactionPrepOutcome::Wake(outgoing_kind.clone()));
} else if !allowed {
return Ok(TransactionPrepOutcome::Nothing);
} }
let mut events = Vec::new(); let mut events = Vec::new();
if retry { if retrying {
// We retry the previous transaction // We retry the previous transaction
for (_, e) in self for (_, e) in self
.db .db
@ -254,7 +427,7 @@ impl Service {
events.push(e); events.push(e);
} }
if let OutgoingKind::Normal(server_name) = outgoing_kind { if let OutgoingDestination::Normal(server_name) = outgoing_kind {
if let Ok((select_edus, last_count)) = self.select_edus(server_name) { if let Ok((select_edus, last_count)) = self.select_edus(server_name) {
events.extend(select_edus.into_iter().map(SendingEventType::Edu)); events.extend(select_edus.into_iter().map(SendingEventType::Edu));
@ -263,7 +436,7 @@ impl Service {
} }
} }
Ok(Some(events)) Ok(TransactionPrepOutcome::Send(events))
} }
#[tracing::instrument(skip(self, server_name))] #[tracing::instrument(skip(self, server_name))]
@ -371,7 +544,7 @@ impl Service {
#[tracing::instrument(skip(self, pdu_id, user, pushkey))] #[tracing::instrument(skip(self, pdu_id, user, pushkey))]
pub fn send_push_pdu(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> { pub fn send_push_pdu(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey); let outgoing_kind = OutgoingDestination::Push(user.to_owned(), pushkey);
let event = SendingEventType::Pdu(pdu_id.to_owned()); let event = SendingEventType::Pdu(pdu_id.to_owned());
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
self.sender self.sender
@ -391,7 +564,7 @@ impl Service {
.into_iter() .into_iter()
.map(|server| { .map(|server| {
( (
OutgoingKind::Normal(server), OutgoingDestination::Normal(server),
SendingEventType::Pdu(pdu_id.to_owned()), SendingEventType::Pdu(pdu_id.to_owned()),
) )
}) })
@ -418,7 +591,7 @@ impl Service {
serialized: Vec<u8>, serialized: Vec<u8>,
id: u64, id: u64,
) -> Result<()> { ) -> Result<()> {
let outgoing_kind = OutgoingKind::Normal(server.to_owned()); let outgoing_kind = OutgoingDestination::Normal(server.to_owned());
let event = SendingEventType::Edu(serialized); let event = SendingEventType::Edu(serialized);
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
self.sender self.sender
@ -430,7 +603,7 @@ impl Service {
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> { pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> {
let outgoing_kind = OutgoingKind::Appservice(appservice_id); let outgoing_kind = OutgoingDestination::Appservice(appservice_id);
let event = SendingEventType::Pdu(pdu_id); let event = SendingEventType::Pdu(pdu_id);
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
self.sender self.sender
@ -446,18 +619,18 @@ impl Service {
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn cleanup_events(&self, appservice_id: String) -> Result<()> { pub fn cleanup_events(&self, appservice_id: String) -> Result<()> {
self.db self.db
.delete_all_requests_for(&OutgoingKind::Appservice(appservice_id))?; .delete_all_requests_for(&OutgoingDestination::Appservice(appservice_id))?;
Ok(()) Ok(())
} }
#[tracing::instrument(skip(events, kind))] #[tracing::instrument(skip(events, kind))]
async fn handle_events( async fn handle_events(
kind: OutgoingKind, kind: OutgoingDestination,
events: Vec<SendingEventType>, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> { ) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
match &kind { match &kind {
OutgoingKind::Appservice(id) => { OutgoingDestination::Appservice(id) => {
let mut pdu_jsons = Vec::new(); let mut pdu_jsons = Vec::new();
for event in &events { for event in &events {
@ -522,7 +695,7 @@ impl Service {
response response
} }
OutgoingKind::Push(userid, pushkey) => { OutgoingDestination::Push(userid, pushkey) => {
let mut pdus = Vec::new(); let mut pdus = Vec::new();
for event in &events { for event in &events {
@ -561,14 +734,16 @@ impl Service {
} }
} }
let pusher = match services() let pusher =
.pusher match services().pusher.get_pusher(userid, pushkey).map_err(|e| {
.get_pusher(userid, pushkey) (
.map_err(|e| (OutgoingKind::Push(userid.clone(), pushkey.clone()), e))? OutgoingDestination::Push(userid.clone(), pushkey.clone()),
{ e,
Some(pusher) => pusher, )
None => continue, })? {
}; Some(pusher) => pusher,
None => continue,
};
let rules_for_user = services() let rules_for_user = services()
.account_data .account_data
@ -601,9 +776,9 @@ impl Service {
drop(permit); drop(permit);
} }
Ok(OutgoingKind::Push(userid.clone(), pushkey.clone())) Ok(OutgoingDestination::Push(userid.clone(), pushkey.clone()))
} }
OutgoingKind::Normal(server) => { OutgoingDestination::Normal(server) => {
let mut edu_jsons = Vec::new(); let mut edu_jsons = Vec::new();
let mut pdu_jsons = Vec::new(); let mut pdu_jsons = Vec::new();
@ -615,11 +790,11 @@ impl Service {
services().rooms services().rooms
.timeline .timeline
.get_pdu_json_from_id(pdu_id) .get_pdu_json_from_id(pdu_id)
.map_err(|e| (OutgoingKind::Normal(server.clone()), e))? .map_err(|e| (OutgoingDestination::Normal(server.clone()), e))?
.ok_or_else(|| { .ok_or_else(|| {
error!("event not found: {server} {pdu_id:?}"); error!("event not found: {server} {pdu_id:?}");
( (
OutgoingKind::Normal(server.clone()), OutgoingDestination::Normal(server.clone()),
Error::bad_database( Error::bad_database(
"[Normal] Event in servernamevent_data not found in db.", "[Normal] Event in servernamevent_data not found in db.",
), ),