1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-07-02 16:38:36 +00:00

feat(presence): implement most features for PoC

This commit is contained in:
Jakub Kubík 2022-11-18 17:18:20 +01:00
parent f956e727e4
commit e7348621bd
No known key found for this signature in database
GPG key ID: D3A0D5D60F3A173F
8 changed files with 183 additions and 129 deletions

View file

@ -60,7 +60,7 @@ pub async fn get_presence_route(
.rooms .rooms
.edus .edus
.presence .presence
.get_last_presence_event(sender_user, &room_id)? .get_presence_event(sender_user, &room_id)?
{ {
presence_event = Some(presence); presence_event = Some(presence);
break; break;

View file

@ -33,6 +33,7 @@ use ruma::{
}, },
directory::{IncomingFilter, IncomingRoomNetwork}, directory::{IncomingFilter, IncomingRoomNetwork},
events::{ events::{
presence::{PresenceEvent, PresenceEventContent},
receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType}, receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType},
room::{ room::{
join_rules::{JoinRule, RoomJoinRulesEventContent}, join_rules::{JoinRule, RoomJoinRulesEventContent},
@ -746,7 +747,33 @@ pub async fn send_transaction_message_route(
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok()) .filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
{ {
match edu { match edu {
Edu::Presence(_) => {} Edu::Presence(presence) => {
for presence_update in presence.push {
let user_id = presence_update.user_id;
for room_id in services()
.rooms
.state_cache
.rooms_joined(&user_id)
.filter_map(|room_id| room_id.ok())
{
services().rooms.edus.presence.update_presence(
&user_id,
&room_id,
PresenceEvent {
content: PresenceEventContent {
avatar_url: services().users.avatar_url(&user_id)?,
currently_active: Some(presence_update.currently_active),
displayname: services().users.displayname(&user_id)?,
last_active_ago: Some(presence_update.last_active_ago),
presence: presence_update.presence.clone(),
status_msg: presence_update.status_msg.clone(),
},
sender: user_id.clone(),
},
)?;
}
}
}
Edu::Receipt(receipt) => { Edu::Receipt(receipt) => {
for (room_id, room_updates) in receipt.receipts { for (room_id, room_updates) in receipt.receipts {
for (user_id, user_updates) in room_updates.read { for (user_id, user_updates) in room_updates.read {

View file

@ -76,6 +76,11 @@ pub struct Config {
pub emergency_password: Option<String>, pub emergency_password: Option<String>,
#[serde(default = "default_presence_idle_timeout")]
pub presence_idle_timeout: u64,
#[serde(default = "default_presence_offline_timeout")]
pub presence_offline_timeout: u64,
#[serde(flatten)] #[serde(flatten)]
pub catchall: BTreeMap<String, IgnoredAny>, pub catchall: BTreeMap<String, IgnoredAny>,
} }
@ -257,6 +262,14 @@ fn default_turn_ttl() -> u64 {
60 * 60 * 24 60 * 60 * 24
} }
fn default_presence_idle_timeout() -> u64 {
1 * 60 as u64
}
fn default_presence_offline_timeout() -> u64 {
15 * 60 as u64
}
// I know, it's a great name // I know, it's a great name
pub fn default_default_room_version() -> RoomVersionId { pub fn default_default_room_version() -> RoomVersionId {
RoomVersionId::V9 RoomVersionId::V9

View file

@ -1,5 +1,7 @@
use futures_util::{stream::FuturesUnordered, StreamExt}; use futures_util::{stream::FuturesUnordered, StreamExt};
use ruma::user_id;
use std::{collections::HashMap, time::Duration}; use std::{collections::HashMap, time::Duration};
use tracing::error;
use ruma::{ use ruma::{
events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, RoomId, UInt, UserId,
@ -7,9 +9,11 @@ use ruma::{
use tokio::{sync::mpsc, time::sleep}; use tokio::{sync::mpsc, time::sleep};
use crate::{ use crate::{
database::KeyValueDatabase, service, services, utils, utils::u64_from_bytes, Error, Result, database::KeyValueDatabase,
service, services, utils,
utils::{millis_since_unix_epoch, u64_from_bytes},
Error, Result,
}; };
use crate::utils::millis_since_unix_epoch;
pub struct PresenceUpdate { pub struct PresenceUpdate {
count: u64, count: u64,
@ -17,15 +21,15 @@ pub struct PresenceUpdate {
} }
impl PresenceUpdate { impl PresenceUpdate {
fn to_be_bytes(&self) -> &[u8] { fn to_be_bytes(&self) -> Vec<u8> {
&*([self.count.to_be_bytes(), self.timestamp.to_be_bytes()].concat()) [self.count.to_be_bytes(), self.timestamp.to_be_bytes()].concat()
} }
fn from_be_bytes(bytes: &[u8]) -> Result<Self> { fn from_be_bytes(bytes: &[u8]) -> Result<Self> {
let (count_bytes, timestamp_bytes) = bytes.split_at(bytes.len() / 2); let (count_bytes, timestamp_bytes) = bytes.split_at(bytes.len() / 2);
Ok(Self { Ok(Self {
count: u64_from_bytes(count_bytes)?, count: u64_from_bytes(count_bytes).expect("count bytes from DB are valid"),
timestamp: u64_from_bytes(timestamp_bytes)?, timestamp: u64_from_bytes(timestamp_bytes).expect("timestamp bytes from DB are valid"),
}) })
} }
} }
@ -37,19 +41,23 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
room_id: &RoomId, room_id: &RoomId,
presence: PresenceEvent, presence: PresenceEvent,
) -> Result<()> { ) -> Result<()> {
let mut roomuser_id = [room_id.as_bytes(), 0xff, user_id.as_bytes()].concat(); let roomuser_id = [room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat();
self.roomuserid_presenceevent.insert( self.roomuserid_presenceevent.insert(
&roomuser_id, &roomuser_id,
&serde_json::to_vec(&presence)?, &serde_json::to_vec(&presence).expect("presence event from DB is valid"),
)?; )?;
self.userid_presenceupdate.insert( self.userid_presenceupdate.insert(
user_id.as_bytes(), user_id.as_bytes(),
PresenceUpdate { &*PresenceUpdate {
count: services().globals.next_count()?, count: services().globals.next_count()?,
timestamp: millis_since_unix_epoch(), timestamp: match presence.content.last_active_ago {
}.to_be_bytes(), Some(active_ago) => millis_since_unix_epoch().saturating_sub(active_ago.into()),
None => millis_since_unix_epoch(),
},
}
.to_be_bytes(),
)?; )?;
Ok(()) Ok(())
@ -58,10 +66,11 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
fn ping_presence(&self, user_id: &UserId) -> Result<()> { fn ping_presence(&self, user_id: &UserId) -> Result<()> {
self.userid_presenceupdate.insert( self.userid_presenceupdate.insert(
user_id.as_bytes(), user_id.as_bytes(),
PresenceUpdate { &*PresenceUpdate {
count: services().globals.current_count()?, count: services().globals.current_count()?,
timestamp: millis_since_unix_epoch(), timestamp: millis_since_unix_epoch(),
}.to_be_bytes() }
.to_be_bytes(),
)?; )?;
Ok(()) Ok(())
@ -70,9 +79,7 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>> { fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>> {
self.userid_presenceupdate self.userid_presenceupdate
.get(user_id.as_bytes())? .get(user_id.as_bytes())?
.map(|bytes| { .map(|bytes| PresenceUpdate::from_be_bytes(&bytes).map(|update| update.timestamp))
PresenceUpdate::from_be_bytes(bytes)?.timestamp
})
.transpose() .transpose()
} }
@ -80,57 +87,131 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
&self, &self,
room_id: &RoomId, room_id: &RoomId,
user_id: &UserId, user_id: &UserId,
presence_timestamp: u64 presence_timestamp: u64,
) -> Result<Option<PresenceEvent>> { ) -> Result<Option<PresenceEvent>> {
let mut roomuser_id = [room_id.as_bytes(), 0xff, user_id.as_bytes()].concat(); let roomuser_id = [room_id.as_bytes(), &[0xff], user_id.as_bytes()].concat();
self.roomuserid_presenceevent self.roomuserid_presenceevent
.get(&roomuser_id)? .get(&roomuser_id)?
.map(|value| parse_presence_event(&value, presence_timestamp)) .map(|value| parse_presence_event(&value, presence_timestamp))
.transpose() .transpose()
} }
fn presence_since( fn presence_since<'a>(
&self, &'a self,
room_id: &RoomId, room_id: &RoomId,
since: u64, since: u64,
) -> Result<Box<dyn Iterator<Item=(&UserId, PresenceEvent)>>> { ) -> Result<Box<dyn Iterator<Item = (OwnedUserId, PresenceEvent)> + 'a>> {
let services = &services(); let services = &services();
let mut user_timestamp: HashMap<UserId, u64> = self.userid_presenceupdate let user_timestamp: HashMap<OwnedUserId, u64> = self
.userid_presenceupdate
.iter() .iter()
.map(|(user_id_bytes, update_bytes)| (UserId::parse(utils::string_from_bytes(user_id_bytes)), PresenceUpdate::from_be_bytes(update_bytes)?)) .filter_map(|(user_id_bytes, update_bytes)| {
Some((
OwnedUserId::from(
UserId::parse(utils::string_from_bytes(&user_id_bytes).ok()?).ok()?,
),
PresenceUpdate::from_be_bytes(&update_bytes).ok()?,
))
})
.filter_map(|(user_id, presence_update)| { .filter_map(|(user_id, presence_update)| {
if presence_update.count <= since || !services.rooms.state_cache.is_joined(user_id, room_id)? { if presence_update.count <= since
return None || !services
.rooms
.state_cache
.is_joined(&user_id, room_id)
.ok()?
{
return None;
} }
Some((user_id, presence_update.timestamp)) Some((user_id, presence_update.timestamp))
}) })
.collect(); .collect();
Ok( Ok(Box::new(
self.roomuserid_presenceevent self.roomuserid_presenceevent
.iter() .iter()
.filter_map(|user_id_bytes, presence_bytes| (UserId::parse(utils::string_from_bytes(user_id_bytes)), presence_bytes)) .filter_map(|(user_id_bytes, presence_bytes)| {
.filter_map(|user_id, presence_bytes| { Some((
let timestamp = user_timestamp.get(user_id)?; OwnedUserId::from(
UserId::parse(utils::string_from_bytes(&user_id_bytes).ok()?).ok()?,
Some((user_id, parse_presence_event(presence_bytes, *timestamp)?)) ),
presence_bytes,
))
}) })
.into_iter() .filter_map(
) move |(user_id, presence_bytes)| -> Option<(OwnedUserId, PresenceEvent)> {
let timestamp = user_timestamp.get(&user_id)?;
Some((
user_id,
parse_presence_event(&presence_bytes, *timestamp).ok()?,
))
},
),
))
} }
fn presence_maintain( fn presence_maintain(
&self, &self,
mut timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>, mut timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>,
) -> Result<()> { ) -> Result<()> {
let mut timers = FuturesUnordered::new(); let mut timers = FuturesUnordered::new();
// TODO: Get rid of this hack
timers.push(create_presence_timer(
Duration::from_secs(60),
user_id!("@test:test.com").to_owned(),
));
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
tokio::select! { tokio::select! {
Some(_user_id) = timers.next() => { Some(user_id) = timers.next() => {
// TODO: Handle presence timeouts let presence_timestamp = match services().rooms.edus.presence.last_presence_update(&user_id) {
Ok(timestamp) => match timestamp {
Some(timestamp) => timestamp,
None => continue,
},
Err(e) => {
error!("{e}");
continue;
}
};
let presence_state = determine_presence_state(presence_timestamp);
// Continue if there is no change in state
if presence_state != PresenceState::Offline {
continue;
}
for room_id in services()
.rooms
.state_cache
.rooms_joined(&user_id)
.filter_map(|room_id| room_id.ok()) {
let presence_event = match services().rooms.edus.presence.get_presence_event(&user_id, &room_id) {
Ok(event) => match event {
Some(event) => event,
None => continue,
},
Err(e) => {
error!("{e}");
continue;
}
};
match services().rooms.edus.presence.update_presence(&user_id, &room_id, presence_event) {
Ok(()) => (),
Err(e) => {
error!("{e}");
continue;
}
}
// TODO: Send event over federation
}
} }
Some(user_id) = timer_receiver.recv() => { Some(user_id) = timer_receiver.recv() => {
// Idle timeout // Idle timeout
@ -147,7 +228,7 @@ impl service::rooms::edus::presence::Data for KeyValueDatabase {
} }
} }
async fn create_presence_timer(duration: Duration, user_id: Box<UserId>) -> Box<UserId> { async fn create_presence_timer(duration: Duration, user_id: OwnedUserId) -> OwnedUserId {
sleep(duration).await; sleep(duration).await;
user_id user_id
@ -162,9 +243,7 @@ fn parse_presence_event(bytes: &[u8], presence_timestamp: u64) -> Result<Presenc
Ok(presence) Ok(presence)
} }
fn determine_presence_state( fn determine_presence_state(last_active_ago: u64) -> PresenceState {
last_active_ago: u64,
) -> PresenceState {
let globals = &services().globals; let globals = &services().globals;
return if last_active_ago < globals.presence_idle_timeout() { return if last_active_ago < globals.presence_idle_timeout() {
@ -177,10 +256,7 @@ fn determine_presence_state(
} }
/// Translates the timestamp representing last_active_ago to a diff from now. /// Translates the timestamp representing last_active_ago to a diff from now.
fn translate_active_ago( fn translate_active_ago(presence_event: &mut PresenceEvent, last_active_ts: u64) {
presence_event: &mut PresenceEvent,
last_active_ts: u64,
) {
let last_active_ago = millis_since_unix_epoch().saturating_sub(last_active_ts); let last_active_ago = millis_since_unix_epoch().saturating_sub(last_active_ts);
presence_event.content.presence = determine_presence_state(last_active_ago); presence_event.content.presence = determine_presence_state(last_active_ago);

View file

@ -825,9 +825,6 @@ impl KeyValueDatabase {
); );
} }
// This data is probably outdated
db.presenceid_presence.clear()?;
services().admin.start_handler(); services().admin.start_handler();
// Set emergency access for the conduit user // Set emergency access for the conduit user

View file

@ -286,6 +286,14 @@ impl Service {
&self.config.emergency_password &self.config.emergency_password
} }
pub fn presence_idle_timeout(&self) -> u64 {
self.config.presence_idle_timeout
}
pub fn presence_offline_timeout(&self) -> u64 {
self.config.presence_offline_timeout
}
pub fn supported_room_versions(&self) -> Vec<RoomVersionId> { pub fn supported_room_versions(&self) -> Vec<RoomVersionId> {
let mut room_versions: Vec<RoomVersionId> = vec![]; let mut room_versions: Vec<RoomVersionId> = vec![];
room_versions.extend(self.stable_room_versions.clone()); room_versions.extend(self.stable_room_versions.clone());

View file

@ -1,5 +1,3 @@
use std::collections::HashMap;
use crate::Result; use crate::Result;
use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId}; use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId};
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -31,12 +29,12 @@ pub trait Data: Send + Sync {
) -> Result<Option<PresenceEvent>>; ) -> Result<Option<PresenceEvent>>;
/// Returns the most recent presence updates that happened after the event with id `since`. /// Returns the most recent presence updates that happened after the event with id `since`.
fn presence_since( fn presence_since<'a>(
&self, &'a self,
room_id: &RoomId, room_id: &RoomId,
since: u64, since: u64,
) -> Result<HashMap<OwnedUserId, PresenceEvent>>; ) -> Result<Box<dyn Iterator<Item = (OwnedUserId, PresenceEvent)> + 'a>>;
fn presence_maintain(&self, timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>) fn presence_maintain(&self, timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>)
-> Result<()>; -> Result<()>;
} }

View file

@ -1,5 +1,4 @@
mod data; mod data;
use std::collections::HashMap;
pub use data::Data; pub use data::Data;
use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId}; use ruma::{events::presence::PresenceEvent, OwnedUserId, RoomId, UserId};
@ -11,7 +10,7 @@ pub struct Service {
pub db: &'static dyn Data, pub db: &'static dyn Data,
// Presence timers // Presence timers
timer_sender: mpsc::UnboundedSender<Box<UserId>>, timer_sender: mpsc::UnboundedSender<OwnedUserId>,
} }
impl Service { impl Service {
@ -51,7 +50,11 @@ impl Service {
self.db.ping_presence(user_id) self.db.ping_presence(user_id)
} }
pub fn get_last_presence_event( pub fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>> {
self.db.last_presence_update(user_id)
}
pub fn get_presence_event(
&self, &self,
user_id: &UserId, user_id: &UserId,
room_id: &RoomId, room_id: &RoomId,
@ -66,86 +69,18 @@ impl Service {
pub fn presence_maintain( pub fn presence_maintain(
&self, &self,
timer_receiver: mpsc::UnboundedReceiver<Box<UserId>>, timer_receiver: mpsc::UnboundedReceiver<OwnedUserId>,
) -> Result<()> { ) -> Result<()> {
self.db.presence_maintain(timer_receiver) self.db.presence_maintain(timer_receiver)
} }
/* TODO
/// Sets all users to offline who have been quiet for too long.
fn _presence_maintain(
&self,
rooms: &super::Rooms,
globals: &super::super::globals::Globals,
) -> Result<()> {
let current_timestamp = utils::millis_since_unix_epoch();
for (user_id_bytes, last_timestamp) in self
.userid_lastpresenceupdate
.iter()
.filter_map(|(k, bytes)| {
Some((
k,
utils::u64_from_bytes(&bytes)
.map_err(|_| {
Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.")
})
.ok()?,
))
})
.take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000)
// 5 Minutes
{
// Send new presence events to set the user offline
let count = globals.next_count()?.to_be_bytes();
let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes)
.map_err(|_| {
Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.")
})?
.try_into()
.map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?;
for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) {
let mut presence_id = room_id.as_bytes().to_vec();
presence_id.push(0xff);
presence_id.extend_from_slice(&count);
presence_id.push(0xff);
presence_id.extend_from_slice(&user_id_bytes);
self.presenceid_presence.insert(
&presence_id,
&serde_json::to_vec(&PresenceEvent {
content: PresenceEventContent {
avatar_url: None,
currently_active: None,
displayname: None,
last_active_ago: Some(
last_timestamp.try_into().expect("time is valid"),
),
presence: PresenceState::Offline,
status_msg: None,
},
sender: user_id.to_owned(),
})
.expect("PresenceEvent can be serialized"),
)?;
}
self.userid_lastpresenceupdate.insert(
user_id.as_bytes(),
&utils::millis_since_unix_epoch().to_be_bytes(),
)?;
}
Ok(())
}*/
/// Returns the most recent presence updates that happened after the event with id `since`. /// Returns the most recent presence updates that happened after the event with id `since`.
#[tracing::instrument(skip(self, since, room_id))] #[tracing::instrument(skip(self, since, room_id))]
pub fn presence_since( pub fn presence_since(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
since: u64, since: u64,
) -> Result<HashMap<OwnedUserId, PresenceEvent>> { ) -> Result<Box<dyn Iterator<Item = (OwnedUserId, PresenceEvent)>>> {
self.db.presence_since(room_id, since) self.db.presence_since(room_id, since)
} }
} }