diff --git a/src/api/client_server/context.rs b/src/api/client_server/context.rs index 05303de8..8e193e6b 100644 --- a/src/api/client_server/context.rs +++ b/src/api/client_server/context.rs @@ -109,38 +109,6 @@ pub async fn get_context_route( let events_before: Vec<_> = events_before .into_iter() .map(|(_, pdu)| pdu.to_room_event()) - .filter(|event| { - if let Some(types) = &body.filter.types { - types - .iter() - .find(|t| { - t == &&event - .get_field::("type") - .expect("events should deserialize") - .expect("events should have a type") - }) - .is_some() - } else { - true - } - }) - .filter(|event| { - if !body.filter.not_types.is_empty() { - body - .filter - .not_types - .iter() - .find(|t| { - t == &&event - .get_field::("type") - .expect("events should deserialize") - .expect("events should have a type") - }) - .is_none() - } else { - true - } - }) .collect(); let events_after: Vec<_> = services() @@ -194,10 +162,10 @@ pub async fn get_context_route( .map(|(count, _)| count.stringify()) .unwrap_or_else(|| base_token.stringify()); - let events_after = events_after + let events_after: Vec<_> = events_after .into_iter() - .map(|(_, pdu)| pdu.to_room_event()); - // .collect(); + .map(|(_, pdu)| pdu.to_room_event()) + .collect(); let mut state = Vec::new(); diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 3778537e..c340b9e6 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -1,11 +1,13 @@ use crate::{ service::{pdu::EventHash, rooms::timeline::PduCount}, - services, utils, Error, PduEvent, Result, Ruma, RumaResponse, + services, + utils::{self, filter}, + Error, PduEvent, Result, Ruma, RumaResponse, }; use ruma::{ api::client::{ - filter::{FilterDefinition, LazyLoadOptions, RoomFilter}, + filter::{EventFormat, FilterDefinition, LazyLoadOptions, RoomFilter}, sync::sync_events::{ self, v3::{ @@ -19,7 +21,7 @@ use ruma::{ }, events::{ room::member::{MembershipState, RoomMemberEventContent}, - RoomAccountDataEventType, StateEventType, TimelineEventType, + StateEventType, TimelineEventType, }, serde::Raw, uint, DeviceId, EventId, JsOption, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId, @@ -194,7 +196,7 @@ async fn sync_helper( .unwrap_or_default(), }; - let event_fields = filter.event_fields.as_ref().map(Vec::as_slice); + let event_fields = filter.event_fields.as_ref(); let (lazy_load_enabled, lazy_load_send_redundant) = match filter.room.state.lazy_load_options { LazyLoadOptions::Enabled { @@ -233,6 +235,15 @@ async fn sync_helper( .collect::>(); for room_id in all_joined_rooms { let room_id = room_id?; + + if filter::rooms( + &Raw::new(&serde_json::json!({"room_id": room_id})).expect("json can be serialized"), + filter.room.rooms.as_ref(), + filter.room.not_rooms.as_ref(), + ) { + continue; + } + if let Ok(joined_room) = load_joined_room( &sender_user, &sender_device, @@ -245,6 +256,7 @@ async fn sync_helper( lazy_load_send_redundant, &filter.room, event_fields, + &filter.event_format, full_state, &mut device_list_updates, &mut left_encrypted_users, @@ -548,6 +560,7 @@ async fn sync_helper( knock: BTreeMap::new(), // TODO }, presence: Presence { + // HashMap events: presence_updates .into_values() .map(|v| Raw::new(&v).expect("PresenceEvent always serializes successfully")) @@ -556,6 +569,7 @@ async fn sync_helper( account_data: GlobalAccountData { events: services() .account_data + // HashMap> .changes_since(None, &sender_user, since)? .into_iter() .filter_map(|(_, v)| { @@ -614,7 +628,8 @@ async fn load_joined_room( lazy_load_enabled: bool, lazy_load_send_redundant: bool, filter: &RoomFilter, - event_fields: &[String], + _event_fields: Option<&Vec>, + event_format: &EventFormat, full_state: bool, device_list_updates: &mut HashSet, left_encrypted_users: &mut HashSet, @@ -1090,7 +1105,26 @@ async fn load_joined_room( let room_events: Vec<_> = timeline_pdus .iter() - .map(|(_, pdu)| pdu.to_sync_room_event()) + .map(|(_, pdu)| match event_format { + EventFormat::Federation => Raw::new(pdu) + .map(Raw::cast) + .expect("json can be serialized"), + _ => pdu.to_sync_room_event(), + }) + .filter(|v| { + filter::senders( + v, + filter.timeline.senders.as_ref(), + filter.timeline.not_senders.as_ref(), + ) + }) + .filter(|v| { + filter::types( + v, + filter.timeline.types.as_ref(), + filter.timeline.not_types.as_ref(), + ) + }) .collect(); let mut edus: Vec<_> = services() @@ -1099,7 +1133,23 @@ async fn load_joined_room( .read_receipt .readreceipts_since(room_id, since) .filter_map(|r| r.ok()) // Filter out buggy events + // TODO .map(|(_, _, v)| v) + .filter(|v| { + filter::senders( + v, + filter.ephemeral.senders.as_ref(), + filter.ephemeral.not_senders.as_ref(), + ) + }) + .filter(|v| { + filter::types( + v, + filter.ephemeral.types.as_ref(), + filter.ephemeral.not_types.as_ref(), + ) + }) + .take(filter.ephemeral.limit.map_or(10, u64::from).min(100) as usize) .collect(); if services() @@ -1132,34 +1182,15 @@ async fn load_joined_room( .account_data .changes_since(Some(room_id), sender_user, since)? .into_iter() - // TODO: contains_url - .filter_map(|(et, v)| { - let et = et.to_string(); - - let (not_types, types) = ( - &filter.account_data.not_types, + .filter_map(|(_, v)| serde_json::from_str(v.json().get()).ok()) + .filter(|v| { + filter::types( + v, filter.account_data.types.as_ref(), - ); - - if not_types.contains(&et) - || types.map(|v| !v.contains(&et)).unwrap_or_default() - { - return None; - } - - serde_json::from_str(v.json().get()) - .map_err(|_| Error::bad_database("Invalid account event in database.")) - .ok() + filter.account_data.not_types.as_ref(), + ) }) - .take( - filter - .account_data - .limit - .map(TryInto::try_into) - .map(Result::ok) - .flatten() - .unwrap_or(usize::MAX), - ) + .take(filter.account_data.limit.map_or(10, u64::from).min(100) as usize) .collect(), }, summary: RoomSummary { @@ -1174,12 +1205,33 @@ async fn load_joined_room( timeline: Timeline { limited: limited || joined_since_last_sync, prev_batch, + // Vec> events: room_events, }, state: State { events: state_events .iter() - .map(|pdu| pdu.to_sync_state_event()) + .map(|pdu| match event_format { + EventFormat::Federation => Raw::new(pdu) + .map(Raw::cast) + .expect("json can be serialized"), + _ => pdu.to_sync_state_event(), + }) + .filter(|v| { + filter::senders( + v, + filter.state.senders.as_ref(), + filter.state.not_senders.as_ref(), + ) + }) + .filter(|v| { + filter::types( + v, + filter.state.types.as_ref(), + filter.state.not_types.as_ref(), + ) + }) + .take(filter.state.limit.map_or(10, u64::from).min(100) as usize) .collect(), }, ephemeral: Ephemeral { events: edus }, diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs index f2d2b386..7c3a9549 100644 --- a/src/service/account_data/mod.rs +++ b/src/service/account_data/mod.rs @@ -3,7 +3,7 @@ mod data; pub use data::Data; use ruma::{ - events::{AnyRoomAccountDataEvent, AnyTimelineEvent, RoomAccountDataEventType}, + events::{AnyRoomAccountDataEvent, RoomAccountDataEventType}, serde::Raw, RoomId, UserId, }; diff --git a/src/service/pdu.rs b/src/service/pdu.rs index 9f20e3cf..a51d7ec5 100644 --- a/src/service/pdu.rs +++ b/src/service/pdu.rs @@ -15,7 +15,6 @@ use serde::{Deserialize, Serialize}; use serde_json::{ json, value::{to_raw_value, RawValue as RawJsonValue}, - Map, Value, }; use std::{cmp::Ordering, collections::BTreeMap, sync::Arc}; use tracing::warn; diff --git a/src/utils/filter.rs b/src/utils/filter.rs index aa9f6809..22740c80 100644 --- a/src/utils/filter.rs +++ b/src/utils/filter.rs @@ -1,153 +1,39 @@ -// pub fn filter_room_events<'i, I: Iterator>>( -// events: I, -// sender_user: &UserId, -// sender_device: &DeviceId, -// room_id: Option<&RoomId>, -// filter: RoomEventFilter, -// ) -> crate::Result>>> { -// let (lazy_load_enabled, lazy_load_send_redundant) = match &filter.lazy_load_options { -// LazyLoadOptions::Enabled { -// include_redundant_members, -// } => (true, *include_redundant_members), -// _ => (false, false), -// }; +use ruma::{serde::Raw, OwnedRoomId, OwnedUserId}; +use serde::Deserialize; -// let it = Box::new( -// events -// .filter(|event| match &filter.rooms { -// None => true, -// Some(rooms) => rooms.iter().any(|r| { -// r.as_str() -// == event -// .get_field::("room_id") -// .expect("room events should deserialize") -// .expect("room events should have a room_id") -// }), -// }) -// .filter(|event| match &filter.not_rooms[..] { -// [] => true, -// not_rooms => not_rooms.iter().all(|r| { -// r.as_str() -// != event -// .get_field::("room_id") -// .expect("room events should deserialize") -// .expect("room events should have a room_id") -// }), -// }) -// .filter(|event| match &filter.senders { -// None => true, -// Some(rooms) => rooms.iter().any(|r| { -// r.as_str() -// == event -// .get_field::("sender") -// .expect("room events should deserialize") -// .expect("room events should have a sender") -// }), -// }) -// .filter(|event| match &filter.not_senders[..] { -// [] => true, -// not_senders => not_senders.iter().all(|r| { -// r.as_str() -// != event -// .get_field::("sender") -// .expect("room events should deserialize") -// .expect("room events should have a sender") -// }), -// }) -// .filter(|event| match &filter.types { -// None => true, -// Some(types) => types.iter().any(|t| { -// t.as_str() -// == event -// .get_field::("type") -// .expect("room events should deserialize") -// .expect("room events should have a type") -// }), -// }) -// .filter(|event| match &filter.not_types[..] { -// [] => true, -// not_types => not_types.iter().all(|t| { -// t.as_str() -// != event -// .get_field::("type") -// .expect("room events should deserialize") -// .expect("room events should have a type") -// }), -// }) -// .filter(|event| { -// let room_id = event -// .get_field::("room_id") -// .expect("room events should deserialize") -// .expect("room events should have a room_id"); -// let event_id = event -// .get_field::("event_id") -// .expect("room events should deserialize") -// .expect("room events should have an event_id"); +fn inclusion Deserialize<'a> + PartialEq>( + event: &Raw, + field: &str, + include: Option<&Vec>, + exclude: &[F], +) -> bool { + let value = event + .get_field::(field) + .expect("room events should deserialize") + .expect("field should exist"); -// services() -// .rooms -// .state_accessor -// .user_can_see_event(sender_user, &room_id, &event_id) -// .unwrap_or(false) -// }), -// ); - -// let memberships = it -// .map(|event| { -// let room_id = event -// .get_field::("room_id") -// .expect("room events should deserialize") -// .expect("room events should have a room_id"); -// let sender = event -// .get_field::("sender") -// .expect("room events should deserialize") -// .expect("room events should have a sender"); - -// (room_id, sender) -// }) -// .flat_map(|(room_id, sender)| { -// services() -// .rooms -// .lazy_loading -// .lazy_load_was_sent_before(sender_user, sender_device, &room_id, &sender) -// .map(|b| { -// if !b || lazy_load_send_redundant { -// Some(sender) -// } else { -// None -// } -// }) -// .transpose() -// }) -// .collect::>>()?; - -// Ok(it) -// } - -use serde_json::Value; - -pub fn event_fields>(mut json: Value, event_fields: I) -> Value { - let inner = json - .as_object_mut() - .expect("PduEvent should always be an object"); - - // TODO: testing this properly - for field in event_fields { - let mut paths = field.split('.').peekable(); - let mut parent = &mut *inner; - - while let Some(key) = paths.next() { - if paths.peek().is_none() { - parent.remove(key); - } else { - parent = parent - .get_mut(key) - .map(Value::as_object_mut) - .flatten() - .unwrap(); - } - } - } - - json + include + .map(|v| v.iter().any(|item| *item == value)) + .unwrap_or(true) + && exclude.iter().all(|item| *item != value) +} + +pub fn rooms( + event: &Raw, + rooms: Option<&Vec>, + not_rooms: &[OwnedRoomId], +) -> bool { + inclusion(event, "room_id", rooms, not_rooms) +} + +pub fn senders( + event: &Raw, + senders: Option<&Vec>, + not_senders: &[OwnedUserId], +) -> bool { + inclusion(event, "sender", senders, not_senders) +} + +pub fn types(event: &Raw, types: Option<&Vec>, not_types: &[String]) -> bool { + inclusion(event, "event_type", types, not_types) } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 0475c26a..6ca6a2e3 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,5 +1,5 @@ pub mod error; -mod filter; +pub mod filter; use argon2::{Config, Variant}; use cmp::Ordering;