diff --git a/src/database/abstraction/heed.rs b/src/database/abstraction/heed.rs index 9cca0975..8f577020 100644 --- a/src/database/abstraction/heed.rs +++ b/src/database/abstraction/heed.rs @@ -9,7 +9,7 @@ use std::{ sync::{Arc, Mutex}, }; -use super::{DatabaseEngine, Tree}; +use super::{KeyValueDatabaseEngine, KvTree}; type TupleOfBytes = (Vec, Vec); @@ -30,8 +30,8 @@ fn convert_error(error: heed::Error) -> Error { } } -impl DatabaseEngine for Engine { - fn open(config: &Config) -> Result> { +impl KeyValueDatabaseEngine for Arc { + fn open(config: &Config) -> Result { let mut env_builder = heed::EnvOpenOptions::new(); env_builder.map_size(1024 * 1024 * 1024 * 1024); // 1 Terabyte env_builder.max_readers(126); @@ -49,10 +49,10 @@ impl DatabaseEngine for Engine { })) } - fn open_tree(self: &Arc, name: &'static str) -> Result> { + fn open_tree(&self, name: &'static str) -> Result> { // Creates the db if it doesn't exist already Ok(Arc::new(EngineTree { - engine: Arc::clone(self), + engine: self.clone(), tree: Arc::new( self.env .create_database(Some(name)) @@ -62,7 +62,7 @@ impl DatabaseEngine for Engine { })) } - fn flush(self: &Arc) -> Result<()> { + fn flush(&self) -> Result<()> { self.env.force_sync().map_err(convert_error)?; Ok(()) } @@ -78,17 +78,7 @@ impl EngineTree { let (s, r) = bounded::(100); let engine = Arc::clone(&self.engine); - let lock = self.engine.iter_pool.lock().await; - if lock.active_count() < lock.max_count() { - lock.execute(move || { - iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s); - }); - } else { - std::thread::spawn(move || { - iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s); - }); - } - + let lock = self.engine.iter_pool.lock(); Box::new(r.into_iter()) } } @@ -123,7 +113,7 @@ fn iter_from_thread_work( } } -impl Tree for EngineTree { +impl KvTree for EngineTree { fn get(&self, key: &[u8]) -> Result>> { let txn = self.engine.env.read_txn().map_err(convert_error)?; Ok(self @@ -143,6 +133,36 @@ impl Tree for EngineTree { Ok(()) } + fn insert_batch<'a>(&self, iter: &mut dyn Iterator, Vec)>) -> Result<()> { + let mut txn = self.engine.env.write_txn().map_err(convert_error)?; + for (key, value) in iter { + self.tree + .put(&mut txn, &key.as_slice(), &value.as_slice()) + .map_err(convert_error)?; + self.watchers.wake(&key); + } + txn.commit().map_err(convert_error)?; + Ok(()) + } + + fn increment_batch<'a>(&self, iter: &mut dyn Iterator>) -> Result<()> { + let mut txn = self.engine.env.write_txn().map_err(convert_error)?; + for key in iter { + let old = self + .tree + .get(&txn, &key.as_slice()) + .map_err(convert_error)?; + let new = crate::utils::increment(old.as_deref()) + .expect("utils::increment always returns Some"); + + self.tree + .put(&mut txn, &key.as_slice(), &&*new) + .map_err(convert_error)?; + } + txn.commit().map_err(convert_error)?; + Ok(()) + } + fn remove(&self, key: &[u8]) -> Result<()> { let mut txn = self.engine.env.write_txn().map_err(convert_error)?; self.tree.delete(&mut txn, &key).map_err(convert_error)?; @@ -150,7 +170,7 @@ impl Tree for EngineTree { Ok(()) } - fn iter<'a>(&'a self) -> Box, Vec)> + Send + 'a> { + fn iter<'a>(&'a self) -> Box, Vec)> + 'a> { self.iter_from(&[], false) } @@ -158,7 +178,7 @@ impl Tree for EngineTree { &self, from: &[u8], backwards: bool, - ) -> Box, Vec)> + Send> { + ) -> Box, Vec)>> { self.iter_from_thread(Arc::clone(&self.tree), from.to_vec(), backwards) } @@ -181,7 +201,7 @@ impl Tree for EngineTree { fn scan_prefix<'a>( &'a self, prefix: Vec, - ) -> Box, Vec)> + Send + 'a> { + ) -> Box, Vec)> + 'a> { Box::new( self.iter_from(&prefix, false) .take_while(move |(key, _)| key.starts_with(&prefix)), diff --git a/src/utils/error.rs b/src/utils/error.rs index 1d811106..4be3fbab 100644 --- a/src/utils/error.rs +++ b/src/utils/error.rs @@ -164,7 +164,7 @@ impl Error { #[cfg(feature = "persy")] Self::PersyError { .. } => db_error, #[cfg(feature = "heed")] - Self::HeedError => db_error, + Self::HeedError { .. } => db_error, #[cfg(feature = "rocksdb")] Self::RocksDbError { .. } => db_error, Self::IoError { .. } => db_error,