From 0c23874194101f2ce2e812fb258025ce66e3e124 Mon Sep 17 00:00:00 2001
From: Jonathan de Jong
Date: Wed, 7 Jul 2021 20:36:41 +0200
Subject: [PATCH] add config and optimise

---
 Cargo.lock                         |  1 +
 Cargo.toml                         |  4 +-
 src/database.rs                    | 81 ++++++++++++++++++++++++++++++
 src/database/abstraction/sqlite.rs | 34 +++++++++----
 src/main.rs                        | 42 +---------------
 5 files changed, 109 insertions(+), 53 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4458d710..0d73e5e2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1178,6 +1178,7 @@ version = "0.22.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "290b64917f8b0cb885d9de0f9959fe1f775d7fa12f1da2db9001c1c8ab60f89d"
 dependencies = [
+ "cc",
  "pkg-config",
  "vcpkg",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index e7bb3b8d..7edf6417 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -74,7 +74,7 @@ tracing-opentelemetry = "0.11.0"
 opentelemetry-jaeger = "0.11.0"
 pretty_env_logger = "0.4.0"
 lru-cache = "0.1.2"
-rusqlite = { version = "0.25.3", optional = true }
+rusqlite = { version = "0.25.3", optional = true, features = ["bundled"] }
 parking_lot = { version = "0.11.1", optional = true }
 crossbeam = { version = "0.8.1", optional = true }
 num_cpus = { version = "1.13.0", optional = true }
@@ -84,7 +84,7 @@ default = ["conduit_bin", "backend_sqlite"]
 backend_sled = ["sled"]
 backend_rocksdb = ["rocksdb"]
 backend_sqlite = ["sqlite"]
-sqlite = ["rusqlite", "parking_lot", "crossbeam", "num_cpus"]
+sqlite = ["rusqlite", "parking_lot", "crossbeam", "num_cpus", "tokio/signal"]
 conduit_bin = [] # TODO: add rocket to this when it is optional
 
 [[bin]]
diff --git a/src/database.rs b/src/database.rs
index bd582990..e5bb8656 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -44,6 +44,16 @@ pub struct Config {
     database_path: String,
     #[serde(default = "default_cache_capacity")]
     cache_capacity: u32,
+    #[serde(default = "default_sqlite_cache_kib")]
+    sqlite_cache_kib: u32,
+    #[serde(default = "default_sqlite_read_pool_size")]
+    sqlite_read_pool_size: usize,
+    #[serde(default = "false_fn")]
+    sqlite_wal_clean_timer: bool,
+    #[serde(default = "default_sqlite_wal_clean_second_interval")]
+    sqlite_wal_clean_second_interval: u32,
+    #[serde(default = "default_sqlite_wal_clean_second_timeout")]
+    sqlite_wal_clean_second_timeout: u32,
     #[serde(default = "default_max_request_size")]
     max_request_size: u32,
     #[serde(default = "default_max_concurrent_requests")]
@@ -77,6 +87,22 @@ fn default_cache_capacity() -> u32 {
     1024 * 1024 * 1024
 }
 
+fn default_sqlite_cache_kib() -> u32 {
+    2000
+}
+
+fn default_sqlite_read_pool_size() -> usize {
+    num_cpus::get().max(1)
+}
+
+fn default_sqlite_wal_clean_second_interval() -> u32 {
+    60
+}
+
+fn default_sqlite_wal_clean_second_timeout() -> u32 {
+    2
+}
+
 fn default_max_request_size() -> u32 {
     20 * 1024 * 1024 // Default to 20 MB
 }
@@ -451,6 +477,61 @@ impl Database {
     pub fn flush_wal(&self) -> Result<()> {
         self._db.flush_wal()
     }
+
+    #[cfg(feature = "sqlite")]
+    pub async fn start_wal_clean_task(lock: &Arc<TokioRwLock<Self>>, config: &Config) {
+        use tokio::{
+            signal::unix::{signal, SignalKind},
+            time::{interval, timeout},
+        };
+
+        use std::{
+            sync::Weak,
+            time::{Duration, Instant},
+        };
+
+        let weak: Weak<TokioRwLock<Self>> = Arc::downgrade(&lock);
+
+        let lock_timeout = Duration::from_secs(config.sqlite_wal_clean_second_timeout as u64);
+        let timer_interval = Duration::from_secs(config.sqlite_wal_clean_second_interval as u64);
+        let do_timer = config.sqlite_wal_clean_timer;
+
+        tokio::spawn(async move {
+            let mut i = interval(timer_interval);
+            let mut s = signal(SignalKind::hangup()).unwrap();
+
+            loop {
+                if do_timer {
+                    i.tick().await;
+                    log::info!(target: "wal-trunc", "Timer ticked")
+                } else {
+                    s.recv().await;
+                    log::info!(target: "wal-trunc", "Received SIGHUP")
+                }
+
+                if let Some(arc) = Weak::upgrade(&weak) {
+                    log::info!(target: "wal-trunc", "Locking...");
+                    let guard = {
+                        if let Ok(guard) = timeout(lock_timeout, arc.write()).await {
+                            guard
+                        } else {
+                            log::info!(target: "wal-trunc", "Lock failed in timeout, canceled.");
+                            continue;
+                        }
+                    };
+                    log::info!(target: "wal-trunc", "Locked, flushing...");
+                    let start = Instant::now();
+                    if let Err(e) = guard.flush_wal() {
+                        log::error!(target: "wal-trunc", "Errored: {}", e);
+                    } else {
+                        log::info!(target: "wal-trunc", "Flushed in {:?}", start.elapsed());
+                    }
+                } else {
+                    break;
+                }
+            }
+        });
+    }
 }
 
 pub struct ReadGuard(OwnedRwLockReadGuard<Database>);
diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs
index 164d9858..7e5490cf 100644
--- a/src/database/abstraction/sqlite.rs
+++ b/src/database/abstraction/sqlite.rs
@@ -36,6 +36,7 @@ use tokio::sync::oneshot::Sender;
 struct Pool {
     writer: Mutex<Connection>,
     readers: Vec<Mutex<Connection>>,
+    spill_tracker: Arc<()>,
     path: PathBuf,
 }
 
@@ -43,7 +44,7 @@ pub const MILLI: Duration = Duration::from_millis(1);
 
 enum HoldingConn<'a> {
     FromGuard(MutexGuard<'a, Connection>),
-    FromOwned(Connection),
+    FromOwned(Connection, Arc<()>),
 }
 
 impl<'a> Deref for HoldingConn<'a> {
@@ -52,29 +53,30 @@ impl<'a> Deref for HoldingConn<'a> {
     fn deref(&self) -> &Self::Target {
         match self {
             HoldingConn::FromGuard(guard) => guard.deref(),
-            HoldingConn::FromOwned(conn) => conn,
+            HoldingConn::FromOwned(conn, _) => conn,
         }
     }
 }
 
 impl Pool {
-    fn new<P: AsRef<Path>>(path: P, num_readers: usize) -> Result<Self> {
-        let writer = Mutex::new(Self::prepare_conn(&path)?);
+    fn new<P: AsRef<Path>>(path: P, num_readers: usize, cache_size: u32) -> Result<Self> {
+        let writer = Mutex::new(Self::prepare_conn(&path, Some(cache_size))?);
 
         let mut readers = Vec::new();
 
         for _ in 0..num_readers {
-            readers.push(Mutex::new(Self::prepare_conn(&path)?))
+            readers.push(Mutex::new(Self::prepare_conn(&path, Some(cache_size))?))
         }
 
         Ok(Self {
             writer,
             readers,
+            spill_tracker: Arc::new(()),
             path: path.as_ref().to_path_buf(),
         })
     }
 
-    fn prepare_conn<P: AsRef<Path>>(path: P) -> Result<Connection> {
+    fn prepare_conn<P: AsRef<Path>>(path: P, cache_size: Option<u32>) -> Result<Connection> {
         let conn = Connection::open(path)?;
 
         conn.pragma_update(Some(Main), "journal_mode", &"WAL".to_owned())?;
@@ -85,6 +87,10 @@ impl Pool {
 
         conn.pragma_update(Some(Main), "synchronous", &"OFF".to_owned())?;
 
+        if let Some(cache_kib) = cache_size {
+            conn.pragma_update(Some(Main), "cache_size", &(-Into::<i64>::into(cache_kib)))?;
+        }
+
         Ok(conn)
     }
 
@@ -99,11 +105,18 @@ impl Pool {
             }
         }
 
-        log::warn!("all readers locked, creating spillover reader...");
+        let spill_arc = self.spill_tracker.clone();
+        let now_count = Arc::strong_count(&spill_arc) - 1 /* because one is held by the pool */;
 
-        let spilled = Self::prepare_conn(&self.path).unwrap();
+        log::warn!("read_lock: all readers locked, creating spillover reader...");
 
-        return HoldingConn::FromOwned(spilled);
+        if now_count > 1 {
+            log::warn!("read_lock: now {} spillover readers exist", now_count);
+        }
+
+        let spilled = Self::prepare_conn(&self.path, None).unwrap();
+
+        return HoldingConn::FromOwned(spilled, spill_arc);
     }
 }
 
@@ -115,7 +128,8 @@ impl DatabaseEngine for SqliteEngine {
     fn open(config: &Config) -> Result<Arc<Self>> {
         let pool = Pool::new(
             format!("{}/conduit.db", &config.database_path),
-            num_cpus::get(),
+            config.sqlite_read_pool_size,
+            config.sqlite_cache_kib,
         )?;
 
         pool.write_lock()
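Note on the spillover accounting in read_lock above: each spillover connection now carries a clone of the pool's spill_tracker, so the number of live spillover readers is simply Arc::strong_count minus the one copy the pool itself holds, and the count drops on its own when the HoldingConn (and the cloned Arc inside it) is dropped. Below is a minimal, self-contained sketch of that counting pattern; the Tracker and SpillHandle names are illustrative only and do not appear in the patch.

use std::sync::Arc;

/// Counts live "spillover" handles by handing out clones of a shared Arc<()>.
struct Tracker(Arc<()>);

/// Keeps the tracker's strong count raised for as long as it is alive.
struct SpillHandle(#[allow(dead_code)] Arc<()>);

impl Tracker {
    fn new() -> Self {
        Tracker(Arc::new(()))
    }

    /// Hand out a handle; the clone bumps the strong count by one.
    fn spill(&self) -> SpillHandle {
        SpillHandle(Arc::clone(&self.0))
    }

    /// Live handles = total strong count minus the copy held by the tracker itself.
    fn live(&self) -> usize {
        Arc::strong_count(&self.0) - 1
    }
}

fn main() {
    let tracker = Tracker::new();
    assert_eq!(tracker.live(), 0);

    let a = tracker.spill();
    let b = tracker.spill();
    assert_eq!(tracker.live(), 2);

    drop(a);
    assert_eq!(tracker.live(), 1);

    drop(b);
    assert_eq!(tracker.live(), 0);
}

Dropping a handle is the only bookkeeping needed, which is why read_lock never decrements anything explicitly.
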
diff --git a/src/main.rs b/src/main.rs
index 5c5ea847..22c44b54 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -203,47 +203,7 @@ async fn main() {
         .expect("config is valid");
 
     #[cfg(feature = "sqlite")]
-    {
-        use tokio::time::{interval, timeout};
-
-        use std::{
-            sync::Weak,
-            time::{Duration, Instant},
-        };
-
-        let weak: Weak<TokioRwLock<Database>> = Arc::downgrade(&db);
-
-        tokio::spawn(async {
-            let weak = weak;
-
-            let mut i = interval(Duration::from_secs(60));
-
-            loop {
-                i.tick().await;
-
-                if let Some(arc) = Weak::upgrade(&weak) {
-                    log::warn!("wal-trunc: locking...");
-                    let guard = {
-                        if let Ok(guard) = timeout(Duration::from_secs(5), arc.write()).await {
-                            guard
-                        } else {
-                            log::warn!("wal-trunc: lock failed in timeout, canceled.");
-                            continue;
-                        }
-                    };
-                    log::warn!("wal-trunc: locked, flushing...");
-                    let start = Instant::now();
-                    if let Err(e) = guard.flush_wal() {
-                        log::warn!("wal-trunc: errored: {}", e);
-                    } else {
-                        log::warn!("wal-trunc: flushed in {:?}", start.elapsed());
-                    }
-                } else {
-                    break;
-                }
-            }
-        });
-    }
+    Database::start_wal_clean_task(&db, &config).await;
 
     if config.allow_jaeger {
         let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline()
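The loop that moved into Database::start_wal_clean_task boils down to: wake on a periodic tick (when sqlite_wal_clean_timer is enabled) or on SIGHUP (the default), then flush the WAL under the database write lock, giving up if the lock cannot be taken within sqlite_wal_clean_second_timeout. Below is a stripped-down, standalone sketch of that trigger logic, assuming tokio with the signal, time, rt and macros features enabled; flush() and the hard-coded durations are stand-ins for the real flush_wal() and config values, not Conduit code.

use std::time::{Duration, Instant};

use tokio::{
    signal::unix::{signal, SignalKind},
    time::{interval, timeout},
};

// Stand-in for Database::flush_wal(); the real call truncates the sqlite WAL file.
async fn flush() -> Result<(), String> {
    Ok(())
}

#[tokio::main]
async fn main() {
    // Maps to sqlite_wal_clean_timer; set to false to flush only on SIGHUP,
    // which is what the patch does by default.
    let do_timer = true;
    let mut ticker = interval(Duration::from_secs(60)); // sqlite_wal_clean_second_interval
    let mut hangup = signal(SignalKind::hangup()).expect("failed to install SIGHUP handler");

    loop {
        // Wake on the periodic tick, or on SIGHUP when the timer is disabled.
        if do_timer {
            ticker.tick().await;
        } else if hangup.recv().await.is_none() {
            break; // signal stream closed, nothing left to wait for
        }

        let start = Instant::now();
        // Bound the wait; in the patch this timeout guards taking the write lock.
        match timeout(Duration::from_secs(2), flush()).await {
            Ok(Ok(())) => println!("flushed in {:?}", start.elapsed()),
            Ok(Err(e)) => eprintln!("flush errored: {}", e),
            Err(_) => println!("timed out, skipping this round"),
        }
    }
}

Keeping SIGHUP as the default trigger means the write lock is only contended when an operator (or the opt-in timer) explicitly asks for a checkpoint.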