1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-06-27 16:35:59 +00:00
conduit/src/database/key_value/rooms/state_accessor.rs

187 lines
5.9 KiB
Rust
Raw Normal View History

2022-12-18 06:37:03 +01:00
use std::{collections::HashMap, sync::Arc};
2022-10-05 20:34:31 +02:00
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
use async_trait::async_trait;
2022-10-05 20:34:31 +02:00
use ruma::{events::StateEventType, EventId, RoomId};
#[async_trait]
2022-10-05 18:36:12 +02:00
impl service::rooms::state_accessor::Data for KeyValueDatabase {
2022-12-18 06:37:03 +01:00
async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>> {
2022-10-05 20:34:31 +02:00
let full_state = services()
.rooms
.state_compressor
2021-08-12 23:04:00 +02:00
.load_shortstatehash_info(shortstatehash)?
.pop()
.expect("there is always one layer")
.1;
2022-12-18 06:37:03 +01:00
let mut result = HashMap::new();
2022-06-18 16:38:41 +02:00
let mut i = 0;
for compressed in full_state.iter() {
2022-10-05 20:34:31 +02:00
let parsed = services()
.rooms
.state_compressor
2023-12-23 19:48:14 -08:00
.parse_compressed_state_event(compressed)?;
2022-06-18 16:38:41 +02:00
result.insert(parsed.0, parsed.1);
i += 1;
if i % 100 == 0 {
tokio::task::yield_now().await;
}
}
Ok(result)
2021-03-17 22:30:25 +01:00
}
async fn state_full(
2021-03-17 22:30:25 +01:00
&self,
shortstatehash: u64,
2022-04-06 21:31:29 +02:00
) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> {
2022-10-05 20:34:31 +02:00
let full_state = services()
.rooms
.state_compressor
2021-08-12 23:04:00 +02:00
.load_shortstatehash_info(shortstatehash)?
.pop()
.expect("there is always one layer")
.1;
2022-06-18 16:38:41 +02:00
let mut result = HashMap::new();
let mut i = 0;
for compressed in full_state.iter() {
2022-10-05 20:34:31 +02:00
let (_, eventid) = services()
.rooms
.state_compressor
2023-12-23 19:48:14 -08:00
.parse_compressed_state_event(compressed)?;
2022-10-05 12:45:54 +02:00
if let Some(pdu) = services().rooms.timeline.get_pdu(&eventid)? {
2022-06-18 16:38:41 +02:00
result.insert(
(
2022-04-06 21:31:29 +02:00
pdu.kind.to_string().into(),
pdu.state_key
.as_ref()
.ok_or_else(|| Error::bad_database("State event has no state key."))?
.clone(),
),
pdu,
2022-06-18 16:38:41 +02:00
);
}
i += 1;
if i % 100 == 0 {
tokio::task::yield_now().await;
}
}
Ok(result)
}
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
fn state_get_id(
&self,
2021-03-17 22:30:25 +01:00
shortstatehash: u64,
2022-04-06 21:31:29 +02:00
event_type: &StateEventType,
state_key: &str,
) -> Result<Option<Arc<EventId>>> {
2022-10-05 20:34:31 +02:00
let shortstatekey = match services()
.rooms
.short
.get_shortstatekey(event_type, state_key)?
{
2021-08-12 23:04:00 +02:00
Some(s) => s,
None => return Ok(None),
};
2022-10-05 20:34:31 +02:00
let full_state = services()
.rooms
.state_compressor
2021-08-12 23:04:00 +02:00
.load_shortstatehash_info(shortstatehash)?
.pop()
.expect("there is always one layer")
.1;
Ok(full_state
.iter()
2021-08-12 23:04:00 +02:00
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes()))
.and_then(|compressed| {
2022-10-05 20:34:31 +02:00
services()
.rooms
.state_compressor
2023-12-23 19:48:14 -08:00
.parse_compressed_state_event(compressed)
.ok()
.map(|(_, id)| id)
}))
}
2021-04-11 21:01:27 +02:00
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
fn state_get(
2021-04-11 21:01:27 +02:00
&self,
shortstatehash: u64,
2022-04-06 21:31:29 +02:00
event_type: &StateEventType,
2021-04-11 21:01:27 +02:00
state_key: &str,
2021-06-30 09:52:01 +02:00
) -> Result<Option<Arc<PduEvent>>> {
2021-04-11 21:01:27 +02:00
self.state_get_id(shortstatehash, event_type, state_key)?
2022-10-05 20:34:31 +02:00
.map_or(Ok(None), |event_id| {
services().rooms.timeline.get_pdu(&event_id)
})
2021-04-11 21:01:27 +02:00
}
/// Returns the state hash for this pdu.
fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<Option<u64>> {
2021-03-17 22:30:25 +01:00
self.eventid_shorteventid
.get(event_id.as_bytes())?
.map_or(Ok(None), |shorteventid| {
self.shorteventid_shortstatehash
.get(&shorteventid)?
.map(|bytes| {
utils::u64_from_bytes(&bytes).map_err(|_| {
2021-03-17 22:30:25 +01:00
Error::bad_database(
"Invalid shortstatehash bytes in shorteventid_shortstatehash",
)
})
})
.transpose()
2021-03-17 22:30:25 +01:00
})
}
/// Returns the full room state.
async fn room_state_full(
&self,
room_id: &RoomId,
2022-04-06 21:31:29 +02:00
) -> Result<HashMap<(StateEventType, String), Arc<PduEvent>>> {
2022-10-05 20:34:31 +02:00
if let Some(current_shortstatehash) =
services().rooms.state.get_room_shortstatehash(room_id)?
{
2022-06-18 16:38:41 +02:00
self.state_full(current_shortstatehash).await
2020-09-12 21:30:07 +02:00
} else {
2021-07-18 20:43:39 +02:00
Ok(HashMap::new())
}
}
2021-04-11 21:01:27 +02:00
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
fn room_state_get_id(
2021-04-11 21:01:27 +02:00
&self,
room_id: &RoomId,
2022-04-06 21:31:29 +02:00
event_type: &StateEventType,
2021-04-11 21:01:27 +02:00
state_key: &str,
) -> Result<Option<Arc<EventId>>> {
2022-10-05 20:34:31 +02:00
if let Some(current_shortstatehash) =
services().rooms.state.get_room_shortstatehash(room_id)?
{
2021-04-11 21:01:27 +02:00
self.state_get_id(current_shortstatehash, event_type, state_key)
} else {
Ok(None)
}
}
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
fn room_state_get(
2020-06-12 13:18:25 +02:00
&self,
room_id: &RoomId,
2022-04-06 21:31:29 +02:00
event_type: &StateEventType,
2020-06-12 13:18:25 +02:00
state_key: &str,
2021-06-30 09:52:01 +02:00
) -> Result<Option<Arc<PduEvent>>> {
2022-10-05 20:34:31 +02:00
if let Some(current_shortstatehash) =
services().rooms.state.get_room_shortstatehash(room_id)?
{
2021-03-24 11:52:10 +01:00
self.state_get(current_shortstatehash, event_type, state_key)
2020-09-12 21:30:07 +02:00
} else {
Ok(None)
}
2020-06-12 13:18:25 +02:00
}
}