From 374fb2745c47e8b26e0b5daa435ace709f446498 Mon Sep 17 00:00:00 2001 From: Jade Ellis Date: Sat, 19 Jul 2025 22:05:43 +0100 Subject: [PATCH] refactor: Replace remaining std Mutexes --- src/database/engine/backup.rs | 2 +- src/database/engine/cf_opts.rs | 2 +- src/database/engine/context.rs | 15 +++--- src/database/engine/memory_usage.rs | 6 +-- src/database/engine/open.rs | 6 +-- src/database/pool.rs | 10 ++-- src/service/admin/console.rs | 56 ++++++++------------- src/service/rooms/auth_chain/data.rs | 24 +++------ src/service/rooms/auth_chain/mod.rs | 4 +- src/service/rooms/state_compressor/mod.rs | 14 +++--- src/service/sync/mod.rs | 61 +++++++++++------------ 11 files changed, 83 insertions(+), 117 deletions(-) diff --git a/src/database/engine/backup.rs b/src/database/engine/backup.rs index ac72e6d4..4cdb6172 100644 --- a/src/database/engine/backup.rs +++ b/src/database/engine/backup.rs @@ -71,7 +71,7 @@ pub fn backup_count(&self) -> Result { fn backup_engine(&self) -> Result { let path = self.backup_path()?; let options = BackupEngineOptions::new(path).map_err(map_err)?; - BackupEngine::open(&options, &*self.ctx.env.lock()?).map_err(map_err) + BackupEngine::open(&options, &self.ctx.env.lock()).map_err(map_err) } #[implement(Engine)] diff --git a/src/database/engine/cf_opts.rs b/src/database/engine/cf_opts.rs index cbbd1012..58358f02 100644 --- a/src/database/engine/cf_opts.rs +++ b/src/database/engine/cf_opts.rs @@ -232,7 +232,7 @@ fn get_cache(ctx: &Context, desc: &Descriptor) -> Option { cache_opts.set_num_shard_bits(shard_bits); cache_opts.set_capacity(size); - let mut caches = ctx.col_cache.lock().expect("locked"); + let mut caches = ctx.col_cache.lock(); match desc.cache_disp { | CacheDisp::Unique if desc.cache_size == 0 => None, | CacheDisp::Unique => { diff --git a/src/database/engine/context.rs b/src/database/engine/context.rs index 380e37af..3b9238bd 100644 --- a/src/database/engine/context.rs +++ b/src/database/engine/context.rs @@ -1,9 +1,6 @@ -use std::{ - collections::BTreeMap, - sync::{Arc, Mutex}, -}; +use std::{collections::BTreeMap, sync::Arc}; -use conduwuit::{Result, Server, debug, utils::math::usize_from_f64}; +use conduwuit::{Result, Server, SyncMutex, debug, utils::math::usize_from_f64}; use rocksdb::{Cache, Env, LruCacheOptions}; use crate::{or_else, pool::Pool}; @@ -14,9 +11,9 @@ use crate::{or_else, pool::Pool}; /// These assets are housed in the shared Context. pub(crate) struct Context { pub(crate) pool: Arc, - pub(crate) col_cache: Mutex>, - pub(crate) row_cache: Mutex, - pub(crate) env: Mutex, + pub(crate) col_cache: SyncMutex>, + pub(crate) row_cache: SyncMutex, + pub(crate) env: SyncMutex, pub(crate) server: Arc, } @@ -68,7 +65,7 @@ impl Drop for Context { debug!("Closing frontend pool"); self.pool.close(); - let mut env = self.env.lock().expect("locked"); + let mut env = self.env.lock(); debug!("Shutting down background threads"); env.set_high_priority_background_threads(0); diff --git a/src/database/engine/memory_usage.rs b/src/database/engine/memory_usage.rs index 9bb5c535..21af35c8 100644 --- a/src/database/engine/memory_usage.rs +++ b/src/database/engine/memory_usage.rs @@ -9,7 +9,7 @@ use crate::or_else; #[implement(Engine)] pub fn memory_usage(&self) -> Result { let mut res = String::new(); - let stats = get_memory_usage_stats(Some(&[&self.db]), Some(&[&*self.ctx.row_cache.lock()?])) + let stats = get_memory_usage_stats(Some(&[&self.db]), Some(&[&*self.ctx.row_cache.lock()])) .or_else(or_else)?; let mibs = |input| f64::from(u32::try_from(input / 1024).unwrap_or(0)) / 1024.0; writeln!( @@ -19,10 +19,10 @@ pub fn memory_usage(&self) -> Result { mibs(stats.mem_table_total), mibs(stats.mem_table_unflushed), mibs(stats.mem_table_readers_total), - mibs(u64::try_from(self.ctx.row_cache.lock()?.get_usage())?), + mibs(u64::try_from(self.ctx.row_cache.lock().get_usage())?), )?; - for (name, cache) in &*self.ctx.col_cache.lock()? { + for (name, cache) in &*self.ctx.col_cache.lock() { writeln!(res, "{name} cache: {:.2} MiB", mibs(u64::try_from(cache.get_usage())?))?; } diff --git a/src/database/engine/open.rs b/src/database/engine/open.rs index 84e59a6a..7b9d93c2 100644 --- a/src/database/engine/open.rs +++ b/src/database/engine/open.rs @@ -23,11 +23,7 @@ pub(crate) async fn open(ctx: Arc, desc: &[Descriptor]) -> Result, queues: Vec>, - workers: Mutex>>, + workers: SyncMutex>>, topology: Vec, busy: AtomicUsize, queued_max: AtomicUsize, @@ -115,7 +115,7 @@ impl Drop for Pool { #[implement(Pool)] #[tracing::instrument(skip_all)] pub(crate) fn close(&self) { - let workers = take(&mut *self.workers.lock().expect("locked")); + let workers = take(&mut *self.workers.lock()); let senders = self.queues.iter().map(Sender::sender_count).sum::(); @@ -154,7 +154,7 @@ pub(crate) fn close(&self) { #[implement(Pool)] fn spawn_until(self: &Arc, recv: &[Receiver], count: usize) -> Result { - let mut workers = self.workers.lock().expect("locked"); + let mut workers = self.workers.lock(); while workers.len() < count { self.clone().spawn_one(&mut workers, recv)?; } diff --git a/src/service/admin/console.rs b/src/service/admin/console.rs index 02f41303..931bb719 100644 --- a/src/service/admin/console.rs +++ b/src/service/admin/console.rs @@ -1,11 +1,8 @@ #![cfg(feature = "console")] -use std::{ - collections::VecDeque, - sync::{Arc, Mutex}, -}; +use std::{collections::VecDeque, sync::Arc}; -use conduwuit::{Server, debug, defer, error, log, log::is_systemd_mode}; +use conduwuit::{Server, SyncMutex, debug, defer, error, log, log::is_systemd_mode}; use futures::future::{AbortHandle, Abortable}; use ruma::events::room::message::RoomMessageEventContent; use rustyline_async::{Readline, ReadlineError, ReadlineEvent}; @@ -17,10 +14,10 @@ use crate::{Dep, admin}; pub struct Console { server: Arc, admin: Dep, - worker_join: Mutex>>, - input_abort: Mutex>, - command_abort: Mutex>, - history: Mutex>, + worker_join: SyncMutex>>, + input_abort: SyncMutex>, + command_abort: SyncMutex>, + history: SyncMutex>, output: MadSkin, } @@ -50,7 +47,7 @@ impl Console { } pub async fn start(self: &Arc) { - let mut worker_join = self.worker_join.lock().expect("locked"); + let mut worker_join = self.worker_join.lock(); if worker_join.is_none() { let self_ = Arc::clone(self); _ = worker_join.insert(self.server.runtime().spawn(self_.worker())); @@ -60,7 +57,7 @@ impl Console { pub async fn close(self: &Arc) { self.interrupt(); - let Some(worker_join) = self.worker_join.lock().expect("locked").take() else { + let Some(worker_join) = self.worker_join.lock().take() else { return; }; @@ -70,22 +67,18 @@ impl Console { pub fn interrupt(self: &Arc) { self.interrupt_command(); self.interrupt_readline(); - self.worker_join - .lock() - .expect("locked") - .as_ref() - .map(JoinHandle::abort); + self.worker_join.lock().as_ref().map(JoinHandle::abort); } pub fn interrupt_readline(self: &Arc) { - if let Some(input_abort) = self.input_abort.lock().expect("locked").take() { + if let Some(input_abort) = self.input_abort.lock().take() { debug!("Interrupting console readline..."); input_abort.abort(); } } pub fn interrupt_command(self: &Arc) { - if let Some(command_abort) = self.command_abort.lock().expect("locked").take() { + if let Some(command_abort) = self.command_abort.lock().take() { debug!("Interrupting console command..."); command_abort.abort(); } @@ -120,7 +113,7 @@ impl Console { } debug!("session ending"); - self.worker_join.lock().expect("locked").take(); + self.worker_join.lock().take(); } async fn readline(self: &Arc) -> Result { @@ -135,9 +128,9 @@ impl Console { let (abort, abort_reg) = AbortHandle::new_pair(); let future = Abortable::new(future, abort_reg); - _ = self.input_abort.lock().expect("locked").insert(abort); + _ = self.input_abort.lock().insert(abort); defer! {{ - _ = self.input_abort.lock().expect("locked").take(); + _ = self.input_abort.lock().take(); }} let Ok(result) = future.await else { @@ -158,9 +151,9 @@ impl Console { let (abort, abort_reg) = AbortHandle::new_pair(); let future = Abortable::new(future, abort_reg); - _ = self.command_abort.lock().expect("locked").insert(abort); + _ = self.command_abort.lock().insert(abort); defer! {{ - _ = self.command_abort.lock().expect("locked").take(); + _ = self.command_abort.lock().take(); }} _ = future.await; @@ -184,20 +177,15 @@ impl Console { } fn set_history(&self, readline: &mut Readline) { - self.history - .lock() - .expect("locked") - .iter() - .rev() - .for_each(|entry| { - readline - .add_history_entry(entry.clone()) - .expect("added history entry"); - }); + self.history.lock().iter().rev().for_each(|entry| { + readline + .add_history_entry(entry.clone()) + .expect("added history entry"); + }); } fn add_history(&self, line: String) { - let mut history = self.history.lock().expect("locked"); + let mut history = self.history.lock(); history.push_front(line); history.truncate(HISTORY_LIMIT); } diff --git a/src/service/rooms/auth_chain/data.rs b/src/service/rooms/auth_chain/data.rs index 8c3588cc..e9e40979 100644 --- a/src/service/rooms/auth_chain/data.rs +++ b/src/service/rooms/auth_chain/data.rs @@ -1,9 +1,6 @@ -use std::{ - mem::size_of, - sync::{Arc, Mutex}, -}; +use std::{mem::size_of, sync::Arc}; -use conduwuit::{Err, Result, err, utils, utils::math::usize_from_f64}; +use conduwuit::{Err, Result, SyncMutex, err, utils, utils::math::usize_from_f64}; use database::Map; use lru_cache::LruCache; @@ -11,7 +8,7 @@ use crate::rooms::short::ShortEventId; pub(super) struct Data { shorteventid_authchain: Arc, - pub(super) auth_chain_cache: Mutex, Arc<[ShortEventId]>>>, + pub(super) auth_chain_cache: SyncMutex, Arc<[ShortEventId]>>>, } impl Data { @@ -23,7 +20,7 @@ impl Data { .expect("valid cache size"); Self { shorteventid_authchain: db["shorteventid_authchain"].clone(), - auth_chain_cache: Mutex::new(LruCache::new(cache_size)), + auth_chain_cache: SyncMutex::new(LruCache::new(cache_size)), } } @@ -34,12 +31,7 @@ impl Data { debug_assert!(!key.is_empty(), "auth_chain key must not be empty"); // Check RAM cache - if let Some(result) = self - .auth_chain_cache - .lock() - .expect("cache locked") - .get_mut(key) - { + if let Some(result) = self.auth_chain_cache.lock().get_mut(key) { return Ok(Arc::clone(result)); } @@ -63,7 +55,6 @@ impl Data { // Cache in RAM self.auth_chain_cache .lock() - .expect("cache locked") .insert(vec![key[0]], Arc::clone(&chain)); Ok(chain) @@ -84,9 +75,6 @@ impl Data { } // Cache in RAM - self.auth_chain_cache - .lock() - .expect("cache locked") - .insert(key, auth_chain); + self.auth_chain_cache.lock().insert(key, auth_chain); } } diff --git a/src/service/rooms/auth_chain/mod.rs b/src/service/rooms/auth_chain/mod.rs index 0903ea75..79d4d070 100644 --- a/src/service/rooms/auth_chain/mod.rs +++ b/src/service/rooms/auth_chain/mod.rs @@ -248,10 +248,10 @@ pub fn cache_auth_chain_vec(&self, key: Vec, auth_chain: &[ShortEventId]) { #[implement(Service)] pub fn get_cache_usage(&self) -> (usize, usize) { - let cache = self.db.auth_chain_cache.lock().expect("locked"); + let cache = self.db.auth_chain_cache.lock(); (cache.len(), cache.capacity()) } #[implement(Service)] -pub fn clear_cache(&self) { self.db.auth_chain_cache.lock().expect("locked").clear(); } +pub fn clear_cache(&self) { self.db.auth_chain_cache.lock().clear(); } diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index a33fb342..f7f7d043 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -2,12 +2,12 @@ use std::{ collections::{BTreeSet, HashMap}, fmt::{Debug, Write}, mem::size_of, - sync::{Arc, Mutex}, + sync::Arc, }; use async_trait::async_trait; use conduwuit::{ - Result, + Result, SyncMutex, arrayvec::ArrayVec, at, checked, err, expected, implement, utils, utils::{bytes, math::usize_from_f64, stream::IterStream}, @@ -23,7 +23,7 @@ use crate::{ }; pub struct Service { - pub stateinfo_cache: Mutex, + pub stateinfo_cache: SyncMutex, db: Data, services: Services, } @@ -86,7 +86,7 @@ impl crate::Service for Service { async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result { let (cache_len, ents) = { - let cache = self.stateinfo_cache.lock().expect("locked"); + let cache = self.stateinfo_cache.lock(); let ents = cache.iter().map(at!(1)).flat_map(|vec| vec.iter()).fold( HashMap::new(), |mut ents, ssi| { @@ -110,7 +110,7 @@ impl crate::Service for Service { Ok(()) } - async fn clear_cache(&self) { self.stateinfo_cache.lock().expect("locked").clear(); } + async fn clear_cache(&self) { self.stateinfo_cache.lock().clear(); } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } @@ -123,7 +123,7 @@ pub async fn load_shortstatehash_info( &self, shortstatehash: ShortStateHash, ) -> Result { - if let Some(r) = self.stateinfo_cache.lock()?.get_mut(&shortstatehash) { + if let Some(r) = self.stateinfo_cache.lock().get_mut(&shortstatehash) { return Ok(r.clone()); } @@ -152,7 +152,7 @@ async fn cache_shortstatehash_info( shortstatehash: ShortStateHash, stack: ShortStateInfoVec, ) -> Result { - self.stateinfo_cache.lock()?.insert(shortstatehash, stack); + self.stateinfo_cache.lock().insert(shortstatehash, stack); Ok(()) } diff --git a/src/service/sync/mod.rs b/src/service/sync/mod.rs index b095d2c1..6ac579f4 100644 --- a/src/service/sync/mod.rs +++ b/src/service/sync/mod.rs @@ -2,10 +2,10 @@ mod watch; use std::{ collections::{BTreeMap, BTreeSet}, - sync::{Arc, Mutex, Mutex as StdMutex}, + sync::Arc, }; -use conduwuit::{Result, Server}; +use conduwuit::{Result, Server, SyncMutex}; use database::Map; use ruma::{ OwnedDeviceId, OwnedRoomId, OwnedUserId, @@ -62,11 +62,11 @@ struct SnakeSyncCache { extensions: v5::request::Extensions, } -type DbConnections = Mutex>; +type DbConnections = SyncMutex>; type DbConnectionsKey = (OwnedUserId, OwnedDeviceId, String); -type DbConnectionsVal = Arc>; +type DbConnectionsVal = Arc>; type SnakeConnectionsKey = (OwnedUserId, OwnedDeviceId, Option); -type SnakeConnectionsVal = Arc>; +type SnakeConnectionsVal = Arc>; impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { @@ -90,8 +90,8 @@ impl crate::Service for Service { state_cache: args.depend::("rooms::state_cache"), typing: args.depend::("rooms::typing"), }, - connections: StdMutex::new(BTreeMap::new()), - snake_connections: StdMutex::new(BTreeMap::new()), + connections: SyncMutex::new(BTreeMap::new()), + snake_connections: SyncMutex::new(BTreeMap::new()), })) } @@ -100,22 +100,19 @@ impl crate::Service for Service { impl Service { pub fn snake_connection_cached(&self, key: &SnakeConnectionsKey) -> bool { - self.snake_connections - .lock() - .expect("locked") - .contains_key(key) + self.snake_connections.lock().contains_key(key) } pub fn forget_snake_sync_connection(&self, key: &SnakeConnectionsKey) { - self.snake_connections.lock().expect("locked").remove(key); + self.snake_connections.lock().remove(key); } pub fn remembered(&self, key: &DbConnectionsKey) -> bool { - self.connections.lock().expect("locked").contains_key(key) + self.connections.lock().contains_key(key) } pub fn forget_sync_request_connection(&self, key: &DbConnectionsKey) { - self.connections.lock().expect("locked").remove(key); + self.connections.lock().remove(key); } pub fn update_snake_sync_request_with_cache( @@ -123,13 +120,13 @@ impl Service { snake_key: &SnakeConnectionsKey, request: &mut v5::Request, ) -> BTreeMap> { - let mut cache = self.snake_connections.lock().expect("locked"); + let mut cache = self.snake_connections.lock(); let cached = Arc::clone( cache .entry(snake_key.clone()) - .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), + .or_insert_with(|| Arc::new(SyncMutex::new(SnakeSyncCache::default()))), ); - let cached = &mut cached.lock().expect("locked"); + let cached = &mut cached.lock(); drop(cache); //v5::Request::try_from_http_request(req, path_args); @@ -232,16 +229,16 @@ impl Service { }; let key = into_db_key(key.0.clone(), key.1.clone(), conn_id); - let mut cache = self.connections.lock().expect("locked"); + let mut cache = self.connections.lock(); let cached = Arc::clone(cache.entry(key).or_insert_with(|| { - Arc::new(Mutex::new(SlidingSyncCache { + Arc::new(SyncMutex::new(SlidingSyncCache { lists: BTreeMap::new(), subscriptions: BTreeMap::new(), known_rooms: BTreeMap::new(), extensions: ExtensionsConfig::default(), })) })); - let cached = &mut cached.lock().expect("locked"); + let cached = &mut cached.lock(); drop(cache); for (list_id, list) in &mut request.lists { @@ -328,16 +325,16 @@ impl Service { key: &DbConnectionsKey, subscriptions: BTreeMap, ) { - let mut cache = self.connections.lock().expect("locked"); + let mut cache = self.connections.lock(); let cached = Arc::clone(cache.entry(key.clone()).or_insert_with(|| { - Arc::new(Mutex::new(SlidingSyncCache { + Arc::new(SyncMutex::new(SlidingSyncCache { lists: BTreeMap::new(), subscriptions: BTreeMap::new(), known_rooms: BTreeMap::new(), extensions: ExtensionsConfig::default(), })) })); - let cached = &mut cached.lock().expect("locked"); + let cached = &mut cached.lock(); drop(cache); cached.subscriptions = subscriptions; @@ -350,16 +347,16 @@ impl Service { new_cached_rooms: BTreeSet, globalsince: u64, ) { - let mut cache = self.connections.lock().expect("locked"); + let mut cache = self.connections.lock(); let cached = Arc::clone(cache.entry(key.clone()).or_insert_with(|| { - Arc::new(Mutex::new(SlidingSyncCache { + Arc::new(SyncMutex::new(SlidingSyncCache { lists: BTreeMap::new(), subscriptions: BTreeMap::new(), known_rooms: BTreeMap::new(), extensions: ExtensionsConfig::default(), })) })); - let cached = &mut cached.lock().expect("locked"); + let cached = &mut cached.lock(); drop(cache); for (room_id, lastsince) in cached @@ -386,13 +383,13 @@ impl Service { globalsince: u64, ) { assert!(key.2.is_some(), "Some(conn_id) required for this call"); - let mut cache = self.snake_connections.lock().expect("locked"); + let mut cache = self.snake_connections.lock(); let cached = Arc::clone( cache .entry(key.clone()) - .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), + .or_insert_with(|| Arc::new(SyncMutex::new(SnakeSyncCache::default()))), ); - let cached = &mut cached.lock().expect("locked"); + let cached = &mut cached.lock(); drop(cache); for (room_id, lastsince) in cached @@ -416,13 +413,13 @@ impl Service { key: &SnakeConnectionsKey, subscriptions: BTreeMap, ) { - let mut cache = self.snake_connections.lock().expect("locked"); + let mut cache = self.snake_connections.lock(); let cached = Arc::clone( cache .entry(key.clone()) - .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), + .or_insert_with(|| Arc::new(SyncMutex::new(SnakeSyncCache::default()))), ); - let cached = &mut cached.lock().expect("locked"); + let cached = &mut cached.lock(); drop(cache); cached.subscriptions = subscriptions;