From b4065a3e28bd95ae45cf102f34865bb1d285d54d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Fri, 30 Jun 2023 17:49:52 +0200 Subject: [PATCH] feat: Admin command to ask other servers for state --- src/service/admin/mod.rs | 76 ++++++++++- src/service/rooms/event_handler/mod.rs | 167 ++++++++++++++----------- src/service/rooms/state/mod.rs | 17 ++- 3 files changed, 183 insertions(+), 77 deletions(-) diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index d37ec69e..9851d34b 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -1,7 +1,7 @@ use std::{ collections::BTreeMap, convert::{TryFrom, TryInto}, - sync::Arc, + sync::{Arc, RwLock}, time::Instant, }; @@ -21,12 +21,13 @@ use ruma::{ power_levels::RoomPowerLevelsEventContent, topic::RoomTopicEventContent, }, - TimelineEventType, + StateEventType, TimelineEventType, }, EventId, OwnedRoomAliasId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, }; use serde_json::value::to_raw_value; use tokio::sync::{mpsc, Mutex, MutexGuard}; +use tracing::{error, info}; use crate::{ api::client_server::{leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH}, @@ -153,6 +154,12 @@ enum AdminCommand { password: Option, }, + AskForState { + room_id: Box, + event_id: Box, + server: Box, + }, + /// Disables incoming federation handling for a room. DisableRoom { room_id: Box }, /// Enables incoming federation handling for a room again. @@ -736,6 +743,71 @@ impl Service { ) } } + AdminCommand::AskForState { + room_id, + event_id, + server, + } => { + let create_event = services() + .rooms + .state_accessor + .room_state_get(&room_id, &StateEventType::RoomCreate, "")? + .ok_or_else(|| Error::bad_database("Failed to find create event in db."))?; + let create_event_content: RoomCreateEventContent = + serde_json::from_str(create_event.content.get()).map_err(|e| { + error!("Invalid create event: {}", e); + Error::BadDatabase("Invalid create event in db") + })?; + let room_version_id = &create_event_content.room_version; + + let state_at_event = services() + .rooms + .event_handler + .ask_for_state( + &room_id, + &event_id, + &server, + &create_event, + room_version_id, + &mut RwLock::new(BTreeMap::new()), + ) + .await?; + + // We start looking at current room state now, so lets lock the room + let mutex_state = Arc::clone( + services() + .globals + .roomid_mutex_state + .write() + .unwrap() + .entry((*room_id).to_owned()) + .or_default(), + ); + let state_lock = mutex_state.lock().await; + + let new_room_state = services() + .rooms + .event_handler + .resolve_state(&room_id, room_version_id, state_at_event) + .await?; + + // Set the new room state to the resolved state + info!("Forcing new room state"); + + let (sstatehash, new, removed) = services() + .rooms + .state_compressor + .save_state(&room_id, new_room_state)?; + + services() + .rooms + .state + .force_state(&room_id, sstatehash, new, removed, &state_lock) + .await?; + + drop(state_lock); + RoomMessageEventContent::text_plain("Updated state.") + } }; Ok(reply_message_content) diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index ef5616eb..88d54298 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -632,81 +632,18 @@ impl Service { } if state_at_incoming_event.is_none() { - info!("Calling /state_ids"); - // Call /state_ids to find out what the state at this pdu is. We trust the server's - // response to some extend, but we still do a lot of checks on the events - match services() - .sending - .send_federation_request( + state_at_incoming_event = Some( + self.ask_for_state( + room_id, + &incoming_pdu.event_id, origin, - get_room_state_ids::v1::Request { - room_id: room_id.to_owned(), - event_id: (*incoming_pdu.event_id).to_owned(), - }, + create_event, + room_version_id, + pub_key_map, ) - .await - { - Ok(res) => { - info!("Fetching state events at event."); - let state_vec = self - .fetch_and_handle_outliers( - origin, - &res.pdu_ids - .iter() - .map(|x| Arc::from(&**x)) - .collect::>(), - create_event, - room_id, - room_version_id, - pub_key_map, - ) - .await; - - let mut state: HashMap<_, Arc> = HashMap::new(); - for (pdu, _) in state_vec { - let state_key = pdu.state_key.clone().ok_or_else(|| { - Error::bad_database("Found non-state pdu in state events.") - })?; - - let shortstatekey = services().rooms.short.get_or_create_shortstatekey( - &pdu.kind.to_string().into(), - &state_key, - )?; - - match state.entry(shortstatekey) { - hash_map::Entry::Vacant(v) => { - v.insert(Arc::from(&*pdu.event_id)); - } - hash_map::Entry::Occupied(_) => return Err( - Error::bad_database("State event's type and state_key combination exists multiple times."), - ), - } - } - - // The original create event must still be in the state - let create_shortstatekey = services() - .rooms - .short - .get_shortstatekey(&StateEventType::RoomCreate, "")? - .expect("Room exists"); - - if state.get(&create_shortstatekey).map(|id| id.as_ref()) - != Some(&create_event.event_id) - { - return Err(Error::bad_database( - "Incoming event refers to wrong create event.", - )); - } - - state_at_incoming_event = Some(state); - } - Err(e) => { - warn!("Fetching state for event failed: {}", e); - return Err(e); - } - }; + .await?, + ); } - let state_at_incoming_event = state_at_incoming_event.expect("we always set this to some above"); @@ -884,7 +821,91 @@ impl Service { Ok(pdu_id) } - async fn resolve_state( + pub async fn ask_for_state( + &self, + room_id: &RoomId, + event_id: &EventId, + server_name: &ServerName, + create_event: &PduEvent, + room_version_id: &RoomVersionId, + pub_key_map: &RwLock>>, + ) -> Result>> { + info!("Calling /state_ids"); + // Call /state_ids to find out what the state at this pdu is. We trust the server's + // response to some extend, but we still do a lot of checks on the events + match services() + .sending + .send_federation_request( + server_name, + get_room_state_ids::v1::Request { + room_id: room_id.to_owned(), + event_id: event_id.to_owned(), + }, + ) + .await + { + Ok(res) => { + info!("Fetching state events at event."); + let state_vec = self + .fetch_and_handle_outliers( + server_name, + &res.pdu_ids + .iter() + .map(|x| Arc::from(&**x)) + .collect::>(), + create_event, + room_id, + room_version_id, + pub_key_map, + ) + .await; + + let mut state: HashMap<_, Arc> = HashMap::new(); + for (pdu, _) in state_vec { + let state_key = pdu.state_key.clone().ok_or_else(|| { + Error::bad_database("Found non-state pdu in state events.") + })?; + + let shortstatekey = services() + .rooms + .short + .get_or_create_shortstatekey(&pdu.kind.to_string().into(), &state_key)?; + + match state.entry(shortstatekey) { + hash_map::Entry::Vacant(v) => { + v.insert(Arc::from(&*pdu.event_id)); + } + hash_map::Entry::Occupied(_) => return Err(Error::bad_database( + "State event's type and state_key combination exists multiple times.", + )), + } + } + + // The original create event must still be in the state + let create_shortstatekey = services() + .rooms + .short + .get_shortstatekey(&StateEventType::RoomCreate, "")? + .expect("Room exists"); + + if state.get(&create_shortstatekey).map(|id| id.as_ref()) + != Some(&create_event.event_id) + { + return Err(Error::bad_database( + "Incoming event refers to wrong create event.", + )); + } + + Ok(state) + } + Err(e) => { + warn!("Fetching state for event failed: {}", e); + Err(e) + } + } + } + + pub async fn resolve_state( &self, room_id: &RoomId, room_version_id: &RoomVersionId, diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index ca9430f1..45cb4d45 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -16,7 +16,7 @@ use ruma::{ }; use serde::Deserialize; use tokio::sync::MutexGuard; -use tracing::warn; +use tracing::{info, warn}; use crate::{services, utils::calculate_hash, Error, PduEvent, Result}; @@ -33,7 +33,7 @@ impl Service { room_id: &RoomId, shortstatehash: u64, statediffnew: Arc>, - _statediffremoved: Arc>, + statediffremoved: Arc>, state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex ) -> Result<()> { for event_id in statediffnew.iter().filter_map(|new| { @@ -49,6 +49,8 @@ impl Service { None => continue, }; + info!("New in state: {event_id}"); + if pdu.get("type").and_then(|val| val.as_str()) != Some("m.room.member") { continue; } @@ -90,6 +92,17 @@ impl Service { )?; } + for event_id in statediffremoved.iter().filter_map(|removed| { + services() + .rooms + .state_compressor + .parse_compressed_state_event(&removed) + .ok() + .map(|(_, id)| id) + }) { + info!("Removed from state: {event_id}"); + } + services().rooms.state_cache.update_joined_count(room_id)?; self.db