Mirror of https://gitlab.com/famedly/conduit.git, synced 2025-06-27 16:35:59 +00:00
feat(devices): update the device last seen timestamp on usage
This commit is contained in:
parent a1886a1396, commit 09e1713c30
11 changed files with 170 additions and 31 deletions
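
In outline, the change works like this: instead of writing a device's last-seen time to the database on every request, the users service caches timestamps in an in-memory map, a background task flushes the map to the database every five minutes, and shutdown forces a final flush. A minimal, self-contained sketch of that write-back pattern follows; every name in it (LastSeenCache, touch, flush) is illustrative rather than Conduit's actual API, and it assumes the tokio crate with its macros and time features:

    use std::collections::BTreeMap;
    use std::sync::Arc;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};
    use tokio::{sync::Mutex, time::interval};

    #[derive(Default)]
    struct LastSeenCache {
        // (user_id, device_id) -> unix timestamp in milliseconds
        map: Mutex<BTreeMap<(String, String), u64>>,
    }

    impl LastSeenCache {
        // Called from hot request paths; only touches memory.
        async fn touch(&self, user: &str, device: &str) {
            let now_ms = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("clock is after the epoch")
                .as_millis() as u64;
            self.map
                .lock()
                .await
                .insert((user.to_owned(), device.to_owned()), now_ms);
        }

        // Swap the map out under the lock, then persist without holding it.
        async fn flush(&self) {
            let drained = std::mem::take(&mut *self.map.lock().await);
            for ((user, device), ts) in drained {
                // A real implementation would write to the database here.
                println!("persist last_seen {user}/{device} = {ts}");
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let cache = Arc::new(LastSeenCache::default());
        let flusher = Arc::clone(&cache);
        tokio::spawn(async move {
            let mut tick = interval(Duration::from_secs(60 * 5));
            loop {
                tick.tick().await;
                flusher.flush().await;
            }
        });
        cache.touch("@alice:example.org", "DEVICEID").await;
        cache.flush().await; // a forced flush, as the shutdown path does
    }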
@@ -17,8 +17,8 @@ pub async fn get_devices_route(
     let devices: Vec<device::Device> = services()
         .users
-        .all_devices_metadata(sender_user)
-        .filter_map(|r| r.ok()) // Filter out buggy devices
+        .all_user_devices_metadata(sender_user)
+        .await
         .collect();
 
     Ok(get_devices::v3::Response { devices })
@@ -71,6 +71,17 @@ pub async fn sync_events_route(
 ) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
     let sender_user = body.sender_user.expect("user is authenticated");
     let sender_device = body.sender_device.expect("user is authenticated");
 
+    let cloned_sender_user = sender_user.clone();
+    let cloned_sender_device = sender_device.clone();
+    // No need to block sync on device last-seen update
+    tokio::spawn(async move {
+        services()
+            .users
+            .update_device_last_seen(cloned_sender_user, cloned_sender_device)
+            .await;
+    });
+
     let body = body.body;
 
     let mut rx = match services()

@@ -1274,6 +1285,17 @@ pub async fn sync_events_v5_route(
 ) -> Result<sync_events::v5::Response, RumaResponse<UiaaResponse>> {
     let sender_user = body.sender_user.expect("user is authenticated");
     let sender_device = body.sender_device.expect("user is authenticated");
 
+    let cloned_sender_user = sender_user.clone();
+    let cloned_sender_device = sender_device.clone();
+    // No need to block sync on device last-seen update
+    tokio::spawn(async move {
+        services()
+            .users
+            .update_device_last_seen(cloned_sender_user, cloned_sender_device)
+            .await;
+    });
+
     let mut body = body.body;
     // Setup watchers, so if there's no response, we can wait for them
     let watcher = services().globals.watch(&sender_user, &sender_device);
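
Both sync routes use the same fire-and-forget shape: clone the owned IDs, move the clones into a spawned task, and never await it, so sync latency is unaffected. A standalone sketch of that pattern, with illustrative names only:

    use tokio::task::JoinHandle;

    // Illustrative stand-in for the spawned last-seen update.
    fn record_last_seen(user: String, device: String) -> JoinHandle<()> {
        tokio::spawn(async move {
            // The route never awaits this, so it adds no request latency.
            println!("updating last_seen for {user}/{device}");
        })
    }

    #[tokio::main]
    async fn main() {
        let user = "@alice:example.org".to_owned();
        let device = "DEVICEID".to_owned();
        // Clone so the originals stay usable after being moved into the task.
        let handle = record_last_seen(user.clone(), device.clone());
        println!("continuing sync for {user}/{device}");
        let _ = handle.await; // joined here only so the demo prints both lines
    }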
@@ -89,7 +89,7 @@ where
         if let Some(reg_info) = services().appservice.find_from_token(token).await {
             Token::Appservice(Box::new(reg_info.clone()))
         } else if let Some((user_id, device_id)) = services().users.find_from_token(token)? {
-            Token::User((user_id, OwnedDeviceId::from(device_id)))
+            Token::User((user_id, device_id))
         } else {
             Token::Invalid
         }
@@ -2351,8 +2351,8 @@ pub async fn get_devices_route(
             .expect("version will not grow that large"),
         devices: services()
             .users
-            .all_devices_metadata(&body.user_id)
-            .filter_map(|r| r.ok())
+            .all_user_devices_metadata(&body.user_id)
+            .await
             .filter_map(|metadata| {
                 Some(UserDevice {
                     keys: services()
@@ -41,7 +41,7 @@ impl service::users::Data for KeyValueDatabase {
     }
 
     /// Find out which user an access token belongs to.
-    fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, String)>> {
+    fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, OwnedDeviceId)>> {
         self.token_userdeviceid
             .get(token.as_bytes())?
             .map_or(Ok(None), |bytes| {

@@ -60,9 +60,11 @@ impl service::users::Data for KeyValueDatabase {
                     .map_err(|_| {
                         Error::bad_database("User ID in token_userdeviceid is invalid.")
                     })?,
-                utils::string_from_bytes(device_bytes).map_err(|_| {
-                    Error::bad_database("Device ID in token_userdeviceid is invalid.")
-                })?,
+                utils::string_from_bytes(device_bytes)
+                    .map_err(|_| {
+                        Error::bad_database("Device ID in token_userdeviceid is invalid.")
+                    })?
+                    .into(),
             )))
         })
     }

@@ -894,7 +896,7 @@ impl service::users::Data for KeyValueDatabase {
         })
     }
 
-    fn all_devices_metadata<'a>(
+    fn all_user_devices_metadata<'a>(
         &'a self,
         user_id: &UserId,
     ) -> Box<dyn Iterator<Item = Result<Device>> + 'a> {

@@ -912,6 +914,38 @@ impl service::users::Data for KeyValueDatabase {
         )
     }
 
+    fn set_devices_last_seen<'a>(
+        &'a self,
+        devices: &'a BTreeMap<(OwnedUserId, OwnedDeviceId), MilliSecondsSinceUnixEpoch>,
+    ) -> Box<dyn Iterator<Item = Result<()>> + 'a> {
+        Box::new(devices.iter().map(|((user, device), timestamp)| {
+            let mut key = user.as_bytes().to_vec();
+            key.push(0xff);
+            key.extend_from_slice(device.as_bytes());
+
+            // Device may have been removed since last db write
+            if let Some(mut device_metadata) = self
+                .userdeviceid_metadata
+                .get(&key)?
+                .map(|bytes| {
+                    serde_json::from_slice::<Device>(&bytes).map_err(|_| {
+                        Error::bad_database("Device in userdeviceid_metadata is invalid.")
+                    })
+                })
+                .transpose()?
+            {
+                device_metadata.last_seen_ts = Some(*timestamp);
+
+                self.userdeviceid_metadata.insert(
+                    &key,
+                    &serde_json::to_vec(&device_metadata).expect("Device always serializes"),
+                )?;
+            }
+
+            Ok(())
+        }))
+    }
+
     /// Creates a new sync filter. Returns the filter id.
     fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String> {
         let filter_id = utils::random_string(4);
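
For reference, the `userdeviceid_metadata` key built above is the user ID and the device ID joined by a `0xff` separator byte, which cannot occur in a valid (ASCII) Matrix identifier, so the key splits back unambiguously. A small sketch of the encoding; the helper names are hypothetical:

    // Hypothetical helpers mirroring the key construction above.
    fn make_key(user_id: &str, device_id: &str) -> Vec<u8> {
        let mut key = user_id.as_bytes().to_vec();
        key.push(0xff);
        key.extend_from_slice(device_id.as_bytes());
        key
    }

    fn split_key(key: &[u8]) -> Option<(&[u8], &[u8])> {
        let sep = key.iter().position(|&b| b == 0xff)?;
        Some((&key[..sep], &key[sep + 1..]))
    }

    fn main() {
        let key = make_key("@alice:example.org", "DEVICEID");
        let (user, device) = split_key(&key).expect("separator present");
        assert_eq!(user, b"@alice:example.org");
        assert_eq!(device, b"DEVICEID");
    }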
@@ -1139,6 +1139,7 @@ impl KeyValueDatabase {
         services().sending.start_handler();
 
         services().media.start_time_retention_checker();
+        services().users.start_device_last_seen_update_task();
 
         Self::start_cleanup_task().await;
         if services().globals.allow_check_for_updates() {
@@ -543,7 +543,7 @@ async fn shutdown_signal(handle: ServerHandle) {
     warn!("Received {}, shutting down...", sig);
     handle.graceful_shutdown(Some(Duration::from_secs(30)));
 
-    services().globals.shutdown();
+    services().globals.shutdown().await;
 
     #[cfg(feature = "systemd")]
     let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]);
@@ -512,11 +512,13 @@ impl Service {
         Ok(r)
     }
 
-    pub fn shutdown(&self) {
+    pub async fn shutdown(&self) {
         self.shutdown.store(true, atomic::Ordering::Relaxed);
         // On shutdown
         info!(target: "shutdown-sync", "Received shutdown notification, notifying sync helpers...");
         services().globals.rotate.fire();
+        // Force write before shutdown
+        services().users.try_update_device_last_seen().await;
     }
 }
@@ -29,7 +29,7 @@ pub struct Services {
     pub rooms: rooms::Service,
     pub transaction_ids: transaction_ids::Service,
     pub uiaa: uiaa::Service,
-    pub users: users::Service,
+    pub users: Arc<users::Service>,
     pub account_data: account_data::Service,
     pub admin: Arc<admin::Service>,
     pub globals: globals::Service,

@@ -112,10 +112,11 @@ impl Services {
             },
             transaction_ids: transaction_ids::Service { db },
             uiaa: uiaa::Service { db },
-            users: users::Service {
+            users: Arc::new(users::Service {
                 db,
                 connections: StdMutex::new(BTreeMap::new()),
-            },
+                device_last_seen: Mutex::new(BTreeMap::new()),
+            }),
             account_data: account_data::Service { db },
             admin: admin::Service::build(),
             key_backups: key_backups::Service { db },
@@ -4,8 +4,8 @@ use ruma::{
     encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
     events::AnyToDeviceEvent,
     serde::Raw,
-    DeviceId, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedMxcUri, OwnedOneTimeKeyId, OwnedUserId,
-    UInt, UserId,
+    DeviceId, MilliSecondsSinceUnixEpoch, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedMxcUri,
+    OwnedOneTimeKeyId, OwnedUserId, UInt, UserId,
 };
 use std::collections::BTreeMap;

@@ -20,7 +20,7 @@ pub trait Data: Send + Sync {
     fn count(&self) -> Result<usize>;
 
     /// Find out which user an access token belongs to.
-    fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, String)>>;
+    fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, OwnedDeviceId)>>;
 
     /// Returns an iterator over all users on this homeserver.
     fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a>;

@@ -202,11 +202,16 @@ pub trait Data: Send + Sync {
     fn get_devicelist_version(&self, user_id: &UserId) -> Result<Option<u64>>;
 
-    fn all_devices_metadata<'a>(
+    fn all_user_devices_metadata<'a>(
         &'a self,
         user_id: &UserId,
     ) -> Box<dyn Iterator<Item = Result<Device>> + 'a>;
 
+    fn set_devices_last_seen<'a>(
+        &'a self,
+        devices: &'a BTreeMap<(OwnedUserId, OwnedDeviceId), MilliSecondsSinceUnixEpoch>,
+    ) -> Box<dyn Iterator<Item = Result<()>> + 'a>;
+
     /// Creates a new sync filter. Returns the filter id.
     fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String>;
@@ -2,7 +2,8 @@ mod data;
 use std::{
     collections::{BTreeMap, BTreeSet},
     mem,
-    sync::{Arc, Mutex},
+    sync::{Arc, Mutex as StdMutex},
     time::Duration,
 };
 
 pub use data::Data;

@@ -15,9 +16,11 @@ use ruma::{
     encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
     events::AnyToDeviceEvent,
     serde::Raw,
-    DeviceId, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedMxcUri, OwnedOneTimeKeyId, OwnedRoomId,
-    OwnedUserId, UInt, UserId,
+    DeviceId, MilliSecondsSinceUnixEpoch, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedMxcUri,
+    OwnedOneTimeKeyId, OwnedRoomId, OwnedUserId, UInt, UserId,
 };
+use tokio::{sync::Mutex, time::interval};
+use tracing::{debug, warn};
 
 use crate::{services, Error, Result};

@@ -32,10 +35,59 @@ pub struct Service {
     pub db: &'static dyn Data,
     #[allow(clippy::type_complexity)]
     pub connections:
-        Mutex<BTreeMap<(OwnedUserId, OwnedDeviceId, String), Arc<Mutex<SlidingSyncCache>>>>,
+        StdMutex<BTreeMap<(OwnedUserId, OwnedDeviceId, String), Arc<StdMutex<SlidingSyncCache>>>>,
+    pub device_last_seen: Mutex<BTreeMap<(OwnedUserId, OwnedDeviceId), MilliSecondsSinceUnixEpoch>>,
 }
 
 impl Service {
+    pub fn start_device_last_seen_update_task(self: &Arc<Self>) {
+        let self2 = Arc::clone(self);
+        tokio::spawn(async move {
+            // Actually writes the new device times to the database every 5 minutes.
+            // The device times are always returned fresh from memory
+            // if they have been changed after the last write.
+            let timer_interval = Duration::from_secs(60 * 5);
+            let mut i = interval(timer_interval);
+            loop {
+                i.tick().await;
+                let _ = self2.try_update_device_last_seen().await;
+            }
+        });
+    }
+
+    pub async fn try_update_device_last_seen(&self) {
+        debug!("Writing cached device last-seens to database");
+        for error in self.write_cached_last_seen().await {
+            warn!("Error writing last seen timestamp of device to database: {error}");
+        }
+    }
+
+    /// Writes all the currently cached last seen timestamps of devices to the database,
+    /// clearing the cache in the process
+    async fn write_cached_last_seen(&self) -> Vec<Error> {
+        let mut guard = self.device_last_seen.lock().await;
+        if !guard.is_empty() {
+            // TODO: Replace with `replace` once/if `tokio::sync::Mutex` implements the equivalent
+            // method from `std`: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.replace
+            // i.e. instead of the `let mut guard` above:
+            //let map = self.device_last_seen.replace(BTreeMap::new()).await;
+            // We do a clone instead, as we don't want to start having a backlog of awaiting `lock`s
+            // for all these DB fetches and writes, which admittedly, might not even be a big deal.
+            let map = guard.clone();
+            guard.clear();
+            drop(guard);
+
+            let result = self
+                .db
+                .set_devices_last_seen(&map)
+                .filter_map(Result::err)
+                .collect();
+            result
+        } else {
+            Vec::new()
+        }
+    }
+
     /// Check if a user has an account on this homeserver.
     pub fn exists(&self, user_id: &UserId) -> Result<bool> {
         self.db.exists(user_id)
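
One note on the TODO in `write_cached_last_seen` above: `std::mem::take` can already swap the map out under the lock without a clone, since the temporary guard is dropped at the end of the statement, before any database work starts. A sketch under the assumption of the same field shape; the types and the `drain` helper are hypothetical stand-ins:

    use std::collections::BTreeMap;
    use std::mem;
    use tokio::sync::Mutex;

    // Hypothetical stand-ins for the real key/value types.
    type Key = (String, String);
    type Ts = u64;

    // Swaps an empty map in and returns the full one; the guard is
    // dropped at the end of the statement, before any DB work begins.
    async fn drain(cache: &Mutex<BTreeMap<Key, Ts>>) -> BTreeMap<Key, Ts> {
        mem::take(&mut *cache.lock().await)
    }

    #[tokio::main]
    async fn main() {
        let cache = Mutex::new(BTreeMap::from([(
            ("@alice:example.org".to_string(), "DEVICEID".to_string()),
            1u64,
        )]));
        assert_eq!(drain(&cache).await.len(), 1);
        assert!(cache.lock().await.is_empty());
    }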
@@ -68,7 +120,7 @@ impl Service {
         cache
             .entry((user_id, device_id, conn_id))
             .or_insert_with(|| {
-                Arc::new(Mutex::new(SlidingSyncCache {
+                Arc::new(StdMutex::new(SlidingSyncCache {
                     lists: BTreeMap::new(),
                     subscriptions: BTreeMap::new(),
                     known_rooms: BTreeMap::new(),

@@ -161,7 +213,7 @@ impl Service {
         cache
             .entry((user_id, device_id, conn_id))
             .or_insert_with(|| {
-                Arc::new(Mutex::new(SlidingSyncCache {
+                Arc::new(StdMutex::new(SlidingSyncCache {
                     lists: BTreeMap::new(),
                     subscriptions: BTreeMap::new(),
                     known_rooms: BTreeMap::new(),

@@ -189,7 +241,7 @@ impl Service {
         cache
             .entry((user_id, device_id, conn_id))
             .or_insert_with(|| {
-                Arc::new(Mutex::new(SlidingSyncCache {
+                Arc::new(StdMutex::new(SlidingSyncCache {
                     lists: BTreeMap::new(),
                     subscriptions: BTreeMap::new(),
                     known_rooms: BTreeMap::new(),

@@ -245,7 +297,7 @@ impl Service {
     }
 
     /// Find out which user an access token belongs to.
-    pub fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, String)>> {
+    pub fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, OwnedDeviceId)>> {
         self.db.find_from_token(token)
     }
@@ -519,11 +571,25 @@ impl Service {
         self.db.get_devicelist_version(user_id)
     }
 
-    pub fn all_devices_metadata<'a>(
+    pub async fn all_user_devices_metadata<'a>(
         &'a self,
-        user_id: &UserId,
-    ) -> impl Iterator<Item = Result<Device>> + 'a {
-        self.db.all_devices_metadata(user_id)
+        user_id: &'a UserId,
+    ) -> impl Iterator<Item = Device> + 'a {
+        let all_devices: Vec<_> = self
+            .db
+            .all_user_devices_metadata(user_id)
+            .filter_map(Result::ok)
+            // RumaHandler trait complains if we don't collect
+            .collect();
+        let device_last_seen = self.device_last_seen.lock().await;
+
+        // Updates the timestamps with the cached ones
+        all_devices.into_iter().map(move |mut d| {
+            if let Some(ts) = device_last_seen.get(&(user_id.to_owned(), d.device_id.clone())) {
+                d.last_seen_ts = Some(*ts);
+            };
+            d
+        })
+    }
 
     /// Deactivate account
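
The iterator returned by `all_user_devices_metadata` above overlays the cached timestamps on the stored snapshot, so callers always see the freshest value. The same merge in isolation, with simplified, illustrative types (not Conduit's `Device`):

    use std::collections::BTreeMap;

    // Simplified, illustrative device record.
    #[derive(Debug)]
    struct Device {
        device_id: String,
        last_seen_ts: Option<u64>,
    }

    // Patch each stored device with a fresher timestamp from the cache, if any.
    fn overlay(devices: Vec<Device>, cache: &BTreeMap<String, u64>) -> Vec<Device> {
        devices
            .into_iter()
            .map(|mut d| {
                if let Some(ts) = cache.get(&d.device_id) {
                    d.last_seen_ts = Some(*ts);
                }
                d
            })
            .collect()
    }

    fn main() {
        let stored = vec![Device { device_id: "DEVICEID".into(), last_seen_ts: None }];
        let cache = BTreeMap::from([("DEVICEID".to_string(), 1_700_000_000_000_u64)]);
        println!("{:?}", overlay(stored, &cache));
    }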
@@ -564,6 +630,14 @@ impl Service {
     pub fn find_from_openid_token(&self, token: &str) -> Result<Option<OwnedUserId>> {
         self.db.find_from_openid_token(token)
     }
 
+    /// Sets the device_last_seen timestamp of a given device to now
+    pub async fn update_device_last_seen(&self, user_id: OwnedUserId, device_id: OwnedDeviceId) {
+        self.device_last_seen
+            .lock()
+            .await
+            .insert((user_id, device_id), MilliSecondsSinceUnixEpoch::now());
+    }
 }
 
 /// Ensure that a user only sees signatures from themselves and the target user