mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-06-27 16:35:59 +00:00
implement sync rotation
This commit is contained in:
parent
bcfea98457
commit
7e9014d5c9
3 changed files with 37 additions and 5 deletions
|
@ -456,6 +456,8 @@ impl Database {
|
||||||
.watch_prefix(&userid_bytes),
|
.watch_prefix(&userid_bytes),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
futures.push(Box::pin(self.globals.rotate.watch()));
|
||||||
|
|
||||||
// Wait until one of them finds something
|
// Wait until one of them finds something
|
||||||
futures.next().await;
|
futures.next().await;
|
||||||
}
|
}
|
||||||
|
@ -509,6 +511,11 @@ impl Database {
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(arc) = Weak::upgrade(&weak) {
|
if let Some(arc) = Weak::upgrade(&weak) {
|
||||||
|
log::info!(target: "wal-trunc", "Rotating sync helpers...");
|
||||||
|
// This actually creates a very small race condition between firing this and trying to acquire the subsequent write lock.
|
||||||
|
// Though it is not a huge deal if the write lock doesn't "catch", as it'll harmlessly time out.
|
||||||
|
arc.read().await.globals.rotate.fire();
|
||||||
|
|
||||||
log::info!(target: "wal-trunc", "Locking...");
|
log::info!(target: "wal-trunc", "Locking...");
|
||||||
let guard = {
|
let guard = {
|
||||||
if let Ok(guard) = timeout(lock_timeout, arc.write()).await {
|
if let Ok(guard) = timeout(lock_timeout, arc.write()).await {
|
||||||
|
|
|
@ -16,7 +16,7 @@ use super::{DatabaseEngine, Tree};
|
||||||
use log::debug;
|
use log::debug;
|
||||||
|
|
||||||
use crossbeam::channel::{bounded, Sender as ChannelSender};
|
use crossbeam::channel::{bounded, Sender as ChannelSender};
|
||||||
use parking_lot::{Mutex, MutexGuard, RwLock};
|
use parking_lot::{FairMutex, FairMutexGuard, Mutex, MutexGuard, RwLock};
|
||||||
use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension};
|
use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension};
|
||||||
|
|
||||||
use tokio::sync::oneshot::Sender;
|
use tokio::sync::oneshot::Sender;
|
||||||
|
@ -33,7 +33,7 @@ use tokio::sync::oneshot::Sender;
|
||||||
// "SELECT key, value FROM {} WHERE key <= ? ORDER BY DESC";
|
// "SELECT key, value FROM {} WHERE key <= ? ORDER BY DESC";
|
||||||
|
|
||||||
struct Pool {
|
struct Pool {
|
||||||
writer: Mutex<Connection>,
|
writer: FairMutex<Connection>,
|
||||||
readers: Vec<Mutex<Connection>>,
|
readers: Vec<Mutex<Connection>>,
|
||||||
spill_tracker: Arc<()>,
|
spill_tracker: Arc<()>,
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
|
@ -59,7 +59,7 @@ impl<'a> Deref for HoldingConn<'a> {
|
||||||
|
|
||||||
impl Pool {
|
impl Pool {
|
||||||
fn new<P: AsRef<Path>>(path: P, num_readers: usize, cache_size: u32) -> Result<Self> {
|
fn new<P: AsRef<Path>>(path: P, num_readers: usize, cache_size: u32) -> Result<Self> {
|
||||||
let writer = Mutex::new(Self::prepare_conn(&path, Some(cache_size))?);
|
let writer = FairMutex::new(Self::prepare_conn(&path, Some(cache_size))?);
|
||||||
|
|
||||||
let mut readers = Vec::new();
|
let mut readers = Vec::new();
|
||||||
|
|
||||||
|
@ -93,7 +93,7 @@ impl Pool {
|
||||||
Ok(conn)
|
Ok(conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_lock(&self) -> MutexGuard<'_, Connection> {
|
fn write_lock(&self) -> FairMutexGuard<'_, Connection> {
|
||||||
self.writer.lock()
|
self.writer.lock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,11 +11,12 @@ use rustls::{ServerCertVerifier, WebPKIVerifier};
|
||||||
use std::{
|
use std::{
|
||||||
collections::{BTreeMap, HashMap},
|
collections::{BTreeMap, HashMap},
|
||||||
fs,
|
fs,
|
||||||
|
future::Future,
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
sync::{Arc, RwLock},
|
sync::{Arc, RwLock},
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::{broadcast, Semaphore};
|
||||||
use trust_dns_resolver::TokioAsyncResolver;
|
use trust_dns_resolver::TokioAsyncResolver;
|
||||||
|
|
||||||
use super::abstraction::Tree;
|
use super::abstraction::Tree;
|
||||||
|
@ -47,6 +48,7 @@ pub struct Globals {
|
||||||
), // since, rx
|
), // since, rx
|
||||||
>,
|
>,
|
||||||
>,
|
>,
|
||||||
|
pub rotate: RotationHandler,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct MatrixServerVerifier {
|
struct MatrixServerVerifier {
|
||||||
|
@ -82,6 +84,28 @@ impl ServerCertVerifier for MatrixServerVerifier {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct RotationHandler(broadcast::Sender<()>, broadcast::Receiver<()>);
|
||||||
|
|
||||||
|
impl RotationHandler {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
let (s, r) = broadcast::channel::<()>(1);
|
||||||
|
|
||||||
|
Self(s, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn watch(&self) -> impl Future<Output = ()> {
|
||||||
|
let mut r = self.0.subscribe();
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let _ = r.recv().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn fire(&self) {
|
||||||
|
let _ = self.0.send(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Globals {
|
impl Globals {
|
||||||
pub fn load(
|
pub fn load(
|
||||||
globals: Arc<dyn Tree>,
|
globals: Arc<dyn Tree>,
|
||||||
|
@ -168,6 +192,7 @@ impl Globals {
|
||||||
bad_signature_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())),
|
bad_signature_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())),
|
||||||
servername_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())),
|
servername_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())),
|
||||||
sync_receivers: RwLock::new(BTreeMap::new()),
|
sync_receivers: RwLock::new(BTreeMap::new()),
|
||||||
|
rotate: RotationHandler::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
fs::create_dir_all(s.get_media_folder())?;
|
fs::create_dir_all(s.get_media_folder())?;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue