1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-08-01 17:38:36 +00:00
conduit/src/service/rooms/state/mod.rs

261 lines
8.5 KiB
Rust
Raw Normal View History

mod data;
pub use data::Data;
use crate::service::*;
2022-06-20 11:31:27 +02:00
/// Room state service.
///
/// Wraps a storage backend `D` (an implementation of [`Data`]) and exposes the
/// room-state operations implemented on this type.
pub struct Service<D>
where
    D: Data,
{
    /// Storage backend that the service's methods read from and write to.
    db: D,
}
impl Service<_> {
2022-06-20 12:08:58 +02:00
/// Set the room to the given statehash and update caches.
#[tracing::instrument(skip(self, new_state_ids_compressed, db))]
2020-09-13 22:24:36 +02:00
pub fn force_state(
&self,
room_id: &RoomId,
2022-06-20 12:08:58 +02:00
shortstatehash: u64,
statediffnew: HashSet<CompressedStateEvent>,
statediffremoved: HashSet<CompressedStateEvent>,
db: &Database,
2020-09-13 22:24:36 +02:00
) -> Result<()> {
for event_id in statediffnew.into_iter().filter_map(|new| {
state_compressor::parse_compressed_state_event(new)
.ok()
.map(|(_, id)| id)
}) {
let pdu = match timeline::get_pdu_json(&event_id)? {
Some(pdu) => pdu,
None => continue,
};
if pdu.get("type").and_then(|val| val.as_str()) != Some("m.room.member") {
continue;
}
let pdu: PduEvent = match serde_json::from_str(
&serde_json::to_string(&pdu).expect("CanonicalJsonObj can be serialized to JSON"),
) {
Ok(pdu) => pdu,
Err(_) => continue,
};
#[derive(Deserialize)]
struct ExtractMembership {
membership: MembershipState,
}
let membership = match serde_json::from_str::<ExtractMembership>(pdu.content.get()) {
Ok(e) => e.membership,
Err(_) => continue,
};
let state_key = match pdu.state_key {
Some(k) => k,
None => continue,
};
2021-11-27 00:30:28 +01:00
let user_id = match UserId::parse(state_key) {
Ok(id) => id,
Err(_) => continue,
};
room::state_cache::update_membership(room_id, &user_id, membership, &pdu.sender, None, db, false)?;
2020-09-13 22:24:36 +02:00
}
room::state_cache::update_joined_count(room_id, db)?;
2021-08-17 00:22:52 +02:00
db.set_room_state(room_id, new_shortstatehash);
Ok(())
}
2022-06-19 22:56:14 +02:00
/// Generates a new StateHash and associates it with the incoming event.
///
/// This adds all current state events (not including the incoming event)
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
#[tracing::instrument(skip(self, state_ids_compressed, globals))]
pub fn set_event_state(
&self,
event_id: &EventId,
2021-08-12 23:04:00 +02:00
room_id: &RoomId,
state_ids_compressed: HashSet<CompressedStateEvent>,
globals: &super::globals::Globals,
) -> Result<()> {
let shorteventid = short::get_or_create_shorteventid(event_id, globals)?;
let previous_shortstatehash = db.get_room_shortstatehash(room_id)?;
2021-08-12 23:04:00 +02:00
let state_hash = super::calculate_hash(
&state_ids_compressed
.iter()
.map(|s| &s[..])
.collect::<Vec<_>>(),
);
let (shortstatehash, already_existed) =
short::get_or_create_shortstatehash(&state_hash, globals)?;
if !already_existed {
2021-08-12 23:04:00 +02:00
let states_parents = previous_shortstatehash
.map_or_else(|| Ok(Vec::new()), |p| room::state_compressor.load_shortstatehash_info(p))?;
2021-08-12 23:04:00 +02:00
let (statediffnew, statediffremoved) =
if let Some(parent_stateinfo) = states_parents.last() {
let statediffnew: HashSet<_> = state_ids_compressed
2021-08-12 23:04:00 +02:00
.difference(&parent_stateinfo.1)
.copied()
.collect();
2021-08-12 23:04:00 +02:00
let statediffremoved: HashSet<_> = parent_stateinfo
2021-08-12 23:04:00 +02:00
.1
.difference(&state_ids_compressed)
.copied()
.collect();
2021-08-12 23:04:00 +02:00
(statediffnew, statediffremoved)
} else {
(state_ids_compressed, HashSet::new())
};
state_compressor::save_state_from_diff(
2021-08-12 23:04:00 +02:00
shortstatehash,
2021-08-31 21:20:03 +02:00
statediffnew,
statediffremoved,
2021-08-12 23:04:00 +02:00
1_000_000, // high number because no state will be based on this one
states_parents,
)?;
}
db.set_event_state(&shorteventid.to_be_bytes(), &shortstatehash.to_be_bytes())?;
Ok(())
}
/// Generates a new StateHash and associates it with the incoming event.
///
/// This adds all current state events (not including the incoming event)
2021-03-17 22:30:25 +01:00
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
#[tracing::instrument(skip(self, new_pdu, globals))]
2020-12-19 16:00:11 +01:00
pub fn append_to_state(
&self,
new_pdu: &PduEvent,
globals: &super::globals::Globals,
2021-03-17 22:30:25 +01:00
) -> Result<u64> {
let shorteventid = self.get_or_create_shorteventid(&new_pdu.event_id, globals)?;
let previous_shortstatehash = self.get_room_shortstatehash(&new_pdu.room_id)?;
2021-08-12 23:04:00 +02:00
if let Some(p) = previous_shortstatehash {
self.shorteventid_shortstatehash
.insert(&shorteventid.to_be_bytes(), &p.to_be_bytes())?;
}
2021-03-17 22:30:25 +01:00
2020-09-12 21:30:07 +02:00
if let Some(state_key) = &new_pdu.state_key {
2021-08-12 23:04:00 +02:00
let states_parents = previous_shortstatehash
.map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?;
2020-12-19 16:00:11 +01:00
2022-04-06 21:31:29 +02:00
let shortstatekey = self.get_or_create_shortstatekey(
&new_pdu.kind.to_string().into(),
state_key,
globals,
)?;
2020-09-12 21:30:07 +02:00
let new = self.compress_state_event(shortstatekey, &new_pdu.event_id, globals)?;
2021-08-12 23:04:00 +02:00
let replaces = states_parents
.last()
.map(|info| {
info.1
.iter()
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes()))
})
.unwrap_or_default();
if Some(&new) == replaces {
return Ok(previous_shortstatehash.expect("must exist"));
}
2021-08-12 23:04:00 +02:00
// TODO: statehash with deterministic inputs
let shortstatehash = globals.next_count()?;
2021-08-12 23:04:00 +02:00
let mut statediffnew = HashSet::new();
statediffnew.insert(new);
let mut statediffremoved = HashSet::new();
if let Some(replaces) = replaces {
statediffremoved.insert(*replaces);
2021-08-12 23:04:00 +02:00
}
2021-08-12 23:04:00 +02:00
self.save_state_from_diff(
shortstatehash,
statediffnew,
statediffremoved,
2,
states_parents,
)?;
2021-03-17 22:30:25 +01:00
Ok(shortstatehash)
2020-09-12 21:30:07 +02:00
} else {
2021-08-12 23:04:00 +02:00
Ok(previous_shortstatehash.expect("first event in room must be a state event"))
2020-09-12 21:30:07 +02:00
}
}
#[tracing::instrument(skip(self, invite_event))]
2021-04-25 14:10:07 +02:00
pub fn calculate_invite_state(
&self,
invite_event: &PduEvent,
) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
let mut state = Vec::new();
// Add recommended events
if let Some(e) =
2022-04-06 21:31:29 +02:00
self.room_state_get(&invite_event.room_id, &StateEventType::RoomCreate, "")?
2021-04-25 14:10:07 +02:00
{
state.push(e.to_stripped_state_event());
}
if let Some(e) =
2022-04-06 21:31:29 +02:00
self.room_state_get(&invite_event.room_id, &StateEventType::RoomJoinRules, "")?
2021-04-25 14:10:07 +02:00
{
state.push(e.to_stripped_state_event());
}
2022-04-06 21:31:29 +02:00
if let Some(e) = self.room_state_get(
&invite_event.room_id,
&StateEventType::RoomCanonicalAlias,
"",
)? {
state.push(e.to_stripped_state_event());
}
if let Some(e) =
self.room_state_get(&invite_event.room_id, &StateEventType::RoomAvatar, "")?
{
2021-04-25 14:10:07 +02:00
state.push(e.to_stripped_state_event());
}
2022-04-06 21:31:29 +02:00
if let Some(e) =
self.room_state_get(&invite_event.room_id, &StateEventType::RoomName, "")?
{
2021-04-25 14:10:07 +02:00
state.push(e.to_stripped_state_event());
}
if let Some(e) = self.room_state_get(
&invite_event.room_id,
2022-04-06 21:31:29 +02:00
&StateEventType::RoomMember,
2021-04-25 14:10:07 +02:00
invite_event.sender.as_str(),
)? {
state.push(e.to_stripped_state_event());
}
state.push(invite_event.to_stripped_state_event());
Ok(state)
}
#[tracing::instrument(skip(self))]
2021-03-17 22:30:25 +01:00
pub fn set_room_state(&self, room_id: &RoomId, shortstatehash: u64) -> Result<()> {
self.roomid_shortstatehash
.insert(room_id.as_bytes(), &shortstatehash.to_be_bytes())?;
2020-12-31 14:52:08 +01:00
Ok(())
}
pub fn db(&self) -> D {
&self.db
}
2022-06-20 11:31:27 +02:00
}