1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-06-27 16:35:59 +00:00

fix: make heed implementation compile

This commit is contained in:
Tglman 2023-08-22 00:34:28 +01:00
parent 4a087e4873
commit c901ac0bb8
2 changed files with 42 additions and 22 deletions

View file

@ -9,7 +9,7 @@ use std::{
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use super::{DatabaseEngine, Tree}; use super::{KeyValueDatabaseEngine, KvTree};
type TupleOfBytes = (Vec<u8>, Vec<u8>); type TupleOfBytes = (Vec<u8>, Vec<u8>);
@ -30,8 +30,8 @@ fn convert_error(error: heed::Error) -> Error {
} }
} }
impl DatabaseEngine for Engine { impl KeyValueDatabaseEngine for Arc<Engine> {
fn open(config: &Config) -> Result<Arc<Self>> { fn open(config: &Config) -> Result<Self> {
let mut env_builder = heed::EnvOpenOptions::new(); let mut env_builder = heed::EnvOpenOptions::new();
env_builder.map_size(1024 * 1024 * 1024 * 1024); // 1 Terabyte env_builder.map_size(1024 * 1024 * 1024 * 1024); // 1 Terabyte
env_builder.max_readers(126); env_builder.max_readers(126);
@ -49,10 +49,10 @@ impl DatabaseEngine for Engine {
})) }))
} }
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> { fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
// Creates the db if it doesn't exist already // Creates the db if it doesn't exist already
Ok(Arc::new(EngineTree { Ok(Arc::new(EngineTree {
engine: Arc::clone(self), engine: self.clone(),
tree: Arc::new( tree: Arc::new(
self.env self.env
.create_database(Some(name)) .create_database(Some(name))
@ -62,7 +62,7 @@ impl DatabaseEngine for Engine {
})) }))
} }
fn flush(self: &Arc<Self>) -> Result<()> { fn flush(&self) -> Result<()> {
self.env.force_sync().map_err(convert_error)?; self.env.force_sync().map_err(convert_error)?;
Ok(()) Ok(())
} }
@ -78,17 +78,7 @@ impl EngineTree {
let (s, r) = bounded::<TupleOfBytes>(100); let (s, r) = bounded::<TupleOfBytes>(100);
let engine = Arc::clone(&self.engine); let engine = Arc::clone(&self.engine);
let lock = self.engine.iter_pool.lock().await; let lock = self.engine.iter_pool.lock();
if lock.active_count() < lock.max_count() {
lock.execute(move || {
iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
});
} else {
std::thread::spawn(move || {
iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
});
}
Box::new(r.into_iter()) Box::new(r.into_iter())
} }
} }
@ -123,7 +113,7 @@ fn iter_from_thread_work(
} }
} }
impl Tree for EngineTree { impl KvTree for EngineTree {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> { fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let txn = self.engine.env.read_txn().map_err(convert_error)?; let txn = self.engine.env.read_txn().map_err(convert_error)?;
Ok(self Ok(self
@ -143,6 +133,36 @@ impl Tree for EngineTree {
Ok(()) Ok(())
} }
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
for (key, value) in iter {
self.tree
.put(&mut txn, &key.as_slice(), &value.as_slice())
.map_err(convert_error)?;
self.watchers.wake(&key);
}
txn.commit().map_err(convert_error)?;
Ok(())
}
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
for key in iter {
let old = self
.tree
.get(&txn, &key.as_slice())
.map_err(convert_error)?;
let new = crate::utils::increment(old.as_deref())
.expect("utils::increment always returns Some");
self.tree
.put(&mut txn, &key.as_slice(), &&*new)
.map_err(convert_error)?;
}
txn.commit().map_err(convert_error)?;
Ok(())
}
fn remove(&self, key: &[u8]) -> Result<()> { fn remove(&self, key: &[u8]) -> Result<()> {
let mut txn = self.engine.env.write_txn().map_err(convert_error)?; let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
self.tree.delete(&mut txn, &key).map_err(convert_error)?; self.tree.delete(&mut txn, &key).map_err(convert_error)?;
@ -150,7 +170,7 @@ impl Tree for EngineTree {
Ok(()) Ok(())
} }
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> { fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
self.iter_from(&[], false) self.iter_from(&[], false)
} }
@ -158,7 +178,7 @@ impl Tree for EngineTree {
&self, &self,
from: &[u8], from: &[u8],
backwards: bool, backwards: bool,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send> { ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)>> {
self.iter_from_thread(Arc::clone(&self.tree), from.to_vec(), backwards) self.iter_from_thread(Arc::clone(&self.tree), from.to_vec(), backwards)
} }
@ -181,7 +201,7 @@ impl Tree for EngineTree {
fn scan_prefix<'a>( fn scan_prefix<'a>(
&'a self, &'a self,
prefix: Vec<u8>, prefix: Vec<u8>,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> { ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
Box::new( Box::new(
self.iter_from(&prefix, false) self.iter_from(&prefix, false)
.take_while(move |(key, _)| key.starts_with(&prefix)), .take_while(move |(key, _)| key.starts_with(&prefix)),

View file

@ -164,7 +164,7 @@ impl Error {
#[cfg(feature = "persy")] #[cfg(feature = "persy")]
Self::PersyError { .. } => db_error, Self::PersyError { .. } => db_error,
#[cfg(feature = "heed")] #[cfg(feature = "heed")]
Self::HeedError => db_error, Self::HeedError { .. } => db_error,
#[cfg(feature = "rocksdb")] #[cfg(feature = "rocksdb")]
Self::RocksDbError { .. } => db_error, Self::RocksDbError { .. } => db_error,
Self::IoError { .. } => db_error, Self::IoError { .. } => db_error,