diff --git a/src/database/key_value.rs b/src/database/key_value.rs index 8ae51eb8..9bed73ca 100644 --- a/src/database/key_value.rs +++ b/src/database/key_value.rs @@ -11,7 +11,7 @@ impl service::room::state::Data for KeyValueDatabase { }) } - fn set_room_state(room_id: &RoomId, new_shortstatehash: u64 + fn set_room_state(&self, room_id: &RoomId, new_shortstatehash: u64 _mutex_lock: &MutexGuard<'_, StateLock>, // Take mutex guard to make sure users get the room state mutex ) -> Result<()> { self.roomid_shortstatehash @@ -19,7 +19,7 @@ impl service::room::state::Data for KeyValueDatabase { Ok(()) } - fn set_event_state() -> Result<()> { + fn set_event_state(&self) -> Result<()> { db.shorteventid_shortstatehash .insert(&shorteventid.to_be_bytes(), &shortstatehash.to_be_bytes())?; Ok(()) @@ -63,3 +63,449 @@ impl service::room::state::Data for KeyValueDatabase { } } + +impl service::room::alias::Data for KeyValueDatabase { + fn set_alias( + &self, + alias: &RoomAliasId, + room_id: Option<&RoomId> + ) -> Result<()> { + self.alias_roomid + .insert(alias.alias().as_bytes(), room_id.as_bytes())?; + let mut aliasid = room_id.as_bytes().to_vec(); + aliasid.push(0xff); + aliasid.extend_from_slice(&globals.next_count()?.to_be_bytes()); + self.aliasid_alias.insert(&aliasid, &*alias.as_bytes())?; + Ok(()) + } + + fn remove_alias( + &self, + alias: &RoomAliasId, + ) -> Result<()> { + if let Some(room_id) = self.alias_roomid.get(alias.alias().as_bytes())? { + let mut prefix = room_id.to_vec(); + prefix.push(0xff); + + for (key, _) in self.aliasid_alias.scan_prefix(prefix) { + self.aliasid_alias.remove(&key)?; + } + self.alias_roomid.remove(alias.alias().as_bytes())?; + } else { + return Err(Error::BadRequest( + ErrorKind::NotFound, + "Alias does not exist.", + )); + } + Ok(()) + } + + fn resolve_local_alias( + &self, + alias: &RoomAliasId + ) -> Result<()> { + self.alias_roomid + .get(alias.alias().as_bytes())? + .map(|bytes| { + RoomId::parse(utils::string_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Room ID in alias_roomid is invalid unicode.") + })?) + .map_err(|_| Error::bad_database("Room ID in alias_roomid is invalid.")) + }) + .transpose() + } + + fn local_aliases_for_room( + &self, + room_id: &RoomId, + ) -> Result<()> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + self.aliasid_alias.scan_prefix(prefix).map(|(_, bytes)| { + utils::string_from_bytes(&bytes) + .map_err(|_| Error::bad_database("Invalid alias bytes in aliasid_alias."))? + .try_into() + .map_err(|_| Error::bad_database("Invalid alias in aliasid_alias.")) + }) + } +} + +impl service::room::directory::Data for KeyValueDatabase { + fn set_public(&self, room_id: &RoomId) -> Result<()> { + self.publicroomids.insert(room_id.as_bytes(), &[])?; + } + + fn set_not_public(&self, room_id: &RoomId) -> Result<()> { + self.publicroomids.remove(room_id.as_bytes())?; + } + + fn is_public_room(&self, room_id: &RoomId) -> Result { + Ok(self.publicroomids.get(room_id.as_bytes())?.is_some()) + } + + fn public_rooms(&self) -> impl Iterator>> + '_ { + self.publicroomids.iter().map(|(bytes, _)| { + RoomId::parse( + utils::string_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Room ID in publicroomids is invalid unicode.") + })?, + ) + .map_err(|_| Error::bad_database("Room ID in publicroomids is invalid.")) + }) + } +} + +impl service::room::edus::Data for KeyValueDatabase { + fn readreceipt_update( + &self, + user_id: &UserId, + room_id: &RoomId, + event: ReceiptEvent, + ) -> Result<()> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + let mut last_possible_key = prefix.clone(); + last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); + + // Remove old entry + if let Some((old, _)) = self + .readreceiptid_readreceipt + .iter_from(&last_possible_key, true) + .take_while(|(key, _)| key.starts_with(&prefix)) + .find(|(key, _)| { + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element") + == user_id.as_bytes() + }) + { + // This is the old room_latest + self.readreceiptid_readreceipt.remove(&old)?; + } + + let mut room_latest_id = prefix; + room_latest_id.extend_from_slice(&globals.next_count()?.to_be_bytes()); + room_latest_id.push(0xff); + room_latest_id.extend_from_slice(user_id.as_bytes()); + + self.readreceiptid_readreceipt.insert( + &room_latest_id, + &serde_json::to_vec(&event).expect("EduEvent::to_string always works"), + )?; + + Ok(()) + } + + pub fn readreceipts_since<'a>( + &'a self, + room_id: &RoomId, + since: u64, + ) -> impl Iterator< + Item = Result<( + Box, + u64, + Raw, + )>, + > + 'a { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + let prefix2 = prefix.clone(); + + let mut first_possible_edu = prefix.clone(); + first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since + + self.readreceiptid_readreceipt + .iter_from(&first_possible_edu, false) + .take_while(move |(k, _)| k.starts_with(&prefix2)) + .map(move |(k, v)| { + let count = + utils::u64_from_bytes(&k[prefix.len()..prefix.len() + mem::size_of::()]) + .map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?; + let user_id = UserId::parse( + utils::string_from_bytes(&k[prefix.len() + mem::size_of::() + 1..]) + .map_err(|_| { + Error::bad_database("Invalid readreceiptid userid bytes in db.") + })?, + ) + .map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?; + + let mut json = serde_json::from_slice::(&v).map_err(|_| { + Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json.") + })?; + json.remove("room_id"); + + Ok(( + user_id, + count, + Raw::from_json( + serde_json::value::to_raw_value(&json).expect("json is valid raw value"), + ), + )) + }) + } + + fn private_read_set( + &self, + room_id: &RoomId, + user_id: &UserId, + count: u64, + ) -> Result<()> { + let mut key = room_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(user_id.as_bytes()); + + self.roomuserid_privateread + .insert(&key, &count.to_be_bytes())?; + + self.roomuserid_lastprivatereadupdate + .insert(&key, &globals.next_count()?.to_be_bytes())?; + } + + fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result> { + let mut key = room_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(user_id.as_bytes()); + + self.roomuserid_privateread + .get(&key)? + .map_or(Ok(None), |v| { + Ok(Some(utils::u64_from_bytes(&v).map_err(|_| { + Error::bad_database("Invalid private read marker bytes") + })?)) + }) + } + + fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result { + let mut key = room_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(user_id.as_bytes()); + + Ok(self + .roomuserid_lastprivatereadupdate + .get(&key)? + .map(|bytes| { + utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.") + }) + }) + .transpose()? + .unwrap_or(0)) + } + + fn typing_add( + &self, + user_id: &UserId, + room_id: &RoomId, + timeout: u64, + globals: &super::super::globals::Globals, + ) -> Result<()> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + let count = globals.next_count()?.to_be_bytes(); + + let mut room_typing_id = prefix; + room_typing_id.extend_from_slice(&timeout.to_be_bytes()); + room_typing_id.push(0xff); + room_typing_id.extend_from_slice(&count); + + self.typingid_userid + .insert(&room_typing_id, &*user_id.as_bytes())?; + + self.roomid_lasttypingupdate + .insert(room_id.as_bytes(), &count)?; + + Ok(()) + } + + fn typing_remove( + &self, + user_id: &UserId, + room_id: &RoomId, + ) -> Result<()> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + let user_id = user_id.to_string(); + + let mut found_outdated = false; + + // Maybe there are multiple ones from calling roomtyping_add multiple times + for outdated_edu in self + .typingid_userid + .scan_prefix(prefix) + .filter(|(_, v)| &**v == user_id.as_bytes()) + { + self.typingid_userid.remove(&outdated_edu.0)?; + found_outdated = true; + } + + if found_outdated { + self.roomid_lasttypingupdate + .insert(room_id.as_bytes(), &globals.next_count()?.to_be_bytes())?; + } + + Ok(()) + } + + fn last_typing_update( + &self, + room_id: &RoomId, + ) -> Result { + Ok(self + .roomid_lasttypingupdate + .get(room_id.as_bytes())? + .map(|bytes| { + utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Count in roomid_lastroomactiveupdate is invalid.") + }) + }) + .transpose()? + .unwrap_or(0)) + } + + fn typings_all( + &self, + room_id: &RoomId, + ) -> Result> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + let mut user_ids = HashSet::new(); + + for (_, user_id) in self.typingid_userid.scan_prefix(prefix) { + let user_id = UserId::parse(utils::string_from_bytes(&user_id).map_err(|_| { + Error::bad_database("User ID in typingid_userid is invalid unicode.") + })?) + .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?; + + user_ids.insert(user_id); + } + + Ok(user_ids) + } + + fn update_presence( + &self, + user_id: &UserId, + room_id: &RoomId, + presence: PresenceEvent, + ) -> Result<()> { + // TODO: Remove old entry? Or maybe just wipe completely from time to time? + + let count = globals.next_count()?.to_be_bytes(); + + let mut presence_id = room_id.as_bytes().to_vec(); + presence_id.push(0xff); + presence_id.extend_from_slice(&count); + presence_id.push(0xff); + presence_id.extend_from_slice(presence.sender.as_bytes()); + + self.presenceid_presence.insert( + &presence_id, + &serde_json::to_vec(&presence).expect("PresenceEvent can be serialized"), + )?; + + self.userid_lastpresenceupdate.insert( + user_id.as_bytes(), + &utils::millis_since_unix_epoch().to_be_bytes(), + )?; + + Ok(()) + } + + pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { + self.userid_lastpresenceupdate.insert( + user_id.as_bytes(), + &utils::millis_since_unix_epoch().to_be_bytes(), + )?; + + Ok(()) + } + + fn last_presence_update(&self, user_id: &UserId) -> Result> { + self.userid_lastpresenceupdate + .get(user_id.as_bytes())? + .map(|bytes| { + utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") + }) + }) + .transpose() + } + + fn get_presence_event( + &self, + user_id: &UserId, + room_id: &RoomId, + count: u64, + ) -> Result> { + let mut presence_id = room_id.as_bytes().to_vec(); + presence_id.push(0xff); + presence_id.extend_from_slice(&count.to_be_bytes()); + presence_id.push(0xff); + presence_id.extend_from_slice(user_id.as_bytes()); + + self.presenceid_presence + .get(&presence_id)? + .map(|value| parse_presence_event(&value)) + .transpose() + } + + fn presence_since( + &self, + room_id: &RoomId, + since: u64, + ) -> Result, PresenceEvent>> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + let mut first_possible_edu = prefix.clone(); + first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since + let mut hashmap = HashMap::new(); + + for (key, value) in self + .presenceid_presence + .iter_from(&*first_possible_edu, false) + .take_while(|(key, _)| key.starts_with(&prefix)) + { + let user_id = UserId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| Error::bad_database("Invalid UserId bytes in presenceid_presence."))?, + ) + .map_err(|_| Error::bad_database("Invalid UserId in presenceid_presence."))?; + + let presence = parse_presence_event(&value)?; + + hashmap.insert(user_id, presence); + } + + Ok(hashmap) + } +} + +fn parse_presence_event(bytes: &[u8]) -> Result { + let mut presence: PresenceEvent = serde_json::from_slice(bytes) + .map_err(|_| Error::bad_database("Invalid presence event in db."))?; + + let current_timestamp: UInt = utils::millis_since_unix_epoch() + .try_into() + .expect("time is valid"); + + if presence.content.presence == PresenceState::Online { + // Don't set last_active_ago when the user is online + presence.content.last_active_ago = None; + } else { + // Convert from timestamp to duration + presence.content.last_active_ago = presence + .content + .last_active_ago + .map(|timestamp| current_timestamp - timestamp); + } +} diff --git a/src/service/rooms/alias/data.rs b/src/service/rooms/alias/data.rs new file mode 100644 index 00000000..9dbfc7b5 --- /dev/null +++ b/src/service/rooms/alias/data.rs @@ -0,0 +1,22 @@ +pub trait Data { + /// Creates or updates the alias to the given room id. + pub fn set_alias( + alias: &RoomAliasId, + room_id: &RoomId + ) -> Result<()>; + + /// Forgets about an alias. Returns an error if the alias did not exist. + pub fn remove_alias( + alias: &RoomAliasId, + ) -> Result<()>; + + /// Looks up the roomid for the given alias. + pub fn resolve_local_alias( + alias: &RoomAliasId, + ) -> Result<()>; + + /// Returns all local aliases that point to the given room + pub fn local_aliases_for_room( + alias: &RoomAliasId, + ) -> Result<()>; +} diff --git a/src/service/rooms/alias/mod.rs b/src/service/rooms/alias/mod.rs index 393ad671..cfe05396 100644 --- a/src/service/rooms/alias/mod.rs +++ b/src/service/rooms/alias/mod.rs @@ -1,66 +1,40 @@ +mod data; +pub use data::Data; +use crate::service::*; + +pub struct Service { + db: D, +} + +impl Service<_> { #[tracing::instrument(skip(self, globals))] pub fn set_alias( &self, alias: &RoomAliasId, - room_id: Option<&RoomId>, - globals: &super::globals::Globals, + room_id: &RoomId, ) -> Result<()> { - if let Some(room_id) = room_id { - // New alias - self.alias_roomid - .insert(alias.alias().as_bytes(), room_id.as_bytes())?; - let mut aliasid = room_id.as_bytes().to_vec(); - aliasid.push(0xff); - aliasid.extend_from_slice(&globals.next_count()?.to_be_bytes()); - self.aliasid_alias.insert(&aliasid, &*alias.as_bytes())?; - } else { - // room_id=None means remove alias - if let Some(room_id) = self.alias_roomid.get(alias.alias().as_bytes())? { - let mut prefix = room_id.to_vec(); - prefix.push(0xff); + self.db.set_alias(alias, room_id) + } - for (key, _) in self.aliasid_alias.scan_prefix(prefix) { - self.aliasid_alias.remove(&key)?; - } - self.alias_roomid.remove(alias.alias().as_bytes())?; - } else { - return Err(Error::BadRequest( - ErrorKind::NotFound, - "Alias does not exist.", - )); - } - } - - Ok(()) + #[tracing::instrument(skip(self, globals))] + pub fn remove_alias( + &self, + alias: &RoomAliasId, + ) -> Result<()> { + self.db.remove_alias(alias) } #[tracing::instrument(skip(self))] - pub fn id_from_alias(&self, alias: &RoomAliasId) -> Result>> { - self.alias_roomid - .get(alias.alias().as_bytes())? - .map(|bytes| { - RoomId::parse(utils::string_from_bytes(&bytes).map_err(|_| { - Error::bad_database("Room ID in alias_roomid is invalid unicode.") - })?) - .map_err(|_| Error::bad_database("Room ID in alias_roomid is invalid.")) - }) - .transpose() + pub fn resolve_local_alias(&self, alias: &RoomAliasId) -> Result>> { + self.db.resolve_local_alias(alias: &RoomAliasId) } #[tracing::instrument(skip(self))] - pub fn room_aliases<'a>( + pub fn local_aliases_for_room<'a>( &'a self, room_id: &RoomId, ) -> impl Iterator>> + 'a { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - self.aliasid_alias.scan_prefix(prefix).map(|(_, bytes)| { - utils::string_from_bytes(&bytes) - .map_err(|_| Error::bad_database("Invalid alias bytes in aliasid_alias."))? - .try_into() - .map_err(|_| Error::bad_database("Invalid alias in aliasid_alias.")) - }) + self.db.local_aliases_for_room(room_id) } - +} diff --git a/src/service/rooms/directory/data.rs b/src/service/rooms/directory/data.rs new file mode 100644 index 00000000..83d78853 --- /dev/null +++ b/src/service/rooms/directory/data.rs @@ -0,0 +1,13 @@ +pub trait Data { + /// Adds the room to the public room directory + fn set_public(room_id: &RoomId) -> Result<()>; + + /// Removes the room from the public room directory. + fn set_not_public(room_id: &RoomId) -> Result<()>; + + /// Returns true if the room is in the public room directory. + fn is_public_room(room_id: &RoomId) -> Result; + + /// Returns the unsorted public room directory + fn public_rooms() -> impl Iterator>> + '_; +} diff --git a/src/service/rooms/directory/mod.rs b/src/service/rooms/directory/mod.rs index 8be7bd57..b92933f4 100644 --- a/src/service/rooms/directory/mod.rs +++ b/src/service/rooms/directory/mod.rs @@ -1,29 +1,30 @@ +mod data; +pub use data::Data; + +use crate::service::*; + +pub struct Service { + db: D, +} + +impl Service<_> { + #[tracing::instrument(skip(self))] + pub fn set_public(&self, room_id: &RoomId) -> Result<()> { + self.db.set_public(&self, room_id) + } #[tracing::instrument(skip(self))] - pub fn set_public(&self, room_id: &RoomId, public: bool) -> Result<()> { - if public { - self.publicroomids.insert(room_id.as_bytes(), &[])?; - } else { - self.publicroomids.remove(room_id.as_bytes())?; - } - - Ok(()) + pub fn set_not_public(&self, room_id: &RoomId) -> Result<()> { + self.db.set_not_public(&self, room_id) } #[tracing::instrument(skip(self))] pub fn is_public_room(&self, room_id: &RoomId) -> Result { - Ok(self.publicroomids.get(room_id.as_bytes())?.is_some()) + self.db.is_public_room(&self, room_id) } #[tracing::instrument(skip(self))] pub fn public_rooms(&self) -> impl Iterator>> + '_ { - self.publicroomids.iter().map(|(bytes, _)| { - RoomId::parse( - utils::string_from_bytes(&bytes).map_err(|_| { - Error::bad_database("Room ID in publicroomids is invalid unicode.") - })?, - ) - .map_err(|_| Error::bad_database("Room ID in publicroomids is invalid.")) - }) + self.db.public_rooms(&self, room_id) } - +} diff --git a/src/service/rooms/edus/data.rs b/src/service/rooms/edus/data.rs new file mode 100644 index 00000000..16c14cf3 --- /dev/null +++ b/src/service/rooms/edus/data.rs @@ -0,0 +1,91 @@ +pub trait Data { + /// Replaces the previous read receipt. + fn readreceipt_update( + &self, + user_id: &UserId, + room_id: &RoomId, + event: ReceiptEvent, + ) -> Result<()>; + + /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. + fn readreceipts_since( + &self, + room_id: &RoomId, + since: u64, + ) -> impl Iterator< + Item = Result<( + Box, + u64, + Raw, + )>, + >; + + /// Sets a private read marker at `count`. + fn private_read_set( + &self, + room_id: &RoomId, + user_id: &UserId, + count: u64, + ) -> Result<()>; + + /// Returns the private read marker. + fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result>; + + /// Returns the count of the last typing update in this room. + fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result; + + /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is + /// called. + fn typing_add( + &self, + user_id: &UserId, + room_id: &RoomId, + timeout: u64, + ) -> Result<()>; + + /// Removes a user from typing before the timeout is reached. + fn typing_remove( + &self, + user_id: &UserId, + room_id: &RoomId, + ) -> Result<()>; + + /// Returns the count of the last typing update in this room. + fn last_typing_update( + &self, + room_id: &RoomId, + ) -> Result; + + /// Returns all user ids currently typing. + fn typings_all( + &self, + room_id: &RoomId, + ) -> Result>; + + /// Adds a presence event which will be saved until a new event replaces it. + /// + /// Note: This method takes a RoomId because presence updates are always bound to rooms to + /// make sure users outside these rooms can't see them. + fn update_presence( + &self, + user_id: &UserId, + room_id: &RoomId, + presence: PresenceEvent, + ) -> Result<()>; + + /// Resets the presence timeout, so the user will stay in their current presence state. + fn ping_presence(&self, user_id: &UserId) -> Result<()>; + + /// Returns the timestamp of the last presence update of this user in millis since the unix epoch. + fn last_presence_update(&self, user_id: &UserId) -> Result>; + + /// Returns the presence event with correct last_active_ago. + fn get_presence_event(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result>; + + /// Returns the most recent presence updates that happened after the event with id `since`. + fn presence_since( + &self, + room_id: &RoomId, + since: u64, + ) -> Result, PresenceEvent>>; +} diff --git a/src/service/rooms/edus/mod.rs b/src/service/rooms/edus/mod.rs index 118efd4c..06adf57e 100644 --- a/src/service/rooms/edus/mod.rs +++ b/src/service/rooms/edus/mod.rs @@ -1,73 +1,21 @@ -use crate::{database::abstraction::Tree, utils, Error, Result}; -use ruma::{ - events::{ - presence::{PresenceEvent, PresenceEventContent}, - receipt::ReceiptEvent, - SyncEphemeralRoomEvent, - }, - presence::PresenceState, - serde::Raw, - signatures::CanonicalJsonObject, - RoomId, UInt, UserId, -}; -use std::{ - collections::{HashMap, HashSet}, - mem, - sync::Arc, -}; +mod data; +pub use data::Data; -pub struct RoomEdus { - pub(in super::super) readreceiptid_readreceipt: Arc, // ReadReceiptId = RoomId + Count + UserId - pub(in super::super) roomuserid_privateread: Arc, // RoomUserId = Room + User, PrivateRead = Count - pub(in super::super) roomuserid_lastprivatereadupdate: Arc, // LastPrivateReadUpdate = Count - pub(in super::super) typingid_userid: Arc, // TypingId = RoomId + TimeoutTime + Count - pub(in super::super) roomid_lasttypingupdate: Arc, // LastRoomTypingUpdate = Count - pub(in super::super) presenceid_presence: Arc, // PresenceId = RoomId + Count + UserId - pub(in super::super) userid_lastpresenceupdate: Arc, // LastPresenceUpdate = Count +use crate::service::*; + +pub struct Service { + db: D, } -impl RoomEdus { - /// Adds an event which will be saved until a new event replaces it (e.g. read receipt). +impl Service<_> { + /// Replaces the previous read receipt. pub fn readreceipt_update( &self, user_id: &UserId, room_id: &RoomId, event: ReceiptEvent, - globals: &super::super::globals::Globals, ) -> Result<()> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - let mut last_possible_key = prefix.clone(); - last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); - - // Remove old entry - if let Some((old, _)) = self - .readreceiptid_readreceipt - .iter_from(&last_possible_key, true) - .take_while(|(key, _)| key.starts_with(&prefix)) - .find(|(key, _)| { - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element") - == user_id.as_bytes() - }) - { - // This is the old room_latest - self.readreceiptid_readreceipt.remove(&old)?; - } - - let mut room_latest_id = prefix; - room_latest_id.extend_from_slice(&globals.next_count()?.to_be_bytes()); - room_latest_id.push(0xff); - room_latest_id.extend_from_slice(user_id.as_bytes()); - - self.readreceiptid_readreceipt.insert( - &room_latest_id, - &serde_json::to_vec(&event).expect("EduEvent::to_string always works"), - )?; - - Ok(()) + self.db.readreceipt_update(user_id, room_id, event); } /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. @@ -83,41 +31,7 @@ impl RoomEdus { Raw, )>, > + 'a { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - let prefix2 = prefix.clone(); - - let mut first_possible_edu = prefix.clone(); - first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since - - self.readreceiptid_readreceipt - .iter_from(&first_possible_edu, false) - .take_while(move |(k, _)| k.starts_with(&prefix2)) - .map(move |(k, v)| { - let count = - utils::u64_from_bytes(&k[prefix.len()..prefix.len() + mem::size_of::()]) - .map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?; - let user_id = UserId::parse( - utils::string_from_bytes(&k[prefix.len() + mem::size_of::() + 1..]) - .map_err(|_| { - Error::bad_database("Invalid readreceiptid userid bytes in db.") - })?, - ) - .map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?; - - let mut json = serde_json::from_slice::(&v).map_err(|_| { - Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json.") - })?; - json.remove("room_id"); - - Ok(( - user_id, - count, - Raw::from_json( - serde_json::value::to_raw_value(&json).expect("json is valid raw value"), - ), - )) - }) + self.db.readreceipts_since(room_id, since) } /// Sets a private read marker at `count`. @@ -127,53 +41,19 @@ impl RoomEdus { room_id: &RoomId, user_id: &UserId, count: u64, - globals: &super::super::globals::Globals, ) -> Result<()> { - let mut key = room_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(user_id.as_bytes()); - - self.roomuserid_privateread - .insert(&key, &count.to_be_bytes())?; - - self.roomuserid_lastprivatereadupdate - .insert(&key, &globals.next_count()?.to_be_bytes())?; - - Ok(()) + self.db.private_read_set(room_id, user_id, count) } /// Returns the private read marker. #[tracing::instrument(skip(self))] pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result> { - let mut key = room_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(user_id.as_bytes()); - - self.roomuserid_privateread - .get(&key)? - .map_or(Ok(None), |v| { - Ok(Some(utils::u64_from_bytes(&v).map_err(|_| { - Error::bad_database("Invalid private read marker bytes") - })?)) - }) + self.db.private_read_get(room_id, user_id) } /// Returns the count of the last typing update in this room. pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result { - let mut key = room_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(user_id.as_bytes()); - - Ok(self - .roomuserid_lastprivatereadupdate - .get(&key)? - .map(|bytes| { - utils::u64_from_bytes(&bytes).map_err(|_| { - Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.") - }) - }) - .transpose()? - .unwrap_or(0)) + self.db.last_privateread_update(user_id, room_id) } /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is @@ -183,25 +63,8 @@ impl RoomEdus { user_id: &UserId, room_id: &RoomId, timeout: u64, - globals: &super::super::globals::Globals, ) -> Result<()> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - let count = globals.next_count()?.to_be_bytes(); - - let mut room_typing_id = prefix; - room_typing_id.extend_from_slice(&timeout.to_be_bytes()); - room_typing_id.push(0xff); - room_typing_id.extend_from_slice(&count); - - self.typingid_userid - .insert(&room_typing_id, &*user_id.as_bytes())?; - - self.roomid_lasttypingupdate - .insert(room_id.as_bytes(), &count)?; - - Ok(()) + self.db.typing_add(user_id, room_id, timeout) } /// Removes a user from typing before the timeout is reached. @@ -209,33 +72,11 @@ impl RoomEdus { &self, user_id: &UserId, room_id: &RoomId, - globals: &super::super::globals::Globals, ) -> Result<()> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - let user_id = user_id.to_string(); - - let mut found_outdated = false; - - // Maybe there are multiple ones from calling roomtyping_add multiple times - for outdated_edu in self - .typingid_userid - .scan_prefix(prefix) - .filter(|(_, v)| &**v == user_id.as_bytes()) - { - self.typingid_userid.remove(&outdated_edu.0)?; - found_outdated = true; - } - - if found_outdated { - self.roomid_lasttypingupdate - .insert(room_id.as_bytes(), &globals.next_count()?.to_be_bytes())?; - } - - Ok(()) + self.db.typing_remove(user_id, room_id) } + /* TODO: Do this in background thread? /// Makes sure that typing events with old timestamps get removed. fn typings_maintain( &self, @@ -279,45 +120,23 @@ impl RoomEdus { Ok(()) } + */ /// Returns the count of the last typing update in this room. #[tracing::instrument(skip(self, globals))] pub fn last_typing_update( &self, room_id: &RoomId, - globals: &super::super::globals::Globals, ) -> Result { - self.typings_maintain(room_id, globals)?; - - Ok(self - .roomid_lasttypingupdate - .get(room_id.as_bytes())? - .map(|bytes| { - utils::u64_from_bytes(&bytes).map_err(|_| { - Error::bad_database("Count in roomid_lastroomactiveupdate is invalid.") - }) - }) - .transpose()? - .unwrap_or(0)) + self.db.last_typing_update(room_id) } + /// Returns a new typing EDU. pub fn typings_all( &self, room_id: &RoomId, ) -> Result> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - let mut user_ids = HashSet::new(); - - for (_, user_id) in self.typingid_userid.scan_prefix(prefix) { - let user_id = UserId::parse(utils::string_from_bytes(&user_id).map_err(|_| { - Error::bad_database("User ID in typingid_userid is invalid unicode.") - })?) - .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?; - - user_ids.insert(user_id); - } + let user_ids = self.db.typings_all(room_id)?; Ok(SyncEphemeralRoomEvent { content: ruma::events::typing::TypingEventContent { @@ -335,52 +154,13 @@ impl RoomEdus { user_id: &UserId, room_id: &RoomId, presence: PresenceEvent, - globals: &super::super::globals::Globals, ) -> Result<()> { - // TODO: Remove old entry? Or maybe just wipe completely from time to time? - - let count = globals.next_count()?.to_be_bytes(); - - let mut presence_id = room_id.as_bytes().to_vec(); - presence_id.push(0xff); - presence_id.extend_from_slice(&count); - presence_id.push(0xff); - presence_id.extend_from_slice(presence.sender.as_bytes()); - - self.presenceid_presence.insert( - &presence_id, - &serde_json::to_vec(&presence).expect("PresenceEvent can be serialized"), - )?; - - self.userid_lastpresenceupdate.insert( - user_id.as_bytes(), - &utils::millis_since_unix_epoch().to_be_bytes(), - )?; - - Ok(()) + self.db.update_presence(user_id, room_id, presence) } /// Resets the presence timeout, so the user will stay in their current presence state. - #[tracing::instrument(skip(self))] pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { - self.userid_lastpresenceupdate.insert( - user_id.as_bytes(), - &utils::millis_since_unix_epoch().to_be_bytes(), - )?; - - Ok(()) - } - - /// Returns the timestamp of the last presence update of this user in millis since the unix epoch. - pub fn last_presence_update(&self, user_id: &UserId) -> Result> { - self.userid_lastpresenceupdate - .get(user_id.as_bytes())? - .map(|bytes| { - utils::u64_from_bytes(&bytes).map_err(|_| { - Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") - }) - }) - .transpose() + self.db.ping_presence(user_id) } pub fn get_last_presence_event( @@ -388,42 +168,15 @@ impl RoomEdus { user_id: &UserId, room_id: &RoomId, ) -> Result> { - let last_update = match self.last_presence_update(user_id)? { + let last_update = match self.db.last_presence_update(user_id)? { Some(last) => last, None => return Ok(None), }; - let mut presence_id = room_id.as_bytes().to_vec(); - presence_id.push(0xff); - presence_id.extend_from_slice(&last_update.to_be_bytes()); - presence_id.push(0xff); - presence_id.extend_from_slice(user_id.as_bytes()); - - self.presenceid_presence - .get(&presence_id)? - .map(|value| { - let mut presence: PresenceEvent = serde_json::from_slice(&value) - .map_err(|_| Error::bad_database("Invalid presence event in db."))?; - let current_timestamp: UInt = utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"); - - if presence.content.presence == PresenceState::Online { - // Don't set last_active_ago when the user is online - presence.content.last_active_ago = None; - } else { - // Convert from timestamp to duration - presence.content.last_active_ago = presence - .content - .last_active_ago - .map(|timestamp| current_timestamp - timestamp); - } - - Ok(presence) - }) - .transpose() + self.db.get_presence_event(room_id, user_id, last_update) } + /* TODO /// Sets all users to offline who have been quiet for too long. fn _presence_maintain( &self, @@ -489,62 +242,15 @@ impl RoomEdus { } Ok(()) - } + }*/ - /// Returns an iterator over the most recent presence updates that happened after the event with id `since`. + /// Returns the most recent presence updates that happened after the event with id `since`. #[tracing::instrument(skip(self, since, _rooms, _globals))] pub fn presence_since( &self, room_id: &RoomId, since: u64, - _rooms: &super::Rooms, - _globals: &super::super::globals::Globals, ) -> Result, PresenceEvent>> { - //self.presence_maintain(rooms, globals)?; - - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - let mut first_possible_edu = prefix.clone(); - first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since - let mut hashmap = HashMap::new(); - - for (key, value) in self - .presenceid_presence - .iter_from(&*first_possible_edu, false) - .take_while(|(key, _)| key.starts_with(&prefix)) - { - let user_id = UserId::parse( - utils::string_from_bytes( - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| Error::bad_database("Invalid UserId bytes in presenceid_presence."))?, - ) - .map_err(|_| Error::bad_database("Invalid UserId in presenceid_presence."))?; - - let mut presence: PresenceEvent = serde_json::from_slice(&value) - .map_err(|_| Error::bad_database("Invalid presence event in db."))?; - - let current_timestamp: UInt = utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"); - - if presence.content.presence == PresenceState::Online { - // Don't set last_active_ago when the user is online - presence.content.last_active_ago = None; - } else { - // Convert from timestamp to duration - presence.content.last_active_ago = presence - .content - .last_active_ago - .map(|timestamp| current_timestamp - timestamp); - } - - hashmap.insert(user_id, presence); - } - - Ok(hashmap) + self.db.presence_since(room_id, since) } } diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index bf926078..b513ab53 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -14,8 +14,8 @@ impl Service<_> { &self, room_id: &RoomId, shortstatehash: u64, - statediffnew :HashSet, - statediffremoved :HashSet, + statediffnew: HashSet, + statediffremoved: HashSet, db: &Database, ) -> Result<()> {