diff --git a/Cargo.lock b/Cargo.lock index 4b020ed8..033a2637 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -514,7 +514,6 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry_sdk", "parking_lot", - "persy", "rand", "regex", "reqwest", @@ -587,21 +586,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc" -version = "3.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" -dependencies = [ - "crc-catalog", -] - -[[package]] -name = "crc-catalog" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" - [[package]] name = "crc32fast" version = "1.4.2" @@ -842,16 +826,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fs2" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "futures-channel" version = "0.3.30" @@ -1904,22 +1878,6 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" -[[package]] -name = "persy" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef4b7250ab3a90ded0e284b2633469c23ef01ea868fe7cbb64e2f0a7d6f6d02" -dependencies = [ - "crc", - "data-encoding", - "fs2", - "linked-hash-map", - "rand", - "thiserror", - "unsigned-varint", - "zigzag", -] - [[package]] name = "pin-project" version = "1.1.5" @@ -3395,12 +3353,6 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" -[[package]] -name = "unsigned-varint" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" - [[package]] name = "untrusted" version = "0.9.0" @@ -3802,15 +3754,6 @@ version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" -[[package]] -name = "zigzag" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70b40401a28d86ce16a330b863b86fd7dbee4d7c940587ab09ab8c019f9e3fdf" -dependencies = [ - "num-traits", -] - [[package]] name = "zstd-sys" version = "2.0.10+zstd.1.5.6" diff --git a/Cargo.toml b/Cargo.toml index 0cdde4ab..8bd84200 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,10 +49,6 @@ tower-service = "0.3" # Async runtime and utilities tokio = { version = "1.28.1", features = ["fs", "macros", "signal", "sync"] } -# Used for storing data permanently -#sled = { version = "0.34.7", features = ["compression", "no_metrics"], optional = true } -#sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] } -persy = { version = "1.4.4", optional = true, features = ["background_ops"] } # Used for the http request / response body type for Ruma endpoints used with reqwest bytes = "1.4.0" @@ -116,7 +112,6 @@ rusqlite = { version = "0.31", optional = true, features = ["bundled"] } # crossbeam = { version = "0.8.2", optional = true } num_cpus = "1.15.0" threadpool = "1.8.1" -# heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d", optional = true } # Used for ruma wrapper serde_html_form = "0.2.0" @@ -176,13 +171,10 @@ version = "0.25" nix = { version = "0.28", features = ["resource"] } [features] -default = ["backend_rocksdb", "backend_sqlite", "conduit_bin", "systemd"] -#backend_sled = ["sled"] -backend_persy = ["parking_lot", "persy"] -backend_sqlite = ["sqlite"] -#backend_heed = ["heed", "crossbeam"] backend_rocksdb = ["rocksdb"] +backend_sqlite = ["sqlite"] conduit_bin = ["axum"] +default = ["backend_rocksdb", "backend_sqlite", "conduit_bin", "systemd"] jemalloc = ["tikv-jemallocator"] sqlite = ["parking_lot", "rusqlite", "tokio/signal"] systemd = ["sd-notify"] diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 93660f9f..a1913b23 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -3,27 +3,13 @@ use crate::Result; use std::{future::Future, pin::Pin, sync::Arc}; -#[cfg(feature = "sled")] -pub mod sled; - #[cfg(feature = "sqlite")] pub mod sqlite; -#[cfg(feature = "heed")] -pub mod heed; - #[cfg(feature = "rocksdb")] pub mod rocksdb; -#[cfg(feature = "persy")] -pub mod persy; - -#[cfg(any( - feature = "sqlite", - feature = "rocksdb", - feature = "heed", - feature = "persy" -))] +#[cfg(any(feature = "sqlite", feature = "rocksdb"))] pub mod watchers; pub trait KeyValueDatabaseEngine: Send + Sync { diff --git a/src/database/abstraction/heed.rs b/src/database/abstraction/heed.rs deleted file mode 100644 index 9cca0975..00000000 --- a/src/database/abstraction/heed.rs +++ /dev/null @@ -1,194 +0,0 @@ -use super::{super::Config, watchers::Watchers}; -use crossbeam::channel::{bounded, Sender as ChannelSender}; -use threadpool::ThreadPool; - -use crate::{Error, Result}; -use std::{ - future::Future, - pin::Pin, - sync::{Arc, Mutex}, -}; - -use super::{DatabaseEngine, Tree}; - -type TupleOfBytes = (Vec, Vec); - -pub struct Engine { - env: heed::Env, - iter_pool: Mutex, -} - -pub struct EngineTree { - engine: Arc, - tree: Arc, - watchers: Watchers, -} - -fn convert_error(error: heed::Error) -> Error { - Error::HeedError { - error: error.to_string(), - } -} - -impl DatabaseEngine for Engine { - fn open(config: &Config) -> Result> { - let mut env_builder = heed::EnvOpenOptions::new(); - env_builder.map_size(1024 * 1024 * 1024 * 1024); // 1 Terabyte - env_builder.max_readers(126); - env_builder.max_dbs(128); - unsafe { - env_builder.flag(heed::flags::Flags::MdbWriteMap); - env_builder.flag(heed::flags::Flags::MdbMapAsync); - } - - Ok(Arc::new(Engine { - env: env_builder - .open(&config.database_path) - .map_err(convert_error)?, - iter_pool: Mutex::new(ThreadPool::new(10)), - })) - } - - fn open_tree(self: &Arc, name: &'static str) -> Result> { - // Creates the db if it doesn't exist already - Ok(Arc::new(EngineTree { - engine: Arc::clone(self), - tree: Arc::new( - self.env - .create_database(Some(name)) - .map_err(convert_error)?, - ), - watchers: Default::default(), - })) - } - - fn flush(self: &Arc) -> Result<()> { - self.env.force_sync().map_err(convert_error)?; - Ok(()) - } -} - -impl EngineTree { - fn iter_from_thread( - &self, - tree: Arc, - from: Vec, - backwards: bool, - ) -> Box + Send + Sync> { - let (s, r) = bounded::(100); - let engine = Arc::clone(&self.engine); - - let lock = self.engine.iter_pool.lock().await; - if lock.active_count() < lock.max_count() { - lock.execute(move || { - iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s); - }); - } else { - std::thread::spawn(move || { - iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s); - }); - } - - Box::new(r.into_iter()) - } -} - -fn iter_from_thread_work( - tree: Arc, - txn: &heed::RoTxn<'_>, - from: Vec, - backwards: bool, - s: &ChannelSender<(Vec, Vec)>, -) { - if backwards { - for (k, v) in tree.rev_range(txn, ..=&*from).unwrap().map(|r| r.unwrap()) { - if s.send((k.to_vec(), v.to_vec())).is_err() { - return; - } - } - } else { - if from.is_empty() { - for (k, v) in tree.iter(txn).unwrap().map(|r| r.unwrap()) { - if s.send((k.to_vec(), v.to_vec())).is_err() { - return; - } - } - } else { - for (k, v) in tree.range(txn, &*from..).unwrap().map(|r| r.unwrap()) { - if s.send((k.to_vec(), v.to_vec())).is_err() { - return; - } - } - } - } -} - -impl Tree for EngineTree { - fn get(&self, key: &[u8]) -> Result>> { - let txn = self.engine.env.read_txn().map_err(convert_error)?; - Ok(self - .tree - .get(&txn, &key) - .map_err(convert_error)? - .map(|s| s.to_vec())) - } - - fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { - let mut txn = self.engine.env.write_txn().map_err(convert_error)?; - self.tree - .put(&mut txn, &key, &value) - .map_err(convert_error)?; - txn.commit().map_err(convert_error)?; - self.watchers.wake(key); - Ok(()) - } - - fn remove(&self, key: &[u8]) -> Result<()> { - let mut txn = self.engine.env.write_txn().map_err(convert_error)?; - self.tree.delete(&mut txn, &key).map_err(convert_error)?; - txn.commit().map_err(convert_error)?; - Ok(()) - } - - fn iter<'a>(&'a self) -> Box, Vec)> + Send + 'a> { - self.iter_from(&[], false) - } - - fn iter_from( - &self, - from: &[u8], - backwards: bool, - ) -> Box, Vec)> + Send> { - self.iter_from_thread(Arc::clone(&self.tree), from.to_vec(), backwards) - } - - fn increment(&self, key: &[u8]) -> Result> { - let mut txn = self.engine.env.write_txn().map_err(convert_error)?; - - let old = self.tree.get(&txn, &key).map_err(convert_error)?; - let new = - crate::utils::increment(old.as_deref()).expect("utils::increment always returns Some"); - - self.tree - .put(&mut txn, &key, &&*new) - .map_err(convert_error)?; - - txn.commit().map_err(convert_error)?; - - Ok(new) - } - - fn scan_prefix<'a>( - &'a self, - prefix: Vec, - ) -> Box, Vec)> + Send + 'a> { - Box::new( - self.iter_from(&prefix, false) - .take_while(move |(key, _)| key.starts_with(&prefix)), - ) - } - - fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { - self.watchers.watch(prefix) - } -} diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs deleted file mode 100644 index da7d4cf0..00000000 --- a/src/database/abstraction/persy.rs +++ /dev/null @@ -1,197 +0,0 @@ -use crate::{ - database::{ - abstraction::{watchers::Watchers, KeyValueDatabaseEngine, KvTree}, - Config, - }, - Result, -}; -use persy::{ByteVec, OpenOptions, Persy, Transaction, TransactionConfig, ValueMode}; - -use std::{future::Future, pin::Pin, sync::Arc}; - -use tracing::warn; - -pub struct Engine { - persy: Persy, -} - -impl KeyValueDatabaseEngine for Arc { - fn open(config: &Config) -> Result { - let mut cfg = persy::Config::new(); - cfg.change_cache_size((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64); - - let persy = OpenOptions::new() - .create(true) - .config(cfg) - .open(&format!("{}/db.persy", config.database_path))?; - Ok(Arc::new(Engine { persy })) - } - - fn open_tree(&self, name: &'static str) -> Result> { - // Create if it doesn't exist - if !self.persy.exists_index(name)? { - let mut tx = self.persy.begin()?; - tx.create_index::(name, ValueMode::Replace)?; - tx.prepare()?.commit()?; - } - - Ok(Arc::new(PersyTree { - persy: self.persy.clone(), - name: name.to_owned(), - watchers: Watchers::default(), - })) - } - - fn flush(&self) -> Result<()> { - Ok(()) - } -} - -pub struct PersyTree { - persy: Persy, - name: String, - watchers: Watchers, -} - -impl PersyTree { - fn begin(&self) -> Result { - Ok(self - .persy - .begin_with(TransactionConfig::new().set_background_sync(true))?) - } -} - -impl KvTree for PersyTree { - fn get(&self, key: &[u8]) -> Result>> { - let result = self - .persy - .get::(&self.name, &ByteVec::from(key))? - .next() - .map(|v| (*v).to_owned()); - Ok(result) - } - - fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { - self.insert_batch(&mut Some((key.to_owned(), value.to_owned())).into_iter())?; - self.watchers.wake(key); - Ok(()) - } - - fn insert_batch<'a>(&self, iter: &mut dyn Iterator, Vec)>) -> Result<()> { - let mut tx = self.begin()?; - for (key, value) in iter { - tx.put::( - &self.name, - ByteVec::from(key.clone()), - ByteVec::from(value), - )?; - } - tx.prepare()?.commit()?; - Ok(()) - } - - fn increment_batch<'a>(&self, iter: &mut dyn Iterator>) -> Result<()> { - let mut tx = self.begin()?; - for key in iter { - let old = tx - .get::(&self.name, &ByteVec::from(key.clone()))? - .next() - .map(|v| (*v).to_owned()); - let new = crate::utils::increment(old.as_deref()).unwrap(); - tx.put::(&self.name, ByteVec::from(key), ByteVec::from(new))?; - } - tx.prepare()?.commit()?; - Ok(()) - } - - fn remove(&self, key: &[u8]) -> Result<()> { - let mut tx = self.begin()?; - tx.remove::(&self.name, ByteVec::from(key), None)?; - tx.prepare()?.commit()?; - Ok(()) - } - - fn iter<'a>(&'a self) -> Box, Vec)> + 'a> { - let iter = self.persy.range::(&self.name, ..); - match iter { - Ok(iter) => Box::new(iter.filter_map(|(k, v)| { - v.into_iter() - .map(|val| ((*k).to_owned(), (*val).to_owned())) - .next() - })), - Err(e) => { - warn!("error iterating {:?}", e); - Box::new(std::iter::empty()) - } - } - } - - fn iter_from<'a>( - &'a self, - from: &[u8], - backwards: bool, - ) -> Box, Vec)> + 'a> { - let range = if backwards { - self.persy - .range::(&self.name, ..=ByteVec::from(from)) - } else { - self.persy - .range::(&self.name, ByteVec::from(from)..) - }; - match range { - Ok(iter) => { - let map = iter.filter_map(|(k, v)| { - v.into_iter() - .map(|val| ((*k).to_owned(), (*val).to_owned())) - .next() - }); - if backwards { - Box::new(map.rev()) - } else { - Box::new(map) - } - } - Err(e) => { - warn!("error iterating with prefix {:?}", e); - Box::new(std::iter::empty()) - } - } - } - - fn increment(&self, key: &[u8]) -> Result> { - self.increment_batch(&mut Some(key.to_owned()).into_iter())?; - Ok(self.get(key)?.unwrap()) - } - - fn scan_prefix<'a>( - &'a self, - prefix: Vec, - ) -> Box, Vec)> + 'a> { - let range_prefix = ByteVec::from(prefix.clone()); - let range = self - .persy - .range::(&self.name, range_prefix..); - - match range { - Ok(iter) => { - let owned_prefix = prefix.clone(); - Box::new( - iter.take_while(move |(k, _)| (*k).starts_with(&owned_prefix)) - .filter_map(|(k, v)| { - v.into_iter() - .map(|val| ((*k).to_owned(), (*val).to_owned())) - .next() - }), - ) - } - Err(e) => { - warn!("error scanning prefix {:?}", e); - Box::new(std::iter::empty()) - } - } - } - - fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { - self.watchers.watch(prefix) - } -} diff --git a/src/database/abstraction/sled.rs b/src/database/abstraction/sled.rs deleted file mode 100644 index 87defc57..00000000 --- a/src/database/abstraction/sled.rs +++ /dev/null @@ -1,127 +0,0 @@ -use super::super::Config; -use crate::{utils, Result}; -use std::{future::Future, pin::Pin, sync::Arc}; -use tracing::warn; - -use super::{DatabaseEngine, Tree}; - -pub struct Engine(sled::Db); - -pub struct SledEngineTree(sled::Tree); - -impl DatabaseEngine for Engine { - fn open(config: &Config) -> Result> { - Ok(Arc::new(Engine( - sled::Config::default() - .path(&config.database_path) - .cache_capacity((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64) - .use_compression(true) - .open()?, - ))) - } - - fn open_tree(self: &Arc, name: &'static str) -> Result> { - Ok(Arc::new(SledEngineTree(self.0.open_tree(name)?))) - } - - fn flush(self: &Arc) -> Result<()> { - Ok(()) // noop - } -} - -impl Tree for SledEngineTree { - fn get(&self, key: &[u8]) -> Result>> { - Ok(self.0.get(key)?.map(|v| v.to_vec())) - } - - fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { - self.0.insert(key, value)?; - Ok(()) - } - - fn insert_batch<'a>(&self, iter: &mut dyn Iterator, Vec)>) -> Result<()> { - for (key, value) in iter { - self.0.insert(key, value)?; - } - - Ok(()) - } - - fn remove(&self, key: &[u8]) -> Result<()> { - self.0.remove(key)?; - Ok(()) - } - - fn iter<'a>(&'a self) -> Box, Vec)> + 'a> { - Box::new( - self.0 - .iter() - .filter_map(|r| { - if let Err(e) = &r { - warn!("Error: {}", e); - } - r.ok() - }) - .map(|(k, v)| (k.to_vec().into(), v.to_vec().into())), - ) - } - - fn iter_from( - &self, - from: &[u8], - backwards: bool, - ) -> Box, Vec)>> { - let iter = if backwards { - self.0.range(..=from) - } else { - self.0.range(from..) - }; - - let iter = iter - .filter_map(|r| { - if let Err(e) = &r { - warn!("Error: {}", e); - } - r.ok() - }) - .map(|(k, v)| (k.to_vec().into(), v.to_vec().into())); - - if backwards { - Box::new(iter.rev()) - } else { - Box::new(iter) - } - } - - fn increment(&self, key: &[u8]) -> Result> { - Ok(self - .0 - .update_and_fetch(key, utils::increment) - .map(|o| o.expect("increment always sets a value").to_vec())?) - } - - fn scan_prefix<'a>( - &'a self, - prefix: Vec, - ) -> Box, Vec)> + 'a> { - let iter = self - .0 - .scan_prefix(prefix) - .filter_map(|r| { - if let Err(e) = &r { - warn!("Error: {}", e); - } - r.ok() - }) - .map(|(k, v)| (k.to_vec().into(), v.to_vec().into())); - - Box::new(iter) - } - - fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { - let prefix = prefix.to_vec(); - Box::pin(async move { - self.0.watch_prefix(prefix).await; - }) - } -} diff --git a/src/database/mod.rs b/src/database/mod.rs index 2317f7a8..b4794dbc 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -246,24 +246,10 @@ impl KeyValueDatabase { } let builder: Arc = match &*config.database_backend { - "sqlite" => { - #[cfg(not(feature = "sqlite"))] - return Err(Error::BadConfig("Database backend not found.")); - #[cfg(feature = "sqlite")] - Arc::new(Arc::::open(&config)?) - } - "rocksdb" => { - #[cfg(not(feature = "rocksdb"))] - return Err(Error::BadConfig("Database backend not found.")); - #[cfg(feature = "rocksdb")] - Arc::new(Arc::::open(&config)?) - } - "persy" => { - #[cfg(not(feature = "persy"))] - return Err(Error::BadConfig("Database backend not found.")); - #[cfg(feature = "persy")] - Arc::new(Arc::::open(&config)?) - } + #[cfg(feature = "sqlite")] + "sqlite" => Arc::new(Arc::::open(&config)?), + #[cfg(feature = "rocksdb")] + "rocksdb" => Arc::new(Arc::::open(&config)?), _ => { return Err(Error::BadConfig("Database backend not found.")); } diff --git a/src/utils/error.rs b/src/utils/error.rs index 1d811106..c9018924 100644 --- a/src/utils/error.rs +++ b/src/utils/error.rs @@ -11,33 +11,18 @@ use ruma::{ use thiserror::Error; use tracing::{error, info}; -#[cfg(feature = "persy")] -use persy::PersyError; - use crate::RumaResponse; pub type Result = std::result::Result; #[derive(Error, Debug)] pub enum Error { - #[cfg(feature = "sled")] - #[error("There was a problem with the connection to the sled database.")] - SledError { - #[from] - source: sled::Error, - }, #[cfg(feature = "sqlite")] #[error("There was a problem with the connection to the sqlite database: {source}")] SqliteError { #[from] source: rusqlite::Error, }, - #[cfg(feature = "persy")] - #[error("There was a problem with the connection to the persy database.")] - PersyError { source: PersyError }, - #[cfg(feature = "heed")] - #[error("There was a problem with the connection to the heed database: {error}")] - HeedError { error: String }, #[cfg(feature = "rocksdb")] #[error("There was a problem with the connection to the rocksdb database: {source}")] RocksDbError { @@ -157,14 +142,8 @@ impl Error { let db_error = String::from("Database or I/O error occurred."); match self { - #[cfg(feature = "sled")] - Self::SledError { .. } => db_error, #[cfg(feature = "sqlite")] Self::SqliteError { .. } => db_error, - #[cfg(feature = "persy")] - Self::PersyError { .. } => db_error, - #[cfg(feature = "heed")] - Self::HeedError => db_error, #[cfg(feature = "rocksdb")] Self::RocksDbError { .. } => db_error, Self::IoError { .. } => db_error, @@ -175,15 +154,6 @@ impl Error { } } -#[cfg(feature = "persy")] -impl> From> for Error { - fn from(err: persy::PE) -> Self { - Error::PersyError { - source: err.error().into(), - } - } -} - impl From for Error { fn from(i: Infallible) -> Self { match i {}