diff --git a/src/database.rs b/src/database.rs index a6ac67f8..ff508a75 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,3 +1,34 @@ +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + convert::{TryFrom, TryInto}, + fs::{self, remove_dir_all}, + io::Write, + mem::size_of, + ops::Deref, + path::Path, + sync::{Arc, Mutex, RwLock}, +}; +use std::hash::Hash; + +use directories::ProjectDirs; +use lru_cache::LruCache; +use rocket::{ + futures::{channel::mpsc, stream::FuturesUnordered, StreamExt}, + outcome::{IntoOutcome, try_outcome}, + request::{FromRequest, Request}, + Shutdown, State, +}; +use ruma::{DeviceId, EventId, RoomId, ServerName, UserId}; +use serde::{de::IgnoredAny, Deserialize}; +use tokio::sync::{OwnedRwLockReadGuard, RwLock as TokioRwLock, Semaphore}; +use tracing::{debug, error, warn}; + +use abstraction::DatabaseEngine; + +use crate::{Error, Result, utils}; + +use self::proxy::ProxyConfig; + pub mod abstraction; pub mod account_data; @@ -14,33 +45,6 @@ pub mod transaction_ids; pub mod uiaa; pub mod users; -use crate::{utils, Error, Result}; -use abstraction::DatabaseEngine; -use directories::ProjectDirs; -use lru_cache::LruCache; -use rocket::{ - futures::{channel::mpsc, stream::FuturesUnordered, StreamExt}, - outcome::{try_outcome, IntoOutcome}, - request::{FromRequest, Request}, - Shutdown, State, -}; -use ruma::{DeviceId, EventId, RoomId, ServerName, UserId}; -use serde::{de::IgnoredAny, Deserialize}; -use std::{ - collections::{BTreeMap, HashMap, HashSet}, - convert::{TryFrom, TryInto}, - fs::{self, remove_dir_all}, - io::Write, - mem::size_of, - ops::Deref, - path::Path, - sync::{Arc, Mutex, RwLock}, -}; -use tokio::sync::{OwnedRwLockReadGuard, RwLock as TokioRwLock, Semaphore}; -use tracing::{debug, error, warn}; - -use self::proxy::ProxyConfig; - #[derive(Clone, Debug, Deserialize)] pub struct Config { server_name: Box, @@ -132,6 +136,17 @@ pub type Engine = abstraction::sqlite::Engine; #[cfg(feature = "heed")] pub type Engine = abstraction::heed::Engine; +// for each key: (memory_usage in bytes, items in cache, capacity) +pub struct CacheUsageStatistics { + pdu_cache: (usize, usize, usize), + auth_chain_cache: (usize, usize, usize), + shorteventid_cache: (usize, usize, usize), + eventidshort_cache: (usize, usize, usize), + statekeyshort_cache: (usize, usize, usize), + shortstatekey_cache: (usize, usize, usize), + stateinfo_cache: (usize, usize, usize), +} + pub struct Database { _db: Arc, pub globals: globals::Globals, @@ -163,27 +178,27 @@ impl Database { fn check_sled_or_sqlite_db(config: &Config) -> Result<()> { #[cfg(feature = "backend_sqlite")] - { - let path = Path::new(&config.database_path); + { + let path = Path::new(&config.database_path); - let sled_exists = path.join("db").exists(); - let sqlite_exists = path.join("conduit.db").exists(); - if sled_exists { - if sqlite_exists { - // most likely an in-place directory, only warn - warn!("Both sled and sqlite databases are detected in database directory"); - warn!("Currently running from the sqlite database, but consider removing sled database files to free up space") - } else { - error!( + let sled_exists = path.join("db").exists(); + let sqlite_exists = path.join("conduit.db").exists(); + if sled_exists { + if sqlite_exists { + // most likely an in-place directory, only warn + warn!("Both sled and sqlite databases are detected in database directory"); + warn!("Currently running from the sqlite database, but consider removing sled database files to free up space") + } else { + error!( "Sled database detected, conduit now uses sqlite for database operations" ); - error!("This database must be converted to sqlite, go to https://github.com/ShadowJonathan/conduit_toolbox#conduit_sled_to_sqlite"); - return Err(Error::bad_config( - "sled database detected, migrate to sqlite", - )); + error!("This database must be converted to sqlite, go to https://github.com/ShadowJonathan/conduit_toolbox#conduit_sled_to_sqlite"); + return Err(Error::bad_config( + "sled database detected, migrate to sqlite", + )); + } } } - } Ok(()) } @@ -728,9 +743,9 @@ impl Database { drop(guard); #[cfg(feature = "sqlite")] - { - Self::start_wal_clean_task(Arc::clone(&db), &config).await; - } + { + Self::start_wal_clean_task(Arc::clone(&db), &config).await; + } Ok(db) } @@ -881,7 +896,7 @@ impl Database { tokio::spawn(async move { let mut i = interval(timer_interval); #[cfg(unix)] - let mut s = signal(SignalKind::hangup()).unwrap(); + let mut s = signal(SignalKind::hangup()).unwrap(); loop { #[cfg(unix)] @@ -892,12 +907,13 @@ impl Database { _ = s.recv() => { info!("wal-trunc: Received SIGHUP"); } - }; - #[cfg(not(unix))] - { - i.tick().await; - info!("wal-trunc: Timer ticked") } + ; + #[cfg(not(unix))] + { + i.tick().await; + info!("wal-trunc: Timer ticked") + } let start = Instant::now(); if let Err(e) = db.read().await.flush_wal() { @@ -908,6 +924,65 @@ impl Database { } }); } + + /// Measures memory usage in bytes and how full the caches are in percent for all caches in the Database struct. + pub fn get_cache_usage(&mut self) -> Result { + fn memory_usage_of_locked_cache(cache: &mut Mutex>) -> usize { + let raw_cache = cache.lock().unwrap(); + let mut cache_items_size_sum: usize = 0; + for cache_item in raw_cache.iter() { + cache_items_size_sum += std::mem::size_of_val(&cache_item); + } + cache_items_size_sum += std::mem::size_of_val(&cache); + cache_items_size_sum + } + + fn items_in_locked_cache(cache: &mut Mutex>) -> usize { + cache.lock().unwrap().len() + } + fn capacity_of_locked_cache(cache: &mut Mutex>) -> usize { + cache.lock().unwrap().capacity() + } + + return + Ok(CacheUsageStatistics { + pdu_cache: ( + memory_usage_of_locked_cache(&mut self.rooms.pdu_cache), + items_in_locked_cache(&mut self.rooms.pdu_cache), + capacity_of_locked_cache(&mut self.rooms.pdu_cache) + ), + auth_chain_cache: ( + memory_usage_of_locked_cache(&mut self.rooms.auth_chain_cache), + items_in_locked_cache(&mut self.rooms.auth_chain_cache), + capacity_of_locked_cache(&mut self.rooms.auth_chain_cache) + ), + shorteventid_cache: ( + memory_usage_of_locked_cache(&mut self.rooms.shorteventid_cache), + items_in_locked_cache(&mut self.rooms.shorteventid_cache), + capacity_of_locked_cache(&mut self.rooms.shorteventid_cache) + ), + eventidshort_cache: ( + memory_usage_of_locked_cache(&mut self.rooms.eventidshort_cache), + items_in_locked_cache(&mut self.rooms.eventidshort_cache), + capacity_of_locked_cache(&mut self.rooms.eventidshort_cache) + ), + statekeyshort_cache: ( + memory_usage_of_locked_cache(&mut self.rooms.statekeyshort_cache), + items_in_locked_cache(&mut self.rooms.statekeyshort_cache), + capacity_of_locked_cache(&mut self.rooms.statekeyshort_cache) + ), + shortstatekey_cache: ( + memory_usage_of_locked_cache(&mut self.rooms.shortstatekey_cache), + items_in_locked_cache(&mut self.rooms.shortstatekey_cache), + capacity_of_locked_cache(&mut self.rooms.shortstatekey_cache) + ), + stateinfo_cache: ( + memory_usage_of_locked_cache(&mut self.rooms.stateinfo_cache), + items_in_locked_cache(&mut self.rooms.stateinfo_cache), + capacity_of_locked_cache(&mut self.rooms.stateinfo_cache) + ), + }); + } } pub struct DatabaseGuard(OwnedRwLockReadGuard); diff --git a/src/database/admin.rs b/src/database/admin.rs index 424e6746..d2641a8c 100644 --- a/src/database/admin.rs +++ b/src/database/admin.rs @@ -3,19 +3,21 @@ use std::{ sync::Arc, }; -use crate::{pdu::PduBuilder, Database}; use rocket::futures::{channel::mpsc, stream::StreamExt}; use ruma::{ - events::{room::message, EventType}, + events::{EventType, room::message}, UserId, }; -use tokio::sync::{MutexGuard, RwLock, RwLockReadGuard}; +use tokio::sync::{MutexGuard, RwLock, RwLockWriteGuard}; use tracing::warn; +use crate::{Database, pdu::PduBuilder}; + pub enum AdminCommand { RegisterAppservice(serde_yaml::Value), ListAppservices, SendMessage(message::MessageEventContent), + ShowCacheUsage, } #[derive(Clone)] @@ -59,7 +61,7 @@ impl Admin { drop(guard); let send_message = |message: message::MessageEventContent, - guard: RwLockReadGuard<'_, Database>, + guard: RwLockWriteGuard<'_, Database>, mutex_lock: &MutexGuard<'_, ()>| { guard .rooms @@ -83,7 +85,7 @@ impl Admin { loop { tokio::select! { Some(event) = receiver.next() => { - let guard = db.read().await; + let mut guard = db.write().await; let mutex_state = Arc::clone( guard.globals .roomid_mutex_state @@ -92,6 +94,7 @@ impl Admin { .entry(conduit_room.clone()) .or_default(), ); + let state_lock = mutex_state.lock().await; match event { @@ -114,6 +117,37 @@ impl Admin { AdminCommand::SendMessage(message) => { send_message(message, guard, &state_lock); } + AdminCommand::ShowCacheUsage => { + + fn format_cache_statistics_triple(name: String, triple: (usize, usize, usize)) -> String { + let (memory_usage, item_count, capacity) = triple; + format!( + "{0} is using {1} MB ({2} bytes) of RAM at {3:.2}% utilization.", + name, + memory_usage / 100_00, + memory_usage , + ((item_count as f32 / capacity as f32) * 100.0) + ) + } + + if let Ok(cache_usage_statistics) = guard.get_cache_usage() { + + let mut statistics_lines = Vec::with_capacity(7); + statistics_lines.push(format_cache_statistics_triple("pdu_cache".to_string(), cache_usage_statistics.pdu_cache)); + statistics_lines.push(format_cache_statistics_triple("auth_chain_cache".to_string(), cache_usage_statistics.auth_chain_cache)); + statistics_lines.push(format_cache_statistics_triple("shorteventid_cache".to_string(), cache_usage_statistics.shorteventid_cache)); + statistics_lines.push(format_cache_statistics_triple("eventidshort_cache".to_string(), cache_usage_statistics.eventidshort_cache)); + statistics_lines.push(format_cache_statistics_triple("statekeyshort_cache".to_string(), cache_usage_statistics.statekeyshort_cache)); + statistics_lines.push(format_cache_statistics_triple("shortstatekey_cache".to_string(), cache_usage_statistics.shortstatekey_cache)); + statistics_lines.push(format_cache_statistics_triple("stateinfo_cache".to_string(), cache_usage_statistics.stateinfo_cache)); + + send_message(message::MessageEventContent::text_plain(statistics_lines.join("\n")), guard, &state_lock); + } else { + let result_text = "Could not calculate database cache size"; + send_message(message::MessageEventContent::text_plain(result_text), guard, &state_lock); + } + + } } drop(state_lock); diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 0d99c52b..c445845a 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -1,38 +1,40 @@ -mod edus; - -pub use edus::RoomEdus; -use member::MembershipState; - -use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result}; -use lru_cache::LruCache; -use regex::Regex; -use ring::digest; -use rocket::http::RawStr; -use ruma::{ - api::{client::error::ErrorKind, federation}, - events::{ - ignored_user_list, push_rules, - room::{ - create::CreateEventContent, member, message, power_levels::PowerLevelsEventContent, - }, - AnyStrippedStateEvent, AnySyncStateEvent, EventType, - }, - push::{self, Action, Tweak}, - serde::{CanonicalJsonObject, CanonicalJsonValue, Raw}, - state_res::{self, RoomVersion, StateMap}, - uint, EventId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, -}; use std::{ collections::{BTreeMap, HashMap, HashSet}, convert::{TryFrom, TryInto}, mem::size_of, sync::{Arc, Mutex}, }; + +use lru_cache::LruCache; +use member::MembershipState; +use regex::Regex; +use ring::digest; +use rocket::http::RawStr; +use ruma::{ + api::{client::error::ErrorKind, federation}, + EventId, + events::{ + AnyStrippedStateEvent, AnySyncStateEvent, + EventType, + ignored_user_list, push_rules, room::{ + create::CreateEventContent, member, message, power_levels::PowerLevelsEventContent, + }, + }, + push::{self, Action, Tweak}, + RoomAliasId, + RoomId, RoomVersionId, serde::{CanonicalJsonObject, CanonicalJsonValue, Raw}, ServerName, state_res::{self, RoomVersion, StateMap}, uint, UserId, +}; use tokio::sync::MutexGuard; use tracing::{error, warn}; +pub use edus::RoomEdus; + +use crate::{Database, Error, pdu::PduBuilder, PduEvent, Result, utils}; + use super::{abstraction::Tree, admin::AdminCommand, pusher}; +mod edus; + /// The unique identifier of each state group. /// /// This is created when a state group is added to the database by @@ -1563,10 +1565,13 @@ impl Rooms { )); } } + "show_cache_usage" => { + db.admin.send(AdminCommand::ShowCacheUsage); + } _ => { db.admin.send(AdminCommand::SendMessage( message::MessageEventContent::text_plain(format!( - "Unrecognized command: {}", + "Unrecognized command: {}\nAvailable commands:\n- register_appservice\n- list_appservices\n- get_pdu\n- show_cache_usage", command )), ));