1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-07-22 17:18:35 +00:00
This commit is contained in:
mikoto 2024-05-07 07:42:10 +02:00
parent a0c4a639b2
commit ec48df78da
7 changed files with 193 additions and 145 deletions

View file

@ -78,7 +78,7 @@ http = "0.2.9"
# Used to find data directory for default db path # Used to find data directory for default db path
directories = "5" directories = "5"
# Used for ruma wrapper # Used for ruma wrapper
serde_json = { version = "1.0.96", features = ["raw_value"] } serde_json = { version = "1.0.96", features = ["raw_value", "preserve_order"] }
# Used for appservice registration files # Used for appservice registration files
serde_yaml = "0.9.21" serde_yaml = "0.9.21"
# Used for pdu definition # Used for pdu definition
@ -157,8 +157,6 @@ tikv-jemallocator = { version = "0.5.0", features = [
sd-notify = { version = "0.4.1", optional = true } sd-notify = { version = "0.4.1", optional = true }
url = "2.5.0"
[dependencies.rocksdb] [dependencies.rocksdb]
features = ["lz4", "multi-threaded-cf", "zstd"] features = ["lz4", "multi-threaded-cf", "zstd"]
optional = true optional = true

View file

@ -5,7 +5,7 @@ use crate::{
use ruma::{ use ruma::{
api::client::{ api::client::{
filter::{FilterDefinition, LazyLoadOptions}, filter::{FilterDefinition, LazyLoadOptions, RoomFilter},
sync::sync_events::{ sync::sync_events::{
self, self,
v3::{ v3::{
@ -19,7 +19,7 @@ use ruma::{
}, },
events::{ events::{
room::member::{MembershipState, RoomMemberEventContent}, room::member::{MembershipState, RoomMemberEventContent},
StateEventType, TimelineEventType, RoomAccountDataEventType, StateEventType, TimelineEventType,
}, },
serde::Raw, serde::Raw,
uint, DeviceId, EventId, JsOption, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId, uint, DeviceId, EventId, JsOption, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
@ -194,6 +194,8 @@ async fn sync_helper(
.unwrap_or_default(), .unwrap_or_default(),
}; };
let event_fields = filter.event_fields.as_ref().map(Vec::as_slice);
let (lazy_load_enabled, lazy_load_send_redundant) = match filter.room.state.lazy_load_options { let (lazy_load_enabled, lazy_load_send_redundant) = match filter.room.state.lazy_load_options {
LazyLoadOptions::Enabled { LazyLoadOptions::Enabled {
include_redundant_members: redundant, include_redundant_members: redundant,
@ -241,6 +243,8 @@ async fn sync_helper(
next_batchcount, next_batchcount,
lazy_load_enabled, lazy_load_enabled,
lazy_load_send_redundant, lazy_load_send_redundant,
&filter.room,
event_fields,
full_state, full_state,
&mut device_list_updates, &mut device_list_updates,
&mut left_encrypted_users, &mut left_encrypted_users,
@ -289,11 +293,14 @@ async fn sync_helper(
} }
let mut left_rooms = BTreeMap::new(); let mut left_rooms = BTreeMap::new();
let all_left_rooms: Vec<_> = services() let all_left_rooms = match filter.room.include_leave {
false => Vec::with_capacity(0),
true => services()
.rooms .rooms
.state_cache .state_cache
.rooms_left(&sender_user) .rooms_left(&sender_user)
.collect(); .collect(),
};
for result in all_left_rooms { for result in all_left_rooms {
let (room_id, _) = result?; let (room_id, _) = result?;
@ -606,6 +613,8 @@ async fn load_joined_room(
next_batchcount: PduCount, next_batchcount: PduCount,
lazy_load_enabled: bool, lazy_load_enabled: bool,
lazy_load_send_redundant: bool, lazy_load_send_redundant: bool,
filter: &RoomFilter,
event_fields: &[String],
full_state: bool, full_state: bool,
device_list_updates: &mut HashSet<OwnedUserId>, device_list_updates: &mut HashSet<OwnedUserId>,
left_encrypted_users: &mut HashSet<OwnedUserId>, left_encrypted_users: &mut HashSet<OwnedUserId>,
@ -1123,11 +1132,34 @@ async fn load_joined_room(
.account_data .account_data
.changes_since(Some(room_id), sender_user, since)? .changes_since(Some(room_id), sender_user, since)?
.into_iter() .into_iter()
.filter_map(|(_, v)| { // TODO: contains_url
.filter_map(|(et, v)| {
let et = et.to_string();
let (not_types, types) = (
&filter.account_data.not_types,
filter.account_data.types.as_ref(),
);
if not_types.contains(&et)
|| types.map(|v| !v.contains(&et)).unwrap_or_default()
{
return None;
}
serde_json::from_str(v.json().get()) serde_json::from_str(v.json().get())
.map_err(|_| Error::bad_database("Invalid account event in database.")) .map_err(|_| Error::bad_database("Invalid account event in database."))
.ok() .ok()
}) })
.take(
filter
.account_data
.limit
.map(TryInto::try_into)
.map(Result::ok)
.flatten()
.unwrap_or(usize::MAX),
)
.collect(), .collect(),
}, },
summary: RoomSummary { summary: RoomSummary {

View file

@ -2,7 +2,7 @@ use std::collections::HashMap;
use ruma::{ use ruma::{
api::client::error::ErrorKind, api::client::error::ErrorKind,
events::{AnyEphemeralRoomEvent, RoomAccountDataEventType}, events::{AnyRoomAccountDataEvent, RoomAccountDataEventType},
serde::Raw, serde::Raw,
RoomId, UserId, RoomId, UserId,
}; };
@ -101,7 +101,7 @@ impl service::account_data::Data for KeyValueDatabase {
room_id: Option<&RoomId>, room_id: Option<&RoomId>,
user_id: &UserId, user_id: &UserId,
since: u64, since: u64,
) -> Result<HashMap<RoomAccountDataEventType, Raw<AnyEphemeralRoomEvent>>> { ) -> Result<HashMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>> {
let mut userdata = HashMap::new(); let mut userdata = HashMap::new();
let mut prefix = room_id let mut prefix = room_id
@ -129,7 +129,7 @@ impl service::account_data::Data for KeyValueDatabase {
)?) )?)
.map_err(|_| Error::bad_database("RoomUserData ID in db is invalid."))?, .map_err(|_| Error::bad_database("RoomUserData ID in db is invalid."))?,
), ),
serde_json::from_slice::<Raw<AnyEphemeralRoomEvent>>(&v).map_err(|_| { serde_json::from_slice::<Raw<AnyRoomAccountDataEvent>>(&v).map_err(|_| {
Error::bad_database("Database contains invalid account data.") Error::bad_database("Database contains invalid account data.")
})?, })?,
)) ))

View file

@ -2,7 +2,7 @@ use std::collections::HashMap;
use crate::Result; use crate::Result;
use ruma::{ use ruma::{
events::{AnyEphemeralRoomEvent, RoomAccountDataEventType}, events::{AnyRoomAccountDataEvent, RoomAccountDataEventType},
serde::Raw, serde::Raw,
RoomId, UserId, RoomId, UserId,
}; };
@ -31,5 +31,5 @@ pub trait Data: Send + Sync {
room_id: Option<&RoomId>, room_id: Option<&RoomId>,
user_id: &UserId, user_id: &UserId,
since: u64, since: u64,
) -> Result<HashMap<RoomAccountDataEventType, Raw<AnyEphemeralRoomEvent>>>; ) -> Result<HashMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>;
} }

View file

@ -3,7 +3,7 @@ mod data;
pub use data::Data; pub use data::Data;
use ruma::{ use ruma::{
events::{AnyEphemeralRoomEvent, RoomAccountDataEventType}, events::{AnyRoomAccountDataEvent, AnyTimelineEvent, RoomAccountDataEventType},
serde::Raw, serde::Raw,
RoomId, UserId, RoomId, UserId,
}; };
@ -47,7 +47,7 @@ impl Service {
room_id: Option<&RoomId>, room_id: Option<&RoomId>,
user_id: &UserId, user_id: &UserId,
since: u64, since: u64,
) -> Result<HashMap<RoomAccountDataEventType, Raw<AnyEphemeralRoomEvent>>> { ) -> Result<HashMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>> {
self.db.changes_since(room_id, user_id, since) self.db.changes_since(room_id, user_id, since)
} }
} }

View file

@ -15,6 +15,7 @@ use serde::{Deserialize, Serialize};
use serde_json::{ use serde_json::{
json, json,
value::{to_raw_value, RawValue as RawJsonValue}, value::{to_raw_value, RawValue as RawJsonValue},
Map, Value,
}; };
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc}; use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};
use tracing::warn; use tracing::warn;

View file

@ -1,136 +1,153 @@
use std::collections::HashSet; // pub fn filter_room_events<'i, I: Iterator<Item = &'i Raw<AnyTimelineEvent>>>(
// events: I,
// sender_user: &UserId,
// sender_device: &DeviceId,
// room_id: Option<&RoomId>,
// filter: RoomEventFilter,
// ) -> crate::Result<Box<dyn Iterator<Item = &'i Raw<AnyTimelineEvent>>>> {
// let (lazy_load_enabled, lazy_load_send_redundant) = match &filter.lazy_load_options {
// LazyLoadOptions::Enabled {
// include_redundant_members,
// } => (true, *include_redundant_members),
// _ => (false, false),
// };
use ruma::{ // let it = Box::new(
api::client::filter::{LazyLoadOptions, RoomEventFilter}, // events
events::AnyTimelineEvent, // .filter(|event| match &filter.rooms {
serde::Raw, // None => true,
DeviceId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, // Some(rooms) => rooms.iter().any(|r| {
}; // r.as_str()
// == event
// .get_field::<String>("room_id")
// .expect("room events should deserialize")
// .expect("room events should have a room_id")
// }),
// })
// .filter(|event| match &filter.not_rooms[..] {
// [] => true,
// not_rooms => not_rooms.iter().all(|r| {
// r.as_str()
// != event
// .get_field::<String>("room_id")
// .expect("room events should deserialize")
// .expect("room events should have a room_id")
// }),
// })
// .filter(|event| match &filter.senders {
// None => true,
// Some(rooms) => rooms.iter().any(|r| {
// r.as_str()
// == event
// .get_field::<String>("sender")
// .expect("room events should deserialize")
// .expect("room events should have a sender")
// }),
// })
// .filter(|event| match &filter.not_senders[..] {
// [] => true,
// not_senders => not_senders.iter().all(|r| {
// r.as_str()
// != event
// .get_field::<String>("sender")
// .expect("room events should deserialize")
// .expect("room events should have a sender")
// }),
// })
// .filter(|event| match &filter.types {
// None => true,
// Some(types) => types.iter().any(|t| {
// t.as_str()
// == event
// .get_field::<String>("type")
// .expect("room events should deserialize")
// .expect("room events should have a type")
// }),
// })
// .filter(|event| match &filter.not_types[..] {
// [] => true,
// not_types => not_types.iter().all(|t| {
// t.as_str()
// != event
// .get_field::<String>("type")
// .expect("room events should deserialize")
// .expect("room events should have a type")
// }),
// })
// .filter(|event| {
// let room_id = event
// .get_field::<OwnedRoomId>("room_id")
// .expect("room events should deserialize")
// .expect("room events should have a room_id");
// let event_id = event
// .get_field::<OwnedEventId>("event_id")
// .expect("room events should deserialize")
// .expect("room events should have an event_id");
use crate::services; // services()
// .rooms
// .state_accessor
// .user_can_see_event(sender_user, &room_id, &event_id)
// .unwrap_or(false)
// }),
// );
pub fn filter_room_events<'i, I: Iterator<Item = &'i Raw<AnyTimelineEvent>>>( // let memberships = it
events: I, // .map(|event| {
sender_user: &UserId, // let room_id = event
sender_device: &DeviceId, // .get_field::<OwnedRoomId>("room_id")
room_id: Option<&RoomId>, // .expect("room events should deserialize")
filter: RoomEventFilter, // .expect("room events should have a room_id");
) -> crate::Result<Box<dyn Iterator<Item = &'i Raw<AnyTimelineEvent>>>> { // let sender = event
let (lazy_load_enabled, lazy_load_send_redundant) = match &filter.lazy_load_options { // .get_field::<OwnedUserId>("sender")
LazyLoadOptions::Enabled { // .expect("room events should deserialize")
include_redundant_members, // .expect("room events should have a sender");
} => (true, *include_redundant_members),
_ => (false, false),
};
let it = Box::new( // (room_id, sender)
events // })
.filter(|event| match &filter.rooms { // .flat_map(|(room_id, sender)| {
None => true, // services()
Some(rooms) => rooms.iter().any(|r| { // .rooms
r.as_str() // .lazy_loading
== event // .lazy_load_was_sent_before(sender_user, sender_device, &room_id, &sender)
.get_field::<String>("room_id") // .map(|b| {
.expect("room events should deserialize") // if !b || lazy_load_send_redundant {
.expect("room events should have a room_id") // Some(sender)
}), // } else {
}) // None
.filter(|event| match &filter.not_rooms[..] { // }
[] => true, // })
not_rooms => not_rooms.iter().all(|r| { // .transpose()
r.as_str() // })
!= event // .collect::<crate::Result<HashSet<_>>>()?;
.get_field::<String>("room_id")
.expect("room events should deserialize")
.expect("room events should have a room_id")
}),
})
.filter(|event| match &filter.senders {
None => true,
Some(rooms) => rooms.iter().any(|r| {
r.as_str()
== event
.get_field::<String>("sender")
.expect("room events should deserialize")
.expect("room events should have a sender")
}),
})
.filter(|event| match &filter.not_senders[..] {
[] => true,
not_senders => not_senders.iter().all(|r| {
r.as_str()
!= event
.get_field::<String>("sender")
.expect("room events should deserialize")
.expect("room events should have a sender")
}),
})
.filter(|event| match &filter.types {
None => true,
Some(types) => types.iter().any(|t| {
t.as_str()
== event
.get_field::<String>("type")
.expect("room events should deserialize")
.expect("room events should have a type")
}),
})
.filter(|event| match &filter.not_types[..] {
[] => true,
not_types => not_types.iter().all(|t| {
t.as_str()
!= event
.get_field::<String>("type")
.expect("room events should deserialize")
.expect("room events should have a type")
}),
})
.filter(|event| {
let room_id = event
.get_field::<OwnedRoomId>("room_id")
.expect("room events should deserialize")
.expect("room events should have a room_id");
let event_id = event
.get_field::<OwnedEventId>("event_id")
.expect("room events should deserialize")
.expect("room events should have an event_id");
services() // Ok(it)
.rooms // }
.state_accessor
.user_can_see_event(sender_user, &room_id, &event_id)
.unwrap_or(false)
}),
);
let memberships = it use serde_json::Value;
.map(|event| {
let room_id = event
.get_field::<OwnedRoomId>("room_id")
.expect("room events should deserialize")
.expect("room events should have a room_id");
let sender = event
.get_field::<OwnedUserId>("sender")
.expect("room events should deserialize")
.expect("room events should have a sender");
(room_id, sender) pub fn event_fields<I: Iterator<Item = String>>(mut json: Value, event_fields: I) -> Value {
}) let inner = json
.flat_map(|(room_id, sender)| { .as_object_mut()
services() .expect("PduEvent should always be an object");
.rooms
.lazy_loading // TODO: testing this properly
.lazy_load_was_sent_before(sender_user, sender_device, &room_id, &sender) for field in event_fields {
.map(|b| { let mut paths = field.split('.').peekable();
if !b || lazy_load_send_redundant { let mut parent = &mut *inner;
Some(sender)
while let Some(key) = paths.next() {
if paths.peek().is_none() {
parent.remove(key);
} else { } else {
None parent = parent
.get_mut(key)
.map(Value::as_object_mut)
.flatten()
.unwrap();
}
}
} }
})
.transpose()
})
.collect::<crate::Result<HashSet<_>>>()?;
Ok(it) json
} }