1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-08-01 17:38:36 +00:00

feat: additional implementation of export logic

This commit is contained in:
Tglman 2023-08-27 16:32:17 +01:00
parent 6c8da70122
commit 5e52438b6f
6 changed files with 69 additions and 16 deletions

View file

@ -49,9 +49,9 @@ pub trait KeyValueDatabaseEngine: Send + Sync {
} }
pub trait KvExport { pub trait KvExport {
fn start_index(&mut self, name: &str) -> Result<()>; fn start_tree(&mut self, name: &str) -> Result<()>;
fn key_value(&mut self, key: &[u8], value: &[u8]) -> Result<()>; fn key_value(&mut self, key: &[u8], value: &[u8]) -> Result<()>;
fn end_index(&mut self, name: &str) -> Result<()>; fn end_tree(&mut self, name: &str) -> Result<()>;
} }
pub trait KvTree: Send + Sync { pub trait KvTree: Send + Sync {

View file

@ -9,7 +9,7 @@ use std::{
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use super::{KeyValueDatabaseEngine, KvTree}; use super::{KeyValueDatabaseEngine, KvExport, KvTree};
type TupleOfBytes = (Vec<u8>, Vec<u8>); type TupleOfBytes = (Vec<u8>, Vec<u8>);
@ -66,6 +66,20 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
self.env.force_sync().map_err(convert_error)?; self.env.force_sync().map_err(convert_error)?;
Ok(()) Ok(())
} }
fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
// Heed do not support snapshots
let trees: Vec<String> = unimplemented!("heed has no way lo list trees");
for tree_name in &trees {
exporter.start_tree(tree_name)?;
let tree = self.open_tree(tree_name)?;
for (key, value) in tree.iter() {
exporter.key_value(&key, &value)?;
}
exporter.end_tree(&tree_name)?;
}
Ok(())
}
} }
impl EngineTree { impl EngineTree {

View file

@ -50,14 +50,14 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
let snapshot = self.persy.snapshot()?; let snapshot = self.persy.snapshot()?;
let indexes = snapshot.list_indexes()?; let indexes = snapshot.list_indexes()?;
for (index, _) in indexes { for (index, _) in indexes {
exporter.start_index(&index)?; exporter.start_tree(&index)?;
let data = snapshot.range::<ByteVec, ByteVec, _>(&index, ..)?; let data = snapshot.range::<ByteVec, ByteVec, _>(&index, ..)?;
for (key, values) in data { for (key, values) in data {
for value in values { for value in values {
exporter.key_value(&key, &value)?; exporter.key_value(&key, &value)?;
} }
} }
exporter.end_index(&index)?; exporter.end_tree(&index)?;
} }
Ok(()) Ok(())
} }

View file

@ -131,21 +131,21 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> { fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
let snapshot = self.rocks.snapshot(); let snapshot = self.rocks.snapshot();
let indexes = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf( let column_familes = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
&rocksdb::Options::default(), &rocksdb::Options::default(),
&self.database_path, &self.database_path,
) )
.unwrap(); .unwrap();
for index in indexes { for column_family in column_familes {
if let Some(handle) = self.rocks.cf_handle(&index) { if let Some(handle) = self.rocks.cf_handle(&column_family) {
exporter.start_index(&index)?; exporter.start_tree(&column_family)?;
let data = snapshot.iterator_cf(&handle, rocksdb::IteratorMode::Start); let data = snapshot.iterator_cf(&handle, rocksdb::IteratorMode::Start);
for ele in data { for ele in data {
if let Ok((key, value)) = ele { if let Ok((key, value)) = ele {
exporter.key_value(&key, &value)?; exporter.key_value(&key, &value)?;
} }
} }
exporter.end_index(&index)?; exporter.end_tree(&column_family)?;
} }
} }
Ok(()) Ok(())

View file

@ -27,6 +27,20 @@ impl DatabaseEngine for Engine {
fn flush(self: &Arc<Self>) -> Result<()> { fn flush(self: &Arc<Self>) -> Result<()> {
Ok(()) // noop Ok(()) // noop
} }
fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
// Sled do not support snapshots
let indexes = self.0.tree_names();
for index in &indexes {
exporter.start_index(index)?;
let tree = Arc::new(SledEngineTree(self.0.open_tree(name)?));
for (key, value) in tree.iter() {
exporter.key_value(&key, &value)?;
}
exporter.end_index(&index)?;
}
Ok(())
}
} }
impl Tree for SledEngineTree { impl Tree for SledEngineTree {

View file

@ -1,4 +1,4 @@
use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree}; use super::{watchers::Watchers, KeyValueDatabaseEngine, KvExport, KvTree};
use crate::{database::Config, Result}; use crate::{database::Config, Result};
use parking_lot::{Mutex, MutexGuard}; use parking_lot::{Mutex, MutexGuard};
use rusqlite::{Connection, DatabaseName::Main, OptionalExtension}; use rusqlite::{Connection, DatabaseName::Main, OptionalExtension};
@ -78,6 +78,14 @@ impl Engine {
.pragma_update(Some(Main), "wal_checkpoint", "RESTART")?; .pragma_update(Some(Main), "wal_checkpoint", "RESTART")?;
Ok(()) Ok(())
} }
pub fn open_tree_impl(self: &Arc<Self>, name: &str) -> Arc<dyn KvTree> {
Arc::new(SqliteTable {
engine: Arc::clone(self),
name: name.to_owned(),
watchers: Watchers::default(),
})
}
} }
impl KeyValueDatabaseEngine for Arc<Engine> { impl KeyValueDatabaseEngine for Arc<Engine> {
@ -108,11 +116,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> { fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {name} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )"), [])?; self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {name} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )"), [])?;
Ok(Arc::new(SqliteTable { Ok(self.open_tree_impl(name))
engine: Arc::clone(self),
name: name.to_owned(),
watchers: Watchers::default(),
}))
} }
fn flush(&self) -> Result<()> { fn flush(&self) -> Result<()> {
@ -123,6 +127,27 @@ impl KeyValueDatabaseEngine for Arc<Engine> {
fn cleanup(&self) -> Result<()> { fn cleanup(&self) -> Result<()> {
self.flush_wal() self.flush_wal()
} }
fn export(&self, exporter: &mut Box<dyn KvExport>) -> Result<()> {
// TODO: rusqlite do not support snapshot yet, change this when they are supported
let tables: Vec<String> = {
let guard = self.read_lock();
guard
.prepare("SELECT name FROM sqlite_master WHERE type='table'")?
.query_map([], |row| row.get(0))?
.map(|r| r.unwrap())
.collect()
};
for table in &tables {
exporter.start_tree(table)?;
let tree = self.open_tree_impl(table);
for (key, value) in tree.iter() {
exporter.key_value(&key, &value)?;
}
exporter.end_tree(&table)?;
}
Ok(())
}
} }
pub struct SqliteTable { pub struct SqliteTable {