1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-08-01 17:38:36 +00:00
conduit/src/service/rooms/state/mod.rs

433 lines
14 KiB
Rust
Raw Normal View History

mod data;
2022-10-05 20:34:31 +02:00
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
pub use data::Data;
2022-10-05 20:34:31 +02:00
use ruma::{
api::client::error::ErrorKind,
2022-10-05 20:34:31 +02:00
events::{
room::{create::RoomCreateEventContent, member::MembershipState},
2023-02-26 16:29:06 +01:00
AnyStrippedStateEvent, StateEventType, TimelineEventType,
2022-10-05 20:34:31 +02:00
},
serde::Raw,
state_res::{self, StateMap},
2022-10-09 17:25:06 +02:00
EventId, OwnedEventId, RoomId, RoomVersionId, UserId,
2022-10-05 20:34:31 +02:00
};
use serde::Deserialize;
2022-10-05 18:36:12 +02:00
use tokio::sync::MutexGuard;
use tracing::warn;
2022-10-05 20:34:31 +02:00
use crate::{services, utils::calculate_hash, Error, PduEvent, Result};
2022-09-07 13:25:51 +02:00
use super::state_compressor::CompressedStateEvent;
2022-10-05 12:45:54 +02:00
pub struct Service {
2022-10-08 13:02:52 +02:00
pub db: &'static dyn Data,
2022-06-20 11:31:27 +02:00
}
2022-10-05 12:45:54 +02:00
impl Service {
2022-06-20 12:08:58 +02:00
/// Set the room to the given statehash and update caches.
2022-10-05 15:33:57 +02:00
pub async fn force_state(
2020-09-13 22:24:36 +02:00
&self,
room_id: &RoomId,
2022-06-20 12:08:58 +02:00
shortstatehash: u64,
statediffnew: Arc<HashSet<CompressedStateEvent>>,
_statediffremoved: Arc<HashSet<CompressedStateEvent>>,
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
2020-09-13 22:24:36 +02:00
) -> Result<()> {
for event_id in statediffnew.iter().filter_map(|new| {
2022-10-05 20:34:31 +02:00
services()
.rooms
.state_compressor
2023-12-23 19:48:14 -08:00
.parse_compressed_state_event(new)
.ok()
.map(|(_, id)| id)
}) {
2022-09-07 13:25:51 +02:00
let pdu = match services().rooms.timeline.get_pdu_json(&event_id)? {
Some(pdu) => pdu,
None => continue,
};
let pdu: PduEvent = match serde_json::from_str(
&serde_json::to_string(&pdu).expect("CanonicalJsonObj can be serialized to JSON"),
) {
Ok(pdu) => pdu,
Err(_) => continue,
};
2023-07-02 22:50:50 +02:00
match pdu.kind {
TimelineEventType::RoomMember => {
#[derive(Deserialize)]
struct ExtractMembership {
membership: MembershipState,
}
2023-07-02 22:50:50 +02:00
let membership =
match serde_json::from_str::<ExtractMembership>(pdu.content.get()) {
Ok(e) => e.membership,
Err(_) => continue,
};
2023-07-02 22:50:50 +02:00
let state_key = match pdu.state_key {
Some(k) => k,
None => continue,
};
2023-07-02 22:50:50 +02:00
let user_id = match UserId::parse(state_key) {
Ok(id) => id,
Err(_) => continue,
};
2023-07-02 22:50:50 +02:00
services().rooms.state_cache.update_membership(
room_id,
&user_id,
membership,
&pdu.sender,
None,
false,
)?;
}
TimelineEventType::SpaceChild => {
services()
.rooms
.spaces
.roomid_spacehierarchy_cache
2023-07-02 22:50:50 +02:00
.lock()
.await
2023-07-02 22:50:50 +02:00
.remove(&pdu.room_id);
}
_ => continue,
}
2020-09-13 22:24:36 +02:00
}
2022-10-05 15:33:57 +02:00
services().rooms.state_cache.update_joined_count(room_id)?;
2021-08-17 00:22:52 +02:00
2022-10-08 13:03:07 +02:00
self.db
2022-10-10 14:09:11 +02:00
.set_room_state(room_id, shortstatehash, state_lock)?;
2022-10-05 12:45:54 +02:00
Ok(())
}
2022-06-19 22:56:14 +02:00
/// Generates a new StateHash and associates it with the incoming event.
///
/// This adds all current state events (not including the incoming event)
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
#[tracing::instrument(skip(self, state_ids_compressed))]
pub fn set_event_state(
&self,
event_id: &EventId,
2021-08-12 23:04:00 +02:00
room_id: &RoomId,
state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
2022-10-05 09:34:25 +02:00
) -> Result<u64> {
2022-10-05 20:34:31 +02:00
let shorteventid = services()
.rooms
.short
.get_or_create_shorteventid(event_id)?;
let previous_shortstatehash = self.db.get_room_shortstatehash(room_id)?;
2021-08-12 23:04:00 +02:00
let state_hash = calculate_hash(
&state_ids_compressed
.iter()
.map(|s| &s[..])
.collect::<Vec<_>>(),
);
2022-10-05 20:34:31 +02:00
let (shortstatehash, already_existed) = services()
.rooms
.short
.get_or_create_shortstatehash(&state_hash)?;
if !already_existed {
2022-10-05 20:34:31 +02:00
let states_parents = previous_shortstatehash.map_or_else(
|| Ok(Vec::new()),
|p| {
services()
.rooms
.state_compressor
.load_shortstatehash_info(p)
},
)?;
2021-08-12 23:04:00 +02:00
let (statediffnew, statediffremoved) =
if let Some(parent_stateinfo) = states_parents.last() {
let statediffnew: HashSet<_> = state_ids_compressed
2021-08-12 23:04:00 +02:00
.difference(&parent_stateinfo.1)
.copied()
.collect();
2021-08-12 23:04:00 +02:00
let statediffremoved: HashSet<_> = parent_stateinfo
2021-08-12 23:04:00 +02:00
.1
.difference(&state_ids_compressed)
.copied()
.collect();
2021-08-12 23:04:00 +02:00
(Arc::new(statediffnew), Arc::new(statediffremoved))
2021-08-12 23:04:00 +02:00
} else {
(state_ids_compressed, Arc::new(HashSet::new()))
2021-08-12 23:04:00 +02:00
};
2022-10-05 18:36:12 +02:00
services().rooms.state_compressor.save_state_from_diff(
2021-08-12 23:04:00 +02:00
shortstatehash,
2021-08-31 21:20:03 +02:00
statediffnew,
statediffremoved,
2021-08-12 23:04:00 +02:00
1_000_000, // high number because no state will be based on this one
states_parents,
)?;
}
2022-10-05 18:36:12 +02:00
self.db.set_event_state(shorteventid, shortstatehash)?;
2022-10-05 09:34:25 +02:00
Ok(shortstatehash)
}
/// Generates a new StateHash and associates it with the incoming event.
///
/// This adds all current state events (not including the incoming event)
2021-03-17 22:30:25 +01:00
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
#[tracing::instrument(skip(self, new_pdu))]
2022-10-05 20:34:31 +02:00
pub fn append_to_state(&self, new_pdu: &PduEvent) -> Result<u64> {
let shorteventid = services()
.rooms
.short
.get_or_create_shorteventid(&new_pdu.event_id)?;
let previous_shortstatehash = self.get_room_shortstatehash(&new_pdu.room_id)?;
2021-08-12 23:04:00 +02:00
if let Some(p) = previous_shortstatehash {
2022-10-05 18:36:12 +02:00
self.db.set_event_state(shorteventid, p)?;
2021-08-12 23:04:00 +02:00
}
2021-03-17 22:30:25 +01:00
2020-09-12 21:30:07 +02:00
if let Some(state_key) = &new_pdu.state_key {
2022-10-05 20:34:31 +02:00
let states_parents = previous_shortstatehash.map_or_else(
|| Ok(Vec::new()),
|p| {
services()
.rooms
.state_compressor
.load_shortstatehash_info(p)
},
2022-04-06 21:31:29 +02:00
)?;
2020-09-12 21:30:07 +02:00
2022-10-05 20:34:31 +02:00
let shortstatekey = services()
.rooms
.short
.get_or_create_shortstatekey(&new_pdu.kind.to_string().into(), state_key)?;
let new = services()
.rooms
.state_compressor
.compress_state_event(shortstatekey, &new_pdu.event_id)?;
2021-08-12 23:04:00 +02:00
let replaces = states_parents
.last()
.map(|info| {
info.1
.iter()
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes()))
})
.unwrap_or_default();
if Some(&new) == replaces {
return Ok(previous_shortstatehash.expect("must exist"));
}
2021-08-12 23:04:00 +02:00
// TODO: statehash with deterministic inputs
2022-09-07 13:25:51 +02:00
let shortstatehash = services().globals.next_count()?;
2021-08-12 23:04:00 +02:00
let mut statediffnew = HashSet::new();
statediffnew.insert(new);
let mut statediffremoved = HashSet::new();
if let Some(replaces) = replaces {
statediffremoved.insert(*replaces);
2021-08-12 23:04:00 +02:00
}
2022-10-05 18:36:12 +02:00
services().rooms.state_compressor.save_state_from_diff(
2021-08-12 23:04:00 +02:00
shortstatehash,
Arc::new(statediffnew),
Arc::new(statediffremoved),
2021-08-12 23:04:00 +02:00
2,
states_parents,
)?;
2021-03-17 22:30:25 +01:00
Ok(shortstatehash)
2020-09-12 21:30:07 +02:00
} else {
2021-08-12 23:04:00 +02:00
Ok(previous_shortstatehash.expect("first event in room must be a state event"))
2020-09-12 21:30:07 +02:00
}
}
#[tracing::instrument(skip(self, invite_event))]
2021-04-25 14:10:07 +02:00
pub fn calculate_invite_state(
&self,
invite_event: &PduEvent,
) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
let mut state = Vec::new();
// Add recommended events
2022-10-05 20:34:31 +02:00
if let Some(e) = services().rooms.state_accessor.room_state_get(
&invite_event.room_id,
&StateEventType::RoomCreate,
"",
)? {
2021-04-25 14:10:07 +02:00
state.push(e.to_stripped_state_event());
}
2022-10-05 20:34:31 +02:00
if let Some(e) = services().rooms.state_accessor.room_state_get(
&invite_event.room_id,
&StateEventType::RoomJoinRules,
"",
)? {
2021-04-25 14:10:07 +02:00
state.push(e.to_stripped_state_event());
}
2022-10-05 18:36:12 +02:00
if let Some(e) = services().rooms.state_accessor.room_state_get(
2022-04-06 21:31:29 +02:00
&invite_event.room_id,
&StateEventType::RoomCanonicalAlias,
"",
)? {
state.push(e.to_stripped_state_event());
}
2022-10-05 20:34:31 +02:00
if let Some(e) = services().rooms.state_accessor.room_state_get(
&invite_event.room_id,
&StateEventType::RoomAvatar,
"",
)? {
2021-04-25 14:10:07 +02:00
state.push(e.to_stripped_state_event());
}
2022-10-05 20:34:31 +02:00
if let Some(e) = services().rooms.state_accessor.room_state_get(
&invite_event.room_id,
&StateEventType::RoomName,
"",
)? {
2021-04-25 14:10:07 +02:00
state.push(e.to_stripped_state_event());
}
2022-10-05 18:36:12 +02:00
if let Some(e) = services().rooms.state_accessor.room_state_get(
2021-04-25 14:10:07 +02:00
&invite_event.room_id,
2022-04-06 21:31:29 +02:00
&StateEventType::RoomMember,
2021-04-25 14:10:07 +02:00
invite_event.sender.as_str(),
)? {
state.push(e.to_stripped_state_event());
}
state.push(invite_event.to_stripped_state_event());
Ok(state)
}
/// Set the state hash to a new version, but does not update state_cache.
#[tracing::instrument(skip(self))]
2022-10-05 20:34:31 +02:00
pub fn set_room_state(
&self,
room_id: &RoomId,
shortstatehash: u64,
2022-10-05 18:36:12 +02:00
mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
2022-10-05 20:34:31 +02:00
) -> Result<()> {
2022-10-05 18:36:12 +02:00
self.db.set_room_state(room_id, shortstatehash, mutex_lock)
2020-12-31 14:52:08 +01:00
}
/// Returns the room's version.
#[tracing::instrument(skip(self))]
pub fn get_room_version(&self, room_id: &RoomId) -> Result<RoomVersionId> {
2022-10-05 20:34:31 +02:00
let create_event = services().rooms.state_accessor.room_state_get(
room_id,
&StateEventType::RoomCreate,
"",
)?;
let create_event_content: RoomCreateEventContent = create_event
.as_ref()
.map(|create_event| {
serde_json::from_str(create_event.content.get()).map_err(|e| {
warn!("Invalid create event: {}", e);
Error::bad_database("Invalid create event in db.")
})
})
.transpose()?
.ok_or_else(|| Error::BadRequest(ErrorKind::InvalidParam, "No create event found"))?;
Ok(create_event_content.room_version)
}
2022-09-07 13:25:51 +02:00
/// Returns the room's current short state hash as reported by the database
/// backend, or `None` if the backend has none for this room.
pub fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<Option<u64>> {
    let backend = self.db;
    backend.get_room_shortstatehash(room_id)
}
2022-10-05 09:34:25 +02:00
/// Returns the room's forward extremities as stored by the database backend.
pub fn get_forward_extremities(&self, room_id: &RoomId) -> Result<HashSet<Arc<EventId>>> {
    let backend = self.db;
    backend.get_forward_extremities(room_id)
}
2022-10-05 18:36:12 +02:00
2022-11-21 09:51:39 +01:00
pub fn set_forward_extremities(
2022-10-05 20:34:31 +02:00
&self,
2022-10-05 20:33:55 +02:00
room_id: &RoomId,
2022-10-09 17:25:06 +02:00
event_ids: Vec<OwnedEventId>,
2022-10-05 20:33:55 +02:00
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()> {
2022-10-05 20:34:31 +02:00
self.db
.set_forward_extremities(room_id, event_ids, state_lock)
2022-10-05 20:33:55 +02:00
}
2022-10-05 18:36:12 +02:00
/// This fetches auth events from the current state.
#[tracing::instrument(skip(self))]
pub fn get_auth_events(
&self,
room_id: &RoomId,
2023-02-26 16:29:06 +01:00
kind: &TimelineEventType,
2022-10-05 18:36:12 +02:00
sender: &UserId,
state_key: Option<&str>,
content: &serde_json::value::RawValue,
) -> Result<StateMap<Arc<PduEvent>>> {
2022-10-05 20:34:31 +02:00
let shortstatehash = if let Some(current_shortstatehash) =
services().rooms.state.get_room_shortstatehash(room_id)?
{
current_shortstatehash
} else {
return Ok(HashMap::new());
};
2022-10-05 18:36:12 +02:00
let auth_events = state_res::auth_types_for_event(kind, sender, state_key, content)
.expect("content is a valid JSON object");
let mut sauthevents = auth_events
.into_iter()
.filter_map(|(event_type, state_key)| {
2022-10-05 20:34:31 +02:00
services()
.rooms
.short
.get_shortstatekey(&event_type.to_string().into(), &state_key)
2022-10-05 18:36:12 +02:00
.ok()
.flatten()
.map(|s| (s, (event_type, state_key)))
})
.collect::<HashMap<_, _>>();
2022-10-05 20:34:31 +02:00
let full_state = services()
.rooms
.state_compressor
2022-10-05 18:36:12 +02:00
.load_shortstatehash_info(shortstatehash)?
.pop()
.expect("there is always one layer")
.1;
Ok(full_state
.iter()
2022-10-05 20:34:31 +02:00
.filter_map(|compressed| {
services()
.rooms
.state_compressor
2023-12-23 19:48:14 -08:00
.parse_compressed_state_event(compressed)
2022-10-05 20:34:31 +02:00
.ok()
})
2022-10-05 18:36:12 +02:00
.filter_map(|(shortstatekey, event_id)| {
sauthevents.remove(&shortstatekey).map(|k| (k, event_id))
})
2022-10-05 20:34:31 +02:00
.filter_map(|(k, event_id)| {
services()
.rooms
.timeline
.get_pdu(&event_id)
.ok()
.flatten()
.map(|pdu| (k, pdu))
})
2022-10-05 18:36:12 +02:00
.collect())
}
2022-06-20 11:31:27 +02:00
}