From 09e1713c303dc4aa1427a783699c9f7ebc4e999c Mon Sep 17 00:00:00 2001
From: Matthias Ahouansou
Date: Fri, 19 Apr 2024 17:47:21 +0100
Subject: [PATCH] feat(devices): update the device last seen timestamp on usage

---
 src/api/client_server/device.rs |  4 +-
 src/api/client_server/sync.rs   | 22 ++++++++
 src/api/ruma_wrapper/axum.rs    |  2 +-
 src/api/server_server.rs        |  4 +-
 src/database/key_value/users.rs | 44 +++++++++++++--
 src/database/mod.rs             |  1 +
 src/main.rs                     |  2 +-
 src/service/globals/mod.rs      |  4 +-
 src/service/mod.rs              |  7 ++-
 src/service/users/data.rs       | 13 +++--
 src/service/users/mod.rs        | 98 +++++++++++++++++++++++++++++----
 11 files changed, 170 insertions(+), 31 deletions(-)

diff --git a/src/api/client_server/device.rs b/src/api/client_server/device.rs
index 9a42f048..c5b52f05 100644
--- a/src/api/client_server/device.rs
+++ b/src/api/client_server/device.rs
@@ -17,8 +17,8 @@ pub async fn get_devices_route(
     let devices: Vec<Device> = services()
         .users
-        .all_devices_metadata(sender_user)
-        .filter_map(|r| r.ok()) // Filter out buggy devices
+        .all_user_devices_metadata(sender_user)
+        .await
         .collect();
 
     Ok(get_devices::v3::Response { devices })
diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index 3485477a..356c7155 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -71,6 +71,17 @@ pub async fn sync_events_route(
 ) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
     let sender_user = body.sender_user.expect("user is authenticated");
     let sender_device = body.sender_device.expect("user is authenticated");
+
+    let cloned_sender_user = sender_user.clone();
+    let cloned_sender_device = sender_device.clone();
+    // No need to block sync on device last-seen update
+    tokio::spawn(async move {
+        services()
+            .users
+            .update_device_last_seen(cloned_sender_user, cloned_sender_device)
+            .await;
+    });
+
     let body = body.body;
 
     let mut rx = match services()
@@ -1274,6 +1285,17 @@ pub async fn sync_events_v5_route(
 ) -> Result<sync_events::v5::Response, RumaResponse<UiaaResponse>> {
     let sender_user = body.sender_user.expect("user is authenticated");
     let sender_device = body.sender_device.expect("user is authenticated");
+
+    let cloned_sender_user = sender_user.clone();
+    let cloned_sender_device = sender_device.clone();
+    // No need to block sync on device last-seen update
+    tokio::spawn(async move {
+        services()
+            .users
+            .update_device_last_seen(cloned_sender_user, cloned_sender_device)
+            .await;
+    });
+
     let mut body = body.body;
     // Setup watchers, so if there's no response, we can wait for them
     let watcher = services().globals.watch(&sender_user, &sender_device);
diff --git a/src/api/ruma_wrapper/axum.rs b/src/api/ruma_wrapper/axum.rs
index dd755f64..4be5ec2b 100644
--- a/src/api/ruma_wrapper/axum.rs
+++ b/src/api/ruma_wrapper/axum.rs
@@ -89,7 +89,7 @@ where
                 if let Some(reg_info) = services().appservice.find_from_token(token).await {
                     Token::Appservice(Box::new(reg_info.clone()))
                 } else if let Some((user_id, device_id)) = services().users.find_from_token(token)?
                 {
-                    Token::User((user_id, OwnedDeviceId::from(device_id)))
+                    Token::User((user_id, device_id))
                 } else {
                     Token::Invalid
                 }
diff --git a/src/api/server_server.rs b/src/api/server_server.rs
index 439139fc..4bb93414 100644
--- a/src/api/server_server.rs
+++ b/src/api/server_server.rs
@@ -2351,8 +2351,8 @@ pub async fn get_devices_route(
             .expect("version will not grow that large"),
         devices: services()
             .users
-            .all_devices_metadata(&body.user_id)
-            .filter_map(|r| r.ok())
+            .all_user_devices_metadata(&body.user_id)
+            .await
             .filter_map(|metadata| {
                 Some(UserDevice {
                     keys: services()
diff --git a/src/database/key_value/users.rs b/src/database/key_value/users.rs
index 47c79d2e..e8c22739 100644
--- a/src/database/key_value/users.rs
+++ b/src/database/key_value/users.rs
@@ -41,7 +41,7 @@ impl service::users::Data for KeyValueDatabase {
     }
 
     /// Find out which user an access token belongs to.
-    fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, String)>> {
+    fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, OwnedDeviceId)>> {
         self.token_userdeviceid
             .get(token.as_bytes())?
             .map_or(Ok(None), |bytes| {
@@ -60,9 +60,11 @@ impl service::users::Data for KeyValueDatabase {
                     .map_err(|_| {
                         Error::bad_database("User ID in token_userdeviceid is invalid.")
                     })?,
-                    utils::string_from_bytes(device_bytes).map_err(|_| {
-                        Error::bad_database("Device ID in token_userdeviceid is invalid.")
-                    })?,
+                    utils::string_from_bytes(device_bytes)
+                        .map_err(|_| {
+                            Error::bad_database("Device ID in token_userdeviceid is invalid.")
+                        })?
+                        .into(),
                 )))
             })
     }
@@ -894,7 +896,7 @@ impl service::users::Data for KeyValueDatabase {
         })
     }
 
-    fn all_devices_metadata<'a>(
+    fn all_user_devices_metadata<'a>(
        &'a self,
        user_id: &UserId,
     ) -> Box<dyn Iterator<Item = Result<Device>> + 'a> {
@@ -912,6 +914,38 @@ impl service::users::Data for KeyValueDatabase {
         )
     }
 
+    fn set_devices_last_seen<'a>(
+        &'a self,
+        devices: &'a BTreeMap<(OwnedUserId, OwnedDeviceId), MilliSecondsSinceUnixEpoch>,
+    ) -> Box<dyn Iterator<Item = Result<()>> + 'a> {
+        Box::new(devices.iter().map(|((user, device), timestamp)| {
+            let mut key = user.as_bytes().to_vec();
+            key.push(0xff);
+            key.extend_from_slice(device.as_bytes());
+
+            // Device may have been removed since last db write
+            if let Some(mut device_metadata) = self
+                .userdeviceid_metadata
+                .get(&key)?
+                .map(|bytes| {
+                    serde_json::from_slice::<Device>(&bytes).map_err(|_| {
+                        Error::bad_database("Device in userdeviceid_metadata is invalid.")
+                    })
+                })
+                .transpose()?
+            {
+                device_metadata.last_seen_ts = Some(*timestamp);
+
+                self.userdeviceid_metadata.insert(
+                    &key,
+                    &serde_json::to_vec(&device_metadata).expect("Device always serializes"),
+                )?;
+            }
+
+            Ok(())
+        }))
+    }
+
     /// Creates a new sync filter. Returns the filter id.
     fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String> {
         let filter_id = utils::random_string(4);
 
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 30d4231a..d2fb3267 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -1139,6 +1139,7 @@ impl KeyValueDatabase {
 
         services().sending.start_handler();
         services().media.start_time_retention_checker();
+        services().users.start_device_last_seen_update_task();
 
         Self::start_cleanup_task().await;
         if services().globals.allow_check_for_updates() {
diff --git a/src/main.rs b/src/main.rs
index 93362f2c..3b9763c4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -543,7 +543,7 @@ async fn shutdown_signal(handle: ServerHandle) {
     warn!("Received {}, shutting down...", sig);
     handle.graceful_shutdown(Some(Duration::from_secs(30)));
 
-    services().globals.shutdown();
+    services().globals.shutdown().await;
 
     #[cfg(feature = "systemd")]
     let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]);
diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs
index 05c3eab1..f55e8c1a 100644
--- a/src/service/globals/mod.rs
+++ b/src/service/globals/mod.rs
@@ -512,11 +512,13 @@ impl Service {
         Ok(r)
     }
 
-    pub fn shutdown(&self) {
+    pub async fn shutdown(&self) {
         self.shutdown.store(true, atomic::Ordering::Relaxed);
         // On shutdown
         info!(target: "shutdown-sync", "Received shutdown notification, notifying sync helpers...");
         services().globals.rotate.fire();
+        // Force write before shutdown
+        services().users.try_update_device_last_seen().await;
     }
 }
diff --git a/src/service/mod.rs b/src/service/mod.rs
index 832ca8ae..432c0e7a 100644
--- a/src/service/mod.rs
+++ b/src/service/mod.rs
@@ -29,7 +29,7 @@ pub struct Services {
     pub rooms: rooms::Service,
     pub transaction_ids: transaction_ids::Service,
     pub uiaa: uiaa::Service,
-    pub users: users::Service,
+    pub users: Arc<users::Service>,
     pub account_data: account_data::Service,
     pub admin: Arc<admin::Service>,
     pub globals: globals::Service,
@@ -112,10 +112,11 @@ impl Services {
             },
             transaction_ids: transaction_ids::Service { db },
             uiaa: uiaa::Service { db },
-            users: users::Service {
+            users: Arc::new(users::Service {
                 db,
                 connections: StdMutex::new(BTreeMap::new()),
-            },
+                device_last_seen: Mutex::new(BTreeMap::new()),
+            }),
             account_data: account_data::Service { db },
             admin: admin::Service::build(),
             key_backups: key_backups::Service { db },
diff --git a/src/service/users/data.rs b/src/service/users/data.rs
index 52cb8a70..74ec8f87 100644
--- a/src/service/users/data.rs
+++ b/src/service/users/data.rs
@@ -4,8 +4,8 @@ use ruma::{
     encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
     events::AnyToDeviceEvent,
     serde::Raw,
-    DeviceId, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedMxcUri, OwnedOneTimeKeyId, OwnedUserId,
-    UInt, UserId,
+    DeviceId, MilliSecondsSinceUnixEpoch, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedMxcUri,
+    OwnedOneTimeKeyId, OwnedUserId, UInt, UserId,
 };
 use std::collections::BTreeMap;
 
@@ -20,7 +20,7 @@ pub trait Data: Send + Sync {
     fn count(&self) -> Result<usize>;
 
     /// Find out which user an access token belongs to.
-    fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, String)>>;
+    fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, OwnedDeviceId)>>;
 
     /// Returns an iterator over all users on this homeserver.
     fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a>;
 
@@ -202,11 +202,16 @@ pub trait Data: Send + Sync {
 
     fn get_devicelist_version(&self, user_id: &UserId) -> Result<Option<u64>>;
 
-    fn all_devices_metadata<'a>(
+    fn all_user_devices_metadata<'a>(
         &'a self,
         user_id: &UserId,
     ) -> Box<dyn Iterator<Item = Result<Device>> + 'a>;
 
+    fn set_devices_last_seen<'a>(
+        &'a self,
+        devices: &'a BTreeMap<(OwnedUserId, OwnedDeviceId), MilliSecondsSinceUnixEpoch>,
+    ) -> Box<dyn Iterator<Item = Result<()>> + 'a>;
+
     /// Creates a new sync filter. Returns the filter id.
     fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String>;
 
diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs
index 0cfe346b..f5d57fb3 100644
--- a/src/service/users/mod.rs
+++ b/src/service/users/mod.rs
@@ -2,7 +2,8 @@ mod data;
 use std::{
     collections::{BTreeMap, BTreeSet},
     mem,
-    sync::{Arc, Mutex},
+    sync::{Arc, Mutex as StdMutex},
+    time::Duration,
 };
 
 pub use data::Data;
@@ -15,9 +16,11 @@ use ruma::{
     encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
     events::AnyToDeviceEvent,
     serde::Raw,
-    DeviceId, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedMxcUri, OwnedOneTimeKeyId, OwnedRoomId,
-    OwnedUserId, UInt, UserId,
+    DeviceId, MilliSecondsSinceUnixEpoch, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedMxcUri,
+    OwnedOneTimeKeyId, OwnedRoomId, OwnedUserId, UInt, UserId,
 };
+use tokio::{sync::Mutex, time::interval};
+use tracing::{debug, warn};
 
 use crate::{services, Error, Result};
 
@@ -32,10 +35,59 @@ pub struct Service {
     pub db: &'static dyn Data,
     #[allow(clippy::type_complexity)]
     pub connections:
-        Mutex<BTreeMap<(OwnedUserId, OwnedDeviceId, String), Arc<Mutex<SlidingSyncCache>>>>,
+        StdMutex<BTreeMap<(OwnedUserId, OwnedDeviceId, String), Arc<StdMutex<SlidingSyncCache>>>>,
+    pub device_last_seen: Mutex<BTreeMap<(OwnedUserId, OwnedDeviceId), MilliSecondsSinceUnixEpoch>>,
 }
 
 impl Service {
+    pub fn start_device_last_seen_update_task(self: &Arc<Self>) {
+        let self2 = Arc::clone(self);
+        tokio::spawn(async move {
+            // Actually writes the new device times to the database every 5 minutes.
+            // The device times are always returned fresh from memory
+            // if they have been changed after the last write.
+            let timer_interval = Duration::from_secs(60 * 5);
+            let mut i = interval(timer_interval);
+            loop {
+                i.tick().await;
+                let _ = self2.try_update_device_last_seen().await;
+            }
+        });
+    }
+
+    pub async fn try_update_device_last_seen(&self) {
+        debug!("Writing cached device last-seens to database");
+        for error in self.write_cached_last_seen().await {
+            warn!("Error writing last seen timestamp of device to database: {error}");
+        }
+    }
+
+    /// Writes all the currently cached last seen timestamps of devices to the database,
+    /// clearing the cache in the process
+    async fn write_cached_last_seen(&self) -> Vec<Error> {
+        let mut guard = self.device_last_seen.lock().await;
+        if !guard.is_empty() {
+            // TODO: Replace with `replace` once/if `tokio::sync::Mutex` implements the equivalent
+            // method from `std`: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.replace
+            // i.e. instead of the `let mut guard` above:
+            //let map = self.device_last_seen.replace(BTreeMap::new()).await;
+            // We do a clone instead, as we don't want to start building a backlog of awaited
+            // `lock`s for all these DB fetches and writes, which, admittedly, might not even
+            // be a big deal.
+            let map = guard.clone();
+            guard.clear();
+            drop(guard);
+
+            let result = self
+                .db
+                .set_devices_last_seen(&map)
+                .filter_map(Result::err)
+                .collect();
+            result
+        } else {
+            Vec::new()
+        }
+    }
+
     /// Check if a user has an account on this homeserver.
     pub fn exists(&self, user_id: &UserId) -> Result<bool> {
         self.db.exists(user_id)
     }
@@ -68,7 +120,7 @@ impl Service {
         cache
             .entry((user_id, device_id, conn_id))
             .or_insert_with(|| {
-                Arc::new(Mutex::new(SlidingSyncCache {
+                Arc::new(StdMutex::new(SlidingSyncCache {
                     lists: BTreeMap::new(),
                     subscriptions: BTreeMap::new(),
                     known_rooms: BTreeMap::new(),
@@ -161,7 +213,7 @@ impl Service {
         cache
             .entry((user_id, device_id, conn_id))
             .or_insert_with(|| {
-                Arc::new(Mutex::new(SlidingSyncCache {
+                Arc::new(StdMutex::new(SlidingSyncCache {
                     lists: BTreeMap::new(),
                     subscriptions: BTreeMap::new(),
                     known_rooms: BTreeMap::new(),
@@ -189,7 +241,7 @@ impl Service {
         cache
             .entry((user_id, device_id, conn_id))
             .or_insert_with(|| {
-                Arc::new(Mutex::new(SlidingSyncCache {
+                Arc::new(StdMutex::new(SlidingSyncCache {
                     lists: BTreeMap::new(),
                     subscriptions: BTreeMap::new(),
                     known_rooms: BTreeMap::new(),
@@ -245,7 +297,7 @@ impl Service {
     }
 
     /// Find out which user an access token belongs to.
-    pub fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, String)>> {
+    pub fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, OwnedDeviceId)>> {
         self.db.find_from_token(token)
     }
 
@@ -519,11 +571,25 @@ impl Service {
         self.db.get_devicelist_version(user_id)
     }
 
-    pub fn all_devices_metadata<'a>(
+    pub async fn all_user_devices_metadata<'a>(
         &'a self,
-        user_id: &UserId,
-    ) -> impl Iterator<Item = Result<Device>> + 'a {
-        self.db.all_devices_metadata(user_id)
+        user_id: &'a UserId,
+    ) -> impl Iterator<Item = Device> + 'a {
+        let all_devices: Vec<_> = self
+            .db
+            .all_user_devices_metadata(user_id)
+            .filter_map(Result::ok)
+            // RumaHandler trait complains if we don't collect
+            .collect();
+        let device_last_seen = self.device_last_seen.lock().await;
+
+        // Updates the timestamps with the cached ones
+        all_devices.into_iter().map(move |mut d| {
+            if let Some(ts) = device_last_seen.get(&(user_id.to_owned(), d.device_id.clone())) {
+                d.last_seen_ts = Some(*ts);
+            };
+            d
+        })
     }
 
     /// Deactivate account
@@ -564,6 +630,14 @@ impl Service {
     pub fn find_from_openid_token(&self, token: &str) -> Result<Option<OwnedUserId>> {
         self.db.find_from_openid_token(token)
    }
+
+    /// Sets the device_last_seen timestamp of a given device to now
+    pub async fn update_device_last_seen(&self, user_id: OwnedUserId, device_id: OwnedDeviceId) {
+        self.device_last_seen
+            .lock()
+            .await
+            .insert((user_id, device_id), MilliSecondsSinceUnixEpoch::now());
+    }
 }
 
 /// Ensure that a user only sees signatures from themselves and the target user
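
Note: the core of this patch is a write-behind cache for last-seen timestamps:
request handlers only touch an in-memory map, and a background task batches the
database writes. Below is a minimal standalone sketch of that pattern. All
names in it (LastSeenCache, touch, flush, start_flush_task) are hypothetical
and the types are deliberately simplified (plain String IDs, u64 millisecond
timestamps); it is not Conduit API.

// Illustrative sketch only; not part of the patch above.
use std::{collections::BTreeMap, mem, sync::Arc, time::Duration};

use tokio::{sync::Mutex, time::interval};

type Key = (String, String); // (user_id, device_id), simplified

#[derive(Default)]
pub struct LastSeenCache {
    // Timestamps are buffered here between periodic database writes.
    inner: Mutex<BTreeMap<Key, u64>>,
}

impl LastSeenCache {
    // Called on every authenticated request: a cheap map insert, no DB I/O.
    pub async fn touch(&self, user: String, device: String, now_ms: u64) {
        self.inner.lock().await.insert((user, device), now_ms);
    }

    // Swap an empty map in under the lock and return the batch to persist.
    // `mem::take` releases the lock immediately, without cloning the map.
    pub async fn flush(&self) -> BTreeMap<Key, u64> {
        mem::take(&mut *self.inner.lock().await)
    }
}

// Periodically drain the cache, mirroring start_device_last_seen_update_task.
pub fn start_flush_task(cache: Arc<LastSeenCache>) {
    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(60 * 5));
        loop {
            ticker.tick().await;
            let batch = cache.flush().await;
            // A real implementation would write `batch` to the database here
            // and log any per-device errors instead of discarding the batch.
            drop(batch);
        }
    });
}

One difference from the patch: mem::take swaps an empty map in under the lock
and releases it before any database I/O, giving the same effect as the
clone-and-clear in write_cached_last_seen without the clone; this is roughly
what the TODO about tokio::sync::Mutex::replace is asking for.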