1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-07-22 17:18:35 +00:00

remove eldrich being and install good being

This commit is contained in:
Jonathan de Jong 2021-07-04 01:18:06 +02:00
parent 9df86c2c1e
commit 14e6afc45e
34 changed files with 371 additions and 315 deletions

View file

@ -3,7 +3,7 @@ use std::{
ops::Deref,
path::{Path, PathBuf},
pin::Pin,
sync::{Arc, Weak},
sync::Arc,
thread,
time::{Duration, Instant},
};
@ -36,7 +36,6 @@ use tokio::sync::oneshot::Sender;
struct Pool {
writer: Mutex<Connection>,
readers: Vec<Mutex<Connection>>,
reader_rwlock: RwLock<()>,
path: PathBuf,
}
@ -71,7 +70,6 @@ impl Pool {
Ok(Self {
writer,
readers,
reader_rwlock: RwLock::new(()),
path: path.as_ref().to_path_buf(),
})
}
@ -95,16 +93,12 @@ impl Pool {
}
fn read_lock(&self) -> HoldingConn<'_> {
let _guard = self.reader_rwlock.read();
for r in &self.readers {
if let Some(reader) = r.try_lock() {
return HoldingConn::FromGuard(reader);
}
}
drop(_guard);
log::warn!("all readers locked, creating spillover reader...");
let spilled = Self::prepare_conn(&self.path).unwrap();
@ -115,7 +109,6 @@ impl Pool {
pub struct SqliteEngine {
pool: Pool,
iterator_lock: RwLock<()>,
}
impl DatabaseEngine for SqliteEngine {
@ -128,36 +121,7 @@ impl DatabaseEngine for SqliteEngine {
pool.write_lock()
.execute("CREATE TABLE IF NOT EXISTS _noop (\"key\" INT)", params![])?;
let arc = Arc::new(SqliteEngine {
pool,
iterator_lock: RwLock::new(()),
});
let weak: Weak<SqliteEngine> = Arc::downgrade(&arc);
thread::spawn(move || {
let r = crossbeam::channel::tick(Duration::from_secs(60));
let weak = weak;
loop {
let _ = r.recv();
if let Some(arc) = Weak::upgrade(&weak) {
log::warn!("wal-trunc: locking...");
let iterator_guard = arc.iterator_lock.write();
let read_guard = arc.pool.reader_rwlock.write();
log::warn!("wal-trunc: locked, flushing...");
let start = Instant::now();
arc.flush_wal().unwrap();
log::warn!("wal-trunc: locked, flushed in {:?}", start.elapsed());
drop(read_guard);
drop(iterator_guard);
} else {
break;
}
}
});
let arc = Arc::new(SqliteEngine { pool });
Ok(arc)
}
@ -190,7 +154,7 @@ impl DatabaseEngine for SqliteEngine {
}
impl SqliteEngine {
fn flush_wal(self: &Arc<Self>) -> Result<()> {
pub fn flush_wal(self: &Arc<Self>) -> Result<()> {
self.pool
.write_lock()
.execute_batch(
@ -244,9 +208,7 @@ impl SqliteTable {
let engine = self.engine.clone();
thread::spawn(move || {
let guard = engine.iterator_lock.read();
let _ = f(&engine.pool.read_lock(), s);
drop(guard);
});
Box::new(r.into_iter())