From 624d1fcc9a02e9e207ae4121b31a7f3f16f6f9f8 Mon Sep 17 00:00:00 2001 From: mikoto Date: Mon, 29 Apr 2024 20:59:02 +0200 Subject: [PATCH 1/7] draft: RoomEventFilter --- Cargo.toml | 2 ++ src/api/client_server/context.rs | 32 +++++++++++++++++ src/database/key_value/rooms/search.rs | 20 +++++++++-- src/database/mod.rs | 2 ++ src/utils/filter.rs | 48 ++++++++++++++++++++++++++ src/utils/mod.rs | 1 + 6 files changed, 103 insertions(+), 2 deletions(-) create mode 100644 src/utils/filter.rs diff --git a/Cargo.toml b/Cargo.toml index 3a5c2647..65209a97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,6 +157,8 @@ tikv-jemallocator = { version = "0.5.0", features = [ sd-notify = { version = "0.4.1", optional = true } +url = "2.5.0" + [dependencies.rocksdb] features = ["lz4", "multi-threaded-cf", "zstd"] optional = true diff --git a/src/api/client_server/context.rs b/src/api/client_server/context.rs index 8e193e6b..011e2647 100644 --- a/src/api/client_server/context.rs +++ b/src/api/client_server/context.rs @@ -109,6 +109,38 @@ pub async fn get_context_route( let events_before: Vec<_> = events_before .into_iter() .map(|(_, pdu)| pdu.to_room_event()) + .filter(|event| { + if let Some(types) = &body.filter.types { + types + .iter() + .find(|t| { + t == &&event + .get_field::("type") + .expect("events should deserialize") + .expect("events should have a type") + }) + .is_some() + } else { + true + } + }) + .filter(|event| { + if !body.filter.not_types.is_empty() { + body + .filter + .not_types + .iter() + .find(|t| { + t == &&event + .get_field::("type") + .expect("events should deserialize") + .expect("events should have a type") + }) + .is_none() + } else { + true + } + }) .collect(); let events_after: Vec<_> = services() diff --git a/src/database/key_value/rooms/search.rs b/src/database/key_value/rooms/search.rs index ad573f06..387eb9da 100644 --- a/src/database/key_value/rooms/search.rs +++ b/src/database/key_value/rooms/search.rs @@ -1,10 +1,13 @@ +use std::str::FromStr; 
+ use ruma::RoomId; +use url::Url; use crate::{database::KeyValueDatabase, service, services, utils, Result}; impl service::rooms::search::Data for KeyValueDatabase { fn index_pdu<'a>(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> Result<()> { - let mut batch = message_body + let mut token_batch = message_body .split_terminator(|c: char| !c.is_alphanumeric()) .filter(|s| !s.is_empty()) .filter(|word| word.len() <= 50) @@ -17,7 +20,20 @@ impl service::rooms::search::Data for KeyValueDatabase { (key, Vec::new()) }); - self.tokenids.insert_batch(&mut batch) + self.tokenids.insert_batch(&mut token_batch)?; + + let mut url_batch = message_body + .split_terminator(|c: char| !c.is_whitespace()) + .filter(|word| Url::from_str(word).is_ok()) + .map(|url| { + let mut key = shortroomid.to_be_bytes().to_vec(); + key.extend_from_slice(url.as_bytes()); + key.push(0xff); + key.extend_from_slice(pdu_id); // TODO: currently we save the room id a second time here + (key, Vec::new()) + }); + + self.urltokenids.insert_batch(&mut url_batch) } fn search_pdus<'a>( diff --git a/src/database/mod.rs b/src/database/mod.rs index 8d1b1913..5397fe20 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -85,6 +85,7 @@ pub struct KeyValueDatabase { pub(super) threadid_userids: Arc, // ThreadId = RoomId + Count pub(super) tokenids: Arc, // TokenId = ShortRoomId + Token + PduIdCount + pub(super) urltokenids: Arc, // useful for `RoomEventFilter::contains_url` /// Participating servers in a room. 
pub(super) roomserverids: Arc, // RoomServerId = RoomId + ServerName @@ -312,6 +313,7 @@ impl KeyValueDatabase { threadid_userids: builder.open_tree("threadid_userids")?, tokenids: builder.open_tree("tokenids")?, + urltokenids: builder.open_tree("urltokenids")?, roomserverids: builder.open_tree("roomserverids")?, serverroomids: builder.open_tree("serverroomids")?, diff --git a/src/utils/filter.rs b/src/utils/filter.rs new file mode 100644 index 00000000..ae0e833c --- /dev/null +++ b/src/utils/filter.rs @@ -0,0 +1,48 @@ +use ruma::{api::client::filter::RoomEventFilter, events::AnyTimelineEvent, serde::Raw}; + +pub fn filter_room_events>>( + events: I, + filter: RoomEventFilter, +) { + events + .filter(|event| match &filter.types { + None => true, + Some(types) => types.iter().any(|t| { + t.as_str() + == event + .get_field::("type") + .expect("room events should deserialize") + .expect("room events should have a type") + }), + }) + .filter(|event| match &filter.not_types[..] { + [] => true, + not_types => not_types.iter().all(|t| { + t.as_str() + != event + .get_field::("type") + .expect("room events should deserialize") + .expect("room events should have a type") + }), + }) + .filter(|event| match &filter.rooms { + None => true, + Some(rooms) => rooms.iter().any(|r| { + r.as_str() + == event + .get_field::("room_id") + .expect("room events should deserialize") + .expect("room events should have a type") + }), + }) + .filter(|event| match &filter.rooms { + None => true, + Some(rooms) => rooms.iter().all(|r| { + r.as_str() + != event + .get_field::("room_id") + .expect("room events should deserialize") + .expect("room events should have a type") + }), + }); +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index d09a1033..0475c26a 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,4 +1,5 @@ pub mod error; +mod filter; use argon2::{Config, Variant}; use cmp::Ordering; From de806e5d1b16e4ce8026ba066e3c1ea711a85d10 Mon Sep 17 00:00:00 2001 From: mikoto Date: 
Tue, 7 May 2024 07:42:10 +0200 Subject: [PATCH 2/7] wip --- src/api/client_server/context.rs | 6 +- src/utils/filter.rs | 170 +++++++++++++++++++++++-------- 2 files changed, 132 insertions(+), 44 deletions(-) diff --git a/src/api/client_server/context.rs b/src/api/client_server/context.rs index 011e2647..05303de8 100644 --- a/src/api/client_server/context.rs +++ b/src/api/client_server/context.rs @@ -194,10 +194,10 @@ pub async fn get_context_route( .map(|(count, _)| count.stringify()) .unwrap_or_else(|| base_token.stringify()); - let events_after: Vec<_> = events_after + let events_after = events_after .into_iter() - .map(|(_, pdu)| pdu.to_room_event()) - .collect(); + .map(|(_, pdu)| pdu.to_room_event()); + // .collect(); let mut state = Vec::new(); diff --git a/src/utils/filter.rs b/src/utils/filter.rs index ae0e833c..2c0a1a92 100644 --- a/src/utils/filter.rs +++ b/src/utils/filter.rs @@ -1,48 +1,136 @@ -use ruma::{api::client::filter::RoomEventFilter, events::AnyTimelineEvent, serde::Raw}; +use std::collections::HashSet; -pub fn filter_room_events>>( +use ruma::{ + api::client::filter::{LazyLoadOptions, RoomEventFilter}, + events::AnyTimelineEvent, + serde::Raw, + DeviceId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, +}; + +use crate::services; + +pub fn filter_room_events<'i, I: Iterator>>( events: I, + sender_user: &UserId, + sender_device: &DeviceId, + room_id: Option<&RoomId>, filter: RoomEventFilter, -) { - events - .filter(|event| match &filter.types { - None => true, - Some(types) => types.iter().any(|t| { - t.as_str() - == event - .get_field::("type") - .expect("room events should deserialize") - .expect("room events should have a type") +) -> crate::Result>>> { + let (lazy_load_enabled, lazy_load_send_redundant) = match &filter.lazy_load_options { + LazyLoadOptions::Enabled { + include_redundant_members, + } => (true, *include_redundant_members), + _ => (false, false), + }; + + let it = Box::new( + events + .filter(|event| match 
&filter.rooms { + None => true, + Some(rooms) => rooms.iter().any(|r| { + r.as_str() + == event + .get_field::("room_id") + .expect("room events should deserialize") + .expect("room events should have a room_id") + }), + }) + .filter(|event| match &filter.not_rooms[..] { + [] => true, + not_rooms => not_rooms.iter().all(|r| { + r.as_str() + != event + .get_field::("room_id") + .expect("room events should deserialize") + .expect("room events should have a room_id") + }), + }) + .filter(|event| match &filter.senders { + None => true, + Some(rooms) => rooms.iter().any(|r| { + r.as_str() + == event + .get_field::("sender") + .expect("room events should deserialize") + .expect("room events should have a sender") + }), + }) + .filter(|event| match &filter.not_senders[..] { + [] => true, + not_senders => not_senders.iter().all(|r| { + r.as_str() + != event + .get_field::("sender") + .expect("room events should deserialize") + .expect("room events should have a sender") + }), + }) + .filter(|event| match &filter.types { + None => true, + Some(types) => types.iter().any(|t| { + t.as_str() + == event + .get_field::("type") + .expect("room events should deserialize") + .expect("room events should have a type") + }), + }) + .filter(|event| match &filter.not_types[..] 
{ + [] => true, + not_types => not_types.iter().all(|t| { + t.as_str() + != event + .get_field::("type") + .expect("room events should deserialize") + .expect("room events should have a type") + }), + }) + .filter(|event| { + let room_id = event + .get_field::("room_id") + .expect("room events should deserialize") + .expect("room events should have a room_id"); + let event_id = event + .get_field::("event_id") + .expect("room events should deserialize") + .expect("room events should have an event_id"); + + services() + .rooms + .state_accessor + .user_can_see_event(sender_user, &room_id, &event_id) + .unwrap_or(false) }), + ); + + let memberships = it + .map(|event| { + let room_id = event + .get_field::("room_id") + .expect("room events should deserialize") + .expect("room events should have a room_id"); + let sender = event + .get_field::("sender") + .expect("room events should deserialize") + .expect("room events should have a sender"); + + (room_id, sender) }) - .filter(|event| match &filter.not_types[..] 
{ - [] => true, - not_types => not_types.iter().all(|t| { - t.as_str() - != event - .get_field::("type") - .expect("room events should deserialize") - .expect("room events should have a type") - }), + .flat_map(|(room_id, sender)| { + services() + .rooms + .lazy_loading + .lazy_load_was_sent_before(sender_user, sender_device, &room_id, &sender) + .map(|b| { + if !b || lazy_load_send_redundant { + Some(sender) + } else { + None + } + }) + .transpose() }) - .filter(|event| match &filter.rooms { - None => true, - Some(rooms) => rooms.iter().any(|r| { - r.as_str() - == event - .get_field::("room_id") - .expect("room events should deserialize") - .expect("room events should have a type") - }), - }) - .filter(|event| match &filter.rooms { - None => true, - Some(rooms) => rooms.iter().all(|r| { - r.as_str() - != event - .get_field::("room_id") - .expect("room events should deserialize") - .expect("room events should have a type") - }), - }); + .collect::>>()?; + + Ok(it) } From a0c4a639b2ae1e8bf1c407754005f14ceb34bef4 Mon Sep 17 00:00:00 2001 From: Matthias Ahouansou Date: Tue, 14 May 2024 21:20:16 +0100 Subject: [PATCH 3/7] docs(faq): add instructions on how to make a user admin --- docs/faq.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/faq.md b/docs/faq.md index 4c23a25c..17c8c9d8 100644 --- a/docs/faq.md +++ b/docs/faq.md @@ -35,3 +35,7 @@ Here is an example: Not really. You can reuse the domain of your current server with Conduit, but you will not be able to migrate accounts automatically. Rooms that were federated can be re-joined via the other participating servers, however media and the like may be deleted from remote servers after some time, and hence might not be recoverable. + +## How do I make someone an admin? + +Simply invite them to the admin room. Once joined, they can administer the server by interacting with the `@conduit:<server_name>` user. 
From ec48df78daaeb769ee71511f531b5000bc5ddf7d Mon Sep 17 00:00:00 2001 From: mikoto Date: Tue, 7 May 2024 07:42:10 +0200 Subject: [PATCH 4/7] wip --- Cargo.toml | 4 +- src/api/client_server/sync.rs | 48 ++++- src/database/key_value/account_data.rs | 6 +- src/service/account_data/data.rs | 4 +- src/service/account_data/mod.rs | 4 +- src/service/pdu.rs | 1 + src/utils/filter.rs | 271 +++++++++++++------------ 7 files changed, 193 insertions(+), 145 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 65209a97..2c44f40f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ http = "0.2.9" # Used to find data directory for default db path directories = "5" # Used for ruma wrapper -serde_json = { version = "1.0.96", features = ["raw_value"] } +serde_json = { version = "1.0.96", features = ["raw_value", "preserve_order"] } # Used for appservice registration files serde_yaml = "0.9.21" # Used for pdu definition @@ -157,8 +157,6 @@ tikv-jemallocator = { version = "0.5.0", features = [ sd-notify = { version = "0.4.1", optional = true } -url = "2.5.0" - [dependencies.rocksdb] features = ["lz4", "multi-threaded-cf", "zstd"] optional = true diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index e0c6e0b9..3778537e 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -5,7 +5,7 @@ use crate::{ use ruma::{ api::client::{ - filter::{FilterDefinition, LazyLoadOptions}, + filter::{FilterDefinition, LazyLoadOptions, RoomFilter}, sync::sync_events::{ self, v3::{ @@ -19,7 +19,7 @@ use ruma::{ }, events::{ room::member::{MembershipState, RoomMemberEventContent}, - StateEventType, TimelineEventType, + RoomAccountDataEventType, StateEventType, TimelineEventType, }, serde::Raw, uint, DeviceId, EventId, JsOption, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId, @@ -194,6 +194,8 @@ async fn sync_helper( .unwrap_or_default(), }; + let event_fields = filter.event_fields.as_ref().map(Vec::as_slice); + let (lazy_load_enabled, 
lazy_load_send_redundant) = match filter.room.state.lazy_load_options { LazyLoadOptions::Enabled { include_redundant_members: redundant, @@ -241,6 +243,8 @@ async fn sync_helper( next_batchcount, lazy_load_enabled, lazy_load_send_redundant, + &filter.room, + event_fields, full_state, &mut device_list_updates, &mut left_encrypted_users, @@ -289,11 +293,14 @@ async fn sync_helper( } let mut left_rooms = BTreeMap::new(); - let all_left_rooms: Vec<_> = services() - .rooms - .state_cache - .rooms_left(&sender_user) - .collect(); + let all_left_rooms = match filter.room.include_leave { + false => Vec::with_capacity(0), + true => services() + .rooms + .state_cache + .rooms_left(&sender_user) + .collect(), + }; for result in all_left_rooms { let (room_id, _) = result?; @@ -606,6 +613,8 @@ async fn load_joined_room( next_batchcount: PduCount, lazy_load_enabled: bool, lazy_load_send_redundant: bool, + filter: &RoomFilter, + event_fields: &[String], full_state: bool, device_list_updates: &mut HashSet, left_encrypted_users: &mut HashSet, @@ -1123,11 +1132,34 @@ async fn load_joined_room( .account_data .changes_since(Some(room_id), sender_user, since)? 
.into_iter() - .filter_map(|(_, v)| { + // TODO: contains_url + .filter_map(|(et, v)| { + let et = et.to_string(); + + let (not_types, types) = ( + &filter.account_data.not_types, + filter.account_data.types.as_ref(), + ); + + if not_types.contains(&et) + || types.map(|v| !v.contains(&et)).unwrap_or_default() + { + return None; + } + serde_json::from_str(v.json().get()) .map_err(|_| Error::bad_database("Invalid account event in database.")) .ok() }) + .take( + filter + .account_data + .limit + .map(TryInto::try_into) + .map(Result::ok) + .flatten() + .unwrap_or(usize::MAX), + ) .collect(), }, summary: RoomSummary { diff --git a/src/database/key_value/account_data.rs b/src/database/key_value/account_data.rs index 970b36b5..b6d1fc26 100644 --- a/src/database/key_value/account_data.rs +++ b/src/database/key_value/account_data.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use ruma::{ api::client::error::ErrorKind, - events::{AnyEphemeralRoomEvent, RoomAccountDataEventType}, + events::{AnyRoomAccountDataEvent, RoomAccountDataEventType}, serde::Raw, RoomId, UserId, }; @@ -101,7 +101,7 @@ impl service::account_data::Data for KeyValueDatabase { room_id: Option<&RoomId>, user_id: &UserId, since: u64, - ) -> Result>> { + ) -> Result>> { let mut userdata = HashMap::new(); let mut prefix = room_id @@ -129,7 +129,7 @@ impl service::account_data::Data for KeyValueDatabase { )?) 
.map_err(|_| Error::bad_database("RoomUserData ID in db is invalid."))?, ), - serde_json::from_slice::>(&v).map_err(|_| { + serde_json::from_slice::>(&v).map_err(|_| { Error::bad_database("Database contains invalid account data.") })?, )) diff --git a/src/service/account_data/data.rs b/src/service/account_data/data.rs index c7c92981..c6eb23c0 100644 --- a/src/service/account_data/data.rs +++ b/src/service/account_data/data.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use crate::Result; use ruma::{ - events::{AnyEphemeralRoomEvent, RoomAccountDataEventType}, + events::{AnyRoomAccountDataEvent, RoomAccountDataEventType}, serde::Raw, RoomId, UserId, }; @@ -31,5 +31,5 @@ pub trait Data: Send + Sync { room_id: Option<&RoomId>, user_id: &UserId, since: u64, - ) -> Result>>; + ) -> Result>>; } diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs index f9c49b1a..f2d2b386 100644 --- a/src/service/account_data/mod.rs +++ b/src/service/account_data/mod.rs @@ -3,7 +3,7 @@ mod data; pub use data::Data; use ruma::{ - events::{AnyEphemeralRoomEvent, RoomAccountDataEventType}, + events::{AnyRoomAccountDataEvent, AnyTimelineEvent, RoomAccountDataEventType}, serde::Raw, RoomId, UserId, }; @@ -47,7 +47,7 @@ impl Service { room_id: Option<&RoomId>, user_id: &UserId, since: u64, - ) -> Result>> { + ) -> Result>> { self.db.changes_since(room_id, user_id, since) } } diff --git a/src/service/pdu.rs b/src/service/pdu.rs index a51d7ec5..9f20e3cf 100644 --- a/src/service/pdu.rs +++ b/src/service/pdu.rs @@ -15,6 +15,7 @@ use serde::{Deserialize, Serialize}; use serde_json::{ json, value::{to_raw_value, RawValue as RawJsonValue}, + Map, Value, }; use std::{cmp::Ordering, collections::BTreeMap, sync::Arc}; use tracing::warn; diff --git a/src/utils/filter.rs b/src/utils/filter.rs index 2c0a1a92..aa9f6809 100644 --- a/src/utils/filter.rs +++ b/src/utils/filter.rs @@ -1,136 +1,153 @@ -use std::collections::HashSet; +// pub fn filter_room_events<'i, I: Iterator>>( +// 
events: I, +// sender_user: &UserId, +// sender_device: &DeviceId, +// room_id: Option<&RoomId>, +// filter: RoomEventFilter, +// ) -> crate::Result>>> { +// let (lazy_load_enabled, lazy_load_send_redundant) = match &filter.lazy_load_options { +// LazyLoadOptions::Enabled { +// include_redundant_members, +// } => (true, *include_redundant_members), +// _ => (false, false), +// }; -use ruma::{ - api::client::filter::{LazyLoadOptions, RoomEventFilter}, - events::AnyTimelineEvent, - serde::Raw, - DeviceId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, -}; +// let it = Box::new( +// events +// .filter(|event| match &filter.rooms { +// None => true, +// Some(rooms) => rooms.iter().any(|r| { +// r.as_str() +// == event +// .get_field::("room_id") +// .expect("room events should deserialize") +// .expect("room events should have a room_id") +// }), +// }) +// .filter(|event| match &filter.not_rooms[..] { +// [] => true, +// not_rooms => not_rooms.iter().all(|r| { +// r.as_str() +// != event +// .get_field::("room_id") +// .expect("room events should deserialize") +// .expect("room events should have a room_id") +// }), +// }) +// .filter(|event| match &filter.senders { +// None => true, +// Some(rooms) => rooms.iter().any(|r| { +// r.as_str() +// == event +// .get_field::("sender") +// .expect("room events should deserialize") +// .expect("room events should have a sender") +// }), +// }) +// .filter(|event| match &filter.not_senders[..] 
{ +// [] => true, +// not_senders => not_senders.iter().all(|r| { +// r.as_str() +// != event +// .get_field::("sender") +// .expect("room events should deserialize") +// .expect("room events should have a sender") +// }), +// }) +// .filter(|event| match &filter.types { +// None => true, +// Some(types) => types.iter().any(|t| { +// t.as_str() +// == event +// .get_field::("type") +// .expect("room events should deserialize") +// .expect("room events should have a type") +// }), +// }) +// .filter(|event| match &filter.not_types[..] { +// [] => true, +// not_types => not_types.iter().all(|t| { +// t.as_str() +// != event +// .get_field::("type") +// .expect("room events should deserialize") +// .expect("room events should have a type") +// }), +// }) +// .filter(|event| { +// let room_id = event +// .get_field::("room_id") +// .expect("room events should deserialize") +// .expect("room events should have a room_id"); +// let event_id = event +// .get_field::("event_id") +// .expect("room events should deserialize") +// .expect("room events should have an event_id"); -use crate::services; +// services() +// .rooms +// .state_accessor +// .user_can_see_event(sender_user, &room_id, &event_id) +// .unwrap_or(false) +// }), +// ); -pub fn filter_room_events<'i, I: Iterator>>( - events: I, - sender_user: &UserId, - sender_device: &DeviceId, - room_id: Option<&RoomId>, - filter: RoomEventFilter, -) -> crate::Result>>> { - let (lazy_load_enabled, lazy_load_send_redundant) = match &filter.lazy_load_options { - LazyLoadOptions::Enabled { - include_redundant_members, - } => (true, *include_redundant_members), - _ => (false, false), - }; +// let memberships = it +// .map(|event| { +// let room_id = event +// .get_field::("room_id") +// .expect("room events should deserialize") +// .expect("room events should have a room_id"); +// let sender = event +// .get_field::("sender") +// .expect("room events should deserialize") +// .expect("room events should have a sender"); - let 
it = Box::new( - events - .filter(|event| match &filter.rooms { - None => true, - Some(rooms) => rooms.iter().any(|r| { - r.as_str() - == event - .get_field::("room_id") - .expect("room events should deserialize") - .expect("room events should have a room_id") - }), - }) - .filter(|event| match &filter.not_rooms[..] { - [] => true, - not_rooms => not_rooms.iter().all(|r| { - r.as_str() - != event - .get_field::("room_id") - .expect("room events should deserialize") - .expect("room events should have a room_id") - }), - }) - .filter(|event| match &filter.senders { - None => true, - Some(rooms) => rooms.iter().any(|r| { - r.as_str() - == event - .get_field::("sender") - .expect("room events should deserialize") - .expect("room events should have a sender") - }), - }) - .filter(|event| match &filter.not_senders[..] { - [] => true, - not_senders => not_senders.iter().all(|r| { - r.as_str() - != event - .get_field::("sender") - .expect("room events should deserialize") - .expect("room events should have a sender") - }), - }) - .filter(|event| match &filter.types { - None => true, - Some(types) => types.iter().any(|t| { - t.as_str() - == event - .get_field::("type") - .expect("room events should deserialize") - .expect("room events should have a type") - }), - }) - .filter(|event| match &filter.not_types[..] 
{ - [] => true, - not_types => not_types.iter().all(|t| { - t.as_str() - != event - .get_field::("type") - .expect("room events should deserialize") - .expect("room events should have a type") - }), - }) - .filter(|event| { - let room_id = event - .get_field::("room_id") - .expect("room events should deserialize") - .expect("room events should have a room_id"); - let event_id = event - .get_field::("event_id") - .expect("room events should deserialize") - .expect("room events should have an event_id"); +// (room_id, sender) +// }) +// .flat_map(|(room_id, sender)| { +// services() +// .rooms +// .lazy_loading +// .lazy_load_was_sent_before(sender_user, sender_device, &room_id, &sender) +// .map(|b| { +// if !b || lazy_load_send_redundant { +// Some(sender) +// } else { +// None +// } +// }) +// .transpose() +// }) +// .collect::>>()?; - services() - .rooms - .state_accessor - .user_can_see_event(sender_user, &room_id, &event_id) - .unwrap_or(false) - }), - ); +// Ok(it) +// } - let memberships = it - .map(|event| { - let room_id = event - .get_field::("room_id") - .expect("room events should deserialize") - .expect("room events should have a room_id"); - let sender = event - .get_field::("sender") - .expect("room events should deserialize") - .expect("room events should have a sender"); +use serde_json::Value; - (room_id, sender) - }) - .flat_map(|(room_id, sender)| { - services() - .rooms - .lazy_loading - .lazy_load_was_sent_before(sender_user, sender_device, &room_id, &sender) - .map(|b| { - if !b || lazy_load_send_redundant { - Some(sender) - } else { - None - } - }) - .transpose() - }) - .collect::>>()?; +pub fn event_fields>(mut json: Value, event_fields: I) -> Value { + let inner = json + .as_object_mut() + .expect("PduEvent should always be an object"); - Ok(it) + // TODO: testing this properly + for field in event_fields { + let mut paths = field.split('.').peekable(); + let mut parent = &mut *inner; + + while let Some(key) = paths.next() { + if 
paths.peek().is_none() { + parent.remove(key); + } else { + parent = parent + .get_mut(key) + .map(Value::as_object_mut) + .flatten() + .unwrap(); + } + } + } + + json } From 96074406b12eff81bb142c4c802f50c520569241 Mon Sep 17 00:00:00 2001 From: mikoto Date: Wed, 22 May 2024 23:26:07 +0200 Subject: [PATCH 5/7] feat: apply RoomFilter --- src/api/client_server/context.rs | 38 +------ src/api/client_server/sync.rs | 118 ++++++++++++++------ src/service/account_data/mod.rs | 2 +- src/service/pdu.rs | 1 - src/utils/filter.rs | 186 ++++++------------------------- src/utils/mod.rs | 2 +- 6 files changed, 126 insertions(+), 221 deletions(-) diff --git a/src/api/client_server/context.rs b/src/api/client_server/context.rs index 05303de8..8e193e6b 100644 --- a/src/api/client_server/context.rs +++ b/src/api/client_server/context.rs @@ -109,38 +109,6 @@ pub async fn get_context_route( let events_before: Vec<_> = events_before .into_iter() .map(|(_, pdu)| pdu.to_room_event()) - .filter(|event| { - if let Some(types) = &body.filter.types { - types - .iter() - .find(|t| { - t == &&event - .get_field::("type") - .expect("events should deserialize") - .expect("events should have a type") - }) - .is_some() - } else { - true - } - }) - .filter(|event| { - if !body.filter.not_types.is_empty() { - body - .filter - .not_types - .iter() - .find(|t| { - t == &&event - .get_field::("type") - .expect("events should deserialize") - .expect("events should have a type") - }) - .is_none() - } else { - true - } - }) .collect(); let events_after: Vec<_> = services() @@ -194,10 +162,10 @@ pub async fn get_context_route( .map(|(count, _)| count.stringify()) .unwrap_or_else(|| base_token.stringify()); - let events_after = events_after + let events_after: Vec<_> = events_after .into_iter() - .map(|(_, pdu)| pdu.to_room_event()); - // .collect(); + .map(|(_, pdu)| pdu.to_room_event()) + .collect(); let mut state = Vec::new(); diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs 
index 3778537e..c340b9e6 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -1,11 +1,13 @@ use crate::{ service::{pdu::EventHash, rooms::timeline::PduCount}, - services, utils, Error, PduEvent, Result, Ruma, RumaResponse, + services, + utils::{self, filter}, + Error, PduEvent, Result, Ruma, RumaResponse, }; use ruma::{ api::client::{ - filter::{FilterDefinition, LazyLoadOptions, RoomFilter}, + filter::{EventFormat, FilterDefinition, LazyLoadOptions, RoomFilter}, sync::sync_events::{ self, v3::{ @@ -19,7 +21,7 @@ use ruma::{ }, events::{ room::member::{MembershipState, RoomMemberEventContent}, - RoomAccountDataEventType, StateEventType, TimelineEventType, + StateEventType, TimelineEventType, }, serde::Raw, uint, DeviceId, EventId, JsOption, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId, @@ -194,7 +196,7 @@ async fn sync_helper( .unwrap_or_default(), }; - let event_fields = filter.event_fields.as_ref().map(Vec::as_slice); + let event_fields = filter.event_fields.as_ref(); let (lazy_load_enabled, lazy_load_send_redundant) = match filter.room.state.lazy_load_options { LazyLoadOptions::Enabled { @@ -233,6 +235,15 @@ async fn sync_helper( .collect::>(); for room_id in all_joined_rooms { let room_id = room_id?; + + if filter::rooms( + &Raw::new(&serde_json::json!({"room_id": room_id})).expect("json can be serialized"), + filter.room.rooms.as_ref(), + filter.room.not_rooms.as_ref(), + ) { + continue; + } + if let Ok(joined_room) = load_joined_room( &sender_user, &sender_device, @@ -245,6 +256,7 @@ async fn sync_helper( lazy_load_send_redundant, &filter.room, event_fields, + &filter.event_format, full_state, &mut device_list_updates, &mut left_encrypted_users, @@ -548,6 +560,7 @@ async fn sync_helper( knock: BTreeMap::new(), // TODO }, presence: Presence { + // HashMap events: presence_updates .into_values() .map(|v| Raw::new(&v).expect("PresenceEvent always serializes successfully")) @@ -556,6 +569,7 @@ async fn sync_helper( account_data: 
GlobalAccountData { events: services() .account_data + // HashMap> .changes_since(None, &sender_user, since)? .into_iter() .filter_map(|(_, v)| { @@ -614,7 +628,8 @@ async fn load_joined_room( lazy_load_enabled: bool, lazy_load_send_redundant: bool, filter: &RoomFilter, - event_fields: &[String], + _event_fields: Option<&Vec>, + event_format: &EventFormat, full_state: bool, device_list_updates: &mut HashSet, left_encrypted_users: &mut HashSet, @@ -1090,7 +1105,26 @@ async fn load_joined_room( let room_events: Vec<_> = timeline_pdus .iter() - .map(|(_, pdu)| pdu.to_sync_room_event()) + .map(|(_, pdu)| match event_format { + EventFormat::Federation => Raw::new(pdu) + .map(Raw::cast) + .expect("json can be serialized"), + _ => pdu.to_sync_room_event(), + }) + .filter(|v| { + filter::senders( + v, + filter.timeline.senders.as_ref(), + filter.timeline.not_senders.as_ref(), + ) + }) + .filter(|v| { + filter::types( + v, + filter.timeline.types.as_ref(), + filter.timeline.not_types.as_ref(), + ) + }) .collect(); let mut edus: Vec<_> = services() @@ -1099,7 +1133,23 @@ async fn load_joined_room( .read_receipt .readreceipts_since(room_id, since) .filter_map(|r| r.ok()) // Filter out buggy events + // TODO .map(|(_, _, v)| v) + .filter(|v| { + filter::senders( + v, + filter.ephemeral.senders.as_ref(), + filter.ephemeral.not_senders.as_ref(), + ) + }) + .filter(|v| { + filter::types( + v, + filter.ephemeral.types.as_ref(), + filter.ephemeral.not_types.as_ref(), + ) + }) + .take(filter.ephemeral.limit.map_or(10, u64::from).min(100) as usize) .collect(); if services() @@ -1132,34 +1182,15 @@ async fn load_joined_room( .account_data .changes_since(Some(room_id), sender_user, since)? 
.into_iter() - // TODO: contains_url - .filter_map(|(et, v)| { - let et = et.to_string(); - - let (not_types, types) = ( - &filter.account_data.not_types, + .filter_map(|(_, v)| serde_json::from_str(v.json().get()).ok()) + .filter(|v| { + filter::types( + v, filter.account_data.types.as_ref(), - ); - - if not_types.contains(&et) - || types.map(|v| !v.contains(&et)).unwrap_or_default() - { - return None; - } - - serde_json::from_str(v.json().get()) - .map_err(|_| Error::bad_database("Invalid account event in database.")) - .ok() + filter.account_data.not_types.as_ref(), + ) }) - .take( - filter - .account_data - .limit - .map(TryInto::try_into) - .map(Result::ok) - .flatten() - .unwrap_or(usize::MAX), - ) + .take(filter.account_data.limit.map_or(10, u64::from).min(100) as usize) .collect(), }, summary: RoomSummary { @@ -1174,12 +1205,33 @@ async fn load_joined_room( timeline: Timeline { limited: limited || joined_since_last_sync, prev_batch, + // Vec> events: room_events, }, state: State { events: state_events .iter() - .map(|pdu| pdu.to_sync_state_event()) + .map(|pdu| match event_format { + EventFormat::Federation => Raw::new(pdu) + .map(Raw::cast) + .expect("json can be serialized"), + _ => pdu.to_sync_state_event(), + }) + .filter(|v| { + filter::senders( + v, + filter.state.senders.as_ref(), + filter.state.not_senders.as_ref(), + ) + }) + .filter(|v| { + filter::types( + v, + filter.state.types.as_ref(), + filter.state.not_types.as_ref(), + ) + }) + .take(filter.state.limit.map_or(10, u64::from).min(100) as usize) .collect(), }, ephemeral: Ephemeral { events: edus }, diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs index f2d2b386..7c3a9549 100644 --- a/src/service/account_data/mod.rs +++ b/src/service/account_data/mod.rs @@ -3,7 +3,7 @@ mod data; pub use data::Data; use ruma::{ - events::{AnyRoomAccountDataEvent, AnyTimelineEvent, RoomAccountDataEventType}, + events::{AnyRoomAccountDataEvent, RoomAccountDataEventType}, serde::Raw, 
RoomId, UserId, }; diff --git a/src/service/pdu.rs b/src/service/pdu.rs index 9f20e3cf..a51d7ec5 100644 --- a/src/service/pdu.rs +++ b/src/service/pdu.rs @@ -15,7 +15,6 @@ use serde::{Deserialize, Serialize}; use serde_json::{ json, value::{to_raw_value, RawValue as RawJsonValue}, - Map, Value, }; use std::{cmp::Ordering, collections::BTreeMap, sync::Arc}; use tracing::warn; diff --git a/src/utils/filter.rs b/src/utils/filter.rs index aa9f6809..22740c80 100644 --- a/src/utils/filter.rs +++ b/src/utils/filter.rs @@ -1,153 +1,39 @@ -// pub fn filter_room_events<'i, I: Iterator>>( -// events: I, -// sender_user: &UserId, -// sender_device: &DeviceId, -// room_id: Option<&RoomId>, -// filter: RoomEventFilter, -// ) -> crate::Result>>> { -// let (lazy_load_enabled, lazy_load_send_redundant) = match &filter.lazy_load_options { -// LazyLoadOptions::Enabled { -// include_redundant_members, -// } => (true, *include_redundant_members), -// _ => (false, false), -// }; +use ruma::{serde::Raw, OwnedRoomId, OwnedUserId}; +use serde::Deserialize; -// let it = Box::new( -// events -// .filter(|event| match &filter.rooms { -// None => true, -// Some(rooms) => rooms.iter().any(|r| { -// r.as_str() -// == event -// .get_field::("room_id") -// .expect("room events should deserialize") -// .expect("room events should have a room_id") -// }), -// }) -// .filter(|event| match &filter.not_rooms[..] { -// [] => true, -// not_rooms => not_rooms.iter().all(|r| { -// r.as_str() -// != event -// .get_field::("room_id") -// .expect("room events should deserialize") -// .expect("room events should have a room_id") -// }), -// }) -// .filter(|event| match &filter.senders { -// None => true, -// Some(rooms) => rooms.iter().any(|r| { -// r.as_str() -// == event -// .get_field::("sender") -// .expect("room events should deserialize") -// .expect("room events should have a sender") -// }), -// }) -// .filter(|event| match &filter.not_senders[..] 
{ -// [] => true, -// not_senders => not_senders.iter().all(|r| { -// r.as_str() -// != event -// .get_field::("sender") -// .expect("room events should deserialize") -// .expect("room events should have a sender") -// }), -// }) -// .filter(|event| match &filter.types { -// None => true, -// Some(types) => types.iter().any(|t| { -// t.as_str() -// == event -// .get_field::("type") -// .expect("room events should deserialize") -// .expect("room events should have a type") -// }), -// }) -// .filter(|event| match &filter.not_types[..] { -// [] => true, -// not_types => not_types.iter().all(|t| { -// t.as_str() -// != event -// .get_field::("type") -// .expect("room events should deserialize") -// .expect("room events should have a type") -// }), -// }) -// .filter(|event| { -// let room_id = event -// .get_field::("room_id") -// .expect("room events should deserialize") -// .expect("room events should have a room_id"); -// let event_id = event -// .get_field::("event_id") -// .expect("room events should deserialize") -// .expect("room events should have an event_id"); +fn inclusion Deserialize<'a> + PartialEq>( + event: &Raw, + field: &str, + include: Option<&Vec>, + exclude: &[F], +) -> bool { + let value = event + .get_field::(field) + .expect("room events should deserialize") + .expect("field should exist"); -// services() -// .rooms -// .state_accessor -// .user_can_see_event(sender_user, &room_id, &event_id) -// .unwrap_or(false) -// }), -// ); - -// let memberships = it -// .map(|event| { -// let room_id = event -// .get_field::("room_id") -// .expect("room events should deserialize") -// .expect("room events should have a room_id"); -// let sender = event -// .get_field::("sender") -// .expect("room events should deserialize") -// .expect("room events should have a sender"); - -// (room_id, sender) -// }) -// .flat_map(|(room_id, sender)| { -// services() -// .rooms -// .lazy_loading -// .lazy_load_was_sent_before(sender_user, sender_device, &room_id, 
&sender) -// .map(|b| { -// if !b || lazy_load_send_redundant { -// Some(sender) -// } else { -// None -// } -// }) -// .transpose() -// }) -// .collect::>>()?; - -// Ok(it) -// } - -use serde_json::Value; - -pub fn event_fields>(mut json: Value, event_fields: I) -> Value { - let inner = json - .as_object_mut() - .expect("PduEvent should always be an object"); - - // TODO: testing this properly - for field in event_fields { - let mut paths = field.split('.').peekable(); - let mut parent = &mut *inner; - - while let Some(key) = paths.next() { - if paths.peek().is_none() { - parent.remove(key); - } else { - parent = parent - .get_mut(key) - .map(Value::as_object_mut) - .flatten() - .unwrap(); - } - } - } - - json + include + .map(|v| v.iter().any(|item| *item == value)) + .unwrap_or(true) + && exclude.iter().all(|item| *item != value) +} + +pub fn rooms( + event: &Raw, + rooms: Option<&Vec>, + not_rooms: &[OwnedRoomId], +) -> bool { + inclusion(event, "room_id", rooms, not_rooms) +} + +pub fn senders( + event: &Raw, + senders: Option<&Vec>, + not_senders: &[OwnedUserId], +) -> bool { + inclusion(event, "sender", senders, not_senders) +} + +pub fn types(event: &Raw, types: Option<&Vec>, not_types: &[String]) -> bool { + inclusion(event, "event_type", types, not_types) } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 0475c26a..6ca6a2e3 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,5 +1,5 @@ pub mod error; -mod filter; +pub mod filter; use argon2::{Config, Variant}; use cmp::Ordering; From 7438f2c4e0d8e517fb30c6912d2f106a0843aef1 Mon Sep 17 00:00:00 2001 From: mikoto Date: Thu, 23 May 2024 00:37:49 +0200 Subject: [PATCH 6/7] contains_url draft --- src/api/client_server/sync.rs | 6 +++- src/database/key_value/rooms/search.rs | 40 ++++++++++++++++++-------- src/service/rooms/search/data.rs | 2 ++ 3 files changed, 35 insertions(+), 13 deletions(-) diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index c340b9e6..8c95a3ef 100644 
--- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -7,7 +7,7 @@ use crate::{ use ruma::{ api::client::{ - filter::{EventFormat, FilterDefinition, LazyLoadOptions, RoomFilter}, + filter::{EventFormat, FilterDefinition, LazyLoadOptions, RoomFilter, UrlFilter}, sync::sync_events::{ self, v3::{ @@ -1111,6 +1111,10 @@ async fn load_joined_room( .expect("json can be serialized"), _ => pdu.to_sync_room_event(), }) + .filter(|v| match filter.timeline.url_filter.unwrap_or(true) { + UrlFilter::EventsWithUrl => todo!(), + UrlFilter::EventsWithoutUrl => todo!(), + }) .filter(|v| { filter::senders( v, diff --git a/src/database/key_value/rooms/search.rs b/src/database/key_value/rooms/search.rs index 387eb9da..c2bdfa8c 100644 --- a/src/database/key_value/rooms/search.rs +++ b/src/database/key_value/rooms/search.rs @@ -7,10 +7,17 @@ use crate::{database::KeyValueDatabase, service, services, utils, Result}; impl service::rooms::search::Data for KeyValueDatabase { fn index_pdu<'a>(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> Result<()> { + let mut contains_url = false; + let mut token_batch = message_body .split_terminator(|c: char| !c.is_alphanumeric()) .filter(|s| !s.is_empty()) - .filter(|word| word.len() <= 50) + .filter(|word| { + contains_url = + contains_url || (word.starts_with("http") && Url::from_str(word).is_ok()); + + word.len() <= 50 + }) .map(str::to_lowercase) .map(|word| { let mut key = shortroomid.to_be_bytes().to_vec(); @@ -22,18 +29,15 @@ impl service::rooms::search::Data for KeyValueDatabase { self.tokenids.insert_batch(&mut token_batch)?; - let mut url_batch = message_body - .split_terminator(|c: char| !c.is_whitespace()) - .filter(|word| Url::from_str(word).is_ok()) - .map(|url| { - let mut key = shortroomid.to_be_bytes().to_vec(); - key.extend_from_slice(url.as_bytes()); - key.push(0xff); - key.extend_from_slice(pdu_id); // TODO: currently we save the room id a second time here - (key, Vec::new()) - }); + if 
contains_url { + let mut key = shortroomid.to_be_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(pdu_id); - self.urltokenids.insert_batch(&mut url_batch) + self.urltokenids.insert(&key, <&[u8]>::default())?; + } + + Ok(()) } fn search_pdus<'a>( @@ -80,4 +84,16 @@ impl service::rooms::search::Data for KeyValueDatabase { Ok(Some((Box::new(common_elements), words))) } + + fn contains_url<'a>(&'a self, room_id: &RoomId, pdu_id: &[u8]) -> Result { + let prefix = services() + .rooms + .short + .get_shortroomid(room_id)? + .expect("room exists") + .to_be_bytes() + .to_vec(); + + todo!() + } } diff --git a/src/service/rooms/search/data.rs b/src/service/rooms/search/data.rs index 7ea7e3d1..7d046fa5 100644 --- a/src/service/rooms/search/data.rs +++ b/src/service/rooms/search/data.rs @@ -10,4 +10,6 @@ pub trait Data: Send + Sync { room_id: &RoomId, search_string: &str, ) -> Result> + 'a>, Vec)>>; + + fn contains_url<'a>(&'a self, room_id: &RoomId, pdu_id: &[u8]) -> Result; } From a39eaf20800f5cf3fa84dab0accfcf21ca6d9d69 Mon Sep 17 00:00:00 2001 From: mikoto Date: Thu, 23 May 2024 02:19:59 +0200 Subject: [PATCH 7/7] create indices for and filter by contains_url --- Cargo.lock | 1 + rust-toolchain.toml | 1 + src/api/client_server/context.rs | 83 +++++++++++++++++++++----- src/api/client_server/sync.rs | 11 ++-- src/database/key_value/rooms/search.rs | 14 +++-- src/service/rooms/search/data.rs | 2 +- src/service/rooms/search/mod.rs | 5 ++ src/utils/filter.rs | 28 ++++++++- 8 files changed, 117 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8453335a..abfbda74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2441,6 +2441,7 @@ version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" dependencies = [ + "indexmap 2.2.5", "itoa", "ryu", "serde", diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 3ffd3a5e..37cf28af 100644 --- 
a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -14,6 +14,7 @@ channel = "1.78.0" components = [ # For rust-analyzer "rust-src", + "rust-analyzer", ] targets = [ "aarch64-unknown-linux-musl", diff --git a/src/api/client_server/context.rs b/src/api/client_server/context.rs index 8e193e6b..c4e4c732 100644 --- a/src/api/client_server/context.rs +++ b/src/api/client_server/context.rs @@ -1,7 +1,8 @@ -use crate::{services, Error, Result, Ruma}; +use crate::{services, utils::filter, Error, Result, Ruma}; use ruma::{ api::client::{context::get_context, error::ErrorKind, filter::LazyLoadOptions}, events::StateEventType, + serde::Raw, }; use std::collections::HashSet; use tracing::error; @@ -78,7 +79,6 @@ pub async fn get_context_route( .rooms .timeline .pdus_until(sender_user, &room_id, base_token)? - .take(limit / 2) .filter_map(|r| r.ok()) // Remove buggy events .filter(|(_, pdu)| { services() @@ -109,13 +109,33 @@ pub async fn get_context_route( let events_before: Vec<_> = events_before .into_iter() .map(|(_, pdu)| pdu.to_room_event()) + .filter(|v| { + filter::senders( + v, + body.filter.senders.as_ref(), + body.filter.not_senders.as_ref(), + ) + }) + .filter(|v| { + filter::types( + v, + body.filter.types.as_ref(), + body.filter.not_types.as_ref(), + ) + }) + .filter(|v| { + body.filter + .url_filter + .map(|f| filter::url(v, &room_id, f)) + .unwrap_or(true) + }) + .take(limit / 2) .collect(); let events_after: Vec<_> = services() .rooms .timeline .pdus_after(sender_user, &room_id, base_token)? 
- .take(limit / 2) .filter_map(|r| r.ok()) // Remove buggy events .filter(|(_, pdu)| { services() @@ -165,6 +185,27 @@ pub async fn get_context_route( let events_after: Vec<_> = events_after .into_iter() .map(|(_, pdu)| pdu.to_room_event()) + .filter(|v| { + filter::senders( + v, + body.filter.senders.as_ref(), + body.filter.not_senders.as_ref(), + ) + }) + .filter(|v| { + filter::types( + v, + body.filter.types.as_ref(), + body.filter.not_types.as_ref(), + ) + }) + .filter(|v| { + body.filter + .url_filter + .map(|f| filter::url(v, &room_id, f)) + .unwrap_or(true) + }) + .take(limit / 2) .collect(); let mut state = Vec::new(); @@ -175,24 +216,34 @@ pub async fn get_context_route( .short .get_statekey_from_short(shortstatekey)?; - if event_type != StateEventType::RoomMember { - let pdu = match services().rooms.timeline.get_pdu(&id)? { - Some(pdu) => pdu, + if !filter::types( + &Raw::new(&serde_json::json!({"type": event_type})).expect("json can be serialized"), + body.filter.types.as_ref(), + body.filter.not_types.as_ref(), + ) { + continue; + } + + if event_type != StateEventType::RoomMember + || (!lazy_load_enabled || lazy_loaded.contains(&state_key)) + { + let event = match services().rooms.timeline.get_pdu(&id)? { + Some(pdu) => pdu.to_state_event(), None => { error!("Pdu in state not found: {}", id); continue; } }; - state.push(pdu.to_state_event()); - } else if !lazy_load_enabled || lazy_loaded.contains(&state_key) { - let pdu = match services().rooms.timeline.get_pdu(&id)? 
{ - Some(pdu) => pdu, - None => { - error!("Pdu in state not found: {}", id); - continue; - } - }; - state.push(pdu.to_state_event()); + + if !filter::senders( + &event, + body.filter.senders.as_ref(), + body.filter.not_senders.as_ref(), + ) { + continue; + } + + state.push(event); } } diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 8c95a3ef..59026a70 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -7,7 +7,7 @@ use crate::{ use ruma::{ api::client::{ - filter::{EventFormat, FilterDefinition, LazyLoadOptions, RoomFilter, UrlFilter}, + filter::{EventFormat, FilterDefinition, LazyLoadOptions, RoomFilter}, sync::sync_events::{ self, v3::{ @@ -1111,9 +1111,12 @@ async fn load_joined_room( .expect("json can be serialized"), _ => pdu.to_sync_room_event(), }) - .filter(|v| match filter.timeline.url_filter.unwrap_or(true) { - UrlFilter::EventsWithUrl => todo!(), - UrlFilter::EventsWithoutUrl => todo!(), + .filter(|v| { + filter + .timeline + .url_filter + .map(|f| filter::url(v, room_id, f)) + .unwrap_or(true) }) .filter(|v| { filter::senders( diff --git a/src/database/key_value/rooms/search.rs b/src/database/key_value/rooms/search.rs index c2bdfa8c..1c347d76 100644 --- a/src/database/key_value/rooms/search.rs +++ b/src/database/key_value/rooms/search.rs @@ -34,7 +34,7 @@ impl service::rooms::search::Data for KeyValueDatabase { key.push(0xff); key.extend_from_slice(pdu_id); - self.urltokenids.insert(&key, <&[u8]>::default())?; + self.urltokenids.insert(&key, Default::default())?; } Ok(()) @@ -85,15 +85,17 @@ impl service::rooms::search::Data for KeyValueDatabase { Ok(Some((Box::new(common_elements), words))) } - fn contains_url<'a>(&'a self, room_id: &RoomId, pdu_id: &[u8]) -> Result { + fn contains_url(&self, room_id: &RoomId, pdu_id: &[u8]) -> Result { let prefix = services() .rooms .short .get_shortroomid(room_id)? 
- .expect("room exists") - .to_be_bytes() - .to_vec(); + .expect("room exists"); - todo!() + let mut key = prefix.to_be_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(pdu_id); + + self.urltokenids.get(&key).map(|v| v.is_some()) } } diff --git a/src/service/rooms/search/data.rs b/src/service/rooms/search/data.rs index 7d046fa5..0ec480b1 100644 --- a/src/service/rooms/search/data.rs +++ b/src/service/rooms/search/data.rs @@ -11,5 +11,5 @@ pub trait Data: Send + Sync { search_string: &str, ) -> Result> + 'a>, Vec)>>; - fn contains_url<'a>(&'a self, room_id: &RoomId, pdu_id: &[u8]) -> Result; + fn contains_url(&self, room_id: &RoomId, pdu_id: &[u8]) -> Result; } diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index b6f35e79..39a97e43 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -23,4 +23,9 @@ impl Service { ) -> Result> + 'a, Vec)>> { self.db.search_pdus(room_id, search_string) } + + #[tracing::instrument(skip(self))] + pub fn contains_url(&self, room_id: &RoomId, pdu_id: &[u8]) -> Result { + self.db.contains_url(room_id, pdu_id) + } } diff --git a/src/utils/filter.rs b/src/utils/filter.rs index 22740c80..64350aff 100644 --- a/src/utils/filter.rs +++ b/src/utils/filter.rs @@ -1,6 +1,10 @@ -use ruma::{serde::Raw, OwnedRoomId, OwnedUserId}; +use ruma::{ + api::client::filter::UrlFilter, serde::Raw, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, +}; use serde::Deserialize; +use crate::services; + fn inclusion Deserialize<'a> + PartialEq>( event: &Raw, field: &str, @@ -37,3 +41,25 @@ pub fn senders( pub fn types(event: &Raw, types: Option<&Vec>, not_types: &[String]) -> bool { inclusion(event, "event_type", types, not_types) } + +pub fn url(event: &Raw, room_id: &RoomId, filter: UrlFilter) -> bool { + let Ok(Some(pdu_id)) = services().rooms.timeline.get_pdu_id( + &event + .get_field::("event_id") + .expect("event_id can be deserialized") + .expect("event should have event_id"), + ) else { 
+ return filter == UrlFilter::EventsWithoutUrl; + }; + + let contains_url = services() + .rooms + .search + .contains_url(room_id, &pdu_id) + .unwrap_or(false); + + match filter { + UrlFilter::EventsWithUrl => contains_url, + UrlFilter::EventsWithoutUrl => !contains_url, + } +}