
refactor: Replace remaining std Mutexes

Jade Ellis committed 2025-07-19 22:05:43 +01:00
parent a1d616e3e3
commit 374fb2745c
GPG key ID: 8705A2A3EBF77BD2 (no known key found for this signature in database)

11 changed files with 83 additions and 117 deletions
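The change is mechanical at each call site: std::sync::Mutex::lock() returns a LockResult because a std mutex is poisoned when a holder panics, so every caller had to unwrap with .expect("locked") or propagate with ?. The SyncMutex used here returns the guard directly, so that noise disappears. A minimal sketch of the before/after pattern, assuming SyncMutex is a non-poisoning mutex such as a re-exported parking_lot::Mutex (the function names are illustrative, not from the commit):

// Sketch only: assumes SyncMutex is a non-poisoning mutex, e.g. a
// parking_lot::Mutex re-export; `before`/`after` are illustrative names.
use std::sync::Mutex;

use parking_lot::Mutex as SyncMutex; // assumed backing type

fn before(cache: &Mutex<Vec<u64>>) -> usize {
	// std lock() returns LockResult<MutexGuard<_>> because the lock may be
	// poisoned by a panicking holder, so callers must unwrap or propagate.
	cache.lock().expect("locked").len()
}

fn after(cache: &SyncMutex<Vec<u64>>) -> usize {
	// A non-poisoning lock() hands back the guard directly.
	cache.lock().len()
}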

View file

@@ -71,7 +71,7 @@ pub fn backup_count(&self) -> Result<usize> {
 fn backup_engine(&self) -> Result<BackupEngine> {
 	let path = self.backup_path()?;
 	let options = BackupEngineOptions::new(path).map_err(map_err)?;
-	BackupEngine::open(&options, &*self.ctx.env.lock()?).map_err(map_err)
+	BackupEngine::open(&options, &self.ctx.env.lock()).map_err(map_err)
 }

 #[implement(Engine)]

View file

@@ -232,7 +232,7 @@ fn get_cache(ctx: &Context, desc: &Descriptor) -> Option<Cache> {
 	cache_opts.set_num_shard_bits(shard_bits);
 	cache_opts.set_capacity(size);

-	let mut caches = ctx.col_cache.lock().expect("locked");
+	let mut caches = ctx.col_cache.lock();
 	match desc.cache_disp {
 		| CacheDisp::Unique if desc.cache_size == 0 => None,
 		| CacheDisp::Unique => {

View file

@@ -1,9 +1,6 @@
-use std::{
-	collections::BTreeMap,
-	sync::{Arc, Mutex},
-};
+use std::{collections::BTreeMap, sync::Arc};

-use conduwuit::{Result, Server, debug, utils::math::usize_from_f64};
+use conduwuit::{Result, Server, SyncMutex, debug, utils::math::usize_from_f64};
 use rocksdb::{Cache, Env, LruCacheOptions};

 use crate::{or_else, pool::Pool};
@@ -14,9 +11,9 @@ use crate::{or_else, pool::Pool};
 /// These assets are housed in the shared Context.
 pub(crate) struct Context {
 	pub(crate) pool: Arc<Pool>,
-	pub(crate) col_cache: Mutex<BTreeMap<String, Cache>>,
-	pub(crate) row_cache: Mutex<Cache>,
-	pub(crate) env: Mutex<Env>,
+	pub(crate) col_cache: SyncMutex<BTreeMap<String, Cache>>,
+	pub(crate) row_cache: SyncMutex<Cache>,
+	pub(crate) env: SyncMutex<Env>,
 	pub(crate) server: Arc<Server>,
 }
@@ -68,7 +65,7 @@ impl Drop for Context {
 		debug!("Closing frontend pool");
 		self.pool.close();

-		let mut env = self.env.lock().expect("locked");
+		let mut env = self.env.lock();

 		debug!("Shutting down background threads");
 		env.set_high_priority_background_threads(0);

View file

@@ -9,7 +9,7 @@ use crate::or_else;
 #[implement(Engine)]
 pub fn memory_usage(&self) -> Result<String> {
 	let mut res = String::new();
-	let stats = get_memory_usage_stats(Some(&[&self.db]), Some(&[&*self.ctx.row_cache.lock()?]))
+	let stats = get_memory_usage_stats(Some(&[&self.db]), Some(&[&*self.ctx.row_cache.lock()]))
 		.or_else(or_else)?;
 	let mibs = |input| f64::from(u32::try_from(input / 1024).unwrap_or(0)) / 1024.0;
 	writeln!(
@@ -19,10 +19,10 @@ pub fn memory_usage(&self) -> Result<String> {
 		mibs(stats.mem_table_total),
 		mibs(stats.mem_table_unflushed),
 		mibs(stats.mem_table_readers_total),
-		mibs(u64::try_from(self.ctx.row_cache.lock()?.get_usage())?),
+		mibs(u64::try_from(self.ctx.row_cache.lock().get_usage())?),
 	)?;

-	for (name, cache) in &*self.ctx.col_cache.lock()? {
+	for (name, cache) in &*self.ctx.col_cache.lock() {
 		writeln!(res, "{name} cache: {:.2} MiB", mibs(u64::try_from(cache.get_usage())?))?;
 	}

View file

@@ -23,11 +23,7 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
 	let config = &server.config;
 	let path = &config.database_path;

-	let db_opts = db_options(
-		config,
-		&ctx.env.lock().expect("environment locked"),
-		&ctx.row_cache.lock().expect("row cache locked"),
-	)?;
+	let db_opts = db_options(config, &ctx.env.lock(), &ctx.row_cache.lock())?;

 	let cfds = Self::configure_cfds(&ctx, &db_opts, desc)?;
 	let num_cfds = cfds.len();

View file

@@ -3,7 +3,7 @@ mod configure;
 use std::{
 	mem::take,
 	sync::{
-		Arc, Mutex,
+		Arc,
 		atomic::{AtomicUsize, Ordering},
 	},
 	thread,
@@ -12,7 +12,7 @@ use std::{
 use async_channel::{QueueStrategy, Receiver, RecvError, Sender};
 use conduwuit::{
-	Error, Result, Server, debug, err, error, implement,
+	Error, Result, Server, SyncMutex, debug, err, error, implement,
 	result::DebugInspect,
 	smallvec::SmallVec,
 	trace,
@@ -31,7 +31,7 @@ use crate::{Handle, Map, keyval::KeyBuf, stream};
 pub(crate) struct Pool {
 	server: Arc<Server>,
 	queues: Vec<Sender<Cmd>>,
-	workers: Mutex<Vec<JoinHandle<()>>>,
+	workers: SyncMutex<Vec<JoinHandle<()>>>,
 	topology: Vec<usize>,
 	busy: AtomicUsize,
 	queued_max: AtomicUsize,
@@ -115,7 +115,7 @@ impl Drop for Pool {
 #[implement(Pool)]
 #[tracing::instrument(skip_all)]
 pub(crate) fn close(&self) {
-	let workers = take(&mut *self.workers.lock().expect("locked"));
+	let workers = take(&mut *self.workers.lock());

 	let senders = self.queues.iter().map(Sender::sender_count).sum::<usize>();
@@ -154,7 +154,7 @@ pub(crate) fn close(&self) {
 #[implement(Pool)]
 fn spawn_until(self: &Arc<Self>, recv: &[Receiver<Cmd>], count: usize) -> Result {
-	let mut workers = self.workers.lock().expect("locked");
+	let mut workers = self.workers.lock();
 	while workers.len() < count {
 		self.clone().spawn_one(&mut workers, recv)?;
 	}

View file

@@ -1,11 +1,8 @@
 #![cfg(feature = "console")]

-use std::{
-	collections::VecDeque,
-	sync::{Arc, Mutex},
-};
+use std::{collections::VecDeque, sync::Arc};

-use conduwuit::{Server, debug, defer, error, log, log::is_systemd_mode};
+use conduwuit::{Server, SyncMutex, debug, defer, error, log, log::is_systemd_mode};
 use futures::future::{AbortHandle, Abortable};
 use ruma::events::room::message::RoomMessageEventContent;
 use rustyline_async::{Readline, ReadlineError, ReadlineEvent};
@@ -17,10 +14,10 @@ use crate::{Dep, admin};
 pub struct Console {
 	server: Arc<Server>,
 	admin: Dep<admin::Service>,
-	worker_join: Mutex<Option<JoinHandle<()>>>,
-	input_abort: Mutex<Option<AbortHandle>>,
-	command_abort: Mutex<Option<AbortHandle>>,
-	history: Mutex<VecDeque<String>>,
+	worker_join: SyncMutex<Option<JoinHandle<()>>>,
+	input_abort: SyncMutex<Option<AbortHandle>>,
+	command_abort: SyncMutex<Option<AbortHandle>>,
+	history: SyncMutex<VecDeque<String>>,
 	output: MadSkin,
 }
@@ -50,7 +47,7 @@ impl Console {
 	}

 	pub async fn start(self: &Arc<Self>) {
-		let mut worker_join = self.worker_join.lock().expect("locked");
+		let mut worker_join = self.worker_join.lock();
 		if worker_join.is_none() {
 			let self_ = Arc::clone(self);
 			_ = worker_join.insert(self.server.runtime().spawn(self_.worker()));
@@ -60,7 +57,7 @@ impl Console {
 	pub async fn close(self: &Arc<Self>) {
 		self.interrupt();

-		let Some(worker_join) = self.worker_join.lock().expect("locked").take() else {
+		let Some(worker_join) = self.worker_join.lock().take() else {
 			return;
 		};
@@ -70,22 +67,18 @@ impl Console {
 	pub fn interrupt(self: &Arc<Self>) {
 		self.interrupt_command();
 		self.interrupt_readline();
-		self.worker_join
-			.lock()
-			.expect("locked")
-			.as_ref()
-			.map(JoinHandle::abort);
+		self.worker_join.lock().as_ref().map(JoinHandle::abort);
 	}

 	pub fn interrupt_readline(self: &Arc<Self>) {
-		if let Some(input_abort) = self.input_abort.lock().expect("locked").take() {
+		if let Some(input_abort) = self.input_abort.lock().take() {
 			debug!("Interrupting console readline...");
 			input_abort.abort();
 		}
 	}

 	pub fn interrupt_command(self: &Arc<Self>) {
-		if let Some(command_abort) = self.command_abort.lock().expect("locked").take() {
+		if let Some(command_abort) = self.command_abort.lock().take() {
 			debug!("Interrupting console command...");
 			command_abort.abort();
 		}
@@ -120,7 +113,7 @@ impl Console {
 		}

 		debug!("session ending");
-		self.worker_join.lock().expect("locked").take();
+		self.worker_join.lock().take();
 	}

 	async fn readline(self: &Arc<Self>) -> Result<ReadlineEvent, ReadlineError> {
@@ -135,9 +128,9 @@
 		let (abort, abort_reg) = AbortHandle::new_pair();
 		let future = Abortable::new(future, abort_reg);

-		_ = self.input_abort.lock().expect("locked").insert(abort);
+		_ = self.input_abort.lock().insert(abort);
 		defer! {{
-			_ = self.input_abort.lock().expect("locked").take();
+			_ = self.input_abort.lock().take();
 		}}

 		let Ok(result) = future.await else {
@@ -158,9 +151,9 @@
 		let (abort, abort_reg) = AbortHandle::new_pair();
 		let future = Abortable::new(future, abort_reg);

-		_ = self.command_abort.lock().expect("locked").insert(abort);
+		_ = self.command_abort.lock().insert(abort);
 		defer! {{
-			_ = self.command_abort.lock().expect("locked").take();
+			_ = self.command_abort.lock().take();
 		}}

 		_ = future.await;
@@ -184,20 +177,15 @@
 	}

 	fn set_history(&self, readline: &mut Readline) {
-		self.history
-			.lock()
-			.expect("locked")
-			.iter()
-			.rev()
-			.for_each(|entry| {
-				readline
-					.add_history_entry(entry.clone())
-					.expect("added history entry");
-			});
+		self.history.lock().iter().rev().for_each(|entry| {
+			readline
+				.add_history_entry(entry.clone())
+				.expect("added history entry");
+		});
 	}

 	fn add_history(&self, line: String) {
-		let mut history = self.history.lock().expect("locked");
+		let mut history = self.history.lock();
 		history.push_front(line);
 		history.truncate(HISTORY_LIMIT);
 	}

View file

@@ -1,9 +1,6 @@
-use std::{
-	mem::size_of,
-	sync::{Arc, Mutex},
-};
+use std::{mem::size_of, sync::Arc};

-use conduwuit::{Err, Result, err, utils, utils::math::usize_from_f64};
+use conduwuit::{Err, Result, SyncMutex, err, utils, utils::math::usize_from_f64};
 use database::Map;
 use lru_cache::LruCache;
@@ -11,7 +8,7 @@ use crate::rooms::short::ShortEventId;
 pub(super) struct Data {
 	shorteventid_authchain: Arc<Map>,
-	pub(super) auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<[ShortEventId]>>>,
+	pub(super) auth_chain_cache: SyncMutex<LruCache<Vec<u64>, Arc<[ShortEventId]>>>,
 }

 impl Data {
@@ -23,7 +20,7 @@ impl Data {
 			.expect("valid cache size");
 		Self {
 			shorteventid_authchain: db["shorteventid_authchain"].clone(),
-			auth_chain_cache: Mutex::new(LruCache::new(cache_size)),
+			auth_chain_cache: SyncMutex::new(LruCache::new(cache_size)),
 		}
 	}
@@ -34,12 +31,7 @@ impl Data {
 		debug_assert!(!key.is_empty(), "auth_chain key must not be empty");

 		// Check RAM cache
-		if let Some(result) = self
-			.auth_chain_cache
-			.lock()
-			.expect("cache locked")
-			.get_mut(key)
-		{
+		if let Some(result) = self.auth_chain_cache.lock().get_mut(key) {
 			return Ok(Arc::clone(result));
 		}
@@ -63,7 +55,6 @@ impl Data {
 		// Cache in RAM
 		self.auth_chain_cache
 			.lock()
-			.expect("cache locked")
 			.insert(vec![key[0]], Arc::clone(&chain));

 		Ok(chain)
@@ -84,9 +75,6 @@ impl Data {
 		}

 		// Cache in RAM
-		self.auth_chain_cache
-			.lock()
-			.expect("cache locked")
-			.insert(key, auth_chain);
+		self.auth_chain_cache.lock().insert(key, auth_chain);
 	}
 }

View file

@@ -248,10 +248,10 @@ pub fn cache_auth_chain_vec(&self, key: Vec<u64>, auth_chain: &[ShortEventId]) {
 #[implement(Service)]
 pub fn get_cache_usage(&self) -> (usize, usize) {
-	let cache = self.db.auth_chain_cache.lock().expect("locked");
+	let cache = self.db.auth_chain_cache.lock();

 	(cache.len(), cache.capacity())
 }

 #[implement(Service)]
-pub fn clear_cache(&self) { self.db.auth_chain_cache.lock().expect("locked").clear(); }
+pub fn clear_cache(&self) { self.db.auth_chain_cache.lock().clear(); }

View file

@@ -2,12 +2,12 @@ use std::{
 	collections::{BTreeSet, HashMap},
 	fmt::{Debug, Write},
 	mem::size_of,
-	sync::{Arc, Mutex},
+	sync::Arc,
 };

 use async_trait::async_trait;
 use conduwuit::{
-	Result,
+	Result, SyncMutex,
 	arrayvec::ArrayVec,
 	at, checked, err, expected, implement, utils,
 	utils::{bytes, math::usize_from_f64, stream::IterStream},
@@ -23,7 +23,7 @@ use crate::{
 };

 pub struct Service {
-	pub stateinfo_cache: Mutex<StateInfoLruCache>,
+	pub stateinfo_cache: SyncMutex<StateInfoLruCache>,
 	db: Data,
 	services: Services,
 }
@@ -86,7 +86,7 @@ impl crate::Service for Service {
 	async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result {
 		let (cache_len, ents) = {
-			let cache = self.stateinfo_cache.lock().expect("locked");
+			let cache = self.stateinfo_cache.lock();
 			let ents = cache.iter().map(at!(1)).flat_map(|vec| vec.iter()).fold(
 				HashMap::new(),
 				|mut ents, ssi| {
@@ -110,7 +110,7 @@ impl crate::Service for Service {
 		Ok(())
 	}

-	async fn clear_cache(&self) { self.stateinfo_cache.lock().expect("locked").clear(); }
+	async fn clear_cache(&self) { self.stateinfo_cache.lock().clear(); }

 	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
 }
@@ -123,7 +123,7 @@ pub async fn load_shortstatehash_info(
 	&self,
 	shortstatehash: ShortStateHash,
 ) -> Result<ShortStateInfoVec> {
-	if let Some(r) = self.stateinfo_cache.lock()?.get_mut(&shortstatehash) {
+	if let Some(r) = self.stateinfo_cache.lock().get_mut(&shortstatehash) {
 		return Ok(r.clone());
 	}
@@ -152,7 +152,7 @@ async fn cache_shortstatehash_info(
 	shortstatehash: ShortStateHash,
 	stack: ShortStateInfoVec,
 ) -> Result {
-	self.stateinfo_cache.lock()?.insert(shortstatehash, stack);
+	self.stateinfo_cache.lock().insert(shortstatehash, stack);

 	Ok(())
 }

View file

@@ -2,10 +2,10 @@ mod watch;
 use std::{
 	collections::{BTreeMap, BTreeSet},
-	sync::{Arc, Mutex, Mutex as StdMutex},
+	sync::Arc,
 };

-use conduwuit::{Result, Server};
+use conduwuit::{Result, Server, SyncMutex};
 use database::Map;
 use ruma::{
 	OwnedDeviceId, OwnedRoomId, OwnedUserId,
@@ -62,11 +62,11 @@ struct SnakeSyncCache {
 	extensions: v5::request::Extensions,
 }

-type DbConnections<K, V> = Mutex<BTreeMap<K, V>>;
+type DbConnections<K, V> = SyncMutex<BTreeMap<K, V>>;
 type DbConnectionsKey = (OwnedUserId, OwnedDeviceId, String);
-type DbConnectionsVal = Arc<Mutex<SlidingSyncCache>>;
+type DbConnectionsVal = Arc<SyncMutex<SlidingSyncCache>>;
 type SnakeConnectionsKey = (OwnedUserId, OwnedDeviceId, Option<String>);
-type SnakeConnectionsVal = Arc<Mutex<SnakeSyncCache>>;
+type SnakeConnectionsVal = Arc<SyncMutex<SnakeSyncCache>>;

 impl crate::Service for Service {
 	fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
@@ -90,8 +90,8 @@ impl crate::Service for Service {
 				state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
 				typing: args.depend::<rooms::typing::Service>("rooms::typing"),
 			},
-			connections: StdMutex::new(BTreeMap::new()),
-			snake_connections: StdMutex::new(BTreeMap::new()),
+			connections: SyncMutex::new(BTreeMap::new()),
+			snake_connections: SyncMutex::new(BTreeMap::new()),
 		}))
 	}
@@ -100,22 +100,19 @@
 impl Service {
 	pub fn snake_connection_cached(&self, key: &SnakeConnectionsKey) -> bool {
-		self.snake_connections
-			.lock()
-			.expect("locked")
-			.contains_key(key)
+		self.snake_connections.lock().contains_key(key)
 	}

 	pub fn forget_snake_sync_connection(&self, key: &SnakeConnectionsKey) {
-		self.snake_connections.lock().expect("locked").remove(key);
+		self.snake_connections.lock().remove(key);
 	}

 	pub fn remembered(&self, key: &DbConnectionsKey) -> bool {
-		self.connections.lock().expect("locked").contains_key(key)
+		self.connections.lock().contains_key(key)
 	}

 	pub fn forget_sync_request_connection(&self, key: &DbConnectionsKey) {
-		self.connections.lock().expect("locked").remove(key);
+		self.connections.lock().remove(key);
 	}

 	pub fn update_snake_sync_request_with_cache(
@@ -123,13 +120,13 @@
 		snake_key: &SnakeConnectionsKey,
 		request: &mut v5::Request,
 	) -> BTreeMap<String, BTreeMap<OwnedRoomId, u64>> {
-		let mut cache = self.snake_connections.lock().expect("locked");
+		let mut cache = self.snake_connections.lock();
 		let cached = Arc::clone(
 			cache
 				.entry(snake_key.clone())
-				.or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))),
+				.or_insert_with(|| Arc::new(SyncMutex::new(SnakeSyncCache::default()))),
 		);
-		let cached = &mut cached.lock().expect("locked");
+		let cached = &mut cached.lock();
 		drop(cache);

 		//v5::Request::try_from_http_request(req, path_args);
@@ -232,16 +229,16 @@
 		};

 		let key = into_db_key(key.0.clone(), key.1.clone(), conn_id);
-		let mut cache = self.connections.lock().expect("locked");
+		let mut cache = self.connections.lock();
 		let cached = Arc::clone(cache.entry(key).or_insert_with(|| {
-			Arc::new(Mutex::new(SlidingSyncCache {
+			Arc::new(SyncMutex::new(SlidingSyncCache {
 				lists: BTreeMap::new(),
 				subscriptions: BTreeMap::new(),
 				known_rooms: BTreeMap::new(),
 				extensions: ExtensionsConfig::default(),
 			}))
 		}));
-		let cached = &mut cached.lock().expect("locked");
+		let cached = &mut cached.lock();
 		drop(cache);

 		for (list_id, list) in &mut request.lists {
@@ -328,16 +325,16 @@
 		key: &DbConnectionsKey,
 		subscriptions: BTreeMap<OwnedRoomId, sync_events::v4::RoomSubscription>,
 	) {
-		let mut cache = self.connections.lock().expect("locked");
+		let mut cache = self.connections.lock();
 		let cached = Arc::clone(cache.entry(key.clone()).or_insert_with(|| {
-			Arc::new(Mutex::new(SlidingSyncCache {
+			Arc::new(SyncMutex::new(SlidingSyncCache {
 				lists: BTreeMap::new(),
 				subscriptions: BTreeMap::new(),
 				known_rooms: BTreeMap::new(),
 				extensions: ExtensionsConfig::default(),
 			}))
 		}));
-		let cached = &mut cached.lock().expect("locked");
+		let cached = &mut cached.lock();
 		drop(cache);

 		cached.subscriptions = subscriptions;
@@ -350,16 +347,16 @@
 		new_cached_rooms: BTreeSet<OwnedRoomId>,
 		globalsince: u64,
 	) {
-		let mut cache = self.connections.lock().expect("locked");
+		let mut cache = self.connections.lock();
 		let cached = Arc::clone(cache.entry(key.clone()).or_insert_with(|| {
-			Arc::new(Mutex::new(SlidingSyncCache {
+			Arc::new(SyncMutex::new(SlidingSyncCache {
 				lists: BTreeMap::new(),
 				subscriptions: BTreeMap::new(),
 				known_rooms: BTreeMap::new(),
 				extensions: ExtensionsConfig::default(),
 			}))
 		}));
-		let cached = &mut cached.lock().expect("locked");
+		let cached = &mut cached.lock();
 		drop(cache);

 		for (room_id, lastsince) in cached
@@ -386,13 +383,13 @@
 		globalsince: u64,
 	) {
 		assert!(key.2.is_some(), "Some(conn_id) required for this call");
-		let mut cache = self.snake_connections.lock().expect("locked");
+		let mut cache = self.snake_connections.lock();
 		let cached = Arc::clone(
 			cache
 				.entry(key.clone())
-				.or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))),
+				.or_insert_with(|| Arc::new(SyncMutex::new(SnakeSyncCache::default()))),
 		);
-		let cached = &mut cached.lock().expect("locked");
+		let cached = &mut cached.lock();
 		drop(cache);

 		for (room_id, lastsince) in cached
@@ -416,13 +413,13 @@
 		key: &SnakeConnectionsKey,
 		subscriptions: BTreeMap<OwnedRoomId, v5::request::RoomSubscription>,
 	) {
-		let mut cache = self.snake_connections.lock().expect("locked");
+		let mut cache = self.snake_connections.lock();
 		let cached = Arc::clone(
 			cache
 				.entry(key.clone())
-				.or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))),
+				.or_insert_with(|| Arc::new(SyncMutex::new(SnakeSyncCache::default()))),
 		);
-		let cached = &mut cached.lock().expect("locked");
+		let cached = &mut cached.lock();
 		drop(cache);

 		cached.subscriptions = subscriptions;