1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-06-27 16:35:59 +00:00

switch Iterator to Send + Iterator in return types

This changes the SQLite implementation quite a bit. It may potentially
reduce performance, but allows using the new async API and removes some
usage of unsafe
This commit is contained in:
chayleaf 2024-06-23 00:31:27 +07:00
parent a7e34eb0b3
commit a8c9e3eebe
No known key found for this signature in database
GPG key ID: 78171AD46227E68E
37 changed files with 139 additions and 166 deletions

View file

@ -50,13 +50,13 @@ pub trait KvTree: Send + Sync {
fn remove(&self, key: &[u8]) -> Result<()>;
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
fn iter<'a>(&'a self) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
fn iter_from<'a>(
&'a self,
from: &[u8],
backwards: bool,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
fn increment(&self, key: &[u8]) -> Result<Vec<u8>>;
fn increment_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()>;
@ -64,7 +64,7 @@ pub trait KvTree: Send + Sync {
fn scan_prefix<'a>(
&'a self,
prefix: Vec<u8>,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;

View file

@ -74,7 +74,7 @@ impl EngineTree {
tree: Arc<heed::UntypedDatabase>,
from: Vec<u8>,
backwards: bool,
) -> Box<dyn Iterator<Item = TupleOfBytes> + Send + Sync> {
) -> Box<dyn Send + Iterator<Item = TupleOfBytes> + Send + Sync> {
let (s, r) = bounded::<TupleOfBytes>(100);
let engine = Arc::clone(&self.engine);
@ -150,7 +150,7 @@ impl Tree for EngineTree {
Ok(())
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
fn iter<'a>(&'a self) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
self.iter_from(&[], false)
}
@ -158,7 +158,7 @@ impl Tree for EngineTree {
&self,
from: &[u8],
backwards: bool,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send> {
) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)> + Send> {
self.iter_from_thread(Arc::clone(&self.tree), from.to_vec(), backwards)
}
@ -181,7 +181,7 @@ impl Tree for EngineTree {
fn scan_prefix<'a>(
&'a self,
prefix: Vec<u8>,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
Box::new(
self.iter_from(&prefix, false)
.take_while(move |(key, _)| key.starts_with(&prefix)),

View file

@ -113,7 +113,7 @@ impl KvTree for PersyTree {
Ok(())
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
fn iter<'a>(&'a self) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let iter = self.persy.range::<ByteVec, ByteVec, _>(&self.name, ..);
match iter {
Ok(iter) => Box::new(iter.filter_map(|(k, v)| {
@ -132,7 +132,7 @@ impl KvTree for PersyTree {
&'a self,
from: &[u8],
backwards: bool,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let range = if backwards {
self.persy
.range::<ByteVec, ByteVec, _>(&self.name, ..=ByteVec::from(from))
@ -168,7 +168,7 @@ impl KvTree for PersyTree {
fn scan_prefix<'a>(
&'a self,
prefix: Vec<u8>,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let range_prefix = ByteVec::from(prefix.clone());
let range = self
.persy

View file

@ -175,7 +175,7 @@ impl KvTree for RocksDbEngineTree<'_> {
.delete_cf_opt(&self.cf(), key, &writeoptions)?)
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
fn iter<'a>(&'a self) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let readoptions = rocksdb::ReadOptions::default();
Box::new(
@ -191,7 +191,7 @@ impl KvTree for RocksDbEngineTree<'_> {
&'a self,
from: &[u8],
backwards: bool,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let readoptions = rocksdb::ReadOptions::default();
Box::new(
@ -252,7 +252,7 @@ impl KvTree for RocksDbEngineTree<'_> {
fn scan_prefix<'a>(
&'a self,
prefix: Vec<u8>,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let readoptions = rocksdb::ReadOptions::default();
Box::new(

View file

@ -52,7 +52,7 @@ impl Tree for SledEngineTree {
Ok(())
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
fn iter<'a>(&'a self) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
Box::new(
self.0
.iter()
@ -70,7 +70,7 @@ impl Tree for SledEngineTree {
&self,
from: &[u8],
backwards: bool,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)>> {
) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)>> {
let iter = if backwards {
self.0.range(..=from)
} else {
@ -103,7 +103,7 @@ impl Tree for SledEngineTree {
fn scan_prefix<'a>(
&'a self,
prefix: Vec<u8>,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
) -> Box<dyn Send + Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let iter = self
.0
.scan_prefix(prefix)

View file

@ -8,7 +8,7 @@ use std::{
future::Future,
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
sync::{mpsc, Arc},
};
use thread_local::ThreadLocal;
use tracing::debug;
@ -18,26 +18,6 @@ thread_local! {
static READ_CONNECTION_ITERATOR: RefCell<Option<&'static Connection>> = const { RefCell::new(None) };
}
struct PreparedStatementIterator<'a> {
pub iterator: Box<dyn Iterator<Item = TupleOfBytes> + 'a>,
pub _statement_ref: NonAliasingBox<rusqlite::Statement<'a>>,
}
impl Iterator for PreparedStatementIterator<'_> {
type Item = TupleOfBytes;
fn next(&mut self) -> Option<Self::Item> {
self.iterator.next()
}
}
struct NonAliasingBox<T>(*mut T);
impl<T> Drop for NonAliasingBox<T> {
fn drop(&mut self) {
drop(unsafe { Box::from_raw(self.0) });
}
}
pub struct Engine {
writer: Mutex<Connection>,
read_conn_tls: ThreadLocal<Connection>,
@ -135,6 +115,22 @@ pub struct SqliteTable {
type TupleOfBytes = (Vec<u8>, Vec<u8>);
fn iter_stmt(
from: impl rusqlite::Params,
mut statement: rusqlite::Statement,
out: mpsc::SyncSender<TupleOfBytes>,
) {
for item in statement
.query_map(from, |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap()
.map(|r| r.unwrap())
{
if out.send(item).is_err() {
break;
}
}
}
impl SqliteTable {
fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(guard
@ -155,34 +151,18 @@ impl SqliteTable {
Ok(())
}
pub fn iter_with_guard<'a>(
&'a self,
guard: &'a Connection,
) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
let statement = Box::leak(Box::new(
guard
.prepare(&format!(
"SELECT key, value FROM {} ORDER BY key ASC",
&self.name
))
.unwrap(),
));
let statement_ref = NonAliasingBox(statement);
pub fn iter_with_guard(
name: &str,
guard: &Connection,
out: mpsc::SyncSender<TupleOfBytes>,
) {
let statement = guard
.prepare(&format!("SELECT key, value FROM {} ORDER BY key ASC", name))
.unwrap();
//let name = self.name.clone();
let iterator = Box::new(
statement
.query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap()
.map(move |r| r.unwrap()),
);
Box::new(PreparedStatementIterator {
iterator,
_statement_ref: statement_ref,
})
iter_stmt([], statement, out)
}
}
@ -241,68 +221,51 @@ impl KvTree for SqliteTable {
Ok(())
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
let guard = self.engine.read_lock_iterator();
fn iter<'a>(&'a self) -> Box<dyn Send + Iterator<Item = TupleOfBytes> + 'a> {
let (tx, rx) = mpsc::sync_channel(1);
let engine = self.engine.clone();
let name = self.name.clone();
tokio::task::spawn_blocking(move || {
let guard = engine.read_lock_iterator();
self.iter_with_guard(guard)
Self::iter_with_guard(&name, guard, tx)
});
Box::new(rx.into_iter())
}
fn iter_from<'a>(
&'a self,
from: &[u8],
backwards: bool,
) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
let guard = self.engine.read_lock_iterator();
) -> Box<dyn Send + Iterator<Item = TupleOfBytes> + 'a> {
let (tx, rx) = mpsc::sync_channel(1);
let engine = self.engine.clone();
let name = self.name.clone();
let from = from.to_vec(); // TODO change interface?
tokio::task::spawn_blocking(move || {
let guard = engine.read_lock_iterator();
//let name = self.name.clone();
if backwards {
let statement = Box::leak(Box::new(
guard
if backwards {
let statement = guard
.prepare(&format!(
"SELECT key, value FROM {} WHERE key <= ? ORDER BY key DESC",
&self.name
&name
))
.unwrap(),
));
.unwrap();
let statement_ref = NonAliasingBox(statement);
let iterator = Box::new(
statement
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap()
.map(move |r| r.unwrap()),
);
Box::new(PreparedStatementIterator {
iterator,
_statement_ref: statement_ref,
})
} else {
let statement = Box::leak(Box::new(
guard
iter_stmt([from], statement, tx)
} else {
let statement = guard
.prepare(&format!(
"SELECT key, value FROM {} WHERE key >= ? ORDER BY key ASC",
&self.name
&name
))
.unwrap(),
));
.unwrap();
let statement_ref = NonAliasingBox(statement);
let iterator = Box::new(
statement
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap()
.map(move |r| r.unwrap()),
);
Box::new(PreparedStatementIterator {
iterator,
_statement_ref: statement_ref,
})
}
iter_stmt([from], statement, tx)
}
});
Box::new(rx.into_iter())
}
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
@ -318,7 +281,10 @@ impl KvTree for SqliteTable {
Ok(new)
}
fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
fn scan_prefix<'a>(
&'a self,
prefix: Vec<u8>,
) -> Box<dyn Send + Iterator<Item = TupleOfBytes> + 'a> {
Box::new(
self.iter_from(&prefix, false)
.take_while(move |(key, _)| key.starts_with(&prefix)),

View file

@ -36,7 +36,7 @@ impl service::appservice::Data for KeyValueDatabase {
.transpose()
}
fn iter_ids<'a>(&'a self) -> Result<Box<dyn Iterator<Item = Result<String>> + 'a>> {
fn iter_ids<'a>(&'a self) -> Result<Box<dyn Send + Iterator<Item = Result<String>> + 'a>> {
Ok(Box::new(self.id_appserviceregistrations.iter().map(
|(id, _)| {
utils::string_from_bytes(&id).map_err(|_| {

View file

@ -60,7 +60,7 @@ impl service::pusher::Data for KeyValueDatabase {
fn get_pushkeys<'a>(
&'a self,
sender: &UserId,
) -> Box<dyn Iterator<Item = Result<String>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<String>> + 'a> {
let mut prefix = sender.as_bytes().to_vec();
prefix.push(0xff);

View file

@ -52,7 +52,7 @@ impl service::rooms::alias::Data for KeyValueDatabase {
fn local_aliases_for_room<'a>(
&'a self,
room_id: &RoomId,
) -> Box<dyn Iterator<Item = Result<OwnedRoomAliasId>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<OwnedRoomAliasId>> + 'a> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);

View file

@ -15,7 +15,7 @@ impl service::rooms::directory::Data for KeyValueDatabase {
Ok(self.publicroomids.get(room_id.as_bytes())?.is_some())
}
fn public_rooms<'a>(&'a self) -> Box<dyn Iterator<Item = Result<OwnedRoomId>> + 'a> {
fn public_rooms<'a>(&'a self) -> Box<dyn Send + Iterator<Item = Result<OwnedRoomId>> + 'a> {
Box::new(self.publicroomids.iter().map(|(bytes, _)| {
RoomId::parse(
utils::string_from_bytes(&bytes).map_err(|_| {

View file

@ -53,7 +53,8 @@ impl service::rooms::edus::read_receipt::Data for KeyValueDatabase {
room_id: &RoomId,
since: u64,
) -> Box<
dyn Iterator<
dyn Send
+ Iterator<
Item = Result<(
OwnedUserId,
u64,

View file

@ -18,7 +18,7 @@ impl service::rooms::metadata::Data for KeyValueDatabase {
.is_some())
}
fn iter_ids<'a>(&'a self) -> Box<dyn Iterator<Item = Result<OwnedRoomId>> + 'a> {
fn iter_ids<'a>(&'a self) -> Box<dyn Send + Iterator<Item = Result<OwnedRoomId>> + 'a> {
Box::new(self.roomid_shortroomid.iter().map(|(bytes, _)| {
RoomId::parse(
utils::string_from_bytes(&bytes).map_err(|_| {

View file

@ -22,7 +22,7 @@ impl service::rooms::pdu_metadata::Data for KeyValueDatabase {
shortroomid: u64,
target: u64,
until: PduCount,
) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> {
) -> Result<Box<dyn Send + Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> {
let prefix = target.to_be_bytes().to_vec();
let mut current = prefix.clone();

View file

@ -46,7 +46,7 @@ impl service::rooms::search::Data for KeyValueDatabase {
&'a self,
room_id: &RoomId,
search_string: &str,
) -> Result<Option<(Box<dyn Iterator<Item = Vec<u8>> + 'a>, Vec<String>)>> {
) -> Result<Option<(Box<dyn Send + Iterator<Item = Vec<u8>> + 'a>, Vec<String>)>> {
let prefix = services()
.rooms
.short

View file

@ -244,7 +244,7 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
fn room_servers<'a>(
&'a self,
room_id: &RoomId,
) -> Box<dyn Iterator<Item = Result<OwnedServerName>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<OwnedServerName>> + 'a> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
@ -277,7 +277,7 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
fn server_rooms<'a>(
&'a self,
server: &ServerName,
) -> Box<dyn Iterator<Item = Result<OwnedRoomId>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<OwnedRoomId>> + 'a> {
let mut prefix = server.as_bytes().to_vec();
prefix.push(0xff);
@ -299,7 +299,7 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
fn room_members<'a>(
&'a self,
room_id: &RoomId,
) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<OwnedUserId>> + 'a> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
@ -345,7 +345,7 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
fn room_useroncejoined<'a>(
&'a self,
room_id: &RoomId,
) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<OwnedUserId>> + 'a> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
@ -375,7 +375,7 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
fn room_members_invited<'a>(
&'a self,
room_id: &RoomId,
) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<OwnedUserId>> + 'a> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
@ -433,7 +433,7 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
fn rooms_joined<'a>(
&'a self,
user_id: &UserId,
) -> Box<dyn Iterator<Item = Result<OwnedRoomId>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<OwnedRoomId>> + 'a> {
Box::new(
self.userroomid_joined
.scan_prefix(user_id.as_bytes().to_vec())
@ -459,7 +459,8 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
fn rooms_invited<'a>(
&'a self,
user_id: &UserId,
) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>)>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>)>> + 'a>
{
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);
@ -538,7 +539,8 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
fn rooms_left<'a>(
&'a self,
user_id: &UserId,
) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnySyncStateEvent>>)>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnySyncStateEvent>>)>> + 'a>
{
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);

View file

@ -11,7 +11,7 @@ impl service::rooms::threads::Data for KeyValueDatabase {
room_id: &'a RoomId,
until: u64,
_include: &'a IncludeThreads,
) -> Result<Box<dyn Iterator<Item = Result<(u64, PduEvent)>> + 'a>> {
) -> Result<Box<dyn Send + Iterator<Item = Result<(u64, PduEvent)>> + 'a>> {
let prefix = services()
.rooms
.short

View file

@ -228,7 +228,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
user_id: &UserId,
room_id: &RoomId,
until: PduCount,
) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> {
) -> Result<Box<dyn Send + Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> {
let (prefix, current) = count_to_id(room_id, until, 1, true)?;
let user_id = user_id.to_owned();
@ -255,7 +255,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
user_id: &UserId,
room_id: &RoomId,
from: PduCount,
) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> {
) -> Result<Box<dyn Send + Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> {
let (prefix, current) = count_to_id(room_id, from, 1, false)?;
let user_id = user_id.to_owned();

View file

@ -111,7 +111,7 @@ impl service::rooms::user::Data for KeyValueDatabase {
fn get_shared_rooms<'a>(
&'a self,
users: Vec<OwnedUserId>,
) -> Result<Box<dyn Iterator<Item = Result<OwnedRoomId>> + 'a>> {
) -> Result<Box<dyn Send + Iterator<Item = Result<OwnedRoomId>> + 'a>> {
let iterators = users.into_iter().map(move |user_id| {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);

View file

@ -12,7 +12,8 @@ use crate::{
impl service::sending::Data for KeyValueDatabase {
fn active_requests<'a>(
&'a self,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a>
{
Box::new(
self.servercurrentevent_data
.iter()
@ -23,7 +24,7 @@ impl service::sending::Data for KeyValueDatabase {
fn active_requests_for<'a>(
&'a self,
outgoing_kind: &OutgoingKind,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a> {
let prefix = outgoing_kind.get_prefix();
Box::new(
self.servercurrentevent_data
@ -87,7 +88,7 @@ impl service::sending::Data for KeyValueDatabase {
fn queued_requests<'a>(
&'a self,
outgoing_kind: &OutgoingKind,
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a> {
let prefix = outgoing_kind.get_prefix();
return Box::new(
self.servernameevent_data

View file

@ -68,7 +68,7 @@ impl service::users::Data for KeyValueDatabase {
}
/// Returns an iterator over all users on this homeserver.
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a> {
fn iter<'a>(&'a self) -> Box<dyn Send + Iterator<Item = Result<OwnedUserId>> + 'a> {
Box::new(self.userid_password.iter().map(|(bytes, _)| {
UserId::parse(utils::string_from_bytes(&bytes).map_err(|_| {
Error::bad_database("User ID in userid_password is invalid unicode.")
@ -259,7 +259,7 @@ impl service::users::Data for KeyValueDatabase {
fn all_device_ids<'a>(
&'a self,
user_id: &UserId,
) -> Box<dyn Iterator<Item = Result<OwnedDeviceId>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<OwnedDeviceId>> + 'a> {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);
// All devices have metadata
@ -584,7 +584,7 @@ impl service::users::Data for KeyValueDatabase {
user_or_room_id: &str,
from: u64,
to: Option<u64>,
) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<OwnedUserId>> + 'a> {
let mut prefix = user_or_room_id.as_bytes().to_vec();
prefix.push(0xff);
@ -899,7 +899,7 @@ impl service::users::Data for KeyValueDatabase {
fn all_devices_metadata<'a>(
&'a self,
user_id: &UserId,
) -> Box<dyn Iterator<Item = Result<Device>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<Device>> + 'a> {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);

View file

@ -15,7 +15,7 @@ pub trait Data: Send + Sync {
fn get_registration(&self, id: &str) -> Result<Option<Registration>>;
fn iter_ids<'a>(&'a self) -> Result<Box<dyn Iterator<Item = Result<String>> + 'a>>;
fn iter_ids<'a>(&'a self) -> Result<Box<dyn Send + Iterator<Item = Result<String>> + 'a>>;
fn all(&self) -> Result<Vec<(String, Registration)>>;
}

View file

@ -11,6 +11,8 @@ pub trait Data: Send + Sync {
fn get_pushers(&self, sender: &UserId) -> Result<Vec<Pusher>>;
fn get_pushkeys<'a>(&'a self, sender: &UserId)
-> Box<dyn Iterator<Item = Result<String>> + 'a>;
fn get_pushkeys<'a>(
&'a self,
sender: &UserId,
) -> Box<dyn Send + Iterator<Item = Result<String>> + 'a>;
}

View file

@ -39,7 +39,7 @@ impl Service {
self.db.get_pushers(sender)
}
pub fn get_pushkeys(&self, sender: &UserId) -> Box<dyn Iterator<Item = Result<String>>> {
pub fn get_pushkeys(&self, sender: &UserId) -> Box<dyn Send + Iterator<Item = Result<String>>> {
self.db.get_pushkeys(sender)
}

View file

@ -18,5 +18,5 @@ pub trait Data: Send + Sync {
fn local_aliases_for_room<'a>(
&'a self,
room_id: &RoomId,
) -> Box<dyn Iterator<Item = Result<OwnedRoomAliasId>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<OwnedRoomAliasId>> + 'a>;
}

View file

@ -95,7 +95,7 @@ impl Service {
pub fn local_aliases_for_room<'a>(
&'a self,
room_id: &RoomId,
) -> Box<dyn Iterator<Item = Result<OwnedRoomAliasId>> + 'a> {
) -> Box<dyn Send + Iterator<Item = Result<OwnedRoomAliasId>> + 'a> {
self.db.local_aliases_for_room(room_id)
}
}

View file

@ -12,5 +12,5 @@ pub trait Data: Send + Sync {
fn is_public_room(&self, room_id: &RoomId) -> Result<bool>;
/// Returns the unsorted public room directory
fn public_rooms<'a>(&'a self) -> Box<dyn Iterator<Item = Result<OwnedRoomId>> + 'a>;
fn public_rooms<'a>(&'a self) -> Box<dyn Send + Iterator<Item = Result<OwnedRoomId>> + 'a>;
}

View file

@ -17,7 +17,8 @@ pub trait Data: Send + Sync {
room_id: &RoomId,
since: u64,
) -> Box<
dyn Iterator<
dyn Send
+ Iterator<
Item = Result<(
OwnedUserId,
u64,

View file

@ -3,7 +3,7 @@ use ruma::{OwnedRoomId, RoomId};
pub trait Data: Send + Sync {
fn exists(&self, room_id: &RoomId) -> Result<bool>;
fn iter_ids<'a>(&'a self) -> Box<dyn Iterator<Item = Result<OwnedRoomId>> + 'a>;
fn iter_ids<'a>(&'a self) -> Box<dyn Send + Iterator<Item = Result<OwnedRoomId>> + 'a>;
fn is_disabled(&self, room_id: &RoomId) -> Result<bool>;
fn disable_room(&self, room_id: &RoomId, disabled: bool) -> Result<()>;
}

View file

@ -16,7 +16,7 @@ impl Service {
self.db.exists(room_id)
}
pub fn iter_ids<'a>(&'a self) -> Box<dyn Iterator<Item = Result<OwnedRoomId>> + 'a> {
pub fn iter_ids<'a>(&'a self) -> Box<dyn Send + Iterator<Item = Result<OwnedRoomId>> + 'a> {
self.db.iter_ids()
}

View file

@ -12,7 +12,7 @@ pub trait Data: Send + Sync {
room_id: u64,
target: u64,
until: PduCount,
) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>>;
) -> Result<Box<dyn Send + Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>>;
fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()>;
fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> Result<bool>;
fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()>;

View file

@ -11,5 +11,5 @@ pub trait Data: Send + Sync {
&'a self,
room_id: &RoomId,
search_string: &str,
) -> Result<Option<(Box<dyn Iterator<Item = Vec<u8>> + 'a>, Vec<String>)>>;
) -> Result<Option<(Box<dyn Send + Iterator<Item = Vec<u8>> + 'a>, Vec<String>)>>;
}

View file

@ -31,7 +31,7 @@ pub trait Data: Send + Sync {
fn room_servers<'a>(
&'a self,
room_id: &RoomId,
) -> Box<dyn Iterator<Item = Result<OwnedServerName>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<OwnedServerName>> + 'a>;
fn server_in_room(&self, server: &ServerName, room_id: &RoomId) -> Result<bool>;
@ -39,13 +39,13 @@ pub trait Data: Send + Sync {
fn server_rooms<'a>(
&'a self,
server: &ServerName,
) -> Box<dyn Iterator<Item = Result<OwnedRoomId>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<OwnedRoomId>> + 'a>;
/// Returns an iterator over all joined members of a room.
fn room_members<'a>(
&'a self,
room_id: &RoomId,
) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<OwnedUserId>> + 'a>;
fn room_joined_count(&self, room_id: &RoomId) -> Result<Option<u64>>;
@ -55,13 +55,13 @@ pub trait Data: Send + Sync {
fn room_useroncejoined<'a>(
&'a self,
room_id: &RoomId,
) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<OwnedUserId>> + 'a>;
/// Returns an iterator over all invited members of a room.
fn room_members_invited<'a>(
&'a self,
room_id: &RoomId,
) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<OwnedUserId>> + 'a>;
fn get_invite_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>>;
@ -71,14 +71,14 @@ pub trait Data: Send + Sync {
fn rooms_joined<'a>(
&'a self,
user_id: &UserId,
) -> Box<dyn Iterator<Item = Result<OwnedRoomId>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<OwnedRoomId>> + 'a>;
/// Returns an iterator over all rooms a user was invited to.
#[allow(clippy::type_complexity)]
fn rooms_invited<'a>(
&'a self,
user_id: &UserId,
) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>)>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>)>> + 'a>;
fn invite_state(
&self,
@ -97,7 +97,7 @@ pub trait Data: Send + Sync {
fn rooms_left<'a>(
&'a self,
user_id: &UserId,
) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnySyncStateEvent>>)>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<(OwnedRoomId, Vec<Raw<AnySyncStateEvent>>)>> + 'a>;
fn once_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result<bool>;

View file

@ -9,7 +9,7 @@ pub trait Data: Send + Sync {
room_id: &'a RoomId,
until: u64,
include: &'a IncludeThreads,
) -> Result<Box<dyn Iterator<Item = Result<(u64, PduEvent)>> + 'a>>;
) -> Result<Box<dyn Send + Iterator<Item = Result<(u64, PduEvent)>> + 'a>>;
fn update_participants(&self, root_id: &[u8], participants: &[OwnedUserId]) -> Result<()>;
fn get_participants(&self, root_id: &[u8]) -> Result<Option<Vec<OwnedUserId>>>;

View file

@ -72,7 +72,7 @@ pub trait Data: Send + Sync {
user_id: &UserId,
room_id: &RoomId,
until: PduCount,
) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>>;
) -> Result<Box<dyn Send + Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>>;
/// Returns an iterator over all events in a room that happened after the event with id `from`
/// in chronological order.
@ -82,7 +82,7 @@ pub trait Data: Send + Sync {
user_id: &UserId,
room_id: &RoomId,
from: PduCount,
) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>>;
) -> Result<Box<dyn Send + Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>>;
fn increment_notification_counts(
&self,

View file

@ -23,5 +23,5 @@ pub trait Data: Send + Sync {
fn get_shared_rooms<'a>(
&'a self,
users: Vec<OwnedUserId>,
) -> Result<Box<dyn Iterator<Item = Result<OwnedRoomId>> + 'a>>;
) -> Result<Box<dyn Send + Iterator<Item = Result<OwnedRoomId>> + 'a>>;
}

View file

@ -8,11 +8,11 @@ pub trait Data: Send + Sync {
#[allow(clippy::type_complexity)]
fn active_requests<'a>(
&'a self,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a>;
fn active_requests_for<'a>(
&'a self,
outgoing_kind: &OutgoingKind,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a>;
fn delete_active_request(&self, key: Vec<u8>) -> Result<()>;
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>;
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>;
@ -23,7 +23,7 @@ pub trait Data: Send + Sync {
fn queued_requests<'a>(
&'a self,
outgoing_kind: &OutgoingKind,
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a>;
fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()>;
fn set_latest_educount(&self, server_name: &ServerName, educount: u64) -> Result<()>;
fn get_latest_educount(&self, server_name: &ServerName) -> Result<u64>;

View file

@ -23,7 +23,7 @@ pub trait Data: Send + Sync {
fn find_from_token(&self, token: &str) -> Result<Option<(OwnedUserId, String)>>;
/// Returns an iterator over all users on this homeserver.
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a>;
fn iter<'a>(&'a self) -> Box<dyn Send + Iterator<Item = Result<OwnedUserId>> + 'a>;
/// Returns a list of local users as list of usernames.
///
@ -70,7 +70,7 @@ pub trait Data: Send + Sync {
fn all_device_ids<'a>(
&'a self,
user_id: &UserId,
) -> Box<dyn Iterator<Item = Result<OwnedDeviceId>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<OwnedDeviceId>> + 'a>;
/// Replaces the access token of one device.
fn set_token(&self, user_id: &UserId, device_id: &DeviceId, token: &str) -> Result<()>;
@ -127,7 +127,7 @@ pub trait Data: Send + Sync {
user_or_room_id: &str,
from: u64,
to: Option<u64>,
) -> Box<dyn Iterator<Item = Result<OwnedUserId>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<OwnedUserId>> + 'a>;
fn mark_device_key_update(&self, user_id: &UserId) -> Result<()>;
@ -205,7 +205,7 @@ pub trait Data: Send + Sync {
fn all_devices_metadata<'a>(
&'a self,
user_id: &UserId,
) -> Box<dyn Iterator<Item = Result<Device>> + 'a>;
) -> Box<dyn Send + Iterator<Item = Result<Device>> + 'a>;
/// Creates a new sync filter. Returns the filter id.
fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> Result<String>;