From 6d29098d1af955d98ec57da4593836e67e8df090 Mon Sep 17 00:00:00 2001 From: Jade Ellis Date: Sat, 19 Jul 2025 22:20:26 +0100 Subject: [PATCH] refactor: Replace remaining std RwLocks --- src/admin/federation/commands.rs | 3 +-- src/admin/mod.rs | 12 ++------- src/core/alloc/je.rs | 7 +++--- src/service/admin/mod.rs | 25 +++++++------------ src/service/globals/mod.rs | 22 +++++----------- .../fetch_and_handle_outliers.rs | 3 --- .../event_handler/handle_incoming_pdu.rs | 3 --- .../rooms/event_handler/handle_prev_pdu.rs | 3 --- src/service/rooms/event_handler/mod.rs | 17 +++---------- src/service/rooms/state_cache/mod.rs | 22 +++++----------- src/service/rooms/state_cache/update.rs | 5 +--- 11 files changed, 32 insertions(+), 90 deletions(-) diff --git a/src/admin/federation/commands.rs b/src/admin/federation/commands.rs index 545dcbca..f77dadab 100644 --- a/src/admin/federation/commands.rs +++ b/src/admin/federation/commands.rs @@ -26,8 +26,7 @@ pub(super) async fn incoming_federation(&self) -> Result { .rooms .event_handler .federation_handletime - .read() - .expect("locked"); + .read(); let mut msg = format!("Handling {} incoming pdus:\n", map.len()); for (r, (e, i)) in map.iter() { diff --git a/src/admin/mod.rs b/src/admin/mod.rs index 732b8ce0..1d46590b 100644 --- a/src/admin/mod.rs +++ b/src/admin/mod.rs @@ -37,11 +37,7 @@ pub use crate::admin::AdminCommand; /// Install the admin command processor pub async fn init(admin_service: &service::admin::Service) { - _ = admin_service - .complete - .write() - .expect("locked for writing") - .insert(processor::complete); + _ = admin_service.complete.write().insert(processor::complete); _ = admin_service .handle .write() @@ -52,9 +48,5 @@ pub async fn init(admin_service: &service::admin::Service) { /// Uninstall the admin command handler pub async fn fini(admin_service: &service::admin::Service) { _ = admin_service.handle.write().await.take(); - _ = admin_service - .complete - .write() - .expect("locked for writing") - .take(); + _ = admin_service.complete.write().take(); } diff --git a/src/core/alloc/je.rs b/src/core/alloc/je.rs index e138233e..77deebc5 100644 --- a/src/core/alloc/je.rs +++ b/src/core/alloc/je.rs @@ -4,7 +4,6 @@ use std::{ cell::OnceCell, ffi::{CStr, c_char, c_void}, fmt::Debug, - sync::RwLock, }; use arrayvec::ArrayVec; @@ -13,7 +12,7 @@ use tikv_jemalloc_sys as ffi; use tikv_jemallocator as jemalloc; use crate::{ - Result, err, is_equal_to, is_nonzero, + Result, SyncRwLock, err, is_equal_to, is_nonzero, utils::{math, math::Tried}, }; @@ -40,7 +39,7 @@ const MALLOC_CONF_PROF: &str = ""; #[global_allocator] static JEMALLOC: jemalloc::Jemalloc = jemalloc::Jemalloc; -static CONTROL: RwLock<()> = RwLock::new(()); +static CONTROL: SyncRwLock<()> = SyncRwLock::new(()); type Name = ArrayVec; type Key = ArrayVec; @@ -332,7 +331,7 @@ fn set(key: &Key, val: T) -> Result where T: Copy + Debug, { - let _lock = CONTROL.write()?; + let _lock = CONTROL.write(); let res = xchg(key, val)?; inc_epoch()?; diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index f496c414..c052198c 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -5,11 +5,11 @@ mod grant; use std::{ pin::Pin, - sync::{Arc, RwLock as StdRwLock, Weak}, + sync::{Arc, Weak}, }; use async_trait::async_trait; -use conduwuit::{Err, utils}; +use conduwuit::{Err, SyncRwLock, utils}; use conduwuit_core::{ Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder, }; @@ -36,7 +36,7 @@ pub struct Service { services: Services, channel: (Sender, Receiver), pub handle: RwLock>, - pub complete: StdRwLock>, + pub complete: SyncRwLock>, #[cfg(feature = "console")] pub console: Arc, } @@ -50,7 +50,7 @@ struct Services { state_cache: Dep, state_accessor: Dep, account_data: Dep, - services: StdRwLock>>, + services: SyncRwLock>>, media: Dep, } @@ -105,7 +105,7 @@ impl crate::Service for Service { }, channel: loole::bounded(COMMAND_QUEUE_LIMIT), handle: RwLock::new(None), - complete: StdRwLock::new(None), + complete: SyncRwLock::new(None), #[cfg(feature = "console")] console: console::Console::new(&args), })) @@ -312,10 +312,7 @@ impl Service { /// Invokes the tab-completer to complete the command. When unavailable, /// None is returned. pub fn complete_command(&self, command: &str) -> Option { - self.complete - .read() - .expect("locked for reading") - .map(|complete| complete(command)) + self.complete.read().map(|complete| complete(command)) } async fn handle_signal(&self, sig: &'static str) { @@ -338,17 +335,13 @@ impl Service { } async fn process_command(&self, command: CommandInput) -> ProcessorResult { - let handle = &self - .handle - .read() - .await - .expect("Admin module is not loaded"); + let handle_guard = self.handle.read().await; + let handle = handle_guard.as_ref().expect("Admin module is not loaded"); let services = self .services .services .read() - .expect("locked") .as_ref() .and_then(Weak::upgrade) .expect("Services self-reference not initialized."); @@ -523,7 +516,7 @@ impl Service { /// Sets the self-reference to crate::Services which will provide context to /// the admin commands. pub(super) fn set_services(&self, services: Option<&Arc>) { - let receiver = &mut *self.services.services.write().expect("locked for writing"); + let receiver = &mut *self.services.services.write(); let weak = services.map(Arc::downgrade); *receiver = weak; } diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index a23a4c21..12f2ec78 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -1,14 +1,9 @@ mod data; -use std::{ - collections::HashMap, - fmt::Write, - sync::{Arc, RwLock}, - time::Instant, -}; +use std::{collections::HashMap, fmt::Write, sync::Arc, time::Instant}; use async_trait::async_trait; -use conduwuit::{Result, Server, error, utils::bytes::pretty}; +use conduwuit::{Result, Server, SyncRwLock, error, utils::bytes::pretty}; use data::Data; use regex::RegexSet; use ruma::{OwnedEventId, OwnedRoomAliasId, OwnedServerName, OwnedUserId, ServerName, UserId}; @@ -19,7 +14,7 @@ pub struct Service { pub db: Data, server: Arc, - pub bad_event_ratelimiter: Arc>>, + pub bad_event_ratelimiter: Arc>>, pub server_user: OwnedUserId, pub admin_alias: OwnedRoomAliasId, pub turn_secret: String, @@ -62,7 +57,7 @@ impl crate::Service for Service { Ok(Arc::new(Self { db, server: args.server.clone(), - bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())), + bad_event_ratelimiter: Arc::new(SyncRwLock::new(HashMap::new())), admin_alias: OwnedRoomAliasId::try_from(format!("#admins:{}", &args.server.name)) .expect("#admins:server_name is valid alias name"), server_user: UserId::parse_with_server_name( @@ -76,7 +71,7 @@ impl crate::Service for Service { } async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result { - let (ber_count, ber_bytes) = self.bad_event_ratelimiter.read()?.iter().fold( + let (ber_count, ber_bytes) = self.bad_event_ratelimiter.read().iter().fold( (0_usize, 0_usize), |(mut count, mut bytes), (event_id, _)| { bytes = bytes.saturating_add(event_id.capacity()); @@ -91,12 +86,7 @@ impl crate::Service for Service { Ok(()) } - async fn clear_cache(&self) { - self.bad_event_ratelimiter - .write() - .expect("locked for writing") - .clear(); - } + async fn clear_cache(&self) { self.bad_event_ratelimiter.write().clear(); } fn name(&self) -> &str { service::make_name(std::module_path!()) } } diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index 44027e04..59b768f2 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -41,7 +41,6 @@ where .globals .bad_event_ratelimiter .write() - .expect("locked") .entry(id) { | hash_map::Entry::Vacant(e) => { @@ -76,7 +75,6 @@ where .globals .bad_event_ratelimiter .read() - .expect("locked") .get(&*next_id) { // Exponential backoff @@ -187,7 +185,6 @@ where .globals .bad_event_ratelimiter .read() - .expect("locked") .get(&*next_id) { // Exponential backoff diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index 86a05e0a..5299e8d4 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -160,7 +160,6 @@ pub async fn handle_incoming_pdu<'a>( .globals .bad_event_ratelimiter .write() - .expect("locked") .entry(prev_id.into()) { | hash_map::Entry::Vacant(e) => { @@ -181,13 +180,11 @@ pub async fn handle_incoming_pdu<'a>( let start_time = Instant::now(); self.federation_handletime .write() - .expect("locked") .insert(room_id.into(), (event_id.to_owned(), start_time)); defer! {{ self.federation_handletime .write() - .expect("locked") .remove(room_id); }}; diff --git a/src/service/rooms/event_handler/handle_prev_pdu.rs b/src/service/rooms/event_handler/handle_prev_pdu.rs index cd46310a..cb4978d9 100644 --- a/src/service/rooms/event_handler/handle_prev_pdu.rs +++ b/src/service/rooms/event_handler/handle_prev_pdu.rs @@ -42,7 +42,6 @@ where .globals .bad_event_ratelimiter .read() - .expect("locked") .get(prev_id) { // Exponential backoff @@ -70,13 +69,11 @@ where let start_time = Instant::now(); self.federation_handletime .write() - .expect("locked") .insert(room_id.into(), ((*prev_id).to_owned(), start_time)); defer! {{ self.federation_handletime .write() - .expect("locked") .remove(room_id); }}; diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index aed38e1e..4e59c207 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -10,15 +10,10 @@ mod resolve_state; mod state_at_incoming; mod upgrade_outlier_pdu; -use std::{ - collections::HashMap, - fmt::Write, - sync::{Arc, RwLock as StdRwLock}, - time::Instant, -}; +use std::{collections::HashMap, fmt::Write, sync::Arc, time::Instant}; use async_trait::async_trait; -use conduwuit::{Err, Event, PduEvent, Result, RoomVersion, Server, utils::MutexMap}; +use conduwuit::{Err, Event, PduEvent, Result, RoomVersion, Server, SyncRwLock, utils::MutexMap}; use ruma::{ OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, events::room::create::RoomCreateEventContent, @@ -28,7 +23,7 @@ use crate::{Dep, globals, rooms, sending, server_keys}; pub struct Service { pub mutex_federation: RoomMutexMap, - pub federation_handletime: StdRwLock, + pub federation_handletime: SyncRwLock, services: Services, } @@ -81,11 +76,7 @@ impl crate::Service for Service { let mutex_federation = self.mutex_federation.len(); writeln!(out, "federation_mutex: {mutex_federation}")?; - let federation_handletime = self - .federation_handletime - .read() - .expect("locked for reading") - .len(); + let federation_handletime = self.federation_handletime.read().len(); writeln!(out, "federation_handletime: {federation_handletime}")?; Ok(()) diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index 9429be79..e9845fbf 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -1,13 +1,10 @@ mod update; mod via; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use std::{collections::HashMap, sync::Arc}; use conduwuit::{ - Result, implement, + Result, SyncRwLock, implement, result::LogErr, utils::{ReadyExt, stream::TryIgnore}, warn, @@ -54,14 +51,14 @@ struct Data { userroomid_knockedstate: Arc, } -type AppServiceInRoomCache = RwLock>>; +type AppServiceInRoomCache = SyncRwLock>>; type StrippedStateEventItem = (OwnedRoomId, Vec>); type SyncStateEventItem = (OwnedRoomId, Vec>); impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self { - appservice_in_room_cache: RwLock::new(HashMap::new()), + appservice_in_room_cache: SyncRwLock::new(HashMap::new()), services: Services { account_data: args.depend::("account_data"), config: args.depend::("config"), @@ -99,7 +96,6 @@ pub async fn appservice_in_room(&self, room_id: &RoomId, appservice: &Registrati if let Some(cached) = self .appservice_in_room_cache .read() - .expect("locked") .get(room_id) .and_then(|map| map.get(&appservice.registration.id)) .copied() @@ -124,7 +120,6 @@ pub async fn appservice_in_room(&self, room_id: &RoomId, appservice: &Registrati self.appservice_in_room_cache .write() - .expect("locked") .entry(room_id.into()) .or_default() .insert(appservice.registration.id.clone(), in_room); @@ -134,19 +129,14 @@ pub async fn appservice_in_room(&self, room_id: &RoomId, appservice: &Registrati #[implement(Service)] pub fn get_appservice_in_room_cache_usage(&self) -> (usize, usize) { - let cache = self.appservice_in_room_cache.read().expect("locked"); + let cache = self.appservice_in_room_cache.read(); (cache.len(), cache.capacity()) } #[implement(Service)] #[tracing::instrument(level = "debug", skip_all)] -pub fn clear_appservice_in_room_cache(&self) { - self.appservice_in_room_cache - .write() - .expect("locked") - .clear(); -} +pub fn clear_appservice_in_room_cache(&self) { self.appservice_in_room_cache.write().clear(); } /// Returns an iterator of all servers participating in this room. #[implement(Service)] diff --git a/src/service/rooms/state_cache/update.rs b/src/service/rooms/state_cache/update.rs index 02c6bec6..32c67947 100644 --- a/src/service/rooms/state_cache/update.rs +++ b/src/service/rooms/state_cache/update.rs @@ -211,10 +211,7 @@ pub async fn update_joined_count(&self, room_id: &RoomId) { self.db.serverroomids.put_raw(serverroom_id, []); } - self.appservice_in_room_cache - .write() - .expect("locked") - .remove(room_id); + self.appservice_in_room_cache.write().remove(room_id); } /// Direct DB function to directly mark a user as joined. It is not