From 8f4f3314c6c24ca9eb54fc25927d4eb186b27115 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Kub=C3=ADk?= Date: Sun, 10 Jul 2022 14:37:34 +0200 Subject: [PATCH] refactor(edus): split edus into separate modules --- src/database/key_value.rs | 14 +- src/service/rooms/edus/data.rs | 91 ------- src/service/rooms/edus/mod.rs | 259 +------------------- src/service/rooms/edus/presence/data.rs | 33 +++ src/service/rooms/edus/presence/mod.rs | 119 +++++++++ src/service/rooms/edus/read_receipt/data.rs | 31 +++ src/service/rooms/edus/read_receipt/mod.rs | 53 ++++ src/service/rooms/edus/typing/data.rs | 14 ++ src/service/rooms/edus/typing/mod.rs | 87 +++++++ 9 files changed, 349 insertions(+), 352 deletions(-) delete mode 100644 src/service/rooms/edus/data.rs create mode 100644 src/service/rooms/edus/presence/data.rs create mode 100644 src/service/rooms/edus/presence/mod.rs create mode 100644 src/service/rooms/edus/read_receipt/data.rs create mode 100644 src/service/rooms/edus/read_receipt/mod.rs create mode 100644 src/service/rooms/edus/typing/data.rs create mode 100644 src/service/rooms/edus/typing/mod.rs diff --git a/src/database/key_value.rs b/src/database/key_value.rs index 34916e4b..0be13115 100644 --- a/src/database/key_value.rs +++ b/src/database/key_value.rs @@ -156,7 +156,7 @@ impl service::room::directory::Data for KeyValueDatabase { } } -impl service::room::edus::Data for KeyValueDatabase { +impl service::room::edus::read_receipt::Data for KeyValueDatabase { fn readreceipt_update( &self, user_id: &UserId, @@ -203,7 +203,7 @@ impl service::room::edus::Data for KeyValueDatabase { room_id: &RoomId, since: u64, ) -> impl Iterator< - Item = Result<( + Item=Result<( Box, u64, Raw, @@ -229,7 +229,7 @@ impl service::room::edus::Data for KeyValueDatabase { Error::bad_database("Invalid readreceiptid userid bytes in db.") })?, ) - .map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?; + .map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?; let mut json = serde_json::from_slice::(&v).map_err(|_| { Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json.") @@ -293,7 +293,9 @@ impl service::room::edus::Data for KeyValueDatabase { .transpose()? .unwrap_or(0)) } +} +impl service::room::edus::typing::Data for KeyValueDatabase { fn typing_add( &self, user_id: &UserId, @@ -379,14 +381,16 @@ impl service::room::edus::Data for KeyValueDatabase { let user_id = UserId::parse(utils::string_from_bytes(&user_id).map_err(|_| { Error::bad_database("User ID in typingid_userid is invalid unicode.") })?) - .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?; + .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?; user_ids.insert(user_id); } Ok(user_ids) } +} +impl service::room::edus::presence::Data for KeyValueDatabase { fn update_presence( &self, user_id: &UserId, @@ -416,7 +420,7 @@ impl service::room::edus::Data for KeyValueDatabase { Ok(()) } - pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { + fn ping_presence(&self, user_id: &UserId) -> Result<()> { self.userid_lastpresenceupdate.insert( user_id.as_bytes(), &utils::millis_since_unix_epoch().to_be_bytes(), diff --git a/src/service/rooms/edus/data.rs b/src/service/rooms/edus/data.rs deleted file mode 100644 index 16c14cf3..00000000 --- a/src/service/rooms/edus/data.rs +++ /dev/null @@ -1,91 +0,0 @@ -pub trait Data { - /// Replaces the previous read receipt. - fn readreceipt_update( - &self, - user_id: &UserId, - room_id: &RoomId, - event: ReceiptEvent, - ) -> Result<()>; - - /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. - fn readreceipts_since( - &self, - room_id: &RoomId, - since: u64, - ) -> impl Iterator< - Item = Result<( - Box, - u64, - Raw, - )>, - >; - - /// Sets a private read marker at `count`. - fn private_read_set( - &self, - room_id: &RoomId, - user_id: &UserId, - count: u64, - ) -> Result<()>; - - /// Returns the private read marker. - fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result>; - - /// Returns the count of the last typing update in this room. - fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result; - - /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is - /// called. - fn typing_add( - &self, - user_id: &UserId, - room_id: &RoomId, - timeout: u64, - ) -> Result<()>; - - /// Removes a user from typing before the timeout is reached. - fn typing_remove( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result<()>; - - /// Returns the count of the last typing update in this room. - fn last_typing_update( - &self, - room_id: &RoomId, - ) -> Result; - - /// Returns all user ids currently typing. - fn typings_all( - &self, - room_id: &RoomId, - ) -> Result>; - - /// Adds a presence event which will be saved until a new event replaces it. - /// - /// Note: This method takes a RoomId because presence updates are always bound to rooms to - /// make sure users outside these rooms can't see them. - fn update_presence( - &self, - user_id: &UserId, - room_id: &RoomId, - presence: PresenceEvent, - ) -> Result<()>; - - /// Resets the presence timeout, so the user will stay in their current presence state. - fn ping_presence(&self, user_id: &UserId) -> Result<()>; - - /// Returns the timestamp of the last presence update of this user in millis since the unix epoch. - fn last_presence_update(&self, user_id: &UserId) -> Result>; - - /// Returns the presence event with correct last_active_ago. - fn get_presence_event(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result>; - - /// Returns the most recent presence updates that happened after the event with id `since`. - fn presence_since( - &self, - room_id: &RoomId, - since: u64, - ) -> Result, PresenceEvent>>; -} diff --git a/src/service/rooms/edus/mod.rs b/src/service/rooms/edus/mod.rs index 06adf57e..d8ce5300 100644 --- a/src/service/rooms/edus/mod.rs +++ b/src/service/rooms/edus/mod.rs @@ -1,256 +1,3 @@ -mod data; -pub use data::Data; - -use crate::service::*; - -pub struct Service { - db: D, -} - -impl Service<_> { - /// Replaces the previous read receipt. - pub fn readreceipt_update( - &self, - user_id: &UserId, - room_id: &RoomId, - event: ReceiptEvent, - ) -> Result<()> { - self.db.readreceipt_update(user_id, room_id, event); - } - - /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. - #[tracing::instrument(skip(self))] - pub fn readreceipts_since<'a>( - &'a self, - room_id: &RoomId, - since: u64, - ) -> impl Iterator< - Item = Result<( - Box, - u64, - Raw, - )>, - > + 'a { - self.db.readreceipts_since(room_id, since) - } - - /// Sets a private read marker at `count`. - #[tracing::instrument(skip(self, globals))] - pub fn private_read_set( - &self, - room_id: &RoomId, - user_id: &UserId, - count: u64, - ) -> Result<()> { - self.db.private_read_set(room_id, user_id, count) - } - - /// Returns the private read marker. - #[tracing::instrument(skip(self))] - pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result> { - self.db.private_read_get(room_id, user_id) - } - - /// Returns the count of the last typing update in this room. - pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result { - self.db.last_privateread_update(user_id, room_id) - } - - /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is - /// called. - pub fn typing_add( - &self, - user_id: &UserId, - room_id: &RoomId, - timeout: u64, - ) -> Result<()> { - self.db.typing_add(user_id, room_id, timeout) - } - - /// Removes a user from typing before the timeout is reached. - pub fn typing_remove( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result<()> { - self.db.typing_remove(user_id, room_id) - } - - /* TODO: Do this in background thread? - /// Makes sure that typing events with old timestamps get removed. - fn typings_maintain( - &self, - room_id: &RoomId, - globals: &super::super::globals::Globals, - ) -> Result<()> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - let current_timestamp = utils::millis_since_unix_epoch(); - - let mut found_outdated = false; - - // Find all outdated edus before inserting a new one - for outdated_edu in self - .typingid_userid - .scan_prefix(prefix) - .map(|(key, _)| { - Ok::<_, Error>(( - key.clone(), - utils::u64_from_bytes( - &key.splitn(2, |&b| b == 0xff).nth(1).ok_or_else(|| { - Error::bad_database("RoomTyping has invalid timestamp or delimiters.") - })?[0..mem::size_of::()], - ) - .map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?, - )) - }) - .filter_map(|r| r.ok()) - .take_while(|&(_, timestamp)| timestamp < current_timestamp) - { - // This is an outdated edu (time > timestamp) - self.typingid_userid.remove(&outdated_edu.0)?; - found_outdated = true; - } - - if found_outdated { - self.roomid_lasttypingupdate - .insert(room_id.as_bytes(), &globals.next_count()?.to_be_bytes())?; - } - - Ok(()) - } - */ - - /// Returns the count of the last typing update in this room. - #[tracing::instrument(skip(self, globals))] - pub fn last_typing_update( - &self, - room_id: &RoomId, - ) -> Result { - self.db.last_typing_update(room_id) - } - - /// Returns a new typing EDU. - pub fn typings_all( - &self, - room_id: &RoomId, - ) -> Result> { - let user_ids = self.db.typings_all(room_id)?; - - Ok(SyncEphemeralRoomEvent { - content: ruma::events::typing::TypingEventContent { - user_ids: user_ids.into_iter().collect(), - }, - }) - } - - /// Adds a presence event which will be saved until a new event replaces it. - /// - /// Note: This method takes a RoomId because presence updates are always bound to rooms to - /// make sure users outside these rooms can't see them. - pub fn update_presence( - &self, - user_id: &UserId, - room_id: &RoomId, - presence: PresenceEvent, - ) -> Result<()> { - self.db.update_presence(user_id, room_id, presence) - } - - /// Resets the presence timeout, so the user will stay in their current presence state. - pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { - self.db.ping_presence(user_id) - } - - pub fn get_last_presence_event( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result> { - let last_update = match self.db.last_presence_update(user_id)? { - Some(last) => last, - None => return Ok(None), - }; - - self.db.get_presence_event(room_id, user_id, last_update) - } - - /* TODO - /// Sets all users to offline who have been quiet for too long. - fn _presence_maintain( - &self, - rooms: &super::Rooms, - globals: &super::super::globals::Globals, - ) -> Result<()> { - let current_timestamp = utils::millis_since_unix_epoch(); - - for (user_id_bytes, last_timestamp) in self - .userid_lastpresenceupdate - .iter() - .filter_map(|(k, bytes)| { - Some(( - k, - utils::u64_from_bytes(&bytes) - .map_err(|_| { - Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") - }) - .ok()?, - )) - }) - .take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000) - // 5 Minutes - { - // Send new presence events to set the user offline - let count = globals.next_count()?.to_be_bytes(); - let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes) - .map_err(|_| { - Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.") - })? - .try_into() - .map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?; - for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) { - let mut presence_id = room_id.as_bytes().to_vec(); - presence_id.push(0xff); - presence_id.extend_from_slice(&count); - presence_id.push(0xff); - presence_id.extend_from_slice(&user_id_bytes); - - self.presenceid_presence.insert( - &presence_id, - &serde_json::to_vec(&PresenceEvent { - content: PresenceEventContent { - avatar_url: None, - currently_active: None, - displayname: None, - last_active_ago: Some( - last_timestamp.try_into().expect("time is valid"), - ), - presence: PresenceState::Offline, - status_msg: None, - }, - sender: user_id.to_owned(), - }) - .expect("PresenceEvent can be serialized"), - )?; - } - - self.userid_lastpresenceupdate.insert( - user_id.as_bytes(), - &utils::millis_since_unix_epoch().to_be_bytes(), - )?; - } - - Ok(()) - }*/ - - /// Returns the most recent presence updates that happened after the event with id `since`. - #[tracing::instrument(skip(self, since, _rooms, _globals))] - pub fn presence_since( - &self, - room_id: &RoomId, - since: u64, - ) -> Result, PresenceEvent>> { - self.db.presence_since(room_id, since) - } -} +pub mod presence; +pub mod read_receipt; +pub mod typing; diff --git a/src/service/rooms/edus/presence/data.rs b/src/service/rooms/edus/presence/data.rs new file mode 100644 index 00000000..de72e219 --- /dev/null +++ b/src/service/rooms/edus/presence/data.rs @@ -0,0 +1,33 @@ +pub trait Data { + /// Adds a presence event which will be saved until a new event replaces it. + /// + /// Note: This method takes a RoomId because presence updates are always bound to rooms to + /// make sure users outside these rooms can't see them. + fn update_presence( + &self, + user_id: &UserId, + room_id: &RoomId, + presence: PresenceEvent, + ) -> Result<()>; + + /// Resets the presence timeout, so the user will stay in their current presence state. + fn ping_presence(&self, user_id: &UserId) -> Result<()>; + + /// Returns the timestamp of the last presence update of this user in millis since the unix epoch. + fn last_presence_update(&self, user_id: &UserId) -> Result>; + + /// Returns the presence event with correct last_active_ago. + fn get_presence_event( + &self, + room_id: &RoomId, + user_id: &UserId, + count: u64, + ) -> Result>; + + /// Returns the most recent presence updates that happened after the event with id `since`. + fn presence_since( + &self, + room_id: &RoomId, + since: u64, + ) -> Result, PresenceEvent>>; +} diff --git a/src/service/rooms/edus/presence/mod.rs b/src/service/rooms/edus/presence/mod.rs new file mode 100644 index 00000000..5793a799 --- /dev/null +++ b/src/service/rooms/edus/presence/mod.rs @@ -0,0 +1,119 @@ +mod data; +pub use data::Data; + +use crate::service::*; + +pub struct Service { + db: D, +} + +impl Service<_> { + /// Adds a presence event which will be saved until a new event replaces it. + /// + /// Note: This method takes a RoomId because presence updates are always bound to rooms to + /// make sure users outside these rooms can't see them. + pub fn update_presence( + &self, + user_id: &UserId, + room_id: &RoomId, + presence: PresenceEvent, + ) -> Result<()> { + self.db.update_presence(user_id, room_id, presence) + } + + /// Resets the presence timeout, so the user will stay in their current presence state. + pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { + self.db.ping_presence(user_id) + } + + pub fn get_last_presence_event( + &self, + user_id: &UserId, + room_id: &RoomId, + ) -> Result> { + let last_update = match self.db.last_presence_update(user_id)? { + Some(last) => last, + None => return Ok(None), + }; + + self.db.get_presence_event(room_id, user_id, last_update) + } + + /* TODO + /// Sets all users to offline who have been quiet for too long. + fn _presence_maintain( + &self, + rooms: &super::Rooms, + globals: &super::super::globals::Globals, + ) -> Result<()> { + let current_timestamp = utils::millis_since_unix_epoch(); + + for (user_id_bytes, last_timestamp) in self + .userid_lastpresenceupdate + .iter() + .filter_map(|(k, bytes)| { + Some(( + k, + utils::u64_from_bytes(&bytes) + .map_err(|_| { + Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") + }) + .ok()?, + )) + }) + .take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000) + // 5 Minutes + { + // Send new presence events to set the user offline + let count = globals.next_count()?.to_be_bytes(); + let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes) + .map_err(|_| { + Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.") + })? + .try_into() + .map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?; + for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) { + let mut presence_id = room_id.as_bytes().to_vec(); + presence_id.push(0xff); + presence_id.extend_from_slice(&count); + presence_id.push(0xff); + presence_id.extend_from_slice(&user_id_bytes); + + self.presenceid_presence.insert( + &presence_id, + &serde_json::to_vec(&PresenceEvent { + content: PresenceEventContent { + avatar_url: None, + currently_active: None, + displayname: None, + last_active_ago: Some( + last_timestamp.try_into().expect("time is valid"), + ), + presence: PresenceState::Offline, + status_msg: None, + }, + sender: user_id.to_owned(), + }) + .expect("PresenceEvent can be serialized"), + )?; + } + + self.userid_lastpresenceupdate.insert( + user_id.as_bytes(), + &utils::millis_since_unix_epoch().to_be_bytes(), + )?; + } + + Ok(()) + }*/ + + /// Returns the most recent presence updates that happened after the event with id `since`. + #[tracing::instrument(skip(self, since, _rooms, _globals))] + pub fn presence_since( + &self, + room_id: &RoomId, + since: u64, + ) -> Result, PresenceEvent>> { + self.db.presence_since(room_id, since) + } +} diff --git a/src/service/rooms/edus/read_receipt/data.rs b/src/service/rooms/edus/read_receipt/data.rs new file mode 100644 index 00000000..4befcf2c --- /dev/null +++ b/src/service/rooms/edus/read_receipt/data.rs @@ -0,0 +1,31 @@ +pub trait Data { + /// Replaces the previous read receipt. + fn readreceipt_update( + &self, + user_id: &UserId, + room_id: &RoomId, + event: ReceiptEvent, + ) -> Result<()>; + + /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. + fn readreceipts_since( + &self, + room_id: &RoomId, + since: u64, + ) -> impl Iterator< + Item = Result<( + Box, + u64, + Raw, + )>, + >; + + /// Sets a private read marker at `count`. + fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()>; + + /// Returns the private read marker. + fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result>; + + /// Returns the count of the last typing update in this room. + fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result; +} diff --git a/src/service/rooms/edus/read_receipt/mod.rs b/src/service/rooms/edus/read_receipt/mod.rs new file mode 100644 index 00000000..9cd474fb --- /dev/null +++ b/src/service/rooms/edus/read_receipt/mod.rs @@ -0,0 +1,53 @@ +mod data; +pub use data::Data; + +use crate::service::*; + +pub struct Service { + db: D, +} + +impl Service<_> { + /// Replaces the previous read receipt. + pub fn readreceipt_update( + &self, + user_id: &UserId, + room_id: &RoomId, + event: ReceiptEvent, + ) -> Result<()> { + self.db.readreceipt_update(user_id, room_id, event); + } + + /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. + #[tracing::instrument(skip(self))] + pub fn readreceipts_since<'a>( + &'a self, + room_id: &RoomId, + since: u64, + ) -> impl Iterator< + Item = Result<( + Box, + u64, + Raw, + )>, + > + 'a { + self.db.readreceipts_since(room_id, since) + } + + /// Sets a private read marker at `count`. + #[tracing::instrument(skip(self, globals))] + pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> { + self.db.private_read_set(room_id, user_id, count) + } + + /// Returns the private read marker. + #[tracing::instrument(skip(self))] + pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result> { + self.db.private_read_get(room_id, user_id) + } + + /// Returns the count of the last typing update in this room. + pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result { + self.db.last_privateread_update(user_id, room_id) + } +} diff --git a/src/service/rooms/edus/typing/data.rs b/src/service/rooms/edus/typing/data.rs new file mode 100644 index 00000000..83ff90ea --- /dev/null +++ b/src/service/rooms/edus/typing/data.rs @@ -0,0 +1,14 @@ +pub trait Data { + /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is + /// called. + fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()>; + + /// Removes a user from typing before the timeout is reached. + fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>; + + /// Returns the count of the last typing update in this room. + fn last_typing_update(&self, room_id: &RoomId) -> Result; + + /// Returns all user ids currently typing. + fn typings_all(&self, room_id: &RoomId) -> Result>; +} diff --git a/src/service/rooms/edus/typing/mod.rs b/src/service/rooms/edus/typing/mod.rs new file mode 100644 index 00000000..b29c7888 --- /dev/null +++ b/src/service/rooms/edus/typing/mod.rs @@ -0,0 +1,87 @@ +mod data; +pub use data::Data; + +use crate::service::*; + +pub struct Service { + db: D, +} + +impl Service<_> { + /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is + /// called. + pub fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> { + self.db.typing_add(user_id, room_id, timeout) + } + + /// Removes a user from typing before the timeout is reached. + pub fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { + self.db.typing_remove(user_id, room_id) + } + + /* TODO: Do this in background thread? + /// Makes sure that typing events with old timestamps get removed. + fn typings_maintain( + &self, + room_id: &RoomId, + globals: &super::super::globals::Globals, + ) -> Result<()> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + let current_timestamp = utils::millis_since_unix_epoch(); + + let mut found_outdated = false; + + // Find all outdated edus before inserting a new one + for outdated_edu in self + .typingid_userid + .scan_prefix(prefix) + .map(|(key, _)| { + Ok::<_, Error>(( + key.clone(), + utils::u64_from_bytes( + &key.splitn(2, |&b| b == 0xff).nth(1).ok_or_else(|| { + Error::bad_database("RoomTyping has invalid timestamp or delimiters.") + })?[0..mem::size_of::()], + ) + .map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?, + )) + }) + .filter_map(|r| r.ok()) + .take_while(|&(_, timestamp)| timestamp < current_timestamp) + { + // This is an outdated edu (time > timestamp) + self.typingid_userid.remove(&outdated_edu.0)?; + found_outdated = true; + } + + if found_outdated { + self.roomid_lasttypingupdate + .insert(room_id.as_bytes(), &globals.next_count()?.to_be_bytes())?; + } + + Ok(()) + } + */ + + /// Returns the count of the last typing update in this room. + #[tracing::instrument(skip(self, globals))] + pub fn last_typing_update(&self, room_id: &RoomId) -> Result { + self.db.last_typing_update(room_id) + } + + /// Returns a new typing EDU. + pub fn typings_all( + &self, + room_id: &RoomId, + ) -> Result> { + let user_ids = self.db.typings_all(room_id)?; + + Ok(SyncEphemeralRoomEvent { + content: ruma::events::typing::TypingEventContent { + user_ids: user_ids.into_iter().collect(), + }, + }) + } +}