diff --git a/src/database/key_value/sending.rs b/src/database/key_value/sending.rs index b7064610..e17adb74 100644 --- a/src/database/key_value/sending.rs +++ b/src/database/key_value/sending.rs @@ -4,7 +4,7 @@ use crate::{ database::KeyValueDatabase, service::{ self, - sending::{OutgoingKind, SendingEventType}, + sending::{OutgoingDestination, SendingEventType}, }, services, utils, Error, Result, }; @@ -12,7 +12,8 @@ use crate::{ impl service::sending::Data for KeyValueDatabase { fn active_requests<'a>( &'a self, - ) -> Box, OutgoingKind, SendingEventType)>> + 'a> { + ) -> Box, OutgoingDestination, SendingEventType)>> + 'a> + { Box::new( self.servercurrentevent_data .iter() @@ -22,7 +23,7 @@ impl service::sending::Data for KeyValueDatabase { fn active_requests_for<'a>( &'a self, - outgoing_kind: &OutgoingKind, + outgoing_kind: &OutgoingDestination, ) -> Box, SendingEventType)>> + 'a> { let prefix = outgoing_kind.get_prefix(); Box::new( @@ -36,7 +37,7 @@ impl service::sending::Data for KeyValueDatabase { self.servercurrentevent_data.remove(&key) } - fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> { + fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()> { let prefix = outgoing_kind.get_prefix(); for (key, _) in self.servercurrentevent_data.scan_prefix(prefix) { self.servercurrentevent_data.remove(&key)?; @@ -45,7 +46,7 @@ impl service::sending::Data for KeyValueDatabase { Ok(()) } - fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> { + fn delete_all_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()> { let prefix = outgoing_kind.get_prefix(); for (key, _) in self.servercurrentevent_data.scan_prefix(prefix.clone()) { self.servercurrentevent_data.remove(&key).unwrap(); @@ -60,7 +61,7 @@ impl service::sending::Data for KeyValueDatabase { fn queue_requests( &self, - requests: &[(&OutgoingKind, SendingEventType)], + requests: &[(&OutgoingDestination, SendingEventType)], ) -> Result>> { let mut batch = Vec::new(); let mut keys = Vec::new(); @@ -86,7 +87,7 @@ impl service::sending::Data for KeyValueDatabase { fn queued_requests<'a>( &'a self, - outgoing_kind: &OutgoingKind, + outgoing_kind: &OutgoingDestination, ) -> Box)>> + 'a> { let prefix = outgoing_kind.get_prefix(); Box::new( @@ -129,7 +130,7 @@ impl service::sending::Data for KeyValueDatabase { fn parse_servercurrentevent( key: &[u8], value: Vec, -) -> Result<(OutgoingKind, SendingEventType)> { +) -> Result<(OutgoingDestination, SendingEventType)> { // Appservices start with a plus Ok::<_, Error>(if key.starts_with(b"+") { let mut parts = key[1..].splitn(2, |&b| b == 0xff); @@ -144,7 +145,7 @@ fn parse_servercurrentevent( })?; ( - OutgoingKind::Appservice(server), + OutgoingDestination::Appservice(server), if value.is_empty() { SendingEventType::Pdu(event.to_vec()) } else { @@ -171,7 +172,7 @@ fn parse_servercurrentevent( .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; ( - OutgoingKind::Push(user_id, pushkey_string), + OutgoingDestination::Push(user_id, pushkey_string), if value.is_empty() { SendingEventType::Pdu(event.to_vec()) } else { @@ -192,7 +193,7 @@ fn parse_servercurrentevent( })?; ( - OutgoingKind::Normal(ServerName::parse(server).map_err(|_| { + OutgoingDestination::Normal(ServerName::parse(server).map_err(|_| { Error::bad_database("Invalid server string in server_currenttransaction") })?), if value.is_empty() { diff --git a/src/service/sending/data.rs b/src/service/sending/data.rs index 8b4d236f..69f50b85 100644 --- a/src/service/sending/data.rs +++ b/src/service/sending/data.rs @@ -2,27 +2,27 @@ use ruma::ServerName; use crate::Result; -use super::{OutgoingKind, SendingEventType}; +use super::{OutgoingDestination, SendingEventType}; pub trait Data: Send + Sync { #[allow(clippy::type_complexity)] fn active_requests<'a>( &'a self, - ) -> Box, OutgoingKind, SendingEventType)>> + 'a>; + ) -> Box, OutgoingDestination, SendingEventType)>> + 'a>; fn active_requests_for<'a>( &'a self, - outgoing_kind: &OutgoingKind, + outgoing_kind: &OutgoingDestination, ) -> Box, SendingEventType)>> + 'a>; fn delete_active_request(&self, key: Vec) -> Result<()>; - fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>; - fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>; + fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()>; + fn delete_all_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()>; fn queue_requests( &self, - requests: &[(&OutgoingKind, SendingEventType)], + requests: &[(&OutgoingDestination, SendingEventType)], ) -> Result>>; fn queued_requests<'a>( &'a self, - outgoing_kind: &OutgoingKind, + outgoing_kind: &OutgoingDestination, ) -> Box)>> + 'a>; fn mark_as_active(&self, events: &[(SendingEventType, Vec)]) -> Result<()>; fn set_latest_educount(&self, server_name: &ServerName, educount: u64) -> Result<()>; diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 63536c7b..0f07f231 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -3,8 +3,9 @@ mod data; pub use data::Data; use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, fmt::Debug, + future::Future, sync::Arc, time::{Duration, Instant}, }; @@ -40,34 +41,34 @@ use ruma::{ }; use tokio::{ select, - sync::{mpsc, Mutex, Semaphore}, + sync::{mpsc, oneshot, Mutex, Semaphore}, }; use tracing::{debug, error, warn}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub enum OutgoingKind { +pub enum OutgoingDestination { Appservice(String), Push(OwnedUserId, String), // user and pushkey Normal(OwnedServerName), } -impl OutgoingKind { +impl OutgoingDestination { #[tracing::instrument(skip(self))] pub fn get_prefix(&self) -> Vec { let mut prefix = match self { - OutgoingKind::Appservice(server) => { + OutgoingDestination::Appservice(server) => { let mut p = b"+".to_vec(); p.extend_from_slice(server.as_bytes()); p } - OutgoingKind::Push(user, pushkey) => { + OutgoingDestination::Push(user, pushkey) => { let mut p = b"$".to_vec(); p.extend_from_slice(user.as_bytes()); p.push(0xff); p.extend_from_slice(pushkey.as_bytes()); p } - OutgoingKind::Normal(server) => { + OutgoingDestination::Normal(server) => { let mut p = Vec::new(); p.extend_from_slice(server.as_bytes()); p @@ -77,6 +78,47 @@ impl OutgoingKind { prefix } + + // This wraps the OutgoingDestination key in an interruptible sleep future. + // + // The first return value is the future, the second is the oneshot that interrupts that future, + // and causes it to return instantly. + fn wrap_in_interruptible_sleep( + self, + at: Instant, + ) -> (impl Future, oneshot::Sender<()>) { + let (tx, rx) = oneshot::channel(); + let at = tokio::time::Instant::from_std(at); + + ( + async move { + let _ = tokio::time::timeout_at(at, rx).await; + + self + }, + tx, + ) + } +} + +impl std::fmt::Display for OutgoingDestination { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OutgoingDestination::Appservice(appservice_id) => { + write!(f, "Appservice (ID {:?})", appservice_id) + } + OutgoingDestination::Push(user, push_key) => { + write!( + f, + "User Push Service (for {:?}, with key {:?})", + user, push_key + ) + } + OutgoingDestination::Normal(server) => { + write!(f, "Matrix Server ({:?})", server) + } + } + } } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -90,14 +132,29 @@ pub struct Service { /// The state for a given state hash. pub(super) maximum_requests: Arc, - pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec)>, - receiver: Mutex)>>, + pub sender: mpsc::UnboundedSender<(OutgoingDestination, SendingEventType, Vec)>, + receiver: Mutex)>>, } enum TransactionStatus { + // Currently running (for the first time) Running, - Failed(u32, Instant), // number of times failed, time of last failure - Retrying(u32), // number of times failed + // Failed, backing off for a retry + Failed { + failures: u32, + waker: Option>, + }, + // Currently retrying + Retrying { + failures: u32, + }, // number of times failed +} + +// A control-flow enum to dictate what the handler should do after (trying to) prepare a transaction +enum TransactionPrepOutcome { + Send(Vec), + Wake(OutgoingDestination), + Nothing, } impl Service { @@ -119,14 +176,17 @@ impl Service { } async fn handler(&self) -> Result<()> { - let mut receiver = self.receiver.lock().await; + let mut new_transactions = self.receiver.lock().await; + let (waking_sender, mut waking_receiver) = mpsc::unbounded_channel(); - let mut futures = FuturesUnordered::new(); + let mut outgoing = FuturesUnordered::new(); + let mut retrying = FuturesUnordered::new(); - let mut current_transaction_status = HashMap::::new(); + let mut current_transaction_status = + HashMap::::new(); // Retry requests we could not finish yet - let mut initial_transactions = HashMap::>::new(); + let mut initial_transactions = HashMap::>::new(); for (key, outgoing_kind, event) in self.db.active_requests().filter_map(|r| r.ok()) { let entry = initial_transactions @@ -147,12 +207,34 @@ impl Service { for (outgoing_kind, events) in initial_transactions { current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); - futures.push(Self::handle_events(outgoing_kind.clone(), events)); + outgoing.push(Self::handle_events(outgoing_kind.clone(), events)); } loop { select! { - Some(response) = futures.next() => { + // New transactions to be sent out (from server/user activity) + Some((dest, event, key)) = new_transactions.recv() => { + match self.prepare_transaction( + &dest, + vec![(event, key)], + &mut current_transaction_status, + true, + ) { + Ok(TransactionPrepOutcome::Send(events)) => { + outgoing.push(Self::handle_events(dest, events)); + }, + Ok(TransactionPrepOutcome::Wake(dest)) => { + waking_sender.send(dest).expect("nothing closes this channel but ourselves"); + }, + Ok(TransactionPrepOutcome::Nothing) => {}, + Err(err) => { + error!("Ignoring error in (fresh) outgoing request ({}) handler: {}", dest, err) + } + } + }, + + Some(response) = outgoing.next() => { + // Outgoing transaction succeeded match response { Ok(outgoing_kind) => { self.db.delete_all_active_requests_for(&outgoing_kind)?; @@ -164,9 +246,12 @@ impl Service { // Insert pdus we found self.db.mark_as_active(&new_events)?; - futures.push( + // Clear retries + current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); + + outgoing.push( Self::handle_events( - outgoing_kind.clone(), + outgoing_kind, new_events.into_iter().map(|(event, _)| event).collect(), ) ); @@ -174,72 +259,160 @@ impl Service { current_transaction_status.remove(&outgoing_kind); } } - Err((outgoing_kind, _)) => { - current_transaction_status.entry(outgoing_kind).and_modify(|e| *e = match e { - TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), - TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), - TransactionStatus::Failed(_, _) => { - error!("Request that was not even running failed?!"); - return - }, - }); + + // Outgoing transaction failed + Err((destination, err)) => { + // Set status to Failed, create timer + let timer = Self::mark_failed_and_backoff(&mut current_transaction_status, destination.clone()); + + // Add timer to loop + retrying.push(timer); + + warn!("Outgoing request to {} failed: {}", destination, err); } }; }, - Some((outgoing_kind, event, key)) = receiver.recv() => { - if let Ok(Some(events)) = self.select_events( - &outgoing_kind, - vec![(event, key)], + + // Transaction retry timers firing + Some(dest) = retrying.next() => { + // Transition Failed => Retrying, return pending old transaction events + match self.prepare_transaction( + &dest, + vec![], // will be ignored because fresh == false &mut current_transaction_status, + false, ) { - futures.push(Self::handle_events(outgoing_kind, events)); + Ok(TransactionPrepOutcome::Send(events)) => { + outgoing.push(Self::handle_events(dest, events)); + } + Ok(_) => { + // Unreachable because fresh == false + unreachable!("prepare_transaction on a stale transaction {} did not return ::Send", dest) + } + + Err(err) => { + error!("Ignoring error in (stale) outgoing request ({}) handler: {}", dest, err); + + // transaction dropped, so drop destination as well. + current_transaction_status.remove(&dest); + } } - } + }, + + // Explicit wakeups, makes a backoff timer return immediately + Some(outgoing) = waking_receiver.recv() => { + if let Some(TransactionStatus::Failed { waker, .. }) = current_transaction_status.get_mut(&outgoing) { + if let Some(waker) = waker.take() { + let _ = waker.send(()); + } + } + }, } } } + /// Generates timer/oneshot, alters status to reflect Failed + /// + /// Returns timer/oneshot future to wake up loop for next retry + fn mark_failed_and_backoff( + status: &mut HashMap, + dest: OutgoingDestination, + ) -> impl Future { + let now = Instant::now(); + + let entry = status + .get_mut(&dest) + .expect("guaranteed to be set before this function"); + + let failures = match entry { + // Running -> Failed + TransactionStatus::Running => 1, + // Retrying -> Failed + TransactionStatus::Retrying { failures } => *failures + 1, + + // The transition of Failed -> Retrying is handled by handle_events + TransactionStatus::Failed { .. } => { + unreachable!("TransactionStatus in inconsistent state: Expected either Running or Retrying, got Failed, bailing...") + } + }; + + const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24); + + // Exponential backoff, clamp upper value to one day + let next_wakeup = now + (Duration::from_secs(30) * failures * failures).min(ONE_DAY); + + let (fut, waker) = dest.wrap_in_interruptible_sleep(next_wakeup); + + *entry = TransactionStatus::Failed { + failures, + waker: Some(waker), + }; + + fut + } + + // This prepares a transaction, checks the transaction state, and selects appropriate events. #[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))] - fn select_events( + fn prepare_transaction( &self, - outgoing_kind: &OutgoingKind, + outgoing_kind: &OutgoingDestination, new_events: Vec<(SendingEventType, Vec)>, // Events we want to send: event and full key - current_transaction_status: &mut HashMap, - ) -> Result>> { - let mut retry = false; - let mut allow = true; + current_transaction_status: &mut HashMap, + fresh: bool, // Wether or not this transaction came from server activity. + ) -> Result { + let mut allowed = true; + let mut retrying = false; + let mut wake_up = false; let entry = current_transaction_status.entry(outgoing_kind.clone()); - entry - .and_modify(|e| match e { - TransactionStatus::Running | TransactionStatus::Retrying(_) => { - allow = false; // already running - } - TransactionStatus::Failed(tries, time) => { - // Fail if a request has failed recently (exponential backoff) - let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); - if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { - min_elapsed_duration = Duration::from_secs(60 * 60 * 24); + if fresh { + // If its fresh, we initialise the status if we need to. + // + // We do nothing if it is already running or retrying. + // + // We return with a wake if it is in the Failed state. + entry + .and_modify(|e| match e { + TransactionStatus::Running | TransactionStatus::Retrying { .. } => { + // already running + allowed = false; } - - if time.elapsed() < min_elapsed_duration { - allow = false; - } else { - retry = true; - *e = TransactionStatus::Retrying(*tries); + TransactionStatus::Failed { .. } => { + // Currently sleeping, time to call the kool-aid man + wake_up = true; } - } - }) - .or_insert(TransactionStatus::Running); + }) + .or_insert(TransactionStatus::Running); + } else { + // If it's not fresh, we expect an entry. + // + // We also expect us to be the only one who are touching this destination right now, and its a stale transaction, so it must be in the Failed state + match entry { + Entry::Occupied(mut e) => { + let e = e.get_mut(); + match e { + TransactionStatus::Failed { failures, .. } => { + *e = TransactionStatus::Retrying { failures: *failures }; + retrying = true; + }, - if !allow { - return Ok(None); + _ => unreachable!("Encountered bad state when preparing stale transaction: expected Failed state, got Running or Retrying") + } + }, + Entry::Vacant(_) => unreachable!("Encountered bad state when preparing stale transaction: expected Failed state, got vacant state"), + } + } + + if wake_up { + return Ok(TransactionPrepOutcome::Wake(outgoing_kind.clone())); + } else if !allowed { + return Ok(TransactionPrepOutcome::Nothing); } let mut events = Vec::new(); - if retry { + if retrying { // We retry the previous transaction for (_, e) in self .db @@ -254,7 +427,7 @@ impl Service { events.push(e); } - if let OutgoingKind::Normal(server_name) = outgoing_kind { + if let OutgoingDestination::Normal(server_name) = outgoing_kind { if let Ok((select_edus, last_count)) = self.select_edus(server_name) { events.extend(select_edus.into_iter().map(SendingEventType::Edu)); @@ -263,7 +436,7 @@ impl Service { } } - Ok(Some(events)) + Ok(TransactionPrepOutcome::Send(events)) } #[tracing::instrument(skip(self, server_name))] @@ -371,7 +544,7 @@ impl Service { #[tracing::instrument(skip(self, pdu_id, user, pushkey))] pub fn send_push_pdu(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> { - let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey); + let outgoing_kind = OutgoingDestination::Push(user.to_owned(), pushkey); let event = SendingEventType::Pdu(pdu_id.to_owned()); let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; self.sender @@ -391,7 +564,7 @@ impl Service { .into_iter() .map(|server| { ( - OutgoingKind::Normal(server), + OutgoingDestination::Normal(server), SendingEventType::Pdu(pdu_id.to_owned()), ) }) @@ -418,7 +591,7 @@ impl Service { serialized: Vec, id: u64, ) -> Result<()> { - let outgoing_kind = OutgoingKind::Normal(server.to_owned()); + let outgoing_kind = OutgoingDestination::Normal(server.to_owned()); let event = SendingEventType::Edu(serialized); let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; self.sender @@ -430,7 +603,7 @@ impl Service { #[tracing::instrument(skip(self))] pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec) -> Result<()> { - let outgoing_kind = OutgoingKind::Appservice(appservice_id); + let outgoing_kind = OutgoingDestination::Appservice(appservice_id); let event = SendingEventType::Pdu(pdu_id); let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; self.sender @@ -446,18 +619,18 @@ impl Service { #[tracing::instrument(skip(self))] pub fn cleanup_events(&self, appservice_id: String) -> Result<()> { self.db - .delete_all_requests_for(&OutgoingKind::Appservice(appservice_id))?; + .delete_all_requests_for(&OutgoingDestination::Appservice(appservice_id))?; Ok(()) } #[tracing::instrument(skip(events, kind))] async fn handle_events( - kind: OutgoingKind, + kind: OutgoingDestination, events: Vec, - ) -> Result { + ) -> Result { match &kind { - OutgoingKind::Appservice(id) => { + OutgoingDestination::Appservice(id) => { let mut pdu_jsons = Vec::new(); for event in &events { @@ -522,7 +695,7 @@ impl Service { response } - OutgoingKind::Push(userid, pushkey) => { + OutgoingDestination::Push(userid, pushkey) => { let mut pdus = Vec::new(); for event in &events { @@ -561,14 +734,16 @@ impl Service { } } - let pusher = match services() - .pusher - .get_pusher(userid, pushkey) - .map_err(|e| (OutgoingKind::Push(userid.clone(), pushkey.clone()), e))? - { - Some(pusher) => pusher, - None => continue, - }; + let pusher = + match services().pusher.get_pusher(userid, pushkey).map_err(|e| { + ( + OutgoingDestination::Push(userid.clone(), pushkey.clone()), + e, + ) + })? { + Some(pusher) => pusher, + None => continue, + }; let rules_for_user = services() .account_data @@ -601,9 +776,9 @@ impl Service { drop(permit); } - Ok(OutgoingKind::Push(userid.clone(), pushkey.clone())) + Ok(OutgoingDestination::Push(userid.clone(), pushkey.clone())) } - OutgoingKind::Normal(server) => { + OutgoingDestination::Normal(server) => { let mut edu_jsons = Vec::new(); let mut pdu_jsons = Vec::new(); @@ -615,11 +790,11 @@ impl Service { services().rooms .timeline .get_pdu_json_from_id(pdu_id) - .map_err(|e| (OutgoingKind::Normal(server.clone()), e))? + .map_err(|e| (OutgoingDestination::Normal(server.clone()), e))? .ok_or_else(|| { error!("event not found: {server} {pdu_id:?}"); ( - OutgoingKind::Normal(server.clone()), + OutgoingDestination::Normal(server.clone()), Error::bad_database( "[Normal] Event in servernamevent_data not found in db.", ),