2022-07-10 17:23:26 +02:00
|
|
|
mod data;
|
2024-06-28 22:51:39 +00:00
|
|
|
|
2022-10-05 20:34:31 +02:00
|
|
|
use std::{
|
|
|
|
collections::{BTreeSet, HashSet},
|
|
|
|
sync::Arc,
|
|
|
|
};
|
2022-09-06 23:15:09 +02:00
|
|
|
|
2024-07-13 21:11:05 +00:00
|
|
|
use conduit::{debug, error, trace, validated, warn, Err, Result};
|
2024-05-26 21:29:19 +00:00
|
|
|
use data::Data;
|
2024-07-13 21:11:05 +00:00
|
|
|
use ruma::{EventId, RoomId};
|
2021-08-12 23:04:00 +02:00
|
|
|
|
2024-06-28 22:51:39 +00:00
|
|
|
use crate::services;
|
2021-08-14 19:07:50 +02:00
|
|
|
|
2024-05-09 15:59:08 -07:00
|
|
|
pub struct Service {
|
2024-05-27 03:17:20 +00:00
|
|
|
db: Data,
|
2022-07-10 17:23:26 +02:00
|
|
|
}
|
2022-06-20 12:08:58 +02:00
|
|
|
|
2024-07-04 03:26:19 +00:00
|
|
|
impl crate::Service for Service {
|
|
|
|
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
|
|
|
Ok(Arc::new(Self {
|
|
|
|
db: Data::new(args.server, args.db),
|
|
|
|
}))
|
2024-05-27 03:17:20 +00:00
|
|
|
}
|
|
|
|
|
2024-07-04 03:26:19 +00:00
|
|
|
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Service {
|
2024-05-09 15:59:08 -07:00
|
|
|
pub async fn event_ids_iter<'a>(
|
2024-04-10 13:55:09 -07:00
|
|
|
&self, room_id: &RoomId, starting_events_: Vec<Arc<EventId>>,
|
2022-10-05 18:36:12 +02:00
|
|
|
) -> Result<impl Iterator<Item = Arc<EventId>> + 'a> {
|
2024-04-10 13:55:09 -07:00
|
|
|
let mut starting_events: Vec<&EventId> = Vec::with_capacity(starting_events_.len());
|
|
|
|
for starting_event in &starting_events_ {
|
|
|
|
starting_events.push(starting_event);
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(self
|
|
|
|
.get_auth_chain(room_id, &starting_events)
|
|
|
|
.await?
|
|
|
|
.into_iter()
|
|
|
|
.filter_map(move |sid| services().rooms.short.get_eventid_from_short(sid).ok()))
|
|
|
|
}
|
|
|
|
|
2024-06-17 06:32:53 +00:00
|
|
|
#[tracing::instrument(skip_all, name = "auth_chain")]
|
2024-05-09 15:59:08 -07:00
|
|
|
pub async fn get_auth_chain(&self, room_id: &RoomId, starting_events: &[&EventId]) -> Result<Vec<u64>> {
|
2024-07-07 06:17:58 +00:00
|
|
|
const NUM_BUCKETS: usize = 50; //TODO: change possible w/o disrupting db?
|
2024-04-10 13:55:09 -07:00
|
|
|
const BUCKET: BTreeSet<(u64, &EventId)> = BTreeSet::new();
|
|
|
|
|
|
|
|
let started = std::time::Instant::now();
|
2024-07-07 06:17:58 +00:00
|
|
|
let mut buckets = [BUCKET; NUM_BUCKETS];
|
2024-07-07 04:46:16 +00:00
|
|
|
for (i, &short) in services()
|
2024-04-10 13:55:09 -07:00
|
|
|
.rooms
|
|
|
|
.short
|
|
|
|
.multi_get_or_create_shorteventid(starting_events)?
|
|
|
|
.iter()
|
|
|
|
.enumerate()
|
|
|
|
{
|
2024-07-07 06:17:58 +00:00
|
|
|
let bucket: usize = short.try_into()?;
|
|
|
|
let bucket: usize = validated!(bucket % NUM_BUCKETS)?;
|
|
|
|
buckets[bucket].insert((short, starting_events[i]));
|
2024-03-05 19:48:54 -05:00
|
|
|
}
|
|
|
|
|
2024-04-10 13:55:09 -07:00
|
|
|
debug!(
|
|
|
|
starting_events = ?starting_events.len(),
|
|
|
|
elapsed = ?started.elapsed(),
|
|
|
|
"start",
|
|
|
|
);
|
2024-03-05 19:48:54 -05:00
|
|
|
|
2024-06-01 00:49:56 -04:00
|
|
|
let mut hits: usize = 0;
|
|
|
|
let mut misses: usize = 0;
|
|
|
|
let mut full_auth_chain = Vec::with_capacity(buckets.len());
|
2022-10-05 18:36:12 +02:00
|
|
|
for chunk in buckets {
|
|
|
|
if chunk.is_empty() {
|
|
|
|
continue;
|
|
|
|
}
|
2024-03-05 19:48:54 -05:00
|
|
|
|
2022-10-05 18:36:12 +02:00
|
|
|
let chunk_key: Vec<u64> = chunk.iter().map(|(short, _)| short).copied().collect();
|
2024-06-07 03:40:58 -04:00
|
|
|
if let Some(cached) = self.get_cached_eventid_authchain(&chunk_key)? {
|
2024-05-04 19:24:48 +00:00
|
|
|
trace!("Found cache entry for whole chunk");
|
2022-10-05 18:36:12 +02:00
|
|
|
full_auth_chain.extend(cached.iter().copied());
|
2024-06-01 00:49:56 -04:00
|
|
|
hits = hits.saturating_add(1);
|
2022-10-05 18:36:12 +02:00
|
|
|
continue;
|
|
|
|
}
|
2024-03-05 19:48:54 -05:00
|
|
|
|
2024-06-01 00:49:56 -04:00
|
|
|
let mut hits2: usize = 0;
|
|
|
|
let mut misses2: usize = 0;
|
|
|
|
let mut chunk_cache = Vec::with_capacity(chunk.len());
|
2022-10-05 18:36:12 +02:00
|
|
|
for (sevent_id, event_id) in chunk {
|
2024-06-07 03:40:58 -04:00
|
|
|
if let Some(cached) = self.get_cached_eventid_authchain(&[sevent_id])? {
|
2024-05-04 19:24:48 +00:00
|
|
|
trace!(?event_id, "Found cache entry for event");
|
2022-10-05 18:36:12 +02:00
|
|
|
chunk_cache.extend(cached.iter().copied());
|
2024-06-01 00:49:56 -04:00
|
|
|
hits2 = hits2.saturating_add(1);
|
2022-10-05 18:36:12 +02:00
|
|
|
} else {
|
2024-04-10 13:55:09 -07:00
|
|
|
let auth_chain = self.get_auth_chain_inner(room_id, event_id)?;
|
2024-06-07 03:40:58 -04:00
|
|
|
self.cache_auth_chain(vec![sevent_id], &auth_chain)?;
|
2024-04-10 13:55:09 -07:00
|
|
|
chunk_cache.extend(auth_chain.iter());
|
2024-06-01 00:49:56 -04:00
|
|
|
misses2 = misses2.saturating_add(1);
|
2022-12-18 07:47:18 +01:00
|
|
|
debug!(
|
|
|
|
event_id = ?event_id,
|
|
|
|
chain_length = ?auth_chain.len(),
|
2024-04-10 13:55:09 -07:00
|
|
|
chunk_cache_length = ?chunk_cache.len(),
|
|
|
|
elapsed = ?started.elapsed(),
|
2022-12-18 07:47:18 +01:00
|
|
|
"Cache missed event"
|
2022-10-05 18:36:12 +02:00
|
|
|
);
|
|
|
|
};
|
|
|
|
}
|
2024-04-10 13:55:09 -07:00
|
|
|
|
|
|
|
chunk_cache.sort_unstable();
|
|
|
|
chunk_cache.dedup();
|
2024-06-07 03:40:58 -04:00
|
|
|
self.cache_auth_chain_vec(chunk_key, &chunk_cache)?;
|
2024-04-10 13:55:09 -07:00
|
|
|
full_auth_chain.extend(chunk_cache.iter());
|
2024-06-01 00:49:56 -04:00
|
|
|
misses = misses.saturating_add(1);
|
2022-12-18 07:47:18 +01:00
|
|
|
debug!(
|
|
|
|
chunk_cache_length = ?chunk_cache.len(),
|
|
|
|
hits = ?hits2,
|
|
|
|
misses = ?misses2,
|
2024-04-10 13:55:09 -07:00
|
|
|
elapsed = ?started.elapsed(),
|
2022-12-18 07:47:18 +01:00
|
|
|
"Chunk missed",
|
2022-10-05 18:36:12 +02:00
|
|
|
);
|
|
|
|
}
|
2024-03-05 19:48:54 -05:00
|
|
|
|
2024-06-02 00:27:03 +00:00
|
|
|
full_auth_chain.sort_unstable();
|
2024-04-10 13:55:09 -07:00
|
|
|
full_auth_chain.dedup();
|
2022-12-18 07:47:18 +01:00
|
|
|
debug!(
|
|
|
|
chain_length = ?full_auth_chain.len(),
|
|
|
|
hits = ?hits,
|
|
|
|
misses = ?misses,
|
2024-04-10 13:55:09 -07:00
|
|
|
elapsed = ?started.elapsed(),
|
|
|
|
"done",
|
2022-10-05 18:36:12 +02:00
|
|
|
);
|
2024-03-05 19:48:54 -05:00
|
|
|
|
2024-04-10 13:55:09 -07:00
|
|
|
Ok(full_auth_chain)
|
2022-10-05 18:36:12 +02:00
|
|
|
}
|
2024-03-05 19:48:54 -05:00
|
|
|
|
2024-05-04 19:24:48 +00:00
|
|
|
#[tracing::instrument(skip(self, room_id))]
|
2022-10-05 20:34:31 +02:00
|
|
|
fn get_auth_chain_inner(&self, room_id: &RoomId, event_id: &EventId) -> Result<HashSet<u64>> {
|
2022-10-05 18:36:12 +02:00
|
|
|
let mut todo = vec![Arc::from(event_id)];
|
|
|
|
let mut found = HashSet::new();
|
2024-03-05 19:48:54 -05:00
|
|
|
|
2022-10-05 18:36:12 +02:00
|
|
|
while let Some(event_id) = todo.pop() {
|
2024-05-04 19:24:48 +00:00
|
|
|
trace!(?event_id, "processing auth event");
|
|
|
|
|
2022-10-05 18:36:12 +02:00
|
|
|
match services().rooms.timeline.get_pdu(&event_id) {
|
|
|
|
Ok(Some(pdu)) => {
|
|
|
|
if pdu.room_id != room_id {
|
2024-07-13 21:11:05 +00:00
|
|
|
return Err!(Request(Forbidden(
|
|
|
|
"auth event {event_id:?} for incorrect room {} which is not {}",
|
|
|
|
pdu.room_id,
|
|
|
|
room_id
|
|
|
|
)));
|
2022-10-05 18:36:12 +02:00
|
|
|
}
|
|
|
|
for auth_event in &pdu.auth_events {
|
2024-03-25 17:05:11 -04:00
|
|
|
let sauthevent = services()
|
|
|
|
.rooms
|
|
|
|
.short
|
|
|
|
.get_or_create_shorteventid(auth_event)?;
|
2024-03-05 19:48:54 -05:00
|
|
|
|
2024-04-08 07:58:49 -07:00
|
|
|
if found.insert(sauthevent) {
|
2024-05-04 19:24:48 +00:00
|
|
|
trace!(?event_id, ?auth_event, "adding auth event to processing queue");
|
2022-10-05 18:36:12 +02:00
|
|
|
todo.push(auth_event.clone());
|
2024-03-05 19:48:54 -05:00
|
|
|
}
|
2022-10-05 18:36:12 +02:00
|
|
|
}
|
|
|
|
},
|
|
|
|
Ok(None) => {
|
2022-12-18 07:47:18 +01:00
|
|
|
warn!(?event_id, "Could not find pdu mentioned in auth events");
|
2022-10-05 18:36:12 +02:00
|
|
|
},
|
2022-12-18 07:47:18 +01:00
|
|
|
Err(error) => {
|
|
|
|
error!(?event_id, ?error, "Could not load event in auth chain");
|
2024-03-05 19:48:54 -05:00
|
|
|
},
|
2022-10-05 18:36:12 +02:00
|
|
|
}
|
|
|
|
}
|
2024-03-05 19:48:54 -05:00
|
|
|
|
2022-10-05 18:36:12 +02:00
|
|
|
Ok(found)
|
|
|
|
}
|
2024-04-08 09:01:28 -07:00
|
|
|
|
2024-05-09 15:59:08 -07:00
|
|
|
/// Looks up the auth chain stored for `key`, a list of short event IDs.
///
/// This delegates directly to the `Data` backend and returns its result
/// unchanged; `Ok(None)` is what callers in this file treat as a cache miss.
pub fn get_cached_eventid_authchain(&self, key: &[u64]) -> Result<Option<Arc<[u64]>>> {
    let backend = &self.db;

    backend.get_cached_eventid_authchain(key)
}
|
|
|
|
|
2024-07-07 19:03:15 +00:00
|
|
|
#[tracing::instrument(skip(self), level = "debug")]
|
2024-05-09 15:59:08 -07:00
|
|
|
pub fn cache_auth_chain(&self, key: Vec<u64>, auth_chain: &HashSet<u64>) -> Result<()> {
|
2024-04-08 09:01:28 -07:00
|
|
|
self.db
|
|
|
|
.cache_auth_chain(key, auth_chain.iter().copied().collect::<Arc<[u64]>>())
|
|
|
|
}
|
2024-04-10 13:55:09 -07:00
|
|
|
|
2024-07-07 19:03:15 +00:00
|
|
|
#[tracing::instrument(skip(self), level = "debug")]
|
2024-05-09 15:59:08 -07:00
|
|
|
pub fn cache_auth_chain_vec(&self, key: Vec<u64>, auth_chain: &Vec<u64>) -> Result<()> {
|
2024-04-10 13:55:09 -07:00
|
|
|
self.db
|
|
|
|
.cache_auth_chain(key, auth_chain.iter().copied().collect::<Arc<[u64]>>())
|
|
|
|
}
|
2024-07-02 22:40:58 +00:00
|
|
|
|
|
|
|
pub fn get_cache_usage(&self) -> (usize, usize) {
|
|
|
|
let cache = self.db.auth_chain_cache.lock().expect("locked");
|
|
|
|
(cache.len(), cache.capacity())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Removes every entry from the auth chain cache.
///
/// # Panics
///
/// Panics with the message `locked` if acquiring the cache lock fails.
pub fn clear_cache(&self) {
    let mut cache = self.db.auth_chain_cache.lock().expect("locked");
    cache.clear();
}
|
2022-07-10 17:23:26 +02:00
|
|
|
}
|