use std::{borrow::Borrow, sync::Arc};

use conduwuit::{
	Err, PduCount, PduEvent, Result, at, err,
	result::{LogErr, NotFound},
	utils::{self, stream::TryReadyExt},
};
use database::{Database, Deserialized, Json, KeyVal, Map};
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction};

use super::{PduId, RawPduId};
use crate::{Dep, rooms, rooms::short::ShortRoomId};

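/// Raw storage layer for the room timeline. The `Arc<Map>` fields below are
/// handles to the database columns this service reads and writes: event id to
/// pdu id, pdu id to pdu, outlier pdus, and per-user notification/highlight
/// counters.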
pub(super) struct Data {
	eventid_outlierpdu: Arc<Map>,
	eventid_pduid: Arc<Map>,
	pduid_pdu: Arc<Map>,
	userroomid_highlightcount: Arc<Map>,
	userroomid_notificationcount: Arc<Map>,
	pub(super) db: Arc<Database>,
	services: Services,
}

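/// Other services this data layer depends on.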
struct Services {
	short: Dep<rooms::short::Service>,
}

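/// Item yielded by the pdu streams: the pdu's timeline position (`PduCount`)
/// paired with the event itself.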
pub type PdusIterItem = (PduCount, PduEvent);

impl Data {
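	/// Builds the data layer from the service args, cloning a handle to each
	/// database column and to the dependent `rooms::short` service.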
	pub(super) fn new(args: &crate::Args<'_>) -> Self {
		let db = &args.db;
		Self {
			eventid_outlierpdu: db["eventid_outlierpdu"].clone(),
			eventid_pduid: db["eventid_pduid"].clone(),
			pduid_pdu: db["pduid_pdu"].clone(),
			userroomid_highlightcount: db["userroomid_highlightcount"].clone(),
			userroomid_notificationcount: db["userroomid_notificationcount"].clone(),
			db: args.db.clone(),
			services: Services {
				short: args.depend::<rooms::short::Service>("rooms::short"),
			},
		}
	}

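	/// Returns the `PduCount` of the newest pdu in the room's timeline, or
	/// `PduCount::max()` when no normal timeline entry is found.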
	#[inline]
	pub(super) async fn last_timeline_count(
		&self,
		sender_user: Option<&UserId>,
		room_id: &RoomId,
	) -> Result<PduCount> {
		let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());

		pin_mut!(pdus_rev);
		let last_count = pdus_rev
			.try_next()
			.await?
			.map(at!(0))
			.filter(|&count| matches!(count, PduCount::Normal(_)))
			.unwrap_or_else(PduCount::max);

		Ok(last_count)
	}

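	/// Returns the most recent pdu in the room's timeline.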
	#[inline]
	pub(super) async fn latest_pdu_in_room(
		&self,
		sender_user: Option<&UserId>,
		room_id: &RoomId,
	) -> Result<PduEvent> {
		let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());

		pin_mut!(pdus_rev);
		pdus_rev
			.try_next()
			.await?
			.map(at!(1))
			.ok_or_else(|| err!(Request(NotFound("no PDUs found in room"))))
	}

	/// Returns the `count` of this pdu's id.
	pub(super) async fn get_pdu_count(&self, event_id: &EventId) -> Result<PduCount> {
		self.get_pdu_id(event_id)
			.await
			.map(|pdu_id| pdu_id.pdu_count())
	}

	/// Returns the json of a pdu.
	///
	/// Checks the `eventid_outlierpdu` tree if the event is not in the
	/// timeline.
	pub(super) async fn get_pdu_json(&self, event_id: &EventId) -> Result<CanonicalJsonObject> {
		let accepted = self.get_non_outlier_pdu_json(event_id).boxed();
		let outlier = self
			.eventid_outlierpdu
			.get(event_id)
			.map(Deserialized::deserialized)
			.boxed();

		select_ok([accepted, outlier]).await.map(at!(0))
	}

	/// Returns the json of a pdu from the timeline only.
	///
	/// This does not check the outlier tree.
	pub(super) async fn get_non_outlier_pdu_json(
		&self,
		event_id: &EventId,
	) -> Result<CanonicalJsonObject> {
		let pduid = self.get_pdu_id(event_id).await?;

		self.pduid_pdu.get(&pduid).await.deserialized()
	}

	/// Returns the pdu's id.
	#[inline]
	pub(super) async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
		self.eventid_pduid
			.get(event_id)
			.await
			.map(|handle| RawPduId::from(&*handle))
	}

	/// Returns the pdu directly from `eventid_pduid` only.
	pub(super) async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
		let pduid = self.get_pdu_id(event_id).await?;

		self.pduid_pdu.get(&pduid).await.deserialized()
	}

	/// Like get_non_outlier_pdu(), but without the expense of fetching and
	/// parsing the PduEvent
	pub(super) async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> Result {
		let pduid = self.get_pdu_id(event_id).await?;

		self.pduid_pdu.exists(&pduid).await
	}

	/// Returns the pdu.
	///
	/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
	pub(super) async fn get_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
		let accepted = self.get_non_outlier_pdu(event_id).boxed();
		let outlier = self
			.eventid_outlierpdu
			.get(event_id)
			.map(Deserialized::deserialized)
			.boxed();

		select_ok([accepted, outlier]).await.map(at!(0))
	}

	/// Like get_pdu(), but only checks the outlier tree and skips the expense
	/// of fetching and parsing the PduEvent.
	#[inline]
	pub(super) async fn outlier_pdu_exists(&self, event_id: &EventId) -> Result {
		self.eventid_outlierpdu.exists(event_id).await
	}

	/// Like get_pdu(), but without the expense of fetching and parsing the
	/// data.
	pub(super) async fn pdu_exists(&self, event_id: &EventId) -> Result {
		let non_outlier = self.non_outlier_pdu_exists(event_id).boxed();
		let outlier = self.outlier_pdu_exists(event_id).boxed();

		select_ok([non_outlier, outlier]).await.map(at!(0))
	}

	/// Returns the pdu.
	///
	/// This does __NOT__ check the outliers `Tree`.
	pub(super) async fn get_pdu_from_id(&self, pdu_id: &RawPduId) -> Result<PduEvent> {
		self.pduid_pdu.get(pdu_id).await.deserialized()
	}

	/// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`.
	pub(super) async fn get_pdu_json_from_id(
		&self,
		pdu_id: &RawPduId,
	) -> Result<CanonicalJsonObject> {
		self.pduid_pdu.get(pdu_id).await.deserialized()
	}

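	/// Writes a pdu into the timeline under `pdu_id`, points the event id at
	/// that pdu id, and drops any outlier copy of the event.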
	pub(super) async fn append_pdu(
		&self,
		pdu_id: &RawPduId,
		pdu: &PduEvent,
		json: &CanonicalJsonObject,
		count: PduCount,
	) {
		debug_assert!(matches!(count, PduCount::Normal(_)), "PduCount not Normal");

		self.pduid_pdu.raw_put(pdu_id, Json(json));
		self.eventid_pduid.insert(pdu.event_id.as_bytes(), pdu_id);
		self.eventid_outlierpdu.remove(pdu.event_id.as_bytes());
	}

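	/// Like append_pdu(), but used for backfilled events, which are keyed only
	/// by event id and canonical json rather than a parsed `PduEvent`.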
	pub(super) fn prepend_backfill_pdu(
		&self,
		pdu_id: &RawPduId,
		event_id: &EventId,
		json: &CanonicalJsonObject,
	) {
		self.pduid_pdu.raw_put(pdu_id, Json(json));
		self.eventid_pduid.insert(event_id, pdu_id);
		self.eventid_outlierpdu.remove(event_id);
	}

	/// Removes a pdu and creates a new one with the same id.
	pub(super) async fn replace_pdu(
		&self,
		pdu_id: &RawPduId,
		pdu_json: &CanonicalJsonObject,
	) -> Result {
		if self.pduid_pdu.get(pdu_id).await.is_not_found() {
			return Err!(Request(NotFound("PDU does not exist.")));
		}

		self.pduid_pdu.raw_put(pdu_id, Json(pdu_json));

		Ok(())
	}

	/// Returns an iterator over all events and their tokens in a room that
	/// happened before the event with id `until` in reverse-chronological
	/// order.
	pub(super) fn pdus_rev<'a>(
		&'a self,
		user_id: Option<&'a UserId>,
		room_id: &'a RoomId,
		until: PduCount,
	) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
		self.count_to_id(room_id, until, Direction::Backward)
			.map_ok(move |current| {
				let prefix = current.shortroomid();
				self.pduid_pdu
					.rev_raw_stream_from(&current)
					.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
					.ready_and_then(move |item| Self::each_pdu(item, user_id))
			})
			.try_flatten_stream()
	}

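	/// Forward-direction counterpart of `pdus_rev`: streams the room's events
	/// and their tokens starting at `from`, in chronological order.
	///
	/// Usage sketch (hypothetical caller; `room_id` and `from` assumed in
	/// scope):
	///
	/// ```ignore
	/// let pdus = data.pdus(None, room_id, from);
	/// pin_mut!(pdus);
	/// while let Some((count, pdu)) = pdus.try_next().await? {
	///     // handle each timeline event
	/// }
	/// ```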
	pub(super) fn pdus<'a>(
		&'a self,
		user_id: Option<&'a UserId>,
		room_id: &'a RoomId,
		from: PduCount,
	) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
		self.count_to_id(room_id, from, Direction::Forward)
			.map_ok(move |current| {
				let prefix = current.shortroomid();
				self.pduid_pdu
					.raw_stream_from(&current)
					.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
					.ready_and_then(move |item| Self::each_pdu(item, user_id))
			})
			.try_flatten_stream()
	}

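	/// Maps one raw `(pdu_id, pdu)` entry from `pduid_pdu` into a
	/// `PdusIterItem`: deserializes the event, strips the transaction id
	/// unless the requesting user is the sender, and adds the age field.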
	fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
		let pdu_id: RawPduId = pdu_id.into();

		let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;

		if Some(pdu.sender.borrow()) != user_id {
			pdu.remove_transaction_id().log_err().ok();
		}

		pdu.add_age().log_err().ok();

		Ok((pdu_id.pdu_count(), pdu))
	}

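	/// Bumps the per-user notification and highlight counters for this room,
	/// within a single database cork.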
	pub(super) fn increment_notification_counts(
		&self,
		room_id: &RoomId,
		notifies: Vec<OwnedUserId>,
		highlights: Vec<OwnedUserId>,
	) {
		let _cork = self.db.cork();

		for user in notifies {
			let mut userroom_id = user.as_bytes().to_vec();
			userroom_id.push(0xFF);
			userroom_id.extend_from_slice(room_id.as_bytes());
			increment(&self.userroomid_notificationcount, &userroom_id);
		}

		for user in highlights {
			let mut userroom_id = user.as_bytes().to_vec();
			userroom_id.push(0xFF);
			userroom_id.extend_from_slice(room_id.as_bytes());
			increment(&self.userroomid_highlightcount, &userroom_id);
		}
	}

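	/// Builds the `RawPduId` at which to start streaming for `room_id`,
	/// nudging `shorteventid` one step in `dir` so the base event itself is
	/// not returned.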
	async fn count_to_id(
		&self,
		room_id: &RoomId,
		shorteventid: PduCount,
		dir: Direction,
	) -> Result<RawPduId> {
		let shortroomid: ShortRoomId = self
			.services
			.short
			.get_shortroomid(room_id)
			.await
			.map_err(|e| err!(Request(NotFound("Room {room_id:?} not found: {e:?}"))))?;

		// +1 so we don't send the base event
		let pdu_id = PduId {
			shortroomid,
			shorteventid: shorteventid.saturating_inc(dir),
		};

		Ok(pdu_id.into())
	}
}

//TODO: this is an ABA
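// Presumably the issue: this is a non-atomic read-modify-write (get, add one,
// insert), so two concurrent increments can read the same old value and one
// update can be lost.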
fn increment(db: &Arc<Map>, key: &[u8]) {
	let old = db.get_blocking(key);
	let new = utils::increment(old.ok().as_deref());
	db.insert(key, new);
}