From 22e3416745b20f85a93b9e5e5fefb39fce1ceca8 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Sat, 3 Jul 2021 01:13:50 +0200 Subject: [PATCH] YEET --- Cargo.lock | 141 ++++++++++++- Cargo.toml | 8 +- docker-compose.yml | 8 +- rjbench_testing/docker-compose.yml | 32 +++ src/database.rs | 7 +- src/database/abstraction.rs | 296 +--------------------------- src/database/abstraction/rocksdb.rs | 176 +++++++++++++++++ src/database/abstraction/sled.rs | 115 +++++++++++ src/database/abstraction/sqlite.rs | 273 +++++++++++++++++++++++++ src/database/account_data.rs | 2 +- src/database/appservice.rs | 4 +- src/database/pusher.rs | 2 +- src/database/rooms.rs | 18 +- src/database/sending.rs | 2 +- src/error.rs | 6 + 15 files changed, 774 insertions(+), 316 deletions(-) create mode 100644 rjbench_testing/docker-compose.yml create mode 100644 src/database/abstraction/rocksdb.rs create mode 100644 src/database/abstraction/sled.rs create mode 100644 src/database/abstraction/sqlite.rs diff --git a/Cargo.lock b/Cargo.lock index 76e727e3..9266c131 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6,6 +6,17 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" +[[package]] +name = "ahash" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98" +dependencies = [ + "getrandom 0.2.3", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "0.7.15" @@ -238,6 +249,7 @@ version = "0.1.0" dependencies = [ "base64 0.13.0", "bytes", + "crossbeam", "directories", "http", "image", @@ -246,6 +258,7 @@ dependencies = [ "lru-cache", "opentelemetry", "opentelemetry-jaeger", + "parking_lot", "pretty_env_logger", "rand 0.8.3", "regex", @@ -254,6 +267,7 @@ dependencies = [ "rocket", "rocksdb", "ruma", + "rusqlite", "rust-argon2", "rustls", "rustls-native-certs", @@ -340,10 +354,45 @@ dependencies = [ ] [[package]] -name = "crossbeam-epoch" -version = "0.9.3" +name = "crossbeam" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2584f639eb95fea8c798496315b297cf81b9b58b6d30ab066a75455333cf4b12" +checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" dependencies = [ "cfg-if 1.0.0", "crossbeam-utils", @@ -353,12 +402,21 @@ dependencies = [ ] [[package]] -name = "crossbeam-utils" -version = "0.8.3" +name = "crossbeam-queue" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49" +checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" dependencies = [ - "autocfg", "cfg-if 1.0.0", "lazy_static", ] @@ -547,6 +605,18 @@ dependencies = [ "termcolor", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "figment" version = "0.10.5" @@ -774,6 +844,24 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +dependencies = [ + "ahash", +] + +[[package]] +name = "hashlink" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +dependencies = [ + "hashbrown 0.11.2", +] + [[package]] name = "heck" version = "0.3.2" @@ -920,7 +1008,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "824845a0bf897a9042383849b02c1bc219c2383772efcd5c6f9766fa4b81aef3" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.9.1", "serde", ] @@ -1083,6 +1171,16 @@ dependencies = [ "libc", ] +[[package]] +name = "libsqlite3-sys" +version = "0.22.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290b64917f8b0cb885d9de0f9959fe1f775d7fa12f1da2db9001c1c8ab60f89d" +dependencies = [ + "pkg-config", + "vcpkg", +] + [[package]] name = "linked-hash-map" version = "0.5.4" @@ -1484,6 +1582,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "pkg-config" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" + [[package]] name = "png" version = "0.16.8" @@ -2136,6 +2240,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "rusqlite" +version = "0.25.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57adcf67c8faaf96f3248c2a7b419a0dbc52ebe36ba83dd57fe83827c1ea4eb3" +dependencies = [ + "bitflags", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "memchr", + "smallvec", +] + [[package]] name = "rust-argon2" version = "0.8.3" @@ -3007,6 +3126,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "vcpkg" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "025ce40a007e1907e58d5bc1a594def78e5573bb0b1160bc389634e8f12e4faa" + [[package]] name = "version_check" version = "0.9.3" diff --git a/Cargo.toml b/Cargo.toml index 426d242c..17b1841d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ tokio = "1.2.0" # Used for storing data permanently sled = { version = 
"0.34.6", features = ["compression", "no_metrics"], optional = true } rocksdb = { version = "0.16.0", features = ["multi-threaded-cf"], optional = true } +# sqlx = { version = "0.5.5", features = ["sqlite", "runtime-tokio-rustls"], optional = true } #sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] } # Used for the http request / response body type for Ruma endpoints used with reqwest @@ -73,11 +74,16 @@ tracing-opentelemetry = "0.11.0" opentelemetry-jaeger = "0.11.0" pretty_env_logger = "0.4.0" lru-cache = "0.1.2" +rusqlite = { version = "0.25.3", optional = true } +parking_lot = { version = "0.11.1", optional = true } +crossbeam = { version = "0.8.1", optional = true } [features] -default = ["conduit_bin", "backend_sled"] +default = ["conduit_bin", "backend_sqlite"] backend_sled = ["sled"] backend_rocksdb = ["rocksdb"] +backend_sqlite = ["sqlite"] +sqlite = ["rusqlite", "parking_lot", "crossbeam"] conduit_bin = [] # TODO: add rocket to this when it is optional [[bin]] diff --git a/docker-compose.yml b/docker-compose.yml index cfc24628..fe13fdc8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,13 +14,13 @@ services: args: CREATED: '2021-03-16T08:18:27Z' VERSION: '0.1.0' - LOCAL: 'false' + LOCAL: 'true' GIT_REF: origin/master restart: unless-stopped ports: - 8448:8000 volumes: - - db:/srv/conduit/.local/share/conduit + - ./target/db:/srv/conduit/.local/share/conduit ### Uncomment if you want to use conduit.toml to configure Conduit ### Note: Set env vars will override conduit.toml values # - ./conduit.toml:/srv/conduit/conduit.toml @@ -55,5 +55,5 @@ services: # depends_on: # - homeserver -volumes: - db: +# volumes: +# db: diff --git a/rjbench_testing/docker-compose.yml b/rjbench_testing/docker-compose.yml new file mode 100644 index 00000000..44da5b55 --- /dev/null +++ b/rjbench_testing/docker-compose.yml @@ -0,0 +1,32 @@ +# Conduit +version: '3' + +services: + homeserver: + image: ubuntu:21.04 + restart: unless-stopped + working_dir: "/srv/conduit" + entrypoint: /srv/conduit/conduit + ports: + - 8448:8000 + volumes: + - ../target/db:/srv/conduit/.local/share/conduit + - ../target/debug/conduit:/srv/conduit/conduit + - ./conduit.toml:/srv/conduit/conduit.toml:ro + environment: + # CONDUIT_SERVER_NAME: localhost:8000 # replace with your own name + # CONDUIT_TRUSTED_SERVERS: '["matrix.org"]' + ### Uncomment and change values as desired + # CONDUIT_ADDRESS: 127.0.0.1 + # CONDUIT_PORT: 8000 + CONDUIT_CONFIG: '/srv/conduit/conduit.toml' + # Available levels are: error, warn, info, debug, trace - more info at: https://docs.rs/env_logger/*/env_logger/#enabling-logging + # CONDUIT_LOG: debug # default is: "info,rocket=off,_=off,sled=off" + # CONDUIT_ALLOW_JAEGER: 'false' + # CONDUIT_ALLOW_REGISTRATION : 'false' + # CONDUIT_ALLOW_ENCRYPTION: 'false' + # CONDUIT_ALLOW_FEDERATION: 'false' + # CONDUIT_DATABASE_PATH: /srv/conduit/.local/share/conduit + # CONDUIT_WORKERS: 10 + # CONDUIT_MAX_REQUEST_SIZE: 20_000_000 # in bytes, ~20 MB + diff --git a/src/database.rs b/src/database.rs index ec4052cb..76e4ed0c 100644 --- a/src/database.rs +++ b/src/database.rs @@ -84,10 +84,13 @@ fn default_log() -> String { } #[cfg(feature = "sled")] -pub type Engine = abstraction::SledEngine; +pub type Engine = abstraction::sled::SledEngine; #[cfg(feature = "rocksdb")] -pub type Engine = abstraction::RocksDbEngine; +pub type Engine = abstraction::rocksdb::RocksDbEngine; + +#[cfg(feature = "sqlite")] +pub type Engine = 
abstraction::sqlite::SqliteEngine; pub struct Database { pub globals: globals::Globals, diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index f81c9def..1ab5dead 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -1,24 +1,16 @@ use super::Config; -use crate::{utils, Result}; -use log::warn; +use crate::Result; + use std::{future::Future, pin::Pin, sync::Arc}; #[cfg(feature = "rocksdb")] -use std::{collections::BTreeMap, sync::RwLock}; +pub mod rocksdb; #[cfg(feature = "sled")] -pub struct SledEngine(sled::Db); -#[cfg(feature = "sled")] -pub struct SledEngineTree(sled::Tree); +pub mod sled; -#[cfg(feature = "rocksdb")] -pub struct RocksDbEngine(rocksdb::DBWithThreadMode); -#[cfg(feature = "rocksdb")] -pub struct RocksDbEngineTree<'a> { - db: Arc, - name: &'a str, - watchers: RwLock, Vec>>>, -} +#[cfg(feature = "sqlite")] +pub mod sqlite; pub trait DatabaseEngine: Sized { fn open(config: &Config) -> Result>; @@ -32,20 +24,20 @@ pub trait Tree: Send + Sync { fn remove(&self, key: &[u8]) -> Result<()>; - fn iter<'a>(&'a self) -> Box, Box<[u8]>)> + Send + Sync + 'a>; + fn iter<'a>(&'a self) -> Box, Vec)> + Send + 'a>; fn iter_from<'a>( &'a self, from: &[u8], backwards: bool, - ) -> Box, Box<[u8]>)> + 'a>; + ) -> Box, Vec)> + Send + 'a>; fn increment(&self, key: &[u8]) -> Result>; fn scan_prefix<'a>( &'a self, prefix: Vec, - ) -> Box, Box<[u8]>)> + Send + 'a>; + ) -> Box, Vec)> + Send + 'a>; fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>>; @@ -57,273 +49,3 @@ pub trait Tree: Send + Sync { Ok(()) } } - -#[cfg(feature = "sled")] -impl DatabaseEngine for SledEngine { - fn open(config: &Config) -> Result> { - Ok(Arc::new(SledEngine( - sled::Config::default() - .path(&config.database_path) - .cache_capacity(config.cache_capacity as u64) - .use_compression(true) - .open()?, - ))) - } - - fn open_tree(self: &Arc, name: &'static str) -> Result> { - Ok(Arc::new(SledEngineTree(self.0.open_tree(name)?))) - } -} - -#[cfg(feature = "sled")] -impl Tree for SledEngineTree { - fn get(&self, key: &[u8]) -> Result>> { - Ok(self.0.get(key)?.map(|v| v.to_vec())) - } - - fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { - self.0.insert(key, value)?; - Ok(()) - } - - fn remove(&self, key: &[u8]) -> Result<()> { - self.0.remove(key)?; - Ok(()) - } - - fn iter<'a>(&'a self) -> Box, Box<[u8]>)> + Send + Sync + 'a> { - Box::new( - self.0 - .iter() - .filter_map(|r| { - if let Err(e) = &r { - warn!("Error: {}", e); - } - r.ok() - }) - .map(|(k, v)| (k.to_vec().into(), v.to_vec().into())), - ) - } - - fn iter_from( - &self, - from: &[u8], - backwards: bool, - ) -> Box, Box<[u8]>)>> { - let iter = if backwards { - self.0.range(..from) - } else { - self.0.range(from..) - }; - - let iter = iter - .filter_map(|r| { - if let Err(e) = &r { - warn!("Error: {}", e); - } - r.ok() - }) - .map(|(k, v)| (k.to_vec().into(), v.to_vec().into())); - - if backwards { - Box::new(iter.rev()) - } else { - Box::new(iter) - } - } - - fn increment(&self, key: &[u8]) -> Result> { - Ok(self - .0 - .update_and_fetch(key, utils::increment) - .map(|o| o.expect("increment always sets a value").to_vec())?) 
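For orientation, the updated `Tree` trait above is the entire contract a backend has to satisfy, and the change from `Box<[u8]>` to `Vec<u8>` in the iterator items is what drives most of the call-site churn further down. A minimal sketch of how a caller drives the trait (illustrative only, not part of the patch; `demo_tree` is a hypothetical name and `Result` is the crate's own alias):

fn demo_tree(tree: &dyn Tree) -> Result<()> {
    tree.insert(b"key", b"value")?;
    assert_eq!(tree.get(b"key")?, Some(b"value".to_vec()));

    // `increment` stores and returns a counter encoded as bytes.
    let _count = tree.increment(b"counter")?;

    // Iterators now yield owned (Vec<u8>, Vec<u8>) pairs instead of
    // boxed slices, matching the signature changes in this trait.
    for (key, value) in tree.scan_prefix(b"ke".to_vec()) {
        let _ = (key, value);
    }

    tree.remove(b"key")?;
    Ok(())
}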
- } - - fn scan_prefix<'a>( - &'a self, - prefix: Vec, - ) -> Box, Box<[u8]>)> + Send + 'a> { - let iter = self - .0 - .scan_prefix(prefix) - .filter_map(|r| { - if let Err(e) = &r { - warn!("Error: {}", e); - } - r.ok() - }) - .map(|(k, v)| (k.to_vec().into(), v.to_vec().into())); - - Box::new(iter) - } - - fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { - let prefix = prefix.to_vec(); - Box::pin(async move { - self.0.watch_prefix(prefix).await; - }) - } -} - -#[cfg(feature = "rocksdb")] -impl DatabaseEngine for RocksDbEngine { - fn open(config: &Config) -> Result> { - let mut db_opts = rocksdb::Options::default(); - db_opts.create_if_missing(true); - db_opts.set_max_open_files(16); - db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); - db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy); - db_opts.set_target_file_size_base(256 << 20); - db_opts.set_write_buffer_size(256 << 20); - - let mut block_based_options = rocksdb::BlockBasedOptions::default(); - block_based_options.set_block_size(512 << 10); - db_opts.set_block_based_table_factory(&block_based_options); - - let cfs = rocksdb::DBWithThreadMode::::list_cf( - &db_opts, - &config.database_path, - ) - .unwrap_or_default(); - - let mut options = rocksdb::Options::default(); - options.set_merge_operator_associative("increment", utils::increment_rocksdb); - - let db = rocksdb::DBWithThreadMode::::open_cf_descriptors( - &db_opts, - &config.database_path, - cfs.iter() - .map(|name| rocksdb::ColumnFamilyDescriptor::new(name, options.clone())), - )?; - - Ok(Arc::new(RocksDbEngine(db))) - } - - fn open_tree(self: &Arc, name: &'static str) -> Result> { - let mut options = rocksdb::Options::default(); - options.set_merge_operator_associative("increment", utils::increment_rocksdb); - - // Create if it doesn't exist - let _ = self.0.create_cf(name, &options); - - Ok(Arc::new(RocksDbEngineTree { - name, - db: Arc::clone(self), - watchers: RwLock::new(BTreeMap::new()), - })) - } -} - -#[cfg(feature = "rocksdb")] -impl RocksDbEngineTree<'_> { - fn cf(&self) -> rocksdb::BoundColumnFamily<'_> { - self.db.0.cf_handle(self.name).unwrap() - } -} - -#[cfg(feature = "rocksdb")] -impl Tree for RocksDbEngineTree<'_> { - fn get(&self, key: &[u8]) -> Result>> { - Ok(self.db.0.get_cf(self.cf(), key)?) - } - - fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { - let watchers = self.watchers.read().unwrap(); - let mut triggered = Vec::new(); - - for length in 0..=key.len() { - if watchers.contains_key(&key[..length]) { - triggered.push(&key[..length]); - } - } - - drop(watchers); - - if !triggered.is_empty() { - let mut watchers = self.watchers.write().unwrap(); - for prefix in triggered { - if let Some(txs) = watchers.remove(prefix) { - for tx in txs { - let _ = tx.send(()); - } - } - } - } - - Ok(self.db.0.put_cf(self.cf(), key, value)?) - } - - fn remove(&self, key: &[u8]) -> Result<()> { - Ok(self.db.0.delete_cf(self.cf(), key)?) 
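The prefix-watcher mechanism being moved here (and duplicated in the new rocksdb.rs and sqlite.rs modules below) is easiest to read in isolation. A simplified standalone sketch, assuming a std `RwLock` over a `BTreeMap` of tokio oneshot senders exactly as in the patch; `Watchers`, `notify`, and `wait` are illustrative names, not code from the patch:

use std::{collections::BTreeMap, sync::RwLock};
use tokio::sync::oneshot;

struct Watchers {
    inner: RwLock<BTreeMap<Vec<u8>, Vec<oneshot::Sender<()>>>>,
}

impl Watchers {
    // Called from `insert`: fire every watcher whose prefix matches `key`.
    fn notify(&self, key: &[u8]) {
        let guard = self.inner.read().unwrap();
        // Check every prefix of `key`, from the empty prefix to the full key.
        let triggered: Vec<Vec<u8>> = (0..=key.len())
            .map(|len| key[..len].to_vec())
            .filter(|p| guard.contains_key(p.as_slice()))
            .collect();
        drop(guard);

        if !triggered.is_empty() {
            let mut guard = self.inner.write().unwrap();
            for prefix in triggered {
                // Watchers are one-shot: remove the entry and fire each sender.
                if let Some(senders) = guard.remove(&prefix) {
                    for tx in senders {
                        let _ = tx.send(());
                    }
                }
            }
        }
    }

    // Called from `watch_prefix`: resolves once a matching key is written.
    async fn wait(&self, prefix: &[u8]) {
        let (tx, rx) = oneshot::channel();
        self.inner
            .write()
            .unwrap()
            .entry(prefix.to_vec())
            .or_default()
            .push(tx);
        // The sender stays in the map until `notify` fires it.
        let _ = rx.await;
    }
}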
- } - - fn iter<'a>(&'a self) -> Box, Box<[u8]>)> + Send + Sync + 'a> { - Box::new( - self.db - .0 - .iterator_cf(self.cf(), rocksdb::IteratorMode::Start), - ) - } - - fn iter_from<'a>( - &'a self, - from: &[u8], - backwards: bool, - ) -> Box, Box<[u8]>)> + 'a> { - Box::new(self.db.0.iterator_cf( - self.cf(), - rocksdb::IteratorMode::From( - from, - if backwards { - rocksdb::Direction::Reverse - } else { - rocksdb::Direction::Forward - }, - ), - )) - } - - fn increment(&self, key: &[u8]) -> Result> { - let stats = rocksdb::perf::get_memory_usage_stats(Some(&[&self.db.0]), None).unwrap(); - dbg!(stats.mem_table_total); - dbg!(stats.mem_table_unflushed); - dbg!(stats.mem_table_readers_total); - dbg!(stats.cache_total); - // TODO: atomic? - let old = self.get(key)?; - let new = utils::increment(old.as_deref()).unwrap(); - self.insert(key, &new)?; - Ok(new) - } - - fn scan_prefix<'a>( - &'a self, - prefix: Vec, - ) -> Box, Box<[u8]>)> + Send + 'a> { - Box::new( - self.db - .0 - .iterator_cf( - self.cf(), - rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward), - ) - .take_while(move |(k, _)| k.starts_with(&prefix)), - ) - } - - fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.watchers - .write() - .unwrap() - .entry(prefix.to_vec()) - .or_default() - .push(tx); - - Box::pin(async move { - // Tx is never destroyed - rx.await.unwrap(); - }) - } -} diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs new file mode 100644 index 00000000..88b6297c --- /dev/null +++ b/src/database/abstraction/rocksdb.rs @@ -0,0 +1,176 @@ +use super::super::Config; +use crate::{utils, Result}; + +use std::{future::Future, pin::Pin, sync::Arc}; + +use super::{DatabaseEngine, Tree}; + +use std::{collections::BTreeMap, sync::RwLock}; + +pub struct RocksDbEngine(rocksdb::DBWithThreadMode); + +pub struct RocksDbEngineTree<'a> { + db: Arc, + name: &'a str, + watchers: RwLock, Vec>>>, +} + +impl DatabaseEngine for RocksDbEngine { + fn open(config: &Config) -> Result> { + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.set_max_open_files(16); + db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); + db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy); + db_opts.set_target_file_size_base(256 << 20); + db_opts.set_write_buffer_size(256 << 20); + + let mut block_based_options = rocksdb::BlockBasedOptions::default(); + block_based_options.set_block_size(512 << 10); + db_opts.set_block_based_table_factory(&block_based_options); + + let cfs = rocksdb::DBWithThreadMode::::list_cf( + &db_opts, + &config.database_path, + ) + .unwrap_or_default(); + + let mut options = rocksdb::Options::default(); + options.set_merge_operator_associative("increment", utils::increment_rocksdb); + + let db = rocksdb::DBWithThreadMode::::open_cf_descriptors( + &db_opts, + &config.database_path, + cfs.iter() + .map(|name| rocksdb::ColumnFamilyDescriptor::new(name, options.clone())), + )?; + + Ok(Arc::new(RocksDbEngine(db))) + } + + fn open_tree(self: &Arc, name: &'static str) -> Result> { + let mut options = rocksdb::Options::default(); + options.set_merge_operator_associative("increment", utils::increment_rocksdb); + + // Create if it doesn't exist + let _ = self.0.create_cf(name, &options); + + Ok(Arc::new(RocksDbEngineTree { + name, + db: Arc::clone(self), + watchers: RwLock::new(BTreeMap::new()), + })) + } +} + +impl RocksDbEngineTree<'_> { + fn cf(&self) -> 
rocksdb::BoundColumnFamily<'_> { + self.db.0.cf_handle(self.name).unwrap() + } +} + +impl Tree for RocksDbEngineTree<'_> { + fn get(&self, key: &[u8]) -> Result>> { + Ok(self.db.0.get_cf(self.cf(), key)?) + } + + fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { + let watchers = self.watchers.read().unwrap(); + let mut triggered = Vec::new(); + + for length in 0..=key.len() { + if watchers.contains_key(&key[..length]) { + triggered.push(&key[..length]); + } + } + + drop(watchers); + + if !triggered.is_empty() { + let mut watchers = self.watchers.write().unwrap(); + for prefix in triggered { + if let Some(txs) = watchers.remove(prefix) { + for tx in txs { + let _ = tx.send(()); + } + } + } + } + + Ok(self.db.0.put_cf(self.cf(), key, value)?) + } + + fn remove(&self, key: &[u8]) -> Result<()> { + Ok(self.db.0.delete_cf(self.cf(), key)?) + } + + fn iter<'a>(&'a self) -> Box, Vec)> + Send + Sync + 'a> { + Box::new( + self.db + .0 + .iterator_cf(self.cf(), rocksdb::IteratorMode::Start), + ) + } + + fn iter_from<'a>( + &'a self, + from: &[u8], + backwards: bool, + ) -> Box, Vec)> + 'a> { + Box::new(self.db.0.iterator_cf( + self.cf(), + rocksdb::IteratorMode::From( + from, + if backwards { + rocksdb::Direction::Reverse + } else { + rocksdb::Direction::Forward + }, + ), + )) + } + + fn increment(&self, key: &[u8]) -> Result> { + let stats = rocksdb::perf::get_memory_usage_stats(Some(&[&self.db.0]), None).unwrap(); + dbg!(stats.mem_table_total); + dbg!(stats.mem_table_unflushed); + dbg!(stats.mem_table_readers_total); + dbg!(stats.cache_total); + // TODO: atomic? + let old = self.get(key)?; + let new = utils::increment(old.as_deref()).unwrap(); + self.insert(key, &new)?; + Ok(new) + } + + fn scan_prefix<'a>( + &'a self, + prefix: Vec, + ) -> Box, Vec)> + Send + 'a> { + Box::new( + self.db + .0 + .iterator_cf( + self.cf(), + rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward), + ) + .take_while(move |(k, _)| k.starts_with(&prefix)), + ) + } + + fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.watchers + .write() + .unwrap() + .entry(prefix.to_vec()) + .or_default() + .push(tx); + + Box::pin(async move { + // Tx is never destroyed + rx.await.unwrap(); + }) + } +} diff --git a/src/database/abstraction/sled.rs b/src/database/abstraction/sled.rs new file mode 100644 index 00000000..2f3fb346 --- /dev/null +++ b/src/database/abstraction/sled.rs @@ -0,0 +1,115 @@ +use super::super::Config; +use crate::{utils, Result}; +use log::warn; +use std::{future::Future, pin::Pin, sync::Arc}; + +use super::{DatabaseEngine, Tree}; + +pub struct SledEngine(sled::Db); + +pub struct SledEngineTree(sled::Tree); + +impl DatabaseEngine for SledEngine { + fn open(config: &Config) -> Result> { + Ok(Arc::new(SledEngine( + sled::Config::default() + .path(&config.database_path) + .cache_capacity(config.cache_capacity as u64) + .use_compression(true) + .open()?, + ))) + } + + fn open_tree(self: &Arc, name: &'static str) -> Result> { + Ok(Arc::new(SledEngineTree(self.0.open_tree(name)?))) + } +} + +impl Tree for SledEngineTree { + fn get(&self, key: &[u8]) -> Result>> { + Ok(self.0.get(key)?.map(|v| v.to_vec())) + } + + fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { + self.0.insert(key, value)?; + Ok(()) + } + + fn remove(&self, key: &[u8]) -> Result<()> { + self.0.remove(key)?; + Ok(()) + } + + fn iter<'a>(&'a self) -> Box, Vec)> + Send + Sync + 'a> { + Box::new( + self.0 + .iter() + .filter_map(|r| { + if let Err(e) = 
&r { + warn!("Error: {}", e); + } + r.ok() + }) + .map(|(k, v)| (k.to_vec().into(), v.to_vec().into())), + ) + } + + fn iter_from( + &self, + from: &[u8], + backwards: bool, + ) -> Box, Vec)>> { + let iter = if backwards { + self.0.range(..from) + } else { + self.0.range(from..) + }; + + let iter = iter + .filter_map(|r| { + if let Err(e) = &r { + warn!("Error: {}", e); + } + r.ok() + }) + .map(|(k, v)| (k.to_vec().into(), v.to_vec().into())); + + if backwards { + Box::new(iter.rev()) + } else { + Box::new(iter) + } + } + + fn increment(&self, key: &[u8]) -> Result> { + Ok(self + .0 + .update_and_fetch(key, utils::increment) + .map(|o| o.expect("increment always sets a value").to_vec())?) + } + + fn scan_prefix<'a>( + &'a self, + prefix: Vec, + ) -> Box, Vec)> + Send + 'a> { + let iter = self + .0 + .scan_prefix(prefix) + .filter_map(|r| { + if let Err(e) = &r { + warn!("Error: {}", e); + } + r.ok() + }) + .map(|(k, v)| (k.to_vec().into(), v.to_vec().into())); + + Box::new(iter) + } + + fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { + let prefix = prefix.to_vec(); + Box::pin(async move { + self.0.watch_prefix(prefix).await; + }) + } +} diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs new file mode 100644 index 00000000..35078f94 --- /dev/null +++ b/src/database/abstraction/sqlite.rs @@ -0,0 +1,273 @@ +use std::{future::Future, pin::Pin, sync::Arc, thread}; + +use crate::{database::Config, Result}; + +use super::{DatabaseEngine, Tree}; + +use std::{collections::BTreeMap, sync::RwLock}; + +use crossbeam::channel::{bounded, Sender as ChannelSender}; +use parking_lot::{Mutex, MutexGuard}; +use rusqlite::{params, Connection, OptionalExtension}; + +use tokio::sync::oneshot::Sender; + +type SqliteHandle = Arc>; + +// const SQL_CREATE_TABLE: &str = +// "CREATE TABLE IF NOT EXISTS {} {{ \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL }}"; +// const SQL_SELECT: &str = "SELECT value FROM {} WHERE key = ?"; +// const SQL_INSERT: &str = "INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)"; +// const SQL_DELETE: &str = "DELETE FROM {} WHERE key = ?"; +// const SQL_SELECT_ITER: &str = "SELECT key, value FROM {}"; +// const SQL_SELECT_PREFIX: &str = "SELECT key, value FROM {} WHERE key LIKE ?||'%' ORDER BY key ASC"; +// const SQL_SELECT_ITER_FROM_FORWARDS: &str = "SELECT key, value FROM {} WHERE key >= ? ORDER BY ASC"; +// const SQL_SELECT_ITER_FROM_BACKWARDS: &str = +// "SELECT key, value FROM {} WHERE key <= ? 
ORDER BY DESC"; + +pub struct SqliteEngine { + handle: SqliteHandle, +} + +impl DatabaseEngine for SqliteEngine { + fn open(config: &Config) -> Result> { + let conn = Connection::open(format!("{}/conduit.db", &config.database_path))?; + + conn.pragma_update(None, "journal_mode", &"WAL".to_owned())?; + + let handle = Arc::new(Mutex::new(conn)); + + Ok(Arc::new(SqliteEngine { handle })) + } + + fn open_tree(self: &Arc, name: &str) -> Result> { + self.handle.lock().execute(format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name).as_str(), [])?; + + Ok(Arc::new(SqliteTable { + engine: Arc::clone(self), + name: name.to_owned(), + watchers: RwLock::new(BTreeMap::new()), + })) + } +} + +pub struct SqliteTable { + engine: Arc, + name: String, + watchers: RwLock, Vec>>>, +} + +type TupleOfBytes = (Vec, Vec); + +impl SqliteTable { + fn get_with_guard( + &self, + guard: &MutexGuard<'_, Connection>, + key: &[u8], + ) -> Result>> { + Ok(guard + .prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())? + .query_row([key], |row| row.get(0)) + .optional()?) + } + + fn insert_with_guard( + &self, + guard: &MutexGuard<'_, Connection>, + key: &[u8], + value: &[u8], + ) -> Result<()> { + guard.execute( + format!( + "INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)", + self.name + ) + .as_str(), + [key, value], + )?; + Ok(()) + } + + fn _iter_from_thread( + &self, + mutex: Arc>, + f: F, + ) -> Box + Send> + where + F: (FnOnce(MutexGuard<'_, Connection>, ChannelSender)) + Send + 'static, + { + let (s, r) = bounded::(5); + + thread::spawn(move || { + let _ = f(mutex.lock(), s); + }); + + Box::new(r.into_iter()) + } +} + +macro_rules! iter_from_thread { + ($self:expr, $sql:expr, $param:expr) => { + $self._iter_from_thread($self.engine.handle.clone(), move |guard, s| { + let _ = guard + .prepare($sql) + .unwrap() + .query_map($param, |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) + .unwrap() + .map(|r| r.unwrap()) + .try_for_each(|bob| s.send(bob)); + }) + }; +} + +impl Tree for SqliteTable { + fn get(&self, key: &[u8]) -> Result>> { + self.get_with_guard(&self.engine.handle.lock(), key) + } + + fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { + self.insert_with_guard(&self.engine.handle.lock(), key, value)?; + + let watchers = self.watchers.read().unwrap(); + let mut triggered = Vec::new(); + + for length in 0..=key.len() { + if watchers.contains_key(&key[..length]) { + triggered.push(&key[..length]); + } + } + + drop(watchers); + + if !triggered.is_empty() { + let mut watchers = self.watchers.write().unwrap(); + for prefix in triggered { + if let Some(txs) = watchers.remove(prefix) { + for tx in txs { + let _ = tx.send(()); + } + } + } + }; + + Ok(()) + } + + fn remove(&self, key: &[u8]) -> Result<()> { + self.engine.handle.lock().execute( + format!("DELETE FROM {} WHERE key = ?", self.name).as_str(), + [key], + )?; + Ok(()) + } + + fn iter<'a>(&'a self) -> Box + Send + 'a> { + let name = self.name.clone(); + iter_from_thread!( + self, + format!("SELECT key, value FROM {}", name).as_str(), + params![] + ) + } + + fn iter_from<'a>( + &'a self, + from: &[u8], + backwards: bool, + ) -> Box + Send + 'a> { + let name = self.name.clone(); + let from = from.to_vec(); // TODO change interface? + if backwards { + iter_from_thread!( + self, + format!( // TODO change to <= on rebase + "SELECT key, value FROM {} WHERE key < ? 
ORDER BY key DESC", + name + ) + .as_str(), + [from] + ) + } else { + iter_from_thread!( + self, + format!( + "SELECT key, value FROM {} WHERE key >= ? ORDER BY key ASC", + name + ) + .as_str(), + [from] + ) + } + } + + fn increment(&self, key: &[u8]) -> Result> { + let guard = self.engine.handle.lock(); + + let old = self.get_with_guard(&guard, key)?; + + let new = + crate::utils::increment(old.as_deref()).expect("utils::increment always returns Some"); + + self.insert_with_guard(&guard, key, &new)?; + + Ok(new) + } + + // TODO: make this use take_while + + fn scan_prefix<'a>( + &'a self, + prefix: Vec, + ) -> Box + Send + 'a> { + // let name = self.name.clone(); + // iter_from_thread!( + // self, + // format!( + // "SELECT key, value FROM {} WHERE key BETWEEN ?1 AND ?1 || X'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF' ORDER BY key ASC", + // name + // ) + // .as_str(), + // [prefix] + // ) + Box::new(self.iter_from(&prefix, false).take_while(move |(key, _)| key.starts_with(&prefix))) + } + + fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.watchers + .write() + .unwrap() + .entry(prefix.to_vec()) + .or_default() + .push(tx); + + Box::pin(async move { + // Tx is never destroyed + rx.await.unwrap(); + }) + } + + fn clear(&self) -> Result<()> { + self.engine.handle.lock().execute( + format!("DELETE FROM {}", self.name).as_str(), + [], + )?; + Ok(()) + } +} + +// TODO +// struct Pool { +// writer: Mutex, +// readers: [Mutex; NUM_READERS], +// } + +// // then, to pick a reader: +// for r in &pool.readers { +// if let Ok(reader) = r.try_lock() { +// // use reader +// } +// } +// // none unlocked, pick the next reader +// pool.readers[pool.counter.fetch_add(1, Relaxed) % NUM_READERS].lock() diff --git a/src/database/account_data.rs b/src/database/account_data.rs index 2ba7bc3d..b1d5b6b5 100644 --- a/src/database/account_data.rs +++ b/src/database/account_data.rs @@ -127,7 +127,7 @@ impl AccountData { room_id: Option<&RoomId>, user_id: &UserId, kind: &EventType, - ) -> Result, Box<[u8]>)>> { + ) -> Result, Vec)>> { let mut prefix = room_id .map(|r| r.to_string()) .unwrap_or_default() diff --git a/src/database/appservice.rs b/src/database/appservice.rs index 4bf3a218..f39520c7 100644 --- a/src/database/appservice.rs +++ b/src/database/appservice.rs @@ -49,7 +49,7 @@ impl Appservice { ) } - pub fn iter_ids(&self) -> Result> + Send + Sync + '_> { + pub fn iter_ids(&self) -> Result> + Send + '_> { Ok(self.id_appserviceregistrations.iter().map(|(id, _)| { utils::string_from_bytes(&id) .map_err(|_| Error::bad_database("Invalid id bytes in id_appserviceregistrations.")) @@ -58,7 +58,7 @@ impl Appservice { pub fn iter_all( &self, - ) -> Result> + '_ + Send + Sync> { + ) -> Result> + '_ + Send> { Ok(self.iter_ids()?.filter_map(|id| id.ok()).map(move |id| { Ok(( id.clone(), diff --git a/src/database/pusher.rs b/src/database/pusher.rs index a27bf2ce..3210cb18 100644 --- a/src/database/pusher.rs +++ b/src/database/pusher.rs @@ -73,7 +73,7 @@ impl PushData { pub fn get_pusher_senderkeys<'a>( &'a self, sender: &UserId, - ) -> impl Iterator> + 'a { + ) -> impl Iterator> + 'a { let mut prefix = sender.as_bytes().to_vec(); prefix.push(0xff); diff --git a/src/database/rooms.rs b/src/database/rooms.rs index e23b8046..eb1a9244 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -1078,13 +1078,13 @@ impl Rooms { .scan_prefix(old_shortstatehash.clone()) // Chop the old_shortstatehash out leaving behind the short state key .map(|(k, v)| 
(k[old_shortstatehash.len()..].to_vec(), v)) - .collect::, Box<[u8]>>>() + .collect::, Vec>>() } else { HashMap::new() }; if let Some(state_key) = &new_pdu.state_key { - let mut new_state: HashMap, Box<[u8]>> = old_state; + let mut new_state: HashMap, Vec> = old_state; let mut new_state_key = new_pdu.kind.as_ref().as_bytes().to_vec(); new_state_key.push(0xff); @@ -1209,13 +1209,13 @@ impl Rooms { redacts, } = pdu_builder; // TODO: Make sure this isn't called twice in parallel - let prev_events = self + let prev_events = dbg!(self .get_pdu_leaves(&room_id)? .into_iter() .take(20) - .collect::>(); + .collect::>()); - let create_event = self.room_state_get(&room_id, &EventType::RoomCreate, "")?; + let create_event = dbg!(self.room_state_get(&room_id, &EventType::RoomCreate, ""))?; let create_event_content = create_event .as_ref() @@ -1450,7 +1450,7 @@ impl Rooms { &'a self, user_id: &UserId, room_id: &RoomId, - ) -> impl Iterator, PduEvent)>> + 'a { + ) -> impl Iterator, PduEvent)>> + 'a { self.pdus_since(user_id, room_id, 0) } @@ -1462,7 +1462,7 @@ impl Rooms { user_id: &UserId, room_id: &RoomId, since: u64, - ) -> impl Iterator, PduEvent)>> + 'a { + ) -> impl Iterator, PduEvent)>> + 'a { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); @@ -1491,7 +1491,7 @@ impl Rooms { user_id: &UserId, room_id: &RoomId, until: u64, - ) -> impl Iterator, PduEvent)>> + 'a { + ) -> impl Iterator, PduEvent)>> + 'a { // Create the first part of the full pdu id let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); @@ -1523,7 +1523,7 @@ impl Rooms { user_id: &UserId, room_id: &RoomId, from: u64, - ) -> impl Iterator, PduEvent)>> + 'a { + ) -> impl Iterator, PduEvent)>> + 'a { // Create the first part of the full pdu id let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); diff --git a/src/database/sending.rs b/src/database/sending.rs index ecf07618..c2e13976 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -357,7 +357,7 @@ impl Sending { } #[tracing::instrument(skip(self))] - pub fn send_push_pdu(&self, pdu_id: &[u8], senderkey: Box<[u8]>) -> Result<()> { + pub fn send_push_pdu(&self, pdu_id: &[u8], senderkey: Vec) -> Result<()> { let mut key = b"$".to_vec(); key.extend_from_slice(&senderkey); key.push(0xff); diff --git a/src/error.rs b/src/error.rs index 501c77d1..3091b9d3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,6 +35,12 @@ pub enum Error { #[from] source: rocksdb::Error, }, + #[cfg(feature = "sqlite")] + #[error("There was a problem with the connection to the sqlite database: {source}")] + SqliteError { + #[from] + source: rusqlite::Error, + }, #[error("Could not generate an image.")] ImageError { #[from]
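The commented-out `Pool` at the bottom of the new sqlite.rs sketches the intended follow-up: a single writer connection plus a small fixed set of reader connections, with readers picked opportunistically. A hedged expansion of that sketch (`NUM_READERS`, `open`, and the lock helpers are illustrative, not part of the patch; the WAL journal mode that `SqliteEngine::open` already enables is what makes concurrent readers safe alongside the writer):

use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

use parking_lot::{Mutex, MutexGuard};
use rusqlite::Connection;

const NUM_READERS: usize = 4;

pub struct Pool {
    writer: Mutex<Connection>,
    readers: Vec<Mutex<Connection>>,
    counter: AtomicUsize,
}

impl Pool {
    pub fn open(path: &str) -> rusqlite::Result<Self> {
        let writer = Mutex::new(Connection::open(path)?);
        let mut readers = Vec::with_capacity(NUM_READERS);
        for _ in 0..NUM_READERS {
            readers.push(Mutex::new(Connection::open(path)?));
        }
        Ok(Pool {
            writer,
            readers,
            counter: AtomicUsize::new(0),
        })
    }

    // All writes funnel through the one writer connection.
    pub fn write_lock(&self) -> MutexGuard<'_, Connection> {
        self.writer.lock()
    }

    // Prefer any idle reader; if all are busy, block on the next one in
    // round-robin order instead of stampeding a single connection.
    pub fn read_lock(&self) -> MutexGuard<'_, Connection> {
        for r in &self.readers {
            if let Some(guard) = r.try_lock() {
                return guard;
            }
        }
        self.readers[self.counter.fetch_add(1, Relaxed) % NUM_READERS].lock()
    }
}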