Mirror of https://gitlab.com/famedly/conduit.git

Work on event_handler, lazy_loading, metadata, outlier, pdu_metadata

Timo Kösters 2022-07-10 10:06:23 +02:00
parent b5305ba217
commit 173f8b1b4d
No known key found for this signature in database
GPG key ID: 356E705610F626D5
11 changed files with 1286 additions and 1135 deletions

View file

@@ -509,3 +509,142 @@ fn parse_presence_event(bytes: &[u8]) -> Result<PresenceEvent> {
.map(|timestamp| current_timestamp - timestamp);
}
}
impl service::room::lazy_load::Data for KeyValueDatabase {
fn lazy_load_was_sent_before(
&self,
user_id: &UserId,
device_id: &DeviceId,
room_id: &RoomId,
ll_user: &UserId,
) -> Result<bool> {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(device_id.as_bytes());
key.push(0xff);
key.extend_from_slice(room_id.as_bytes());
key.push(0xff);
key.extend_from_slice(ll_user.as_bytes());
Ok(self.lazyloadedids.get(&key)?.is_some())
}
fn lazy_load_confirm_delivery(
&self,
user_id: &UserId,
device_id: &DeviceId,
room_id: &RoomId,
since: u64,
) -> Result<()> {
if let Some(user_ids) = self.lazy_load_waiting.lock().unwrap().remove(&(
user_id.to_owned(),
device_id.to_owned(),
room_id.to_owned(),
since,
)) {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);
prefix.extend_from_slice(device_id.as_bytes());
prefix.push(0xff);
prefix.extend_from_slice(room_id.as_bytes());
prefix.push(0xff);
for ll_id in user_ids {
let mut key = prefix.clone();
key.extend_from_slice(ll_id.as_bytes());
self.lazyloadedids.insert(&key, &[])?;
}
}
Ok(())
}
fn lazy_load_reset(
&self,
user_id: &UserId,
device_id: &DeviceId,
room_id: &RoomId,
) -> Result<()> {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);
prefix.extend_from_slice(device_id.as_bytes());
prefix.push(0xff);
prefix.extend_from_slice(room_id.as_bytes());
prefix.push(0xff);
for (key, _) in self.lazyloadedids.scan_prefix(prefix) {
self.lazyloadedids.remove(&key)?;
}
Ok(())
}
}
impl service::room::metadata::Data for KeyValueDatabase {
fn exists(&self, room_id: &RoomId) -> Result<bool> {
let prefix = match self.get_shortroomid(room_id)? {
Some(b) => b.to_be_bytes().to_vec(),
None => return Ok(false),
};
// Look for PDUs in that room.
Ok(self
.pduid_pdu
.iter_from(&prefix, false)
.next()
.filter(|(k, _)| k.starts_with(&prefix))
.is_some())
}
}
impl service::room::outlier::Data for KeyValueDatabase {
fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
self.eventid_outlierpdu
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu| {
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
})
}
fn get_outlier_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
self.eventid_outlierpdu
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu| {
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
})
}
fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) -> Result<()> {
self.eventid_outlierpdu.insert(
event_id.as_bytes(),
&serde_json::to_vec(&pdu).expect("CanonicalJsonObject is valid"),
)
}
}
impl service::room::pdu_metadata::Data for KeyValueDatabase {
fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()> {
for prev in event_ids {
let mut key = room_id.as_bytes().to_vec();
key.extend_from_slice(prev.as_bytes());
self.referencedevents.insert(&key, &[])?;
}
Ok(())
}
fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> Result<bool> {
let mut key = room_id.as_bytes().to_vec();
key.extend_from_slice(event_id.as_bytes());
Ok(self.referencedevents.get(&key)?.is_some())
}
fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()> {
self.softfailedeventids.insert(event_id.as_bytes(), &[])
}
fn is_event_soft_failed(&self, event_id: &EventId) -> Result<bool> {
self.softfailedeventids
.get(event_id.as_bytes())
.map(|o| o.is_some())
}
}
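
The lazy-loading rows above are keyed by joining user_id, device_id, room_id and the lazy-loaded user with 0xff separator bytes, which lets lazy_load_reset delete everything for one (user, device, room) triple with a single prefix scan. A minimal sketch of that key layout, outside the Conduit codebase (make_key is a hypothetical helper, not part of this commit):

fn make_key(parts: &[&[u8]]) -> Vec<u8> {
    // 0xff never occurs in valid UTF-8, so it is a safe separator for
    // Matrix identifiers, which are UTF-8 strings.
    let mut key = Vec::new();
    for (i, part) in parts.iter().enumerate() {
        if i != 0 {
            key.push(0xff);
        }
        key.extend_from_slice(part);
    }
    key
}

fn main() {
    let user: &[u8] = b"@alice:example.org";
    let device: &[u8] = b"DEVICEID";
    let room: &[u8] = b"!room:example.org";
    let ll_user: &[u8] = b"@bob:example.org";

    let key = make_key(&[user, device, room, ll_user]);
    // Every entry for the same (user, device, room) triple shares this prefix
    // (the real code also appends a trailing 0xff before calling scan_prefix).
    let prefix = make_key(&[user, device, room]);
    assert!(key.starts_with(&prefix));
}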

View file

@@ -2,32 +2,37 @@
/// An async function that can recursively call itself.
type AsyncRecursiveType<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
/// When receiving an event one needs to:
/// 0. Check the server is in the room
/// 1. Skip the PDU if we already know about it
/// 2. Check signatures, otherwise drop
/// 3. Check content hash, redact if doesn't match
/// 4. Fetch any missing auth events doing all checks listed here starting at 1. These are not
/// timeline events
/// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are
/// also rejected "due to auth events"
/// 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events
/// 7. Persist this event as an outlier
/// 8. If not timeline event: stop
/// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline
/// events
/// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
/// doing all the checks in this list starting at 1. These are not timeline events
/// 11. Check the auth of the event passes based on the state of the event
/// 12. Ensure that the state is derived from the previous current state (i.e. we calculated by
/// doing state res where one of the inputs was a previously trusted set of state, don't just
/// trust a set of state we got from a remote)
/// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail"
/// it
/// 14. Use state resolution to find new room state
// We use some AsyncRecursiveType hacks here so we can call this async function recursively
#[tracing::instrument(skip(value, is_timeline_event, db, pub_key_map))]
pub(crate) async fn handle_incoming_pdu<'a>(
use crate::service::*;
pub struct Service;
impl Service {
/// When receiving an event one needs to:
/// 0. Check the server is in the room
/// 1. Skip the PDU if we already know about it
/// 2. Check signatures, otherwise drop
/// 3. Check content hash, redact if doesn't match
/// 4. Fetch any missing auth events doing all checks listed here starting at 1. These are not
/// timeline events
/// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are
/// also rejected "due to auth events"
/// 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events
/// 7. Persist this event as an outlier
/// 8. If not timeline event: stop
/// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline
/// events
/// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
/// doing all the checks in this list starting at 1. These are not timeline events
/// 11. Check the auth of the event passes based on the state of the event
/// 12. Ensure that the state is derived from the previous current state (i.e. we calculated by
/// doing state res where one of the inputs was a previously trusted set of state, don't just
/// trust a set of state we got from a remote)
/// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail"
/// it
/// 14. Use state resolution to find new room state
// We use some AsyncRecursiveType hacks here so we can call this async function recursively
#[tracing::instrument(skip(value, is_timeline_event, db, pub_key_map))]
pub(crate) async fn handle_incoming_pdu<'a>(
origin: &'a ServerName,
event_id: &'a EventId,
room_id: &'a RoomId,
@@ -35,7 +40,7 @@ pub(crate) async fn handle_incoming_pdu<'a>(
is_timeline_event: bool,
db: &'a Database,
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
) -> Result<Option<Vec<u8>>> {
) -> Result<Option<Vec<u8>>> {
if !db.rooms.exists(room_id)? { return Err(Error::BadRequest(ErrorKind::NotFound, "Room is unknown to this server")); }
if db.rooms.is_disabled(room_id)? { return Err(Error::BadRequest(ErrorKind::Forbidden, "Federation of this room is currently disabled on this server.")); }
@@ -189,10 +194,10 @@ pub(crate) async fn handle_incoming_pdu<'a>(
.remove(&room_id.to_owned());
r
}
}
#[tracing::instrument(skip(create_event, value, db, pub_key_map))]
fn handle_outlier_pdu<'a>(
#[tracing::instrument(skip(create_event, value, db, pub_key_map))]
fn handle_outlier_pdu<'a>(
origin: &'a ServerName,
create_event: &'a PduEvent,
event_id: &'a EventId,
@@ -200,7 +205,7 @@ fn handle_outlier_pdu<'a>(
value: BTreeMap<String, CanonicalJsonValue>,
db: &'a Database,
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
) -> AsyncRecursiveType<'a, Result<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>), String>> {
) -> AsyncRecursiveType<'a, Result<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>), String>> {
Box::pin(async move {
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
@@ -335,10 +340,10 @@ fn handle_outlier_pdu<'a>(
Ok((Arc::new(incoming_pdu), val))
})
}
}
#[tracing::instrument(skip(incoming_pdu, val, create_event, db, pub_key_map))]
async fn upgrade_outlier_to_timeline_pdu(
#[tracing::instrument(skip(incoming_pdu, val, create_event, db, pub_key_map))]
async fn upgrade_outlier_to_timeline_pdu(
incoming_pdu: Arc<PduEvent>,
val: BTreeMap<String, CanonicalJsonValue>,
create_event: &PduEvent,
@@ -346,7 +351,7 @@ async fn upgrade_outlier_to_timeline_pdu(
db: &Database,
room_id: &RoomId,
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
) -> Result<Option<Vec<u8>>, String> {
) -> Result<Option<Vec<u8>>, String> {
// Skip the PDU if we already have it as a timeline event
if let Ok(Some(pduid)) = db.rooms.get_pdu_id(&incoming_pdu.event_id) {
return Ok(Some(pduid));
@@ -917,26 +922,26 @@ async fn upgrade_outlier_to_timeline_pdu(
// Event has passed all auth/stateres checks
drop(state_lock);
Ok(pdu_id)
}
}
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// it is appended to the outliers Tree.
///
/// Returns pdu and if we fetched it over federation the raw json.
///
/// a. Look in the main timeline (pduid_pdu tree)
/// b. Look at outlier pdu tree
/// c. Ask origin server over federation
/// d. TODO: Ask other servers over federation?
#[tracing::instrument(skip_all)]
pub(crate) fn fetch_and_handle_outliers<'a>(
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// it is appended to the outliers Tree.
///
/// Returns pdu and if we fetched it over federation the raw json.
///
/// a. Look in the main timeline (pduid_pdu tree)
/// b. Look at outlier pdu tree
/// c. Ask origin server over federation
/// d. TODO: Ask other servers over federation?
#[tracing::instrument(skip_all)]
pub(crate) fn fetch_and_handle_outliers<'a>(
db: &'a Database,
origin: &'a ServerName,
events: &'a [Arc<EventId>],
create_event: &'a PduEvent,
room_id: &'a RoomId,
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
) -> AsyncRecursiveType<'a, Vec<(Arc<PduEvent>, Option<BTreeMap<String, CanonicalJsonValue>>)>> {
) -> AsyncRecursiveType<'a, Vec<(Arc<PduEvent>, Option<BTreeMap<String, CanonicalJsonValue>>)>> {
Box::pin(async move {
let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) {
hash_map::Entry::Vacant(e) => {
@@ -1071,11 +1076,11 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
}
pdus
})
}
}
fn fetch_unknown_prev_events(initial_set: Vec<Arc<EventId>>) -> Vec<Arc<EventId>> {
fn fetch_unknown_prev_events(initial_set: Vec<Arc<EventId>>) -> Vec<Arc<EventId>> {
let mut graph: HashMap<Arc<EventId>, _> = HashMap::new();
let mut eventid_info = HashMap::new();
let mut todo_outlier_stack: Vec<Arc<EventId>> = initial_set;
@@ -1149,4 +1154,5 @@ fn fetch_unknown_prev_events(initial_set: Vec<Arc<EventId>>) -> Vec<Arc<EventId>
.map_err(|_| "Error sorting prev events".to_owned())?;
sorted
}
}
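
The AsyncRecursiveType alias at the top of this file exists because a plain async fn cannot call itself: its generated future type would have to contain itself and so have no finite size. Below is a standalone sketch of the same Box::pin trick, independent of Conduit's types and assuming a tokio runtime only for the entry point:

use std::future::Future;
use std::pin::Pin;

// Same shape as Conduit's alias: a pinned, boxed, sendable future.
type AsyncRecursive<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// A toy recursive walk. Returning the boxed future gives the compiler a
// fixed-size type, so the recursive call is allowed; without Box::pin the
// async fn's future type would reference itself and fail to compile.
fn countdown(n: u64) -> AsyncRecursive<'static, u64> {
    Box::pin(async move {
        if n == 0 {
            return 0;
        }
        // Recursive call through the boxed future.
        countdown(n - 1).await
    })
}

#[tokio::main]
async fn main() {
    assert_eq!(countdown(5).await, 0);
}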

View file

@@ -0,0 +1,24 @@
pub trait Data {
fn lazy_load_was_sent_before(
&self,
user_id: &UserId,
device_id: &DeviceId,
room_id: &RoomId,
ll_user: &UserId,
) -> Result<bool>;
fn lazy_load_confirm_delivery(
&self,
user_id: &UserId,
device_id: &DeviceId,
room_id: &RoomId,
since: u64,
) -> Result<()>;
fn lazy_load_reset(
&self,
user_id: &UserId,
device_id: &DeviceId,
room_id: &RoomId,
) -> Result<()>;
}

View file

@@ -1,4 +1,13 @@
mod data;
pub use data::Data;
use crate::service::*;
pub struct Service<D: Data> {
db: D,
}
impl<D: Data> Service<D> {
#[tracing::instrument(skip(self))]
pub fn lazy_load_was_sent_before(
&self,
@@ -7,14 +16,7 @@
room_id: &RoomId,
ll_user: &UserId,
) -> Result<bool> {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(device_id.as_bytes());
key.push(0xff);
key.extend_from_slice(room_id.as_bytes());
key.push(0xff);
key.extend_from_slice(ll_user.as_bytes());
Ok(self.lazyloadedids.get(&key)?.is_some())
self.db.lazy_load_was_sent_before(user_id, device_id, room_id, ll_user)
}
#[tracing::instrument(skip(self))]
@@ -45,27 +47,7 @@
room_id: &RoomId,
since: u64,
) -> Result<()> {
if let Some(user_ids) = self.lazy_load_waiting.lock().unwrap().remove(&(
user_id.to_owned(),
device_id.to_owned(),
room_id.to_owned(),
since,
)) {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);
prefix.extend_from_slice(device_id.as_bytes());
prefix.push(0xff);
prefix.extend_from_slice(room_id.as_bytes());
prefix.push(0xff);
for ll_id in user_ids {
let mut key = prefix.clone();
key.extend_from_slice(ll_id.as_bytes());
self.lazyloadedids.insert(&key, &[])?;
}
}
Ok(())
self.db.lazy_load_confirm_delivery(user_id, device_id, room_id, since)
}
#[tracing::instrument(skip(self))]
@@ -75,17 +57,6 @@
device_id: &DeviceId,
room_id: &RoomId,
) -> Result<()> {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);
prefix.extend_from_slice(device_id.as_bytes());
prefix.push(0xff);
prefix.extend_from_slice(room_id.as_bytes());
prefix.push(0xff);
for (key, _) in self.lazyloadedids.scan_prefix(prefix) {
self.lazyloadedids.remove(&key)?;
self.db.lazy_load_reset(user_id, device_id, room_id)
}
Ok(())
}
}
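
The lazy_loading, metadata, outlier and pdu_metadata modules in this commit all follow the same shape: a Service generic over a small Data trait, whose public methods just delegate to the backend. A minimal standalone sketch of that pattern (the &str ids and the AlwaysThere stub are illustrative, not Conduit's):

// The storage contract: one trait per service module, as in data.rs above.
trait Data {
    fn exists(&self, room_id: &str) -> bool;
}

// The service owns the public API (and, in Conduit, the tracing spans)
// and forwards everything to whichever backend it was built with.
struct Service<D: Data> {
    db: D,
}

impl<D: Data> Service<D> {
    fn new(db: D) -> Self {
        Self { db }
    }

    fn exists(&self, room_id: &str) -> bool {
        self.db.exists(room_id)
    }
}

// Any backend satisfies the service: Conduit plugs in its key-value
// database, a test can plug in a stub like this one.
struct AlwaysThere;

impl Data for AlwaysThere {
    fn exists(&self, _room_id: &str) -> bool {
        true
    }
}

fn main() {
    let service = Service::new(AlwaysThere);
    assert!(service.exists("!room:example.org"));
}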

View file

@@ -0,0 +1,3 @@
pub trait Data {
fn exists(&self, room_id: &RoomId) -> Result<bool>;
}

View file

@@ -1,44 +1,16 @@
mod data;
pub use data::Data;
use crate::service::*;
pub struct Service<D: Data> {
db: D,
}
impl<D: Data> Service<D> {
/// Checks if a room exists.
#[tracing::instrument(skip(self))]
pub fn exists(&self, room_id: &RoomId) -> Result<bool> {
let prefix = match self.get_shortroomid(room_id)? {
Some(b) => b.to_be_bytes().to_vec(),
None => return Ok(false),
};
// Look for PDUs in that room.
Ok(self
.pduid_pdu
.iter_from(&prefix, false)
.next()
.filter(|(k, _)| k.starts_with(&prefix))
.is_some())
self.db.exists(room_id)
}
pub fn get_shortroomid(&self, room_id: &RoomId) -> Result<Option<u64>> {
self.roomid_shortroomid
.get(room_id.as_bytes())?
.map(|bytes| {
utils::u64_from_bytes(&bytes)
.map_err(|_| Error::bad_database("Invalid shortroomid in db."))
})
.transpose()
}
pub fn get_or_create_shortroomid(
&self,
room_id: &RoomId,
globals: &super::globals::Globals,
) -> Result<u64> {
Ok(match self.roomid_shortroomid.get(room_id.as_bytes())? {
Some(short) => utils::u64_from_bytes(&short)
.map_err(|_| Error::bad_database("Invalid shortroomid in db."))?,
None => {
let short = globals.next_count()?;
self.roomid_shortroomid
.insert(room_id.as_bytes(), &short.to_be_bytes())?;
short
}
})
}
}

View file

@@ -0,0 +1,5 @@
pub trait Data {
fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>>;
fn get_outlier_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>>;
fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) -> Result<()>;
}

View file

@@ -1,27 +1,26 @@
mod data;
pub use data::Data;
use crate::service::*;
pub struct Service<D: Data> {
db: D,
}
impl<D: Data> Service<D> {
/// Returns the pdu from the outlier tree.
pub fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
self.eventid_outlierpdu
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu| {
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
})
self.db.get_outlier_pdu_json(event_id)
}
/// Returns the pdu from the outlier tree.
pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
self.eventid_outlierpdu
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu| {
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
})
self.db.get_outlier_pdu(event_id)
}
/// Append the PDU as an outlier.
#[tracing::instrument(skip(self, pdu))]
pub fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) -> Result<()> {
self.eventid_outlierpdu.insert(
event_id.as_bytes(),
&serde_json::to_vec(&pdu).expect("CanonicalJsonObject is valid"),
)
self.db.add_pdu_outlier(event_id, pdu)
}
}
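
The outlier service above stores whole PDUs as canonical JSON bytes keyed by event id. A small round-trip sketch of that idea, using serde_json::Value and a plain HashMap as stand-ins for CanonicalJsonObject and the eventid_outlierpdu tree:

use std::collections::HashMap;

use serde_json::{json, Value};

fn main() -> Result<(), serde_json::Error> {
    // Stand-in for the eventid_outlierpdu tree: event id bytes -> JSON bytes.
    let mut outliers: HashMap<Vec<u8>, Vec<u8>> = HashMap::new();

    let event_id = "$someevent:example.org";
    let pdu = json!({
        "type": "m.room.message",
        "room_id": "!room:example.org",
        "content": { "body": "hello" },
    });

    // add_pdu_outlier: serialize once, store under the event id.
    outliers.insert(event_id.as_bytes().to_vec(), serde_json::to_vec(&pdu)?);

    // get_outlier_pdu_json: fetch and deserialize, None if missing.
    let loaded: Option<Value> = outliers
        .get(event_id.as_bytes())
        .map(|bytes| serde_json::from_slice(bytes))
        .transpose()?;

    assert_eq!(loaded, Some(pdu));
    Ok(())
}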

View file

@@ -0,0 +1,6 @@
pub trait Data {
fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()>;
fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> Result<bool>;
fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()>;
fn is_event_soft_failed(&self, event_id: &EventId) -> Result<bool>;
}

View file

@@ -1,31 +1,30 @@
mod data;
pub use data::Data;
use crate::service::*;
pub struct Service<D: Data> {
db: D,
}
impl<D: Data> Service<D> {
#[tracing::instrument(skip(self, room_id, event_ids))]
pub fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()> {
for prev in event_ids {
let mut key = room_id.as_bytes().to_vec();
key.extend_from_slice(prev.as_bytes());
self.referencedevents.insert(&key, &[])?;
}
Ok(())
self.db.mark_as_referenced(room_id, event_ids)
}
#[tracing::instrument(skip(self))]
pub fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> Result<bool> {
let mut key = room_id.as_bytes().to_vec();
key.extend_from_slice(event_id.as_bytes());
Ok(self.referencedevents.get(&key)?.is_some())
self.db.is_event_referenced(room_id, event_id)
}
#[tracing::instrument(skip(self))]
pub fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()> {
self.softfailedeventids.insert(event_id.as_bytes(), &[])
self.db.mark_event_soft_failed(event_id)
}
#[tracing::instrument(skip(self))]
pub fn is_event_soft_failed(&self, event_id: &EventId) -> Result<bool> {
self.softfailedeventids
.get(event_id.as_bytes())
.map(|o| o.is_some())
self.db.is_event_soft_failed(event_id)
}
}

View file

@@ -196,3 +196,30 @@
})
}
pub fn get_shortroomid(&self, room_id: &RoomId) -> Result<Option<u64>> {
self.roomid_shortroomid
.get(room_id.as_bytes())?
.map(|bytes| {
utils::u64_from_bytes(&bytes)
.map_err(|_| Error::bad_database("Invalid shortroomid in db."))
})
.transpose()
}
pub fn get_or_create_shortroomid(
&self,
room_id: &RoomId,
globals: &super::globals::Globals,
) -> Result<u64> {
Ok(match self.roomid_shortroomid.get(room_id.as_bytes())? {
Some(short) => utils::u64_from_bytes(&short)
.map_err(|_| Error::bad_database("Invalid shortroomid in db."))?,
None => {
let short = globals.next_count()?;
self.roomid_shortroomid
.insert(room_id.as_bytes(), &short.to_be_bytes())?;
short
}
})
}
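
get_or_create_shortroomid stores the counter as big-endian bytes, and get_shortroomid decodes it again with utils::u64_from_bytes. A hedged sketch of that round trip (the local u64_from_bytes below only approximates Conduit's helper):

use std::convert::TryInto;

// Roughly what Conduit's utils::u64_from_bytes does: refuse anything that
// is not exactly eight bytes, then decode big-endian.
fn u64_from_bytes(bytes: &[u8]) -> Result<u64, std::array::TryFromSliceError> {
    let array: [u8; 8] = bytes.try_into()?;
    Ok(u64::from_be_bytes(array))
}

fn main() {
    let short: u64 = 42;

    // What get_or_create_shortroomid writes into roomid_shortroomid.
    let stored = short.to_be_bytes();

    // What get_shortroomid reads back out.
    assert_eq!(u64_from_bytes(&stored).unwrap(), short);

    // Big-endian keeps numeric order and byte order in sync, which is what
    // makes the shortroomid usable as a sortable key prefix in pduid_pdu.
    assert!(1u64.to_be_bytes() < 2u64.to_be_bytes());
}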