mirror of
https://forgejo.ellis.link/continuwuation/continuwuity.git
synced 2025-09-30 18:42:05 +00:00
fix(stateres): Correctly fetch missing auth events for incoming PDUs
This commit is contained in:
parent
902fe7b7ab
commit
c66f6f8900
3 changed files with 90 additions and 35 deletions
|
@ -36,7 +36,7 @@ pub use self::{
|
||||||
room_version::RoomVersion,
|
room_version::RoomVersion,
|
||||||
};
|
};
|
||||||
use crate::{
|
use crate::{
|
||||||
debug, debug_error,
|
debug, debug_error, err,
|
||||||
matrix::{Event, StateKey},
|
matrix::{Event, StateKey},
|
||||||
state_res::room_version::StateResolutionVersion,
|
state_res::room_version::StateResolutionVersion,
|
||||||
trace,
|
trace,
|
||||||
|
@ -319,8 +319,19 @@ where
|
||||||
path.pop();
|
path.pop();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let evt = fetch_event(event_id.clone()).await?;
|
trace!(event_id = event_id.as_str(), "fetching event for its auth events");
|
||||||
stack.push(evt.auth_events().map(ToOwned::to_owned).collect());
|
let evt = fetch_event(event_id.clone()).await;
|
||||||
|
if evt.is_none() {
|
||||||
|
err!("could not fetch event {} to calculate conflicted subgraph", event_id);
|
||||||
|
path.pop();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
stack.push(
|
||||||
|
evt.expect("checked")
|
||||||
|
.auth_events()
|
||||||
|
.map(ToOwned::to_owned)
|
||||||
|
.collect(),
|
||||||
|
);
|
||||||
seen.insert(event_id);
|
seen.insert(event_id);
|
||||||
}
|
}
|
||||||
Some(subgraph)
|
Some(subgraph)
|
||||||
|
|
|
@ -4,9 +4,8 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Event, PduEvent, debug, debug_error, debug_warn, implement,
|
Event, PduEvent, debug, debug_warn, implement, matrix::event::gen_event_id_canonical_json,
|
||||||
matrix::event::gen_event_id_canonical_json, trace, utils::continue_exponential_backoff_secs,
|
trace, utils::continue_exponential_backoff_secs, warn,
|
||||||
warn,
|
|
||||||
};
|
};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
|
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
|
||||||
|
@ -52,12 +51,14 @@ where
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
|
let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
|
||||||
|
trace!("Fetching {} outlier pdus", events.clone().count());
|
||||||
|
|
||||||
for id in events {
|
for id in events {
|
||||||
// a. Look in the main timeline (pduid_pdu tree)
|
// a. Look in the main timeline (pduid_pdu tree)
|
||||||
// b. Look at outlier pdu tree
|
// b. Look at outlier pdu tree
|
||||||
// (get_pdu_json checks both)
|
// (get_pdu_json checks both)
|
||||||
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
|
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
|
||||||
|
trace!("Found {id} in main timeline or outlier tree");
|
||||||
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
|
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -104,7 +105,7 @@ where
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("Fetching {next_id} over federation.");
|
debug!("Fetching {next_id} over federation from {origin}.");
|
||||||
match self
|
match self
|
||||||
.services
|
.services
|
||||||
.sending
|
.sending
|
||||||
|
@ -115,7 +116,7 @@ where
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
| Ok(res) => {
|
| Ok(res) => {
|
||||||
debug!("Got {next_id} over federation");
|
debug!("Got {next_id} over federation from {origin}");
|
||||||
let Ok(room_version_id) = get_room_version_id(create_event) else {
|
let Ok(room_version_id) = get_room_version_id(create_event) else {
|
||||||
back_off((*next_id).to_owned());
|
back_off((*next_id).to_owned());
|
||||||
continue;
|
continue;
|
||||||
|
@ -145,6 +146,9 @@ where
|
||||||
auth_event.clone().into(),
|
auth_event.clone().into(),
|
||||||
) {
|
) {
|
||||||
| Ok(auth_event) => {
|
| Ok(auth_event) => {
|
||||||
|
trace!(
|
||||||
|
"Found auth event id {auth_event} for event {next_id}"
|
||||||
|
);
|
||||||
todo_auth_events.push_back(auth_event);
|
todo_auth_events.push_back(auth_event);
|
||||||
},
|
},
|
||||||
| _ => {
|
| _ => {
|
||||||
|
@ -160,7 +164,7 @@ where
|
||||||
events_all.insert(next_id);
|
events_all.insert(next_id);
|
||||||
},
|
},
|
||||||
| Err(e) => {
|
| Err(e) => {
|
||||||
debug_error!("Failed to fetch event {next_id}: {e}");
|
warn!("Failed to fetch auth event {next_id} from {origin}: {e}");
|
||||||
back_off((*next_id).to_owned());
|
back_off((*next_id).to_owned());
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -175,7 +179,7 @@ where
|
||||||
// b. Look at outlier pdu tree
|
// b. Look at outlier pdu tree
|
||||||
// (get_pdu_json checks both)
|
// (get_pdu_json checks both)
|
||||||
if let Some(local_pdu) = local_pdu {
|
if let Some(local_pdu) = local_pdu {
|
||||||
trace!("Found {id} in db");
|
trace!("Found {id} in main timeline or outlier tree");
|
||||||
pdus.push((local_pdu.clone(), None));
|
pdus.push((local_pdu.clone(), None));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -201,6 +205,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
trace!("Handling outlier {next_id}");
|
||||||
match Box::pin(self.handle_outlier_pdu(
|
match Box::pin(self.handle_outlier_pdu(
|
||||||
origin,
|
origin,
|
||||||
create_event,
|
create_event,
|
||||||
|
@ -213,6 +218,7 @@ where
|
||||||
{
|
{
|
||||||
| Ok((pdu, json)) =>
|
| Ok((pdu, json)) =>
|
||||||
if next_id == *id {
|
if next_id == *id {
|
||||||
|
trace!("Handled outlier {next_id} (original request)");
|
||||||
pdus.push((pdu, Some(json)));
|
pdus.push((pdu, Some(json)));
|
||||||
},
|
},
|
||||||
| Err(e) => {
|
| Err(e) => {
|
||||||
|
@ -222,6 +228,6 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
trace!("Fetched and handled {} outlier pdus", pdus.len());
|
||||||
pdus
|
pdus
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,12 @@
|
||||||
use std::collections::{BTreeMap, HashMap, hash_map};
|
use std::collections::{BTreeMap, HashMap, hash_map};
|
||||||
|
|
||||||
use conduwuit::{
|
use conduwuit::{
|
||||||
Err, Event, PduEvent, Result, debug, debug_info, err, implement, state_res, trace, warn,
|
Err, Event, PduEvent, Result, debug, debug_info, debug_warn, err, implement, state_res, trace,
|
||||||
};
|
};
|
||||||
use futures::future::ready;
|
use futures::future::ready;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
CanonicalJsonObject, CanonicalJsonValue, EventId, RoomId, ServerName, events::StateEventType,
|
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
|
||||||
|
events::StateEventType,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{check_room_id, get_room_version_id, to_room_version};
|
use super::{check_room_id, get_room_version_id, to_room_version};
|
||||||
|
@ -74,36 +75,73 @@ where
|
||||||
|
|
||||||
check_room_id(room_id, &pdu_event)?;
|
check_room_id(room_id, &pdu_event)?;
|
||||||
|
|
||||||
if !auth_events_known {
|
// Fetch all auth events
|
||||||
// 4. fetch any missing auth events doing all checks listed here starting at 1.
|
let mut auth_events: HashMap<OwnedEventId, PduEvent> = HashMap::new();
|
||||||
// These are not timeline events
|
|
||||||
// 5. Reject "due to auth events" if can't get all the auth events or some of
|
for aid in pdu_event.auth_events() {
|
||||||
// the auth events are also rejected "due to auth events"
|
if let Ok(auth_event) = self.services.timeline.get_pdu(aid).await {
|
||||||
// NOTE: Step 5 is not applied anymore because it failed too often
|
check_room_id(room_id, &auth_event)?;
|
||||||
debug!("Fetching auth events");
|
trace!("Found auth event {aid} for outlier event {event_id} locally");
|
||||||
Box::pin(self.fetch_and_handle_outliers(
|
auth_events.insert(aid.to_owned(), auth_event);
|
||||||
origin,
|
} else {
|
||||||
pdu_event.auth_events(),
|
debug_warn!("Could not find auth event {aid} for outlier event {event_id} locally");
|
||||||
create_event,
|
}
|
||||||
room_id,
|
}
|
||||||
))
|
|
||||||
.await;
|
// Fetch any missing ones & reject invalid ones
|
||||||
|
let missing_auth_events = if auth_events_known {
|
||||||
|
pdu_event
|
||||||
|
.auth_events()
|
||||||
|
.filter(|id| !auth_events.contains_key(*id))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
} else {
|
||||||
|
pdu_event.auth_events().collect::<Vec<_>>()
|
||||||
|
};
|
||||||
|
if !missing_auth_events.is_empty() || !auth_events_known {
|
||||||
|
debug_info!(
|
||||||
|
"Fetching {} missing auth events for outlier event {event_id}",
|
||||||
|
missing_auth_events.len()
|
||||||
|
);
|
||||||
|
for (pdu, _) in self
|
||||||
|
.fetch_and_handle_outliers(
|
||||||
|
origin,
|
||||||
|
missing_auth_events.iter().copied(),
|
||||||
|
create_event,
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
auth_events.insert(pdu.event_id().to_owned(), pdu);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!("No missing auth events for outlier event {event_id}");
|
||||||
|
}
|
||||||
|
// reject if we are still missing some
|
||||||
|
let still_missing = pdu_event
|
||||||
|
.auth_events()
|
||||||
|
.filter(|id| !auth_events.contains_key(*id))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
if !still_missing.is_empty() {
|
||||||
|
return Err!(Request(InvalidParam(
|
||||||
|
"Could not fetch all auth events for outlier event {event_id}, still missing: \
|
||||||
|
{still_missing:?}"
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6. Reject "due to auth events" if the event doesn't pass auth based on the
|
// 6. Reject "due to auth events" if the event doesn't pass auth based on the
|
||||||
// auth events
|
// auth events
|
||||||
debug!("Checking based on auth events");
|
debug!("Checking based on auth events");
|
||||||
|
let mut auth_events_by_key: HashMap<_, _> = HashMap::with_capacity(auth_events.len());
|
||||||
// Build map of auth events
|
// Build map of auth events
|
||||||
let mut auth_events = HashMap::with_capacity(pdu_event.auth_events().count());
|
|
||||||
for id in pdu_event.auth_events() {
|
for id in pdu_event.auth_events() {
|
||||||
let Ok(auth_event) = self.services.timeline.get_pdu(id).await else {
|
let auth_event = auth_events
|
||||||
warn!("Could not find auth event {id}");
|
.get(id)
|
||||||
continue;
|
.expect("we just checked that we have all auth events")
|
||||||
};
|
.to_owned();
|
||||||
|
|
||||||
check_room_id(room_id, &auth_event)?;
|
check_room_id(room_id, &auth_event)?;
|
||||||
|
|
||||||
match auth_events.entry((
|
match auth_events_by_key.entry((
|
||||||
auth_event.kind.to_string().into(),
|
auth_event.kind.to_string().into(),
|
||||||
auth_event
|
auth_event
|
||||||
.state_key
|
.state_key
|
||||||
|
@ -123,7 +161,7 @@ where
|
||||||
|
|
||||||
// The original create event must be in the auth events
|
// The original create event must be in the auth events
|
||||||
if !matches!(
|
if !matches!(
|
||||||
auth_events.get(&(StateEventType::RoomCreate, String::new().into())),
|
auth_events_by_key.get(&(StateEventType::RoomCreate, String::new().into())),
|
||||||
Some(_) | None
|
Some(_) | None
|
||||||
) {
|
) {
|
||||||
return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
|
return Err!(Request(InvalidParam("Incoming event refers to wrong create event.")));
|
||||||
|
@ -131,7 +169,7 @@ where
|
||||||
|
|
||||||
let state_fetch = |ty: &StateEventType, sk: &str| {
|
let state_fetch = |ty: &StateEventType, sk: &str| {
|
||||||
let key = (ty.to_owned(), sk.into());
|
let key = (ty.to_owned(), sk.into());
|
||||||
ready(auth_events.get(&key).map(ToOwned::to_owned))
|
ready(auth_events_by_key.get(&key).map(ToOwned::to_owned))
|
||||||
};
|
};
|
||||||
|
|
||||||
let auth_check = state_res::event_auth::auth_check(
|
let auth_check = state_res::event_auth::auth_check(
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue