
Eliminate associated Id type from trait Event.

Co-authored-by: Jade Ellis <jade@ellis.link>
Signed-off-by: Jason Volk <jason@zemos.net>
Jason Volk 2025-04-22 11:00:55 +00:00 committed by Jade Ellis
parent 44302ce732
commit f605913ea9
7 changed files with 116 additions and 130 deletions
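
For orientation, a minimal before-and-after sketch of the refactor (illustrative only, not taken from the diff: EventWithId, EventWithoutId, and the two fetch helpers are made-up names, and the real Event trait in this repository has many more methods):

use std::borrow::Borrow;

use ruma::{EventId, OwnedEventId};

// Before: generic code had to carry an associated `E::Id` plus
// `Borrow<EventId>` bounds just to compare, hash, or clone an event id.
trait EventWithId {
    type Id: Clone + Borrow<EventId>;
    fn event_id(&self) -> &Self::Id;
}

// After: the trait hands out `&EventId` directly and owned ids are always
// plain `OwnedEventId`, so the associated type and its bounds disappear.
trait EventWithoutId {
    fn event_id(&self) -> &EventId;
}

// Callback bounds shift the same way, as in `resolve()` below.
fn fetch_by_assoc_id<E: EventWithId>(_id: E::Id) -> Option<E> { None }
fn fetch_by_owned_id<E: EventWithoutId>(_id: OwnedEventId) -> Option<E> { None }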


@@ -20,7 +20,7 @@ use std::{
 use futures::{Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future};
 use ruma::{
-    EventId, Int, MilliSecondsSinceUnixEpoch, RoomVersionId,
+    EventId, Int, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomVersionId,
     events::{
         StateEventType, TimelineEventType,
         room::member::{MembershipState, RoomMemberEventContent},
@@ -39,9 +39,7 @@ use crate::{
     debug, debug_error,
     matrix::{event::Event, pdu::StateKey},
     trace,
-    utils::stream::{
-        BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryReadyExt, WidebandExt,
-    },
+    utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, WidebandExt},
     warn,
 };
@@ -79,20 +77,19 @@ type Result<T, E = Error> = crate::Result<T, E>;
 pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, ExistsFut>(
     room_version: &RoomVersionId,
     state_sets: Sets,
-    auth_chain_sets: &'a [HashSet<E::Id, Hasher>],
+    auth_chain_sets: &'a [HashSet<OwnedEventId, Hasher>],
     event_fetch: &Fetch,
     event_exists: &Exists,
-) -> Result<StateMap<E::Id>>
+) -> Result<StateMap<OwnedEventId>>
 where
-    Fetch: Fn(E::Id) -> FetchFut + Sync,
+    Fetch: Fn(OwnedEventId) -> FetchFut + Sync,
     FetchFut: Future<Output = Option<E>> + Send,
-    Exists: Fn(E::Id) -> ExistsFut + Sync,
+    Exists: Fn(OwnedEventId) -> ExistsFut + Sync,
     ExistsFut: Future<Output = bool> + Send,
     Sets: IntoIterator<IntoIter = SetIter> + Send,
-    SetIter: Iterator<Item = &'a StateMap<E::Id>> + Clone + Send,
+    SetIter: Iterator<Item = &'a StateMap<OwnedEventId>> + Clone + Send,
     Hasher: BuildHasher + Send + Sync,
     E: Event + Clone + Send + Sync,
-    E::Id: Borrow<EventId> + Send + Sync,
     for<'b> &'b E: Send,
 {
     debug!("State resolution starting");
@@ -153,7 +150,7 @@ where
     // Sequentially auth check each control event.
     let resolved_control = iterative_auth_check(
         &room_version,
-        sorted_control_levels.iter().stream(),
+        sorted_control_levels.iter().stream().map(AsRef::as_ref),
         clean.clone(),
         &event_fetch,
     )
@@ -170,7 +167,7 @@ where
     // that failed auth
     let events_to_resolve: Vec<_> = all_conflicted
         .iter()
-        .filter(|&id| !deduped_power_ev.contains(id.borrow()))
+        .filter(|&id| !deduped_power_ev.contains(id))
         .cloned()
         .collect();
@@ -190,7 +187,7 @@ where
     let mut resolved_state = iterative_auth_check(
         &room_version,
-        sorted_left_events.iter().stream(),
+        sorted_left_events.iter().stream().map(AsRef::as_ref),
         resolved_control, // The control events are added to the final resolved state
         &event_fetch,
     )
@@ -283,15 +280,14 @@ where
 /// earlier (further back in time) origin server timestamp.
 #[tracing::instrument(level = "debug", skip_all)]
 async fn reverse_topological_power_sort<E, F, Fut>(
-    events_to_sort: Vec<E::Id>,
-    auth_diff: &HashSet<E::Id>,
+    events_to_sort: Vec<OwnedEventId>,
+    auth_diff: &HashSet<OwnedEventId>,
     fetch_event: &F,
-) -> Result<Vec<E::Id>>
+) -> Result<Vec<OwnedEventId>>
 where
-    F: Fn(E::Id) -> Fut + Sync,
+    F: Fn(OwnedEventId) -> Fut + Sync,
     Fut: Future<Output = Option<E>> + Send,
     E: Event + Send + Sync,
-    E::Id: Borrow<EventId> + Send + Sync,
 {
     debug!("reverse topological sort of power events");
@@ -303,6 +299,7 @@ where
     // This is used in the `key_fn` passed to the lexico_topo_sort fn
     let event_to_pl: HashMap<_, _> = graph
         .keys()
+        .cloned()
         .stream()
         .broad_filter_map(async |event_id| {
             let pl = get_power_level_for_sender(&event_id, fetch_event)
@@ -321,14 +318,15 @@ where
         .boxed()
         .await;
     let event_to_pl = &event_to_pl;
-    let fetcher = |event_id: E::Id| async move {
+    let fetcher = async |event_id: OwnedEventId| {
         let pl = *event_to_pl
-            .get(event_id.borrow())
+            .get(&event_id)
             .ok_or_else(|| Error::NotFound(String::new()))?;
         let ev = fetch_event(event_id)
             .await
             .ok_or_else(|| Error::NotFound(String::new()))?;
         Ok((pl, ev.origin_server_ts()))
     };
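
The fetcher above also switches from a closure returning an async move block to an async closure (async |...| { ... }), which Rust stabilized in 1.85. A small sketch of the two spellings, against a hypothetical map-backed lookup:

use std::collections::HashMap;

use ruma::OwnedEventId;

fn make_fetchers(map: &HashMap<OwnedEventId, u64>) {
    // Old spelling: a plain closure that returns an `async move` block.
    let _old = |id: OwnedEventId| async move { map.get(&id).copied() };

    // New spelling: an async closure; it implements the AsyncFn* traits and
    // can lend borrows of captured state to the future it returns.
    let _new = async |id: OwnedEventId| map.get(&id).copied();
}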
@@ -465,18 +463,17 @@ where
 /// the eventId at the eventId's generation (we walk backwards to `EventId`s
 /// most recent previous power level event).
 async fn get_power_level_for_sender<E, F, Fut>(
-    event_id: E::Id,
+    event_id: &EventId,
     fetch_event: &F,
 ) -> serde_json::Result<Int>
 where
-    F: Fn(E::Id) -> Fut + Sync,
+    F: Fn(OwnedEventId) -> Fut + Sync,
     Fut: Future<Output = Option<E>> + Send,
     E: Event + Send,
-    E::Id: Borrow<EventId> + Send,
 {
     debug!("fetch event ({event_id}) senders power level");
-    let event = fetch_event(event_id).await;
+    let event = fetch_event(event_id.to_owned()).await;
     let auth_events = event.as_ref().map(Event::auth_events);
@@ -484,7 +481,7 @@ where
         .into_iter()
         .flatten()
         .stream()
-        .broadn_filter_map(5, |aid| fetch_event(aid.clone()))
+        .broadn_filter_map(5, |aid| fetch_event(aid.to_owned()))
         .ready_find(|aev| is_type_and_key(aev, &TimelineEventType::RoomPowerLevels, ""))
         .await;
@@ -517,14 +514,13 @@ where
 async fn iterative_auth_check<'a, E, F, Fut, S>(
     room_version: &RoomVersion,
     events_to_check: S,
-    unconflicted_state: StateMap<E::Id>,
+    unconflicted_state: StateMap<OwnedEventId>,
     fetch_event: &F,
-) -> Result<StateMap<E::Id>>
+) -> Result<StateMap<OwnedEventId>>
 where
-    F: Fn(E::Id) -> Fut + Sync,
+    F: Fn(OwnedEventId) -> Fut + Sync,
     Fut: Future<Output = Option<E>> + Send,
-    E::Id: Borrow<EventId> + Clone + Eq + Ord + Send + Sync + 'a,
-    S: Stream<Item = &'a E::Id> + Send + 'a,
+    S: Stream<Item = &'a EventId> + Send + 'a,
     E: Event + Clone + Send + Sync,
 {
     debug!("starting iterative auth check");
@@ -532,7 +528,7 @@ where
     let events_to_check: Vec<_> = events_to_check
         .map(Result::Ok)
         .broad_and_then(async |event_id| {
-            fetch_event(event_id.clone())
+            fetch_event(event_id.to_owned())
                 .await
                 .ok_or_else(|| Error::NotFound(format!("Failed to find {event_id}")))
         })
@@ -540,16 +536,16 @@ where
         .boxed()
         .await?;
-    let auth_event_ids: HashSet<E::Id> = events_to_check
+    let auth_event_ids: HashSet<OwnedEventId> = events_to_check
         .iter()
-        .flat_map(|event: &E| event.auth_events().map(Clone::clone))
+        .flat_map(|event: &E| event.auth_events().map(ToOwned::to_owned))
         .collect();
-    let auth_events: HashMap<E::Id, E> = auth_event_ids
+    let auth_events: HashMap<OwnedEventId, E> = auth_event_ids
         .into_iter()
         .stream()
         .broad_filter_map(fetch_event)
-        .map(|auth_event| (auth_event.event_id().clone(), auth_event))
+        .map(|auth_event| (auth_event.event_id().to_owned(), auth_event))
         .collect()
         .boxed()
         .await;
@@ -570,7 +566,7 @@ where
         let mut auth_state = StateMap::new();
         for aid in event.auth_events() {
-            if let Some(ev) = auth_events.get(aid.borrow()) {
+            if let Some(ev) = auth_events.get(aid) {
                 //TODO: synapse checks "rejected_reason" which is most likely related to
                 // soft-failing
                 auth_state.insert(
@@ -581,7 +577,7 @@ where
                     ev.clone(),
                 );
             } else {
-                warn!(event_id = aid.borrow().as_str(), "missing auth event");
+                warn!(event_id = aid.as_str(), "missing auth event");
             }
         }
@@ -590,7 +586,7 @@ where
             .stream()
             .ready_filter_map(|key| Some((key, resolved_state.get(key)?)))
             .filter_map(|(key, ev_id)| async move {
-                if let Some(event) = auth_events.get(ev_id.borrow()) {
+                if let Some(event) = auth_events.get(ev_id) {
                     Some((key, event.clone()))
                 } else {
                     Some((key, fetch_event(ev_id.clone()).await?))
@@ -622,7 +618,7 @@ where
             // add event to resolved state map
             resolved_state.insert(
                 event.event_type().with_state_key(state_key),
-                event.event_id().clone(),
+                event.event_id().to_owned(),
             );
         },
         | Ok(false) => {
@@ -649,15 +645,14 @@ where
 /// level as a parent) will be marked as depth 1. depth 1 is "older" than depth
 /// 0.
 async fn mainline_sort<E, F, Fut>(
-    to_sort: &[E::Id],
-    resolved_power_level: Option<E::Id>,
+    to_sort: &[OwnedEventId],
+    resolved_power_level: Option<OwnedEventId>,
     fetch_event: &F,
-) -> Result<Vec<E::Id>>
+) -> Result<Vec<OwnedEventId>>
 where
-    F: Fn(E::Id) -> Fut + Sync,
+    F: Fn(OwnedEventId) -> Fut + Sync,
     Fut: Future<Output = Option<E>> + Send,
     E: Event + Clone + Send + Sync,
-    E::Id: Borrow<EventId> + Clone + Send + Sync,
 {
     debug!("mainline sort of events");
@@ -677,7 +672,7 @@ where
         pl = None;
         for aid in event.auth_events() {
-            let ev = fetch_event(aid.clone())
+            let ev = fetch_event(aid.to_owned())
                 .await
                 .ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
@@ -723,26 +718,25 @@ where
 /// that has an associated mainline depth.
 async fn get_mainline_depth<E, F, Fut>(
     mut event: Option<E>,
-    mainline_map: &HashMap<E::Id, usize>,
+    mainline_map: &HashMap<OwnedEventId, usize>,
     fetch_event: &F,
 ) -> Result<usize>
 where
-    F: Fn(E::Id) -> Fut + Sync,
+    F: Fn(OwnedEventId) -> Fut + Sync,
     Fut: Future<Output = Option<E>> + Send,
     E: Event + Send + Sync,
-    E::Id: Borrow<EventId> + Send + Sync,
 {
     while let Some(sort_ev) = event {
-        debug!(event_id = sort_ev.event_id().borrow().as_str(), "mainline");
+        debug!(event_id = sort_ev.event_id().as_str(), "mainline");
         let id = sort_ev.event_id();
-        if let Some(depth) = mainline_map.get(id.borrow()) {
+        if let Some(depth) = mainline_map.get(id) {
             return Ok(*depth);
         }
         event = None;
         for aid in sort_ev.auth_events() {
-            let aev = fetch_event(aid.clone())
+            let aev = fetch_event(aid.to_owned())
                 .await
                 .ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
@@ -757,15 +751,14 @@ where
 }
 async fn add_event_and_auth_chain_to_graph<E, F, Fut>(
-    graph: &mut HashMap<E::Id, HashSet<E::Id>>,
-    event_id: E::Id,
-    auth_diff: &HashSet<E::Id>,
+    graph: &mut HashMap<OwnedEventId, HashSet<OwnedEventId>>,
+    event_id: OwnedEventId,
+    auth_diff: &HashSet<OwnedEventId>,
     fetch_event: &F,
 ) where
-    F: Fn(E::Id) -> Fut + Sync,
+    F: Fn(OwnedEventId) -> Fut + Sync,
     Fut: Future<Output = Option<E>> + Send,
     E: Event + Send + Sync,
-    E::Id: Borrow<EventId> + Clone + Send + Sync,
 {
     let mut state = vec![event_id];
     while let Some(eid) = state.pop() {
@@ -775,26 +768,27 @@ async fn add_event_and_auth_chain_to_graph<E, F, Fut>(
         // Prefer the store to event as the store filters dedups the events
         for aid in auth_events {
-            if auth_diff.contains(aid.borrow()) {
-                if !graph.contains_key(aid.borrow()) {
+            if auth_diff.contains(aid) {
+                if !graph.contains_key(aid) {
                     state.push(aid.to_owned());
                 }
-                // We just inserted this at the start of the while loop
-                graph.get_mut(eid.borrow()).unwrap().insert(aid.to_owned());
+                graph
+                    .get_mut(&eid)
+                    .expect("We just inserted this at the start of the while loop")
+                    .insert(aid.to_owned());
             }
         }
     }
 }
-async fn is_power_event_id<E, F, Fut>(event_id: &E::Id, fetch: &F) -> bool
+async fn is_power_event_id<E, F, Fut>(event_id: &EventId, fetch: &F) -> bool
 where
-    F: Fn(E::Id) -> Fut + Sync,
+    F: Fn(OwnedEventId) -> Fut + Sync,
     Fut: Future<Output = Option<E>> + Send,
     E: Event + Send,
-    E::Id: Borrow<EventId> + Send + Sync,
 {
-    match fetch(event_id.clone()).await.as_ref() {
+    match fetch(event_id.to_owned()).await.as_ref() {
         | Some(state) => is_power_event(state),
         | _ => false,
     }
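
Side note on the unwrap-to-expect change in this hunk: the justification that used to sit in a comment now travels with the panic message. A generic illustration (the counter map and key are hypothetical):

use std::collections::HashMap;

fn bump<'a>(counts: &mut HashMap<&'a str, u32>, key: &'a str) {
    counts.entry(key).or_insert(0);
    // State the invariant where it is relied upon, rather than in a comment
    // next to a bare unwrap().
    *counts
        .get_mut(key)
        .expect("we just inserted this key above") += 1;
}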
@@ -904,7 +898,7 @@ mod tests {
         let resolved_power = super::iterative_auth_check(
             &RoomVersion::V6,
-            sorted_power_events.iter().stream(),
+            sorted_power_events.iter().map(AsRef::as_ref).stream(),
             HashMap::new(), // unconflicted events
             &fetcher,
         )
@@ -1289,7 +1283,7 @@ mod tests {
         let ev_map = store.0.clone();
         let fetcher = |id| ready(ev_map.get(&id).cloned());
-        let exists = |id: <PduEvent as Event>::Id| ready(ev_map.get(&*id).is_some());
+        let exists = |id: OwnedEventId| ready(ev_map.get(&*id).is_some());
         let state_sets = [state_at_bob, state_at_charlie];
         let auth_chain: Vec<_> = state_sets