From 4556db411dd5b7c4600d9bb3df9b01dd8989519a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Mon, 20 Dec 2021 10:16:22 +0100 Subject: [PATCH] fix: atomic increment --- src/database/abstraction/rocksdb.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs index c9221565..17dbf666 100644 --- a/src/database/abstraction/rocksdb.rs +++ b/src/database/abstraction/rocksdb.rs @@ -16,6 +16,7 @@ pub struct RocksDbEngineTree<'a> { db: Arc, name: &'a str, watchers: RwLock, Vec>>>, + write_lock: RwLock<()> } impl DatabaseEngine for Engine { @@ -77,6 +78,7 @@ impl DatabaseEngine for Engine { name, db: Arc::clone(self), watchers: RwLock::new(HashMap::new()), + write_lock: RwLock::new(()), })) } @@ -120,7 +122,13 @@ impl Tree for RocksDbEngineTree<'_> { } } - Ok(self.db.rocks.put_cf(self.cf(), key, value)?) + let lock = self.write_lock.read().unwrap(); + + let result = self.db.rocks.put_cf(self.cf(), key, value)?; + + drop(lock); + + Ok(result) } fn insert_batch<'a>(&self, iter: &mut dyn Iterator, Vec)>) -> Result<()> { @@ -168,20 +176,27 @@ impl Tree for RocksDbEngineTree<'_> { } fn increment(&self, key: &[u8]) -> Result> { - // TODO: make atomic + let lock = self.write_lock.write().unwrap(); + let old = self.db.rocks.get_cf(self.cf(), &key)?; let new = utils::increment(old.as_deref()).unwrap(); self.db.rocks.put_cf(self.cf(), key, &new)?; + + drop(lock); Ok(new) } fn increment_batch<'a>(&self, iter: &mut dyn Iterator>) -> Result<()> { + let lock = self.write_lock.write().unwrap(); + for key in iter { let old = self.db.rocks.get_cf(self.cf(), &key)?; let new = utils::increment(old.as_deref()).unwrap(); self.db.rocks.put_cf(self.cf(), key, new)?; } + drop(lock); + Ok(()) }