mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-07-02 16:38:36 +00:00
improvement: allow rocksdb again
This commit is contained in:
parent
9b57c89df6
commit
9588f3a319
8 changed files with 364 additions and 11 deletions
113
Cargo.lock
generated
113
Cargo.lock
generated
|
@ -146,6 +146,25 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "bindgen"
|
||||||
|
version = "0.59.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2bd2a9a458e8f4304c52c43ebb0cfbd520289f8379a52e329a38afda99bf8eb8"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags",
|
||||||
|
"cexpr",
|
||||||
|
"clang-sys",
|
||||||
|
"lazy_static",
|
||||||
|
"lazycell",
|
||||||
|
"peeking_take_while",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"regex",
|
||||||
|
"rustc-hash",
|
||||||
|
"shlex",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bitflags"
|
name = "bitflags"
|
||||||
version = "1.3.2"
|
version = "1.3.2"
|
||||||
|
@ -205,6 +224,15 @@ dependencies = [
|
||||||
"jobserver",
|
"jobserver",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "cexpr"
|
||||||
|
version = "0.6.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
|
||||||
|
dependencies = [
|
||||||
|
"nom",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cfg-if"
|
name = "cfg-if"
|
||||||
version = "0.1.10"
|
version = "0.1.10"
|
||||||
|
@ -230,6 +258,17 @@ dependencies = [
|
||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "clang-sys"
|
||||||
|
version = "1.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "fa66045b9cb23c2e9c1520732030608b02ee07e5cfaa5a521ec15ded7fa24c90"
|
||||||
|
dependencies = [
|
||||||
|
"glob",
|
||||||
|
"libc",
|
||||||
|
"libloading",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "color_quant"
|
name = "color_quant"
|
||||||
version = "1.1.0"
|
version = "1.1.0"
|
||||||
|
@ -259,6 +298,7 @@ dependencies = [
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"ring",
|
"ring",
|
||||||
"rocket",
|
"rocket",
|
||||||
|
"rocksdb",
|
||||||
"ruma",
|
"ruma",
|
||||||
"rusqlite",
|
"rusqlite",
|
||||||
"rust-argon2",
|
"rust-argon2",
|
||||||
|
@ -1167,12 +1207,40 @@ version = "1.4.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "lazycell"
|
||||||
|
version = "1.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libc"
|
name = "libc"
|
||||||
version = "0.2.101"
|
version = "0.2.101"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3cb00336871be5ed2c8ed44b60ae9959dc5b9f08539422ed43f09e34ecaeba21"
|
checksum = "3cb00336871be5ed2c8ed44b60ae9959dc5b9f08539422ed43f09e34ecaeba21"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "libloading"
|
||||||
|
version = "0.7.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "afe203d669ec979b7128619bae5a63b7b42e9203c1b29146079ee05e2f604b52"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if 1.0.0",
|
||||||
|
"winapi",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "librocksdb-sys"
|
||||||
|
version = "6.20.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c309a9d2470844aceb9a4a098cf5286154d20596868b75a6b36357d2bb9ca25d"
|
||||||
|
dependencies = [
|
||||||
|
"bindgen",
|
||||||
|
"cc",
|
||||||
|
"glob",
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libsqlite3-sys"
|
name = "libsqlite3-sys"
|
||||||
version = "0.22.2"
|
version = "0.22.2"
|
||||||
|
@ -1289,6 +1357,12 @@ version = "0.3.16"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
|
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "minimal-lexical"
|
||||||
|
version = "0.2.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "miniz_oxide"
|
name = "miniz_oxide"
|
||||||
version = "0.3.7"
|
version = "0.3.7"
|
||||||
|
@ -1340,6 +1414,17 @@ dependencies = [
|
||||||
"version_check",
|
"version_check",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "nom"
|
||||||
|
version = "7.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109"
|
||||||
|
dependencies = [
|
||||||
|
"memchr",
|
||||||
|
"minimal-lexical",
|
||||||
|
"version_check",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ntapi"
|
name = "ntapi"
|
||||||
version = "0.3.6"
|
version = "0.3.6"
|
||||||
|
@ -1539,6 +1624,12 @@ dependencies = [
|
||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "peeking_take_while"
|
||||||
|
version = "0.1.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pem"
|
name = "pem"
|
||||||
version = "0.8.3"
|
version = "0.8.3"
|
||||||
|
@ -1981,6 +2072,16 @@ dependencies = [
|
||||||
"uncased",
|
"uncased",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rocksdb"
|
||||||
|
version = "0.16.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c749134fda8bfc90d0de643d59bfc841dcb3ac8a1062e12b6754bd60235c48b3"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
"librocksdb-sys",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma"
|
name = "ruma"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
|
@ -2266,6 +2367,12 @@ dependencies = [
|
||||||
"crossbeam-utils 0.8.5",
|
"crossbeam-utils 0.8.5",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rustc-hash"
|
||||||
|
version = "1.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustc_version"
|
name = "rustc_version"
|
||||||
version = "0.2.3"
|
version = "0.2.3"
|
||||||
|
@ -2478,6 +2585,12 @@ dependencies = [
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "shlex"
|
||||||
|
version = "1.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "signal-hook-registry"
|
name = "signal-hook-registry"
|
||||||
version = "1.4.0"
|
version = "1.4.0"
|
||||||
|
|
|
@ -78,6 +78,7 @@ crossbeam = { version = "0.8.1", optional = true }
|
||||||
num_cpus = "1.13.0"
|
num_cpus = "1.13.0"
|
||||||
threadpool = "1.8.1"
|
threadpool = "1.8.1"
|
||||||
heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d", optional = true }
|
heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d", optional = true }
|
||||||
|
rocksdb = { version = "0.16.0", features = ["multi-threaded-cf"], optional = true }
|
||||||
thread_local = "1.1.3"
|
thread_local = "1.1.3"
|
||||||
# used for TURN server authentication
|
# used for TURN server authentication
|
||||||
hmac = "0.11.0"
|
hmac = "0.11.0"
|
||||||
|
@ -88,6 +89,7 @@ default = ["conduit_bin", "backend_sqlite"]
|
||||||
backend_sled = ["sled"]
|
backend_sled = ["sled"]
|
||||||
backend_sqlite = ["sqlite"]
|
backend_sqlite = ["sqlite"]
|
||||||
backend_heed = ["heed", "crossbeam"]
|
backend_heed = ["heed", "crossbeam"]
|
||||||
|
backend_rocksdb = ["rocksdb"]
|
||||||
sqlite = ["rusqlite", "parking_lot", "crossbeam", "tokio/signal"]
|
sqlite = ["rusqlite", "parking_lot", "crossbeam", "tokio/signal"]
|
||||||
conduit_bin = [] # TODO: add rocket to this when it is optional
|
conduit_bin = [] # TODO: add rocket to this when it is optional
|
||||||
|
|
||||||
|
|
|
@ -154,6 +154,9 @@ pub type Engine = abstraction::sqlite::Engine;
|
||||||
#[cfg(feature = "heed")]
|
#[cfg(feature = "heed")]
|
||||||
pub type Engine = abstraction::heed::Engine;
|
pub type Engine = abstraction::heed::Engine;
|
||||||
|
|
||||||
|
#[cfg(feature = "rocksdb")]
|
||||||
|
pub type Engine = abstraction::rocksdb::Engine;
|
||||||
|
|
||||||
pub struct Database {
|
pub struct Database {
|
||||||
_db: Arc<Engine>,
|
_db: Arc<Engine>,
|
||||||
pub globals: globals::Globals,
|
pub globals: globals::Globals,
|
||||||
|
@ -315,10 +318,10 @@ impl Database {
|
||||||
.expect("pdu cache capacity fits into usize"),
|
.expect("pdu cache capacity fits into usize"),
|
||||||
)),
|
)),
|
||||||
auth_chain_cache: Mutex::new(LruCache::new(1_000_000)),
|
auth_chain_cache: Mutex::new(LruCache::new(1_000_000)),
|
||||||
shorteventid_cache: Mutex::new(LruCache::new(1_000_000)),
|
shorteventid_cache: Mutex::new(LruCache::new(100_000_000)),
|
||||||
eventidshort_cache: Mutex::new(LruCache::new(1_000_000)),
|
eventidshort_cache: Mutex::new(LruCache::new(100_000_000)),
|
||||||
shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)),
|
shortstatekey_cache: Mutex::new(LruCache::new(100_000_000)),
|
||||||
statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)),
|
statekeyshort_cache: Mutex::new(LruCache::new(100_000_000)),
|
||||||
our_real_users_cache: RwLock::new(HashMap::new()),
|
our_real_users_cache: RwLock::new(HashMap::new()),
|
||||||
appservice_in_room_cache: RwLock::new(HashMap::new()),
|
appservice_in_room_cache: RwLock::new(HashMap::new()),
|
||||||
stateinfo_cache: Mutex::new(LruCache::new(1000)),
|
stateinfo_cache: Mutex::new(LruCache::new(1000)),
|
||||||
|
|
|
@ -12,6 +12,9 @@ pub mod sqlite;
|
||||||
#[cfg(feature = "heed")]
|
#[cfg(feature = "heed")]
|
||||||
pub mod heed;
|
pub mod heed;
|
||||||
|
|
||||||
|
#[cfg(feature = "rocksdb")]
|
||||||
|
pub mod rocksdb;
|
||||||
|
|
||||||
pub trait DatabaseEngine: Sized {
|
pub trait DatabaseEngine: Sized {
|
||||||
fn open(config: &Config) -> Result<Arc<Self>>;
|
fn open(config: &Config) -> Result<Arc<Self>>;
|
||||||
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>>;
|
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>>;
|
||||||
|
|
215
src/database/abstraction/rocksdb.rs
Normal file
215
src/database/abstraction/rocksdb.rs
Normal file
|
@ -0,0 +1,215 @@
|
||||||
|
use super::super::Config;
|
||||||
|
use crate::{utils, Result};
|
||||||
|
|
||||||
|
use std::{future::Future, pin::Pin, sync::Arc};
|
||||||
|
|
||||||
|
use super::{DatabaseEngine, Tree};
|
||||||
|
|
||||||
|
use std::{collections::HashMap, sync::RwLock};
|
||||||
|
|
||||||
|
pub struct Engine {
|
||||||
|
rocks: rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>,
|
||||||
|
old_cfs: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct RocksDbEngineTree<'a> {
|
||||||
|
db: Arc<Engine>,
|
||||||
|
name: &'a str,
|
||||||
|
watchers: RwLock<HashMap<Vec<u8>, Vec<tokio::sync::oneshot::Sender<()>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DatabaseEngine for Engine {
|
||||||
|
fn open(config: &Config) -> Result<Arc<Self>> {
|
||||||
|
let mut db_opts = rocksdb::Options::default();
|
||||||
|
db_opts.create_if_missing(true);
|
||||||
|
db_opts.set_max_open_files(16);
|
||||||
|
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
|
||||||
|
db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy);
|
||||||
|
db_opts.set_target_file_size_base(256 << 20);
|
||||||
|
db_opts.set_write_buffer_size(256 << 20);
|
||||||
|
|
||||||
|
let mut block_based_options = rocksdb::BlockBasedOptions::default();
|
||||||
|
block_based_options.set_block_size(512 << 10);
|
||||||
|
db_opts.set_block_based_table_factory(&block_based_options);
|
||||||
|
|
||||||
|
let cfs = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
|
||||||
|
&db_opts,
|
||||||
|
&config.database_path,
|
||||||
|
)
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let db = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::open_cf_descriptors(
|
||||||
|
&db_opts,
|
||||||
|
&config.database_path,
|
||||||
|
cfs.iter().map(|name| {
|
||||||
|
let mut options = rocksdb::Options::default();
|
||||||
|
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
|
||||||
|
options.set_prefix_extractor(prefix_extractor);
|
||||||
|
options.set_merge_operator_associative("increment", utils::increment_rocksdb);
|
||||||
|
|
||||||
|
rocksdb::ColumnFamilyDescriptor::new(name, options)
|
||||||
|
}),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(Arc::new(Engine {
|
||||||
|
rocks: db,
|
||||||
|
old_cfs: cfs,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
|
||||||
|
if !self.old_cfs.contains(&name.to_owned()) {
|
||||||
|
// Create if it didn't exist
|
||||||
|
let mut options = rocksdb::Options::default();
|
||||||
|
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
|
||||||
|
options.set_prefix_extractor(prefix_extractor);
|
||||||
|
options.set_merge_operator_associative("increment", utils::increment_rocksdb);
|
||||||
|
|
||||||
|
let _ = self.rocks.create_cf(name, &options);
|
||||||
|
println!("created cf");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Arc::new(RocksDbEngineTree {
|
||||||
|
name,
|
||||||
|
db: Arc::clone(self),
|
||||||
|
watchers: RwLock::new(HashMap::new()),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush(self: &Arc<Self>) -> Result<()> {
|
||||||
|
// TODO?
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RocksDbEngineTree<'_> {
|
||||||
|
fn cf(&self) -> rocksdb::BoundColumnFamily<'_> {
|
||||||
|
self.db.rocks.cf_handle(self.name).unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Tree for RocksDbEngineTree<'_> {
|
||||||
|
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
||||||
|
Ok(self.db.rocks.get_cf(self.cf(), key)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
|
||||||
|
let watchers = self.watchers.read().unwrap();
|
||||||
|
let mut triggered = Vec::new();
|
||||||
|
|
||||||
|
for length in 0..=key.len() {
|
||||||
|
if watchers.contains_key(&key[..length]) {
|
||||||
|
triggered.push(&key[..length]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(watchers);
|
||||||
|
|
||||||
|
if !triggered.is_empty() {
|
||||||
|
let mut watchers = self.watchers.write().unwrap();
|
||||||
|
for prefix in triggered {
|
||||||
|
if let Some(txs) = watchers.remove(prefix) {
|
||||||
|
for tx in txs {
|
||||||
|
let _ = tx.send(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(self.db.rocks.put_cf(self.cf(), key, value)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
|
||||||
|
for (key, value) in iter {
|
||||||
|
self.db.rocks.put_cf(self.cf(), key, value)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove(&self, key: &[u8]) -> Result<()> {
|
||||||
|
Ok(self.db.rocks.delete_cf(self.cf(), key)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||||
|
Box::new(
|
||||||
|
self.db
|
||||||
|
.rocks
|
||||||
|
.iterator_cf(self.cf(), rocksdb::IteratorMode::Start)
|
||||||
|
.map(|(k, v)| (Vec::from(k), Vec::from(v))),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter_from<'a>(
|
||||||
|
&'a self,
|
||||||
|
from: &[u8],
|
||||||
|
backwards: bool,
|
||||||
|
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||||
|
Box::new(
|
||||||
|
self.db
|
||||||
|
.rocks
|
||||||
|
.iterator_cf(
|
||||||
|
self.cf(),
|
||||||
|
rocksdb::IteratorMode::From(
|
||||||
|
from,
|
||||||
|
if backwards {
|
||||||
|
rocksdb::Direction::Reverse
|
||||||
|
} else {
|
||||||
|
rocksdb::Direction::Forward
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.map(|(k, v)| (Vec::from(k), Vec::from(v))),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
|
||||||
|
// TODO: make atomic
|
||||||
|
let old = self.db.rocks.get_cf(self.cf(), &key)?;
|
||||||
|
let new = utils::increment(old.as_deref()).unwrap();
|
||||||
|
self.db.rocks.put_cf(self.cf(), key, &new)?;
|
||||||
|
Ok(new)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
|
||||||
|
for key in iter {
|
||||||
|
let old = self.db.rocks.get_cf(self.cf(), &key)?;
|
||||||
|
let new = utils::increment(old.as_deref()).unwrap();
|
||||||
|
self.db.rocks.put_cf(self.cf(), key, new)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scan_prefix<'a>(
|
||||||
|
&'a self,
|
||||||
|
prefix: Vec<u8>,
|
||||||
|
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||||
|
Box::new(
|
||||||
|
self.db
|
||||||
|
.rocks
|
||||||
|
.iterator_cf(
|
||||||
|
self.cf(),
|
||||||
|
rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
|
||||||
|
)
|
||||||
|
.map(|(k, v)| (Vec::from(k), Vec::from(v)))
|
||||||
|
.take_while(move |(k, _)| k.starts_with(&prefix)),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
|
||||||
|
self.watchers
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.entry(prefix.to_vec())
|
||||||
|
.or_default()
|
||||||
|
.push(tx);
|
||||||
|
|
||||||
|
Box::pin(async move {
|
||||||
|
// Tx is never destroyed
|
||||||
|
rx.await.unwrap();
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -134,7 +134,7 @@ type TupleOfBytes = (Vec<u8>, Vec<u8>);
|
||||||
impl SqliteTable {
|
impl SqliteTable {
|
||||||
#[tracing::instrument(skip(self, guard, key))]
|
#[tracing::instrument(skip(self, guard, key))]
|
||||||
fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
||||||
//dbg!(&self.name);
|
dbg!(&self.name);
|
||||||
Ok(guard
|
Ok(guard
|
||||||
.prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())?
|
.prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())?
|
||||||
.query_row([key], |row| row.get(0))
|
.query_row([key], |row| row.get(0))
|
||||||
|
@ -143,7 +143,7 @@ impl SqliteTable {
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, guard, key, value))]
|
#[tracing::instrument(skip(self, guard, key, value))]
|
||||||
fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> {
|
fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> {
|
||||||
//dbg!(&self.name);
|
dbg!(&self.name);
|
||||||
guard.execute(
|
guard.execute(
|
||||||
format!(
|
format!(
|
||||||
"INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)",
|
"INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)",
|
||||||
|
@ -170,14 +170,14 @@ impl SqliteTable {
|
||||||
|
|
||||||
let statement_ref = NonAliasingBox(statement);
|
let statement_ref = NonAliasingBox(statement);
|
||||||
|
|
||||||
//let name = self.name.clone();
|
let name = self.name.clone();
|
||||||
|
|
||||||
let iterator = Box::new(
|
let iterator = Box::new(
|
||||||
statement
|
statement
|
||||||
.query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
|
.query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.map(move |r| {
|
.map(move |r| {
|
||||||
//dbg!(&name);
|
dbg!(&name);
|
||||||
r.unwrap()
|
r.unwrap()
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
@ -285,7 +285,7 @@ impl Tree for SqliteTable {
|
||||||
let guard = self.engine.read_lock_iterator();
|
let guard = self.engine.read_lock_iterator();
|
||||||
let from = from.to_vec(); // TODO change interface?
|
let from = from.to_vec(); // TODO change interface?
|
||||||
|
|
||||||
//let name = self.name.clone();
|
let name = self.name.clone();
|
||||||
|
|
||||||
if backwards {
|
if backwards {
|
||||||
let statement = Box::leak(Box::new(
|
let statement = Box::leak(Box::new(
|
||||||
|
@ -304,7 +304,7 @@ impl Tree for SqliteTable {
|
||||||
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
|
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.map(move |r| {
|
.map(move |r| {
|
||||||
//dbg!(&name);
|
dbg!(&name);
|
||||||
r.unwrap()
|
r.unwrap()
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
@ -329,7 +329,7 @@ impl Tree for SqliteTable {
|
||||||
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
|
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.map(move |r| {
|
.map(move |r| {
|
||||||
//dbg!(&name);
|
dbg!(&name);
|
||||||
r.unwrap()
|
r.unwrap()
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
|
|
@ -39,6 +39,12 @@ pub enum Error {
|
||||||
#[cfg(feature = "heed")]
|
#[cfg(feature = "heed")]
|
||||||
#[error("There was a problem with the connection to the heed database: {error}")]
|
#[error("There was a problem with the connection to the heed database: {error}")]
|
||||||
HeedError { error: String },
|
HeedError { error: String },
|
||||||
|
#[cfg(feature = "rocksdb")]
|
||||||
|
#[error("There was a problem with the connection to the rocksdb database: {source}")]
|
||||||
|
RocksDbError {
|
||||||
|
#[from]
|
||||||
|
source: rocksdb::Error,
|
||||||
|
},
|
||||||
#[error("Could not generate an image.")]
|
#[error("Could not generate an image.")]
|
||||||
ImageError {
|
ImageError {
|
||||||
#[from]
|
#[from]
|
||||||
|
|
11
src/utils.rs
11
src/utils.rs
|
@ -29,6 +29,17 @@ pub fn increment(old: Option<&[u8]>) -> Option<Vec<u8>> {
|
||||||
Some(number.to_be_bytes().to_vec())
|
Some(number.to_be_bytes().to_vec())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "rocksdb")]
|
||||||
|
pub fn increment_rocksdb(
|
||||||
|
_new_key: &[u8],
|
||||||
|
old: Option<&[u8]>,
|
||||||
|
_operands: &mut rocksdb::MergeOperands,
|
||||||
|
) -> Option<Vec<u8>> {
|
||||||
|
dbg!(_new_key);
|
||||||
|
dbg!(old);
|
||||||
|
increment(old)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn generate_keypair() -> Vec<u8> {
|
pub fn generate_keypair() -> Vec<u8> {
|
||||||
let mut value = random_string(8).as_bytes().to_vec();
|
let mut value = random_string(8).as_bytes().to_vec();
|
||||||
value.push(0xff);
|
value.push(0xff);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue