1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-08-01 17:38:36 +00:00

feat: apply RoomFilter

This commit is contained in:
mikoto 2024-05-22 23:26:07 +02:00
parent ec48df78da
commit 96074406b1
6 changed files with 126 additions and 221 deletions

View file

@ -109,38 +109,6 @@ pub async fn get_context_route(
let events_before: Vec<_> = events_before
.into_iter()
.map(|(_, pdu)| pdu.to_room_event())
.filter(|event| {
if let Some(types) = &body.filter.types {
types
.iter()
.find(|t| {
t == &&event
.get_field::<String>("type")
.expect("events should deserialize")
.expect("events should have a type")
})
.is_some()
} else {
true
}
})
.filter(|event| {
if !body.filter.not_types.is_empty() {
body
.filter
.not_types
.iter()
.find(|t| {
t == &&event
.get_field::<String>("type")
.expect("events should deserialize")
.expect("events should have a type")
})
.is_none()
} else {
true
}
})
.collect();
let events_after: Vec<_> = services()
@ -194,10 +162,10 @@ pub async fn get_context_route(
.map(|(count, _)| count.stringify())
.unwrap_or_else(|| base_token.stringify());
let events_after = events_after
let events_after: Vec<_> = events_after
.into_iter()
.map(|(_, pdu)| pdu.to_room_event());
// .collect();
.map(|(_, pdu)| pdu.to_room_event())
.collect();
let mut state = Vec::new();

View file

@ -1,11 +1,13 @@
use crate::{
service::{pdu::EventHash, rooms::timeline::PduCount},
services, utils, Error, PduEvent, Result, Ruma, RumaResponse,
services,
utils::{self, filter},
Error, PduEvent, Result, Ruma, RumaResponse,
};
use ruma::{
api::client::{
filter::{FilterDefinition, LazyLoadOptions, RoomFilter},
filter::{EventFormat, FilterDefinition, LazyLoadOptions, RoomFilter},
sync::sync_events::{
self,
v3::{
@ -19,7 +21,7 @@ use ruma::{
},
events::{
room::member::{MembershipState, RoomMemberEventContent},
RoomAccountDataEventType, StateEventType, TimelineEventType,
StateEventType, TimelineEventType,
},
serde::Raw,
uint, DeviceId, EventId, JsOption, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
@ -194,7 +196,7 @@ async fn sync_helper(
.unwrap_or_default(),
};
let event_fields = filter.event_fields.as_ref().map(Vec::as_slice);
let event_fields = filter.event_fields.as_ref();
let (lazy_load_enabled, lazy_load_send_redundant) = match filter.room.state.lazy_load_options {
LazyLoadOptions::Enabled {
@ -233,6 +235,15 @@ async fn sync_helper(
.collect::<Vec<_>>();
for room_id in all_joined_rooms {
let room_id = room_id?;
if filter::rooms(
&Raw::new(&serde_json::json!({"room_id": room_id})).expect("json can be serialized"),
filter.room.rooms.as_ref(),
filter.room.not_rooms.as_ref(),
) {
continue;
}
if let Ok(joined_room) = load_joined_room(
&sender_user,
&sender_device,
@ -245,6 +256,7 @@ async fn sync_helper(
lazy_load_send_redundant,
&filter.room,
event_fields,
&filter.event_format,
full_state,
&mut device_list_updates,
&mut left_encrypted_users,
@ -548,6 +560,7 @@ async fn sync_helper(
knock: BTreeMap::new(), // TODO
},
presence: Presence {
// HashMap<OwnedUserId, PresenceEvent>
events: presence_updates
.into_values()
.map(|v| Raw::new(&v).expect("PresenceEvent always serializes successfully"))
@ -556,6 +569,7 @@ async fn sync_helper(
account_data: GlobalAccountData {
events: services()
.account_data
// HashMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>
.changes_since(None, &sender_user, since)?
.into_iter()
.filter_map(|(_, v)| {
@ -614,7 +628,8 @@ async fn load_joined_room(
lazy_load_enabled: bool,
lazy_load_send_redundant: bool,
filter: &RoomFilter,
event_fields: &[String],
_event_fields: Option<&Vec<String>>,
event_format: &EventFormat,
full_state: bool,
device_list_updates: &mut HashSet<OwnedUserId>,
left_encrypted_users: &mut HashSet<OwnedUserId>,
@ -1090,7 +1105,26 @@ async fn load_joined_room(
let room_events: Vec<_> = timeline_pdus
.iter()
.map(|(_, pdu)| pdu.to_sync_room_event())
.map(|(_, pdu)| match event_format {
EventFormat::Federation => Raw::new(pdu)
.map(Raw::cast)
.expect("json can be serialized"),
_ => pdu.to_sync_room_event(),
})
.filter(|v| {
filter::senders(
v,
filter.timeline.senders.as_ref(),
filter.timeline.not_senders.as_ref(),
)
})
.filter(|v| {
filter::types(
v,
filter.timeline.types.as_ref(),
filter.timeline.not_types.as_ref(),
)
})
.collect();
let mut edus: Vec<_> = services()
@ -1099,7 +1133,23 @@ async fn load_joined_room(
.read_receipt
.readreceipts_since(room_id, since)
.filter_map(|r| r.ok()) // Filter out buggy events
// TODO
.map(|(_, _, v)| v)
.filter(|v| {
filter::senders(
v,
filter.ephemeral.senders.as_ref(),
filter.ephemeral.not_senders.as_ref(),
)
})
.filter(|v| {
filter::types(
v,
filter.ephemeral.types.as_ref(),
filter.ephemeral.not_types.as_ref(),
)
})
.take(filter.ephemeral.limit.map_or(10, u64::from).min(100) as usize)
.collect();
if services()
@ -1132,34 +1182,15 @@ async fn load_joined_room(
.account_data
.changes_since(Some(room_id), sender_user, since)?
.into_iter()
// TODO: contains_url
.filter_map(|(et, v)| {
let et = et.to_string();
let (not_types, types) = (
&filter.account_data.not_types,
.filter_map(|(_, v)| serde_json::from_str(v.json().get()).ok())
.filter(|v| {
filter::types(
v,
filter.account_data.types.as_ref(),
);
if not_types.contains(&et)
|| types.map(|v| !v.contains(&et)).unwrap_or_default()
{
return None;
}
serde_json::from_str(v.json().get())
.map_err(|_| Error::bad_database("Invalid account event in database."))
.ok()
filter.account_data.not_types.as_ref(),
)
})
.take(
filter
.account_data
.limit
.map(TryInto::try_into)
.map(Result::ok)
.flatten()
.unwrap_or(usize::MAX),
)
.take(filter.account_data.limit.map_or(10, u64::from).min(100) as usize)
.collect(),
},
summary: RoomSummary {
@ -1174,12 +1205,33 @@ async fn load_joined_room(
timeline: Timeline {
limited: limited || joined_since_last_sync,
prev_batch,
// Vec<Raw<AnySyncTimelineEvent>>
events: room_events,
},
state: State {
events: state_events
.iter()
.map(|pdu| pdu.to_sync_state_event())
.map(|pdu| match event_format {
EventFormat::Federation => Raw::new(pdu)
.map(Raw::cast)
.expect("json can be serialized"),
_ => pdu.to_sync_state_event(),
})
.filter(|v| {
filter::senders(
v,
filter.state.senders.as_ref(),
filter.state.not_senders.as_ref(),
)
})
.filter(|v| {
filter::types(
v,
filter.state.types.as_ref(),
filter.state.not_types.as_ref(),
)
})
.take(filter.state.limit.map_or(10, u64::from).min(100) as usize)
.collect(),
},
ephemeral: Ephemeral { events: edus },

View file

@ -3,7 +3,7 @@ mod data;
pub use data::Data;
use ruma::{
events::{AnyRoomAccountDataEvent, AnyTimelineEvent, RoomAccountDataEventType},
events::{AnyRoomAccountDataEvent, RoomAccountDataEventType},
serde::Raw,
RoomId, UserId,
};

View file

@ -15,7 +15,6 @@ use serde::{Deserialize, Serialize};
use serde_json::{
json,
value::{to_raw_value, RawValue as RawJsonValue},
Map, Value,
};
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};
use tracing::warn;

View file

@ -1,153 +1,39 @@
// pub fn filter_room_events<'i, I: Iterator<Item = &'i Raw<AnyTimelineEvent>>>(
// events: I,
// sender_user: &UserId,
// sender_device: &DeviceId,
// room_id: Option<&RoomId>,
// filter: RoomEventFilter,
// ) -> crate::Result<Box<dyn Iterator<Item = &'i Raw<AnyTimelineEvent>>>> {
// let (lazy_load_enabled, lazy_load_send_redundant) = match &filter.lazy_load_options {
// LazyLoadOptions::Enabled {
// include_redundant_members,
// } => (true, *include_redundant_members),
// _ => (false, false),
// };
use ruma::{serde::Raw, OwnedRoomId, OwnedUserId};
use serde::Deserialize;
// let it = Box::new(
// events
// .filter(|event| match &filter.rooms {
// None => true,
// Some(rooms) => rooms.iter().any(|r| {
// r.as_str()
// == event
// .get_field::<String>("room_id")
// .expect("room events should deserialize")
// .expect("room events should have a room_id")
// }),
// })
// .filter(|event| match &filter.not_rooms[..] {
// [] => true,
// not_rooms => not_rooms.iter().all(|r| {
// r.as_str()
// != event
// .get_field::<String>("room_id")
// .expect("room events should deserialize")
// .expect("room events should have a room_id")
// }),
// })
// .filter(|event| match &filter.senders {
// None => true,
// Some(rooms) => rooms.iter().any(|r| {
// r.as_str()
// == event
// .get_field::<String>("sender")
// .expect("room events should deserialize")
// .expect("room events should have a sender")
// }),
// })
// .filter(|event| match &filter.not_senders[..] {
// [] => true,
// not_senders => not_senders.iter().all(|r| {
// r.as_str()
// != event
// .get_field::<String>("sender")
// .expect("room events should deserialize")
// .expect("room events should have a sender")
// }),
// })
// .filter(|event| match &filter.types {
// None => true,
// Some(types) => types.iter().any(|t| {
// t.as_str()
// == event
// .get_field::<String>("type")
// .expect("room events should deserialize")
// .expect("room events should have a type")
// }),
// })
// .filter(|event| match &filter.not_types[..] {
// [] => true,
// not_types => not_types.iter().all(|t| {
// t.as_str()
// != event
// .get_field::<String>("type")
// .expect("room events should deserialize")
// .expect("room events should have a type")
// }),
// })
// .filter(|event| {
// let room_id = event
// .get_field::<OwnedRoomId>("room_id")
// .expect("room events should deserialize")
// .expect("room events should have a room_id");
// let event_id = event
// .get_field::<OwnedEventId>("event_id")
// .expect("room events should deserialize")
// .expect("room events should have an event_id");
fn inclusion<T, F: for<'a> Deserialize<'a> + PartialEq>(
event: &Raw<T>,
field: &str,
include: Option<&Vec<F>>,
exclude: &[F],
) -> bool {
let value = event
.get_field::<F>(field)
.expect("room events should deserialize")
.expect("field should exist");
// services()
// .rooms
// .state_accessor
// .user_can_see_event(sender_user, &room_id, &event_id)
// .unwrap_or(false)
// }),
// );
// let memberships = it
// .map(|event| {
// let room_id = event
// .get_field::<OwnedRoomId>("room_id")
// .expect("room events should deserialize")
// .expect("room events should have a room_id");
// let sender = event
// .get_field::<OwnedUserId>("sender")
// .expect("room events should deserialize")
// .expect("room events should have a sender");
// (room_id, sender)
// })
// .flat_map(|(room_id, sender)| {
// services()
// .rooms
// .lazy_loading
// .lazy_load_was_sent_before(sender_user, sender_device, &room_id, &sender)
// .map(|b| {
// if !b || lazy_load_send_redundant {
// Some(sender)
// } else {
// None
// }
// })
// .transpose()
// })
// .collect::<crate::Result<HashSet<_>>>()?;
// Ok(it)
// }
use serde_json::Value;
pub fn event_fields<I: Iterator<Item = String>>(mut json: Value, event_fields: I) -> Value {
let inner = json
.as_object_mut()
.expect("PduEvent should always be an object");
// TODO: testing this properly
for field in event_fields {
let mut paths = field.split('.').peekable();
let mut parent = &mut *inner;
while let Some(key) = paths.next() {
if paths.peek().is_none() {
parent.remove(key);
} else {
parent = parent
.get_mut(key)
.map(Value::as_object_mut)
.flatten()
.unwrap();
}
}
}
json
include
.map(|v| v.iter().any(|item| *item == value))
.unwrap_or(true)
&& exclude.iter().all(|item| *item != value)
}
pub fn rooms<T>(
event: &Raw<T>,
rooms: Option<&Vec<OwnedRoomId>>,
not_rooms: &[OwnedRoomId],
) -> bool {
inclusion(event, "room_id", rooms, not_rooms)
}
pub fn senders<T>(
event: &Raw<T>,
senders: Option<&Vec<OwnedUserId>>,
not_senders: &[OwnedUserId],
) -> bool {
inclusion(event, "sender", senders, not_senders)
}
pub fn types<T>(event: &Raw<T>, types: Option<&Vec<String>>, not_types: &[String]) -> bool {
inclusion(event, "event_type", types, not_types)
}

View file

@ -1,5 +1,5 @@
pub mod error;
mod filter;
pub mod filter;
use argon2::{Config, Variant};
use cmp::Ordering;