From 0753076e947a48638853b3ef2c7936bc462dca08 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Sat, 3 Jul 2021 21:19:49 +0200 Subject: [PATCH] chutulu is my copilot --- Cargo.lock | 1 + Cargo.toml | 3 +- src/database.rs | 12 +- src/database/abstraction.rs | 1 + src/database/abstraction/sqlite.rs | 282 ++++++++++++++++++++++++----- src/database/media.rs | 10 +- src/database/rooms.rs | 8 +- 7 files changed, 262 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9266c131..9a035427 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -255,6 +255,7 @@ dependencies = [ "image", "jsonwebtoken", "log", + "num_cpus", "lru-cache", "opentelemetry", "opentelemetry-jaeger", diff --git a/Cargo.toml b/Cargo.toml index 17b1841d..e7bb3b8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,13 +77,14 @@ lru-cache = "0.1.2" rusqlite = { version = "0.25.3", optional = true } parking_lot = { version = "0.11.1", optional = true } crossbeam = { version = "0.8.1", optional = true } +num_cpus = { version = "1.13.0", optional = true } [features] default = ["conduit_bin", "backend_sqlite"] backend_sled = ["sled"] backend_rocksdb = ["rocksdb"] backend_sqlite = ["sqlite"] -sqlite = ["rusqlite", "parking_lot", "crossbeam"] +sqlite = ["rusqlite", "parking_lot", "crossbeam", "num_cpus"] conduit_bin = [] # TODO: add rocket to this when it is optional [[bin]] diff --git a/src/database.rs b/src/database.rs index 76e4ed0c..de4f4412 100644 --- a/src/database.rs +++ b/src/database.rs @@ -93,6 +93,7 @@ pub type Engine = abstraction::rocksdb::RocksDbEngine; pub type Engine = abstraction::sqlite::SqliteEngine; pub struct Database { + _db: Arc, pub globals: globals::Globals, pub users: users::Users, pub uiaa: uiaa::Uiaa, @@ -132,6 +133,7 @@ impl Database { let (sending_sender, sending_receiver) = mpsc::unbounded(); let db = Arc::new(Self { + _db: builder.clone(), users: users::Users { userid_password: builder.open_tree("userid_password")?, userid_displayname: builder.open_tree("userid_displayname")?, @@ -421,8 +423,12 @@ impl Database { } pub async fn flush(&self) -> Result<()> { - // noop while we don't use sled 1.0 - //self._db.flush_async().await?; - Ok(()) + let start = std::time::Instant::now(); + + let res = self._db.flush(); + + log::debug!("flush: took {:?}", start.elapsed()); + + res } } diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 1ab5dead..fb11ba0b 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -15,6 +15,7 @@ pub mod sqlite; pub trait DatabaseEngine: Sized { fn open(config: &Config) -> Result>; fn open_tree(self: &Arc, name: &'static str) -> Result>; + fn flush(self: &Arc) -> Result<()>; } pub trait Tree: Send + Sync { diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 35078f94..be5ce6c2 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -1,19 +1,27 @@ -use std::{future::Future, pin::Pin, sync::Arc, thread}; +use std::{ + future::Future, + ops::Deref, + path::{Path, PathBuf}, + pin::Pin, + sync::{Arc, Weak}, + thread, + time::{Duration, Instant}, +}; use crate::{database::Config, Result}; use super::{DatabaseEngine, Tree}; -use std::{collections::BTreeMap, sync::RwLock}; +use std::collections::BTreeMap; + +use log::debug; use crossbeam::channel::{bounded, Sender as ChannelSender}; -use parking_lot::{Mutex, MutexGuard}; -use rusqlite::{params, Connection, OptionalExtension}; +use parking_lot::{Mutex, MutexGuard, RwLock}; +use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension}; use tokio::sync::oneshot::Sender; -type SqliteHandle = Arc>; - // const SQL_CREATE_TABLE: &str = // "CREATE TABLE IF NOT EXISTS {} {{ \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL }}"; // const SQL_SELECT: &str = "SELECT value FROM {} WHERE key = ?"; @@ -25,23 +33,137 @@ type SqliteHandle = Arc>; // const SQL_SELECT_ITER_FROM_BACKWARDS: &str = // "SELECT key, value FROM {} WHERE key <= ? ORDER BY DESC"; +struct Pool { + writer: Mutex, + readers: Vec>, + reader_rwlock: RwLock<()>, + path: PathBuf, +} + +pub const MILLI: Duration = Duration::from_millis(1); + +enum HoldingConn<'a> { + FromGuard(MutexGuard<'a, Connection>), + FromOwned(Connection), +} + +impl<'a> Deref for HoldingConn<'a> { + type Target = Connection; + + fn deref(&self) -> &Self::Target { + match self { + HoldingConn::FromGuard(guard) => guard.deref(), + HoldingConn::FromOwned(conn) => conn, + } + } +} + +impl Pool { + fn new>(path: P, num_readers: usize) -> Result { + let writer = Mutex::new(Self::prepare_conn(&path)?); + + let mut readers = Vec::new(); + + for _ in 0..num_readers { + readers.push(Mutex::new(Self::prepare_conn(&path)?)) + } + + Ok(Self { + writer, + readers, + reader_rwlock: RwLock::new(()), + path: path.as_ref().to_path_buf(), + }) + } + + fn prepare_conn>(path: P) -> Result { + let conn = Connection::open(path)?; + + conn.pragma_update(Some(Main), "journal_mode", &"WAL".to_owned())?; + + // conn.pragma_update(Some(Main), "wal_autocheckpoint", &250)?; + + // conn.pragma_update(Some(Main), "wal_checkpoint", &"FULL".to_owned())?; + + conn.pragma_update(Some(Main), "synchronous", &"OFF".to_owned())?; + + Ok(conn) + } + + fn write_lock(&self) -> MutexGuard<'_, Connection> { + self.writer.lock() + } + + fn read_lock(&self) -> HoldingConn<'_> { + let _guard = self.reader_rwlock.read(); + + for r in &self.readers { + if let Some(reader) = r.try_lock() { + return HoldingConn::FromGuard(reader); + } + } + + drop(_guard); + + log::warn!("all readers locked, creating spillover reader..."); + + let spilled = Self::prepare_conn(&self.path).unwrap(); + + return HoldingConn::FromOwned(spilled); + } +} + pub struct SqliteEngine { - handle: SqliteHandle, + pool: Pool, + iterator_lock: RwLock<()>, } impl DatabaseEngine for SqliteEngine { fn open(config: &Config) -> Result> { - let conn = Connection::open(format!("{}/conduit.db", &config.database_path))?; + let pool = Pool::new( + format!("{}/conduit.db", &config.database_path), + num_cpus::get(), + )?; - conn.pragma_update(None, "journal_mode", &"WAL".to_owned())?; + pool.write_lock() + .execute("CREATE TABLE IF NOT EXISTS _noop (\"key\" INT)", params![])?; - let handle = Arc::new(Mutex::new(conn)); + let arc = Arc::new(SqliteEngine { + pool, + iterator_lock: RwLock::new(()), + }); - Ok(Arc::new(SqliteEngine { handle })) + let weak: Weak = Arc::downgrade(&arc); + + thread::spawn(move || { + let r = crossbeam::channel::tick(Duration::from_secs(60)); + + let weak = weak; + + loop { + let _ = r.recv(); + + if let Some(arc) = Weak::upgrade(&weak) { + log::warn!("wal-trunc: locking..."); + let iterator_guard = arc.iterator_lock.write(); + let read_guard = arc.pool.reader_rwlock.write(); + log::warn!("wal-trunc: locked, flushing..."); + let start = Instant::now(); + arc.flush_wal().unwrap(); + log::warn!("wal-trunc: locked, flushed in {:?}", start.elapsed()); + drop(read_guard); + drop(iterator_guard); + } else { + break; + } + } + }); + + Ok(arc) } fn open_tree(self: &Arc, name: &str) -> Result> { - self.handle.lock().execute(format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name).as_str(), [])?; + self.pool.write_lock().execute(format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name).as_str(), [])?; Ok(Arc::new(SqliteTable { engine: Arc::clone(self), @@ -49,6 +171,40 @@ impl DatabaseEngine for SqliteEngine { watchers: RwLock::new(BTreeMap::new()), })) } + + fn flush(self: &Arc) -> Result<()> { + self.pool + .write_lock() + .execute_batch( + " + PRAGMA synchronous=FULL; + BEGIN; + DELETE FROM _noop; + INSERT INTO _noop VALUES (1); + COMMIT; + PRAGMA synchronous=OFF; + ", + ) + .map_err(Into::into) + } +} + +impl SqliteEngine { + fn flush_wal(self: &Arc) -> Result<()> { + self.pool + .write_lock() + .execute_batch( + " + PRAGMA synchronous=FULL; PRAGMA wal_checkpoint=TRUNCATE; + BEGIN; + DELETE FROM _noop; + INSERT INTO _noop VALUES (1); + COMMIT; + PRAGMA wal_checkpoint=PASSIVE; PRAGMA synchronous=OFF; + ", + ) + .map_err(Into::into) + } } pub struct SqliteTable { @@ -60,26 +216,17 @@ pub struct SqliteTable { type TupleOfBytes = (Vec, Vec); impl SqliteTable { - fn get_with_guard( - &self, - guard: &MutexGuard<'_, Connection>, - key: &[u8], - ) -> Result>> { + fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result>> { Ok(guard .prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())? .query_row([key], |row| row.get(0)) .optional()?) } - fn insert_with_guard( - &self, - guard: &MutexGuard<'_, Connection>, - key: &[u8], - value: &[u8], - ) -> Result<()> { + fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> { guard.execute( format!( - "INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)", + "INSERT INTO {} (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value", self.name ) .as_str(), @@ -88,18 +235,18 @@ impl SqliteTable { Ok(()) } - fn _iter_from_thread( - &self, - mutex: Arc>, - f: F, - ) -> Box + Send> + fn _iter_from_thread(&self, f: F) -> Box + Send> where - F: (FnOnce(MutexGuard<'_, Connection>, ChannelSender)) + Send + 'static, + F: (for<'a> FnOnce(&'a Connection, ChannelSender)) + Send + 'static, { let (s, r) = bounded::(5); + let engine = self.engine.clone(); + thread::spawn(move || { - let _ = f(mutex.lock(), s); + let guard = engine.iterator_lock.read(); + let _ = f(&engine.pool.read_lock(), s); + drop(guard); }); Box::new(r.into_iter()) @@ -108,7 +255,7 @@ impl SqliteTable { macro_rules! iter_from_thread { ($self:expr, $sql:expr, $param:expr) => { - $self._iter_from_thread($self.engine.handle.clone(), move |guard, s| { + $self._iter_from_thread(move |guard, s| { let _ = guard .prepare($sql) .unwrap() @@ -122,13 +269,33 @@ macro_rules! iter_from_thread { impl Tree for SqliteTable { fn get(&self, key: &[u8]) -> Result>> { - self.get_with_guard(&self.engine.handle.lock(), key) + let guard = self.engine.pool.read_lock(); + + // let start = Instant::now(); + + let val = self.get_with_guard(&guard, key); + + // debug!("get: took {:?}", start.elapsed()); + // debug!("get key: {:?}", &key) + + val } fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { - self.insert_with_guard(&self.engine.handle.lock(), key, value)?; + { + let guard = self.engine.pool.write_lock(); - let watchers = self.watchers.read().unwrap(); + let start = Instant::now(); + + self.insert_with_guard(&guard, key, value)?; + + let elapsed = start.elapsed(); + if elapsed > MILLI { + debug!("insert: took {:012?} : {}", elapsed, &self.name); + } + } + + let watchers = self.watchers.read(); let mut triggered = Vec::new(); for length in 0..=key.len() { @@ -140,7 +307,7 @@ impl Tree for SqliteTable { drop(watchers); if !triggered.is_empty() { - let mut watchers = self.watchers.write().unwrap(); + let mut watchers = self.watchers.write(); for prefix in triggered { if let Some(txs) = watchers.remove(prefix) { for tx in txs { @@ -154,10 +321,22 @@ impl Tree for SqliteTable { } fn remove(&self, key: &[u8]) -> Result<()> { - self.engine.handle.lock().execute( + let guard = self.engine.pool.write_lock(); + + let start = Instant::now(); + + guard.execute( format!("DELETE FROM {} WHERE key = ?", self.name).as_str(), [key], )?; + + let elapsed = start.elapsed(); + + if elapsed > MILLI { + debug!("remove: took {:012?} : {}", elapsed, &self.name); + } + // debug!("remove key: {:?}", &key); + Ok(()) } @@ -201,7 +380,9 @@ impl Tree for SqliteTable { } fn increment(&self, key: &[u8]) -> Result> { - let guard = self.engine.handle.lock(); + let guard = self.engine.pool.write_lock(); + + let start = Instant::now(); let old = self.get_with_guard(&guard, key)?; @@ -210,11 +391,16 @@ impl Tree for SqliteTable { self.insert_with_guard(&guard, key, &new)?; + let elapsed = start.elapsed(); + + if elapsed > MILLI { + debug!("increment: took {:012?} : {}", elapsed, &self.name); + } + // debug!("increment key: {:?}", &key); + Ok(new) } - // TODO: make this use take_while - fn scan_prefix<'a>( &'a self, prefix: Vec, @@ -229,7 +415,10 @@ impl Tree for SqliteTable { // .as_str(), // [prefix] // ) - Box::new(self.iter_from(&prefix, false).take_while(move |(key, _)| key.starts_with(&prefix))) + Box::new( + self.iter_from(&prefix, false) + .take_while(move |(key, _)| key.starts_with(&prefix)), + ) } fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { @@ -237,7 +426,6 @@ impl Tree for SqliteTable { self.watchers .write() - .unwrap() .entry(prefix.to_vec()) .or_default() .push(tx); @@ -249,10 +437,12 @@ impl Tree for SqliteTable { } fn clear(&self) -> Result<()> { - self.engine.handle.lock().execute( - format!("DELETE FROM {}", self.name).as_str(), - [], - )?; + debug!("clear: running"); + self.engine + .pool + .write_lock() + .execute(format!("DELETE FROM {}", self.name).as_str(), [])?; + debug!("clear: ran"); Ok(()) } } diff --git a/src/database/media.rs b/src/database/media.rs index a1fe26e1..404a6c01 100644 --- a/src/database/media.rs +++ b/src/database/media.rs @@ -189,7 +189,10 @@ impl Media { original_prefix.extend_from_slice(&0_u32.to_be_bytes()); // Height = 0 if it's not a thumbnail original_prefix.push(0xff); - if let Some((key, _)) = self.mediaid_file.scan_prefix(thumbnail_prefix).next() { + if let Some((key, _)) = { + /* scoped to explicitly drop iterator */ + self.mediaid_file.scan_prefix(thumbnail_prefix).next() + } { // Using saved thumbnail let path = globals.get_media_file(&key); let mut file = Vec::new(); @@ -224,7 +227,10 @@ impl Media { content_type, file: file.to_vec(), })) - } else if let Some((key, _)) = self.mediaid_file.scan_prefix(original_prefix).next() { + } else if let Some((key, _)) = { + /* scoped to explicitly drop iterator */ + self.mediaid_file.scan_prefix(original_prefix).next() + } { // Generate a thumbnail let path = globals.get_media_file(&key); let mut file = Vec::new(); diff --git a/src/database/rooms.rs b/src/database/rooms.rs index eb1a9244..23cd5702 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -733,6 +733,8 @@ impl Rooms { .filter(|user_id| user_id.server_name() == db.globals.server_name()) .filter(|user_id| !db.users.is_deactivated(user_id).unwrap_or(false)) .filter(|user_id| self.is_joined(&user_id, &pdu.room_id).unwrap_or(false)) + .collect::>() + /* to consume iterator */ { // Don't notify the user of their own events if user == pdu.sender { @@ -1209,13 +1211,13 @@ impl Rooms { redacts, } = pdu_builder; // TODO: Make sure this isn't called twice in parallel - let prev_events = dbg!(self + let prev_events = self .get_pdu_leaves(&room_id)? .into_iter() .take(20) - .collect::>()); + .collect::>(); - let create_event = dbg!(self.room_state_get(&room_id, &EventType::RoomCreate, ""))?; + let create_event = self.room_state_get(&room_id, &EventType::RoomCreate, "")?; let create_event_content = create_event .as_ref()