From e770de3d3dc11afc59f83e5eac89591bf9191e12 Mon Sep 17 00:00:00 2001
From: Tglman
Date: Sat, 15 Jan 2022 14:07:16 +0000
Subject: [PATCH 01/12] test: add abstract test implementations and a specific
 implementation for sqlite

---
 Cargo.toml                        |   3 +
 src/database/abstraction.rs       |   3 +
 src/database/abstraction/tests.rs | 227 ++++++++++++++++++++++++++++++
 3 files changed, 233 insertions(+)
 create mode 100644 src/database/abstraction/tests.rs

diff --git a/Cargo.toml b/Cargo.toml
index 0cdde4ab..cbf7ba5f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -175,6 +175,9 @@ version = "0.25"
 [target.'cfg(unix)'.dependencies]
 nix = { version = "0.28", features = ["resource"] }
 
+[dev-dependencies]
+tempfile = "3.2"
+
 [features]
 default = ["backend_rocksdb", "backend_sqlite", "conduit_bin", "systemd"]
 #backend_sled = ["sled"]
diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs
index 93660f9f..a3c6ccfd 100644
--- a/src/database/abstraction.rs
+++ b/src/database/abstraction.rs
@@ -26,6 +26,9 @@ pub mod persy;
 ))]
 pub mod watchers;
 
+#[cfg(test)]
+mod tests;
+
 pub trait KeyValueDatabaseEngine: Send + Sync {
     fn open(config: &Config) -> Result<Self>
     where
diff --git a/src/database/abstraction/tests.rs b/src/database/abstraction/tests.rs
new file mode 100644
index 00000000..313a55f3
--- /dev/null
+++ b/src/database/abstraction/tests.rs
@@ -0,0 +1,227 @@
+use crate::database::{abstraction::Tree, Config};
+
+fn insert_get(tree: &dyn Tree) {
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    tree.insert(key, value).unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(read, Some(value.to_owned()));
+}
+
+fn insert_get_remove(tree: &dyn Tree) {
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    tree.insert(key, value).unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(read, Some(value.to_owned()));
+    tree.remove(key).unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(read, None);
+}
+
+fn batch_insert_get(tree: &dyn Tree) {
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    let key1 = "key1".as_bytes();
+    let value1 = "value1".as_bytes();
+    let key2 = "key2".as_bytes();
+    let value2 = "value2".as_bytes();
+    tree.insert_batch(
+        &mut vec![
+            (key.to_owned(), value.to_owned()),
+            (key1.to_owned(), value1.to_owned()),
+            (key2.to_owned(), value2.to_owned()),
+        ]
+        .into_iter(),
+    )
+    .unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(read, Some(value.to_owned()));
+    let read = tree.get(key1).unwrap();
+    assert_eq!(read, Some(value1.to_owned()));
+    let read = tree.get(key2).unwrap();
+    assert_eq!(read, Some(value2.to_owned()));
+}
+
+fn insert_iter(tree: &dyn Tree) {
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    tree.insert(key, value).unwrap();
+    let key1 = "key1".as_bytes();
+    let value1 = "value1".as_bytes();
+    tree.insert(key1, value1).unwrap();
+    let key2 = "key2".as_bytes();
+    let value2 = "value2".as_bytes();
+    tree.insert(key2, value2).unwrap();
+    let mut iter = tree.iter();
+    assert_eq!(iter.next(), Some((key.to_owned(), value.to_owned())));
+    assert_eq!(iter.next(), Some((key1.to_owned(), value1.to_owned())));
+    assert_eq!(iter.next(), Some((key2.to_owned(), value2.to_owned())));
+    assert_eq!(iter.next(), None);
+}
+
+fn insert_iter_from(tree: &dyn Tree) {
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    tree.insert(key, value).unwrap();
+    let key1 = "key1".as_bytes();
+    let value1 = "value1".as_bytes();
+    tree.insert(key1, value1).unwrap();
+    let key2 = "key2".as_bytes();
+    let value2 = "value2".as_bytes();
+    tree.insert(key2, value2).unwrap();
+    let mut iter = tree.iter_from(key1, false);
+    assert_eq!(iter.next(), Some((key1.to_owned(), value1.to_owned())));
+    assert_eq!(iter.next(), Some((key2.to_owned(), value2.to_owned())));
+    assert_eq!(iter.next(), None);
+    let mut iter = tree.iter_from(key1, true);
+    assert_eq!(iter.next(), Some((key1.to_owned(), value1.to_owned())));
+    assert_eq!(iter.next(), Some((key.to_owned(), value.to_owned())));
+    assert_eq!(iter.next(), None);
+}
+
+fn insert_iter_prefix(tree: &dyn Tree) {
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    tree.insert(key, value).unwrap();
+    let key1 = "key1".as_bytes();
+    let value1 = "value1".as_bytes();
+    tree.insert(key1, value1).unwrap();
+    let key11 = "key11".as_bytes();
+    let value11 = "value11".as_bytes();
+    tree.insert(key11, value11).unwrap();
+    let key2 = "key2".as_bytes();
+    let value2 = "value2".as_bytes();
+    tree.insert(key2, value2).unwrap();
+    let mut iter = tree.scan_prefix(key1.to_owned());
+    assert_eq!(iter.next(), Some((key1.to_owned(), value1.to_owned())));
+    assert_eq!(iter.next(), Some((key11.to_owned(), value11.to_owned())));
+    assert_eq!(iter.next(), None);
+}
+
+fn insert_clear(tree: &dyn Tree) {
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    tree.insert(key, value).unwrap();
+    let key1 = "key1".as_bytes();
+    let value1 = "value1".as_bytes();
+    tree.insert(key1, value1).unwrap();
+    let key2 = "key2".as_bytes();
+    let value2 = "value2".as_bytes();
+    tree.insert(key2, value2).unwrap();
+    assert_eq!(tree.iter().count(), 3);
+    tree.clear().unwrap();
+    assert_eq!(tree.iter().count(), 0);
+}
+
+fn increment(tree: &dyn Tree) {
+    let key = "key".as_bytes();
+    tree.increment(key).unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 1);
+    tree.increment(key).unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 2);
+}
+
+fn increment_batch(tree: &dyn Tree) {
+    let key = "key".as_bytes();
+    let key1 = "key1".as_bytes();
+    tree.increment_batch(&mut vec![key.to_owned(), key1.to_owned()].into_iter())
+        .unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 1);
+    let read = tree.get(key1).unwrap();
+    assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 1);
+    tree.increment_batch(&mut vec![key.to_owned(), key1.to_owned()].into_iter())
+        .unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 2);
+    let read = tree.get(key1).unwrap();
+    assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 2);
+}
+
+fn empty_config(database_path: &str) -> Config {
+    use rocket::figment::providers::{Format, Toml};
+    Toml::from_str(&format!(
+        r#"
+server_name = "test"
+database_path = "{}"
+"#,
+        database_path
+    ))
+    .unwrap()
+}
+
+#[cfg(feature = "sqlite")]
+mod sqlite {
+
+    use super::*;
+    use crate::database::abstraction::{sqlite::Engine, DatabaseEngine, Tree};
+    use std::sync::Arc;
+    use tempfile::{Builder, TempDir};
+
+    /// Make sure to keep a reference to the returned values for the whole
+    /// length of the test, to avoid early cleanup causing test issues
+    fn open_tree(test_name: &str) -> (Arc<dyn Tree>, impl DatabaseEngine, TempDir) {
+        let db_folder = Builder::new().prefix(test_name).tempdir().unwrap();
+        let config = empty_config(db_folder.path().to_str().unwrap());
+        let instance = Arc::<Engine>::open(&config).unwrap();
+        let tree = instance.open_tree("test").unwrap();
+        (tree, instance, db_folder)
+    }
+
+    #[test]
+    fn sqlite_insert_get() {
+        let (tree, _inst, _temp_dir) = open_tree("insert_get");
+        insert_get(&*tree)
+    }
+
+    #[test]
+    fn sqlite_insert_get_remove() {
+        let (tree, _inst, _temp_dir) = open_tree("insert_get_remove");
+        insert_get_remove(&*tree)
+    }
+
+    #[test]
+    fn sqlite_batch_insert_get() {
+        let (tree, _inst, _temp_dir) = open_tree("batch_insert_get");
+        batch_insert_get(&*tree)
+    }
+
+    #[test]
+    fn sqlite_insert_iter() {
+        let (tree, _inst, _temp_dir) = open_tree("insert_iter");
+        insert_iter(&*tree)
+    }
+
+    #[test]
+    fn sqlite_insert_iter_from() {
+        let (tree, _inst, _temp_dir) = open_tree("insert_iter_from");
+        insert_iter_from(&*tree)
+    }
+
+    #[test]
+    fn sqlite_insert_iter_prefix() {
+        let (tree, _inst, _temp_dir) = open_tree("insert_iter_prefix");
+        insert_iter_prefix(&*tree)
+    }
+
+    #[test]
+    fn sqlite_insert_clear() {
+        let (tree, _inst, _temp_dir) = open_tree("insert_clear");
+        insert_clear(&*tree)
+    }
+
+    #[test]
+    fn sqlite_increment() {
+        let (tree, _inst, _temp_dir) = open_tree("increment");
+        increment(&*tree)
+    }
+
+    #[test]
+    fn sqlite_increment_batch() {
+        let (tree, _inst, _temp_dir) = open_tree("increment_batch");
+        increment_batch(&*tree)
+    }
+}

From 326c345811aa4a253c374d57a1e20e5f4a3dc668 Mon Sep 17 00:00:00 2001
From: Tglman
Date: Sat, 22 Jan 2022 13:55:58 +0000
Subject: [PATCH 02/12] test: add replace test and refactor to reduce
 backend-specific test boilerplate

---
 src/database/abstraction/tests.rs | 65 ++++++++++++++++++++-----------
 1 file changed, 43 insertions(+), 22 deletions(-)

diff --git a/src/database/abstraction/tests.rs b/src/database/abstraction/tests.rs
index 313a55f3..b460db75 100644
--- a/src/database/abstraction/tests.rs
+++ b/src/database/abstraction/tests.rs
@@ -1,4 +1,6 @@
-use crate::database::{abstraction::Tree, Config};
+use crate::database::{abstraction::{DatabaseEngine, Tree}, Config};
+use std::sync::Arc;
+use tempfile::{Builder, TempDir};
 
 fn insert_get(tree: &dyn Tree) {
     let key = "key".as_bytes();
@@ -8,6 +10,19 @@ fn insert_get(tree: &dyn Tree) {
     assert_eq!(read, Some(value.to_owned()));
 }
 
+fn insert_get_replace(tree: &dyn Tree) {
+    let key = "key".as_bytes();
+    let value = "value".as_bytes();
+    tree.insert(key, value).unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(read, Some(value.to_owned()));
+
+    let value1 = "value1".as_bytes();
+    tree.insert(key, value1).unwrap();
+    let read = tree.get(key).unwrap();
+    assert_eq!(read, Some(value1.to_owned()));
+}
+
 fn insert_get_remove(tree: &dyn Tree) {
     let key = "key".as_bytes();
     let value = "value".as_bytes();
@@ -153,69 +168,75 @@ database_path = "{}"
     .unwrap()
 }
 
+/// Make sure to keep a reference to the returned values for the whole
+/// length of the test, to avoid early cleanup causing test issues
+fn open_tree<T>(test_name: &str) -> (Arc<dyn Tree>, impl DatabaseEngine, TempDir)
+    where Arc<T>: DatabaseEngine
+{
+    let db_folder = Builder::new().prefix(test_name).tempdir().unwrap();
+    let config = empty_config(db_folder.path().to_str().unwrap());
+    let instance = Arc::<T>::open(&config).unwrap();
+    let tree = instance.open_tree("test").unwrap();
+    (tree, instance, db_folder)
+}
+
 #[cfg(feature = "sqlite")]
 mod sqlite {
 
     use super::*;
-    use crate::database::abstraction::{sqlite::Engine, DatabaseEngine, Tree};
-    use std::sync::Arc;
-    use tempfile::{Builder, TempDir};
-
-    /// Make sure to keep a reference to the returned values for the whole
-    /// length of the test, to avoid early cleanup causing test issues
-    fn open_tree(test_name: &str) -> (Arc<dyn Tree>, impl DatabaseEngine, TempDir) {
-        let db_folder = Builder::new().prefix(test_name).tempdir().unwrap();
-        let config = empty_config(db_folder.path().to_str().unwrap());
-        let instance = Arc::<Engine>::open(&config).unwrap();
-        let tree = instance.open_tree("test").unwrap();
-        (tree, instance, db_folder)
-    }
+    use crate::database::abstraction::sqlite::Engine;
 
     #[test]
     fn sqlite_insert_get() {
-        let (tree, _inst, _temp_dir) = open_tree("insert_get");
+        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_insert_get");
         insert_get(&*tree)
     }
+
+    #[test]
+    fn sqlite_insert_replace_get() {
+        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_insert_get_replace");
+        insert_get_replace(&*tree)
+    }
 
     #[test]
     fn sqlite_insert_get_remove() {
-        let (tree, _inst, _temp_dir) = open_tree("insert_get_remove");
+        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_insert_get_remove");
         insert_get_remove(&*tree)
     }
 
     #[test]
     fn sqlite_batch_insert_get() {
-        let (tree, _inst, _temp_dir) = open_tree("batch_insert_get");
+        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_batch_insert_get");
         batch_insert_get(&*tree)
     }
 
     #[test]
     fn sqlite_insert_iter() {
-        let (tree, _inst, _temp_dir) = open_tree("insert_iter");
+        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_insert_iter");
         insert_iter(&*tree)
     }
 
     #[test]
     fn sqlite_insert_iter_from() {
-        let (tree, _inst, _temp_dir) = open_tree("insert_iter_from");
+        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_insert_iter_from");
        insert_iter_from(&*tree)
     }
 
     #[test]
     fn sqlite_insert_iter_prefix() {
-        let (tree, _inst, _temp_dir) = open_tree("insert_iter_prefix");
+        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_insert_iter_prefix");
         insert_iter_prefix(&*tree)
     }
 
     #[test]
     fn sqlite_insert_clear() {
-        let (tree, _inst, _temp_dir) = open_tree("insert_clear");
+        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_insert_clear");
         insert_clear(&*tree)
     }
 
     #[test]
     fn sqlite_increment() {
-        let (tree, _inst, _temp_dir) = open_tree("increment");
+        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_increment");
         increment(&*tree)
     }

From 5a66ffd9e6fdd1c016e56679878fc59d5bba9a3b Mon Sep 17 00:00:00 2001
From: Tglman
Date: Sat, 22 Jan 2022 14:07:50 +0000
Subject: [PATCH 03/12] refactor: change backend-specific test impls to
 one-liners

---
 src/database/abstraction/tests.rs | 146 ++++++++++++++++++------------
 1 file changed, 90 insertions(+), 56 deletions(-)

diff --git a/src/database/abstraction/tests.rs b/src/database/abstraction/tests.rs
index b460db75..0187f2f1 100644
--- a/src/database/abstraction/tests.rs
+++ b/src/database/abstraction/tests.rs
@@ -1,8 +1,40 @@
-use crate::database::{abstraction::{DatabaseEngine, Tree}, Config};
+use crate::database::{
+    abstraction::{DatabaseEngine, Tree},
+    Config,
+};
 use std::sync::Arc;
 use tempfile::{Builder, TempDir};
 
-fn insert_get(tree: &dyn Tree) {
+fn empty_config(database_path: &str) -> Config {
+    use rocket::figment::providers::{Format, Toml};
+    Toml::from_str(&format!(
+        r#"
+server_name = "test"
+database_path = "{}"
+"#,
+        database_path
+    ))
+    .unwrap()
+}
+
+/// Make sure to keep a reference to the returned values for the whole
+/// length of the test, to avoid early cleanup causing test issues
+fn open_tree<T>(test_name: &str) -> (Arc<dyn Tree>, impl DatabaseEngine, TempDir)
+where
+    Arc<T>: DatabaseEngine,
+{
+    let db_folder = Builder::new().prefix(test_name).tempdir().unwrap();
+    let config = empty_config(db_folder.path().to_str().unwrap());
+    let instance = Arc::<T>::open(&config).unwrap();
+    let tree = instance.open_tree("test").unwrap();
+    (tree, instance, db_folder)
+}
+
+fn insert_get<T>(name: &str)
+where
+    Arc<T>: DatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
     let value = "value".as_bytes();
     tree.insert(key, value).unwrap();
@@ -10,7 +42,11 @@ fn insert_get(tree: &dyn Tree) {
     assert_eq!(read, Some(value.to_owned()));
 }
 
-fn insert_get_replace(tree: &dyn Tree) {
+fn insert_get_replace<T>(name: &str)
+where
+    Arc<T>: DatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
     let value = "value".as_bytes();
     tree.insert(key, value).unwrap();
@@ -23,7 +59,11 @@ fn insert_get_replace(tree: &dyn Tree) {
     assert_eq!(read, Some(value1.to_owned()));
 }
 
-fn insert_get_remove(tree: &dyn Tree) {
+fn insert_get_remove<T>(name: &str)
+where
+    Arc<T>: DatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
     let value = "value".as_bytes();
     tree.insert(key, value).unwrap();
@@ -34,7 +74,11 @@ fn insert_get_remove(tree: &dyn Tree) {
     assert_eq!(read, None);
 }
 
-fn batch_insert_get(tree: &dyn Tree) {
+fn batch_insert_get<T>(name: &str)
+where
+    Arc<T>: DatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
     let value = "value".as_bytes();
     let key1 = "key1".as_bytes();
@@ -58,7 +102,11 @@ fn batch_insert_get(tree: &dyn Tree) {
     assert_eq!(read, Some(value2.to_owned()));
 }
 
-fn insert_iter(tree: &dyn Tree) {
+fn insert_iter<T>(name: &str)
+where
+    Arc<T>: DatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
     let value = "value".as_bytes();
     tree.insert(key, value).unwrap();
@@ -75,7 +123,11 @@ fn insert_iter(tree: &dyn Tree) {
     assert_eq!(iter.next(), None);
 }
 
-fn insert_iter_from(tree: &dyn Tree) {
+fn insert_iter_from<T>(name: &str)
+where
+    Arc<T>: DatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
     let value = "value".as_bytes();
     tree.insert(key, value).unwrap();
@@ -95,7 +147,11 @@ fn insert_iter_from(tree: &dyn Tree) {
     assert_eq!(iter.next(), None);
 }
 
-fn insert_iter_prefix(tree: &dyn Tree) {
+fn insert_iter_prefix<T>(name: &str)
+where
+    Arc<T>: DatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
     let value = "value".as_bytes();
     tree.insert(key, value).unwrap();
@@ -114,7 +170,11 @@ fn insert_iter_prefix(tree: &dyn Tree) {
     assert_eq!(iter.next(), None);
 }
 
-fn insert_clear(tree: &dyn Tree) {
+fn insert_clear<T>(name: &str)
+where
+    Arc<T>: DatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
     let value = "value".as_bytes();
     tree.insert(key, value).unwrap();
@@ -129,7 +189,11 @@ fn insert_clear(tree: &dyn Tree) {
     assert_eq!(tree.iter().count(), 0);
 }
 
-fn increment(tree: &dyn Tree) {
+fn increment<T>(name: &str)
+where
+    Arc<T>: DatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
     tree.increment(key).unwrap();
     let read = tree.get(key).unwrap();
@@ -139,7 +203,11 @@ fn increment(tree: &dyn Tree) {
     assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 2);
 }
 
-fn increment_batch(tree: &dyn Tree) {
+fn increment_batch<T>(name: &str)
+where
+    Arc<T>: DatabaseEngine,
+{
+    let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
     let key1 = "key1".as_bytes();
     tree.increment_batch(&mut vec![key.to_owned(), key1.to_owned()].into_iter())
@@ -156,30 +224,6 @@ fn increment_batch(tree: &dyn Tree) {
     assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 2);
 }
 
-fn empty_config(database_path: &str) -> Config {
-    use rocket::figment::providers::{Format, Toml};
-    Toml::from_str(&format!(
-        r#"
-server_name = "test"
-database_path = "{}"
-"#,
-        database_path
-    ))
-    .unwrap()
-}
-
-/// Make sure to keep a reference to the returned values for the whole
-/// length of the test, to avoid early cleanup causing test issues
-fn open_tree<T>(test_name: &str) -> (Arc<dyn Tree>, impl DatabaseEngine, TempDir)
-    where Arc<T>: DatabaseEngine
-{
-    let db_folder = Builder::new().prefix(test_name).tempdir().unwrap();
-    let config = empty_config(db_folder.path().to_str().unwrap());
-    let instance = Arc::<T>::open(&config).unwrap();
-    let tree = instance.open_tree("test").unwrap();
-    (tree, instance, db_folder)
-}
-
 #[cfg(feature = "sqlite")]
 mod sqlite {
 
@@ -188,61 +232,51 @@ mod sqlite {
 
     #[test]
     fn sqlite_insert_get() {
-        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_insert_get");
-        insert_get(&*tree)
+        insert_get::<Engine>("sqlite_insert_get")
     }
-    
+
     #[test]
     fn sqlite_insert_replace_get() {
-        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_insert_get_replace");
-        insert_get_replace(&*tree)
+        insert_get_replace::<Engine>("sqlite_insert_get_replace")
     }
 
     #[test]
     fn sqlite_insert_get_remove() {
-        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_insert_get_remove");
-        insert_get_remove(&*tree)
+        insert_get_remove::<Engine>("sqlite_insert_get_remove")
     }
 
     #[test]
     fn sqlite_batch_insert_get() {
-        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_batch_insert_get");
-        batch_insert_get(&*tree)
+        batch_insert_get::<Engine>("sqlite_batch_insert_get")
     }
 
     #[test]
     fn sqlite_insert_iter() {
-        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_insert_iter");
-        insert_iter(&*tree)
+        insert_iter::<Engine>("sqlite_insert_iter")
    }
 
     #[test]
     fn sqlite_insert_iter_from() {
-        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_insert_iter_from");
-        insert_iter_from(&*tree)
+        insert_iter_from::<Engine>("sqlite_insert_iter_from")
     }
 
     #[test]
     fn sqlite_insert_iter_prefix() {
-        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_insert_iter_prefix");
-        insert_iter_prefix(&*tree)
+        insert_iter_prefix::<Engine>("sqlite_insert_iter_prefix")
     }
 
     #[test]
     fn sqlite_insert_clear() {
-        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_insert_clear");
-        insert_clear(&*tree)
+        insert_clear::<Engine>("sqlite_insert_clear")
     }
 
     #[test]
     fn sqlite_increment() {
-        let (tree, _inst, _temp_dir) = open_tree::<Engine>("sqlite_increment");
-        increment(&*tree)
+        increment::<Engine>("sqlite_increment")
     }
 
     #[test]
     fn sqlite_increment_batch() {
-        let (tree, _inst, _temp_dir) = open_tree("increment_batch");
-        increment_batch(&*tree)
+        increment_batch::<Engine>("sqlite_increment_batch")
     }
 }

From 5591ce3e1b1cc25eb6c0171ab6e2a71a9d0eb184 Mon Sep 17 00:00:00 2001
From: Tglman
Date: Sat, 22 Jan 2022 14:18:47 +0000
Subject: [PATCH 04/12] test: add tests for rocksdb and persy impl

---
 src/database/abstraction/tests.rs | 113 ++++++++++++++++++++++++++++++
 1 file changed, 113 insertions(+)

diff --git a/src/database/abstraction/tests.rs b/src/database/abstraction/tests.rs
index 0187f2f1..287a3884 100644
--- a/src/database/abstraction/tests.rs
+++ b/src/database/abstraction/tests.rs
@@ -280,3 +280,116 @@ mod sqlite {
         increment_batch::<Engine>("sqlite_increment_batch")
     }
 }
+
+#[cfg(feature = "rocksdb")]
+mod rocksdb {
+
+    use super::*;
+    use crate::database::abstraction::rocksdb::Engine;
+
+    #[test]
+    fn rocksdb_insert_get() {
+        insert_get::<Engine>("rocksdb_insert_get")
+    }
+
+    #[test]
+    fn rocksdb_insert_replace_get() {
+        insert_get_replace::<Engine>("rocksdb_insert_get_replace")
+    }
+
+    #[test]
+    fn rocksdb_insert_get_remove() {
+        insert_get_remove::<Engine>("rocksdb_insert_get_remove")
+    }
+
+    #[test]
+    fn rocksdb_batch_insert_get() {
+        batch_insert_get::<Engine>("rocksdb_batch_insert_get")
+    }
+
+    #[test]
+    fn rocksdb_insert_iter() {
+        insert_iter::<Engine>("rocksdb_insert_iter")
+    }
+
+    #[test]
+    fn rocksdb_insert_iter_from() {
+        insert_iter_from::<Engine>("rocksdb_insert_iter_from")
+    }
+
+    #[test]
+    fn rocksdb_insert_iter_prefix() {
+        insert_iter_prefix::<Engine>("rocksdb_insert_iter_prefix")
+    }
+
+    #[test]
+    fn rocksdb_insert_clear() {
+        insert_clear::<Engine>("rocksdb_insert_clear")
+    }
+
+    #[test]
+    fn rocksdb_increment() {
+        increment::<Engine>("rocksdb_increment")
+    }
+
+    #[test]
+    fn rocksdb_increment_batch() {
+        increment_batch::<Engine>("rocksdb_increment_batch")
+    }
+}
+#[cfg(feature = "persy")]
+mod persy {
+
+    use super::*;
+    use crate::database::abstraction::persy::Engine;
+
+    #[test]
+    fn persy_insert_get() {
+        insert_get::<Engine>("persy_insert_get")
+    }
+
+    #[test]
+    fn persy_insert_replace_get() {
+        insert_get_replace::<Engine>("persy_insert_get_replace")
+    }
+
+    #[test]
+    fn persy_insert_get_remove() {
+        insert_get_remove::<Engine>("persy_insert_get_remove")
+    }
+
+    #[test]
+    fn persy_batch_insert_get() {
+        batch_insert_get::<Engine>("persy_batch_insert_get")
+    }
+
+    #[test]
+    fn persy_insert_iter() {
+        insert_iter::<Engine>("persy_insert_iter")
+    }
+
+    #[test]
+    fn persy_insert_iter_from() {
+        insert_iter_from::<Engine>("persy_insert_iter_from")
+    }
+
+    #[test]
+    fn persy_insert_iter_prefix() {
+        insert_iter_prefix::<Engine>("persy_insert_iter_prefix")
+    }
+
+    #[test]
+    fn persy_insert_clear() {
+        insert_clear::<Engine>("persy_insert_clear")
+    }
+
+    #[test]
+    fn persy_increment() {
+        increment::<Engine>("persy_increment")
+    }
+
+    #[test]
+    fn persy_increment_batch() {
+        increment_batch::<Engine>("persy_increment_batch")
+    }
+}

From 62f04d22e1c64902f2bc9626fc6bd33825719cfb Mon Sep 17 00:00:00 2001
From: Tglman
Date: Tue, 1 Feb 2022 20:58:51 +0000
Subject: [PATCH 05/12] test: add case for seamless removal of a non-existent
 key from the database

---
 src/database/abstraction/tests.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/database/abstraction/tests.rs b/src/database/abstraction/tests.rs
index 287a3884..d4939537 100644
--- a/src/database/abstraction/tests.rs
+++ b/src/database/abstraction/tests.rs
@@ -72,6 +72,8 @@ where
     tree.remove(key).unwrap();
     let read = tree.get(key).unwrap();
     assert_eq!(read, None);
+    // Removing a key that does not exist should work seamlessly
+    tree.remove(key).unwrap();
 }

From 2d641b62a6a0693286132c4be81ac380f4456eb2 Mon Sep 17 00:00:00 2001
From: Tglman
Date: Sat, 12 Feb 2022 23:39:45 +0000
Subject: [PATCH 06/12] fix: update implementation to recent changes on next

---
 src/database/abstraction/tests.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/database/abstraction/tests.rs b/src/database/abstraction/tests.rs
index d4939537..a5cd1dd6 100644
--- a/src/database/abstraction/tests.rs
+++ b/src/database/abstraction/tests.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
 use tempfile::{Builder, TempDir};
 
 fn empty_config(database_path: &str) -> Config {
-    use rocket::figment::providers::{Format, Toml};
+    use figment::providers::{Format, Toml};
     Toml::from_str(&format!(
         r#"
 server_name = "test"

From 4a087e4873f92b39f247d3bdcacd2c0f0512d252 Mon Sep 17 00:00:00 2001
From: Tglman
Date: Wed, 26 Oct 2022 21:27:15 +0100
Subject: [PATCH 07/12] chore: fix tests after refactor

---
 src/database/abstraction/tests.rs | 26 +++++++++++++-------------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/src/database/abstraction/tests.rs b/src/database/abstraction/tests.rs
index a5cd1dd6..b9a565ea 100644
--- a/src/database/abstraction/tests.rs
+++ b/src/database/abstraction/tests.rs
@@ -1,5 +1,5 @@
 use crate::database::{
-    abstraction::{DatabaseEngine, Tree},
+    abstraction::{KeyValueDatabaseEngine, KvTree},
     Config,
 };
 use std::sync::Arc;
@@ -19,9 +19,9 @@ database_path = "{}"
 
 /// Make sure to keep a reference to the returned values for the whole
 /// length of the test, to avoid early cleanup causing test issues
-fn open_tree<T>(test_name: &str) -> (Arc<dyn Tree>, impl DatabaseEngine, TempDir)
+fn open_tree<T>(test_name: &str) -> (Arc<dyn KvTree>, impl KeyValueDatabaseEngine, TempDir)
 where
-    Arc<T>: DatabaseEngine,
+    Arc<T>: KeyValueDatabaseEngine,
 {
     let db_folder = Builder::new().prefix(test_name).tempdir().unwrap();
     let config = empty_config(db_folder.path().to_str().unwrap());
@@ -32,7 +32,7 @@ where
 
 fn insert_get<T>(name: &str)
 where
-    Arc<T>: DatabaseEngine,
+    Arc<T>: KeyValueDatabaseEngine,
 {
     let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
@@ -44,7 +44,7 @@ where
 
 fn insert_get_replace<T>(name: &str)
 where
-    Arc<T>: DatabaseEngine,
+    Arc<T>: KeyValueDatabaseEngine,
 {
     let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
@@ -61,7 +61,7 @@ where
 
 fn insert_get_remove<T>(name: &str)
 where
-    Arc<T>: DatabaseEngine,
+    Arc<T>: KeyValueDatabaseEngine,
 {
     let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
@@ -78,7 +78,7 @@ where
 
 fn batch_insert_get<T>(name: &str)
 where
-    Arc<T>: DatabaseEngine,
+    Arc<T>: KeyValueDatabaseEngine,
 {
     let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
@@ -106,7 +106,7 @@ where
 
 fn insert_iter<T>(name: &str)
 where
-    Arc<T>: DatabaseEngine,
+    Arc<T>: KeyValueDatabaseEngine,
 {
     let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
@@ -127,7 +127,7 @@ where
 
 fn insert_iter_from<T>(name: &str)
 where
-    Arc<T>: DatabaseEngine,
+    Arc<T>: KeyValueDatabaseEngine,
 {
     let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
@@ -151,7 +151,7 @@ where
 
 fn insert_iter_prefix<T>(name: &str)
 where
-    Arc<T>: DatabaseEngine,
+    Arc<T>: KeyValueDatabaseEngine,
 {
     let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
@@ -174,7 +174,7 @@ where
 
 fn insert_clear<T>(name: &str)
 where
-    Arc<T>: DatabaseEngine,
+    Arc<T>: KeyValueDatabaseEngine,
 {
     let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
@@ -193,7 +193,7 @@ where
 
 fn increment<T>(name: &str)
 where
-    Arc<T>: DatabaseEngine,
+    Arc<T>: KeyValueDatabaseEngine,
 {
     let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();
@@ -207,7 +207,7 @@ where
 
 fn increment_batch<T>(name: &str)
 where
-    Arc<T>: DatabaseEngine,
+    Arc<T>: KeyValueDatabaseEngine,
 {
     let (tree, _inst, _temp_dir) = open_tree::<T>(name);
     let key = "key".as_bytes();

From c901ac0bb83fe6608f75b4e72a997a46ee5a64b3 Mon Sep 17 00:00:00 2001
From: Tglman
Date: Tue, 22 Aug 2023 00:34:28 +0100
Subject: [PATCH 08/12] fix: make heed implementation compile

---
 src/database/abstraction/heed.rs | 62 +++++++++++++++++++++-----------
 src/utils/error.rs               |  2 +-
 2 files changed, 42 insertions(+), 22 deletions(-)

diff --git a/src/database/abstraction/heed.rs b/src/database/abstraction/heed.rs
index 9cca0975..8f577020 100644
--- a/src/database/abstraction/heed.rs
+++ b/src/database/abstraction/heed.rs
@@ -9,7 +9,7 @@ use std::{
     sync::{Arc, Mutex},
 };
 
-use super::{DatabaseEngine, Tree};
+use super::{KeyValueDatabaseEngine, KvTree};
 
 type TupleOfBytes = (Vec<u8>, Vec<u8>);
@@ -30,8 +30,8 @@ fn convert_error(error: heed::Error) -> Error {
     }
 }
 
-impl DatabaseEngine for Engine {
-    fn open(config: &Config) -> Result<Arc<Self>> {
+impl KeyValueDatabaseEngine for Arc<Engine> {
+    fn open(config: &Config) -> Result<Self> {
         let mut env_builder = heed::EnvOpenOptions::new();
         env_builder.map_size(1024 * 1024 * 1024 * 1024); // 1 Terabyte
         env_builder.max_readers(126);
@@ -49,10 +49,10 @@ impl DatabaseEngine for Engine {
         }))
     }
 
-    fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
+    fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
         // Creates the db if it doesn't exist already
         Ok(Arc::new(EngineTree {
-            engine: Arc::clone(self),
+            engine: self.clone(),
             tree: Arc::new(
                 self.env
                    .create_database(Some(name))
@@ -62,7 +62,7 @@ impl DatabaseEngine for Engine {
         }))
     }
 
-    fn flush(self: &Arc<Self>) -> Result<()> {
+    fn flush(&self) -> Result<()> {
        self.env.force_sync().map_err(convert_error)?;
         Ok(())
     }
@@ -78,17 +78,7 @@ impl EngineTree {
         let (s, r) = bounded::<TupleOfBytes>(100);
 
         let engine = Arc::clone(&self.engine);
 
-        let lock = self.engine.iter_pool.lock().await;
-        if lock.active_count() < lock.max_count() {
-            lock.execute(move || {
-                iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
-            });
-        } else {
-            std::thread::spawn(move || {
-                iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
-            });
-        }
-
+        let lock = self.engine.iter_pool.lock();
         Box::new(r.into_iter())
     }
 }
@@ -123,7 +113,7 @@ fn iter_from_thread_work(
     }
 }
 
-impl Tree for EngineTree {
+impl KvTree for EngineTree {
     fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
         let txn = self.engine.env.read_txn().map_err(convert_error)?;
         Ok(self
@@ -143,6 +133,36 @@ impl Tree for EngineTree {
         Ok(())
     }
 
+    fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
+        let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
+        for (key, value) in iter {
+            self.tree
+                .put(&mut txn, &key.as_slice(), &value.as_slice())
+                .map_err(convert_error)?;
+            self.watchers.wake(&key);
+        }
+        txn.commit().map_err(convert_error)?;
+        Ok(())
+    }
+
+    fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
+        let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
+        for key in iter {
+            let old = self
+                .tree
+                .get(&txn, &key.as_slice())
+                .map_err(convert_error)?;
+            let new = crate::utils::increment(old.as_deref())
+                .expect("utils::increment always returns Some");
+
+            self.tree
+                .put(&mut txn, &key.as_slice(), &&*new)
+                .map_err(convert_error)?;
+        }
+        txn.commit().map_err(convert_error)?;
+        Ok(())
+    }
+
     fn remove(&self, key: &[u8]) -> Result<()> {
         let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
         self.tree.delete(&mut txn, &key).map_err(convert_error)?;
@@ -150,7 +170,7 @@ impl Tree for EngineTree {
         Ok(())
     }
 
-    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
+    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
         self.iter_from(&[], false)
     }
 
@@ -158,7 +178,7 @@ impl Tree for EngineTree {
         &self,
         from: &[u8],
         backwards: bool,
-    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send> {
+    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)>> {
         self.iter_from_thread(Arc::clone(&self.tree), from.to_vec(), backwards)
     }
 
@@ -181,7 +201,7 @@ impl Tree for EngineTree {
     fn scan_prefix<'a>(
         &'a self,
         prefix: Vec<u8>,
-    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
+    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
         Box::new(
             self.iter_from(&prefix, false)
                 .take_while(move |(key, _)| key.starts_with(&prefix)),
diff --git a/src/utils/error.rs b/src/utils/error.rs
index 1d811106..4be3fbab 100644
--- a/src/utils/error.rs
+++ b/src/utils/error.rs
@@ -164,7 +164,7 @@ impl Error {
             #[cfg(feature = "persy")]
"persy")] Self::PersyError { .. } => db_error, #[cfg(feature = "heed")] - Self::HeedError => db_error, + Self::HeedError { .. } => db_error, #[cfg(feature = "rocksdb")] Self::RocksDbError { .. } => db_error, Self::IoError { .. } => db_error, From 6c8da70122bb595b1453d422a0c039f776be27bd Mon Sep 17 00:00:00 2001 From: Tglman Date: Tue, 22 Aug 2023 01:47:11 +0100 Subject: [PATCH 09/12] feat: first implementation of KeyValue data export --- src/database/abstraction.rs | 11 +++++++++++ src/database/abstraction/persy.rs | 18 +++++++++++++++++- src/database/abstraction/rocksdb.rs | 28 +++++++++++++++++++++++++++- 3 files changed, 55 insertions(+), 2 deletions(-) diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index a3c6ccfd..ea5fd424 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -41,6 +41,17 @@ pub trait KeyValueDatabaseEngine: Send + Sync { fn memory_usage(&self) -> Result { Ok("Current database engine does not support memory usage reporting.".to_owned()) } + fn clear_caches(&self) {} + + fn export(&self, _exporter: &mut Box) -> Result<()> { + unimplemented!() + } +} + +pub trait KvExport { + fn start_index(&mut self, name: &str) -> Result<()>; + fn key_value(&mut self, key: &[u8], value: &[u8]) -> Result<()>; + fn end_index(&mut self, name: &str) -> Result<()>; } pub trait KvTree: Send + Sync { diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs index da7d4cf0..563607a7 100644 --- a/src/database/abstraction/persy.rs +++ b/src/database/abstraction/persy.rs @@ -1,6 +1,6 @@ use crate::{ database::{ - abstraction::{watchers::Watchers, KeyValueDatabaseEngine, KvTree}, + abstraction::{watchers::Watchers, KeyValueDatabaseEngine, KvExport, KvTree}, Config, }, Result, @@ -45,6 +45,22 @@ impl KeyValueDatabaseEngine for Arc { fn flush(&self) -> Result<()> { Ok(()) } + + fn export(&self, exporter: &mut Box) -> Result<()> { + let snapshot = self.persy.snapshot()?; + let indexes = snapshot.list_indexes()?; + for (index, _) in indexes { + exporter.start_index(&index)?; + let data = snapshot.range::(&index, ..)?; + for (key, values) in data { + for value in values { + exporter.key_value(&key, &value)?; + } + } + exporter.end_index(&index)?; + } + Ok(()) + } } pub struct PersyTree { diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs index cf77e3dd..a7afae5b 100644 --- a/src/database/abstraction/rocksdb.rs +++ b/src/database/abstraction/rocksdb.rs @@ -1,4 +1,4 @@ -use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree}; +use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvExport, KvTree}; use crate::{utils, Result}; use std::{ future::Future, @@ -11,6 +11,7 @@ pub struct Engine { max_open_files: i32, cache: rocksdb::Cache, old_cfs: Vec, + database_path: String, } pub struct RocksDbEngineTree<'a> { @@ -85,6 +86,7 @@ impl KeyValueDatabaseEngine for Arc { max_open_files: config.rocksdb_max_open_files, cache: rocksdb_cache, old_cfs: cfs, + database_path: config.database_path.clone(), })) } @@ -126,6 +128,30 @@ impl KeyValueDatabaseEngine for Arc { self.cache.get_pinned_usage() as f64 / 1024.0 / 1024.0, )) } + + fn export(&self, exporter: &mut Box) -> Result<()> { + let snapshot = self.rocks.snapshot(); + let indexes = rocksdb::DBWithThreadMode::::list_cf( + &rocksdb::Options::default(), + &self.database_path, + ) + .unwrap(); + for index in indexes { + if let Some(handle) = self.rocks.cf_handle(&index) { + exporter.start_index(&index)?; + let data = 
snapshot.iterator_cf(&handle, rocksdb::IteratorMode::Start); + for ele in data { + if let Ok((key, value)) = ele { + exporter.key_value(&key, &value)?; + } + } + exporter.end_index(&index)?; + } + } + Ok(()) + } + + fn clear_caches(&self) {} } impl RocksDbEngineTree<'_> { From 5e52438b6fbc0d44fd59d84b1ac6802d6f885faa Mon Sep 17 00:00:00 2001 From: Tglman Date: Sun, 27 Aug 2023 16:32:17 +0100 Subject: [PATCH 10/12] feat: additional implementation of export logic --- src/database/abstraction.rs | 4 ++-- src/database/abstraction/heed.rs | 16 ++++++++++++- src/database/abstraction/persy.rs | 4 ++-- src/database/abstraction/rocksdb.rs | 10 ++++---- src/database/abstraction/sled.rs | 14 +++++++++++ src/database/abstraction/sqlite.rs | 37 ++++++++++++++++++++++++----- 6 files changed, 69 insertions(+), 16 deletions(-) diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index ea5fd424..ffcf476d 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -49,9 +49,9 @@ pub trait KeyValueDatabaseEngine: Send + Sync { } pub trait KvExport { - fn start_index(&mut self, name: &str) -> Result<()>; + fn start_tree(&mut self, name: &str) -> Result<()>; fn key_value(&mut self, key: &[u8], value: &[u8]) -> Result<()>; - fn end_index(&mut self, name: &str) -> Result<()>; + fn end_tree(&mut self, name: &str) -> Result<()>; } pub trait KvTree: Send + Sync { diff --git a/src/database/abstraction/heed.rs b/src/database/abstraction/heed.rs index 8f577020..ff843d97 100644 --- a/src/database/abstraction/heed.rs +++ b/src/database/abstraction/heed.rs @@ -9,7 +9,7 @@ use std::{ sync::{Arc, Mutex}, }; -use super::{KeyValueDatabaseEngine, KvTree}; +use super::{KeyValueDatabaseEngine, KvExport, KvTree}; type TupleOfBytes = (Vec, Vec); @@ -66,6 +66,20 @@ impl KeyValueDatabaseEngine for Arc { self.env.force_sync().map_err(convert_error)?; Ok(()) } + + fn export(&self, exporter: &mut Box) -> Result<()> { + // Heed do not support snapshots + let trees: Vec = unimplemented!("heed has no way lo list trees"); + for tree_name in &trees { + exporter.start_tree(tree_name)?; + let tree = self.open_tree(tree_name)?; + for (key, value) in tree.iter() { + exporter.key_value(&key, &value)?; + } + exporter.end_tree(&tree_name)?; + } + Ok(()) + } } impl EngineTree { diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs index 563607a7..99cbc061 100644 --- a/src/database/abstraction/persy.rs +++ b/src/database/abstraction/persy.rs @@ -50,14 +50,14 @@ impl KeyValueDatabaseEngine for Arc { let snapshot = self.persy.snapshot()?; let indexes = snapshot.list_indexes()?; for (index, _) in indexes { - exporter.start_index(&index)?; + exporter.start_tree(&index)?; let data = snapshot.range::(&index, ..)?; for (key, values) in data { for value in values { exporter.key_value(&key, &value)?; } } - exporter.end_index(&index)?; + exporter.end_tree(&index)?; } Ok(()) } diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs index a7afae5b..84ccf109 100644 --- a/src/database/abstraction/rocksdb.rs +++ b/src/database/abstraction/rocksdb.rs @@ -131,21 +131,21 @@ impl KeyValueDatabaseEngine for Arc { fn export(&self, exporter: &mut Box) -> Result<()> { let snapshot = self.rocks.snapshot(); - let indexes = rocksdb::DBWithThreadMode::::list_cf( + let column_familes = rocksdb::DBWithThreadMode::::list_cf( &rocksdb::Options::default(), &self.database_path, ) .unwrap(); - for index in indexes { - if let Some(handle) = self.rocks.cf_handle(&index) { - 
exporter.start_index(&index)?; + for column_family in column_familes { + if let Some(handle) = self.rocks.cf_handle(&column_family) { + exporter.start_tree(&column_family)?; let data = snapshot.iterator_cf(&handle, rocksdb::IteratorMode::Start); for ele in data { if let Ok((key, value)) = ele { exporter.key_value(&key, &value)?; } } - exporter.end_index(&index)?; + exporter.end_tree(&column_family)?; } } Ok(()) diff --git a/src/database/abstraction/sled.rs b/src/database/abstraction/sled.rs index 87defc57..454a4b83 100644 --- a/src/database/abstraction/sled.rs +++ b/src/database/abstraction/sled.rs @@ -27,6 +27,20 @@ impl DatabaseEngine for Engine { fn flush(self: &Arc) -> Result<()> { Ok(()) // noop } + + fn export(&self, exporter: &mut Box) -> Result<()> { + // Sled do not support snapshots + let indexes = self.0.tree_names(); + for index in &indexes { + exporter.start_index(index)?; + let tree = Arc::new(SledEngineTree(self.0.open_tree(name)?)); + for (key, value) in tree.iter() { + exporter.key_value(&key, &value)?; + } + exporter.end_index(&index)?; + } + Ok(()) + } } impl Tree for SledEngineTree { diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index b448c3b6..35d07dfe 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -1,4 +1,4 @@ -use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree}; +use super::{watchers::Watchers, KeyValueDatabaseEngine, KvExport, KvTree}; use crate::{database::Config, Result}; use parking_lot::{Mutex, MutexGuard}; use rusqlite::{Connection, DatabaseName::Main, OptionalExtension}; @@ -78,6 +78,14 @@ impl Engine { .pragma_update(Some(Main), "wal_checkpoint", "RESTART")?; Ok(()) } + + pub fn open_tree_impl(self: &Arc, name: &str) -> Arc { + Arc::new(SqliteTable { + engine: Arc::clone(self), + name: name.to_owned(), + watchers: Watchers::default(), + }) + } } impl KeyValueDatabaseEngine for Arc { @@ -108,11 +116,7 @@ impl KeyValueDatabaseEngine for Arc { fn open_tree(&self, name: &str) -> Result> { self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {name} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )"), [])?; - Ok(Arc::new(SqliteTable { - engine: Arc::clone(self), - name: name.to_owned(), - watchers: Watchers::default(), - })) + Ok(self.open_tree_impl(name)) } fn flush(&self) -> Result<()> { @@ -123,6 +127,27 @@ impl KeyValueDatabaseEngine for Arc { fn cleanup(&self) -> Result<()> { self.flush_wal() } + + fn export(&self, exporter: &mut Box) -> Result<()> { + // TODO: rusqlite do not support snapshot yet, change this when they are supported + let tables: Vec = { + let guard = self.read_lock(); + guard + .prepare("SELECT name FROM sqlite_master WHERE type='table'")? + .query_map([], |row| row.get(0))? 
+                .map(|r| r.unwrap())
+                .collect()
+        };
+        for table in &tables {
+            exporter.start_tree(table)?;
+            let tree = self.open_tree_impl(table);
+            for (key, value) in tree.iter() {
+                exporter.key_value(&key, &value)?;
+            }
+            exporter.end_tree(&table)?;
+        }
+        Ok(())
+    }
 }
 
 pub struct SqliteTable {

From 0f1c3222ed027f67f448ee57865a695911136aa1 Mon Sep 17 00:00:00 2001
From: Tglman
Date: Sun, 27 Aug 2023 18:56:09 +0100
Subject: [PATCH 11/12] test: add test for exporting and importing data from
 tree store

---
 src/database/abstraction.rs         |   6 +-
 src/database/abstraction/heed.rs    |   4 +-
 src/database/abstraction/persy.rs   |   4 +-
 src/database/abstraction/rocksdb.rs |  16 ++--
 src/database/abstraction/sqlite.rs  |   2 +-
 src/database/abstraction/tests.rs   | 121 +++++++++++++++++++++++++++-
 6 files changed, 132 insertions(+), 21 deletions(-)

diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs
index ffcf476d..5832c944 100644
--- a/src/database/abstraction.rs
+++ b/src/database/abstraction.rs
@@ -33,7 +33,7 @@ pub trait KeyValueDatabaseEngine: Send + Sync {
     fn open(config: &Config) -> Result<Self>
     where
         Self: Sized;
-    fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>>;
+    fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>>;
     fn flush(&self) -> Result<()>;
     fn cleanup(&self) -> Result<()> {
         Ok(())
@@ -43,9 +43,7 @@ pub trait KeyValueDatabaseEngine: Send + Sync {
     }
     fn clear_caches(&self) {}
 
-    fn export(&self, _exporter: &mut Box<dyn KvExport>) -> Result<()> {
-        unimplemented!()
-    }
+    fn export(&self, exporter: &mut dyn KvExport) -> Result<()>;
 }
 
 pub trait KvExport {
diff --git a/src/database/abstraction/heed.rs b/src/database/abstraction/heed.rs
index ff843d97..f215f6d5 100644
--- a/src/database/abstraction/heed.rs
+++ b/src/database/abstraction/heed.rs
@@ -49,7 +49,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
         }))
     }
 
-    fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
+    fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
         // Creates the db if it doesn't exist already
         Ok(Arc::new(EngineTree {
             engine: self.clone(),
@@ -67,7 +67,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
         Ok(())
     }
 
-    fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
+    fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
         // Heed does not support snapshots
         let trees: Vec<String> = unimplemented!("heed has no way to list trees");
         for tree_name in &trees {
diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs
index 99cbc061..7730fb07 100644
--- a/src/database/abstraction/persy.rs
+++ b/src/database/abstraction/persy.rs
@@ -27,7 +27,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
         Ok(Arc::new(Engine { persy }))
     }
 
-    fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
+    fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
         // Create if it doesn't exist
         if !self.persy.exists_index(name)? {
             let mut tx = self.persy.begin()?;
@@ -46,7 +46,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
         Ok(())
     }
 
-    fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
+    fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
         let snapshot = self.persy.snapshot()?;
         let indexes = snapshot.list_indexes()?;
         for (index, _) in indexes {
diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs
index 84ccf109..e441875e 100644
--- a/src/database/abstraction/rocksdb.rs
+++ b/src/database/abstraction/rocksdb.rs
@@ -14,9 +14,9 @@ pub struct Engine {
     database_path: String,
 }
 
-pub struct RocksDbEngineTree<'a> {
+pub struct RocksDbEngineTree {
     db: Arc<Engine>,
-    name: &'a str,
+    name: String,
     watchers: Watchers,
     write_lock: RwLock<()>,
 }
@@ -90,7 +90,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
         }))
     }
 
-    fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
+    fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
         if !self.old_cfs.contains(&name.to_owned()) {
             // Create if it didn't exist
             let _ = self
@@ -99,7 +99,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
         }
 
         Ok(Arc::new(RocksDbEngineTree {
-            name,
+            name: name.to_owned(),
             db: Arc::clone(self),
             watchers: Watchers::default(),
             write_lock: RwLock::new(()),
@@ -129,7 +129,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
         ))
     }
 
-    fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
+    fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
         let snapshot = self.rocks.snapshot();
         let column_families = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
             &rocksdb::Options::default(),
@@ -154,13 +154,13 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
     fn clear_caches(&self) {}
 }
 
-impl RocksDbEngineTree<'_> {
+impl RocksDbEngineTree {
     fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
-        self.db.rocks.cf_handle(self.name).unwrap()
+        self.db.rocks.cf_handle(&self.name).unwrap()
     }
 }
 
-impl KvTree for RocksDbEngineTree<'_> {
+impl KvTree for RocksDbEngineTree {
     fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
         let readoptions = rocksdb::ReadOptions::default();
 
diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs
index 35d07dfe..1d3e6cd0 100644
--- a/src/database/abstraction/sqlite.rs
+++ b/src/database/abstraction/sqlite.rs
@@ -128,7 +128,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
         self.flush_wal()
     }
 
-    fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
+    fn export(&self, exporter: &mut dyn KvExport) -> Result<()> {
         // TODO: rusqlite does not support snapshots yet; change this when it does
         let tables: Vec<String> = {
             let guard = self.read_lock();
diff --git a/src/database/abstraction/tests.rs b/src/database/abstraction/tests.rs
index b9a565ea..c901a44b 100644
--- a/src/database/abstraction/tests.rs
+++ b/src/database/abstraction/tests.rs
@@ -1,5 +1,5 @@
 use crate::database::{
-    abstraction::{KeyValueDatabaseEngine, KvTree},
+    abstraction::{KeyValueDatabaseEngine, KvExport, KvTree},
     Config,
 };
 use std::sync::Arc;
@@ -17,15 +17,22 @@ database_path = "{}"
     .unwrap()
 }
 
-/// Make sure to keep a reference to the returned values for the whole
-/// length of the test, to avoid early cleanup causing test issues
-fn open_tree<T>(test_name: &str) -> (Arc<dyn KvTree>, impl KeyValueDatabaseEngine, TempDir)
+fn open_instance<T>(test_name: &str) -> (Arc<T>, TempDir)
 where
     Arc<T>: KeyValueDatabaseEngine,
 {
     let db_folder = Builder::new().prefix(test_name).tempdir().unwrap();
     let config = empty_config(db_folder.path().to_str().unwrap());
     let instance = Arc::<T>::open(&config).unwrap();
+    (instance, db_folder)
+}
+/// Make sure to keep a reference to the returned values for the whole
+/// length of the test, to avoid early cleanup causing test issues
+fn open_tree<T>(test_name: &str) -> (Arc<dyn KvTree>, impl KeyValueDatabaseEngine, TempDir)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (instance, db_folder) = open_instance::<T>(test_name);
     let tree = instance.open_tree("test").unwrap();
     (tree, instance, db_folder)
 }
@@ -226,6 +233,98 @@ where
     assert_eq!(crate::utils::u64_from_bytes(&read.unwrap()).unwrap(), 2);
 }
 
+#[derive(Default)]
+struct TestBackup {
+    data: Vec<(String, Vec<u8>, Vec<u8>)>,
+    current_tree: String,
+}
+impl TestBackup {
+    fn import<T>(&self, store: &Arc<T>) -> crate::Result<()>
+    where
+        Arc<T>: KeyValueDatabaseEngine,
+    {
+        for (tree, k, v) in &self.data {
+            let data = store.open_tree(&tree)?;
+            data.insert(&k, &v)?;
+        }
+        Ok(())
+    }
+}
+impl KvExport for TestBackup {
+    fn start_tree(&mut self, name: &str) -> crate::Result<()> {
+        self.current_tree = name.to_owned();
+        Ok(())
+    }
+
+    fn key_value(&mut self, key: &[u8], value: &[u8]) -> crate::Result<()> {
+        self.data
+            .push((self.current_tree.clone(), key.to_owned(), value.to_owned()));
+        Ok(())
+    }
+
+    fn end_tree(&mut self, _name: &str) -> crate::Result<()> {
+        Ok(())
+    }
+}
+
+fn insert_data<T>(instance: &Arc<T>, data: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let tree = instance.open_tree(data).unwrap();
+    let key = format!("{}", data);
+    let value = "value".as_bytes();
+    tree.insert(key.as_bytes(), value).unwrap();
+    let key1 = format!("{}1", data);
+    let value1 = "value1".as_bytes();
+    tree.insert(key1.as_bytes(), value1).unwrap();
+    let key2 = format!("{}2", data);
+    let value2 = "value2".as_bytes();
+    tree.insert(key2.as_bytes(), value2).unwrap();
+}
+
+fn check_data<T>(instance: &Arc<T>, data: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let tree = instance.open_tree(data).unwrap();
+    let key = format!("{}", data);
+    let value = "value".as_bytes();
+    let key1 = format!("{}1", data);
+    let value1 = "value1".as_bytes();
+    let key2 = format!("{}2", data);
+    let value2 = "value2".as_bytes();
+    let mut iter = tree.iter();
+    assert_eq!(
+        iter.next(),
+        Some((key.as_bytes().to_owned(), value.to_owned()))
+    );
+    assert_eq!(
+        iter.next(),
+        Some((key1.as_bytes().to_owned(), value1.to_owned()))
+    );
+    assert_eq!(
+        iter.next(),
+        Some((key2.as_bytes().to_owned(), value2.to_owned()))
+    );
+    assert_eq!(iter.next(), None);
+}
+
+fn test_export_import<T>(test_name: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (instance, _db_folder) = open_instance::<T>(test_name);
+    insert_data(&instance, "one");
+    insert_data(&instance, "two");
+    let mut bk = TestBackup::default();
+    instance.export(&mut bk).unwrap();
+    let (instance_r, _db_folder) = open_instance::<T>(&format!("{}_restore", test_name));
+    bk.import(&instance_r).unwrap();
+    check_data(&instance_r, "one");
+    check_data(&instance_r, "two");
+}
+
 #[cfg(feature = "sqlite")]
 mod sqlite {
 
@@ -281,4 +380,8 @@ mod sqlite {
     fn sqlite_increment_batch() {
         increment_batch::<Engine>("sqlite_increment_batch")
     }
+    #[test]
+    fn sqlite_export_import() {
+        test_export_import::<Engine>("sqlite_export_import")
+    }
 }
@@ -338,4 +441,9 @@ mod rocksdb {
     fn rocksdb_increment_batch() {
         increment_batch::<Engine>("rocksdb_increment_batch")
     }
+
+    #[test]
+    fn rocksdb_export_import() {
+        test_export_import::<Engine>("rocksdb_export_import")
+    }
 }
@@ -395,4 +503,9 @@ mod persy {
     fn persy_increment_batch() {
         increment_batch::<Engine>("persy_increment_batch")
     }
+
+    #[test]
+    fn persy_export_import() {
+        test_export_import::<Engine>("persy_export_import")
+    }
 }

From dd75a7ba84b88acc221f0d7df2c0457ba5348b8f Mon Sep 17 00:00:00 2001
From: Tglman
Date: Wed, 30 Aug 2023 22:35:04 +0100
Subject: [PATCH 12/12] feat: add implementation to export and import
 low-level data as JSON

---
 src/database/abstraction.rs                   |  2 +
 .../abstraction/json_stream_export.rs         | 77 +++++++++++++++++++
 src/database/abstraction/tests.rs             | 34 ++++++++-
 3 files changed, 112 insertions(+), 1 deletion(-)
 create mode 100644 src/database/abstraction/json_stream_export.rs

diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs
index 5832c944..4d9da37d 100644
--- a/src/database/abstraction.rs
+++ b/src/database/abstraction.rs
@@ -26,6 +26,8 @@ pub mod persy;
 ))]
 pub mod watchers;
 
+pub mod json_stream_export;
+
 #[cfg(test)]
 mod tests;
 
diff --git a/src/database/abstraction/json_stream_export.rs b/src/database/abstraction/json_stream_export.rs
new file mode 100644
index 00000000..a37f31e6
--- /dev/null
+++ b/src/database/abstraction/json_stream_export.rs
@@ -0,0 +1,77 @@
+use crate::{
+    database::abstraction::{KeyValueDatabaseEngine, KvExport},
+    Result,
+};
+use base64::{engine::general_purpose::STANDARD, Engine};
+use serde::{Deserialize, Serialize};
+use std::io::{BufRead, BufReader, Read, Write};
+
+pub trait KeyValueJsonExporter {
+    fn export_json_stream(&self, output: &mut dyn Write) -> Result<()>;
+    fn import_json_stream(&self, input: &mut dyn Read) -> Result<()>;
+}
+
+#[derive(Serialize, Deserialize)]
+struct Entry {
+    tree: String,
+    key: String,
+    value: String,
+}
+
+struct JsonExporter<'a> {
+    current_name: String,
+    write: &'a mut dyn Write,
+}
+impl<'a> KvExport for JsonExporter<'a> {
+    fn start_tree(&mut self, name: &str) -> Result<()> {
+        self.current_name = name.to_owned();
+        Ok(())
+    }
+
+    fn key_value(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
+        let entry = Entry {
+            tree: self.current_name.clone(),
+            key: STANDARD.encode(key),
+            value: STANDARD.encode(value),
+        };
+        writeln!(self.write, "{}", serde_json::to_string(&entry).unwrap())?;
+        Ok(())
+    }
+
+    fn end_tree(&mut self, _name: &str) -> Result<()> {
+        Ok(())
+    }
+}
+
+impl<T: KeyValueDatabaseEngine> KeyValueJsonExporter for T {
+    fn export_json_stream(&self, output: &mut dyn Write) -> Result<()> {
+        self.export(&mut JsonExporter {
+            current_name: Default::default(),
+            write: output,
+        })?;
+        Ok(())
+    }
+    fn import_json_stream(&self, input: &mut dyn Read) -> Result<()> {
+        let bf = BufReader::new(input);
+        // Just a cache to avoid reopening the tree every time
+        let mut cur_tree = None;
+        for line in bf.lines() {
+            if let Ok(entry) = serde_json::from_str::<Entry>(&line?) {
+                if let (Ok(key), Ok(value)) =
+                    (STANDARD.decode(&entry.key), STANDARD.decode(&entry.value))
+                {
+                    let (tree, tree_name) = match cur_tree {
+                        Some((tree, tree_name)) if tree_name == entry.tree => (tree, tree_name),
+                        _ => {
+                            let tree = self.open_tree(&entry.tree)?;
+                            (tree, entry.tree.clone())
+                        }
+                    };
+                    tree.insert(&key, &value)?;
+                    cur_tree = Some((tree, tree_name));
+                }
+            }
+        }
+        Ok(())
+    }
+}
diff --git a/src/database/abstraction/tests.rs b/src/database/abstraction/tests.rs
index c901a44b..da9a6de4 100644
--- a/src/database/abstraction/tests.rs
+++ b/src/database/abstraction/tests.rs
@@ -1,5 +1,7 @@
 use crate::database::{
-    abstraction::{KeyValueDatabaseEngine, KvExport, KvTree},
+    abstraction::{
+        json_stream_export::KeyValueJsonExporter, KeyValueDatabaseEngine, KvExport, KvTree,
+    },
     Config,
 };
 use std::sync::Arc;
@@ -325,6 +327,23 @@ where
     check_data(&instance_r, "two");
 }
 
+fn test_export_import_json<T>(test_name: &str)
+where
+    Arc<T>: KeyValueDatabaseEngine,
+{
+    let (instance, _db_folder) = open_instance::<T>(test_name);
+    insert_data(&instance, "one");
+    insert_data(&instance, "two");
+    let mut buffer = Vec::new();
+    instance.export_json_stream(&mut buffer).unwrap();
+    let (instance_r, _db_folder) = open_instance::<T>(&format!("{}_restore", test_name));
+    instance_r
+        .import_json_stream(&mut std::io::Cursor::new(buffer))
+        .unwrap();
+    check_data(&instance_r, "one");
+    check_data(&instance_r, "two");
+}
+
 #[cfg(feature = "sqlite")]
 mod sqlite {
 
@@ -384,6 +403,10 @@ mod sqlite {
     fn sqlite_export_import() {
         test_export_import::<Engine>("sqlite_export_import")
     }
+    #[test]
+    fn sqlite_export_import_json() {
+        test_export_import_json::<Engine>("sqlite_export_import_json")
+    }
 }
 
@@ -446,6 +469,10 @@ mod rocksdb {
     fn rocksdb_export_import() {
         test_export_import::<Engine>("rocksdb_export_import")
     }
+    #[test]
+    fn rocksdb_export_import_json() {
+        test_export_import_json::<Engine>("rocksdb_export_import_json")
+    }
 }
@@ -507,4 +534,9 @@ mod persy {
     fn persy_export_import() {
         test_export_import::<Engine>("persy_export_import")
     }
+
+    #[test]
+    fn persy_export_import_json() {
+        test_export_import_json::<Engine>("persy_export_import_json")
+    }
 }
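
Note on the stream format introduced in PATCH 12: the exporter writes one self-contained JSON object per line, with the tree name in clear text and the key and value base64-encoded with the STANDARD engine, so a dump stays line-oriented and can be filtered or diffed with ordinary text tools. As an illustration (the concrete bytes here are ours, not taken from the series), a tree named "test" holding the pair key -> value would serialize to:

    {"tree":"test","key":"a2V5","value":"dmFsdWU="}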
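
A minimal sketch of how the exported API from PATCH 12 could be driven from application code, assuming a sqlite-backed Engine; the dump file name, the helper name dump_and_restore, and the idea of passing two separate Configs are placeholder choices for illustration, not part of the series:

    use std::fs::File;
    use std::sync::Arc;

    use crate::database::abstraction::json_stream_export::KeyValueJsonExporter;
    use crate::database::abstraction::sqlite::Engine;
    use crate::database::abstraction::KeyValueDatabaseEngine;
    use crate::database::Config;

    // Placeholder helper: export every tree of one database to a JSON-lines
    // file, then import the dump into a second database (e.g. a migration).
    fn dump_and_restore(source_config: &Config, target_config: &Config) -> crate::Result<()> {
        // Export: one JSON object per line, keys/values base64-encoded.
        let source = Arc::<Engine>::open(source_config)?;
        let mut dump = File::create("conduit-dump.jsonl")?;
        source.export_json_stream(&mut dump)?;

        // Import: lines that fail to parse or decode are skipped by
        // import_json_stream, so a partially damaged dump still loads.
        let target = Arc::<Engine>::open(target_config)?;
        target.import_json_stream(&mut File::open("conduit-dump.jsonl")?)?;
        Ok(())
    }

Because the blanket impl covers every KeyValueDatabaseEngine, the same two calls work unchanged for the rocksdb and persy backends, which is what the round-trip tests in PATCH 12 exercise per backend.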