mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-07-22 17:18:35 +00:00
refactor: work on auth chain and state compressor
This commit is contained in:
parent
ada1251a52
commit
5108ce52c2
6 changed files with 152 additions and 108 deletions
24
src/database/key_value/rooms/auth_chain.rs
Normal file
24
src/database/key_value/rooms/auth_chain.rs
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
impl service::room::auth_chain::Data for KeyValueDatabase {
|
||||||
|
fn get_cached_eventid_authchain<'a>() -> Result<HashSet<u64>> {
|
||||||
|
self.shorteventid_authchain
|
||||||
|
.get(&shorteventid.to_be_bytes())?
|
||||||
|
.map(|chain| {
|
||||||
|
chain
|
||||||
|
.chunks_exact(size_of::<u64>())
|
||||||
|
.map(|chunk| {
|
||||||
|
utils::u64_from_bytes(chunk).expect("byte length is correct")
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cache_eventid_authchain<'a>(shorteventid: u64, auth_chain: &HashSet<u64>) -> Result<()> {
|
||||||
|
shorteventid_authchain.insert(
|
||||||
|
&shorteventid.to_be_bytes(),
|
||||||
|
&auth_chain
|
||||||
|
.iter()
|
||||||
|
.flat_map(|s| s.to_be_bytes().to_vec())
|
||||||
|
.collect::<Vec<u8>>(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
48
src/database/key_value/rooms/state_compressor.rs
Normal file
48
src/database/key_value/rooms/state_compressor.rs
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
impl service::room::state_compressor::Data for KeyValueDatabase {
|
||||||
|
fn get_statediff(shortstatehash: u64) -> Result<StateDiff> {
|
||||||
|
let value = self
|
||||||
|
.shortstatehash_statediff
|
||||||
|
.get(&shortstatehash.to_be_bytes())?
|
||||||
|
.ok_or_else(|| Error::bad_database("State hash does not exist"))?;
|
||||||
|
let parent =
|
||||||
|
utils::u64_from_bytes(&value[0..size_of::<u64>()]).expect("bytes have right length");
|
||||||
|
|
||||||
|
let mut add_mode = true;
|
||||||
|
let mut added = HashSet::new();
|
||||||
|
let mut removed = HashSet::new();
|
||||||
|
|
||||||
|
let mut i = size_of::<u64>();
|
||||||
|
while let Some(v) = value.get(i..i + 2 * size_of::<u64>()) {
|
||||||
|
if add_mode && v.starts_with(&0_u64.to_be_bytes()) {
|
||||||
|
add_mode = false;
|
||||||
|
i += size_of::<u64>();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if add_mode {
|
||||||
|
added.insert(v.try_into().expect("we checked the size above"));
|
||||||
|
} else {
|
||||||
|
removed.insert(v.try_into().expect("we checked the size above"));
|
||||||
|
}
|
||||||
|
i += 2 * size_of::<u64>();
|
||||||
|
}
|
||||||
|
|
||||||
|
StateDiff { parent, added, removed }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn save_statediff(shortstatehash: u64, diff: StateDiff) -> Result<()> {
|
||||||
|
let mut value = diff.parent.to_be_bytes().to_vec();
|
||||||
|
for new in &diff.new {
|
||||||
|
value.extend_from_slice(&new[..]);
|
||||||
|
}
|
||||||
|
|
||||||
|
if !diff.removed.is_empty() {
|
||||||
|
value.extend_from_slice(&0_u64.to_be_bytes());
|
||||||
|
for removed in &diff.removed {
|
||||||
|
value.extend_from_slice(&removed[..]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.shortstatehash_statediff
|
||||||
|
.insert(&shortstatehash.to_be_bytes(), &value)?;
|
||||||
|
}
|
||||||
|
}
|
4
src/service/rooms/auth_chain/data.rs
Normal file
4
src/service/rooms/auth_chain/data.rs
Normal file
|
@ -0,0 +1,4 @@
|
||||||
|
pub trait Data {
|
||||||
|
fn get_cached_eventid_authchain<'a>() -> Result<HashSet<u64>>;
|
||||||
|
fn cache_eventid_authchain<'a>(shorteventid: u64, auth_chain: &HashSet<u64>) -> Result<HashSet<u64>>;
|
||||||
|
}
|
53
src/service/rooms/auth_chain/mod.rs
Normal file
53
src/service/rooms/auth_chain/mod.rs
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
mod data;
|
||||||
|
pub use data::Data;
|
||||||
|
|
||||||
|
use crate::service::*;
|
||||||
|
|
||||||
|
pub struct Service<D: Data> {
|
||||||
|
db: D,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Service<_> {
|
||||||
|
#[tracing::instrument(skip(self))]
|
||||||
|
pub fn get_cached_eventid_authchain<'a>(
|
||||||
|
&'a self,
|
||||||
|
key: &[u64],
|
||||||
|
) -> Result<Option<Arc<HashSet<u64>>>> {
|
||||||
|
// Check RAM cache
|
||||||
|
if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key.to_be_bytes()) {
|
||||||
|
return Ok(Some(Arc::clone(result)));
|
||||||
|
}
|
||||||
|
|
||||||
|
// We only save auth chains for single events in the db
|
||||||
|
if key.len == 1 {
|
||||||
|
// Check DB cache
|
||||||
|
if let Some(chain) = self.db.get_cached_eventid_authchain(key[0])
|
||||||
|
{
|
||||||
|
let chain = Arc::new(chain);
|
||||||
|
|
||||||
|
// Cache in RAM
|
||||||
|
self.auth_chain_cache
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.insert(vec![key[0]], Arc::clone(&chain));
|
||||||
|
|
||||||
|
return Ok(Some(chain));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(self))]
|
||||||
|
pub fn cache_auth_chain(&self, key: Vec<u64>, auth_chain: Arc<HashSet<u64>>) -> Result<()> {
|
||||||
|
// Only persist single events in db
|
||||||
|
if key.len() == 1 {
|
||||||
|
self.db.cache_auth_chain(key[0], auth_chain)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cache in RAM
|
||||||
|
self.auth_chain_cache.lock().unwrap().insert(key, auth_chain);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
10
src/service/rooms/state_compressor/data.rs
Normal file
10
src/service/rooms/state_compressor/data.rs
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
struct StateDiff {
|
||||||
|
parent: Option<u64>,
|
||||||
|
added: Vec<CompressedStateEvent>,
|
||||||
|
removed: Vec<CompressedStateEvent>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait Data {
|
||||||
|
fn get_statediff(shortstatehash: u64) -> Result<StateDiff>;
|
||||||
|
fn save_statediff(shortstatehash: u64, diff: StateDiff) -> Result<()>;
|
||||||
|
}
|
|
@ -1,4 +1,13 @@
|
||||||
|
mod data;
|
||||||
|
pub use data::Data;
|
||||||
|
|
||||||
|
use crate::service::*;
|
||||||
|
|
||||||
|
pub struct Service<D: Data> {
|
||||||
|
db: D,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Service<_> {
|
||||||
/// Returns a stack with info on shortstatehash, full state, added diff and removed diff for the selected shortstatehash and each parent layer.
|
/// Returns a stack with info on shortstatehash, full state, added diff and removed diff for the selected shortstatehash and each parent layer.
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn load_shortstatehash_info(
|
pub fn load_shortstatehash_info(
|
||||||
|
@ -21,31 +30,7 @@
|
||||||
return Ok(r.clone());
|
return Ok(r.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
let value = self
|
self.db.get_statediff(shortstatehash)?;
|
||||||
.shortstatehash_statediff
|
|
||||||
.get(&shortstatehash.to_be_bytes())?
|
|
||||||
.ok_or_else(|| Error::bad_database("State hash does not exist"))?;
|
|
||||||
let parent =
|
|
||||||
utils::u64_from_bytes(&value[0..size_of::<u64>()]).expect("bytes have right length");
|
|
||||||
|
|
||||||
let mut add_mode = true;
|
|
||||||
let mut added = HashSet::new();
|
|
||||||
let mut removed = HashSet::new();
|
|
||||||
|
|
||||||
let mut i = size_of::<u64>();
|
|
||||||
while let Some(v) = value.get(i..i + 2 * size_of::<u64>()) {
|
|
||||||
if add_mode && v.starts_with(&0_u64.to_be_bytes()) {
|
|
||||||
add_mode = false;
|
|
||||||
i += size_of::<u64>();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if add_mode {
|
|
||||||
added.insert(v.try_into().expect("we checked the size above"));
|
|
||||||
} else {
|
|
||||||
removed.insert(v.try_into().expect("we checked the size above"));
|
|
||||||
}
|
|
||||||
i += 2 * size_of::<u64>();
|
|
||||||
}
|
|
||||||
|
|
||||||
if parent != 0_u64 {
|
if parent != 0_u64 {
|
||||||
let mut response = self.load_shortstatehash_info(parent)?;
|
let mut response = self.load_shortstatehash_info(parent)?;
|
||||||
|
@ -170,17 +155,7 @@
|
||||||
|
|
||||||
if parent_states.is_empty() {
|
if parent_states.is_empty() {
|
||||||
// There is no parent layer, create a new state
|
// There is no parent layer, create a new state
|
||||||
let mut value = 0_u64.to_be_bytes().to_vec(); // 0 means no parent
|
self.db.save_statediff(shortstatehash, StateDiff { parent: 0, new: statediffnew, removed: statediffremoved })?;
|
||||||
for new in &statediffnew {
|
|
||||||
value.extend_from_slice(&new[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if !statediffremoved.is_empty() {
|
|
||||||
warn!("Tried to create new state with removals");
|
|
||||||
}
|
|
||||||
|
|
||||||
self.shortstatehash_statediff
|
|
||||||
.insert(&shortstatehash.to_be_bytes(), &value)?;
|
|
||||||
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
@ -222,20 +197,7 @@
|
||||||
)?;
|
)?;
|
||||||
} else {
|
} else {
|
||||||
// Diff small enough, we add diff as layer on top of parent
|
// Diff small enough, we add diff as layer on top of parent
|
||||||
let mut value = parent.0.to_be_bytes().to_vec();
|
self.db.save_statediff(shortstatehash, StateDiff { parent: parent.0, new: statediffnew, removed: statediffremoved })?;
|
||||||
for new in &statediffnew {
|
|
||||||
value.extend_from_slice(&new[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if !statediffremoved.is_empty() {
|
|
||||||
value.extend_from_slice(&0_u64.to_be_bytes());
|
|
||||||
for removed in &statediffremoved {
|
|
||||||
value.extend_from_slice(&removed[..]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.shortstatehash_statediff
|
|
||||||
.insert(&shortstatehash.to_be_bytes(), &value)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -298,61 +260,4 @@
|
||||||
|
|
||||||
Ok((new_shortstatehash, statediffnew, statediffremoved))
|
Ok((new_shortstatehash, statediffnew, statediffremoved))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn get_auth_chain_from_cache<'a>(
|
|
||||||
&'a self,
|
|
||||||
key: &[u64],
|
|
||||||
) -> Result<Option<Arc<HashSet<u64>>>> {
|
|
||||||
// Check RAM cache
|
|
||||||
if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key) {
|
|
||||||
return Ok(Some(Arc::clone(result)));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check DB cache
|
|
||||||
if key.len() == 1 {
|
|
||||||
if let Some(chain) =
|
|
||||||
self.shorteventid_authchain
|
|
||||||
.get(&key[0].to_be_bytes())?
|
|
||||||
.map(|chain| {
|
|
||||||
chain
|
|
||||||
.chunks_exact(size_of::<u64>())
|
|
||||||
.map(|chunk| {
|
|
||||||
utils::u64_from_bytes(chunk).expect("byte length is correct")
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
})
|
|
||||||
{
|
|
||||||
let chain = Arc::new(chain);
|
|
||||||
|
|
||||||
// Cache in RAM
|
|
||||||
self.auth_chain_cache
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.insert(vec![key[0]], Arc::clone(&chain));
|
|
||||||
|
|
||||||
return Ok(Some(chain));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn cache_auth_chain(&self, key: Vec<u64>, chain: Arc<HashSet<u64>>) -> Result<()> {
|
|
||||||
// Persist in db
|
|
||||||
if key.len() == 1 {
|
|
||||||
self.shorteventid_authchain.insert(
|
|
||||||
&key[0].to_be_bytes(),
|
|
||||||
&chain
|
|
||||||
.iter()
|
|
||||||
.flat_map(|s| s.to_be_bytes().to_vec())
|
|
||||||
.collect::<Vec<u8>>(),
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cache in RAM
|
|
||||||
self.auth_chain_cache.lock().unwrap().insert(key, chain);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue